http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/LucenePerUserWaveViewHandlerImpl.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/LucenePerUserWaveViewHandlerImpl.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/LucenePerUserWaveViewHandlerImpl.java deleted file mode 100644 index dc114cd..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/LucenePerUserWaveViewHandlerImpl.java +++ /dev/null @@ -1,352 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.common.base.Preconditions; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import com.google.inject.name.Named; -import org.apache.lucene.analysis.standard.StandardAnalyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.IndexWriterConfig.OpenMode; -import org.apache.lucene.index.Term; -import org.apache.lucene.search.*; -import org.apache.lucene.search.BooleanClause.Occur; -import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.util.Version; -import org.waveprotocol.box.server.CoreSettingsNames; -import org.waveprotocol.box.server.executor.ExecutorAnnotations.IndexExecutor; -import org.waveprotocol.box.server.persistence.lucene.IndexDirectory; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.model.wave.ParticipantIdUtil; -import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; -import java.util.logging.Level; -import java.util.logging.Logger; - -import static org.waveprotocol.box.server.waveserver.IndexFieldType.*; - -/** - * Lucene based implementation of {@link PerUserWaveViewHandler}. 
- * - * @author [email protected] (Yuri Zelikov) - */ -@Singleton -public class LucenePerUserWaveViewHandlerImpl implements PerUserWaveViewHandler, Closeable { - - private static class WaveSearchWarmer implements SearcherWarmer { - - private final ParticipantId sharedDomainParticipantId; - - WaveSearchWarmer(String waveDomain) { - sharedDomainParticipantId = ParticipantIdUtil.makeUnsafeSharedDomainParticipantId(waveDomain); - } - - @Override - public void warm(IndexSearcher searcher) throws IOException { - // TODO (Yuri Z): Run some diverse searches, searching against all - // fields. - - BooleanQuery participantQuery = new BooleanQuery(); - participantQuery.add( - new TermQuery(new Term(WITH.toString(), sharedDomainParticipantId.getAddress())), - Occur.SHOULD); - searcher.search(participantQuery, MAX_WAVES); - } - } - - private static final Logger LOG = Logger.getLogger(LucenePerUserWaveViewHandlerImpl.class - .getName()); - - private static final Version LUCENE_VERSION = Version.LUCENE_35; - - /** The results will be returned in the ascending order according to last modified time. */ - private static Sort LMT_ASC_SORT = new Sort(new SortField("title", SortField.LONG)); - - /** Minimum time until a new reader can be opened. */ - private static final double MIN_STALE_SEC = 0.025; - - /** Maximum time until a new reader must be opened. */ - private static final double MAX_STALE_SEC = 1.0; - - /** Defines the maximum number of waves returned by the search. 
*/ - private static final int MAX_WAVES = 10000; - - private final StandardAnalyzer analyzer; - private final IndexWriter indexWriter; - private final NRTManager nrtManager; - private final NRTManagerReopenThread nrtManagerReopenThread; - private final ReadableWaveletDataProvider waveletProvider; - private final Executor executor; - private boolean isClosed = false; - - @Inject - public LucenePerUserWaveViewHandlerImpl(IndexDirectory directory, - ReadableWaveletDataProvider waveletProvider, - @Named(CoreSettingsNames.WAVE_SERVER_DOMAIN) String domain, - @IndexExecutor Executor executor) { - this.waveletProvider = waveletProvider; - this.executor = executor; - analyzer = new StandardAnalyzer(LUCENE_VERSION); - try { - IndexWriterConfig indexConfig = new IndexWriterConfig(LUCENE_VERSION, analyzer); - indexConfig.setOpenMode(OpenMode.CREATE_OR_APPEND); - indexWriter = new IndexWriter(directory.getDirectory(), indexConfig); - nrtManager = new NRTManager(indexWriter, new WaveSearchWarmer(domain)); - } catch (IOException ex) { - throw new IndexException(ex); - } - - nrtManagerReopenThread = new NRTManagerReopenThread(nrtManager, MAX_STALE_SEC, MIN_STALE_SEC); - nrtManagerReopenThread.start(); - } - - /** - * Closes the handler, releases resources and flushes the recent index changes - * to persistent storage. - */ - @Override - public synchronized void close() { - if (isClosed) { - throw new AlreadyClosedException("Already closed"); - } - isClosed = true; - try { - nrtManager.close(); - if (analyzer != null) { - analyzer.close(); - } - nrtManagerReopenThread.close(); - indexWriter.close(); - } catch (IOException ex) { - LOG.log(Level.SEVERE, "Failed to close the Lucene index", ex); - } - LOG.info("Successfully closed the Lucene index..."); - } - - /** - * Ensures that the index is up to date. Exits quickly if no changes were done - * to the index. - * - * @throws IOException if something goes wrong. 
- */ - public void forceReopen() throws IOException { - nrtManager.maybeReopen(true); - } - - @Override - public ListenableFuture<Void> onParticipantAdded(final WaveletName waveletName, - ParticipantId participant) { - Preconditions.checkNotNull(waveletName); - Preconditions.checkNotNull(participant); - - ListenableFutureTask<Void> task = ListenableFutureTask.create(new Callable<Void>() { - - @Override - public Void call() throws Exception { - ReadableWaveletData waveletData; - try { - waveletData = waveletProvider.getReadableWaveletData(waveletName); - updateIndex(waveletData); - } catch (WaveServerException e) { - LOG.log(Level.SEVERE, "Failed to update index for " + waveletName, e); - throw e; - } - return null; - } - }); - executor.execute(task); - return task; - } - - @Override - public ListenableFuture<Void> onParticipantRemoved(final WaveletName waveletName, - final ParticipantId participant) { - Preconditions.checkNotNull(waveletName); - Preconditions.checkNotNull(participant); - - ListenableFutureTask<Void> task = ListenableFutureTask.create(new Callable<Void>() { - - @Override - public Void call() throws Exception { - ReadableWaveletData waveletData; - try { - waveletData = waveletProvider.getReadableWaveletData(waveletName); - try { - removeParticipantfromIndex(waveletData, participant, nrtManager); - } catch (IOException e) { - LOG.log(Level.SEVERE, "Failed to update index for " + waveletName, e); - throw e; - } - } catch (WaveServerException e) { - LOG.log(Level.SEVERE, "Failed to update index for " + waveletName, e); - throw e; - } - return null; - } - }); - executor.execute(task); - return task; - } - - @Override - public ListenableFuture<Void> onWaveInit(final WaveletName waveletName) { - Preconditions.checkNotNull(waveletName); - - ListenableFutureTask<Void> task = ListenableFutureTask.create(new Callable<Void>() { - - @Override - public Void call() throws Exception { - ReadableWaveletData waveletData; - try { - waveletData = 
waveletProvider.getReadableWaveletData(waveletName); - updateIndex(waveletData); - } catch (WaveServerException e) { - LOG.log(Level.SEVERE, "Failed to initialize index for " + waveletName, e); - throw e; - } - return null; - } - }); - executor.execute(task); - return task; - } - - private void updateIndex(ReadableWaveletData wavelet) throws IndexException { - Preconditions.checkNotNull(wavelet); - try { - // TODO (Yuri Z): Update documents instead of totally removing and adding. - removeIndex(wavelet, nrtManager); - addIndex(wavelet, nrtManager); - indexWriter.commit(); - } catch (IOException e) { - throw new IndexException(String.valueOf(wavelet.getWaveletId()), e); - } - } - - private static void addIndex(ReadableWaveletData wavelet, - NRTManager nrtManager) throws IOException { - Document doc = new Document(); - addWaveletFieldsToIndex(wavelet, doc); - nrtManager.addDocument(doc); - } - - private static void addWaveletFieldsToIndex(ReadableWaveletData wavelet, Document doc) { - doc.add(new Field(WAVEID.toString(), wavelet.getWaveId().serialise(), Field.Store.YES, - Field.Index.NOT_ANALYZED)); - doc.add(new Field(WAVELETID.toString(), wavelet.getWaveletId().serialise(), Field.Store.YES, - Field.Index.NOT_ANALYZED)); - doc.add(new Field(LMT.toString(), Long.toString(wavelet.getLastModifiedTime()), Field.Store.NO, - Field.Index.NOT_ANALYZED)); - for (ParticipantId participant : wavelet.getParticipants()) { - doc.add(new Field(WITH.toString(), participant.toString(), Field.Store.YES, - Field.Index.NOT_ANALYZED)); - } - } - - private static void removeIndex(ReadableWaveletData wavelet, NRTManager nrtManager) - throws IOException { - BooleanQuery query = new BooleanQuery(); - query.add(new TermQuery(new Term(WAVEID.toString(), wavelet.getWaveId().serialise())), - BooleanClause.Occur.MUST); - query.add(new TermQuery(new Term(WAVELETID.toString(), wavelet.getWaveletId().serialise())), - BooleanClause.Occur.MUST); - nrtManager.deleteDocuments(query); - } - - private 
static void removeParticipantfromIndex(ReadableWaveletData wavelet, - ParticipantId participant, NRTManager nrtManager) throws IOException { - BooleanQuery query = new BooleanQuery(); - Term waveIdTerm = new Term(WAVEID.toString(), wavelet.getWaveId().serialise()); - query.add(new TermQuery(waveIdTerm), BooleanClause.Occur.MUST); - query.add(new TermQuery(new Term(WAVELETID.toString(), wavelet.getWaveletId().serialise())), - BooleanClause.Occur.MUST); - SearcherManager searcherManager = nrtManager.getSearcherManager(true); - IndexSearcher indexSearcher = searcherManager.acquire(); - try { - TopDocs hints = indexSearcher.search(query, MAX_WAVES); - for (ScoreDoc hint : hints.scoreDocs) { - Document document = indexSearcher.doc(hint.doc); - String[] participantValues = document.getValues(WITH.toString()); - document.removeFields(WITH.toString()); - for (String address : participantValues) { - if (address.equals(participant.getAddress())) { - continue; - } - document.add(new Field(WITH.toString(), address, Field.Store.YES, - Field.Index.NOT_ANALYZED)); - } - nrtManager.updateDocument(waveIdTerm, document); - } - } catch (IOException e) { - LOG.log(Level.WARNING, "Failed to fetch from index " + wavelet.toString(), e); - } finally { - try { - searcherManager.release(indexSearcher); - } catch (IOException e) { - LOG.log(Level.WARNING, "Failed to close searcher. 
", e); - } - } - } - - - @Override - public Multimap<WaveId, WaveletId> retrievePerUserWaveView(ParticipantId user) { - Preconditions.checkNotNull(user); - - Multimap<WaveId, WaveletId> userWavesViewMap = HashMultimap.create(); - BooleanQuery participantQuery = new BooleanQuery(); - participantQuery.add(new TermQuery(new Term(WITH.toString(), user.getAddress())), Occur.SHOULD); - SearcherManager searcherManager = nrtManager.getSearcherManager(true); - IndexSearcher indexSearcher = searcherManager.acquire(); - try { - TopDocs hints = indexSearcher.search(participantQuery, MAX_WAVES, LMT_ASC_SORT); - for (ScoreDoc hint : hints.scoreDocs) { - Document document = indexSearcher.doc(hint.doc); - WaveId waveId = WaveId.deserialise(document.get(WAVEID.toString())); - WaveletId waveletId = WaveletId.deserialise(document.get(WAVELETID.toString())); - userWavesViewMap.put(waveId, waveletId); - } - } catch (IOException e) { - LOG.log(Level.WARNING, "Search failed: " + user, e); - } finally { - try { - searcherManager.release(indexSearcher); - } catch (IOException e) { - LOG.log(Level.WARNING, "Failed to close searcher. " + user, e); - } - } - return userWavesViewMap; - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/LuceneWaveIndexerImpl.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/LuceneWaveIndexerImpl.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/LuceneWaveIndexerImpl.java deleted file mode 100644 index 0d30e5d..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/LuceneWaveIndexerImpl.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import org.waveprotocol.box.server.waveserver.PerUserWaveViewBus.Listener; -import org.waveprotocol.wave.model.id.WaveletName; - -/** - * @author [email protected] (Yuri Zelikov) - */ -@Singleton -public class LuceneWaveIndexerImpl extends AbstractWaveIndexer { - - private final PerUserWaveViewBus.Listener listener; - - @Inject - public LuceneWaveIndexerImpl(WaveMap waveMap, WaveletProvider waveletProvider, Listener listener) { - super(waveMap, waveletProvider); - this.listener = listener; - } - - @Override - protected void processWavelet(WaveletName waveletName) { - listener.onWaveInit(waveletName); - } - - @Override - protected void postIndexHook() { - try { - getWaveMap().unloadAllWavelets(); - } catch (WaveletStateException e) { - throw new IndexException("Problem encountered while cleaning up", e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/MemoryPerUserWaveViewHandlerImpl.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/MemoryPerUserWaveViewHandlerImpl.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/MemoryPerUserWaveViewHandlerImpl.java deleted file mode 100644 index 4dff125..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/MemoryPerUserWaveViewHandlerImpl.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.util.logging.Log; - -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -/** - * @author [email protected] (Yuri Zelikov) - */ -@Singleton -public class MemoryPerUserWaveViewHandlerImpl implements PerUserWaveViewHandler { - - private static final Log LOG = Log.get(MemoryPerUserWaveViewHandlerImpl.class); - - /** - * The period of time in minutes the per user waves view should be actively - * kept up to date after last access. 
- */ - private static final int PER_USER_WAVES_VIEW_CACHE_MINUTES = 5; - - /** The loading cache that holds wave viev per each online user.*/ - public LoadingCache<ParticipantId, Multimap<WaveId, WaveletId>> explicitPerUserWaveViews; - - @Inject - public MemoryPerUserWaveViewHandlerImpl(final WaveMap waveMap) { - // Let the view expire if it not accessed for some time. - explicitPerUserWaveViews = - CacheBuilder.newBuilder().expireAfterAccess(PER_USER_WAVES_VIEW_CACHE_MINUTES, TimeUnit.MINUTES) - .<ParticipantId, Multimap<WaveId, WaveletId>>build(new CacheLoader<ParticipantId, Multimap<WaveId, WaveletId>>() { - - @Override - public Multimap<WaveId, WaveletId> load(final ParticipantId user) { - Multimap<WaveId, WaveletId> userView = HashMultimap.create(); - - // Create initial per user waves view by looping over all waves - // in the waves store. - Map<WaveId, Wave> waves = waveMap.getWaves(); - for (Map.Entry<WaveId, Wave> entry : waves.entrySet()) { - Wave wave = entry.getValue(); - for (WaveletContainer c : wave) { - WaveletId waveletId = c.getWaveletName().waveletId; - try { - if (!c.hasParticipant(user)) { - continue; - } - // Add this wave to the user view. 
- userView.put(entry.getKey(), waveletId); - } catch (WaveletStateException e) { - LOG.warning("Failed to access wavelet " + c.getWaveletName(), e); - } - } - } - LOG.info("Initalized waves view for user: " + user.getAddress() - + ", number of waves in view: " + userView.size()); - return userView; - } - }); - } - - @Override - public ListenableFuture<Void> onParticipantAdded(WaveletName waveletName, ParticipantId user) { - Multimap<WaveId, WaveletId> perUserView = explicitPerUserWaveViews.getIfPresent(user); - if (perUserView != null) { - if (!perUserView.containsEntry(waveletName.waveId, waveletName.waveletId)) { - perUserView.put(waveletName.waveId, waveletName.waveletId); - if(LOG.isFineLoggable()) { - LOG.fine("Added wavelet: " + waveletName + " to the view of user: " + user.getAddress()); - LOG.fine("View size is now: " + perUserView.size()); - } - } - } - SettableFuture<Void> task = SettableFuture.create(); - task.set(null); - return task; - } - - @Override - public ListenableFuture<Void> onParticipantRemoved(WaveletName waveletName, ParticipantId user) { - Multimap<WaveId, WaveletId> perUserView = explicitPerUserWaveViews.getIfPresent(user); - if (perUserView != null) { - if (perUserView.containsEntry(waveletName.waveId, waveletName.waveletId)) { - perUserView.remove(waveletName.waveId, waveletName.waveletId); - LOG.fine("Removed wavelet: " + waveletName - + " from the view of user: " + user.getAddress()); - } - } - SettableFuture<Void> task = SettableFuture.create(); - task.set(null); - return task; - } - - @Override - public Multimap<WaveId, WaveletId> retrievePerUserWaveView(ParticipantId user) { - try { - return explicitPerUserWaveViews.get(user); - } catch (ExecutionException ex) { - throw new RuntimeException(ex); - } - } - - @Override - public ListenableFuture<Void> onWaveInit(WaveletName waveletName) { - // No op. 
- SettableFuture<Void> task = SettableFuture.create(); - task.set(null); - return task; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/MemoryWaveIndexerImpl.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/MemoryWaveIndexerImpl.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/MemoryWaveIndexerImpl.java deleted file mode 100644 index c5c6fe4..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/MemoryWaveIndexerImpl.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import org.waveprotocol.wave.model.id.WaveletName; - - -/** - * Implements the waves view initialization for memory based waves view - * provider. 
- * - * @author [email protected] (Yuri Zelikov) - */ -@Singleton -public class MemoryWaveIndexerImpl extends AbstractWaveIndexer { - - @Inject - public MemoryWaveIndexerImpl(WaveMap waveMap, WaveletProvider waveletProvider) { - super(waveMap, waveletProvider); - } - - @Override - protected void processWavelet(WaveletName waveletName) { - // No op. - } - - @Override - protected void postIndexHook() { - // No op. - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/NoOpWaveIndexerImpl.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/NoOpWaveIndexerImpl.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/NoOpWaveIndexerImpl.java deleted file mode 100644 index 2fdecb5..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/NoOpWaveIndexerImpl.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.waveserver; - -/** - * @author [email protected] (Yuri Zelikov) - */ -public class NoOpWaveIndexerImpl implements WaveIndexer { - - @Override - public void remakeIndex() throws WaveletStateException, WaveServerException { - // No op. - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/NonSigningSignatureHandler.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/NonSigningSignatureHandler.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/NonSigningSignatureHandler.java deleted file mode 100644 index 8ad816e..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/NonSigningSignatureHandler.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
*/

package org.waveprotocol.box.server.waveserver;

import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import com.google.inject.name.Named;

import org.waveprotocol.box.server.CoreSettingsNames;
import org.waveprotocol.wave.crypto.SignerInfo;
import org.waveprotocol.wave.federation.Proto.ProtocolSignature;
import org.waveprotocol.wave.federation.Proto.ProtocolWaveletDelta;

import java.util.Collections;

/**
 * Signature handler that doesn't sign deltas: it reports no signer info and
 * produces an empty list of signatures.
 */
public class NonSigningSignatureHandler implements SignatureHandler {
  /**
   * Guice {@link Provider} for the instance of {@link NonSigningSignatureHandler}.
   * The handler is created on the first call to {@link #get()} and reused
   * afterwards.
   */
  @Singleton
  public static class NonSigningSignatureHandlerProvider implements Provider<SignatureHandler> {
    private final String waveDomain;
    /** Lazily created handler; only accessed while holding this provider's monitor. */
    private NonSigningSignatureHandler instance = null;

    @Inject
    public NonSigningSignatureHandlerProvider(
        @Named(CoreSettingsNames.WAVE_SERVER_DOMAIN) String waveDomain) {
      this.waveDomain = waveDomain;
    }

    @Override
    public synchronized NonSigningSignatureHandler get() {
      if (instance == null) {
        instance = new NonSigningSignatureHandler(waveDomain);
      }
      return instance;
    }
  }


  private final String waveDomain;

  /**
   * @param domain the domain reported by {@link #getDomain()}.
   */
  public NonSigningSignatureHandler(String domain) {
    waveDomain = domain;
  }

  @Override
  public String getDomain() {
    return waveDomain;
  }

  /** Always returns {@code null}, as there is no signer. */
  @Override
  public SignerInfo getSignerInfo() {
    return null;
  }

  /** Returns an empty list; the delta is not inspected. */
  @Override
  public Iterable<ProtocolSignature> sign(ByteStringMessage<ProtocolWaveletDelta> delta) {
    return Collections.emptyList();
  }

}
// http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewBus.java
// ----------------------------------------------------------------------
// diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewBus.java
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewBus.java deleted file mode 100644 index 96ee6e7..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewBus.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.common.util.concurrent.ListenableFuture; - -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.wave.ParticipantId; - -/** - * Provides a subscription service for changes to wavelets that can cause - * modification of the per user wave view. It is an adapter interface for {@link WaveBus}. - * - * @author [email protected] (Yuri Zelikov) - */ -public interface PerUserWaveViewBus { - - /** - * Receives per user wave view bus messages. - */ - interface Listener { - - /** - * Notifies the subscriber of an user added to wavelet. - * - * @param waveletName the wavelet name. - * @param participant the participant that was added. - * @return the future that allows to be notified of the update completion. 
- */ - ListenableFuture<Void> onParticipantAdded(WaveletName waveletName, ParticipantId participant); - - /** - * Notifies the subscriber of an user removed from wavelet. - * - * @param waveletName the wavelet name. - * @param participant the participant that was added. - * @return the future that allows to be notified of the update completion. - */ - ListenableFuture<Void> onParticipantRemoved(WaveletName waveletName, ParticipantId participant); - - /** - * Notifies the subscriber of a new wavelet that should be indexed. - * - * @param waveletName the wavelet name. - * @return the future that allows to be notified of the update completion. - */ - ListenableFuture<Void> onWaveInit(WaveletName waveletName); - } - - /** - * Subscribes to the bus, if the subscriber is not already subscribed. - */ - void addListener(Listener listener); - - /** - * Unsubscribes from the bus, if the subscriber is currently subscribed. - */ - void removeListener(Listener listener); -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewDistpatcher.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewDistpatcher.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewDistpatcher.java deleted file mode 100644 index 13c0aa2..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewDistpatcher.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import org.waveprotocol.box.common.DeltaSequence; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.wave.AddParticipant; -import org.waveprotocol.wave.model.operation.wave.RemoveParticipant; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.operation.wave.WaveletOperation; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; -import org.waveprotocol.wave.util.logging.Log; - -import java.util.concurrent.CopyOnWriteArraySet; - -/** - * Forwards wavelet notifications that can cause index changes to subscribers. 
- * - * @author [email protected] (Yuri Zelikov) - */ -public class PerUserWaveViewDistpatcher implements WaveBus.Subscriber, PerUserWaveViewBus { - private static final Log LOG = Log.get(PerUserWaveViewDistpatcher.class); - - private static final CopyOnWriteArraySet<PerUserWaveViewBus.Listener> listeners = - new CopyOnWriteArraySet<PerUserWaveViewBus.Listener>(); - - @Override - public void waveletUpdate(ReadableWaveletData wavelet, DeltaSequence deltas) { - WaveletId waveletId = wavelet.getWaveletId(); - WaveId waveId = wavelet.getWaveId(); - WaveletName waveletName = WaveletName.of(waveId, waveletId); - if(LOG.isInfoLoggable()) { - LOG.info("Got update for " + waveId + " " + waveletId); - } - - // Find whether participants were added/removed and update the views - // accordingly. - for (TransformedWaveletDelta delta : deltas) { - for (WaveletOperation op : delta) { - if (op instanceof AddParticipant) { - if(LOG.isInfoLoggable()) { - LOG.info("Update contains AddParticipant for " + ((AddParticipant)op).getParticipantId()); - } - - ParticipantId user = ((AddParticipant) op).getParticipantId(); - // Check first if we need to update views for this user. - for (Listener listener : listeners) { - listener.onParticipantAdded(waveletName, user); - } - } else if (op instanceof RemoveParticipant) { - ParticipantId user = ((RemoveParticipant) op).getParticipantId(); - for (Listener listener : listeners) { - listener.onParticipantRemoved(waveletName, user); - } - } - } - } - } - - @Override - public void waveletCommitted(WaveletName waveletName, HashedVersion version) { - // No op. 
- } - - @Override - public void addListener(Listener listener) { - listeners.add(listener); - } - - @Override - public void removeListener(Listener listener) { - listeners.remove(listener); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewHandler.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewHandler.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewHandler.java deleted file mode 100644 index e13bfed..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewHandler.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import org.waveprotocol.box.server.waveserver.PerUserWaveViewBus.Listener; - -/** - * Listens on wavelet update events and updates the per user wave view - * accordingly. - * - * Provides unified interface for typical implementations that track/provide the - * state of per user waves view. 
- * - * @author [email protected] (Yuri Zelikov) - */ -public interface PerUserWaveViewHandler extends Listener, PerUserWaveViewProvider { - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewProvider.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewProvider.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewProvider.java deleted file mode 100644 index 5dec5c5..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/PerUserWaveViewProvider.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.common.collect.Multimap; - -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.wave.ParticipantId; - -/** - * Provides per user wave view. 
- * - * @author [email protected] (Yuri Zelikov) - */ -public interface PerUserWaveViewProvider { - - /** - * Returns the per user waves view. - */ - Multimap<WaveId, WaveletId> retrievePerUserWaveView(ParticipantId user); -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/QueryHelper.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/QueryHelper.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/QueryHelper.java deleted file mode 100644 index c108e5b..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/QueryHelper.java +++ /dev/null @@ -1,351 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Ordering; -import com.google.common.collect.Sets; - -import org.waveprotocol.wave.model.id.IdUtil; -import org.waveprotocol.wave.model.wave.InvalidParticipantAddress; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.model.wave.data.ObservableWaveletData; -import org.waveprotocol.wave.model.wave.data.WaveViewData; - -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * Helper class that allows to add basic sort and filter functionality to the - * search. - * - * @author [email protected] (Yuri Zelikov) - */ -public class QueryHelper { - - @SuppressWarnings("serial") - public static class InvalidQueryException extends Exception { - - public InvalidQueryException(String msg) { - super(msg); - } - } - - /** - * Unknown participantId used by {@link ASC_CREATOR_COMPARATOR} in case wave - * creator cannot be found. - */ - static final ParticipantId UNKNOWN_CREATOR = ParticipantId.ofUnsafe("[email protected]"); - - /** Sorts search result in ascending order by LMT (Last Modified Time). */ - static final Comparator<WaveViewData> ASC_LMT_COMPARATOR = new Comparator<WaveViewData>() { - @Override - public int compare(WaveViewData arg0, WaveViewData arg1) { - long lmt0 = computeLmt(arg0); - long lmt1 = computeLmt(arg1); - return Long.signum(lmt0 - lmt1); - } - - private long computeLmt(WaveViewData wave) { - long lmt = -1; - for (ObservableWaveletData wavelet : wave.getWavelets()) { - // Skip non conversational wavelets. - if (!IdUtil.isConversationalId(wavelet.getWaveletId())) { - continue; - } - lmt = lmt < wavelet.getLastModifiedTime() ? 
wavelet.getLastModifiedTime() : lmt; - } - return lmt; - } - }; - - /** Sorts search result in descending order by LMT (Last Modified Time). */ - public static final Comparator<WaveViewData> DESC_LMT_COMPARATOR = - new Comparator<WaveViewData>() { - @Override - public int compare(WaveViewData arg0, WaveViewData arg1) { - return -ASC_LMT_COMPARATOR.compare(arg0, arg1); - } - }; - - /** Sorts search result in ascending order by creation time. */ - public static final Comparator<WaveViewData> ASC_CREATED_COMPARATOR = - new Comparator<WaveViewData>() { - @Override - public int compare(WaveViewData arg0, WaveViewData arg1) { - long time0 = computeCreatedTime(arg0); - long time1 = computeCreatedTime(arg1); - return Long.signum(time0 - time1); - } - - private long computeCreatedTime(WaveViewData wave) { - long creationTime = -1; - for (ObservableWaveletData wavelet : wave.getWavelets()) { - creationTime = - creationTime < wavelet.getCreationTime() ? wavelet.getCreationTime() : creationTime; - } - return creationTime; - } - }; - - /** Sorts search result in descending order by creation time. 
*/ - public static final Comparator<WaveViewData> DESC_CREATED_COMPARATOR = - new Comparator<WaveViewData>() { - @Override - public int compare(WaveViewData arg0, WaveViewData arg1) { - return -ASC_CREATED_COMPARATOR.compare(arg0, arg1); - } - }; - - /** Sorts search result in ascending order by creator */ - public static final Comparator<WaveViewData> ASC_CREATOR_COMPARATOR = - new Comparator<WaveViewData>() { - @Override - public int compare(WaveViewData arg0, WaveViewData arg1) { - ParticipantId creator0 = computeCreator(arg0); - ParticipantId creator1 = computeCreator(arg1); - return creator0.compareTo(creator1); - } - - private ParticipantId computeCreator(WaveViewData wave) { - for (ObservableWaveletData wavelet : wave.getWavelets()) { - if (IdUtil.isConversationRootWaveletId(wavelet.getWaveletId())) { - return wavelet.getCreator(); - } - } - // If not found creator - compare with UNKNOWN_CREATOR; - return UNKNOWN_CREATOR; - } - }; - - /** Sorts search result in descending order by creator */ - public static final Comparator<WaveViewData> DESC_CREATOR_COMPARATOR = - new Comparator<WaveViewData>() { - @Override - public int compare(WaveViewData arg0, WaveViewData arg1) { - return -ASC_CREATOR_COMPARATOR.compare(arg0, arg1); - } - }; - - /** Sorts search result by WaveId. */ - public static final Comparator<WaveViewData> ID_COMPARATOR = new Comparator<WaveViewData>() { - @Override - public int compare(WaveViewData arg0, WaveViewData arg1) { - return arg0.getWaveId().compareTo(arg1.getWaveId()); - } - }; - - /** - * Orders using {@link ASCENDING_DATE_COMPARATOR}. - */ - public static final Ordering<WaveViewData> ASC_LMT_ORDERING = Ordering - .from(QueryHelper.ASC_LMT_COMPARATOR); - - /** - * Orders using {@link DESCENDING_DATE_COMPARATOR}. - */ - public static final Ordering<WaveViewData> DESC_LMT_ORDERING = Ordering - .from(QueryHelper.DESC_LMT_COMPARATOR); - - /** - * Orders using {@link ASC_CREATED_COMPARATOR}. 
- */ - public static final Ordering<WaveViewData> ASC_CREATED_ORDERING = Ordering - .from(QueryHelper.ASC_CREATED_COMPARATOR); - - /** - * Orders using {@link DESC_CREATED_COMPARATOR}. - */ - public static final Ordering<WaveViewData> DESC_CREATED_ORDERING = Ordering - .from(QueryHelper.DESC_CREATED_COMPARATOR); - - /** - * Orders using {@link ASC_CREATOR_COMPARATOR}. - */ - public static final Ordering<WaveViewData> ASC_CREATOR_ORDERING = Ordering - .from(QueryHelper.ASC_CREATOR_COMPARATOR); - - /** - * Orders using {@link DESC_CREATOR_COMPARATOR}. - */ - public static final Ordering<WaveViewData> DESC_CREATOR_ORDERING = Ordering - .from(QueryHelper.DESC_CREATOR_COMPARATOR); - - /** Default ordering is by LMT descending. */ - public static final Ordering<WaveViewData> DEFAULT_ORDERING = DESC_LMT_ORDERING; - - /** Registered order by parameter types and corresponding orderings. */ - public enum OrderByValueType { - DATEASC("dateasc", ASC_LMT_ORDERING), - DATEDESC("datedesc", DESC_LMT_ORDERING), - CREATEDASC("createdasc", ASC_CREATED_ORDERING), - CREATEDDESC("createddesc", DESC_CREATED_ORDERING), - CREATORASC("creatorasc", ASC_CREATOR_ORDERING), - CREATORDESC("creatordesc", DESC_CREATOR_ORDERING); - - final String token; - final Ordering<WaveViewData> ordering; - - OrderByValueType(String value, Ordering<WaveViewData> ordering) { - this.token = value; - this.ordering = ordering; - } - - public String getToken() { - return token; - } - - public Ordering<WaveViewData> getOrdering() { - return ordering; - } - - private static final Map<String, OrderByValueType> reverseLookupMap = - new HashMap<String, OrderByValueType>(); - - static { - for (OrderByValueType type : OrderByValueType.values()) { - reverseLookupMap.put(type.getToken(), type); - } - } - - public static OrderByValueType fromToken(String token) { - OrderByValueType orderByValue = reverseLookupMap.get(token); - if (orderByValue == null) { - throw new IllegalArgumentException("Illegal 'orderby' value: " + 
token); - } - return reverseLookupMap.get(token); - } - } - - /** - * Builds a list of participants to serve as the filter for the query. - * - * @param queryParams the query params. - * @param queryType the filter for the query , i.e. 'with'. - * @param localDomain the local domain of the logged in user. - * @return the participants list for the filter. - * @throws InvalidParticipantAddress if participant id passed to the query is invalid. - */ - public static List<ParticipantId> buildValidatedParticipantIds( - Map<TokenQueryType, Set<String>> queryParams, - TokenQueryType queryType, String localDomain) throws InvalidParticipantAddress { - Set<String> tokenSet = queryParams.get(queryType); - List<ParticipantId> participants = null; - if (tokenSet != null) { - participants = Lists.newArrayListWithCapacity(tokenSet.size()); - for (String token : tokenSet) { - if (!token.isEmpty() && token.indexOf("@") == -1) { - // If no domain was specified, assume that the participant is from the local domain. - token = token + "@" + localDomain; - } else if (token.equals("@")) { - // "@" is a shortcut for the shared domain participant. - token = "@" + localDomain; - } - ParticipantId otherUser = ParticipantId.of(token); - participants.add(otherUser); - } - } else { - participants = Collections.emptyList(); - } - return participants; - } - - /** - * Computes ordering for the search results. If none are specified - then - * returns the default ordering. The resulting ordering is always compounded - * with ordering by wave id for stability. - */ - public static Ordering<WaveViewData> computeSorter( - Map<TokenQueryType, Set<String>> queryParams) { - Ordering<WaveViewData> ordering = null; - Set<String> orderBySet = queryParams.get(TokenQueryType.ORDERBY); - if (orderBySet != null) { - for (String orderBy : orderBySet) { - QueryHelper.OrderByValueType orderingType = - QueryHelper.OrderByValueType.fromToken(orderBy); - if (ordering == null) { - // Primary ordering. 
- ordering = orderingType.getOrdering(); - } else { - // All other ordering are compounded to the primary one. - ordering = ordering.compound(orderingType.getOrdering()); - } - } - } else { - ordering = QueryHelper.DEFAULT_ORDERING; - } - // For stability order also by wave id. - ordering = ordering.compound(QueryHelper.ID_COMPARATOR); - return ordering; - } - - /** - * Parses the search query. - * - * @param query the query. - * @return the result map with query tokens. Never returns null. - * @throws InvalidQueryException if the query contains invalid params. - */ - public static Map<TokenQueryType, Set<String>> parseQuery(String query) - throws InvalidQueryException { - Preconditions.checkArgument(query != null); - query = query.trim(); - // If query is empty - return. - if (query.isEmpty()) { - return Collections.emptyMap(); - } - String[] tokens = query.split("\\s+"); - Map<TokenQueryType, Set<String>> tokensMap = Maps.newEnumMap(TokenQueryType.class); - for (String token : tokens) { - String[] pair = token.split(":"); - if (pair.length != 2 || !TokenQueryType.hasToken(pair[0])) { - String msg = "Invalid query param: " + token; - throw new InvalidQueryException(msg); - } - String tokenValue = pair[1]; - TokenQueryType tokenType = TokenQueryType.fromToken(pair[0]); - // Verify the orderby param. - if (tokenType.equals(TokenQueryType.ORDERBY)) { - try { - OrderByValueType.fromToken(tokenValue); - } catch (IllegalArgumentException e) { - String msg = "Invalid orderby query value: " + tokenValue; - throw new InvalidQueryException(msg); - } - } - Set<String> valuesPerToken = tokensMap.get(tokenType); - if (valuesPerToken == null) { - valuesPerToken = Sets.newLinkedHashSet(); - tokensMap.put(tokenType, valuesPerToken); - } - valuesPerToken.add(tokenValue); - } - return tokensMap; - } - - /** Private constructor to prevent instantiation. 
*/ - private QueryHelper() {} -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/ReadableWaveletDataProvider.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/ReadableWaveletDataProvider.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/ReadableWaveletDataProvider.java deleted file mode 100644 index 1f9506f..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/ReadableWaveletDataProvider.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; - -/** - * Adapter for {@link WaveletProvider} with simpler interface for accessing - * wavelet data. 
- * - * @author [email protected] (Yuri Zelikov) - */ -public interface ReadableWaveletDataProvider { - - ReadableWaveletData getReadableWaveletData(WaveletName waveletName) throws WaveServerException; -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/RemoteWaveletContainer.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/RemoteWaveletContainer.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/RemoteWaveletContainer.java deleted file mode 100644 index a950bff..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/RemoteWaveletContainer.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.ByteString; - -import org.waveprotocol.wave.federation.FederationException; -import org.waveprotocol.wave.federation.WaveletFederationProvider; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.version.HashedVersion; - -import java.util.List; - -/** - * Remote wavelets differ from local ones in that deltas are not submitted for OT, - * rather they are updated when a remote wave service provider has applied and sent - * a delta. - */ -interface RemoteWaveletContainer extends WaveletContainer { - - /** - * Manufactures remote wavelet containers. - */ - interface Factory extends WaveletContainer.Factory<RemoteWaveletContainer> { } - - /** - * Update the state of the remote wavelet. This acts somewhat like a high - * water mark - if the provided deltas would continue a contiguous block from - * version zero, then they will be immediately transformed and returned to the - * client. If they do not, then an asynchronous callback will be kicked off to - * request the missing deltas. - * - * @param deltas the list of (serialized applied) deltas for the update - * @param domain the listener domain where these deltas were received - * @param federationProvider the provider where missing data may be sourced - * @param certificateManager for verifying signatures and requesting signer info - * @return future which is set after the deltas are applied to the local - * state or a failure occurs. - * Any failure is reported as a {@link FederationException}. - */ - ListenableFuture<Void> update(List<ByteString> deltas, String domain, - WaveletFederationProvider federationProvider, CertificateManager certificateManager); - - /** - * Is called when a commit notice is received from the wavelet host. 
- */ - void commit(HashedVersion version); -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/RemoteWaveletContainerImpl.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/RemoteWaveletContainerImpl.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/RemoteWaveletContainerImpl.java deleted file mode 100644 index 85ba4f6..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/RemoteWaveletContainerImpl.java +++ /dev/null @@ -1,472 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; - -import org.apache.commons.codec.binary.Base64; - -import org.waveprotocol.box.server.common.CoreWaveletOperationSerializer; -import org.waveprotocol.box.server.waveserver.CertificateManager.SignerInfoPrefetchResultListener; -import org.waveprotocol.wave.crypto.SignatureException; -import org.waveprotocol.wave.crypto.UnknownSignerException; -import org.waveprotocol.wave.federation.FederationErrorProto.FederationError; -import org.waveprotocol.wave.federation.FederationErrors; -import org.waveprotocol.wave.federation.FederationException; -import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; -import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion; -import org.waveprotocol.wave.federation.Proto.ProtocolSignature; -import org.waveprotocol.wave.federation.Proto.ProtocolSignedDelta; -import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo; -import org.waveprotocol.wave.federation.Proto.ProtocolWaveletDelta; -import org.waveprotocol.wave.federation.WaveletFederationProvider; -import org.waveprotocol.wave.federation.WaveletFederationProvider.HistoryResponseListener; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.OperationException; -import org.waveprotocol.wave.model.operation.wave.WaveletDelta; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.util.logging.Log; - -import java.util.ArrayList; -import java.util.List; -import java.util.Iterator; 
-import java.util.Map; -import java.util.NavigableMap; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Remote wavelets differ from local ones in that deltas are not submitted for OT, - * rather they are updated when a remote wave service provider has applied and sent - * a delta. - */ -class RemoteWaveletContainerImpl extends WaveletContainerImpl implements RemoteWaveletContainer { - private static final Log LOG = Log.get(RemoteWaveletContainerImpl.class); - - /** - * Stores all pending deltas for this wavelet, whose insertions would cause - * discontiguous blocks of deltas. This must only be accessed under writeLock. - */ - private final NavigableMap<HashedVersion, ByteStringMessage<ProtocolAppliedWaveletDelta>> - pendingDeltas = Maps.newTreeMap(); - - /** - * Tracks the highest version commit notice received, which can not be performed - * due to not yet having the required deltas. This must only be accessed under writeLock. - */ - private boolean pendingCommit = false; - private HashedVersion pendingCommitVersion; - - /** - * Create a new RemoteWaveletContainerImpl. Just pass through to the parent - * constructor. - */ - public RemoteWaveletContainerImpl(WaveletName waveletName, WaveletNotificationSubscriber notifiee, - ListenableFuture<? extends WaveletState> waveletStateFuture, - Executor storageContinuationExecutor) { - // We pass here null for waveDomain because you have to be explicit - // participant on remote wavelet to have access permission. 
- super(waveletName, notifiee, waveletStateFuture, null, storageContinuationExecutor); - } - - @Override - public ListenableFuture<Void> update(final List<ByteString> deltas, - final String domain, final WaveletFederationProvider federationProvider, - final CertificateManager certificateManager) { - SettableFuture<Void> futureResult = SettableFuture.create(); - internalUpdate(deltas, domain, federationProvider, certificateManager, futureResult); - return futureResult; - } - - @Override - public void commit(HashedVersion version) { - try { - awaitLoad(); - } - catch(WaveletStateException ex) { - LOG.warning("Failed to load " + getWaveletName() + " to perform commit.", ex); - acquireWriteLock(); - markStateCorrupted(); - releaseWriteLock(); - return; - } - - acquireWriteLock(); - try { - attemptCommit(version); - } finally { - releaseWriteLock(); - } - } - - /** - * Attempts to commit at the given version. - * This will only succeed if we are actually up to date. - * If not, then the history is assumed to be coming, and so we can just skip the whole task. 
- * */ - private void attemptCommit(HashedVersion version) { - HashedVersion expectedVersion = getCurrentVersion(); - if(expectedVersion == null || version.getVersion() == expectedVersion.getVersion()) { - LOG.info("Committed " + getWaveletName() + " at version " + version.getVersion()); - persist(version, ImmutableSet.<String>of()); - if(pendingCommitVersion == null || (version.getVersion() >= pendingCommitVersion.getVersion())) { - pendingCommit = false; - } - } else { - LOG.info("Ignoring commit request at " + version.getVersion() + - " since only at " + expectedVersion.getVersion()); - if(pendingCommitVersion == null || - (pendingCommitVersion != null && pendingCommitVersion.getVersion() < version.getVersion())) { - pendingCommitVersion = version; - } - LOG.info("pendingCommitVersion is now " + pendingCommitVersion.getVersion()); - pendingCommit = true; - } - } - - private void internalUpdate(final List<ByteString> deltas, - final String domain, final WaveletFederationProvider federationProvider, - final CertificateManager certificateManager, final SettableFuture<Void> futureResult) { - // Turn raw serialised ByteStrings in to a more useful representation - final List<ByteStringMessage<ProtocolAppliedWaveletDelta>> appliedDeltas = Lists.newArrayList(); - for (ByteString delta : deltas) { - try { - appliedDeltas.add(ByteStringMessage.parseProtocolAppliedWaveletDelta(delta)); - } catch (InvalidProtocolBufferException e) { - LOG.info("Invalid applied delta protobuf for incoming " + getWaveletName(), e); - acquireWriteLock(); - try { - markStateCorrupted(); - } finally { - releaseWriteLock(); - } - futureResult.setException(new FederationException( - FederationErrors.badRequest("Invalid applied delta protocol buffer"))); - return; - } - } - LOG.info("Got update: " + appliedDeltas); - - // Fetch any signer info that we don't already have and then run internalUpdate - final AtomicInteger numSignerInfoPrefetched = new AtomicInteger(1); // extra 1 for sentinel - final 
Runnable countDown = new Runnable() { - @Override - public void run() { - if (numSignerInfoPrefetched.decrementAndGet() == 0) { - internalUpdateAfterSignerInfoRetrieval( - appliedDeltas, domain, federationProvider, certificateManager, futureResult); - } - } - }; - SignerInfoPrefetchResultListener prefetchListener = new SignerInfoPrefetchResultListener() { - @Override - public void onFailure(FederationError error) { - LOG.warning("Signer info prefetch failed: " + error); - countDown.run(); - } - - @Override - public void onSuccess(ProtocolSignerInfo signerInfo) { - LOG.info("Signer info prefetch success for " + signerInfo.getDomain()); - countDown.run(); - } - }; - for (ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta : appliedDeltas) { - ProtocolSignedDelta toVerify = appliedDelta.getMessage().getSignedOriginalDelta(); - HashedVersion deltaEndVersion; - try { - deltaEndVersion = AppliedDeltaUtil.calculateResultingHashedVersion(appliedDelta); - } catch (InvalidProtocolBufferException e) { - LOG.warning("Skipping illformed applied delta " + appliedDelta, e); - continue; - } - for (ProtocolSignature sig : toVerify.getSignatureList()) { - if (certificateManager.retrieveSignerInfo(sig.getSignerId()) == null) { - LOG.info("Fetching signer info " + Base64.encodeBase64(sig.getSignerId().toByteArray())); - numSignerInfoPrefetched.incrementAndGet(); - certificateManager.prefetchDeltaSignerInfo(federationProvider, sig.getSignerId(), - getWaveletName(), deltaEndVersion, prefetchListener); - } - } - } - // If we didn't fetch any signer info, run internalUpdate immediately - countDown.run(); - } - - private void internalUpdateAfterSignerInfoRetrieval( - List<ByteStringMessage<ProtocolAppliedWaveletDelta>> appliedDeltas, - final String domain, final WaveletFederationProvider federationProvider, - final CertificateManager certificateManager, final SettableFuture<Void> futureResult) { - - try { - awaitLoad(); - } - catch(WaveletStateException ex) { - LOG.warning("Failed 
to load " + getWaveletName() + " to perform update.", ex); - acquireWriteLock(); - markStateCorrupted(); - releaseWriteLock(); - return; - } - - LOG.info("Passed signer info check, now applying all " + appliedDeltas.size() + " deltas"); - acquireWriteLock(); - try { - checkStateOk(); // TODO(soren): if CORRUPTED, throw away wavelet and start again - HashedVersion expectedVersion = getCurrentVersion(); - boolean haveRequestedHistory = false; - - // Verify signatures of all deltas - for (ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta : appliedDeltas) { - try { - certificateManager.verifyDelta(appliedDelta.getMessage().getSignedOriginalDelta()); - } catch (SignatureException e) { - LOG.warning("Verification failure for " + domain + " incoming " + getWaveletName(), e); - throw new WaveServerException("Verification failure", e); - } catch (UnknownSignerException e) { - LOG.severe("Unknown signer for " + domain + " incoming " + getWaveletName() + - ", this is BAD! We were supposed to have prefetched it!", e); - throw new WaveServerException("Unknown signer", e); - } - } - - // Insert all available deltas into pendingDeltas. - for (ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta : appliedDeltas) { - LOG.info("Delta incoming: " + appliedDelta); - - // Log any illformed signed original deltas. TODO: Check if this can be removed. 
- try { - ProtocolWaveletDelta actualDelta = ProtocolWaveletDelta.parseFrom( - appliedDelta.getMessage().getSignedOriginalDelta().getDelta()); - LOG.info("actual delta: " + actualDelta); - } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); - } - - HashedVersion appliedAt; - try { - appliedAt = AppliedDeltaUtil.getHashedVersionAppliedAt(appliedDelta); - } catch (InvalidProtocolBufferException e) { - markStateCorrupted(); - throw new WaveServerException( - "Authoritative server sent delta with badly formed original wavelet delta", e); - } - - pendingDeltas.put(appliedAt, appliedDelta); - } - - // Traverse pendingDeltas while we have any to process. - ImmutableList.Builder<WaveletDeltaRecord> resultingDeltas = ImmutableList.builder(); - while (pendingDeltas.size() > 0) { - Map.Entry<HashedVersion, ByteStringMessage<ProtocolAppliedWaveletDelta>> first = - pendingDeltas.firstEntry(); - HashedVersion appliedAt = first.getKey(); - ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta = first.getValue(); - - if(LOG.isInfoLoggable()) { - LOG.info("pendingDeltas.size(): " + Integer.toString(pendingDeltas.size())); - LOG.info("current appliedAt: " + appliedAt.getVersion() + " expected: " + expectedVersion.getVersion()); - } - - // If we don't have the right version it implies there is a history we need, so set up a - // callback to request it and fall out of this update - if (appliedAt.getVersion() > expectedVersion.getVersion()) { - LOG.info("Missing history from " + expectedVersion.getVersion() + "-" - + appliedAt.getVersion() + ", requesting from upstream for " + getWaveletName()); - - if (federationProvider != null) { - // TODO: only one request history should be pending at any one time? - // We should derive a new one whenever the active one is finished, - // based on the current state of pendingDeltas. 
- federationProvider.requestHistory(getWaveletName(), domain, - CoreWaveletOperationSerializer.serialize(expectedVersion), - CoreWaveletOperationSerializer.serialize(appliedAt), - -1, - new HistoryResponseListener() { - @Override - public void onFailure(FederationError error) { - LOG.severe("Callback failure: " + error); - } - - @Override - public void onSuccess(List<ByteString> deltaList, - ProtocolHashedVersion lastCommittedVersion, long versionTruncatedAt) { - LOG.info("Got response callback: " + getWaveletName() + ", lcv " - + lastCommittedVersion + " deltaList length = " + deltaList.size()); - - // Try updating again with the new history - internalUpdate(deltaList, domain, federationProvider, certificateManager, - futureResult); - } - }); - haveRequestedHistory = true; - } else { - LOG.severe("History request resulted in non-contiguous deltas!"); - } - break; - } - - // This delta is at the correct (current) version - apply it. - if (appliedAt.getVersion() == expectedVersion.getVersion()) { - // Confirm that the applied at hash matches the expected hash. - if (!appliedAt.equals(expectedVersion)) { - markStateCorrupted(); - throw new WaveServerException("Incoming delta applied at version " - + appliedAt.getVersion() + " is not applied to the correct hash"); - } - - LOG.info("Applying delta for version " + appliedAt.getVersion()); - try { - WaveletDeltaRecord applicationResult = transformAndApplyRemoteDelta(appliedDelta); - long opsApplied = applicationResult.getResultingVersion().getVersion() - - expectedVersion.getVersion(); - if (opsApplied != appliedDelta.getMessage().getOperationsApplied()) { - throw new OperationException("Operations applied here do not match the authoritative" - + " server claim (got " + opsApplied + ", expected " - + appliedDelta.getMessage().getOperationsApplied() + "."); - } - // Add transformed result to return list. 
- resultingDeltas.add(applicationResult); - LOG.fine("Applied delta: " + appliedDelta); - } catch (OperationException e) { - markStateCorrupted(); - throw new WaveServerException("Couldn't apply authoritative delta", e); - } catch (InvalidProtocolBufferException e) { - markStateCorrupted(); - throw new WaveServerException("Couldn't apply authoritative delta", e); - } catch (InvalidHashException e) { - markStateCorrupted(); - throw new WaveServerException("Couldn't apply authoritative delta", e); - } - - // TODO: does waveletData update? - expectedVersion = getCurrentVersion(); - } else { - LOG.warning("Got delta from the past: " + appliedDelta); - } - - pendingDeltas.remove(appliedAt); - } - - commitAndNotifyResultingDeltas(resultingDeltas, futureResult); - - } catch (WaveServerException e) { - LOG.warning("Update failure", e); - // TODO(soren): make everyone throw FederationException instead - // of WaveServerException so we don't have to translate between them here - futureResult.setException( - new FederationException(FederationErrors.badRequest(e.getMessage()))); - } finally { - releaseWriteLock(); - } - } - - /** - * Commits the resulting deltas, notifying the server of them. - * Assumes that everything in resultingDeltas is now in-order, since - * even if the original stream was non-contiguous, we have requestedHistory. - * Even if not, it is still safe to commit up to the fragmented point. 
- */ - private void commitAndNotifyResultingDeltas( - ImmutableList.Builder<WaveletDeltaRecord> resultingDeltas, - final SettableFuture<Void> futureResult) { - if(!resultingDeltas.build().isEmpty()) { - notifyOfDeltas(resultingDeltas.build(), ImmutableSet.<String>of()); - futureResult.set(null); - - //Attempt to run any pending commit - if(pendingCommit) { - releaseWriteLock(); - commit(pendingCommitVersion); - acquireWriteLock(); - } - } else { - LOG.info("No deltas in list (fetching history?), ignoring callback"); - } - } - - /** - * Apply a serialised applied delta to a remote wavelet. This assumes the - * caller has validated that the delta is at the correct version and can be - * applied to the wavelet. Must be called with writelock held. - * - * @param appliedDelta that is to be applied to the wavelet in its serialised form - * @return the transformed and applied delta. - * @throws AccessControlException if the supplied Delta's historyHash does not - * match the canonical history. - * @throws WaveServerException if the delta transforms away. 
- */ - private WaveletDeltaRecord transformAndApplyRemoteDelta( - ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta) throws OperationException, - AccessControlException, InvalidHashException, InvalidProtocolBufferException, - WaveServerException { - // The serialised hashed version should actually match the currentVersion at this point, since - // the caller of transformAndApply delta will have made sure the applied deltas are ordered - HashedVersion hashedVersion = AppliedDeltaUtil.getHashedVersionAppliedAt(appliedDelta); - Preconditions.checkState(hashedVersion.equals(getCurrentVersion()), - "Applied delta must apply to current version"); - - // Extract the serialised wavelet delta - ByteStringMessage<ProtocolWaveletDelta> protocolDelta = - ByteStringMessage.parseProtocolWaveletDelta( - appliedDelta.getMessage().getSignedOriginalDelta().getDelta()); - WaveletDelta delta = CoreWaveletOperationSerializer.deserialize(protocolDelta.getMessage()); - - // Transform operations against earlier deltas, if necessary - WaveletDelta transformed = maybeTransformSubmittedDelta(delta); - if (transformed.getTargetVersion().equals(delta.getTargetVersion())) { - // No transformation took place. - // As a sanity check, the hash from the applied delta should NOT be set (an optimisation, but - // part of the protocol). - if (appliedDelta.getMessage().hasHashedVersionAppliedAt()) { - LOG.warning("Hashes are the same but applied delta has hashed_version_applied_at"); - // TODO: re-enable this exception for version 0.3 of the spec -// throw new InvalidHashException("Applied delta and its contained delta have same hash"); - } - } - - if (transformed.size() == 0) { - // The host shouldn't be forwarding empty deltas! 
- markStateCorrupted(); - throw new WaveServerException("Couldn't apply authoritative delta, " + - "it transformed away at version " + transformed.getTargetVersion().getVersion()); - } - - if (!transformed.getTargetVersion().equals(hashedVersion)) { - markStateCorrupted(); - throw new WaveServerException("Couldn't apply authoritative delta, " + - "it transformed to wrong version. Expected " + hashedVersion + - ", actual " + transformed.getTargetVersion().getVersion()); - } - - // Apply the delta to the local wavelet state. - // This shouldn't fail since the delta is from the authoritative server, so if it fails - // then the wavelet is corrupted (and the caller of this method will sort it out). - return applyDelta(appliedDelta, transformed); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/RuntimeIOException.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/RuntimeIOException.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/RuntimeIOException.java deleted file mode 100755 index 2c85326..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/RuntimeIOException.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import java.io.IOException; - -/** - * Wraps an {@link IOException} in a {@link RuntimeException}. - * - * @author [email protected] (Yuri Zelikov) - */ -@SuppressWarnings("serial") -public class RuntimeIOException extends RuntimeException { - private final IOException cause; - - public RuntimeIOException(IOException cause) { - super(cause); - this.cause = cause; - } - - public IOException getIOException() { - return cause; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/SearchProvider.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/SearchProvider.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/SearchProvider.java deleted file mode 100644 index 55baf42..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/SearchProvider.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.wave.api.SearchResult; - -import org.waveprotocol.wave.model.wave.ParticipantId; - - -/** - * A provider of search results. SearchProviders can be queried, and reply with a set of - * ReadableWaveletData objects which match the query. - * - * @author [email protected] (Joseph Gentle) - */ -public interface SearchProvider { - /** - * Run a search query. - * - * @param user the user executing the query - * @param query the query string - * @param startAt The offset in the results to return - * @param numResults The number of results from startAt to return - * @return the search result with digests which match the specified query - */ - SearchResult search( - ParticipantId user, String query, int startAt, int numResults); -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/waveserver/SignatureHandler.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/SignatureHandler.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/SignatureHandler.java deleted file mode 100644 index 9b4fa12..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/SignatureHandler.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import org.waveprotocol.wave.crypto.SignerInfo; -import org.waveprotocol.wave.federation.Proto.ProtocolSignature; -import org.waveprotocol.wave.federation.Proto.ProtocolWaveletDelta; - -/** - * Instances of this class provide signature-related information and decide how - * deltas should be signed. - */ -public interface SignatureHandler { - interface Factory { - SignatureHandler getInstance(); - } - - /** - * @return the domain of this signer. - */ - String getDomain(); - - /** - * Returns the signer info associated with this signer or null if none is - * available. - */ - SignerInfo getSignerInfo(); - - /** - * Returns a list of the appropriate signatures for the specified delta. - */ - Iterable<ProtocolSignature> sign(ByteStringMessage<ProtocolWaveletDelta> delta); - -}
