Repository: incubator-wave
Updated Branches:
  refs/heads/feature/add-shutdown-manager [created] cb09a53a5


Adds Shutdown Manager.


Project: http://git-wip-us.apache.org/repos/asf/incubator-wave/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-wave/commit/cb09a53a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-wave/tree/cb09a53a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-wave/diff/cb09a53a

Branch: refs/heads/feature/add-shutdown-manager
Commit: cb09a53a56c6fedfd861b01056495b6d9922e6d9
Parents: 94991bf
Author: Yuri Zelikov <[email protected]>
Authored: Thu Apr 7 19:17:25 2016 +0300
Committer: yuri.zelikov <[email protected]>
Committed: Thu Apr 7 19:17:25 2016 +0300

----------------------------------------------------------------------
 .../org/waveprotocol/box/server/ServerMain.java |  14 +++
 .../server/executor/RequestScopeExecutor.java   |  13 ++-
 .../executor/ScheduledRequestScopeExecutor.java |   7 +-
 .../box/server/shutdown/LifeCycle.java          | 113 +++++++++++++++++++
 .../box/server/shutdown/ShutdownManager.java    | 108 ++++++++++++++++++
 .../box/server/shutdown/ShutdownPriority.java   |  34 ++++++
 .../box/server/shutdown/Shutdownable.java       |  28 +++++
 .../box/server/waveserver/Wave.java             |  13 +++
 .../box/server/waveserver/WaveMap.java          |  61 ++++++++--
 .../box/server/waveserver/WaveletContainer.java |   4 +
 .../server/waveserver/WaveletContainerImpl.java |  51 ++++++++-
 11 files changed, 433 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java
----------------------------------------------------------------------
diff --git a/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java 
b/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java
index bfa3b92..14e2491 100644
--- a/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java
+++ b/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java
@@ -51,6 +51,9 @@ import 
org.waveprotocol.box.server.robots.dataapi.DataApiOAuthServlet;
 import org.waveprotocol.box.server.robots.dataapi.DataApiServlet;
 import org.waveprotocol.box.server.robots.passive.RobotsGateway;
 import org.waveprotocol.box.server.rpc.*;
+import org.waveprotocol.box.server.shutdown.ShutdownManager;
+import org.waveprotocol.box.server.shutdown.ShutdownPriority;
+import org.waveprotocol.box.server.shutdown.Shutdownable;
 import org.waveprotocol.box.server.stat.RequestScopeFilter;
 import org.waveprotocol.box.server.stat.StatuszServlet;
 import org.waveprotocol.box.server.stat.TimingFilter;
@@ -167,6 +170,7 @@ public class ServerMain {
     initializeFrontend(injector, server, waveBus);
     initializeFederation(injector);
     initializeSearch(injector, waveBus);
+    initializeShutdownHandler(server);
 
     LOG.info("Starting server");
     server.startWebSocketServer(injector);
@@ -284,4 +288,14 @@ public class ServerMain {
     WaveIndexer waveIndexer = injector.getInstance(WaveIndexer.class);
     waveIndexer.remakeIndex();
   }
+
+  private static void initializeShutdownHandler(final ServerRpcProvider 
server) {
+    ShutdownManager.getInstance().register(new Shutdownable() {
+
+      @Override
+      public void shutdown() throws Exception {
+        server.stopServer();
+      }
+    }, ServerMain.class.getSimpleName(), ShutdownPriority.Server);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java
 
b/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java
index 40c2039..845b0df 100644
--- 
a/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java
@@ -25,6 +25,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.waveprotocol.box.server.shutdown.ShutdownManager;
+import org.waveprotocol.box.server.shutdown.ShutdownPriority;
+import org.waveprotocol.box.server.shutdown.Shutdownable;
 import org.waveprotocol.box.stat.RequestScope;
 import org.waveprotocol.box.stat.Timing;
 import org.waveprotocol.wave.model.util.Preconditions;
@@ -38,7 +41,7 @@ import org.waveprotocol.wave.model.util.Preconditions;
  * @author [email protected] (A. Kaplanov)
  */
 @SuppressWarnings("rawtypes")
-public class RequestScopeExecutor implements Executor {
+public class RequestScopeExecutor implements Executor, Shutdownable {
   private final static Logger LOG = 
Logger.getLogger(RequestScopeExecutor.class.getName());
 
   private ExecutorService executor;
@@ -50,6 +53,7 @@ public class RequestScopeExecutor implements Executor {
   public void setExecutor(ExecutorService executor, String name) {
     Preconditions.checkArgument(this.executor == null, "Executor is already 
defined.");
     this.executor = executor;
+    ShutdownManager.getInstance().register(this, name, ShutdownPriority.Task);
   }
 
   @Override
@@ -75,4 +79,11 @@ public class RequestScopeExecutor implements Executor {
       }
     });
   }
+
+  @Override
+  public void shutdown() throws Exception {
+    if (executor != null) {
+      executor.shutdown();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java
 
b/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java
index f841667..4de3bd8 100644
--- 
a/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java
@@ -32,6 +32,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.logging.Logger;
 
+import org.waveprotocol.box.server.shutdown.ShutdownManager;
+import org.waveprotocol.box.server.shutdown.ShutdownPriority;
+import org.waveprotocol.box.server.shutdown.Shutdownable;
 import org.waveprotocol.box.stat.RequestScope;
 import org.waveprotocol.box.stat.Timing;
 import org.waveprotocol.wave.model.util.Preconditions;
@@ -44,8 +47,7 @@ import org.waveprotocol.wave.model.util.Preconditions;
  * @author [email protected] (A. Kaplanov)
  */
 @SuppressWarnings("rawtypes")
-public class ScheduledRequestScopeExecutor implements ScheduledExecutorService 
{
-  private final static Logger LOG = 
Logger.getLogger(ScheduledRequestScopeExecutor.class.getName());
+public class ScheduledRequestScopeExecutor implements 
ScheduledExecutorService, Shutdownable {
 
   private ScheduledExecutorService executor;
 
@@ -59,6 +61,7 @@ public class ScheduledRequestScopeExecutor implements 
ScheduledExecutorService {
   public void setExecutor(ScheduledExecutorService executor, String name) {
     Preconditions.checkArgument(this.executor == null, "Executor is already 
defined.");
     this.executor = executor;
+    ShutdownManager.getInstance().register(this, name, ShutdownPriority.Task);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java 
b/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java
new file mode 100644
index 0000000..7244cfa
--- /dev/null
+++ b/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.waveprotocol.box.server.shutdown;
+
+import org.waveprotocol.wave.model.util.Preconditions;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Life cycle control.
+ *
+ * @author [email protected] (A. Kaplanov)
+ */
+public class LifeCycle {
+
+    private static int SHUTDOWN_TIMOUT_SEC = 10;
+
+    private final String name;
+    private final ShutdownPriority shutdownPriority;
+    private final Shutdownable shutdownHandler;
+    private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
+    private boolean started;
+
+    /**
+     * Creates lifecycle.
+     *
+     * @param name the name of task.
+     * @param shutdownPriority determines shutdown order.
+     */
+    public LifeCycle(String name, ShutdownPriority shutdownPriority) {
+        this(name, shutdownPriority, null);
+    }
+
+    /**
+     * Creates lifecycle.
+     *
+     * @param name the name of task.
+     * @param shutdownPriority determines shutdown order.
+     * @param shutdownHandler the handler executed on shutdown.
+     */
+    public LifeCycle(String name, ShutdownPriority shutdownPriority, 
Shutdownable shutdownHandler) {
+        this.name = name;
+        this.shutdownPriority = shutdownPriority;
+        this.shutdownHandler = shutdownHandler;
+    }
+
+    /**
+     * Starts lifecycle.
+     */
+    public synchronized void start() {
+        Preconditions.checkArgument(!started, name + " is already started.");
+        started = true;
+        ShutdownManager.getInstance().register(new Shutdownable() {
+
+            @Override
+            public void shutdown() throws Exception {
+                synchronized (LifeCycle.this) {
+                    if (shutdownHandler != null) {
+                        shutdownHandler.shutdown();
+                    }
+                    if (!semaphore.tryAcquire(Integer.MAX_VALUE, 
SHUTDOWN_TIMOUT_SEC, TimeUnit.SECONDS)) {
+                        throw new TimeoutException();
+                    }
+                    started = false;
+                }
+            }
+        }, name, shutdownPriority);
+    }
+
+    /**
+     * Enters to execution block of task.
+     */
+    public synchronized void enter() {
+        checkIsStarted();
+        try {
+            semaphore.acquire();
+        } catch (InterruptedException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     * Leaves execution block of task.
+     */
+    public synchronized void leave() {
+        semaphore.release();
+    }
+
+    private void checkIsStarted() {
+        if (!started) {
+            throw new IllegalStateException(name + " is not started");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownManager.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownManager.java 
b/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownManager.java
new file mode 100644
index 0000000..d3a3218
--- /dev/null
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownManager.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.waveprotocol.box.server.shutdown;
+
+import org.waveprotocol.wave.util.logging.Log;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Registers and executed by specified priority shutdown tasks.
+ *
+ * @author [email protected] (A. Kaplanov)
+ */
+public class ShutdownManager extends Thread {
+
+    interface NamedShutdownable extends Shutdownable {
+        String getName();
+    }
+
+    private static final Log LOG = Log.get(ShutdownManager.class);
+    private static ShutdownManager instance;
+
+    private final SortedMap<ShutdownPriority, Set<NamedShutdownable>> tasks = 
new TreeMap<>();
+
+    private ShutdownManager() {
+        super(ShutdownManager.class.getSimpleName());
+    }
+
+    public static synchronized ShutdownManager getInstance() {
+        if (instance == null) {
+            instance = new ShutdownManager();
+        }
+        return instance;
+    }
+
+    /**
+     * Requsters shutdown task.
+     *
+     * @param shutdownHandler the handler to execute on shutdown.
+     * @param taskName the name of task.
+     * @param priority the priority determines shutdown order.
+     */
+    public synchronized void register(final Shutdownable shutdownHandler, 
final String taskName, ShutdownPriority priority) {
+        if (tasks.isEmpty()) {
+            Runtime.getRuntime().addShutdownHook(this);
+        }
+        Set<NamedShutdownable> priorityTasks = tasks.get(priority);
+        if (priorityTasks == null) {
+            tasks.put(priority, priorityTasks = new HashSet<>());
+        }
+        priorityTasks.add(new NamedShutdownable() {
+
+            @Override
+            public String getName() {
+                return taskName;
+            }
+
+            @Override
+            public void shutdown() throws Exception {
+                shutdownHandler.shutdown();
+            }
+        });
+    }
+
+    /**
+     * Executes on Java shutdown hook.
+     */
+    @Override
+    public void run() {
+        LOG.info("Shutdown hook is fired.");
+        shutdown();
+    }
+
+    private synchronized void shutdown() {
+        LOG.info("Start of shutdown procedure.");
+        for (ShutdownPriority priority : tasks.keySet()) {
+            LOG.info("Shutdown priority class " + priority.name());
+            for (NamedShutdownable task : tasks.get(priority)) {
+                LOG.info("Shutdown of " + task.getName() + " ...");
+                try {
+                    task.shutdown();
+                } catch (Exception ex) {
+                    LOG.severe("Shutdown of " + task.getName() + " error", ex);
+                }
+            }
+        }
+        LOG.info("End of shutdown procedure.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownPriority.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownPriority.java 
b/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownPriority.java
new file mode 100644
index 0000000..d6c4b76
--- /dev/null
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownPriority.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.waveprotocol.box.server.shutdown;
+
+/**
+ * Priority determines shutdown order.
+ *
+ * @author [email protected] (A. Kaplanov)
+ */
+
+public enum ShutdownPriority {
+    Server(1), Waves(3), Task(2), Storage(3);
+    final int value;
+
+    ShutdownPriority(int priority) {
+        this.value = priority;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/shutdown/Shutdownable.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/shutdown/Shutdownable.java 
b/wave/src/main/java/org/waveprotocol/box/server/shutdown/Shutdownable.java
new file mode 100644
index 0000000..2814fd1
--- /dev/null
+++ b/wave/src/main/java/org/waveprotocol/box/server/shutdown/Shutdownable.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.waveprotocol.box.server.shutdown;
+
+/**
+ * Shutdown handler.
+ *
+ * @author [email protected] (A. Kaplanov)
+ */
+public interface Shutdownable {
+    void shutdown() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/waveserver/Wave.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/Wave.java 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/Wave.java
index 815f814..2dd5a36 100644
--- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/Wave.java
+++ b/wave/src/main/java/org/waveprotocol/box/server/waveserver/Wave.java
@@ -26,15 +26,18 @@ import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import org.waveprotocol.box.server.persistence.PersistenceException;
 import org.waveprotocol.wave.model.id.WaveId;
 import org.waveprotocol.wave.model.id.WaveletId;
 import org.waveprotocol.wave.model.id.WaveletName;
+import org.waveprotocol.wave.model.util.CollectionUtils;
 import org.waveprotocol.wave.util.logging.Log;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 /**
@@ -165,4 +168,14 @@ final class Wave implements Iterable<WaveletContainer> {
   ListenableFuture<ImmutableSet<WaveletId>> getLookedupWavelets() {
     return lookedupWavelets;
   }
+
+  synchronized ListenableFuture close() {
+    List<ListenableFuture<Void>> futures = CollectionUtils.newLinkedList();
+    Iterator<WaveletContainer> it = Iterators.unmodifiableIterator(
+            Iterables.concat(localWavelets.asMap().values(), 
remoteWavelets.asMap().values()).iterator());
+    while (it.hasNext()) {
+      futures.add(it.next().close());
+    }
+    return Futures.<Void>successfulAsList(futures);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java
index b2fa769..ef5d8dc 100644
--- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java
+++ b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java
@@ -19,9 +19,7 @@
 
 package org.waveprotocol.box.server.waveserver;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
+import com.google.common.cache.*;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -32,15 +30,17 @@ import org.waveprotocol.box.common.ExceptionalIterator;
 import org.waveprotocol.box.server.CoreSettingsNames;
 import org.waveprotocol.box.server.executor.ExecutorAnnotations.LookupExecutor;
 import org.waveprotocol.box.server.persistence.PersistenceException;
+import org.waveprotocol.box.server.shutdown.LifeCycle;
+import org.waveprotocol.box.server.shutdown.ShutdownPriority;
+import org.waveprotocol.box.server.shutdown.Shutdownable;
 import org.waveprotocol.wave.model.id.WaveId;
 import org.waveprotocol.wave.model.id.WaveletId;
 import org.waveprotocol.wave.model.id.WaveletName;
 
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
+import java.util.concurrent.*;
+import java.util.logging.Logger;
 
 /**
  * A collection of wavelets, local and remote, held in memory.
@@ -49,6 +49,11 @@ import java.util.concurrent.Executor;
  */
 public class WaveMap {
 
+  private static final Logger LOG = Logger.getLogger(WaveMap.class.getName());
+
+  private static final int  WAVE_CACHE_SIZE = 1000; // TODO (yurize): Make 
configurable.
+  private static final int  WAVE_CACHE_EXPIRE = 60;
+
   /**
    * Returns a future whose result is the ids of stored wavelets in the given 
wave.
    * Any failure is reported as a {@link PersistenceException}.
@@ -68,8 +73,24 @@ public class WaveMap {
   }
 
   private final LoadingCache<WaveId, Wave> waves;
+  private final Map<WaveId, ListenableFuture> closingWaves = new 
ConcurrentHashMap<>();
   private final WaveletStore<?> store;
 
+  final private LifeCycle lifeCycle = new 
LifeCycle(WaveMap.class.getSimpleName(), ShutdownPriority.Waves,
+          new Shutdownable() {
+            @Override
+            public void shutdown() {
+              waves.invalidateAll();
+              for (Map.Entry<WaveId, ListenableFuture> entry : 
closingWaves.entrySet()) {
+                try {
+                  entry.getValue().get(1, TimeUnit.SECONDS);
+                } catch (InterruptedException | ExecutionException | 
TimeoutException ex) {
+                  LOG.warning("Expect the closing of " + 
entry.getKey().toString());
+                }
+              }
+            }
+          });
+
   @Inject
   public WaveMap(final DeltaAndSnapshotStore waveletStore,
       final WaveletNotificationSubscriber notifiee,
@@ -77,10 +98,32 @@ public class WaveMap {
       final RemoteWaveletContainer.Factory remoteFactory,
                  @Named(CoreSettingsNames.WAVE_SERVER_DOMAIN) final String 
waveDomain,
       @LookupExecutor final Executor lookupExecutor) {
-    // NOTE(anorth): DeltaAndSnapshotStore is more specific than necessary, but
-    // helps Guice out.
+
     this.store = waveletStore;
-    waves = CacheBuilder.newBuilder().build(new CacheLoader<WaveId, Wave>() {
+    waves = CacheBuilder.newBuilder()
+            .maximumSize(WAVE_CACHE_SIZE)
+            .expireAfterAccess(WAVE_CACHE_EXPIRE, TimeUnit.MINUTES)
+            .removalListener(new RemovalListener<WaveId, Wave>() {
+              @Override
+              public void onRemoval(final RemovalNotification<WaveId, Wave> 
rn) {
+                LOG.info("Wave " + rn.getKey() + " is evicted, current cache 
size " + waves.size());
+                final ListenableFuture future = rn.getValue().close();
+                closingWaves.put(rn.getKey(), future);
+                future.addListener(new Runnable() {
+
+                  @Override
+                  public void run() {
+                    try {
+                      future.get();
+                    } catch (InterruptedException | ExecutionException ex) {
+                      LOG.log(Level.WARNING, "Closing wave exception", ex);
+                    }
+                    closingWaves.remove(rn.getKey());
+                  }
+                }, MoreExecutors.sameThreadExecutor());
+              }
+            }
+            .build(new CacheLoader<WaveId, Wave>() {
       @Override
       public Wave load(WaveId waveId) {
         ListenableFuture<ImmutableSet<WaveletId>> lookedupWavelets =

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainer.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainer.java
 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainer.java
index 7cf0763..f6073cb 100644
--- 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainer.java
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainer.java
@@ -19,6 +19,7 @@
 
 package org.waveprotocol.box.server.waveserver;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.waveprotocol.box.common.Receiver;
 import org.waveprotocol.box.server.frontend.CommittedWaveletSnapshot;
 import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta;
@@ -49,6 +50,9 @@ interface WaveletContainer {
     T create(WaveletNotificationSubscriber notifiee, WaveletName waveletName, 
String waveDomain);
   }
 
+  /** Start closing resources */
+  ListenableFuture close();
+
   /** Returns the name of the wavelet. */
   WaveletName getWaveletName();
 

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/cb09a53a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
index fe08fe0..62c7d4e 100644
--- 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
@@ -19,6 +19,7 @@
 
 package org.waveprotocol.box.server.waveserver;
 
+import com.google.common.util.concurrent.SettableFuture;
 import org.waveprotocol.box.common.Receiver;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
@@ -79,6 +80,15 @@ abstract class WaveletContainerImpl implements 
WaveletContainer {
     /** Wavelet state is being loaded from storage. */
     LOADING,
 
+    /** Wavelet state is being indexing. */
+    INDEXING,
+
+    /** Wavelet state is being closing. */
+    CLOSING,
+
+    /** Wavelet has been closed. */
+    CLOSED,
+
     /** Wavelet has been deleted, the instance will not contain any data. */
     DELETED,
 
@@ -89,6 +99,7 @@ abstract class WaveletContainerImpl implements 
WaveletContainer {
     CORRUPTED
   }
 
+
   private final Executor storageContinuationExecutor;
 
   private final Lock readLock;
@@ -96,6 +107,7 @@ abstract class WaveletContainerImpl implements 
WaveletContainer {
   private final WaveletName waveletName;
   private final WaveletNotificationSubscriber notifiee;
   private final ParticipantId sharedDomainParticipantId;
+
   /** Is counted down when initial loading from storage completes. */
   private final CountDownLatch loadLatch = new CountDownLatch(1);
   /** Is set at most once, before loadLatch is counted down. */
@@ -107,7 +119,7 @@ abstract class WaveletContainerImpl implements 
WaveletContainer {
    * WaveletData is not set until a delta has been applied.
    *
    * @param notifiee the subscriber to notify of wavelet updates and commits.
-   * @param waveletState the wavelet's delta history and current state.
+   * @param waveletStateFuture the wavelet's delta history and current state.
    * @param waveDomain the wave server domain.
    * @param storageContinuationExecutor the executor used to perform post 
wavelet loading logic.
    */
@@ -561,4 +573,41 @@ abstract class WaveletContainerImpl implements 
WaveletContainer {
   protected ReadableWaveletData accessSnapshot() {
     return waveletState.getSnapshot();
   }
+
+  @Override
+  public ListenableFuture close() {
+    state = State.CLOSING;
+    final SettableFuture<Void> future = SettableFuture.create();
+    final ListenableFuture deltaCloseFuture = deltaStateAccessor.close();
+    deltaCloseFuture.addListener(new Runnable() {
+
+      @Override
+      public void run() {
+        boolean consistent = true;
+        try {
+          deltaCloseFuture.get();
+        } catch (InterruptedException | ExecutionException ex) {
+          LOG.severe("Exception of closing delta store", ex);
+          consistent = false;
+        }
+        if (segmentWaveletState != null) {
+          if (consistent) {
+            segmentWaveletState.markAsConsistent();
+          }
+          segmentWaveletState.close().addListener(new Runnable() {
+
+            @Override
+            public void run() {
+              state = State.CLOSED;
+              future.set(null);
+            }
+          }, MoreExecutors.sameThreadExecutor());
+        } else {
+          state = State.CLOSED;
+          future.set(null);
+        }
+      }
+    }, storageContinuationExecutor);
+    return future;
+  }
 }

Reply via email to