http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java deleted file mode 100644 index 2bb1f93..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java +++ /dev/null @@ -1,621 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.file; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; - -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.persistence.protos.ProtoDeltaStoreDataSerializer; -import org.waveprotocol.box.server.persistence.protos.ProtoDeltaStoreData.ProtoTransformedWaveletDelta; -import org.waveprotocol.box.server.shutdown.LifeCycle; -import org.waveprotocol.box.server.shutdown.ShutdownPriority; -import org.waveprotocol.box.server.shutdown.Shutdownable; -import org.waveprotocol.box.server.waveserver.AppliedDeltaUtil; -import org.waveprotocol.box.server.waveserver.ByteStringMessage; -import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord; -import org.waveprotocol.box.server.waveserver.DeltaStore.DeltasAccess; -import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.util.Pair; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.util.logging.Log; - -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.io.RandomAccessFile; -import java.nio.channels.Channels; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; - -/** - * A flat file based implementation of DeltasAccess. This class provides a storage backend for the - * deltas in a single wavelet. - * - * The file starts with a header. The header contains the version of the file protocol. After the - * version, the file contains a sequence of delta records. Each record contains a header followed - * by a WaveletDeltaRecord. - * - * A particular FileDeltaCollection instance assumes that it's <em>the only one</em> reading and - * writing a particular wavelet. The methods are <em>not</em> multithread-safe. - * - * See this document for design specifics: - * https://sites.google.com/a/waveprotocol.org/wave-protocol/protocol/design-proposals/wave-store-design-for-wave-in-a-box - * - * @author [email protected] (Joseph Gentle) - */ -public class FileDeltaCollection implements DeltasAccess { - public static final String DELTAS_FILE_SUFFIX = ".deltas"; - public static final String INDEX_FILE_SUFFIX = ".index"; - - private static final byte[] FILE_MAGIC_BYTES = new byte[]{'W', 'A', 'V', 'E'}; - private static final int FILE_PROTOCOL_VERSION = 1; - private static final int FILE_HEADER_LENGTH = 8; - - private static final int DELTA_PROTOCOL_VERSION = 1; - - private static final Log LOG = Log.get(FileDeltaCollection.class); - - private final WaveletName waveletName; - private final RandomAccessFile file; - private final DeltaIndex index; - - private HashedVersion endVersion; - private boolean isOpen; - - - final private LifeCycle lifeCycle = new LifeCycle(FileDeltaCollection.class.getSimpleName(), - ShutdownPriority.Storage, new Shutdownable() { - @Override - public void shutdown() throws Exception { - close(); - } - }); - - /** - * A single record in the delta file. - */ - private class DeltaHeader { - /** Length in bytes of the header */ - public static final int HEADER_LENGTH = 12; - - /** The protocol version of the remaining fields. For now, must be 1. */ - public final int protoVersion; - - /** The length of the applied delta segment, in bytes. */ - public final int appliedDeltaLength; - public final int transformedDeltaLength; - - public DeltaHeader(int protoVersion, int appliedDeltaLength, int transformedDeltaLength) { - this.protoVersion = protoVersion; - this.appliedDeltaLength = appliedDeltaLength; - this.transformedDeltaLength = transformedDeltaLength; - } - - public void checkVersion() throws IOException { - if (protoVersion != DELTA_PROTOCOL_VERSION) { - throw new IOException("Invalid delta header"); - } - } - } - - /** - * Opens a file delta collection. - * - * @param waveletName name of the wavelet to open - * @param basePath base path of files - * @return an open collection - * @throws IOException - */ - public static FileDeltaCollection open(WaveletName waveletName, String basePath) - throws IOException { - Preconditions.checkNotNull(waveletName, "null wavelet name"); - - RandomAccessFile deltaFile = FileUtils.getOrCreateFile(deltasFile(basePath, waveletName)); - setOrCheckFileHeader(deltaFile); - DeltaIndex index = new DeltaIndex(indexFile(basePath, waveletName)); - - FileDeltaCollection collection = new FileDeltaCollection(waveletName, deltaFile, index); - - index.openForCollection(collection); - collection.initializeEndVersionAndTruncateTrailingJunk(); - - return collection; - } - - /** - * Delete the delta files from disk. - * - * @throws PersistenceException - */ - public static void delete(WaveletName waveletName, String basePath) throws PersistenceException { - String error = ""; - File deltas = deltasFile(basePath, waveletName); - - if (deltas.exists()) { - if (!deltas.delete()) { - error += "Could not delete deltas file: " + deltas.getAbsolutePath() + ". "; - } - } - - File index = indexFile(basePath, waveletName); - if (index.exists()) { - if (!index.delete()) { - error += "Could not delete index file: " + index.getAbsolutePath(); - } - } - if (!error.isEmpty()) { - throw new PersistenceException(error); - } - } - - /** - * Create a new file delta collection for the given wavelet. - * - * @param waveletName name of the wavelet - * @param deltaFile the file of deltas - * @param index index into deltas - */ - public FileDeltaCollection(WaveletName waveletName, RandomAccessFile deltaFile, - DeltaIndex index) { - this.waveletName = waveletName; - this.file = deltaFile; - this.index = index; - this.isOpen = true; - lifeCycle.start(); - } - - @Override - public WaveletName getWaveletName() { - return waveletName; - } - - @Override - public HashedVersion getEndVersion() { - return endVersion; - } - - @Override - public WaveletDeltaRecord getDelta(long version) throws IOException { - lifeCycle.enter(); - try { - checkIsOpen(); - return seekToRecord(version) ? readRecord() : null; - } finally { - lifeCycle.leave(); - } - } - - @Override - public WaveletDeltaRecord getDeltaByEndVersion(long version) throws IOException { - lifeCycle.enter(); - try { - checkIsOpen(); - return seekToEndRecord(version) ? readRecord() : null; - } finally { - lifeCycle.leave(); - } - } - - @Override - public ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDelta(long version) - throws IOException { - lifeCycle.enter(); - try { - checkIsOpen(); - return seekToRecord(version) ? readAppliedDeltaFromRecord() : null; - } finally { - lifeCycle.leave(); - } - } - - @Override - public TransformedWaveletDelta getTransformedDelta(long version) throws IOException { - lifeCycle.enter(); - try { - checkIsOpen(); - return seekToRecord(version) ? readTransformedDeltaFromRecord() : null; - } finally { - lifeCycle.leave(); - } - } - - @Override - public HashedVersion getAppliedAtVersion(long version) throws IOException { - lifeCycle.enter(); - try { - checkIsOpen(); - ByteStringMessage<ProtocolAppliedWaveletDelta> applied = getAppliedDelta(version); - - return (applied != null) ? AppliedDeltaUtil.getHashedVersionAppliedAt(applied) : null; - } finally { - lifeCycle.leave(); - } - } - - @Override - public HashedVersion getResultingVersion(long version) throws IOException { - lifeCycle.enter(); - try { - checkIsOpen(); - TransformedWaveletDelta transformed = getTransformedDelta(version); - - return (transformed != null) ? transformed.getResultingVersion() : null; - } finally { - lifeCycle.leave(); - } - } - - @Override - public void close() throws IOException { - file.close(); - index.close(); - endVersion = null; - isOpen = false; - } - - @Override - public void append(Collection<WaveletDeltaRecord> deltas) throws PersistenceException { - lifeCycle.enter(); - checkIsOpen(); - try { - file.seek(file.length()); - - WaveletDeltaRecord lastDelta = null; - for (WaveletDeltaRecord delta : deltas) { - index.addDelta(delta.getTransformedDelta().getAppliedAtVersion(), - delta.getTransformedDelta().size(), - file.getFilePointer()); - writeDelta(delta); - lastDelta = delta; - } - - // fsync() before returning. - file.getChannel().force(true); - endVersion = lastDelta.getTransformedDelta().getResultingVersion(); - } catch (IOException e) { - throw new PersistenceException(e); - } finally { - lifeCycle.leave(); - } - } - - @Override - public boolean isEmpty() { - checkIsOpen(); - return index.length() == 0; - } - - /** - * Creates a new iterator to move over the positions of the deltas in the file. - * - * Each pair returned is ((version, numOperations), offset). - * @throws IOException - */ - Iterable<Pair<Pair<Long,Integer>, Long>> getOffsetsIterator() throws IOException { - checkIsOpen(); - - return new Iterable<Pair<Pair<Long, Integer>, Long>>() { - @Override - public Iterator<Pair<Pair<Long, Integer>, Long>> iterator() { - return new Iterator<Pair<Pair<Long, Integer>, Long>>() { - Pair<Pair<Long, Integer>, Long> nextRecord; - long nextPosition = FILE_HEADER_LENGTH; - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public Pair<Pair<Long, Integer>, Long> next() { - Pair<Pair<Long, Integer>, Long> record = nextRecord; - nextRecord = null; - return record; - } - - @Override - public boolean hasNext() { - // We're using hasNext to prime the next call to next(). This works because in practice - // any call to next() is preceeded by at least one call to hasNext(). - // We need to actually read the record here because hasNext() should return false - // if there's any incomplete data at the end of the file. - try { - if (file.length() <= nextPosition) { - // End of file. - return false; - } - } catch (IOException e) { - throw new RuntimeException("Could not get file position", e); - } - - if (nextRecord == null) { - // Read the next record - try { - file.seek(nextPosition); - TransformedWaveletDelta transformed = readTransformedDeltaFromRecord(); - nextRecord = Pair.of(Pair.of(transformed.getAppliedAtVersion(), - transformed.size()), nextPosition); - nextPosition = file.getFilePointer(); - } catch (IOException e) { - // The next entry is invalid. There was probably a write error / crash. - LOG.severe("Error reading delta file for " + waveletName + " starting at " + - nextPosition, e); - return false; - } - } - - return true; - } - }; - } - }; - } - - @VisibleForTesting - static final File deltasFile(String basePath, WaveletName waveletName) { - String waveletPathPrefix = FileUtils.waveletNameToPathSegment(waveletName); - return new File(basePath, waveletPathPrefix + DELTAS_FILE_SUFFIX); - } - - @VisibleForTesting - static final File indexFile(String basePath, WaveletName waveletName) { - String waveletPathPrefix = FileUtils.waveletNameToPathSegment(waveletName); - return new File(basePath, waveletPathPrefix + INDEX_FILE_SUFFIX); - } - - /** - * Checks that a file has a valid deltas header, adding the header if the - * file is shorter than the header. - */ - private static void setOrCheckFileHeader(RandomAccessFile file) throws IOException { - Preconditions.checkNotNull(file); - file.seek(0); - - if (file.length() < FILE_HEADER_LENGTH) { - // The file is new. Insert a header. - file.write(FILE_MAGIC_BYTES); - file.writeInt(FILE_PROTOCOL_VERSION); - } else { - byte[] magic = new byte[4]; - file.readFully(magic); - if (!Arrays.equals(FILE_MAGIC_BYTES, magic)) { - throw new IOException("Delta file magic bytes are incorrect"); - } - - int version = file.readInt(); - if (version != FILE_PROTOCOL_VERSION) { - throw new IOException(String.format("File protocol version mismatch - expected %d got %d", - FILE_PROTOCOL_VERSION, version)); - } - } - } - - private void checkIsOpen() { - Preconditions.checkState(isOpen, "Delta collection closed"); - } - - /** - * Seek to the start of a delta record. Returns false if the record doesn't exist. - */ - private boolean seekToRecord(long version) throws IOException { - Preconditions.checkArgument(version >= 0, "Version can't be negative"); - long offset = index.getOffsetForVersion(version); - return seekTo(offset); - } - - /** - * Seek to the start of a delta record given its end version. - * Returns false if the record doesn't exist. - */ - private boolean seekToEndRecord(long version) throws IOException { - Preconditions.checkArgument(version >= 0, "Version can't be negative"); - long offset = index.getOffsetForEndVersion(version); - return seekTo(offset); - } - - private boolean seekTo(long offset) throws IOException { - if (offset == DeltaIndex.NO_RECORD_FOR_VERSION) { - // There's no record for the specified version. - return false; - } else { - file.seek(offset); - return true; - } - } - - /** - * Read a record and return it. - */ - private WaveletDeltaRecord readRecord() throws IOException { - DeltaHeader header = readDeltaHeader(); - - ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta = - readAppliedDelta(header.appliedDeltaLength); - TransformedWaveletDelta transformedDelta = readTransformedWaveletDelta( - header.transformedDeltaLength); - - return new WaveletDeltaRecord(AppliedDeltaUtil.getHashedVersionAppliedAt(appliedDelta), - appliedDelta, transformedDelta); - } - - /** - * Reads a record, and only parses & returns the applied data field. - */ - private ByteStringMessage<ProtocolAppliedWaveletDelta> readAppliedDeltaFromRecord() - throws IOException { - DeltaHeader header = readDeltaHeader(); - - ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta = - readAppliedDelta(header.appliedDeltaLength); - file.skipBytes(header.transformedDeltaLength); - - return appliedDelta; - } - - /** - * Reads a record, and only parses & returns the transformed data field. - */ - private TransformedWaveletDelta readTransformedDeltaFromRecord() throws IOException { - DeltaHeader header = readDeltaHeader(); - - file.skipBytes(header.appliedDeltaLength); - TransformedWaveletDelta transformedDelta = readTransformedWaveletDelta( - header.transformedDeltaLength); - - return transformedDelta; - } - - - // *** Low level data reading methods - - /** Read a header from the file. Does not move the file pointer before reading. */ - private DeltaHeader readDeltaHeader() throws IOException { - int version = file.readInt(); - if (version != DELTA_PROTOCOL_VERSION) { - throw new IOException("Delta header invalid"); - } - int appliedDeltaLength = file.readInt(); - int transformedDeltaLength = file.readInt(); - DeltaHeader deltaHeader = new DeltaHeader(version, appliedDeltaLength, transformedDeltaLength); - deltaHeader.checkVersion(); - // Verify the file size. - long remaining = file.length() - file.getFilePointer(); - long missing = (appliedDeltaLength + transformedDeltaLength) - remaining; - if (missing > 0) { - throw new IOException("File is corrupted, missing " + missing + " bytes"); - } - return deltaHeader; - } - - /** - * Write a header to the current location in the file - */ - private void writeDeltaHeader(DeltaHeader header) throws IOException { - file.writeInt(header.protoVersion); - file.writeInt(header.appliedDeltaLength); - file.writeInt(header.transformedDeltaLength); - } - - /** - * Read the applied delta at the current file position. After method call, - * file position is directly after applied delta field. - */ - private ByteStringMessage<ProtocolAppliedWaveletDelta> readAppliedDelta(int length) - throws IOException { - if (length == 0) { - return null; - } - - byte[] bytes = new byte[length]; - file.readFully(bytes); - try { - return ByteStringMessage.parseProtocolAppliedWaveletDelta(ByteString.copyFrom(bytes)); - } catch (InvalidProtocolBufferException e) { - throw new IOException(e); - } - } - - /** - * Write an applied delta to the current position in the file. Returns number of bytes written. - */ - private int writeAppliedDelta(ByteStringMessage<ProtocolAppliedWaveletDelta> delta) - throws IOException { - if (delta != null) { - byte[] bytes = delta.getByteArray(); - file.write(bytes); - return bytes.length; - } else { - return 0; - } - } - - /** - * Read a {@link TransformedWaveletDelta} from the current location in the file. - */ - private TransformedWaveletDelta readTransformedWaveletDelta(int transformedDeltaLength) - throws IOException { - if(transformedDeltaLength < 0) { - throw new IOException("Invalid delta length"); - } - - byte[] bytes = new byte[transformedDeltaLength]; - file.readFully(bytes); - ProtoTransformedWaveletDelta delta; - try { - delta = ProtoTransformedWaveletDelta.parseFrom(bytes); - } catch (InvalidProtocolBufferException e) { - throw new IOException(e); - } - return ProtoDeltaStoreDataSerializer.deserialize(delta); - } - - /** - * Write a {@link TransformedWaveletDelta} to the file at the current location. - * @return length of written data - */ - private int writeTransformedWaveletDelta(TransformedWaveletDelta delta) throws IOException { - long startingPosition = file.getFilePointer(); - ProtoTransformedWaveletDelta protoDelta = ProtoDeltaStoreDataSerializer.serialize(delta); - OutputStream stream = Channels.newOutputStream(file.getChannel()); - protoDelta.writeTo(stream); - return (int) (file.getFilePointer() - startingPosition); - } - - /** - * Read a delta to the file. Does not move the file pointer before writing. Returns number of - * bytes written. - */ - private long writeDelta(WaveletDeltaRecord delta) throws IOException { - // We'll write zeros in place of the header and come back & write it at the end. - long headerPointer = file.getFilePointer(); - file.write(new byte[DeltaHeader.HEADER_LENGTH]); - - int appliedLength = writeAppliedDelta(delta.getAppliedDelta()); - int transformedLength = writeTransformedWaveletDelta(delta.getTransformedDelta()); - - long endPointer = file.getFilePointer(); - file.seek(headerPointer); - writeDeltaHeader(new DeltaHeader(DELTA_PROTOCOL_VERSION, appliedLength, transformedLength)); - file.seek(endPointer); - - return endPointer - headerPointer; - } - - /** - * Reads the last complete record in the deltas file and truncates any trailing junk. - */ - private void initializeEndVersionAndTruncateTrailingJunk() throws IOException { - long numRecords = index.length(); - if (numRecords >= 1) { - endVersion = getDeltaByEndVersion(numRecords).getResultingVersion(); - } else { - endVersion = null; - } - // The file's position should be at the end. Truncate any - // trailing junk such as from a partially completed write. - file.setLength(file.getFilePointer()); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaStore.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaStore.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaStore.java deleted file mode 100644 index 2677391..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaStore.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.file; - -import com.google.common.collect.ImmutableSet; -import com.google.inject.Inject; -import com.typesafe.config.Config; -import org.waveprotocol.box.common.ExceptionalIterator; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.waveserver.DeltaStore; -import org.waveprotocol.box.stat.Timed; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.util.logging.Log; - -import java.io.File; -import java.io.FilenameFilter; -import java.io.IOException; -import java.util.Iterator; -import java.util.NoSuchElementException; - -/** - * A flat file based implementation of DeltaStore. - * - * The delta store lives at some base directory. The directory structure looks like this: - * base/encoded-wave-id/encoded-wavelet-id.delta - * base/encoded-wave-id/encoded-wavelet-id.index - * - * See design doc: - * https://sites.google.com/a/waveprotocol.org/wave-protocol/protocol/design-proposals/wave-store-design-for-wave-in-a-box - * - - * @author [email protected] (Joseph Gentle) - */ -public class FileDeltaStore implements DeltaStore { - private static final Log LOG = Log.get(FileDeltaStore.class); - /** - * The directory in which the wavelets are stored - */ - final private String basePath; - - @Inject - public FileDeltaStore(Config config) { - this.basePath = config.getString("core.delta_store_directory"); - } - - @Timed - @Override - public FileDeltaCollection open(WaveletName waveletName) throws PersistenceException { - try { - return FileDeltaCollection.open(waveletName, basePath); - } catch (IOException e) { - throw new PersistenceException("Failed to open deltas for wavelet " + waveletName, e); - } - } - - @Override - public void delete(WaveletName waveletName) throws PersistenceException { - FileDeltaCollection.delete(waveletName, basePath); - } - - @Timed - @Override - public ImmutableSet<WaveletId> lookup(WaveId waveId) throws PersistenceException { - String waveDirectory = FileUtils.waveIdToPathSegment(waveId); - File waveDir = new File(basePath, waveDirectory); - if (!waveDir.exists()) { - return ImmutableSet.of(); - } - - File[] deltaFiles = waveDir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.endsWith(FileDeltaCollection.DELTAS_FILE_SUFFIX); - } - }); - - ImmutableSet.Builder<WaveletId> results = ImmutableSet.builder(); - for(File deltaFile : deltaFiles) { - String name = deltaFile.getName(); - String encodedWaveletId = - name.substring(0, name.lastIndexOf(FileDeltaCollection.DELTAS_FILE_SUFFIX)); - WaveletId waveletId = FileUtils.waveletIdFromPathSegment(encodedWaveletId); - FileDeltaCollection deltas = open(WaveletName.of(waveId, waveletId)); - HashedVersion endVersion = deltas.getEndVersion(); - if (endVersion != null && endVersion.getVersion() > 0) { - results.add(waveletId); - } - try { - deltas.close(); - } catch (IOException e) { - LOG.info("Failed to close deltas file " + name, e); - } - } - - return results.build(); - } - - @Timed - @Override - public ExceptionalIterator<WaveId, PersistenceException> getWaveIdIterator() { - File baseDir = new File(basePath); - if (!baseDir.exists()) { - return ExceptionalIterator.Empty.create(); - } - - File[] waveDirs = baseDir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - try { - FileUtils.waveIdFromPathSegment(name); - return true; - } catch (IllegalArgumentException e) { - return false; - } - } - }); - - final ImmutableSet.Builder<WaveId> results = ImmutableSet.builder(); - for(File waveDir : waveDirs) { - results.add(FileUtils.waveIdFromPathSegment(waveDir.getName())); - } - - return new ExceptionalIterator<WaveId, PersistenceException>() { - private final Iterator<WaveId> iterator = results.build().iterator(); - private boolean nextFetched = false; - private WaveId nextWaveId = null; - - private void fetchNext() throws PersistenceException { - while(!nextFetched) { - if (iterator.hasNext()) { - nextWaveId = iterator.next(); - if (!lookup(nextWaveId).isEmpty()) { - nextFetched = true; - } - } else { - nextFetched = true; - nextWaveId = null; - } - } - } - - @Override - public boolean hasNext() throws PersistenceException { - fetchNext(); - return nextWaveId != null; - } - - @Override - public WaveId next() throws PersistenceException { - fetchNext(); - if (nextWaveId == null) { - throw new NoSuchElementException(); - } else { - nextFetched = false; - return nextWaveId; - } - } - - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileSignerInfoStore.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileSignerInfoStore.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileSignerInfoStore.java deleted file mode 100644 index bf20f0a..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileSignerInfoStore.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.file; - -import com.google.inject.Inject; -import com.typesafe.config.Config; -import org.apache.commons.codec.binary.Hex; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.persistence.SignerInfoStore; -import org.waveprotocol.wave.crypto.CertPathStore; -import org.waveprotocol.wave.crypto.DefaultCertPathStore; -import org.waveprotocol.wave.crypto.SignatureException; -import org.waveprotocol.wave.crypto.SignerInfo; -import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo; -import org.waveprotocol.wave.util.logging.Log; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; - -/** - * A flat file based implementation of {@link SignerInfoStore} - * - * @author [email protected] (Tad Glines) - */ -public class FileSignerInfoStore implements SignerInfoStore { - private static final String SIGNER_FILE_EXTENSION = ".signer"; - private final String signerInfoStoreBasePath; - private final CertPathStore certPathStore = new DefaultCertPathStore(); - - private static final Log LOG = Log.get(FileSignerInfoStore.class); - - @Inject - public FileSignerInfoStore(Config config) { - this.signerInfoStoreBasePath = config.getString("core.signer_info_store_directory"); - } - - private String signerIdToFileName(byte[] id) { - return signerInfoStoreBasePath + File.separator + new String(Hex.encodeHex(id)) - + SIGNER_FILE_EXTENSION; - } - - @Override - public void initializeSignerInfoStore() throws PersistenceException { - FileUtils.performDirectoryChecks(signerInfoStoreBasePath, SIGNER_FILE_EXTENSION, - "signer info store", LOG); - } - - @Override - public SignerInfo getSignerInfo(byte[] signerId) throws SignatureException { - synchronized(certPathStore) { - SignerInfo signerInfo = certPathStore.getSignerInfo(signerId); - File signerFile = new File(signerIdToFileName(signerId)); - if (signerInfo == null) { - if (signerFile.exists()) { - FileInputStream file = null; - try { - file = new FileInputStream(signerFile); - ProtocolSignerInfo data = ProtocolSignerInfo.newBuilder().mergeFrom(file).build(); - signerInfo = new SignerInfo(data); - } catch (SignatureException | IOException e) { - throw new SignatureException("Failed to parse signer info from file: " - + signerFile.getAbsolutePath(), e); - } finally { - FileUtils.closeAndIgnoreException(file, signerFile, LOG); - } - } - } - return signerInfo; - } - } - - @Override - public void putSignerInfo(ProtocolSignerInfo protoSignerInfo) throws SignatureException { - synchronized(certPathStore) { - SignerInfo signerInfo = new SignerInfo(protoSignerInfo); - File signerFile = new File(signerIdToFileName(signerInfo.getSignerId())); - FileOutputStream file = null; - try { - file = new FileOutputStream(signerFile); - file.write(protoSignerInfo.toByteArray()); - file.flush(); - certPathStore.putSignerInfo(protoSignerInfo); - } catch (IOException e) { - throw new SignatureException("Failed to write signer info to file: " - + signerFile.getAbsolutePath(), e); - } finally { - FileUtils.closeAndIgnoreException(file, signerFile, LOG); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileUtils.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileUtils.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileUtils.java deleted file mode 100644 index 3f63824..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileUtils.java +++ /dev/null @@ -1,332 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.file; - -import com.google.common.base.Preconditions; - -import org.apache.commons.codec.DecoderException; -import org.apache.commons.codec.binary.Hex; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.util.Pair; -import org.waveprotocol.wave.util.logging.Log; - -import java.io.BufferedReader; -import java.io.Closeable; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.io.UnsupportedEncodingException; - -/** - * Utility methods for file stores. - * - * @author [email protected] (Joseph Gentle) - */ -public class FileUtils { - private static final String SEPARATOR = "_"; - - /** - * Converts an arbitrary string into a format that can be stored safely on the filesystem. - * - * @param str the string to encode - * @return the encoded string - */ - public static String toFilenameFriendlyString(String str) { - byte[] bytes; - try { - bytes = str.getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - // This should never happen. - throw new IllegalStateException("UTF-8 not supported", e); - } - - return new String(Hex.encodeHex(bytes)); - } - - /** - * Decodes a string that was encoded using toFilenameFriendlyString. - * - * @param encoded the encoded string - * @return the decoded string - * @throws DecoderException the string's encoding is invalid - */ - public static String fromFilenameFriendlyString(String encoded) throws DecoderException { - byte[] bytes = Hex.decodeHex(encoded.toCharArray()); - try { - return new String(bytes, "UTF-8"); - } catch (UnsupportedEncodingException e) { - // This should never happen. - throw new IllegalStateException("UTF-8 not supported", e); - } - } - - /** Decode a path segment pair. Throws IllegalArgumentException if the encoding is invalid */ - private static Pair<String, String> decodePathSegmentPair(String pathSegment) { - String[] components = pathSegment.split(SEPARATOR); - Preconditions.checkArgument(components.length == 2, "WaveId path name invalid"); - try { - return new Pair<String, String>(fromFilenameFriendlyString(components[0]), - fromFilenameFriendlyString(components[1])); - } catch (DecoderException e) { - throw new IllegalArgumentException("Wave path component encoding invalid"); - } - } - - /** - * Creates a filename-friendly pathname for the given waveId. - * - * The format is DOMAIN + '_' + ID where both the domain and the id are encoded - * to a pathname friendly format. - * - * @param waveId the waveId to encode - * @return a path segment which corresponds to the waveId - */ - public static String waveIdToPathSegment(WaveId waveId) { - String domain = toFilenameFriendlyString(waveId.getDomain()); - String id = toFilenameFriendlyString(waveId.getId()); - return domain + SEPARATOR + id; - } - - /** - * Converts a path segment created using waveIdToPathSegment back to a wave id - * - * @param pathSegment - * @return the decoded WaveId - * @throws IllegalArgumentException the encoding on the path segment is invalid - */ - public static WaveId waveIdFromPathSegment(String pathSegment) { - Pair<String, String> segments = decodePathSegmentPair(pathSegment); - return WaveId.of(segments.first, segments.second); - } - - /** - * Creates a filename-friendly path segment for a waveId. - * - * The format is "domain_id", encoded in a pathname friendly format. - * @param waveletId - * @return the decoded WaveletId - */ - public static String waveletIdToPathSegment(WaveletId waveletId) { - String domain = toFilenameFriendlyString(waveletId.getDomain()); - String id = toFilenameFriendlyString(waveletId.getId()); - return domain + SEPARATOR + id; - } - - /** - * Converts a path segment created using waveIdToPathSegment back to a wave id. - * - * @param pathSegment - * @return the decoded waveletId - * @throws IllegalArgumentException the encoding on the path segment is invalid - */ - public static WaveletId waveletIdFromPathSegment(String pathSegment) { - Pair<String, String> segments = decodePathSegmentPair(pathSegment); - return WaveletId.of(segments.first, segments.second); - } - - /** - * Creates a filename-friendly path segment for a wavelet name. - * - * @return the filename-friendly path segment representing the wavelet - */ - public static String waveletNameToPathSegment(WaveletName waveletName) { - return waveIdToPathSegment(waveletName.waveId) - + File.separatorChar - + waveletIdToPathSegment(waveletName.waveletId); - } - - /** - * Get a file for random binary access. If the file doesn't exist, it will be created. - * - * Calls to write() will not flush automatically. Call file.getChannel().force(true) to force - * writes to flush to disk. - * - * @param fileRef the file to open - * @return an opened RandomAccessFile wrapping the requested file - * @throws IOException an error occurred opening or creating the file - */ - public static RandomAccessFile getOrCreateFile(File fileRef) throws IOException { - if (!fileRef.exists()) { - fileRef.getParentFile().mkdirs(); - fileRef.createNewFile(); - } - - RandomAccessFile file; - try { - file = new RandomAccessFile(fileRef, "rw"); - } catch (FileNotFoundException e) { - // This should never happen. - throw new IllegalStateException("Java said the file exists, but it can't open it", e); - } - - return file; - } - - /** Create and return a new temporary directory */ - public static File createTemporaryDirectory() throws IOException { - // We want a temporary directory. createTempFile will make a file with a - // good temporary path. Its a bit nasty, but we'll create the file, then - // delete it and create a directory with the same name. - - File dir = File.createTempFile("fedoneattachments", null); - - if (!dir.delete() || !dir.mkdir()) { - throw new IOException("Could not make temporary directory for attachment store: " - + dir); - } - - return dir.getAbsoluteFile(); - } - - /** - * Close the closeable and log, but ignore, any exception thrown. - */ - public static void closeAndIgnoreException(Closeable closeable, File file, Log LOG) { - if (closeable != null) { - try { - closeable.close(); - } catch (IOException e) { - // This should never happen in practice. But just in case... log it. - LOG.warning("Failed to close file: " + file.getAbsolutePath(), e); - } - } - } - - /** - * Create dir if it doesn't exist, and perform checks to make sure that the dir's contents are - * listable, that files can be created in the dir, and that files in the dir are readable. - */ - public static void performDirectoryChecks(String dir, final String extension, String dirType, - Log LOG) throws PersistenceException { - File baseDir = createDirIfNotExists(dir, dirType); - - // Make sure we can read files by trying to read one of the files. - File[] files = baseDir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.endsWith(extension); - } - }); - - if (files == null) { - throw new PersistenceException(String.format( - "Configured %s directory (%s) does not appear to be readable!", dirType, dir)); - } - - /* - * If file list isn't empty, try opening the first file in the list to make sure it - * is readable. If the first file is readable, then it is likely that the rest will - * be readable as well. - */ - if (files.length > 0) { - try { - FileInputStream file = new FileInputStream(files[0]); - file.read(); - } catch (IOException e) { - throw new PersistenceException( - String.format( - "Failed to read '%s' in configured %s directory '%s'. " - + "The directory's contents do not appear to be readable.", - dirType, files[0].getName(), dir), - e); - } - } - - // Make sure the dir is writable. - try { - File tmp = File.createTempFile("tempInitialization", ".temp", baseDir); - FileOutputStream stream = new FileOutputStream(tmp); - stream.write(new byte[]{'H','e','l','l','o'}); - stream.close(); - tmp.delete(); - } catch (IOException e) { - throw new PersistenceException(String.format( - "Configured %s directory (%s) does not appear to be writable!", dirType, dir), e); - } - } - - /** - * Creates a directory if it doesn't exist. - * - * @param dir the directory location. - * @param dirType the directory type description (only useful for logs). - * @return the File directory (it's created if doesn't exist). - * @throws PersistenceException if the directory doesn't exist and cannot be - * created or when the path described by <code>dir</code> is not a - * directory. - */ - public static File createDirIfNotExists(String dir, String dirType) throws PersistenceException { - File baseDir = new File(dir); - - // Make sure the dir exists. - if (!baseDir.exists()) { - // It doesn't so try and create it. - if (!baseDir.mkdirs()) { - throw new PersistenceException(String.format( - "Configured %s directory (%s) doesn't exist and could not be created!", dirType, dir)); - } - } - - // Make sure the dir is a directory. - if (!baseDir.isDirectory()) { - throw new PersistenceException(String.format( - "Configured %s path (%s) isn't a directory!", dirType, dir)); - } - return baseDir; - } - - /** - * Opens a file and read the content to a string. - * - * @param fileName the file to read. - * @return a string with the content of the file. - * @throws IOException if the method failed to open the file for reading. - */ - public static String getStringFromFile(String fileName) throws IOException { - StringBuilder stringBuilder = new StringBuilder(); - File file = new File(fileName); - FileReader fileReader = new FileReader(file); - BufferedReader bufferedReader = new BufferedReader(fileReader); - - String line; - while ((line = bufferedReader.readLine()) != null) { - stringBuilder.append(line); - } - fileReader.close(); - - return stringBuilder.toString(); - } - - public static boolean isDirExistsAndNonEmpty(String dir) { - File baseDir = new File(dir); - if (!(baseDir.exists() && baseDir.isDirectory()) || baseDir.list().length == 0) { - return false; - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/persistence/lucene/FSIndexDirectory.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/lucene/FSIndexDirectory.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/lucene/FSIndexDirectory.java deleted file mode 100644 index eaaab39..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/lucene/FSIndexDirectory.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.waveprotocol.box.server.persistence.lucene; - -import com.google.inject.Inject; -import com.typesafe.config.Config; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.persistence.file.FileUtils; -import org.waveprotocol.box.server.waveserver.IndexException; - -import java.io.File; -import java.io.IOException; - -/** - * File system based {@link IndexDirectory}. - * - * @author A. Kaplanov - */ -public class FSIndexDirectory implements IndexDirectory { - - private Directory directory; - - @Inject - public FSIndexDirectory(Config config) { - String directoryName = config.getString("core.index_directory"); - if (directory == null) { - File file; - try { - file = FileUtils.createDirIfNotExists(directoryName, ""); - } catch (PersistenceException e) { - throw new IndexException("Cannot create index directory " + directoryName, e); - } - try { - directory = FSDirectory.open(file); - } catch (IOException e) { - throw new IndexException("Cannot open index directory " + directoryName, e); - } - } - } - - @Override - public Directory getDirectory() throws IndexException { - return directory; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/persistence/lucene/IndexDirectory.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/lucene/IndexDirectory.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/lucene/IndexDirectory.java deleted file mode 100644 index 1ce1438..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/lucene/IndexDirectory.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.waveprotocol.box.server.persistence.lucene; - -import org.apache.lucene.store.Directory; -import org.waveprotocol.box.server.waveserver.IndexException; - -/** - * Provides the directory to store/access the index files. - * - * @author A. Kaplanov - */ -public interface IndexDirectory { - - Directory getDirectory() throws IndexException; -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/persistence/lucene/RAMIndexDirectory.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/lucene/RAMIndexDirectory.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/lucene/RAMIndexDirectory.java deleted file mode 100644 index 4e3d295..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/lucene/RAMIndexDirectory.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.waveprotocol.box.server.persistence.lucene; - -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.RAMDirectory; -import org.waveprotocol.box.server.waveserver.IndexException; - -/** - * RAM based {@link IndexDirectory}. - * - * @author A. Kaplanov - */ -public class RAMIndexDirectory implements IndexDirectory { - - private final Directory directory = new RAMDirectory(); - - @Override - public Directory getDirectory() throws IndexException { - return directory; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java deleted file mode 100644 index da6a85e..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.memory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import com.google.protobuf.InvalidProtocolBufferException; - -import org.waveprotocol.box.server.waveserver.ByteStringMessage; -import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord; -import org.waveprotocol.box.server.waveserver.DeltaStore.DeltasAccess; -import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.version.HashedVersion; - -import java.util.Collection; -import java.util.Map; - -/** - * An in-memory implementation of DeltasAccess - * - * @author [email protected] (Joseph Gentle) - */ -public class MemoryDeltaCollection implements DeltasAccess { - private final Map<Long, WaveletDeltaRecord> deltas = Maps.newHashMap(); - private final Map<Long, WaveletDeltaRecord> endDeltas = Maps.newHashMap(); - private final WaveletName waveletName; - - private HashedVersion endVersion = null; - - public MemoryDeltaCollection(WaveletName waveletName) { - Preconditions.checkNotNull(waveletName); - this.waveletName = waveletName; - } - - @Override - public boolean isEmpty() { - return deltas.isEmpty(); - } - - @Override - public WaveletName getWaveletName() { - return waveletName; - } - - @Override - public HashedVersion getEndVersion() { - return endVersion; - } - - @Override - public WaveletDeltaRecord getDelta(long version) { - return deltas.get(version); - } - - @Override - public WaveletDeltaRecord getDeltaByEndVersion(long version) { - return endDeltas.get(version); - } - - @Override - public HashedVersion getAppliedAtVersion(long version) throws InvalidProtocolBufferException { - WaveletDeltaRecord delta = getDelta(version); - return (delta != null) ? delta.getAppliedAtVersion() : null; - } - - @Override - public HashedVersion getResultingVersion(long version) { - WaveletDeltaRecord delta = getDelta(version); - return (delta != null) ? delta.getTransformedDelta().getResultingVersion() : null; - } - - @Override - public ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDelta(long version) { - WaveletDeltaRecord delta = getDelta(version); - return (delta != null) ? delta.getAppliedDelta() : null; - } - - @Override - public TransformedWaveletDelta getTransformedDelta(long version) { - WaveletDeltaRecord delta = getDelta(version); - return (delta != null) ? delta.getTransformedDelta() : null; - } - - @Override - public void close() { - // Does nothing. - } - - @Override - public void append(Collection<WaveletDeltaRecord> newDeltas) { - for (WaveletDeltaRecord delta : newDeltas) { - // Before: ... | D | - // start end - // After: ... | D | D + 1 | - // start end - long startVersion = delta.getTransformedDelta().getAppliedAtVersion(); - Preconditions.checkState( - (startVersion == 0 && endVersion == null) || - (startVersion == endVersion.getVersion())); - deltas.put(startVersion, delta); - endVersion = delta.getTransformedDelta().getResultingVersion(); - endDeltas.put(endVersion.getVersion(), delta); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaStore.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaStore.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaStore.java deleted file mode 100644 index db1606a..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaStore.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.memory; - -import com.google.common.collect.ImmutableSet; - -import org.waveprotocol.box.common.ExceptionalIterator; -import org.waveprotocol.box.server.persistence.FileNotFoundPersistenceException; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.waveserver.DeltaStore; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.util.CollectionUtils; - -import java.util.Map; - -/** - * A simple in-memory implementation of DeltaStore. - * - * @author [email protected] (Joseph Gentle) - */ -public class MemoryDeltaStore implements DeltaStore { - /** - * The actual data. - * Note: We don't remove map entries in the top-level map when all wavelets in a wave are deleted. - * This is a very small memory leak that won't cause problems in practice. - */ - private final Map<WaveId, Map<WaveletId, MemoryDeltaCollection>> data = - CollectionUtils.newHashMap(); - - private Map<WaveletId, MemoryDeltaCollection> getOrCreateWaveData(WaveId waveId) { - Map<WaveletId, MemoryDeltaCollection> waveData = data.get(waveId); - if (waveData == null) { - waveData = CollectionUtils.newHashMap(); - data.put(waveId, waveData); - } - - return waveData; - } - - @Override - public void delete(WaveletName waveletName) throws PersistenceException { - Map<WaveletId, MemoryDeltaCollection> waveData = data.get(waveletName.waveId); - if (waveData == null) { - throw new FileNotFoundPersistenceException("WaveletData unknown"); - } else { - if (waveData.remove(waveletName.waveletId) == null) { - // Nothing was removed. - throw new FileNotFoundPersistenceException("Nothing was removed"); - } - } - } - - @Override - public ImmutableSet<WaveletId> lookup(WaveId waveId) { - Map<WaveletId, MemoryDeltaCollection> waveData = data.get(waveId); - if (waveData == null) { - return ImmutableSet.of(); - } else { - ImmutableSet.Builder<WaveletId> builder = ImmutableSet.builder(); - for (MemoryDeltaCollection collection : waveData.values()) { - if (!collection.isEmpty()) { - builder.add(collection.getWaveletName().waveletId); - } - } - return builder.build(); - } - } - - @Override - public DeltasAccess open(WaveletName waveletName) { - Map<WaveletId, MemoryDeltaCollection> waveData = getOrCreateWaveData(waveletName.waveId); - - MemoryDeltaCollection collection = waveData.get(waveletName.waveletId); - if (collection == null) { - collection = new MemoryDeltaCollection(waveletName); - waveData.put(waveletName.waveletId, collection); - } - - return collection; - } - - @Override - public ExceptionalIterator<WaveId, PersistenceException> getWaveIdIterator() { - ImmutableSet.Builder<WaveId> builder = ImmutableSet.builder(); - // Filter out empty waves - for (Map.Entry<WaveId, Map<WaveletId, MemoryDeltaCollection>> e : data.entrySet()) { - if (!e.getValue().isEmpty()) { - for (MemoryDeltaCollection collection : e.getValue().values()) { - if (!collection.isEmpty()) { - builder.add(e.getKey()); - break; - } - } - } - } - return ExceptionalIterator.FromIterator.create(builder.build().iterator()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryStore.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryStore.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryStore.java deleted file mode 100644 index de0cbda..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryStore.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.memory; - -import org.waveprotocol.box.server.account.AccountData; -import org.waveprotocol.box.server.persistence.AccountStore; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.persistence.SignerInfoStore; -import org.waveprotocol.wave.crypto.CertPathStore; -import org.waveprotocol.wave.crypto.DefaultCertPathStore; -import org.waveprotocol.wave.crypto.SignatureException; -import org.waveprotocol.wave.crypto.SignerInfo; -import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo; -import org.waveprotocol.wave.model.wave.ParticipantId; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * In-memory implementation of persistence. - * - * <p> - * {@link CertPathStore} implementation just forwards to the - * {@link DefaultCertPathStore}. - * - *<p> - *{@link AccountStore} implementation stores {@link AccountData} in a map keyed by username. - * - * @author [email protected] (Lennard de Rijk) - * - */ -public class MemoryStore implements SignerInfoStore, AccountStore { - - private final CertPathStore certPathStore; - - public MemoryStore() { - certPathStore = new DefaultCertPathStore(); - accountStore = new ConcurrentHashMap<ParticipantId, AccountData>(); - } - - @Override - public void initializeSignerInfoStore() throws PersistenceException { - // Nothing to initialize - } - - @Override - public SignerInfo getSignerInfo(byte[] signerId) throws SignatureException { - return certPathStore.getSignerInfo(signerId); - } - - @Override - public void putSignerInfo(ProtocolSignerInfo protobuff) throws SignatureException { - certPathStore.putSignerInfo(protobuff); - } - - - /* - * AccountStore - */ - - private final Map<ParticipantId, AccountData> accountStore; - - @Override - public void initializeAccountStore() { - // Nothing to initialize - } - - @Override - public AccountData getAccount(ParticipantId id) { - return accountStore.get(id); - } - - @Override - public void putAccount(AccountData account) { - accountStore.put(account.getId(), account); - } - - @Override - public void removeAccount(ParticipantId id) { - accountStore.remove(id); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/persistence/migration/DeltaMigrator.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/migration/DeltaMigrator.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/migration/DeltaMigrator.java deleted file mode 100644 index 3b68937..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/migration/DeltaMigrator.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.migration; - -import com.google.common.collect.ImmutableSet; - -import org.waveprotocol.box.common.ExceptionalIterator; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.waveserver.DeltaStore; -import org.waveprotocol.box.server.waveserver.DeltaStore.DeltasAccess; -import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.util.logging.Log; - -import java.io.IOException; -import java.util.ArrayList; - -/** - * - * An utility class to copy all deltas between storages. Already existing Waves - * in the target store wont be changed. - * - * It is NOT an incremental process. - * - * @author [email protected] (Pablo Ojanguren) - * - */ -public class DeltaMigrator { - - private static final Log LOG = Log.get(DeltaMigrator.class); - - protected DeltaStore sourceStore = null; - protected DeltaStore targetStore = null; - - - public DeltaMigrator(DeltaStore sourceStore, DeltaStore targetStore) { - this.sourceStore = sourceStore; - this.targetStore = targetStore; - } - - - - public void run() { - - - LOG.info("Starting Wave migration from " + sourceStore.getClass().getSimpleName() + " to " - + targetStore.getClass().getSimpleName()); - - long startTime = System.currentTimeMillis(); - - - try { - - ExceptionalIterator<WaveId, PersistenceException> srcItr = sourceStore.getWaveIdIterator(); - - // Waves - while (srcItr.hasNext()) { - - WaveId waveId = srcItr.next(); - - ImmutableSet<WaveletId> waveletIds = sourceStore.lookup(waveId); - - if (!targetStore.lookup(waveId).isEmpty()) { - LOG.info("Skipping Wave because it's found in target store : " + waveId.toString()); - continue; - } - - LOG.info("Migrating Wave : " + waveId.toString() + " with " + waveletIds.size() - + " wavelets"); - - int waveletsTotal = waveletIds.size(); - int waveletsCount = 0; - - // Wavelets - for (WaveletId waveletId : waveletIds) { - - waveletsCount++; - - LOG.info("Migrating wavelet " + waveletsCount + "/" + waveletsTotal + " : " - + waveletId.toString()); - - DeltasAccess sourceDeltas = sourceStore.open(WaveletName.of(waveId, waveletId)); - DeltasAccess targetDeltas = targetStore.open(WaveletName.of(waveId, waveletId)); - - // Get all deltas from last version to initial version (0): reverse - // order - int deltasCount = 0; - - ArrayList<WaveletDeltaRecord> deltas = new ArrayList<WaveletDeltaRecord>(); - HashedVersion deltaResultingVersion = sourceDeltas.getEndVersion(); - - // Deltas - while (deltaResultingVersion != null && deltaResultingVersion.getVersion() != 0) { - deltasCount++; - WaveletDeltaRecord deltaRecord = - sourceDeltas.getDeltaByEndVersion(deltaResultingVersion.getVersion()); - deltas.add(deltaRecord); - // get the previous delta, this is the appliedAt - deltaResultingVersion = deltaRecord.getAppliedAtVersion(); - } - LOG.info("Appending " + deltasCount + "deltas to target"); - targetDeltas.append(deltas); - } - } // While Waves - - long endTime = System.currentTimeMillis(); - - LOG.info("Migration completed. Total time = " + (endTime - startTime) + "ms"); - - } catch (PersistenceException e) { - - throw new RuntimeException(e); - - } catch (IOException e) { - - throw new RuntimeException(e); - - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java deleted file mode 100644 index e9a326b..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java +++ /dev/null @@ -1,205 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.mongodb; - -import com.mongodb.BasicDBObject; -import com.mongodb.DBCollection; -import com.mongodb.DBObject; -import com.mongodb.WriteConcern; - -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.waveserver.ByteStringMessage; -import org.waveprotocol.box.server.waveserver.DeltaStore; -import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord; -import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.version.HashedVersion; - -import java.io.IOException; -import java.util.Collection; - -/** - * A MongoDB based Delta Access implementation using a simple <b>deltas</b> - * collection, storing a delta record per each MongoDb document. - * - * @author [email protected] (Pablo Ojanguren) - * - */ -public class MongoDbDeltaCollection implements DeltaStore.DeltasAccess { - - /** Wavelet name to work with. */ - private final WaveletName waveletName; - - /** MongoDB Collection object for delta storage */ - private final DBCollection deltaDbCollection; - - /** - * Construct a new Delta Access object for the wavelet - * - * @param waveletName The wavelet name. - * @param deltaDbCollection The MongoDB deltas collection - */ - public MongoDbDeltaCollection(WaveletName waveletName, DBCollection deltaDbCollection) { - this.waveletName = waveletName; - this.deltaDbCollection = deltaDbCollection; - } - - @Override - public WaveletName getWaveletName() { - return waveletName; - } - - /** - * Create a new DBObject for a common query to select this wavelet - * - * @return DBObject query - */ - protected DBObject createWaveletDBQuery() { - - DBObject query = new BasicDBObject(); - query.put(MongoDbDeltaStoreUtil.FIELD_WAVE_ID, waveletName.waveId.serialise()); - query.put(MongoDbDeltaStoreUtil.FIELD_WAVELET_ID, waveletName.waveletId.serialise()); - - return query; - } - - @Override - public boolean isEmpty() { - - return deltaDbCollection.count(createWaveletDBQuery()) == 0; - } - - @Override - public HashedVersion getEndVersion() { - - // Search the max of delta.getTransformedDelta().getResultingVersion() - - DBObject query = createWaveletDBQuery(); - - DBObject sort = new BasicDBObject(); - sort.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION_VERSION, -1); // Descending - - DBObject field = new BasicDBObject(); - field.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION, 1); - - DBObject result = deltaDbCollection.findOne(query, field, sort); - - return result != null ? MongoDbDeltaStoreUtil - .deserializeHashedVersion((DBObject) ((DBObject) result - .get(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED)) - .get(MongoDbDeltaStoreUtil.FIELD_RESULTINGVERSION)) : null; - } - - @Override - public WaveletDeltaRecord getDelta(long version) throws IOException { - - DBObject query = createWaveletDBQuery(); - query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, version); - - DBObject result = deltaDbCollection.findOne(query); - - WaveletDeltaRecord waveletDelta = null; - - if (result != null) try { - waveletDelta = MongoDbDeltaStoreUtil.deserializeWaveletDeltaRecord(result); - } catch (PersistenceException e) { - throw new IOException(e); - } - return waveletDelta; - } - - @Override - public WaveletDeltaRecord getDeltaByEndVersion(long version) throws IOException { - DBObject query = createWaveletDBQuery(); - query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION_VERSION, version); - - DBObject result = deltaDbCollection.findOne(query); - - WaveletDeltaRecord waveletDelta = null; - - if (result != null) - try { - MongoDbDeltaStoreUtil.deserializeWaveletDeltaRecord(result); - } catch (PersistenceException e) { - throw new IOException(e); - } - return waveletDelta; - } - - @Override - public HashedVersion getAppliedAtVersion(long version) throws IOException { - - DBObject query = createWaveletDBQuery(); - query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, version); - - DBObject result = deltaDbCollection.findOne(query); - - if (result != null) - return MongoDbDeltaStoreUtil.deserializeHashedVersion((DBObject) result - .get(MongoDbDeltaStoreUtil.FIELD_APPLIEDATVERSION)); - return null; - } - - @Override - public HashedVersion getResultingVersion(long version) throws IOException { - DBObject query = createWaveletDBQuery(); - query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, version); - - DBObject result = deltaDbCollection.findOne(query); - - if (result != null) - return MongoDbDeltaStoreUtil.deserializeHashedVersion((DBObject) result - .get(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION)); - return null; - } - - @Override - public ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDelta(long version) - throws IOException { - - WaveletDeltaRecord delta = getDelta(version); - return (delta != null) ? delta.getAppliedDelta() : null; - } - - @Override - public TransformedWaveletDelta getTransformedDelta(long version) throws IOException { - - WaveletDeltaRecord delta = getDelta(version); - return (delta != null) ? delta.getTransformedDelta() : null; - } - - @Override - public void close() throws IOException { - // Does nothing. - } - - @Override - public void append(Collection<WaveletDeltaRecord> newDeltas) throws PersistenceException { - - for (WaveletDeltaRecord delta : newDeltas) { - // Using Journaled Write Concern - // (http://docs.mongodb.org/manual/core/write-concern/#journaled) - deltaDbCollection.insert(MongoDbDeltaStoreUtil.serialize(delta, - waveletName.waveId.serialise(), waveletName.waveletId.serialise()), - WriteConcern.JOURNALED); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java deleted file mode 100644 index 05aacbd..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.persistence.mongodb; - -import com.google.common.collect.ImmutableSet; - -import com.mongodb.BasicDBObject; -import com.mongodb.DB; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; -import com.mongodb.MongoException; -import com.mongodb.WriteConcern; - -import org.waveprotocol.box.common.ExceptionalIterator; -import org.waveprotocol.box.server.persistence.FileNotFoundPersistenceException; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.waveserver.DeltaStore; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.id.WaveletName; - -import java.util.List; - -/** - * A MongoDB based Delta Store implementation using a simple <b>deltas</b> - * collection, storing a delta record per each MongoDb document. - * - * @author [email protected] (Pablo Ojanguren) - * - */ -public class MongoDbDeltaStore implements DeltaStore { - - /** Name of the MongoDB collection to store Deltas */ - private static final String DELTAS_COLLECTION = "deltas"; - - /** Database connection object */ - private final DB database; - - /** - * Construct a new store - * - * @param database the database connection object - */ - public MongoDbDeltaStore(DB database) { - this.database = database; - } - - @Override - public DeltasAccess open(WaveletName waveletName) throws PersistenceException { - - return new MongoDbDeltaCollection(waveletName, getDeltaDbCollection()); - } - - @Override - public void delete(WaveletName waveletName) throws PersistenceException, - FileNotFoundPersistenceException { - - DBObject criteria = new BasicDBObject(); - criteria.put(MongoDbDeltaStoreUtil.FIELD_WAVE_ID, waveletName.waveId.serialise()); - criteria.put(MongoDbDeltaStoreUtil.FIELD_WAVELET_ID, waveletName.waveletId.serialise()); - - try { - // Using Journaled Write Concern - // (http://docs.mongodb.org/manual/core/write-concern/#journaled) - getDeltaDbCollection().remove(criteria, WriteConcern.JOURNALED); - } catch (MongoException e) { - throw new PersistenceException(e); - } - } - - @Override - public ImmutableSet<WaveletId> lookup(WaveId waveId) throws PersistenceException { - - - DBObject query = new BasicDBObject(); - query.put(MongoDbDeltaStoreUtil.FIELD_WAVE_ID, waveId.serialise()); - - DBObject projection = new BasicDBObject(); - projection.put(MongoDbDeltaStoreUtil.FIELD_WAVELET_ID, 1); - - DBCursor cursor = null; - - try { - cursor = getDeltaDbCollection().find(query, projection); - } catch (MongoException e) { - throw new PersistenceException(e); - } - - - if (cursor == null || !cursor.hasNext()) { - return ImmutableSet.of(); - } else { - ImmutableSet.Builder<WaveletId> builder = ImmutableSet.builder(); - for (DBObject waveletIdDBObject : cursor) { - builder.add(WaveletId.deserialise((String) waveletIdDBObject - .get(MongoDbDeltaStoreUtil.FIELD_WAVELET_ID))); - } - return builder.build(); - } - } - - @Override - public ExceptionalIterator<WaveId, PersistenceException> getWaveIdIterator() - throws PersistenceException { - - ImmutableSet.Builder<WaveId> builder = ImmutableSet.builder(); - - try { - - @SuppressWarnings("rawtypes") - List results = getDeltaDbCollection().distinct(MongoDbDeltaStoreUtil.FIELD_WAVE_ID); - - for (Object o : results) - builder.add(WaveId.deserialise((String) o)); - - } catch (MongoException e) { - throw new PersistenceException(e); - } - - - return ExceptionalIterator.FromIterator.create(builder.build().iterator()); - } - - /** - * Access to deltas collection - * - * @return DBCollection of deltas - */ - private DBCollection getDeltaDbCollection() { - return database.getCollection(DELTAS_COLLECTION); - } -}
