Updated Branches:
  refs/heads/master 56a9ccbed -> d9a968691

Fix WAVE-399 with Diff r2


Project: http://git-wip-us.apache.org/repos/asf/incubator-wave/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-wave/commit/cf5a4e50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-wave/tree/cf5a4e50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-wave/diff/cf5a4e50

Branch: refs/heads/master
Commit: cf5a4e5016f48b07fd18a179dbbea99c9fb04958
Parents: 56a9ccb
Author: pablojan <pablojan@[email protected]>
Authored: Wed Jan 15 17:33:29 2014 +0100
Committer: Yuri Zelikov <[email protected]>
Committed: Fri Jan 17 22:36:12 2014 +0200

----------------------------------------------------------------------
 .../server/persistence/PersistenceModule.java   |   3 +
 .../mongodb/MongoDbDeltaCollection.java         | 222 +++++++++++++
 .../persistence/mongodb/MongoDbDeltaStore.java  | 147 +++++++++
 .../mongodb/MongoDbDeltaStoreUtil.java          | 309 +++++++++++++++++++
 .../persistence/mongodb/MongoDbProvider.java    |  25 +-
 third_party/runtime/mongo-driver/mongo-2.1.jar  | Bin 210296 -> 0 bytes
 .../mongo-driver/mongo-java-driver-2.11.2.jar   | Bin 0 -> 417896 bytes
 .../mongo-java-driver-r2.1-0-src.zip            | Bin 996112 -> 0 bytes
 8 files changed, 704 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/src/org/waveprotocol/box/server/persistence/PersistenceModule.java
----------------------------------------------------------------------
diff --git a/src/org/waveprotocol/box/server/persistence/PersistenceModule.java 
b/src/org/waveprotocol/box/server/persistence/PersistenceModule.java
index a430570..14c6453 100644
--- a/src/org/waveprotocol/box/server/persistence/PersistenceModule.java
+++ b/src/org/waveprotocol/box/server/persistence/PersistenceModule.java
@@ -139,6 +139,9 @@ public class PersistenceModule extends AbstractModule {
       bind(DeltaStore.class).to(MemoryDeltaStore.class).in(Singleton.class);
     } else if (deltaStoreType.equalsIgnoreCase("file")) {
       bind(DeltaStore.class).to(FileDeltaStore.class).in(Singleton.class);
+    } else if (deltaStoreType.equalsIgnoreCase("mongodb")) {
+      MongoDbProvider mongoDbProvider = getMongoDbProvider();
+      
bind(DeltaStore.class).toInstance(mongoDbProvider.provideMongoDbDeltaStore());
     } else {
       throw new RuntimeException("Invalid delta store type: '" + 
deltaStoreType + "'");
     }

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java
----------------------------------------------------------------------
diff --git 
a/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java
 
b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java
new file mode 100644
index 0000000..d61bea2
--- /dev/null
+++ 
b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.waveprotocol.box.server.persistence.mongodb;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+
+import org.waveprotocol.box.server.persistence.PersistenceException;
+import org.waveprotocol.box.server.waveserver.ByteStringMessage;
+import org.waveprotocol.box.server.waveserver.DeltaStore;
+import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord;
+import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta;
+import org.waveprotocol.wave.model.id.WaveletName;
+import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta;
+import org.waveprotocol.wave.model.version.HashedVersion;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A MongoDB based Delta Access implementation using a simple <b>deltas</b>
+ * collection, storing one delta record per MongoDB document.
+ * 
+ * 
+ * @author [email protected] (Pablo Ojanguren)
+ * 
+ */
+public class MongoDbDeltaCollection implements DeltaStore.DeltasAccess {
+
+  /** Wavelet name to work with. */
+  private final WaveletName waveletName;
+
+  /** MongoDB Collection object for delta storage */
+  private final DBCollection deltaDbCollection;
+
+
+  /**
+   * Construct a new Delta Access object for the wavelet
+   * 
+   * @param waveletName The wavelet name.
+   * @param deltaDbCollection The MongoDB deltas collection
+   */
+  public MongoDbDeltaCollection(WaveletName waveletName, DBCollection 
deltaDbCollection) {
+    this.waveletName = waveletName;
+    this.deltaDbCollection = deltaDbCollection;
+  }
+
+  @Override
+  public WaveletName getWaveletName() {
+
+    return waveletName;
+  }
+
+  /**
+   * Create a new DBObject for a common query to select this wavelet
+   * 
+   * @return DBObject query
+   */
+  protected DBObject createWaveletDBQuery() {
+
+    DBObject query = new BasicDBObject();
+    query.put(MongoDbDeltaStoreUtil.FIELD_WAVE_ID, 
waveletName.waveId.serialise());
+    query.put(MongoDbDeltaStoreUtil.FIELD_WAVELET_ID, 
waveletName.waveletId.serialise());
+
+
+    return query;
+  }
+
+  @Override
+  public boolean isEmpty() {
+
+    return deltaDbCollection.count(createWaveletDBQuery()) == 0;
+
+  }
+
+
+  @Override
+  public HashedVersion getEndVersion() {
+
+    // Search the max of delta.getTransformedDelta().getResultingVersion()
+
+    DBObject query = createWaveletDBQuery();
+
+    DBObject sort = new BasicDBObject();
+    sort.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION_VERSION, 
-1); // Descending
+
+
+    DBObject field = new BasicDBObject();
+    field.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION, 1);
+
+    DBObject result = deltaDbCollection.findOne(query, field, sort);
+
+    return result != null ? MongoDbDeltaStoreUtil
+        .deserializeHashedVersion((DBObject) ((DBObject) result
+            .get(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED))
+            .get(MongoDbDeltaStoreUtil.FIELD_RESULTINGVERSION)) : null;
+  }
+
+  @Override
+  public WaveletDeltaRecord getDelta(long version) throws IOException {
+
+
+    DBObject query = createWaveletDBQuery();
+    query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, 
version);
+
+    DBObject result = deltaDbCollection.findOne(query);
+
+    WaveletDeltaRecord waveletDelta = null;
+
+    if (result != null) try {
+      waveletDelta = 
MongoDbDeltaStoreUtil.deserializeWaveletDeltaRecord(result);
+    } catch (PersistenceException e) {
+      throw new IOException(e);
+    }
+
+    return waveletDelta;
+  }
+
+  /**
+   * Returns the delta record of this wavelet whose resulting (end) version
+   * number equals {@code version}.
+   *
+   * @param version the resulting version number to look up
+   * @return the matching delta record, or {@code null} if no stored document
+   *         matches
+   * @throws IOException if the stored document cannot be deserialized
+   */
+  @Override
+  public WaveletDeltaRecord getDeltaByEndVersion(long version) throws IOException {
+    DBObject query = createWaveletDBQuery();
+    query.put(
+        MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION_VERSION, version);
+
+    DBObject result = deltaDbCollection.findOne(query);
+    if (result == null) {
+      return null;
+    }
+
+    try {
+      // The deserialized record must be handed back to the caller; dropping
+      // it would make every lookup look like a miss.
+      return MongoDbDeltaStoreUtil.deserializeWaveletDeltaRecord(result);
+    } catch (PersistenceException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public HashedVersion getAppliedAtVersion(long version) throws IOException {
+
+    DBObject query = createWaveletDBQuery();
+    query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, 
version);
+
+    DBObject result = deltaDbCollection.findOne(query);
+
+    if (result != null)
+      return MongoDbDeltaStoreUtil.deserializeHashedVersion((DBObject) result
+          .get(MongoDbDeltaStoreUtil.FIELD_APPLIEDATVERSION));
+
+    return null;
+
+  }
+
+  /**
+   * Returns the hashed version that results from the delta of this wavelet
+   * applied at {@code version}.
+   *
+   * @param version the version number the delta was applied at
+   * @return the resulting hashed version, or {@code null} if no stored
+   *         document matches
+   * @throws IOException declared by the interface
+   */
+  @Override
+  public HashedVersion getResultingVersion(long version) throws IOException {
+    DBObject query = createWaveletDBQuery();
+    query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, version);
+
+    DBObject result = deltaDbCollection.findOne(query);
+    if (result == null) {
+      return null;
+    }
+
+    // DBObject.get() looks up a literal key and does not resolve dotted
+    // paths, so the nested "transformed" sub-document has to be walked
+    // explicitly (as getEndVersion() does).
+    DBObject transformed =
+        (DBObject) result.get(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED);
+    return MongoDbDeltaStoreUtil.deserializeHashedVersion(
+        (DBObject) transformed.get(MongoDbDeltaStoreUtil.FIELD_RESULTINGVERSION));
+  }
+
+  @Override
+  public ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDelta(long 
version)
+      throws IOException {
+
+    WaveletDeltaRecord delta = getDelta(version);
+    return (delta != null) ? delta.getAppliedDelta() : null;
+
+  }
+
+  @Override
+  public TransformedWaveletDelta getTransformedDelta(long version) throws 
IOException {
+
+    WaveletDeltaRecord delta = getDelta(version);
+    return (delta != null) ? delta.getTransformedDelta() : null;
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Does nothing.
+  }
+
+  @Override
+  public void append(Collection<WaveletDeltaRecord> newDeltas) throws 
PersistenceException {
+
+    for (WaveletDeltaRecord delta : newDeltas) {
+
+      deltaDbCollection.insert(MongoDbDeltaStoreUtil.serialize(delta,
+          waveletName.waveId.serialise(), waveletName.waveletId.serialise()));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java
----------------------------------------------------------------------
diff --git 
a/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java 
b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java
new file mode 100644
index 0000000..9106886
--- /dev/null
+++ b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.waveprotocol.box.server.persistence.mongodb;
+
+import com.google.common.collect.ImmutableSet;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.MongoException;
+
+import org.waveprotocol.box.common.ExceptionalIterator;
+import 
org.waveprotocol.box.server.persistence.FileNotFoundPersistenceException;
+import org.waveprotocol.box.server.persistence.PersistenceException;
+import org.waveprotocol.box.server.waveserver.DeltaStore;
+import org.waveprotocol.wave.model.id.WaveId;
+import org.waveprotocol.wave.model.id.WaveletId;
+import org.waveprotocol.wave.model.id.WaveletName;
+
+import java.util.List;
+
+/**
+ * A MongoDB based Delta Store implementation using a simple <b>deltas</b>
+ * collection, storing one delta record per MongoDB document.
+ * 
+ * @author [email protected] (Pablo Ojanguren)
+ * 
+ */
+public class MongoDbDeltaStore implements DeltaStore {
+
+  /** Name of the MongoDB collection to store Deltas */
+  private static final String DELTAS_COLLECTION = "deltas";
+
+  /** Database connection object */
+  private final DB database;
+
+  /**
+   * Construct a new store
+   * 
+   * @param database the database connection object
+   */
+  public MongoDbDeltaStore(DB database) {
+    this.database = database;
+  }
+
+  @Override
+  public DeltasAccess open(WaveletName waveletName) throws 
PersistenceException {
+
+    return new MongoDbDeltaCollection(waveletName, getDeltaDbCollection());
+  }
+
+  @Override
+  public void delete(WaveletName waveletName) throws PersistenceException,
+      FileNotFoundPersistenceException {
+
+    DBObject criteria = new BasicDBObject();
+    criteria.put(MongoDbDeltaStoreUtil.FIELD_WAVE_ID, 
waveletName.waveId.serialise());
+    criteria.put(MongoDbDeltaStoreUtil.FIELD_WAVELET_ID, 
waveletName.waveletId.serialise());
+
+    try {
+      getDeltaDbCollection().remove(criteria);
+    } catch (MongoException e) {
+      throw new PersistenceException(e);
+    }
+  }
+
+  @Override
+  public ImmutableSet<WaveletId> lookup(WaveId waveId) throws 
PersistenceException {
+
+
+    DBObject query = new BasicDBObject();
+    query.put(MongoDbDeltaStoreUtil.FIELD_WAVE_ID, waveId.serialise());
+
+    DBObject projection = new BasicDBObject();
+    projection.put(MongoDbDeltaStoreUtil.FIELD_WAVELET_ID, 1);
+
+    DBCursor cursor = null;
+
+    try {
+      cursor = getDeltaDbCollection().find(query, projection);
+    } catch (MongoException e) {
+      throw new PersistenceException(e);
+    }
+
+
+    if (cursor == null || !cursor.hasNext()) {
+      return ImmutableSet.of();
+    } else {
+      ImmutableSet.Builder<WaveletId> builder = ImmutableSet.builder();
+      for (DBObject waveletIdDBObject : cursor) {
+        builder.add(WaveletId.deserialise((String) waveletIdDBObject
+            .get(MongoDbDeltaStoreUtil.FIELD_WAVELET_ID)));
+      }
+      return builder.build();
+    }
+  }
+
+  @Override
+  public ExceptionalIterator<WaveId, PersistenceException> getWaveIdIterator()
+      throws PersistenceException {
+
+    ImmutableSet.Builder<WaveId> builder = ImmutableSet.builder();
+
+    try {
+
+      @SuppressWarnings("rawtypes")
+      List results = 
getDeltaDbCollection().distinct(MongoDbDeltaStoreUtil.FIELD_WAVE_ID);
+
+      for (Object o : results)
+        builder.add(WaveId.deserialise((String) o));
+
+    } catch (MongoException e) {
+      throw new PersistenceException(e);
+    }
+
+
+    return ExceptionalIterator.FromIterator.create(builder.build().iterator());
+  }
+
+  /**
+   * Access to deltas collection
+   * 
+   * @return DBCollection of deltas
+   */
+  private DBCollection getDeltaDbCollection() {
+    return database.getCollection(DELTAS_COLLECTION);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStoreUtil.java
----------------------------------------------------------------------
diff --git 
a/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStoreUtil.java
 
b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStoreUtil.java
new file mode 100644
index 0000000..d603d14
--- /dev/null
+++ 
b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStoreUtil.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.waveprotocol.box.server.persistence.mongodb;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import com.mongodb.BasicDBList;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+
+import org.waveprotocol.box.server.common.CoreWaveletOperationSerializer;
+import org.waveprotocol.box.server.persistence.PersistenceException;
+import org.waveprotocol.box.server.waveserver.ByteStringMessage;
+import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord;
+import org.waveprotocol.wave.federation.Proto.ProtocolDocumentOperation;
+import org.waveprotocol.wave.model.document.operation.DocOp;
+import org.waveprotocol.wave.model.operation.wave.AddParticipant;
+import org.waveprotocol.wave.model.operation.wave.BlipContentOperation;
+import org.waveprotocol.wave.model.operation.wave.BlipOperation;
+import org.waveprotocol.wave.model.operation.wave.NoOp;
+import org.waveprotocol.wave.model.operation.wave.RemoveParticipant;
+import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta;
+import org.waveprotocol.wave.model.operation.wave.WaveletBlipOperation;
+import org.waveprotocol.wave.model.operation.wave.WaveletOperation;
+import org.waveprotocol.wave.model.operation.wave.WaveletOperationContext;
+import org.waveprotocol.wave.model.version.HashedVersion;
+import org.waveprotocol.wave.model.wave.ParticipantId;
+
+/**
+ * A utility class to serialize/deserialize delta objects to/from MongoDB.
+ * The implementation approach is analogous to the one used by {@link 
CoreWaveletOperationSerializer}
+ * and {@link ProtoDeltaStoreDataSerializer}
+ * 
+ * @author [email protected] (Pablo Ojanguren)
+ *
+ */
+public class MongoDbDeltaStoreUtil {
+
+  
+  public static final String WAVELET_OP_WAVELET_BLIP_OPERATION = 
"WaveletBlipOperation";
+  public static final String WAVELET_OP_REMOVE_PARTICIPANT = 
"RemoveParticipant";
+  public static final String WAVELET_OP_ADD_PARTICIPANT = "AddParticipant";
+  public static final String WAVELET_OP_NOOP = "NoOp";
+  public static final String FIELD_BYTES = "bytes";
+  public static final String FIELD_CONTENTOP = "contentop";
+  public static final String FIELD_BLIPOP = "blipop";
+  public static final String FIELD_BLIPID = "blipid";
+  public static final String FIELD_PARTICIPANT = "participant";
+  public static final String FIELD_TYPE = "type";
+  public static final String FIELD_OPS = "ops";
+  public static final String FIELD_APPLICATIONTIMESTAMP = 
"applicationtimestamp";
+  public static final String FIELD_AUTHOR = "author";
+  public static final String FIELD_ADDRESS = "address";
+  public static final String FIELD_HISTORYHASH = "historyhash";
+  public static final String FIELD_VERSION = "version";
+  public static final String FIELD_TRANSFORMED_RESULTINGVERSION_VERSION = 
"transformed.resultingversion.version";
+  public static final String FIELD_TRANSFORMED_APPLIEDATVERSION = 
"transformed.appliedatversion";
+  public static final String FIELD_TRANSFORMED_RESULTINGVERSION = 
"transformed.resultingversion";
+  public static final String FIELD_APPLIEDATVERSION = "appliedatversion";
+  public static final String FIELD_RESULTINGVERSION = "resultingversion";
+  public static final String FIELD_TRANSFORMED = "transformed";
+  public static final String FIELD_APPLIED = "applied";
+  public static final String FIELD_WAVELET_ID = "waveletid";
+  public static final String FIELD_WAVE_ID = "waveid";  
+  
+
+
+  public static DBObject serialize(WaveletDeltaRecord waveletDelta, String 
waveId, String waveletId) {
+    
+    BasicDBObject _waveletDelta = new BasicDBObject();
+ 
+    // 
+    _waveletDelta.append(FIELD_WAVE_ID, waveId);
+    _waveletDelta.append(FIELD_WAVELET_ID, waveletId);
+    
+    _waveletDelta.append(FIELD_APPLIEDATVERSION, 
serialize(waveletDelta.getAppliedAtVersion()));
+    _waveletDelta.append(FIELD_APPLIED, 
waveletDelta.getAppliedDelta().getByteArray());
+    _waveletDelta.append(FIELD_TRANSFORMED, 
serialize(waveletDelta.getTransformedDelta()));
+    
+    return _waveletDelta;
+  }
+  
+  
+  public static DBObject serialize(HashedVersion hashedVersion) {
+    
+    BasicDBObject _hashedVersion = new BasicDBObject();
+    _hashedVersion.append(FIELD_VERSION, hashedVersion.getVersion());
+    _hashedVersion.append(FIELD_HISTORYHASH, hashedVersion.getHistoryHash());
+    
+    return _hashedVersion;
+    
+  }
+  
+  
+  public static DBObject serialize(ParticipantId participantId) {
+    
+    BasicDBObject _participantId = new BasicDBObject();
+    _participantId.append(FIELD_ADDRESS, participantId.getAddress());
+    
+    return _participantId;
+    
+  }
+  
+  public static DBObject serialize(TransformedWaveletDelta 
transformedWaveletDelta) {
+    
+    BasicDBObject _transformedWaveletDelta = new BasicDBObject();
+    _transformedWaveletDelta.append(FIELD_AUTHOR, 
serialize(transformedWaveletDelta.getAuthor()));
+    _transformedWaveletDelta.append(FIELD_RESULTINGVERSION, 
serialize(transformedWaveletDelta.getResultingVersion()));
+    _transformedWaveletDelta.append(FIELD_APPLICATIONTIMESTAMP, 
transformedWaveletDelta.getApplicationTimestamp());
+    
+    // Calculated value to provide DB implementation of 
MongoDBDeltaCollection.getDelta(long version)
+    _transformedWaveletDelta.append(FIELD_APPLIEDATVERSION, 
transformedWaveletDelta.getAppliedAtVersion());
+       
+    BasicDBList _waveletOperations = new BasicDBList();
+    
+    for (WaveletOperation op: transformedWaveletDelta) {
+      _waveletOperations.add(serialize(op));
+    }
+    
+    _transformedWaveletDelta.append(FIELD_OPS, _waveletOperations);
+     
+    return _transformedWaveletDelta;
+
+  }
+  
+  
+  public static DBObject serialize(WaveletOperation waveletOp) {
+    
+    final BasicDBObject _op = new BasicDBObject();
+      
+      
+    if (waveletOp instanceof NoOp) {
+      _op.append(FIELD_TYPE, WAVELET_OP_NOOP);
+   
+    } else if (waveletOp instanceof AddParticipant) {
+      _op.append(FIELD_TYPE, WAVELET_OP_ADD_PARTICIPANT);
+      _op.append(FIELD_PARTICIPANT, serialize(((AddParticipant) 
waveletOp).getParticipantId()));
+      
+    } else if (waveletOp instanceof RemoveParticipant) {
+      
+      _op.append(FIELD_TYPE, WAVELET_OP_REMOVE_PARTICIPANT);
+      _op.append(FIELD_PARTICIPANT, serialize(((RemoveParticipant) 
waveletOp).getParticipantId()));
+      
+    } else if (waveletOp instanceof WaveletBlipOperation) {
+      
+      final WaveletBlipOperation waveletBlipOp = (WaveletBlipOperation) 
waveletOp;
+   
+      _op.append(FIELD_TYPE, WAVELET_OP_WAVELET_BLIP_OPERATION);
+      _op.append(FIELD_BLIPID, waveletBlipOp.getBlipId());
+
+      if (waveletBlipOp.getBlipOp() instanceof BlipContentOperation) {
+        
+        _op.append(FIELD_BLIPOP, serialize((BlipContentOperation) 
waveletBlipOp.getBlipOp()));
+        
+      } else {
+               
+        throw new IllegalArgumentException("Unsupported blip operation: " + 
waveletBlipOp.getBlipOp());
+      } 
+      
+    } else {
+      
+      throw new IllegalArgumentException("Unsupported wavelet operation: " + 
waveletOp);
+    }
+    
+    return _op;
+  }
+  
+  
+  
+  public static DBObject serialize(BlipContentOperation blipContentOp) {
+    
+    BasicDBObject _blipContentOp = new BasicDBObject();
+    _blipContentOp.append(FIELD_CONTENTOP, 
serialize(blipContentOp.getContentOp()));
+    return _blipContentOp;
+  }
+  
+  
+  public static DBObject serialize(DocOp docOp) {
+    
+    // This method relies on the provided CoreWaveletOperationSerializer, 
+    // because of complexity of serializing DocOp's
+    BasicDBObject _docOp = new BasicDBObject();
+    _docOp.append(FIELD_BYTES, 
CoreWaveletOperationSerializer.serialize(docOp).toByteArray());
+    return _docOp;
+  }
+  
+  
+  
+  public static WaveletDeltaRecord deserializeWaveletDeltaRecord(DBObject 
dbObject) throws PersistenceException {
+    
+   try {
+   
+     return new WaveletDeltaRecord(
+          deserializeHashedVersion((DBObject) 
dbObject.get(FIELD_APPLIEDATVERSION)), 
+          
ByteStringMessage.parseProtocolAppliedWaveletDelta(ByteString.copyFrom((byte[]) 
dbObject.get(FIELD_APPLIED))),
+          deserializeTransformedWaveletDelta((DBObject) 
dbObject.get(FIELD_TRANSFORMED)));
+  
+   } catch (InvalidProtocolBufferException e) {
+     
+     throw new PersistenceException(e);
+  }   
+
+  }
+  
+  public static HashedVersion deserializeHashedVersion(DBObject dbObject) {
+    
+    return HashedVersion.of((Long) dbObject.get(FIELD_VERSION), (byte[]) 
dbObject.get(FIELD_HISTORYHASH));
+  }
+  
+  
+  public static ParticipantId deserializeParicipantId(DBObject dbObject) {
+    
+    return ParticipantId.ofUnsafe((String) dbObject.get(FIELD_ADDRESS));
+  }
+  
+  
+  public static TransformedWaveletDelta 
deserializeTransformedWaveletDelta(DBObject dbObject) throws 
PersistenceException {
+    
+    ParticipantId author = deserializeParicipantId((DBObject) 
dbObject.get(FIELD_AUTHOR));
+    HashedVersion resultingVersion = deserializeHashedVersion((DBObject) 
dbObject.get(FIELD_RESULTINGVERSION)); 
+    long applicationTimestamp = (Long) 
dbObject.get(FIELD_APPLICATIONTIMESTAMP);
+    
+    BasicDBList dbOps = (BasicDBList) dbObject.get(FIELD_OPS);
+    ImmutableList.Builder<WaveletOperation> operations = 
ImmutableList.builder();
+    
+    int numOperations = dbOps.size();
+    
+    // Code analogous to ProtoDeltaStoreDataSerializer.deserialize
+    for (int i = 0; i < numOperations; i++) {
+      
+      WaveletOperationContext context;
+      if (i == numOperations - 1) {
+        context = new WaveletOperationContext(author, applicationTimestamp, 1, 
resultingVersion);
+      } else {
+        context = new WaveletOperationContext(author, applicationTimestamp, 1);
+      }
+      operations.add(deserializeWaveletOperation((DBObject) dbOps.get(i), 
context));
+    }
+
+    
+    return new TransformedWaveletDelta(author, resultingVersion, 
applicationTimestamp, operations.build());
+    
+  }
+  
+  public static WaveletOperation deserializeWaveletOperation(DBObject 
dbObject, WaveletOperationContext context) throws PersistenceException {
+    
+    
+    String type = (String) dbObject.get(FIELD_TYPE);
+    
+    if (type.equals(WAVELET_OP_NOOP)) {
+      return new NoOp(context);
+    } else if (type.equals(WAVELET_OP_ADD_PARTICIPANT)) {
+      return new AddParticipant(context, deserializeParicipantId((DBObject) 
dbObject.get(FIELD_PARTICIPANT)));
+    } else if (type.equals(WAVELET_OP_REMOVE_PARTICIPANT)) {
+      return new RemoveParticipant(context, deserializeParicipantId((DBObject) 
dbObject.get(FIELD_PARTICIPANT)));
+    } else if (type.equals(WAVELET_OP_WAVELET_BLIP_OPERATION)) {
+      
+      return new WaveletBlipOperation((String) dbObject.get(FIELD_BLIPID), 
+                                       
deserializeBlipContentOperation((DBObject) dbObject.get(FIELD_BLIPOP), 
context));
+          
+         
+    } else {
+      throw new IllegalArgumentException("Unsupported operation: " + type);
+    }
+    
+  }
+
+
+  public static BlipOperation deserializeBlipContentOperation(DBObject 
dbObject,
+      WaveletOperationContext context) throws PersistenceException {
+
+    return new BlipContentOperation(context, deserializeDocOp((DBObject) 
dbObject.get(FIELD_CONTENTOP)));
+  }
+
+
+  private static DocOp deserializeDocOp(DBObject dbObject) throws 
PersistenceException {
+   
+    
+    try {
+      
+      return 
CoreWaveletOperationSerializer.deserialize(ProtocolDocumentOperation.parseFrom(((byte[])
 dbObject.get(FIELD_BYTES))));
+    
+    } catch (InvalidProtocolBufferException e) {
+       throw new PersistenceException(e);
+    }
+   
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java
----------------------------------------------------------------------
diff --git 
a/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java 
b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java
index 6f5170f..4136258 100644
--- a/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java
+++ b/src/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
 
 import com.mongodb.DB;
 import com.mongodb.Mongo;
+import com.mongodb.MongoClient;
 import com.mongodb.MongoException;
 
 import org.waveprotocol.box.server.persistence.PersistenceStartException;
@@ -56,7 +57,7 @@ public class MongoDbProvider {
   private static final String DATABASE_NAME_PROPERTY = "mongoDbDatabase";
 
   /**
-   * Our {@link Mongo} instance, should be accessed by getMongo unless during
+   * Our {@link MongoClient} instance, should be accessed by getMongo unless 
during
    * start().
    */
   private Mongo mongo;
@@ -71,6 +72,11 @@ public class MongoDbProvider {
    */
   private MongoDbStore mongoDbStore;
 
+  /**
+   * Separate store for deltas, see {@link MongoDbDeltaStore}
+   */
+  private MongoDbDeltaStore mongoDbDeltaStore;
+  
   /** Stores whether we have successfully setup a live {@link Mongo} instance. 
*/
   private boolean isRunning;
 
@@ -94,7 +100,8 @@ public class MongoDbProvider {
     String host = properties.getProperty(HOST_PROPERTY);
     int port = Integer.parseInt(properties.getProperty(PORT_PROPERTY));
     try {
-      mongo = new Mongo(host, port);
+      // New MongoDB Client, see 
http://docs.mongodb.org/manual/release-notes/drivers-write-concern/
+      mongo = new MongoClient(host, port);
     } catch (UnknownHostException e) {
       throw new PersistenceStartException("Unable to resolve the MongoDb 
hostname", e);
     }
@@ -182,4 +189,18 @@ public class MongoDbProvider {
     }
     return mongoDbStore;
   }
+  
+  /**
+   * Returns a {@link MongoDbDeltaStore} instance created from the settings in 
this 
+   * provider.
+   */
+  public MongoDbDeltaStore provideMongoDbDeltaStore() {
+    if (mongoDbDeltaStore == null) {
+      mongoDbDeltaStore = new MongoDbDeltaStore(getDatabase());
+    }
+    
+    return mongoDbDeltaStore;
+    
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/third_party/runtime/mongo-driver/mongo-2.1.jar
----------------------------------------------------------------------
diff --git a/third_party/runtime/mongo-driver/mongo-2.1.jar 
b/third_party/runtime/mongo-driver/mongo-2.1.jar
deleted file mode 100644
index 0936004..0000000
Binary files a/third_party/runtime/mongo-driver/mongo-2.1.jar and /dev/null 
differ

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/third_party/runtime/mongo-driver/mongo-java-driver-2.11.2.jar
----------------------------------------------------------------------
diff --git a/third_party/runtime/mongo-driver/mongo-java-driver-2.11.2.jar 
b/third_party/runtime/mongo-driver/mongo-java-driver-2.11.2.jar
new file mode 100644
index 0000000..c075289
Binary files /dev/null and 
b/third_party/runtime/mongo-driver/mongo-java-driver-2.11.2.jar differ

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cf5a4e50/third_party/runtime/mongo-driver/mongo-java-driver-r2.1-0-src.zip
----------------------------------------------------------------------
diff --git a/third_party/runtime/mongo-driver/mongo-java-driver-r2.1-0-src.zip 
b/third_party/runtime/mongo-driver/mongo-java-driver-r2.1-0-src.zip
deleted file mode 100644
index 41f4a34..0000000
Binary files 
a/third_party/runtime/mongo-driver/mongo-java-driver-r2.1-0-src.zip and 
/dev/null differ

Reply via email to