Updated Branches: refs/heads/master 56a9ccbed -> d9a968691
Fix WAVE-399 with Diff r2 Project: http://git-wip-us.apache.org/repos/asf/incubator-wave/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-wave/commit/cf5a4e50 Tree: http://git-wip-us.apache.org/repos/asf/incubator-wave/tree/cf5a4e50 Diff: http://git-wip-us.apache.org/repos/asf/incubator-wave/diff/cf5a4e50 Branch: refs/heads/master Commit: cf5a4e5016f48b07fd18a179dbbea99c9fb04958 Parents: 56a9ccb Author: pablojan <pablojan@[email protected]> Authored: Wed Jan 15 17:33:29 2014 +0100 Committer: Yuri Zelikov <[email protected]> Committed: Fri Jan 17 22:36:12 2014 +0200 ---------------------------------------------------------------------- .../server/persistence/PersistenceModule.java | 3 + .../mongodb/MongoDbDeltaCollection.java | 222 +++++++++++++ .../persistence/mongodb/MongoDbDeltaStore.java | 147 +++++++++ .../mongodb/MongoDbDeltaStoreUtil.java | 309 +++++++++++++++++++ .../persistence/mongodb/MongoDbProvider.java | 25 +- third_party/runtime/mongo-driver/mongo-2.1.jar | Bin 210296 -> 0 bytes .../mongo-driver/mongo-java-driver-2.11.2.jar | Bin 0 -> 417896 bytes .../mongo-java-driver-r2.1-0-src.zip | Bin 996112 -> 0 bytes 8 files changed, 704 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/src/org/waveprotocol/box/server/persistence/PersistenceModule.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/PersistenceModule.java b/src/org/waveprotocol/box/server/persistence/PersistenceModule.java index a430570..14c6453 100644 --- a/src/org/waveprotocol/box/server/persistence/PersistenceModule.java +++ b/src/org/waveprotocol/box/server/persistence/PersistenceModule.java @@ -139,6 +139,9 @@ public class PersistenceModule extends AbstractModule { bind(DeltaStore.class).to(MemoryDeltaStore.class).in(Singleton.class); } else if (deltaStoreType.equalsIgnoreCase("file")) { bind(DeltaStore.class).to(FileDeltaStore.class).in(Singleton.class); + } else if (deltaStoreType.equalsIgnoreCase("mongodb")) { + MongoDbProvider mongoDbProvider = getMongoDbProvider(); + bind(DeltaStore.class).toInstance(mongoDbProvider.provideMongoDbDeltaStore()); } else { throw new RuntimeException("Invalid delta store type: '" + deltaStoreType + "'"); } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java new file mode 100644 index 0000000..d61bea2 --- /dev/null +++ b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.waveprotocol.box.server.persistence.mongodb; + +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; + +import org.waveprotocol.box.server.persistence.PersistenceException; +import org.waveprotocol.box.server.waveserver.ByteStringMessage; +import org.waveprotocol.box.server.waveserver.DeltaStore; +import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord; +import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; +import org.waveprotocol.wave.model.id.WaveletName; +import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; +import org.waveprotocol.wave.model.version.HashedVersion; + +import java.io.IOException; +import java.util.Collection; + +/** + * A MongoDB based Delta Access implementation using a simple <b>deltas</b> + * collection, storing a delta record per each MongoDb document. + * + * + * @author [email protected] (Pablo Ojanguren) + * + */ +public class MongoDbDeltaCollection implements DeltaStore.DeltasAccess { + + /** Wavelet name to work with. */ + private final WaveletName waveletName; + + /** MongoDB Collection object for delta storage */ + private final DBCollection deltaDbCollection; + + + /** + * Construct a new Delta Access object for the wavelet + * + * @param waveletName The wavelet name. + * @param deltaDbCollection The MongoDB deltas collection + */ + public MongoDbDeltaCollection(WaveletName waveletName, DBCollection deltaDbCollection) { + this.waveletName = waveletName; + this.deltaDbCollection = deltaDbCollection; + } + + @Override + public WaveletName getWaveletName() { + + return waveletName; + } + + /** + * Create a new DBObject for a common query to select this wavelet + * + * @return DBObject query + */ + protected DBObject createWaveletDBQuery() { + + DBObject query = new BasicDBObject(); + query.put(MongoDbDeltaStoreUtil.FIELD_WAVE_ID, waveletName.waveId.serialise()); + query.put(MongoDbDeltaStoreUtil.FIELD_WAVELET_ID, waveletName.waveletId.serialise()); + + + return query; + } + + @Override + public boolean isEmpty() { + + return deltaDbCollection.count(createWaveletDBQuery()) == 0; + + } + + + @Override + public HashedVersion getEndVersion() { + + // Search the max of delta.getTransformedDelta().getResultingVersion() + + DBObject query = createWaveletDBQuery(); + + DBObject sort = new BasicDBObject(); + sort.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION_VERSION, -1); // Descending + + + DBObject field = new BasicDBObject(); + field.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION, 1); + + DBObject result = deltaDbCollection.findOne(query, field, sort); + + return result != null ? MongoDbDeltaStoreUtil + .deserializeHashedVersion((DBObject) ((DBObject) result + .get(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED)) + .get(MongoDbDeltaStoreUtil.FIELD_RESULTINGVERSION)) : null; + } + + @Override + public WaveletDeltaRecord getDelta(long version) throws IOException { + + + DBObject query = createWaveletDBQuery(); + query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, version); + + DBObject result = deltaDbCollection.findOne(query); + + WaveletDeltaRecord waveletDelta = null; + + if (result != null) try { + waveletDelta = MongoDbDeltaStoreUtil.deserializeWaveletDeltaRecord(result); + } catch (PersistenceException e) { + throw new IOException(e); + } + + return waveletDelta; + } + + @Override + public WaveletDeltaRecord getDeltaByEndVersion(long version) throws IOException { + + DBObject query = createWaveletDBQuery(); + query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION_VERSION, version); + + DBObject result = deltaDbCollection.findOne(query); + + WaveletDeltaRecord waveletDelta = null; + + if (result != null) + + try { + MongoDbDeltaStoreUtil.deserializeWaveletDeltaRecord(result); + } catch (PersistenceException e) { + throw new IOException(e); + } + + return waveletDelta; + } + + @Override + public HashedVersion getAppliedAtVersion(long version) throws IOException { + + DBObject query = createWaveletDBQuery(); + query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, version); + + DBObject result = deltaDbCollection.findOne(query); + + if (result != null) + return MongoDbDeltaStoreUtil.deserializeHashedVersion((DBObject) result + .get(MongoDbDeltaStoreUtil.FIELD_APPLIEDATVERSION)); + + return null; + + } + + @Override + public HashedVersion getResultingVersion(long version) throws IOException { + + + DBObject query = createWaveletDBQuery(); + query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, version); + + DBObject result = deltaDbCollection.findOne(query); + + if (result != null) + return MongoDbDeltaStoreUtil.deserializeHashedVersion((DBObject) result + .get(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION)); + + return null; + } + + @Override + public ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDelta(long version) + throws IOException { + + WaveletDeltaRecord delta = getDelta(version); + return (delta != null) ? delta.getAppliedDelta() : null; + + } + + @Override + public TransformedWaveletDelta getTransformedDelta(long version) throws IOException { + + WaveletDeltaRecord delta = getDelta(version); + return (delta != null) ? delta.getTransformedDelta() : null; + + } + + @Override + public void close() throws IOException { + // Does nothing. + } + + @Override + public void append(Collection<WaveletDeltaRecord> newDeltas) throws PersistenceException { + + for (WaveletDeltaRecord delta : newDeltas) { + + deltaDbCollection.insert(MongoDbDeltaStoreUtil.serialize(delta, + waveletName.waveId.serialise(), waveletName.waveletId.serialise())); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java new file mode 100644 index 0000000..9106886 --- /dev/null +++ b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.waveprotocol.box.server.persistence.mongodb; + +import com.google.common.collect.ImmutableSet; + +import com.mongodb.BasicDBObject; +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; +import com.mongodb.MongoException; + +import org.waveprotocol.box.common.ExceptionalIterator; +import org.waveprotocol.box.server.persistence.FileNotFoundPersistenceException; +import org.waveprotocol.box.server.persistence.PersistenceException; +import org.waveprotocol.box.server.waveserver.DeltaStore; +import org.waveprotocol.wave.model.id.WaveId; +import org.waveprotocol.wave.model.id.WaveletId; +import org.waveprotocol.wave.model.id.WaveletName; + +import java.util.List; + +/** + * A MongoDB based Delta Store implementation using a simple <b>deltas</b> + * collection, storing a delta record per each MongoDb document. + * + * @author [email protected] (Pablo Ojanguren) + * + */ +public class MongoDbDeltaStore implements DeltaStore { + + /** Name of the MongoDB collection to store Deltas */ + private static final String DELTAS_COLLECTION = "deltas"; + + /** Database connection object */ + private final DB database; + + /** + * Construct a new store + * + * @param database the database connection object + */ + public MongoDbDeltaStore(DB database) { + this.database = database; + } + + @Override + public DeltasAccess open(WaveletName waveletName) throws PersistenceException { + + return new MongoDbDeltaCollection(waveletName, getDeltaDbCollection()); + } + + @Override + public void delete(WaveletName waveletName) throws PersistenceException, + FileNotFoundPersistenceException { + + DBObject criteria = new BasicDBObject(); + criteria.put(MongoDbDeltaStoreUtil.FIELD_WAVE_ID, waveletName.waveId.serialise()); + criteria.put(MongoDbDeltaStoreUtil.FIELD_WAVELET_ID, waveletName.waveletId.serialise()); + + try { + getDeltaDbCollection().remove(criteria); + } catch (MongoException e) { + throw new PersistenceException(e); + } + } + + @Override + public ImmutableSet<WaveletId> lookup(WaveId waveId) throws PersistenceException { + + + DBObject query = new BasicDBObject(); + query.put(MongoDbDeltaStoreUtil.FIELD_WAVE_ID, waveId.serialise()); + + DBObject projection = new BasicDBObject(); + projection.put(MongoDbDeltaStoreUtil.FIELD_WAVELET_ID, 1); + + DBCursor cursor = null; + + try { + cursor = getDeltaDbCollection().find(query, projection); + } catch (MongoException e) { + throw new PersistenceException(e); + } + + + if (cursor == null || !cursor.hasNext()) { + return ImmutableSet.of(); + } else { + ImmutableSet.Builder<WaveletId> builder = ImmutableSet.builder(); + for (DBObject waveletIdDBObject : cursor) { + builder.add(WaveletId.deserialise((String) waveletIdDBObject + .get(MongoDbDeltaStoreUtil.FIELD_WAVELET_ID))); + } + return builder.build(); + } + } + + @Override + public ExceptionalIterator<WaveId, PersistenceException> getWaveIdIterator() + throws PersistenceException { + + ImmutableSet.Builder<WaveId> builder = ImmutableSet.builder(); + + try { + + @SuppressWarnings("rawtypes") + List results = getDeltaDbCollection().distinct(MongoDbDeltaStoreUtil.FIELD_WAVE_ID); + + for (Object o : results) + builder.add(WaveId.deserialise((String) o)); + + } catch (MongoException e) { + throw new PersistenceException(e); + } + + + return ExceptionalIterator.FromIterator.create(builder.build().iterator()); + } + + /** + * Access to deltas collection + * + * @return DBCollection of deltas + */ + private DBCollection getDeltaDbCollection() { + return database.getCollection(DELTAS_COLLECTION); + } +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStoreUtil.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStoreUtil.java b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStoreUtil.java new file mode 100644 index 0000000..d603d14 --- /dev/null +++ b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStoreUtil.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.waveprotocol.box.server.persistence.mongodb; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; + +import com.mongodb.BasicDBList; +import com.mongodb.BasicDBObject; +import com.mongodb.DBObject; + +import org.waveprotocol.box.server.common.CoreWaveletOperationSerializer; +import org.waveprotocol.box.server.persistence.PersistenceException; +import org.waveprotocol.box.server.waveserver.ByteStringMessage; +import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord; +import org.waveprotocol.wave.federation.Proto.ProtocolDocumentOperation; +import org.waveprotocol.wave.model.document.operation.DocOp; +import org.waveprotocol.wave.model.operation.wave.AddParticipant; +import org.waveprotocol.wave.model.operation.wave.BlipContentOperation; +import org.waveprotocol.wave.model.operation.wave.BlipOperation; +import org.waveprotocol.wave.model.operation.wave.NoOp; +import org.waveprotocol.wave.model.operation.wave.RemoveParticipant; +import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; +import org.waveprotocol.wave.model.operation.wave.WaveletBlipOperation; +import org.waveprotocol.wave.model.operation.wave.WaveletOperation; +import org.waveprotocol.wave.model.operation.wave.WaveletOperationContext; +import org.waveprotocol.wave.model.version.HashedVersion; +import org.waveprotocol.wave.model.wave.ParticipantId; + +/** + * A utility class to serialize/deserialize delta objects to/from MongoDB. + * The implementation approach is analog to the provided at {@link CoreWaveletOperationSerializer} + * and {@link ProtoDeltaStoreDataSerializer} + * + * @author [email protected] (Pablo Ojanguren) + * + */ +public class MongoDbDeltaStoreUtil { + + + public static final String WAVELET_OP_WAVELET_BLIP_OPERATION = "WaveletBlipOperation"; + public static final String WAVELET_OP_REMOVE_PARTICIPANT = "RemoveParticipant"; + public static final String WAVELET_OP_ADD_PARTICIPANT = "AddParticipant"; + public static final String WAVELET_OP_NOOP = "NoOp"; + public static final String FIELD_BYTES = "bytes"; + public static final String FIELD_CONTENTOP = "contentop"; + public static final String FIELD_BLIPOP = "blipop"; + public static final String FIELD_BLIPID = "blipid"; + public static final String FIELD_PARTICIPANT = "participant"; + public static final String FIELD_TYPE = "type"; + public static final String FIELD_OPS = "ops"; + public static final String FIELD_APPLICATIONTIMESTAMP = "applicationtimestamp"; + public static final String FIELD_AUTHOR = "author"; + public static final String FIELD_ADDRESS = "address"; + public static final String FIELD_HISTORYHASH = "historyhash"; + public static final String FIELD_VERSION = "version"; + public static final String FIELD_TRANSFORMED_RESULTINGVERSION_VERSION = "transformed.resultingversion.version"; + public static final String FIELD_TRANSFORMED_APPLIEDATVERSION = "transformed.appliedatversion"; + public static final String FIELD_TRANSFORMED_RESULTINGVERSION = "transformed.resultingversion"; + public static final String FIELD_APPLIEDATVERSION = "appliedatversion"; + public static final String FIELD_RESULTINGVERSION = "resultingversion"; + public static final String FIELD_TRANSFORMED = "transformed"; + public static final String FIELD_APPLIED = "applied"; + public static final String FIELD_WAVELET_ID = "waveletid"; + public static final String FIELD_WAVE_ID = "waveid"; + + + + public static DBObject serialize(WaveletDeltaRecord waveletDelta, String waveId, String waveletId) { + + BasicDBObject _waveletDelta = new BasicDBObject(); + + // + _waveletDelta.append(FIELD_WAVE_ID, waveId); + _waveletDelta.append(FIELD_WAVELET_ID, waveletId); + + _waveletDelta.append(FIELD_APPLIEDATVERSION, serialize(waveletDelta.getAppliedAtVersion())); + _waveletDelta.append(FIELD_APPLIED, waveletDelta.getAppliedDelta().getByteArray()); + _waveletDelta.append(FIELD_TRANSFORMED, serialize(waveletDelta.getTransformedDelta())); + + return _waveletDelta; + } + + + public static DBObject serialize(HashedVersion hashedVersion) { + + BasicDBObject _hashedVersion = new BasicDBObject(); + _hashedVersion.append(FIELD_VERSION, hashedVersion.getVersion()); + _hashedVersion.append(FIELD_HISTORYHASH, hashedVersion.getHistoryHash()); + + return _hashedVersion; + + } + + + public static DBObject serialize(ParticipantId participantId) { + + BasicDBObject _participantId = new BasicDBObject(); + _participantId.append(FIELD_ADDRESS, participantId.getAddress()); + + return _participantId; + + } + + public static DBObject serialize(TransformedWaveletDelta transformedWaveletDelta) { + + BasicDBObject _transformedWaveletDelta = new BasicDBObject(); + _transformedWaveletDelta.append(FIELD_AUTHOR, serialize(transformedWaveletDelta.getAuthor())); + _transformedWaveletDelta.append(FIELD_RESULTINGVERSION, serialize(transformedWaveletDelta.getResultingVersion())); + _transformedWaveletDelta.append(FIELD_APPLICATIONTIMESTAMP, transformedWaveletDelta.getApplicationTimestamp()); + + // Calculated value to provide DB implementation of MongoDBDeltaCollection.getDelta(long version) + _transformedWaveletDelta.append(FIELD_APPLIEDATVERSION, transformedWaveletDelta.getAppliedAtVersion()); + + BasicDBList _waveletOperations = new BasicDBList(); + + for (WaveletOperation op: transformedWaveletDelta) { + _waveletOperations.add(serialize(op)); + } + + _transformedWaveletDelta.append(FIELD_OPS, _waveletOperations); + + return _transformedWaveletDelta; + + } + + + public static DBObject serialize(WaveletOperation waveletOp) { + + final BasicDBObject _op = new BasicDBObject(); + + + if (waveletOp instanceof NoOp) { + _op.append(FIELD_TYPE, WAVELET_OP_NOOP); + + } else if (waveletOp instanceof AddParticipant) { + _op.append(FIELD_TYPE, WAVELET_OP_ADD_PARTICIPANT); + _op.append(FIELD_PARTICIPANT, serialize(((AddParticipant) waveletOp).getParticipantId())); + + } else if (waveletOp instanceof RemoveParticipant) { + + _op.append(FIELD_TYPE, WAVELET_OP_REMOVE_PARTICIPANT); + _op.append(FIELD_PARTICIPANT, serialize(((RemoveParticipant) waveletOp).getParticipantId())); + + } else if (waveletOp instanceof WaveletBlipOperation) { + + final WaveletBlipOperation waveletBlipOp = (WaveletBlipOperation) waveletOp; + + _op.append(FIELD_TYPE, WAVELET_OP_WAVELET_BLIP_OPERATION); + _op.append(FIELD_BLIPID, waveletBlipOp.getBlipId()); + + if (waveletBlipOp.getBlipOp() instanceof BlipContentOperation) { + + _op.append(FIELD_BLIPOP, serialize((BlipContentOperation) waveletBlipOp.getBlipOp())); + + } else { + + throw new IllegalArgumentException("Unsupported blip operation: " + waveletBlipOp.getBlipOp()); + } + + } else { + + throw new IllegalArgumentException("Unsupported wavelet operation: " + waveletOp); + } + + return _op; + } + + + + public static DBObject serialize(BlipContentOperation blipContentOp) { + + BasicDBObject _blipContentOp = new BasicDBObject(); + _blipContentOp.append(FIELD_CONTENTOP, serialize(blipContentOp.getContentOp())); + return _blipContentOp; + } + + + public static DBObject serialize(DocOp docOp) { + + // This method relays on the provided CoreWaveletOperationSerializer, + // because of complexity of serializing DocOp's + BasicDBObject _docOp = new BasicDBObject(); + _docOp.append(FIELD_BYTES, CoreWaveletOperationSerializer.serialize(docOp).toByteArray()); + return _docOp; + } + + + + public static WaveletDeltaRecord deserializeWaveletDeltaRecord(DBObject dbObject) throws PersistenceException { + + try { + + return new WaveletDeltaRecord( + deserializeHashedVersion((DBObject) dbObject.get(FIELD_APPLIEDATVERSION)), + ByteStringMessage.parseProtocolAppliedWaveletDelta(ByteString.copyFrom((byte[]) dbObject.get(FIELD_APPLIED))), + deserializeTransformedWaveletDelta((DBObject) dbObject.get(FIELD_TRANSFORMED))); + + } catch (InvalidProtocolBufferException e) { + + throw new PersistenceException(e); + } + + } + + public static HashedVersion deserializeHashedVersion(DBObject dbObject) { + + return HashedVersion.of((Long) dbObject.get(FIELD_VERSION), (byte[]) dbObject.get(FIELD_HISTORYHASH)); + } + + + public static ParticipantId deserializeParicipantId(DBObject dbObject) { + + return ParticipantId.ofUnsafe((String) dbObject.get(FIELD_ADDRESS)); + } + + + public static TransformedWaveletDelta deserializeTransformedWaveletDelta(DBObject dbObject) throws PersistenceException { + + ParticipantId author = deserializeParicipantId((DBObject) dbObject.get(FIELD_AUTHOR)); + HashedVersion resultingVersion = deserializeHashedVersion((DBObject) dbObject.get(FIELD_RESULTINGVERSION)); + long applicationTimestamp = (Long) dbObject.get(FIELD_APPLICATIONTIMESTAMP); + + BasicDBList dbOps = (BasicDBList) dbObject.get(FIELD_OPS); + ImmutableList.Builder<WaveletOperation> operations = ImmutableList.builder(); + + int numOperations = dbOps.size(); + + // Code analog to ProtoDeltaStoreDataSerializer.deserialize + for (int i = 0; i < numOperations; i++) { + + WaveletOperationContext context; + if (i == numOperations - 1) { + context = new WaveletOperationContext(author, applicationTimestamp, 1, resultingVersion); + } else { + context = new WaveletOperationContext(author, applicationTimestamp, 1); + } + operations.add(deserializeWaveletOperation((DBObject) dbOps.get(i), context)); + } + + + return new TransformedWaveletDelta(author, resultingVersion, applicationTimestamp, operations.build()); + + } + + public static WaveletOperation deserializeWaveletOperation(DBObject dbObject, WaveletOperationContext context) throws PersistenceException { + + + String type = (String) dbObject.get(FIELD_TYPE); + + if (type.equals(WAVELET_OP_NOOP)) { + return new NoOp(context); + } else if (type.equals(WAVELET_OP_ADD_PARTICIPANT)) { + return new AddParticipant(context, deserializeParicipantId((DBObject) dbObject.get(FIELD_PARTICIPANT))); + } else if (type.equals(WAVELET_OP_REMOVE_PARTICIPANT)) { + return new RemoveParticipant(context, deserializeParicipantId((DBObject) dbObject.get(FIELD_PARTICIPANT))); + } else if (type.equals(WAVELET_OP_WAVELET_BLIP_OPERATION)) { + + return new WaveletBlipOperation((String) dbObject.get(FIELD_BLIPID), + deserializeBlipContentOperation((DBObject) dbObject.get(FIELD_BLIPOP), context)); + + + } else { + throw new IllegalArgumentException("Unsupported operation: " + type); + } + + } + + + public static BlipOperation deserializeBlipContentOperation(DBObject dbObject, + WaveletOperationContext context) throws PersistenceException { + + return new BlipContentOperation(context, deserializeDocOp((DBObject) dbObject.get(FIELD_CONTENTOP))); + } + + + private static DocOp deserializeDocOp(DBObject dbObject) throws PersistenceException { + + + try { + + return CoreWaveletOperationSerializer.deserialize(ProtocolDocumentOperation.parseFrom(((byte[]) dbObject.get(FIELD_BYTES)))); + + } catch (InvalidProtocolBufferException e) { + throw new PersistenceException(e); + } + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java index 6f5170f..4136258 100644 --- a/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java +++ b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.mongodb.DB; import com.mongodb.Mongo; +import com.mongodb.MongoClient; import com.mongodb.MongoException; import org.waveprotocol.box.server.persistence.PersistenceStartException; @@ -56,7 +57,7 @@ public class MongoDbProvider { private static final String DATABASE_NAME_PROPERTY = "mongoDbDatabase"; /** - * Our {@link Mongo} instance, should be accessed by getMongo unless during + * Our {@link MongoClient} instance, should be accessed by getMongo unless during * start(). */ private Mongo mongo; @@ -71,6 +72,11 @@ public class MongoDbProvider { */ private MongoDbStore mongoDbStore; + /** + * Separated store for Deltas {@link MongoDbDeltaStore} + */ + private MongoDbDeltaStore mongoDbDeltaStore; + /** Stores whether we have successfully setup a live {@link Mongo} instance. */ private boolean isRunning; @@ -94,7 +100,8 @@ public class MongoDbProvider { String host = properties.getProperty(HOST_PROPERTY); int port = Integer.parseInt(properties.getProperty(PORT_PROPERTY)); try { - mongo = new Mongo(host, port); + // New MongoDB Client, see http://docs.mongodb.org/manual/release-notes/drivers-write-concern/ + mongo = new MongoClient(host, port); } catch (UnknownHostException e) { throw new PersistenceStartException("Unable to resolve the MongoDb hostname", e); } @@ -182,4 +189,18 @@ public class MongoDbProvider { } return mongoDbStore; } + + /** + * Returns a {@link MongoDbDeltaStore} instance created from the settings in this + * provider. + */ + public MongoDbDeltaStore provideMongoDbDeltaStore() { + if (mongoDbDeltaStore == null) { + mongoDbDeltaStore = new MongoDbDeltaStore(getDatabase()); + } + + return mongoDbDeltaStore; + + } + } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/third_party/runtime/mongo-driver/mongo-2.1.jar ---------------------------------------------------------------------- diff --git a/third_party/runtime/mongo-driver/mongo-2.1.jar b/third_party/runtime/mongo-driver/mongo-2.1.jar deleted file mode 100644 index 0936004..0000000 Binary files a/third_party/runtime/mongo-driver/mongo-2.1.jar and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/third_party/runtime/mongo-driver/mongo-java-driver-2.11.2.jar ---------------------------------------------------------------------- diff --git a/third_party/runtime/mongo-driver/mongo-java-driver-2.11.2.jar b/third_party/runtime/mongo-driver/mongo-java-driver-2.11.2.jar new file mode 100644 index 0000000..c075289 Binary files /dev/null and b/third_party/runtime/mongo-driver/mongo-java-driver-2.11.2.jar differ http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/third_party/runtime/mongo-driver/mongo-java-driver-r2.1-0-src.zip ---------------------------------------------------------------------- diff --git a/third_party/runtime/mongo-driver/mongo-java-driver-r2.1-0-src.zip b/third_party/runtime/mongo-driver/mongo-java-driver-r2.1-0-src.zip deleted file mode 100644 index 41f4a34..0000000 Binary files a/third_party/runtime/mongo-driver/mongo-java-driver-r2.1-0-src.zip and /dev/null differ
