Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1247
Change subject: Wait For Metadata Registration Before Active, Etc.
......................................................................
Wait For Metadata Registration Before Active, Etc.
Also:
- Honor [nc] metadata.port
- Add [cc] metadata.callback.port
Change-Id: I48d7c10b3e43181ec307f7d890ba721f61bc2ab0
---
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
31 files changed, 167 insertions(+), 107 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/47/1247/1
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 392bec8..1987f2b 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -23,7 +23,7 @@
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -54,7 +54,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
IAsterixAppRuntimeContext appContext =
(IAsterixAppRuntimeContext)
ncs.getApplicationContext().getApplicationObject();
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index fc67d3c..0774b16 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -23,7 +23,7 @@
import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.service.IControllerService;
@@ -65,7 +65,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
ActiveLifecycleListener.INSTANCE.receive(this);
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 9120aa5..33b3752 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -71,14 +71,22 @@
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.IniUtils;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlet.ServletMapping;
+import org.ini4j.Ini;
+import org.kohsuke.args4j.Option;
public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
private static final Logger LOGGER =
Logger.getLogger(CCApplicationEntryPoint.class.getName());
+
+ @Option(name = "-metadata-callback-port",
+ usage = "Sets the port to bind metadata callback listener
(default: random port)")
+ public int metadataCallbackPort = 0;
private List<Server> servers;
@@ -88,7 +96,8 @@
@Override
public void start(ICCApplicationContext ccAppCtx, String[] args) throws
Exception {
- IMessageBroker messageBroker = new
CCMessageBroker((ClusterControllerService) ccAppCtx.getControllerService());
+ final ClusterControllerService controllerService =
(ClusterControllerService) ccAppCtx.getControllerService();
+ IMessageBroker messageBroker = new CCMessageBroker(controllerService);
this.appCtx = ccAppCtx;
if (LOGGER.isLoggable(Level.INFO)) {
@@ -101,16 +110,21 @@
AsterixResourceIdManager resourceIdManager = new
AsterixResourceIdManager();
ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
AsterixAppContextInfo.initialize(appCtx,
getNewHyracksClientConnection(), GlobalRecoveryManager.instance(),
- libraryManager, resourceIdManager);
+ libraryManager, resourceIdManager, () ->
MetadataManager.INSTANCE);
ccExtensionManager = new CompilerExtensionManager(getExtensions());
AsterixAppContextInfo.INSTANCE.setExtensionManager(ccExtensionManager);
- if (System.getProperty("java.rmi.server.hostname") == null) {
- System.setProperty("java.rmi.server.hostname",
- ((ClusterControllerService)
ccAppCtx.getControllerService()).getCCConfig().clusterNetIpAddress);
+ final CCConfig ccConfig = controllerService.getCCConfig();
+ if (ccConfig.configFile != null) {
+ Ini ini = IniUtils.loadINIFile(ccConfig.configFile);
+ metadataCallbackPort = IniUtils.getInt(ini, "cc",
"metadata.callback.port" , metadataCallbackPort);
}
- setAsterixStateProxy(AsterixStateProxy.registerRemoteObject());
+ if (System.getProperty("java.rmi.server.hostname") == null) {
+ System.setProperty("java.rmi.server.hostname",
ccConfig.clusterNetIpAddress);
+ }
+
+
setAsterixStateProxy(AsterixStateProxy.registerRemoteObject(metadataCallbackPort));
appCtx.setDistributedState(proxy);
AsterixMetadataProperties metadataProperties =
AsterixAppContextInfo.INSTANCE.getMetadataProperties();
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 4eaab2d..8423fa7 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -42,6 +42,7 @@
import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse;
import org.apache.asterix.runtime.util.ClusterStateManager;
import org.apache.hyracks.api.application.IClusterLifecycleListener;
+import org.apache.hyracks.api.exceptions.HyracksException;
public class ClusterLifecycleListener implements IClusterLifecycleListener {
@@ -64,7 +65,7 @@
}
@Override
- public void notifyNodeJoin(String nodeId, Map<String, String>
ncConfiguration) {
+ public void notifyNodeJoin(String nodeId, Map<String, String>
ncConfiguration) throws HyracksException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("NC: " + nodeId + " joined");
}
@@ -88,7 +89,7 @@
}
@Override
- public void notifyNodeFailure(Set<String> deadNodeIds) {
+ public void notifyNodeFailure(Set<String> deadNodeIds) throws
HyracksException {
for (String deadNode : deadNodeIds) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("NC: " + deadNode + " left");
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index f26afa8..f0701dc 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -32,6 +32,7 @@
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.config.AsterixExtension;
import org.apache.asterix.common.config.AsterixMetadataProperties;
+import org.apache.asterix.common.config.AsterixProperties;
import org.apache.asterix.common.config.AsterixTransactionProperties;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
@@ -53,7 +54,9 @@
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.messages.IMessageBroker;
+import org.apache.hyracks.control.common.controllers.IniUtils;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.ini4j.Ini;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -94,7 +97,6 @@
parser.printUsage(System.err);
throw e;
}
-
ncAppCtx.setThreadFactory(new
AsterixThreadFactory(ncAppCtx.getThreadFactory(),
ncAppCtx.getLifeCycleComponentManager()));
ncApplicationContext = ncAppCtx;
@@ -103,8 +105,15 @@
LOGGER.info("Starting Asterix node controller: " + nodeId);
}
+ final NodeControllerService controllerService =
(NodeControllerService) ncAppCtx.getControllerService();
+ if (controllerService.getConfiguration().configFile != null) {
+ Ini ini =
IniUtils.loadINIFile(controllerService.getConfiguration().configFile);
+ metadataRmiPort = IniUtils.getInt(ini,
AsterixProperties.SECTION_PREFIX_NC + nodeId,
+ AsterixProperties.PROPERTY_METADATA_PORT, metadataRmiPort);
+ }
+
if (System.getProperty("java.rmi.server.hostname") == null) {
- System.setProperty("java.rmi.server.hostname",
((NodeControllerService) ncAppCtx.getControllerService())
+ System.setProperty("java.rmi.server.hostname", (controllerService)
.getConfiguration().clusterNetPublicIPAddress);
}
runtimeContext = new AsterixNCAppRuntimeContext(ncApplicationContext,
metadataRmiPort, getExtensions());
@@ -120,8 +129,7 @@
ncApplicationContext.setApplicationObject(runtimeContext);
MessagingProperties messagingProperties =
((IAsterixPropertiesProvider) runtimeContext)
.getMessagingProperties();
- messageBroker = new NCMessageBroker((NodeControllerService)
ncAppCtx.getControllerService(),
- messagingProperties);
+ messageBroker = new NCMessageBroker(controllerService,
messagingProperties);
ncApplicationContext.setMessageBroker(messageBroker);
MessagingChannelInterfaceFactory interfaceFactory = new
MessagingChannelInterfaceFactory(
(NCMessageBroker) messageBroker, messagingProperties);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index adf8e38..323df65 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -27,6 +27,7 @@
public enum ClusterState {
STARTING,
+ PENDING,
ACTIVE,
UNUSABLE,
REBALANCING
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index ea1ee31..2ceb538 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -204,7 +204,6 @@
// Here we figure out which is the metadata node. If any NCs
// declare "metadata.port", use that one; otherwise just use the first.
if (mdNode == null || cfg.getString(section,
AsterixProperties.PROPERTY_METADATA_PORT) != null) {
- // QQQ But we don't actually *honor* metadata.port yet!
newMetadataNode = ncId;
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index dbd2139..d10223b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.common.messaging.api;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.service.IControllerService;
@@ -28,5 +28,5 @@
/**
* handle the message upon delivery
*/
- public void handle(IControllerService cs) throws HyracksDataException,
InterruptedException;
+ void handle(IControllerService cs) throws HyracksException,
InterruptedException;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 70da097..2a799ff 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -22,6 +22,7 @@
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -33,6 +34,7 @@
import org.apache.asterix.metadata.api.IAsterixStateProxy;
import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.asterix.metadata.api.IMetadataBootstrap;
import org.apache.asterix.metadata.api.IMetadataManager;
import org.apache.asterix.metadata.api.IMetadataNode;
import org.apache.asterix.metadata.entities.CompactionPolicy;
@@ -49,6 +51,7 @@
import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.om.types.ARecordType;
import
org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
+import org.apache.hyracks.api.exceptions.HyracksException;
/**
* Provides access to Asterix metadata via remote methods to the metadata node.
@@ -82,15 +85,14 @@
* cluster, i.e., metadata transaction ids shall never "accidentally" overlap
* with transaction ids of regular jobs or other metadata transactions.
*/
-public class MetadataManager implements IMetadataManager {
- private static final int INITIAL_SLEEP_TIME = 64;
- private static final int RETRY_MULTIPLIER = 5;
- private static final int MAX_RETRY_COUNT = 10;
+public class MetadataManager implements IMetadataManager, IMetadataBootstrap {
+ private static final int WAIT_TIME_SECONDS = 120;
// Set in init().
- public static MetadataManager INSTANCE;
+ public static MetadataManager INSTANCE = null;
private final MetadataCache cache = new MetadataCache();
private final IAsterixStateProxy proxy;
+ private final Object lock = new Object();
private IMetadataNode metadataNode;
private final ReadWriteLock metadataLatch;
private final AsterixMetadataProperties metadataProperties;
@@ -98,7 +100,7 @@
public MetadataManager(IAsterixStateProxy proxy, AsterixMetadataProperties
metadataProperties) {
if (proxy == null) {
- throw new Error("Null proxy given to MetadataManager.");
+ throw new IllegalArgumentException("Null proxy given to
MetadataManager");
}
this.proxy = proxy;
this.metadataProperties = metadataProperties;
@@ -108,7 +110,7 @@
public MetadataManager(IAsterixStateProxy proxy, IMetadataNode
metadataNode) {
if (metadataNode == null) {
- throw new Error("Null metadataNode given to MetadataManager.");
+ throw new IllegalArgumentException("Null metadataNode given to
MetadataManager");
}
this.proxy = proxy;
this.metadataProperties = null;
@@ -117,30 +119,25 @@
}
@Override
- public void init() throws RemoteException, MetadataException {
- // Could be synchronized on any object. Arbitrarily chose proxy.
- synchronized (proxy) {
+ public void init() throws HyracksException {
+ synchronized (lock) {
if (metadataNode != null && !rebindMetadataNode) {
return;
}
try {
- int retry = 0;
- int sleep = INITIAL_SLEEP_TIME;
- while (retry++ < MAX_RETRY_COUNT) {
- metadataNode = proxy.getMetadataNode();
- if (metadataNode != null) {
- rebindMetadataNode = false;
- break;
- }
- Thread.sleep(sleep);
- sleep *= RETRY_MULTIPLIER;
+ metadataNode = proxy.waitForMetadataNode(WAIT_TIME_SECONDS,
TimeUnit.SECONDS);
+ if (metadataNode != null) {
+ rebindMetadataNode = false;
+ } else {
+ throw new IllegalStateException("Failed to get the
MetadataNode.\n"
+ + "The MetadataNode was configured to run on NC: "
+ + metadataProperties.getMetadataNodeName());
}
} catch (InterruptedException e) {
- throw new MetadataException(e);
- }
- if (metadataNode == null) {
- throw new Error("Failed to get the MetadataNode.\n" + "The
MetadataNode was configured to run on NC: "
- + metadataProperties.getMetadataNodeName());
+ Thread.currentThread().interrupt();
+ throw new HyracksException(e);
+ } catch (RemoteException e) {
+ throw new HyracksException(e);
}
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java
index 7717a79..0374ff7 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java
@@ -22,12 +22,15 @@
import java.io.Serializable;
import java.rmi.Remote;
import java.rmi.RemoteException;
+import java.util.concurrent.TimeUnit;
/**
* Interface for setting/getting distributed state of Asterix.
*/
public interface IAsterixStateProxy extends Remote, Serializable {
- public void setMetadataNode(IMetadataNode metadataNode) throws
RemoteException;
+ void setMetadataNode(IMetadataNode metadataNode) throws RemoteException;
- public IMetadataNode getMetadataNode() throws RemoteException;
+ IMetadataNode getMetadataNode() throws RemoteException;
+
+ IMetadataNode waitForMetadataNode(long waitFor, TimeUnit timeUnit) throws
RemoteException, InterruptedException;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index 0acc027..fa12d75 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -56,15 +56,6 @@
public interface IMetadataManager {
/**
- * Initializes the metadata manager, e.g., finds the remote metadata node.
- *
- * @throws RemoteException
- * If an error occurred while contacting the proxy for finding
- * the metadata node.
- */
- void init() throws RemoteException, MetadataException;
-
- /**
* Begins a transaction on the metadata node.
*
* @return A globally unique transaction id.
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
index 2f881be..16c9d05 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
@@ -21,6 +21,7 @@
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -36,8 +37,8 @@
private IMetadataNode metadataNode;
private static final IAsterixStateProxy cc = new AsterixStateProxy();
- public static IAsterixStateProxy registerRemoteObject() throws
RemoteException {
- IAsterixStateProxy stub = (IAsterixStateProxy)
UnicastRemoteObject.exportObject(cc, 0);
+ public static IAsterixStateProxy registerRemoteObject(int
metadataCallbackPort) throws RemoteException {
+ IAsterixStateProxy stub = (IAsterixStateProxy)
UnicastRemoteObject.exportObject(cc, metadataCallbackPort);
LOGGER.info("Asterix Distributed State Proxy Bound");
return stub;
}
@@ -48,12 +49,26 @@
}
@Override
- public void setMetadataNode(IMetadataNode metadataNode) throws
RemoteException {
+ public synchronized void setMetadataNode(IMetadataNode metadataNode) {
this.metadataNode = metadataNode;
+ notifyAll();
}
@Override
- public IMetadataNode getMetadataNode() throws RemoteException {
+ public IMetadataNode getMetadataNode() {
return this.metadataNode;
}
+
+ @Override
+ public IMetadataNode waitForMetadataNode(long waitFor, TimeUnit timeUnit)
throws InterruptedException {
+ synchronized (this) {
+ long timeToWait = timeUnit.convert(waitFor, TimeUnit.MILLISECONDS);
+ while (metadataNode == null && timeToWait > 0) {
+ long startTime = System.currentTimeMillis();
+ wait(timeToWait);
+ timeToWait -= System.currentTimeMillis() - startTime;
+ }
+ return metadataNode;
+ }
+ }
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
index 5cb1a6a..d110ba2 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
@@ -28,6 +28,7 @@
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -63,7 +64,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
IAsterixAppRuntimeContext appContext =
(IAsterixAppRuntimeContext)
ncs.getApplicationContext().getApplicationObject();
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
index 4ae73ea..885dfce 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
@@ -21,7 +21,7 @@
import java.util.Set;
import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.service.IControllerService;
public class CompleteFailbackResponseMessage extends
AbstractFailbackPlanMessage {
@@ -48,7 +48,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
ClusterStateManager.INSTANCE.processCompleteFailbackResponse(this);
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
index c112366..bcb6764 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
@@ -27,7 +27,7 @@
import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -72,7 +72,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
IAsterixAppRuntimeContext appContext =
(IAsterixAppRuntimeContext)
ncs.getApplicationContext().getApplicationObject();
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
index db89f7c..1c139b0 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
@@ -21,7 +21,7 @@
import java.util.Set;
import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.service.IControllerService;
public class PreparePartitionsFailbackResponseMessage extends
AbstractFailbackPlanMessage {
@@ -39,7 +39,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
ClusterStateManager.INSTANCE.processPreparePartitionsFailbackResponse(this);
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
index fc55968..d925684 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
@@ -24,7 +24,7 @@
import org.apache.asterix.common.replication.ReplicaEvent;
import org.apache.asterix.event.schema.cluster.Node;
import
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -54,7 +54,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
IAsterixAppRuntimeContext appContext =
(IAsterixAppRuntimeContext)
ncs.getApplicationContext().getApplicationObject();
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
index f9f6233..1d91572 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -29,6 +29,7 @@
import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -48,7 +49,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
IAsterixResourceIdManager resourceIdManager =
AsterixAppContextInfo.INSTANCE.getResourceIdManager();
resourceIdManager.report(src, maxResourceId);
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
index 203104e..b089085 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
@@ -19,7 +19,7 @@
package org.apache.asterix.runtime.message;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -27,7 +27,7 @@
private static final long serialVersionUID = 1L;
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
ReportMaxResourceIdMessage.send((NodeControllerService) cs);
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index afe2427..108fd73 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -26,7 +26,7 @@
import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.service.IControllerService;
public class ResourceIdRequestMessage implements IApplicationMessage {
@@ -38,7 +38,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
try {
ICCMessageBroker broker =
(ICCMessageBroker)
AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
index d4cc022..06862f7 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
@@ -21,7 +21,7 @@
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.runtime.transaction.GlobalResourceIdFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -48,7 +48,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException,
InterruptedException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
IAsterixAppRuntimeContext asterixNcAppRuntimeCtx =
(IAsterixAppRuntimeContext)
ncs.getApplicationContext().getApplicationObject();
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
index e877f52..f1a5f8c 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
@@ -26,6 +26,7 @@
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -35,7 +36,7 @@
private static final Logger LOGGER =
Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName());
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
IAsterixAppRuntimeContext appContext =
(IAsterixAppRuntimeContext)
ncs.getApplicationContext().getApplicationObject();
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
index 2466e2b..1a76e4c 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
@@ -20,7 +20,7 @@
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.service.IControllerService;
public class TakeoverMetadataNodeResponseMessage implements
IApplicationMessage {
@@ -37,7 +37,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
ClusterStateManager.INSTANCE.processMetadataNodeTakeoverResponse(this);
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
index e024eed..02ab1a6 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
@@ -29,6 +29,7 @@
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -74,7 +75,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
IAsterixAppRuntimeContext appContext =
(IAsterixAppRuntimeContext)
ncs.getApplicationContext().getApplicationObject();
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
index a4a5226..daedc61 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
@@ -20,7 +20,7 @@
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.service.IControllerService;
public class TakeoverPartitionsResponseMessage implements IApplicationMessage {
@@ -49,7 +49,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksException,
InterruptedException {
ClusterStateManager.INSTANCE.processPartitionTakeoverResponse(this);
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
index 471d3d3..73509b4 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
@@ -19,6 +19,7 @@
package org.apache.asterix.runtime.util;
import java.io.IOException;
+import java.util.function.Supplier;
import java.util.logging.Logger;
import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
@@ -38,6 +39,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
+import org.apache.asterix.metadata.api.IMetadataBootstrap;
import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -66,6 +68,7 @@
private AsterixReplicationProperties replicationProperties;
private AsterixExtensionProperties extensionProperties;
private MessagingProperties messagingProperties;
+ private Supplier<IMetadataBootstrap> metadataBootstrapSupplier;
private IHyracksClientConnection hcc;
private Object extensionManager;
private volatile boolean initialized = false;
@@ -74,8 +77,10 @@
}
public static synchronized void initialize(ICCApplicationContext ccAppCtx,
IHyracksClientConnection hcc,
- IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager
libraryManager,
- IAsterixResourceIdManager resourceIdManager)
+ IGlobalRecoveryMaanger
globalRecoveryMaanger,
+ ILibraryManager libraryManager,
+ IAsterixResourceIdManager
resourceIdManager,
+ Supplier<IMetadataBootstrap>
metadataBootstrapSupplier)
throws AsterixException, IOException {
if (INSTANCE.initialized) {
throw new
AsterixException(AsterixAppContextInfo.class.getSimpleName() + " has been
initialized already");
@@ -107,6 +112,8 @@
INSTANCE.hcc = hcc;
INSTANCE.buildProperties = new
AsterixBuildProperties(propertiesAccessor);
INSTANCE.messagingProperties = new
MessagingProperties(propertiesAccessor);
+ INSTANCE.metadataBootstrapSupplier = metadataBootstrapSupplier;
+
Logger.getLogger("org.apache.asterix").setLevel(INSTANCE.externalProperties.getLogLevel());
Logger.getLogger("org.apache.hyracks").setLevel(INSTANCE.externalProperties.getLogLevel());
}
@@ -204,4 +211,8 @@
public IAsterixResourceIdManager getResourceIdManager() {
return resourceIdManager;
}
+
+ public IMetadataBootstrap getMetadataBootstrap() {
+ return metadataBootstrapSupplier.get();
+ }
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
index bc15788..4c17c74 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
@@ -51,6 +51,7 @@
import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.json.JSONException;
import org.json.JSONObject;
@@ -109,7 +110,7 @@
}
}
- public synchronized void removeNCConfiguration(String nodeId) {
+ public synchronized void removeNCConfiguration(String nodeId) throws
HyracksException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Removing configuration parameters for node id " +
nodeId);
}
@@ -139,7 +140,8 @@
}
}
- public synchronized void addNCConfiguration(String nodeId, Map<String,
String> configuration) {
+ public synchronized void addNCConfiguration(String nodeId, Map<String,
String> configuration)
+ throws HyracksException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Registering configuration parameters for node id " +
nodeId);
}
@@ -167,7 +169,7 @@
updateNodePartitions(nodeId, true);
}
- private synchronized void updateNodePartitions(String nodeId, boolean
added) {
+ private synchronized void updateNodePartitions(String nodeId, boolean
added) throws HyracksException {
ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
// if this isn't a storage node, it will not have cluster partitions
if (nodePartitions != null) {
@@ -183,7 +185,7 @@
}
}
- private synchronized void updateClusterState() {
+ private synchronized void updateClusterState() throws HyracksException {
for (ClusterPartition p : clusterPartitions.values()) {
if (!p.isActive()) {
state = ClusterState.UNUSABLE;
@@ -191,11 +193,14 @@
return;
}
}
- //if all storage partitions are active as well as the metadata node,
then the cluster is active
+ // if all storage partitions are active as well as the metadata node,
then the cluster is active
if (metadataNodeActive) {
+ state = ClusterState.PENDING;
+ LOGGER.info("Cluster is now " + state);
+ AsterixAppContextInfo.INSTANCE.getMetadataBootstrap().init();
state = ClusterState.ACTIVE;
- LOGGER.info("Cluster is now ACTIVE");
- //start global recovery
+ LOGGER.info("Cluster is now " + state);
+ // start global recovery
AsterixAppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
processPendingFailbackPlans();
@@ -412,19 +417,21 @@
}
}
- public synchronized void
processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage reponse) {
- for (Integer partitonId : reponse.getPartitions()) {
+ public synchronized void
processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response)
+ throws HyracksException {
+ for (Integer partitonId : response.getPartitions()) {
ClusterPartition partition = clusterPartitions.get(partitonId);
partition.setActive(true);
- partition.setActiveNodeId(reponse.getNodeId());
+ partition.setActiveNodeId(response.getNodeId());
}
- pendingTakeoverRequests.remove(reponse.getRequestId());
+ pendingTakeoverRequests.remove(response.getRequestId());
resetClusterPartitionConstraint();
updateClusterState();
}
- public synchronized void
processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage
reponse) {
- currentMetadataNode = reponse.getNodeId();
+ public synchronized void
processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage
response)
+ throws HyracksException {
+ currentMetadataNode = response.getNodeId();
metadataNodeActive = true;
LOGGER.info("Current metadata node: " + currentMetadataNode);
updateClusterState();
@@ -556,7 +563,8 @@
}
}
- public synchronized void
processCompleteFailbackResponse(CompleteFailbackResponseMessage reponse) {
+ public synchronized void
processCompleteFailbackResponse(CompleteFailbackResponseMessage response)
+ throws HyracksException {
/**
* the failback plan completed successfully:
* Remove all references to it.
@@ -564,7 +572,7 @@
* Notify its replicas to reconnect to it.
* Set the failing back node partitions as active.
*/
- NodeFailbackPlan plan =
planId2FailbackPlanMap.remove(reponse.getPlanId());
+ NodeFailbackPlan plan =
planId2FailbackPlanMap.remove(response.getPlanId());
String nodeId = plan.getNodeId();
failedNodes.remove(nodeId);
//notify impacted replicas they can reconnect to this node
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
index 733382b..a9bef18 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
@@ -21,8 +21,11 @@
import java.util.Map;
import java.util.Set;
+import org.apache.hyracks.api.exceptions.HyracksException;
+
/**
- * A listener interface for providing notification call backs to events such
as a Node Controller joining/leaving the cluster.
+ * A listener interface for providing notification call backs to events such
as a Node Controller joining/leaving the
+ * cluster.
*/
public interface IClusterLifecycleListener {
@@ -35,15 +38,15 @@
/**
* @param nodeId
* A unique identifier of a Node Controller
- * @param ncConfig
+ * @param ncConfiguration
* A map containing the set of configuration parameters that
were used to start the Node Controller
*/
- public void notifyNodeJoin(String nodeId, Map<String, String>
ncConfiguration);
+ public void notifyNodeJoin(String nodeId, Map<String, String>
ncConfiguration) throws HyracksException;
/**
* @param deadNodeIds
* A set of Node Controller Ids that have left the cluster. The
set is not cumulative.
*/
- public void notifyNodeFailure(Set<String> deadNodeIds);
+ public void notifyNodeFailure(Set<String> deadNodeIds) throws
HyracksException;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
index 786a89f..dff3107 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
@@ -23,7 +23,7 @@
import org.apache.hyracks.control.common.controllers.CCConfig;
public class CCDriver {
- public static void main(String args[]) throws Exception {
+ public static void main(String args []) throws Exception {
try {
CCConfig ccConfig = new CCConfig();
CmdLineParser cp = new CmdLineParser(ccConfig);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index dfce7b8..dd6f83b 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -110,7 +110,7 @@
}
}
- public void notifyNodeFailure(Set<String> deadNodeIds) {
+ public void notifyNodeFailure(Set<String> deadNodeIds) throws
HyracksException {
for (IClusterLifecycleListener l : clusterLifecycleListeners) {
l.notifyNodeFailure(deadNodeIds);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
index b3a3065..510c729 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -24,6 +24,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
@@ -41,7 +42,7 @@
@Override
public void run() {
- Set<String> deadNodes = new HashSet<String>();
+ final Set<String> deadNodes = new HashSet<String>();
Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
NodeControllerState state = e.getValue();
@@ -69,8 +70,12 @@
}
}
}
- if (deadNodes != null && deadNodes.size() > 0) {
- ccs.getApplicationContext().notifyNodeFailure(deadNodes);
+ if (!deadNodes.isEmpty()) {
+ try {
+ ccs.getApplicationContext().notifyNodeFailure(deadNodes);
+ } catch (HyracksException e) {
+ LOGGER.log(Level.WARNING, "Uncaught exception on
notifyNodeFailure", e);
+ }
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1247
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I48d7c10b3e43181ec307f7d890ba721f61bc2ab0
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>