Repository: incubator-wave Updated Branches: refs/heads/feature/add-shutdown-manager [created] cb09a53a5
Adds Shutdown Manager. Project: http://git-wip-us.apache.org/repos/asf/incubator-wave/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-wave/commit/cb09a53a Tree: http://git-wip-us.apache.org/repos/asf/incubator-wave/tree/cb09a53a Diff: http://git-wip-us.apache.org/repos/asf/incubator-wave/diff/cb09a53a Branch: refs/heads/feature/add-shutdown-manager Commit: cb09a53a56c6fedfd861b01056495b6d9922e6d9 Parents: 94991bf Author: Yuri Zelikov <[email protected]> Authored: Thu Apr 7 19:17:25 2016 +0300 Committer: yuri.zelikov <[email protected]> Committed: Thu Apr 7 19:17:25 2016 +0300 ---------------------------------------------------------------------- .../org/waveprotocol/box/server/ServerMain.java | 14 +++ .../server/executor/RequestScopeExecutor.java | 13 ++- .../executor/ScheduledRequestScopeExecutor.java | 7 +- .../box/server/shutdown/LifeCycle.java | 113 +++++++++++++++++++ .../box/server/shutdown/ShutdownManager.java | 108 ++++++++++++++++++ .../box/server/shutdown/ShutdownPriority.java | 34 ++++++ .../box/server/shutdown/Shutdownable.java | 28 +++++ .../box/server/waveserver/Wave.java | 13 +++ .../box/server/waveserver/WaveMap.java | 61 ++++++++-- .../box/server/waveserver/WaveletContainer.java | 4 + .../server/waveserver/WaveletContainerImpl.java | 51 ++++++++- 11 files changed, 433 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java b/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java index bfa3b92..14e2491 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java +++ b/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java @@ -51,6 +51,9 @@ import 
org.waveprotocol.box.server.robots.dataapi.DataApiOAuthServlet; import org.waveprotocol.box.server.robots.dataapi.DataApiServlet; import org.waveprotocol.box.server.robots.passive.RobotsGateway; import org.waveprotocol.box.server.rpc.*; +import org.waveprotocol.box.server.shutdown.ShutdownManager; +import org.waveprotocol.box.server.shutdown.ShutdownPriority; +import org.waveprotocol.box.server.shutdown.Shutdownable; import org.waveprotocol.box.server.stat.RequestScopeFilter; import org.waveprotocol.box.server.stat.StatuszServlet; import org.waveprotocol.box.server.stat.TimingFilter; @@ -167,6 +170,7 @@ public class ServerMain { initializeFrontend(injector, server, waveBus); initializeFederation(injector); initializeSearch(injector, waveBus); + initializeShutdownHandler(server); LOG.info("Starting server"); server.startWebSocketServer(injector); @@ -284,4 +288,14 @@ public class ServerMain { WaveIndexer waveIndexer = injector.getInstance(WaveIndexer.class); waveIndexer.remakeIndex(); } + + private static void initializeShutdownHandler(final ServerRpcProvider server) { + ShutdownManager.getInstance().register(new Shutdownable() { + + @Override + public void shutdown() throws Exception { + server.stopServer(); + } + }, ServerMain.class.getSimpleName(), ShutdownPriority.Server); + } } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java b/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java index 40c2039..845b0df 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java +++ b/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java @@ -25,6 +25,9 @@ import java.util.concurrent.ExecutorService; import 
java.util.logging.Level; import java.util.logging.Logger; +import org.waveprotocol.box.server.shutdown.ShutdownManager; +import org.waveprotocol.box.server.shutdown.ShutdownPriority; +import org.waveprotocol.box.server.shutdown.Shutdownable; import org.waveprotocol.box.stat.RequestScope; import org.waveprotocol.box.stat.Timing; import org.waveprotocol.wave.model.util.Preconditions; @@ -38,7 +41,7 @@ import org.waveprotocol.wave.model.util.Preconditions; * @author [email protected] (A. Kaplanov) */ @SuppressWarnings("rawtypes") -public class RequestScopeExecutor implements Executor { +public class RequestScopeExecutor implements Executor, Shutdownable { private final static Logger LOG = Logger.getLogger(RequestScopeExecutor.class.getName()); private ExecutorService executor; @@ -50,6 +53,7 @@ public class RequestScopeExecutor implements Executor { public void setExecutor(ExecutorService executor, String name) { Preconditions.checkArgument(this.executor == null, "Executor is already defined."); this.executor = executor; + ShutdownManager.getInstance().register(this, name, ShutdownPriority.Task); } @Override @@ -75,4 +79,11 @@ public class RequestScopeExecutor implements Executor { } }); } + + @Override + public void shutdown() throws Exception { + if (executor != null) { + executor.shutdown(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java b/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java index f841667..4de3bd8 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java +++ b/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java @@ -32,6 +32,9 @@ import 
java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Logger; +import org.waveprotocol.box.server.shutdown.ShutdownManager; +import org.waveprotocol.box.server.shutdown.ShutdownPriority; +import org.waveprotocol.box.server.shutdown.Shutdownable; import org.waveprotocol.box.stat.RequestScope; import org.waveprotocol.box.stat.Timing; import org.waveprotocol.wave.model.util.Preconditions; @@ -44,8 +47,7 @@ import org.waveprotocol.wave.model.util.Preconditions; * @author [email protected] (A. Kaplanov) */ @SuppressWarnings("rawtypes") -public class ScheduledRequestScopeExecutor implements ScheduledExecutorService { - private final static Logger LOG = Logger.getLogger(ScheduledRequestScopeExecutor.class.getName()); +public class ScheduledRequestScopeExecutor implements ScheduledExecutorService, Shutdownable { private ScheduledExecutorService executor; @@ -59,6 +61,7 @@ public class ScheduledRequestScopeExecutor implements ScheduledExecutorService { public void setExecutor(ScheduledExecutorService executor, String name) { Preconditions.checkArgument(this.executor == null, "Executor is already defined."); this.executor = executor; + ShutdownManager.getInstance().register(this, name, ShutdownPriority.Task); } @Override http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java b/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java new file mode 100644 index 0000000..7244cfa --- /dev/null +++ b/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.waveprotocol.box.server.shutdown; + +import org.waveprotocol.wave.model.util.Preconditions; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Life cycle control. + * + * @author [email protected] (A. Kaplanov) + */ +public class LifeCycle { + + private static int SHUTDOWN_TIMOUT_SEC = 10; + + private final String name; + private final ShutdownPriority shutdownPriority; + private final Shutdownable shutdownHandler; + private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); + private boolean started; + + /** + * Creates lifecycle. + * + * @param name the name of task. + * @param shutdownPriority determines shutdown order. + */ + public LifeCycle(String name, ShutdownPriority shutdownPriority) { + this(name, shutdownPriority, null); + } + + /** + * Creates lifecycle. + * + * @param name the name of task. + * @param shutdownPriority determines shutdown order. + * @param shutdownHandler the handler executed on shutdown. + */ + public LifeCycle(String name, ShutdownPriority shutdownPriority, Shutdownable shutdownHandler) { + this.name = name; + this.shutdownPriority = shutdownPriority; + this.shutdownHandler = shutdownHandler; + } + + /** + * Starts lifecycle. 
+ */ + public synchronized void start() { + Preconditions.checkArgument(!started, name + " is already started."); + started = true; + ShutdownManager.getInstance().register(new Shutdownable() { + + @Override + public void shutdown() throws Exception { + synchronized (LifeCycle.this) { + if (shutdownHandler != null) { + shutdownHandler.shutdown(); + } + if (!semaphore.tryAcquire(Integer.MAX_VALUE, SHUTDOWN_TIMOUT_SEC, TimeUnit.SECONDS)) { + throw new TimeoutException(); + } + started = false; + } + } + }, name, shutdownPriority); + } + + /** + * Enters to execution block of task. + */ + public synchronized void enter() { + checkIsStarted(); + try { + semaphore.acquire(); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + /** + * Leaves execution block of task. + */ + public synchronized void leave() { + semaphore.release(); + } + + private void checkIsStarted() { + if (!started) { + throw new IllegalStateException(name + " is not started"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownManager.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownManager.java b/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownManager.java new file mode 100644 index 0000000..d3a3218 --- /dev/null +++ b/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownManager.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.waveprotocol.box.server.shutdown; + +import org.waveprotocol.wave.util.logging.Log; + +import java.util.HashSet; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * Registers shutdown tasks and executes them in priority order. + * + * @author [email protected] (A. Kaplanov) + */ +public class ShutdownManager extends Thread { + + interface NamedShutdownable extends Shutdownable { + String getName(); + } + + private static final Log LOG = Log.get(ShutdownManager.class); + private static ShutdownManager instance; + + private final SortedMap<ShutdownPriority, Set<NamedShutdownable>> tasks = new TreeMap<>(); + + private ShutdownManager() { + super(ShutdownManager.class.getSimpleName()); + } + + public static synchronized ShutdownManager getInstance() { + if (instance == null) { + instance = new ShutdownManager(); + } + return instance; + } + + /** + * Registers a shutdown task. + * + * @param shutdownHandler the handler to execute on shutdown. + * @param taskName the name of task. + * @param priority the priority that determines shutdown order.
+ */ + public synchronized void register(final Shutdownable shutdownHandler, final String taskName, ShutdownPriority priority) { + if (tasks.isEmpty()) { + Runtime.getRuntime().addShutdownHook(this); + } + Set<NamedShutdownable> priorityTasks = tasks.get(priority); + if (priorityTasks == null) { + tasks.put(priority, priorityTasks = new HashSet<>()); + } + priorityTasks.add(new NamedShutdownable() { + + @Override + public String getName() { + return taskName; + } + + @Override + public void shutdown() throws Exception { + shutdownHandler.shutdown(); + } + }); + } + + /** + * Executes on Java shutdown hook. + */ + @Override + public void run() { + LOG.info("Shutdown hook is fired."); + shutdown(); + } + + private synchronized void shutdown() { + LOG.info("Start of shutdown procedure."); + for (ShutdownPriority priority : tasks.keySet()) { + LOG.info("Shutdown priority class " + priority.name()); + for (NamedShutdownable task : tasks.get(priority)) { + LOG.info("Shutdown of " + task.getName() + " ..."); + try { + task.shutdown(); + } catch (Exception ex) { + LOG.severe("Shutdown of " + task.getName() + " error", ex); + } + } + } + LOG.info("End of shutdown procedure."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownPriority.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownPriority.java b/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownPriority.java new file mode 100644 index 0000000..d6c4b76 --- /dev/null +++ b/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownPriority.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.waveprotocol.box.server.shutdown; + +/** + * Priority determines shutdown order. + * + * @author [email protected] (A. Kaplanov) + */ + +public enum ShutdownPriority { + Server(1), Waves(3), Task(2), Storage(3); + final int value; + + ShutdownPriority(int priority) { + this.value = priority; + } +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/shutdown/Shutdownable.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/shutdown/Shutdownable.java b/wave/src/main/java/org/waveprotocol/box/server/shutdown/Shutdownable.java new file mode 100644 index 0000000..2814fd1 --- /dev/null +++ b/wave/src/main/java/org/waveprotocol/box/server/shutdown/Shutdownable.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.waveprotocol.box.server.shutdown; + +/** + * Shutdown handler. + * + * @author [email protected] (A. Kaplanov) + */ +public interface Shutdownable { + void shutdown() throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/waveserver/Wave.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/Wave.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/Wave.java index 815f814..2dd5a36 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/Wave.java +++ b/wave/src/main/java/org/waveprotocol/box/server/waveserver/Wave.java @@ -26,15 +26,18 @@ import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.waveprotocol.box.server.persistence.PersistenceException; import org.waveprotocol.wave.model.id.WaveId; import org.waveprotocol.wave.model.id.WaveletId; import org.waveprotocol.wave.model.id.WaveletName; +import org.waveprotocol.wave.model.util.CollectionUtils; import org.waveprotocol.wave.util.logging.Log; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ExecutionException; /** @@ -165,4 +168,14 @@ final class Wave implements Iterable<WaveletContainer> { 
ListenableFuture<ImmutableSet<WaveletId>> getLookedupWavelets() { return lookedupWavelets; } + + synchronized ListenableFuture close() { + List<ListenableFuture<Void>> futures = CollectionUtils.newLinkedList(); + Iterator<WaveletContainer> it = Iterators.unmodifiableIterator( + Iterables.concat(localWavelets.asMap().values(), remoteWavelets.asMap().values()).iterator()); + while (it.hasNext()) { + futures.add(it.next().close()); + } + return Futures.<Void>successfulAsList(futures); + } } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java index b2fa769..ef5d8dc 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java +++ b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java @@ -19,9 +19,7 @@ package org.waveprotocol.box.server.waveserver; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; +import com.google.common.cache.*; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; @@ -32,15 +30,17 @@ import org.waveprotocol.box.common.ExceptionalIterator; import org.waveprotocol.box.server.CoreSettingsNames; import org.waveprotocol.box.server.executor.ExecutorAnnotations.LookupExecutor; import org.waveprotocol.box.server.persistence.PersistenceException; +import org.waveprotocol.box.server.shutdown.LifeCycle; +import org.waveprotocol.box.server.shutdown.ShutdownPriority; +import org.waveprotocol.box.server.shutdown.Shutdownable; import org.waveprotocol.wave.model.id.WaveId; import org.waveprotocol.wave.model.id.WaveletId; import 
org.waveprotocol.wave.model.id.WaveletName; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; +import java.util.concurrent.*; +import java.util.logging.Logger; /** * A collection of wavelets, local and remote, held in memory. @@ -49,6 +49,11 @@ import java.util.concurrent.Executor; */ public class WaveMap { + private static final Logger LOG = Logger.getLogger(WaveMap.class.getName()); + + private static final int WAVE_CACHE_SIZE = 1000; // TODO (yurize): Make configurable. + private static final int WAVE_CACHE_EXPIRE = 60; + /** * Returns a future whose result is the ids of stored wavelets in the given wave. * Any failure is reported as a {@link PersistenceException}. @@ -68,8 +73,24 @@ public class WaveMap { } private final LoadingCache<WaveId, Wave> waves; + private final Map<WaveId, ListenableFuture> closingWaves = new ConcurrentHashMap<>(); private final WaveletStore<?> store; + final private LifeCycle lifeCycle = new LifeCycle(WaveMap.class.getSimpleName(), ShutdownPriority.Waves, + new Shutdownable() { + @Override + public void shutdown() { + waves.invalidateAll(); + for (Map.Entry<WaveId, ListenableFuture> entry : closingWaves.entrySet()) { + try { + entry.getValue().get(1, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + LOG.warning("Expect the closing of " + entry.getKey().toString()); + } + } + } + }); + @Inject public WaveMap(final DeltaAndSnapshotStore waveletStore, final WaveletNotificationSubscriber notifiee, @@ -77,10 +98,32 @@ public class WaveMap { final RemoteWaveletContainer.Factory remoteFactory, @Named(CoreSettingsNames.WAVE_SERVER_DOMAIN) final String waveDomain, @LookupExecutor final Executor lookupExecutor) { - // NOTE(anorth): DeltaAndSnapshotStore is more specific than necessary, but - // helps Guice out. 
+ this.store = waveletStore; - waves = CacheBuilder.newBuilder().build(new CacheLoader<WaveId, Wave>() { + waves = CacheBuilder.newBuilder() + .maximumSize(WAVE_CACHE_SIZE) + .expireAfterAccess(WAVE_CACHE_EXPIRE, TimeUnit.MINUTES) + .removalListener(new RemovalListener<WaveId, Wave>() { + @Override + public void onRemoval(final RemovalNotification<WaveId, Wave> rn) { + LOG.info("Wave " + rn.getKey() + " is evicted, current cache size " + waves.size()); + final ListenableFuture future = rn.getValue().close(); + closingWaves.put(rn.getKey(), future); + future.addListener(new Runnable() { + + @Override + public void run() { + try { + future.get(); + } catch (InterruptedException | ExecutionException ex) { + LOG.log(Level.WARNING, "Closing wave exception", ex); + } + closingWaves.remove(rn.getKey()); + } + }, MoreExecutors.sameThreadExecutor()); + } + } + .build(new CacheLoader<WaveId, Wave>() { @Override public Wave load(WaveId waveId) { ListenableFuture<ImmutableSet<WaveletId>> lookedupWavelets = http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainer.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainer.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainer.java index 7cf0763..f6073cb 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainer.java +++ b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainer.java @@ -19,6 +19,7 @@ package org.waveprotocol.box.server.waveserver; +import com.google.common.util.concurrent.ListenableFuture; import org.waveprotocol.box.common.Receiver; import org.waveprotocol.box.server.frontend.CommittedWaveletSnapshot; import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; @@ -49,6 +50,9 @@ interface WaveletContainer { T 
create(WaveletNotificationSubscriber notifiee, WaveletName waveletName, String waveDomain); } + /** Starts closing resources. */ + ListenableFuture close(); + /** Returns the name of the wavelet. */ WaveletName getWaveletName(); http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java index fe08fe0..62c7d4e 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java +++ b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java @@ -19,6 +19,7 @@ package org.waveprotocol.box.server.waveserver; +import com.google.common.util.concurrent.SettableFuture; import org.waveprotocol.box.common.Receiver; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -79,6 +80,15 @@ abstract class WaveletContainerImpl implements WaveletContainer { /** Wavelet state is being loaded from storage. */ LOADING, + /** Wavelet is being indexed. */ + INDEXING, + + /** Wavelet is being closed. */ + CLOSING, + + /** Wavelet has been closed. */ + CLOSED, + /** Wavelet has been deleted, the instance will not contain any data. */ DELETED, @@ -89,6 +99,7 @@ abstract class WaveletContainerImpl implements WaveletContainer { CORRUPTED } + private final Executor storageContinuationExecutor; private final Lock readLock; @@ -96,6 +107,7 @@ abstract class WaveletContainerImpl implements WaveletContainer { private final WaveletName waveletName; private final WaveletNotificationSubscriber notifiee; private final ParticipantId sharedDomainParticipantId; + /** Is counted down when initial loading from storage completes.
*/ private final CountDownLatch loadLatch = new CountDownLatch(1); /** Is set at most once, before loadLatch is counted down. */ @@ -107,7 +119,7 @@ abstract class WaveletContainerImpl implements WaveletContainer { * WaveletData is not set until a delta has been applied. * * @param notifiee the subscriber to notify of wavelet updates and commits. - * @param waveletState the wavelet's delta history and current state. + * @param waveletStateFuture the wavelet's delta history and current state. * @param waveDomain the wave server domain. * @param storageContinuationExecutor the executor used to perform post wavelet loading logic. */ @@ -561,4 +573,41 @@ abstract class WaveletContainerImpl implements WaveletContainer { protected ReadableWaveletData accessSnapshot() { return waveletState.getSnapshot(); } + + @Override + public ListenableFuture close() { + state = State.CLOSING; + final SettableFuture<Void> future = SettableFuture.create(); + final ListenableFuture deltaCloseFuture = deltaStateAccessor.close(); + deltaCloseFuture.addListener(new Runnable() { + + @Override + public void run() { + boolean consistent = true; + try { + deltaCloseFuture.get(); + } catch (InterruptedException | ExecutionException ex) { + LOG.severe("Exception of closing delta store", ex); + consistent = false; + } + if (segmentWaveletState != null) { + if (consistent) { + segmentWaveletState.markAsConsistent(); + } + segmentWaveletState.close().addListener(new Runnable() { + + @Override + public void run() { + state = State.CLOSED; + future.set(null); + } + }, MoreExecutors.sameThreadExecutor()); + } else { + state = State.CLOSED; + future.set(null); + } + } + }, storageContinuationExecutor); + return future; + } }
