Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1247

Change subject: Wait For Metadata Registration Before Active, Etc.
......................................................................

Wait For Metadata Registration Before Active, Etc.

Also:
- Honor [nc] metadata.port
- Add [cc] metadata.callback.port

Change-Id: I48d7c10b3e43181ec307f7d890ba721f61bc2ab0
---
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
31 files changed, 167 insertions(+), 107 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/47/1247/1

diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 392bec8..1987f2b 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -23,7 +23,7 @@
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
@@ -54,7 +54,7 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         NodeControllerService ncs = (NodeControllerService) cs;
         IAsterixAppRuntimeContext appContext =
                 (IAsterixAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index fc67d3c..0774b16 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -23,7 +23,7 @@
 import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.service.IControllerService;
 
@@ -65,7 +65,7 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         ActiveLifecycleListener.INSTANCE.receive(this);
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 9120aa5..33b3752 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -71,14 +71,22 @@
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.IniUtils;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.servlet.ServletMapping;
+import org.ini4j.Ini;
+import org.kohsuke.args4j.Option;
 
 public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
     private static final Logger LOGGER = 
Logger.getLogger(CCApplicationEntryPoint.class.getName());
+
+    @Option(name = "-metadata-callback-port",
+            usage = "Sets the port to bind metadata callback listener 
(default: random port)")
+    public int metadataCallbackPort = 0;
 
     private List<Server> servers;
 
@@ -88,7 +96,8 @@
 
     @Override
     public void start(ICCApplicationContext ccAppCtx, String[] args) throws 
Exception {
-        IMessageBroker messageBroker = new 
CCMessageBroker((ClusterControllerService) ccAppCtx.getControllerService());
+        final ClusterControllerService controllerService = 
(ClusterControllerService) ccAppCtx.getControllerService();
+        IMessageBroker messageBroker = new CCMessageBroker(controllerService);
         this.appCtx = ccAppCtx;
 
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -101,16 +110,21 @@
         AsterixResourceIdManager resourceIdManager = new 
AsterixResourceIdManager();
         ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
         AsterixAppContextInfo.initialize(appCtx, 
getNewHyracksClientConnection(), GlobalRecoveryManager.instance(),
-                libraryManager, resourceIdManager);
+                libraryManager, resourceIdManager, () -> 
MetadataManager.INSTANCE);
         ccExtensionManager = new CompilerExtensionManager(getExtensions());
         AsterixAppContextInfo.INSTANCE.setExtensionManager(ccExtensionManager);
 
-        if (System.getProperty("java.rmi.server.hostname") == null) {
-            System.setProperty("java.rmi.server.hostname",
-                    ((ClusterControllerService) 
ccAppCtx.getControllerService()).getCCConfig().clusterNetIpAddress);
+        final CCConfig ccConfig = controllerService.getCCConfig();
+        if (ccConfig.configFile != null) {
+            Ini ini = IniUtils.loadINIFile(ccConfig.configFile);
+            metadataCallbackPort = IniUtils.getInt(ini, "cc", 
"metadata.callback.port" , metadataCallbackPort);
         }
 
-        setAsterixStateProxy(AsterixStateProxy.registerRemoteObject());
+        if (System.getProperty("java.rmi.server.hostname") == null) {
+            System.setProperty("java.rmi.server.hostname", 
ccConfig.clusterNetIpAddress);
+        }
+
+        
setAsterixStateProxy(AsterixStateProxy.registerRemoteObject(metadataCallbackPort));
         appCtx.setDistributedState(proxy);
 
         AsterixMetadataProperties metadataProperties = 
AsterixAppContextInfo.INSTANCE.getMetadataProperties();
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 4eaab2d..8423fa7 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -42,6 +42,7 @@
 import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse;
 import org.apache.asterix.runtime.util.ClusterStateManager;
 import org.apache.hyracks.api.application.IClusterLifecycleListener;
+import org.apache.hyracks.api.exceptions.HyracksException;
 
 public class ClusterLifecycleListener implements IClusterLifecycleListener {
 
@@ -64,7 +65,7 @@
     }
 
     @Override
-    public void notifyNodeJoin(String nodeId, Map<String, String> 
ncConfiguration) {
+    public void notifyNodeJoin(String nodeId, Map<String, String> 
ncConfiguration) throws HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("NC: " + nodeId + " joined");
         }
@@ -88,7 +89,7 @@
     }
 
     @Override
-    public void notifyNodeFailure(Set<String> deadNodeIds) {
+    public void notifyNodeFailure(Set<String> deadNodeIds) throws 
HyracksException {
         for (String deadNode : deadNodeIds) {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("NC: " + deadNode + " left");
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index f26afa8..f0701dc 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
+import org.apache.asterix.common.config.AsterixProperties;
 import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
@@ -53,7 +54,9 @@
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.messages.IMessageBroker;
+import org.apache.hyracks.control.common.controllers.IniUtils;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.ini4j.Ini;
 import org.kohsuke.args4j.CmdLineException;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
@@ -94,7 +97,6 @@
             parser.printUsage(System.err);
             throw e;
         }
-
         ncAppCtx.setThreadFactory(new 
AsterixThreadFactory(ncAppCtx.getThreadFactory(),
                 ncAppCtx.getLifeCycleComponentManager()));
         ncApplicationContext = ncAppCtx;
@@ -103,8 +105,15 @@
             LOGGER.info("Starting Asterix node controller: " + nodeId);
         }
 
+        final NodeControllerService controllerService = 
(NodeControllerService) ncAppCtx.getControllerService();
+        if (controllerService.getConfiguration().configFile != null) {
+            Ini ini = 
IniUtils.loadINIFile(controllerService.getConfiguration().configFile);
+            metadataRmiPort = IniUtils.getInt(ini, 
AsterixProperties.SECTION_PREFIX_NC + nodeId,
+                    AsterixProperties.PROPERTY_METADATA_PORT, metadataRmiPort);
+        }
+
         if (System.getProperty("java.rmi.server.hostname") == null) {
-            System.setProperty("java.rmi.server.hostname", 
((NodeControllerService) ncAppCtx.getControllerService())
+            System.setProperty("java.rmi.server.hostname", (controllerService)
                     .getConfiguration().clusterNetPublicIPAddress);
         }
         runtimeContext = new AsterixNCAppRuntimeContext(ncApplicationContext, 
metadataRmiPort, getExtensions());
@@ -120,8 +129,7 @@
         ncApplicationContext.setApplicationObject(runtimeContext);
         MessagingProperties messagingProperties = 
((IAsterixPropertiesProvider) runtimeContext)
                 .getMessagingProperties();
-        messageBroker = new NCMessageBroker((NodeControllerService) 
ncAppCtx.getControllerService(),
-                messagingProperties);
+        messageBroker = new NCMessageBroker(controllerService, 
messagingProperties);
         ncApplicationContext.setMessageBroker(messageBroker);
         MessagingChannelInterfaceFactory interfaceFactory = new 
MessagingChannelInterfaceFactory(
                 (NCMessageBroker) messageBroker, messagingProperties);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index adf8e38..323df65 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -27,6 +27,7 @@
 
     public enum ClusterState {
         STARTING,
+        PENDING,
         ACTIVE,
         UNUSABLE,
         REBALANCING
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index ea1ee31..2ceb538 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -204,7 +204,6 @@
         // Here we figure out which is the metadata node. If any NCs
         // declare "metadata.port", use that one; otherwise just use the first.
         if (mdNode == null || cfg.getString(section, 
AsterixProperties.PROPERTY_METADATA_PORT) != null) {
-            // QQQ But we don't actually *honor* metadata.port yet!
             newMetadataNode = ncId;
         }
 
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index dbd2139..d10223b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.common.messaging.api;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.service.IControllerService;
 
@@ -28,5 +28,5 @@
     /**
      * handle the message upon delivery
      */
-    public void handle(IControllerService cs) throws HyracksDataException, 
InterruptedException;
+    void handle(IControllerService cs) throws HyracksException, 
InterruptedException;
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 70da097..2a799ff 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -22,6 +22,7 @@
 import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -33,6 +34,7 @@
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
 import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
 import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.asterix.metadata.api.IMetadataBootstrap;
 import org.apache.asterix.metadata.api.IMetadataManager;
 import org.apache.asterix.metadata.api.IMetadataNode;
 import org.apache.asterix.metadata.entities.CompactionPolicy;
@@ -49,6 +51,7 @@
 import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.om.types.ARecordType;
 import 
org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
+import org.apache.hyracks.api.exceptions.HyracksException;
 
 /**
  * Provides access to Asterix metadata via remote methods to the metadata node.
@@ -82,15 +85,14 @@
  * cluster, i.e., metadata transaction ids shall never "accidentally" overlap
  * with transaction ids of regular jobs or other metadata transactions.
  */
-public class MetadataManager implements IMetadataManager {
-    private static final int INITIAL_SLEEP_TIME = 64;
-    private static final int RETRY_MULTIPLIER = 5;
-    private static final int MAX_RETRY_COUNT = 10;
+public class MetadataManager implements IMetadataManager, IMetadataBootstrap {
+    private static final int WAIT_TIME_SECONDS = 120;
 
     // Set in init().
-    public static MetadataManager INSTANCE;
+    public static MetadataManager INSTANCE = null;
     private final MetadataCache cache = new MetadataCache();
     private final IAsterixStateProxy proxy;
+    private final Object lock = new Object();
     private IMetadataNode metadataNode;
     private final ReadWriteLock metadataLatch;
     private final AsterixMetadataProperties metadataProperties;
@@ -98,7 +100,7 @@
 
     public MetadataManager(IAsterixStateProxy proxy, AsterixMetadataProperties 
metadataProperties) {
         if (proxy == null) {
-            throw new Error("Null proxy given to MetadataManager.");
+            throw new IllegalArgumentException("Null proxy given to 
MetadataManager");
         }
         this.proxy = proxy;
         this.metadataProperties = metadataProperties;
@@ -108,7 +110,7 @@
 
     public MetadataManager(IAsterixStateProxy proxy, IMetadataNode 
metadataNode) {
         if (metadataNode == null) {
-            throw new Error("Null metadataNode given to MetadataManager.");
+            throw new IllegalArgumentException("Null metadataNode given to 
MetadataManager");
         }
         this.proxy = proxy;
         this.metadataProperties = null;
@@ -117,30 +119,25 @@
     }
 
     @Override
-    public void init() throws RemoteException, MetadataException {
-        // Could be synchronized on any object. Arbitrarily chose proxy.
-        synchronized (proxy) {
+    public void init() throws HyracksException {
+        synchronized (lock) {
             if (metadataNode != null && !rebindMetadataNode) {
                 return;
             }
             try {
-                int retry = 0;
-                int sleep = INITIAL_SLEEP_TIME;
-                while (retry++ < MAX_RETRY_COUNT) {
-                    metadataNode = proxy.getMetadataNode();
-                    if (metadataNode != null) {
-                        rebindMetadataNode = false;
-                        break;
-                    }
-                    Thread.sleep(sleep);
-                    sleep *= RETRY_MULTIPLIER;
+                metadataNode = proxy.waitForMetadataNode(WAIT_TIME_SECONDS, 
TimeUnit.SECONDS);
+                if (metadataNode != null) {
+                    rebindMetadataNode = false;
+                } else {
+                    throw new IllegalStateException("Failed to get the 
MetadataNode.\n"
+                            + "The MetadataNode was configured to run on NC: "
+                            + metadataProperties.getMetadataNodeName());
                 }
             } catch (InterruptedException e) {
-                throw new MetadataException(e);
-            }
-            if (metadataNode == null) {
-                throw new Error("Failed to get the MetadataNode.\n" + "The 
MetadataNode was configured to run on NC: "
-                        + metadataProperties.getMetadataNodeName());
+                Thread.currentThread().interrupt();
+                throw new HyracksException(e);
+            } catch (RemoteException e) {
+                throw new HyracksException(e);
             }
         }
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java
index 7717a79..0374ff7 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java
@@ -22,12 +22,15 @@
 import java.io.Serializable;
 import java.rmi.Remote;
 import java.rmi.RemoteException;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Interface for setting/getting distributed state of Asterix.
  */
 public interface IAsterixStateProxy extends Remote, Serializable {
-    public void setMetadataNode(IMetadataNode metadataNode) throws 
RemoteException;
+    void setMetadataNode(IMetadataNode metadataNode) throws RemoteException;
 
-    public IMetadataNode getMetadataNode() throws RemoteException;
+    IMetadataNode getMetadataNode() throws RemoteException;
+
+    IMetadataNode waitForMetadataNode(long waitFor, TimeUnit timeUnit) throws 
RemoteException, InterruptedException;
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index 0acc027..fa12d75 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -56,15 +56,6 @@
 public interface IMetadataManager {
 
     /**
-     * Initializes the metadata manager, e.g., finds the remote metadata node.
-     *
-     * @throws RemoteException
-     *             If an error occurred while contacting the proxy for finding
-     *             the metadata node.
-     */
-    void init() throws RemoteException, MetadataException;
-
-    /**
      * Begins a transaction on the metadata node.
      *
      * @return A globally unique transaction id.
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
index 2f881be..16c9d05 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
@@ -21,6 +21,7 @@
 
 import java.rmi.RemoteException;
 import java.rmi.server.UnicastRemoteObject;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -36,8 +37,8 @@
     private IMetadataNode metadataNode;
     private static final IAsterixStateProxy cc = new AsterixStateProxy();
 
-    public static IAsterixStateProxy registerRemoteObject() throws 
RemoteException {
-        IAsterixStateProxy stub = (IAsterixStateProxy) 
UnicastRemoteObject.exportObject(cc, 0);
+    public static IAsterixStateProxy registerRemoteObject(int 
metadataCallbackPort) throws RemoteException {
+        IAsterixStateProxy stub = (IAsterixStateProxy) 
UnicastRemoteObject.exportObject(cc, metadataCallbackPort);
         LOGGER.info("Asterix Distributed State Proxy Bound");
         return stub;
     }
@@ -48,12 +49,26 @@
     }
 
     @Override
-    public void setMetadataNode(IMetadataNode metadataNode) throws 
RemoteException {
+    public synchronized void setMetadataNode(IMetadataNode metadataNode) {
         this.metadataNode = metadataNode;
+        notifyAll();
     }
 
     @Override
-    public IMetadataNode getMetadataNode() throws RemoteException {
+    public IMetadataNode getMetadataNode() {
         return this.metadataNode;
     }
+
+    @Override
+    public IMetadataNode waitForMetadataNode(long waitFor, TimeUnit timeUnit) throws InterruptedException {
+        synchronized (this) { // same monitor that setMetadataNode() signals via notifyAll()
+            long timeToWait = timeUnit.toMillis(waitFor); // budget in ms, the unit Object.wait(long) expects
+            while (metadataNode == null && timeToWait > 0) { // re-check after every wakeup until set or budget spent
+                long startTime = System.currentTimeMillis();
+                wait(timeToWait);
+                timeToWait -= System.currentTimeMillis() - startTime; // charge the elapsed time against the budget
+            }
+            return metadataNode; // still null if the budget ran out before a node was registered
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
index 5cb1a6a..d110ba2 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
@@ -28,6 +28,7 @@
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
@@ -63,7 +64,7 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         NodeControllerService ncs = (NodeControllerService) cs;
         IAsterixAppRuntimeContext appContext =
                 (IAsterixAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
index 4ae73ea..885dfce 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
@@ -21,7 +21,7 @@
 import java.util.Set;
 
 import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.service.IControllerService;
 
 public class CompleteFailbackResponseMessage extends 
AbstractFailbackPlanMessage {
@@ -48,7 +48,7 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         ClusterStateManager.INSTANCE.processCompleteFailbackResponse(this);
     }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
index c112366..bcb6764 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
@@ -27,7 +27,7 @@
 import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
@@ -72,7 +72,7 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         NodeControllerService ncs = (NodeControllerService) cs;
         IAsterixAppRuntimeContext appContext =
                 (IAsterixAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
index db89f7c..1c139b0 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
@@ -21,7 +21,7 @@
 import java.util.Set;
 
 import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.service.IControllerService;
 
 public class PreparePartitionsFailbackResponseMessage extends 
AbstractFailbackPlanMessage {
@@ -39,7 +39,7 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         
ClusterStateManager.INSTANCE.processPreparePartitionsFailbackResponse(this);
     }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
index fc55968..d925684 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
@@ -24,7 +24,7 @@
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.event.schema.cluster.Node;
 import 
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
@@ -54,7 +54,7 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         NodeControllerService ncs = (NodeControllerService) cs;
         IAsterixAppRuntimeContext appContext =
                 (IAsterixAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
index f9f6233..1d91572 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
 import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
@@ -48,7 +49,7 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         IAsterixResourceIdManager resourceIdManager =
                 AsterixAppContextInfo.INSTANCE.getResourceIdManager();
         resourceIdManager.report(src, maxResourceId);
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
index 203104e..b089085 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
@@ -19,7 +19,7 @@
 package org.apache.asterix.runtime.message;
 
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
@@ -27,7 +27,7 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         ReportMaxResourceIdMessage.send((NodeControllerService) cs);
     }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index afe2427..108fd73 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -26,7 +26,7 @@
 import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
 import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.service.IControllerService;
 
 public class ResourceIdRequestMessage implements IApplicationMessage {
@@ -38,7 +38,7 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         try {
             ICCMessageBroker broker =
                     (ICCMessageBroker) 
AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
index d4cc022..06862f7 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
@@ -21,7 +21,7 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.runtime.transaction.GlobalResourceIdFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
@@ -48,7 +48,7 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException, 
InterruptedException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         NodeControllerService ncs = (NodeControllerService) cs;
         IAsterixAppRuntimeContext asterixNcAppRuntimeCtx =
                 (IAsterixAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
index e877f52..f1a5f8c 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
@@ -26,6 +26,7 @@
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
@@ -35,7 +36,7 @@
     private static final Logger LOGGER = 
Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName());
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         NodeControllerService ncs = (NodeControllerService) cs;
         IAsterixAppRuntimeContext appContext =
                 (IAsterixAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
index 2466e2b..1a76e4c 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
@@ -20,7 +20,7 @@
 
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.service.IControllerService;
 
 public class TakeoverMetadataNodeResponseMessage implements 
IApplicationMessage {
@@ -37,7 +37,7 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         ClusterStateManager.INSTANCE.processMetadataNodeTakeoverResponse(this);
     }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
index e024eed..02ab1a6 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
@@ -74,7 +75,7 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         NodeControllerService ncs = (NodeControllerService) cs;
         IAsterixAppRuntimeContext appContext =
                 (IAsterixAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
index a4a5226..daedc61 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
@@ -20,7 +20,7 @@
 
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.service.IControllerService;
 
 public class TakeoverPartitionsResponseMessage implements IApplicationMessage {
@@ -49,7 +49,7 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
+    public void handle(IControllerService cs) throws HyracksException, 
InterruptedException {
         ClusterStateManager.INSTANCE.processPartitionTakeoverResponse(this);
     }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
index 471d3d3..73509b4 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.runtime.util;
 
 import java.io.IOException;
+import java.util.function.Supplier;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
@@ -38,6 +39,7 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
+import org.apache.asterix.metadata.api.IMetadataBootstrap;
 import org.apache.hyracks.api.application.IApplicationConfig;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -66,6 +68,7 @@
     private AsterixReplicationProperties replicationProperties;
     private AsterixExtensionProperties extensionProperties;
     private MessagingProperties messagingProperties;
+    private Supplier<IMetadataBootstrap> metadataBootstrapSupplier;
     private IHyracksClientConnection hcc;
     private Object extensionManager;
     private volatile boolean initialized = false;
@@ -74,8 +77,10 @@
     }
 
     public static synchronized void initialize(ICCApplicationContext ccAppCtx, 
IHyracksClientConnection hcc,
-            IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager 
libraryManager,
-            IAsterixResourceIdManager resourceIdManager)
+                                               IGlobalRecoveryMaanger 
globalRecoveryMaanger,
+                                               ILibraryManager libraryManager,
+                                               IAsterixResourceIdManager 
resourceIdManager,
+                                               Supplier<IMetadataBootstrap> 
metadataBootstrapSupplier)
             throws AsterixException, IOException {
         if (INSTANCE.initialized) {
             throw new 
AsterixException(AsterixAppContextInfo.class.getSimpleName() + " has been 
initialized already");
@@ -107,6 +112,8 @@
         INSTANCE.hcc = hcc;
         INSTANCE.buildProperties = new 
AsterixBuildProperties(propertiesAccessor);
         INSTANCE.messagingProperties = new 
MessagingProperties(propertiesAccessor);
+        INSTANCE.metadataBootstrapSupplier = metadataBootstrapSupplier;
+
         
Logger.getLogger("org.apache.asterix").setLevel(INSTANCE.externalProperties.getLogLevel());
         
Logger.getLogger("org.apache.hyracks").setLevel(INSTANCE.externalProperties.getLogLevel());
     }
@@ -204,4 +211,8 @@
     public IAsterixResourceIdManager getResourceIdManager() {
         return resourceIdManager;
     }
+
+    public IMetadataBootstrap getMetadataBootstrap() {
+        return metadataBootstrapSupplier.get();
+    }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
index bc15788..4c17c74 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
@@ -51,6 +51,7 @@
 import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.json.JSONException;
 import org.json.JSONObject;
 
@@ -109,7 +110,7 @@
         }
     }
 
-    public synchronized void removeNCConfiguration(String nodeId) {
+    public synchronized void removeNCConfiguration(String nodeId) throws 
HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Removing configuration parameters for node id " + 
nodeId);
         }
@@ -139,7 +140,8 @@
         }
     }
 
-    public synchronized void addNCConfiguration(String nodeId, Map<String, 
String> configuration) {
+    public synchronized void addNCConfiguration(String nodeId, Map<String, 
String> configuration)
+            throws HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Registering configuration parameters for node id " + 
nodeId);
         }
@@ -167,7 +169,7 @@
         updateNodePartitions(nodeId, true);
     }
 
-    private synchronized void updateNodePartitions(String nodeId, boolean 
added) {
+    private synchronized void updateNodePartitions(String nodeId, boolean 
added) throws HyracksException {
         ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
         // if this isn't a storage node, it will not have cluster partitions
         if (nodePartitions != null) {
@@ -183,7 +185,7 @@
         }
     }
 
-    private synchronized void updateClusterState() {
+    private synchronized void updateClusterState() throws HyracksException {
         for (ClusterPartition p : clusterPartitions.values()) {
             if (!p.isActive()) {
                 state = ClusterState.UNUSABLE;
@@ -191,11 +193,14 @@
                 return;
             }
         }
-        //if all storage partitions are active as well as the metadata node, 
then the cluster is active
+        // if all storage partitions are active as well as the metadata node, 
then the cluster is active
         if (metadataNodeActive) {
+            state = ClusterState.PENDING;
+            LOGGER.info("Cluster is now " + state);
+            AsterixAppContextInfo.INSTANCE.getMetadataBootstrap().init();
             state = ClusterState.ACTIVE;
-            LOGGER.info("Cluster is now ACTIVE");
-            //start global recovery
+            LOGGER.info("Cluster is now " + state);
+            // start global recovery
             
AsterixAppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
             if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
                 processPendingFailbackPlans();
@@ -412,19 +417,21 @@
         }
     }
 
-    public synchronized void 
processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage reponse) {
-        for (Integer partitonId : reponse.getPartitions()) {
+    public synchronized void 
processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response)
+            throws HyracksException {
+        for (Integer partitonId : response.getPartitions()) {
             ClusterPartition partition = clusterPartitions.get(partitonId);
             partition.setActive(true);
-            partition.setActiveNodeId(reponse.getNodeId());
+            partition.setActiveNodeId(response.getNodeId());
         }
-        pendingTakeoverRequests.remove(reponse.getRequestId());
+        pendingTakeoverRequests.remove(response.getRequestId());
         resetClusterPartitionConstraint();
         updateClusterState();
     }
 
-    public synchronized void 
processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage 
reponse) {
-        currentMetadataNode = reponse.getNodeId();
+    public synchronized void 
processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage 
response)
+            throws HyracksException {
+        currentMetadataNode = response.getNodeId();
         metadataNodeActive = true;
         LOGGER.info("Current metadata node: " + currentMetadataNode);
         updateClusterState();
@@ -556,7 +563,8 @@
         }
     }
 
-    public synchronized void 
processCompleteFailbackResponse(CompleteFailbackResponseMessage reponse) {
+    public synchronized void 
processCompleteFailbackResponse(CompleteFailbackResponseMessage response)
+            throws HyracksException {
         /**
          * the failback plan completed successfully:
          * Remove all references to it.
@@ -564,7 +572,7 @@
          * Notify its replicas to reconnect to it.
          * Set the failing back node partitions as active.
          */
-        NodeFailbackPlan plan = 
planId2FailbackPlanMap.remove(reponse.getPlanId());
+        NodeFailbackPlan plan = 
planId2FailbackPlanMap.remove(response.getPlanId());
         String nodeId = plan.getNodeId();
         failedNodes.remove(nodeId);
         //notify impacted replicas they can reconnect to this node
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
index 733382b..a9bef18 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
@@ -21,8 +21,11 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hyracks.api.exceptions.HyracksException;
+
 /**
- * A listener interface for providing notification call backs to events such 
as a Node Controller joining/leaving the cluster.
+ * A listener interface for providing notification call backs to events such 
as a Node Controller joining/leaving the
+ * cluster.
  */
 public interface IClusterLifecycleListener {
 
@@ -35,15 +38,15 @@
     /**
      * @param nodeId
      *            A unique identifier of a Node Controller
-     * @param ncConfig
+     * @param ncConfiguration
      *            A map containing the set of configuration parameters that 
were used to start the Node Controller
      */
-    public void notifyNodeJoin(String nodeId, Map<String, String> 
ncConfiguration);
+    public void notifyNodeJoin(String nodeId, Map<String, String> 
ncConfiguration) throws HyracksException;
 
     /**
      * @param deadNodeIds
      *            A set of Node Controller Ids that have left the cluster. The 
set is not cumulative.
      */
-    public void notifyNodeFailure(Set<String> deadNodeIds);
+    public void notifyNodeFailure(Set<String> deadNodeIds) throws 
HyracksException;
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
index 786a89f..dff3107 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
@@ -23,7 +23,7 @@
 import org.apache.hyracks.control.common.controllers.CCConfig;
 
 public class CCDriver {
-    public static void main(String args[]) throws Exception {
+    public static void main(String args []) throws Exception {
         try {
             CCConfig ccConfig = new CCConfig();
             CmdLineParser cp = new CmdLineParser(ccConfig);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index dfce7b8..dd6f83b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -110,7 +110,7 @@
         }
     }
 
-    public void notifyNodeFailure(Set<String> deadNodeIds) {
+    public void notifyNodeFailure(Set<String> deadNodeIds) throws 
HyracksException {
         for (IClusterLifecycleListener l : clusterLifecycleListeners) {
             l.notifyNodeFailure(deadNodeIds);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
index b3a3065..510c729 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -24,6 +24,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
@@ -41,7 +42,7 @@
 
     @Override
     public void run() {
-        Set<String> deadNodes = new HashSet<String>();
+        final Set<String> deadNodes = new HashSet<String>();
         Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
         for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
             NodeControllerState state = e.getValue();
@@ -69,8 +70,12 @@
                 }
             }
         }
-        if (deadNodes != null && deadNodes.size() > 0) {
-            ccs.getApplicationContext().notifyNodeFailure(deadNodes);
+        if (!deadNodes.isEmpty()) {
+            try {
+                ccs.getApplicationContext().notifyNodeFailure(deadNodes);
+            } catch (HyracksException e) {
+                LOGGER.log(Level.WARNING, "Uncaught exception on 
notifyNodeFailure", e);
+            }
         }
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1247
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I48d7c10b3e43181ec307f7d890ba721f61bc2ab0
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>

Reply via email to