Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2344

Change subject: [NO ISSUE] More multi-CC support, ConfigManager updates
......................................................................

[NO ISSUE] More multi-CC support, ConfigManager updates

- add ability for OptionTypes to natively parse JsonNodes
- allow all options (i.e. common and cc, not just NC options) to be overridden 
at the NC level
- accept controller id from the CC, avoid configuring this on NCs
- update all CCs with metadata bootstrap, not just the primary CC

Change-Id: Iff60887bf71ce3f3ed7201afd9499612bfc83485
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
M 
asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
A 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
25 files changed, 359 insertions(+), 186 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/44/2344/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index c554cbd..94f688f 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -23,8 +23,11 @@
 import java.rmi.server.UnicastRemoteObject;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
@@ -77,6 +80,7 @@
 import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -414,20 +418,20 @@
 
     @Override
     public void initializeMetadata(boolean newUniverse) throws Exception {
-        IAsterixStateProxy proxy;
+        Collection<IAsterixStateProxy> proxies;
         LOGGER.info("Bootstrapping metadata");
         MetadataNode.INSTANCE.initialize(this, 
ncExtensionManager.getMetadataTupleTranslatorProvider(),
                 ncExtensionManager.getMetadataExtensions());
 
-        proxy = (IAsterixStateProxy) getServiceContext().getDistributedState();
-        if (proxy == null) {
+        proxies = ((ConcurrentHashMap<CcId, IAsterixStateProxy>) 
getServiceContext().getDistributedState()).values();
+        if (proxies == null) {
             throw new IllegalStateException("Metadata node cannot access 
distributed state");
         }
 
         // This is a special case, we just give the metadataNode directly.
         // This way we can delay the registration of the metadataNode until
         // it is completely initialized.
-        MetadataManager.initialize(proxy, MetadataNode.INSTANCE);
+        MetadataManager.initialize(proxies, MetadataNode.INSTANCE);
         MetadataBootstrap.startUniverse(getServiceContext(), newUniverse);
         MetadataBootstrap.startDDLRecovery();
         ncExtensionManager.initializeMetadata(getServiceContext());
@@ -440,7 +444,10 @@
             metadataNodeStub = (IMetadataNode) 
UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
                     getMetadataProperties().getMetadataPort());
         }
-        ((IAsterixStateProxy) 
getServiceContext().getDistributedState()).setMetadataNode(metadataNodeStub);
+        for (IAsterixStateProxy proxy : ((ConcurrentMap<CcId, 
IAsterixStateProxy>) getServiceContext()
+                .getDistributedState()).values()) {
+            proxy.setMetadataNode(metadataNodeStub);
+        }
     }
 
     @Override
@@ -472,7 +479,9 @@
                 if (hcc == null || !hcc.isConnected()) {
                     try {
                         NodeControllerService ncSrv = (NodeControllerService) 
ncServiceContext.getControllerService();
-                        ClusterControllerInfo ccInfo = 
ncSrv.getNodeParameters().getClusterControllerInfo();
+                        // TODO(mblow): multicc
+                        CcId primaryCcId = ncSrv.getPrimaryCcId();
+                        ClusterControllerInfo ccInfo = 
ncSrv.getNodeParameters(primaryCcId).getClusterControllerInfo();
                         hcc = new 
HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort());
                     } catch (Exception e) {
                         throw HyracksDataException.create(e);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 980375d..47344cd 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -182,7 +182,7 @@
         if (metadataNodeId.equals(node)) {
             return;
         }
-        // if current metadata node is active, we need to unbind its metadata 
proxy object
+        // if current metadata node is active, we need to unbind its metadata 
proxies object
         if (clusterManager.isMetadataNodeActive()) {
             MetadataNodeRequestMessage msg = new 
MetadataNodeRequestMessage(false);
             try {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 1cefd42..89ff077 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -152,7 +152,7 @@
         appCtx.setExtensionManager(ccExtensionManager);
         final CCConfig ccConfig = controllerService.getCCConfig();
         if (System.getProperty("java.rmi.server.hostname") == null) {
-            System.setProperty("java.rmi.server.hostname", 
ccConfig.getClusterListenAddress());
+            System.setProperty("java.rmi.server.hostname", 
ccConfig.getClusterPublicAddress());
         }
         MetadataProperties metadataProperties = appCtx.getMetadataProperties();
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index a23a763..2d52913 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -202,16 +202,14 @@
         // Request registration tasks from CC
         // TODO (mblow): multicc
         final NodeControllerService ncControllerService = 
(NodeControllerService) ncServiceCtx.getControllerService();
-        
RegistrationTasksRequestMessage.send(ncControllerService.getPrimaryClusterController().getCcId(),
-                ncControllerService, NodeStatus.BOOTING, state);
+        
RegistrationTasksRequestMessage.send(ncControllerService.getPrimaryCcId(), 
ncControllerService,
+                NodeStatus.BOOTING, state);
         startupCompleted = true;
     }
 
     @Override
     public void onRegisterNode(CcId ccId) throws Exception {
-        // TODO (mblow): multicc
-        if (startupCompleted && ccId.equals(((NodeControllerService) 
ncServiceCtx.getControllerService())
-                .getPrimaryClusterController().getCcId())) {
+        if (startupCompleted) {
             /*
              * If the node completed its startup before, then this is a 
re-registration with
              * the CC and therefore the system state should be HEALTHY and the 
node status is ACTIVE
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 22458d3..7a74940 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -75,7 +75,7 @@
 
     @Override
     public void sendMessageToPrimaryCC(ICcAddressedMessage message) throws 
Exception {
-        sendMessageToCC(ncs.getPrimaryClusterController().getCcId(), message);
+        sendMessageToCC(ncs.getPrimaryCcId(), message);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 4dd7463..54408d6 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -203,7 +203,7 @@
         CcApplicationContext ccApplicationContext = 
Mockito.mock(CcApplicationContext.class);
         ConfigManager configManager = new ConfigManager(null);
         IApplicationConfig applicationConfig = new 
ConfigManagerApplicationConfig(configManager);
-        ICCServiceContext iccServiceContext = 
Mockito.mock(CCServiceContext.class);
+        ICCServiceContext iccServiceContext = 
Mockito.mock(ICCServiceContext.class);
         
Mockito.when(iccServiceContext.getAppConfig()).thenReturn(applicationConfig);
         
Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(iccServiceContext);
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 8578d6b..6aba9f4 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -21,6 +21,8 @@
 
 import java.rmi.RemoteException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -91,7 +93,7 @@
  */
 public class MetadataManager implements IMetadataManager {
     private final MetadataCache cache = new MetadataCache();
-    protected final IAsterixStateProxy proxy;
+    protected final Collection<IAsterixStateProxy> proxies;
     protected IMetadataNode metadataNode;
     private final ReadWriteLock metadataLatch;
     protected boolean rebindMetadataNode = false;
@@ -100,19 +102,19 @@
     // update field name accordingly
     public static IMetadataManager INSTANCE;
 
-    private MetadataManager(IAsterixStateProxy proxy, IMetadataNode 
metadataNode) {
-        this(proxy);
+    private MetadataManager(Collection<IAsterixStateProxy> proxies, 
IMetadataNode metadataNode) {
+        this(proxies);
         if (metadataNode == null) {
             throw new IllegalArgumentException("Null metadataNode given to 
MetadataManager");
         }
         this.metadataNode = metadataNode;
     }
 
-    private MetadataManager(IAsterixStateProxy proxy) {
-        if (proxy == null) {
-            throw new IllegalArgumentException("Null proxy given to 
MetadataManager");
+    private MetadataManager(Collection<IAsterixStateProxy> proxies) {
+        if (proxies == null || proxies.isEmpty()) {
+            throw new IllegalArgumentException("Null / empty list of proxies 
given to MetadataManager");
         }
-        this.proxy = proxy;
+        this.proxies = proxies;
         this.metadataLatch = new ReentrantReadWriteLock(true);
     }
 
@@ -1002,15 +1004,15 @@
         INSTANCE = new CCMetadataManagerImpl(proxy, metadataProperties);
     }
 
-    public static void initialize(IAsterixStateProxy proxy, MetadataNode 
metadataNode) {
-        INSTANCE = new MetadataManager(proxy, metadataNode);
+    public static void initialize(Collection<IAsterixStateProxy> proxies, 
MetadataNode metadataNode) {
+        INSTANCE = new MetadataManager(proxies, metadataNode);
     }
 
     private static class CCMetadataManagerImpl extends MetadataManager {
         private final MetadataProperties metadataProperties;
 
         public CCMetadataManagerImpl(IAsterixStateProxy proxy, 
MetadataProperties metadataProperties) {
-            super(proxy);
+            super(Collections.singleton(proxy));
             this.metadataProperties = metadataProperties;
         }
 
@@ -1020,8 +1022,8 @@
                 return;
             }
             try {
-                metadataNode =
-                        
proxy.waitForMetadataNode(metadataProperties.getRegistrationTimeoutSecs(), 
TimeUnit.SECONDS);
+                metadataNode = proxies.iterator().next()
+                        
.waitForMetadataNode(metadataProperties.getRegistrationTimeoutSecs(), 
TimeUnit.SECONDS);
                 if (metadataNode != null) {
                     rebindMetadataNode = false;
                 } else {
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
index 8ab9f82..5357fc8 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
@@ -58,6 +58,7 @@
     @Override
     public IMetadataNode waitForMetadataNode(long waitFor, TimeUnit timeUnit) 
throws InterruptedException {
         synchronized (this) {
+            //TODO(mblow): replace with nanoTime() to avoid being affected by 
system clock adjustments...
             long timeToWait = TimeUnit.MILLISECONDS.convert(waitFor, timeUnit);
             while (metadataNode == null && timeToWait > 0) {
                 long startTime = System.currentTimeMillis();
diff --git 
a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
 
b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
index 406a762..201945c 100644
--- 
a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
+++ 
b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
@@ -22,27 +22,28 @@
 
 import javax.xml.XMLConstants;
 import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
+import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.parsers.SAXParser;
 import javax.xml.parsers.SAXParserFactory;
 import javax.xml.transform.sax.SAXSource;
 
 import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
 
 public class TestSuiteParser {
-    public TestSuiteParser() {
-    }
 
-    public org.apache.asterix.testframework.xml.TestSuite parse(File 
testSuiteCatalog) throws Exception {
+    public TestSuite parse(File testSuiteCatalog) throws SAXException, 
JAXBException, ParserConfigurationException {
         SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
         saxParserFactory.setNamespaceAware(true);
         saxParserFactory.setXIncludeAware(true);
         SAXParser saxParser = saxParserFactory.newSAXParser();
         saxParser.setProperty(XMLConstants.ACCESS_EXTERNAL_DTD, "file");
 
-        JAXBContext ctx = 
JAXBContext.newInstance(org.apache.asterix.testframework.xml.TestSuite.class);
+        JAXBContext ctx = JAXBContext.newInstance(TestSuite.class);
         Unmarshaller um = ctx.createUnmarshaller();
-        return (org.apache.asterix.testframework.xml.TestSuite) um.unmarshal(
+        return (TestSuite) um.unmarshal(
                 new SAXSource(saxParser.getXMLReader(), new 
InputSource(testSuiteCatalog.toURI().toString())));
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
index 3d69ddb..0e04dca 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
@@ -18,10 +18,14 @@
  */
 package org.apache.hyracks.api.client;
 
+import org.apache.hyracks.api.control.CcId;
+
 import java.io.Serializable;
 
 public class ClusterControllerInfo implements Serializable {
     private static final long serialVersionUID = 1L;
+
+    private final CcId ccId;
 
     private final String clientNetAddress;
 
@@ -29,12 +33,17 @@
 
     private final int webPort;
 
-    public ClusterControllerInfo(String clientNetAddress, int clientNetPort, 
int webPort) {
+    public ClusterControllerInfo(CcId ccId, String clientNetAddress, int 
clientNetPort, int webPort) {
+        this.ccId = ccId;
         this.clientNetAddress = clientNetAddress;
         this.clientNetPort = clientNetPort;
         this.webPort = webPort;
     }
 
+    public CcId getCcId() {
+        return ccId;
+    }
+
     public int getWebPort() {
         return webPort;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java
index d2a254f..aee22c9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.api.config;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public interface IOptionType<T> {
@@ -26,6 +27,11 @@
      */
     T parse(String s);
 
+    /**
+     * @throws IllegalArgumentException when the supplied JSON node cannot be 
interpreted
+     */
+    T parse(JsonNode node);
+
     Class<T> targetType();
 
     /**
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
index c83366f..a520aef 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
@@ -80,7 +80,7 @@
 
     @Override
     public String toString() {
-        return "JID:" + id;
+        return "JID:[" + getCcId() + "]" + getIdOnly();
     }
 
     public static JobId parse(String str) throws HyracksDataException {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index a6edd70..f8fe77f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -215,7 +215,7 @@
         clusterIPC.start();
         clientIPC.start();
         webServer.start();
-        info = new ClusterControllerInfo(ccConfig.getClientListenAddress(), 
ccConfig.getClientListenPort(),
+        info = new ClusterControllerInfo(ccId, 
ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(),
                 webServer.getListeningPort());
         timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriodMillis());
         jobLog.open();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index 26245e1..a7a373a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.application.ServiceContext;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 77ecbee..96f5f1b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -71,6 +71,7 @@
             params.setDistributedState(ccs.getContext().getDistributedState());
             
params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
             
params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
+            params.setRegistrationId(reg.getRegistrationId());
             result = new CCNCFunctions.NodeRegistrationResult(params, null);
         } catch (Exception e) {
             LOGGER.log(Level.WARN, "Node registration failed", e);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 9cf84dd..a95ae3d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -73,6 +73,4 @@
     void getNodeControllerInfos() throws Exception;
 
     void notifyThreadDump(String nodeId, String requestId, String 
threadDumpJSON) throws Exception;
-
-    CcId getCcId();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
index 986ca96..f57bd63 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -154,10 +154,7 @@
                 }
             } else {
                 registeredOptions.add(option);
-                optionSetters.put(option,
-                        (node, value,
-                                isDefault) -> correctedMap(option.section() == 
Section.NC ? node : null, isDefault)
-                                        .put(option, value));
+                optionSetters.put(option, (node, value, isDefault) -> 
correctedMap(node, isDefault).put(option, value));
                 if (LOGGER.isDebugEnabled()) {
                     optionSetters.put(option, (node, value, isDefault) -> 
LOGGER
                             .debug((isDefault ? "defaulting" : "setting ") + 
option.toIniString() + " to " + value));
@@ -352,17 +349,13 @@
     private void applyDefaults() {
         LOGGER.debug("applying defaults");
         sectionMap.forEach((key, value) -> {
-            if (key == Section.NC) {
-                value.values().forEach(option -> getNodeNames()
-                        .forEach(node -> 
getOrDefault(getNodeEffectiveMap(node), option, node)));
-                for (Map.Entry<String, Map<IOption, Object>> nodeMap : 
nodeSpecificMap.entrySet()) {
-                    value.values()
-                            .forEach(option -> getOrDefault(
-                                    new CompositeMap<>(nodeMap.getValue(), 
definedMap, new NoOpMapMutator()), option,
-                                    nodeMap.getKey()));
-                }
-            } else {
-                value.values().forEach(option -> 
getOrDefault(configurationMap, option, null));
+            value.values().forEach(
+                    option -> getNodeNames().forEach(node -> 
getOrDefault(getNodeEffectiveMap(node), option, node)));
+            for (Map.Entry<String, Map<IOption, Object>> nodeMap : 
nodeSpecificMap.entrySet()) {
+                value.values()
+                        .forEach(option -> getOrDefault(
+                                new CompositeMap<>(nodeMap.getValue(), 
definedMap, new NoOpMapMutator()), option,
+                                nodeMap.getKey()));
             }
         });
     }
@@ -473,9 +466,8 @@
                 }
             }
         });
-        extensionOptions.forEach((extension, options) -> {
-            options.forEach(option -> ini.add(extension, option.getKey(), 
option.getValue()));
-        });
+        extensionOptions.forEach((extension, options) -> options
+                .forEach(option -> ini.add(extension, option.getKey(), 
option.getValue())));
         return ini;
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
index 3807a00..d9d6798 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
@@ -19,7 +19,12 @@
 package org.apache.hyracks.control.common.config;
 
 import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.config.IOptionType;
 import org.apache.hyracks.util.StorageUtil;
@@ -40,6 +45,11 @@
                 throw new IllegalArgumentException("The given value: " + 
result1 + " is not within the int range.");
             }
             return (int) result1;
+        }
+
+        @Override
+        public Integer parse(JsonNode node) {
+            return node.isNull() ? null : parse(node.asText());
         }
 
         @Override
@@ -65,6 +75,11 @@
         }
 
         @Override
+        public Long parse(JsonNode node) {
+            return node.isNull() ? null : parse(node.asText());
+        }
+
+        @Override
         public Class<Long> targetType() {
             return Long.class;
         }
@@ -84,10 +99,19 @@
         @Override
         public Short parse(String s) {
             int value = Integer.decode(s);
+            return validateShort(value);
+        }
+
+        private Short validateShort(int value) {
             if (Integer.highestOneBit(value) > 16) {
-                throw new IllegalArgumentException("The given value " + s + " 
is too big for a short");
+                throw new IllegalArgumentException("The given value " + value 
+ " is too big for a short");
             }
             return (short) value;
+        }
+
+        @Override
+        public Short parse(JsonNode node) {
+            return node.isNull() ? null : validateShort(node.asInt());
         }
 
         @Override
@@ -108,6 +132,11 @@
         }
 
         @Override
+        public Integer parse(JsonNode node) {
+            return node.isNull() ? null : node.asInt();
+        }
+
+        @Override
         public Class<Integer> targetType() {
             return Integer.class;
         }
@@ -122,6 +151,11 @@
         @Override
         public Double parse(String s) {
             return Double.parseDouble(s);
+        }
+
+        @Override
+        public Double parse(JsonNode node) {
+            return node.isNull() ? null : node.asDouble();
         }
 
         @Override
@@ -142,6 +176,11 @@
         }
 
         @Override
+        public String parse(JsonNode node) {
+            return node.isNull() ? null : node.asText();
+        }
+
+        @Override
         public Class<String> targetType() {
             return String.class;
         }
@@ -159,6 +198,11 @@
         }
 
         @Override
+        public Long parse(JsonNode node) {
+            return node.isNull() ? null : node.asLong();
+        }
+
+        @Override
         public Class<Long> targetType() {
             return Long.class;
         }
@@ -173,6 +217,11 @@
         @Override
         public Boolean parse(String s) {
             return Boolean.parseBoolean(s);
+        }
+
+        @Override
+        public Boolean parse(JsonNode node) {
+            return node.isNull() ? null : node.asBoolean();
         }
 
         @Override
@@ -197,6 +246,11 @@
                 throw new IllegalArgumentException("Unrecognized logging 
level: " + s);
             }
             return level;
+        }
+
+        @Override
+        public Level parse(JsonNode node) {
+            return node.isNull() ? null : parse(node.asText());
         }
 
         @Override
@@ -227,6 +281,20 @@
         }
 
         @Override
+        public String[] parse(JsonNode node) {
+            if (node.isNull()) {
+                return null;
+            }
+            List<String> strings = new ArrayList<>();
+            if (node instanceof ArrayNode) {
+                node.elements().forEachRemaining(n -> strings.add(n.asText()));
+                return strings.toArray(new String[strings.size()]);
+            } else {
+                return parse(node.asText());
+            }
+        }
+
+        @Override
         public Class<String[]> targetType() {
             return String[].class;
         }
@@ -253,6 +321,11 @@
         }
 
         @Override
+        public java.net.URL parse(JsonNode node) {
+            return node.isNull() ? null : parse(node.asText());
+        }
+
+        @Override
         public Class<java.net.URL> targetType() {
             return java.net.URL.class;
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 519bafc..75c0827 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -50,7 +50,6 @@
         NCSERVICE_PORT(INTEGER, 9090),
         CLUSTER_ADDRESS(STRING, (String) null),
         CLUSTER_PORT(INTEGER, 1099),
-        CLUSTER_CONTROLLER_ID(SHORT, (short) 0x0000),
         CLUSTER_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
         CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT),
         NODE_ID(STRING, (String) null),
@@ -144,8 +143,6 @@
                     return "Cluster Controller port";
                 case CLUSTER_LISTEN_PORT:
                     return "IP port to bind cluster listener";
-                case CLUSTER_CONTROLLER_ID:
-                    return "16-bit (0-65535) id of the Cluster Controller";
                 case CLUSTER_PUBLIC_ADDRESS:
                     return "Public IP Address to announce cluster listener";
                 case CLUSTER_PUBLIC_PORT:
@@ -311,10 +308,6 @@
 
     public void setClusterPort(int clusterPort) {
         configManager.set(nodeId, Option.CLUSTER_PORT, clusterPort);
-    }
-
-    public CcId getClusterControllerId() {
-        return CcId.valueOf(appConfig.getShort(Option.CLUSTER_CONTROLLER_ID));
     }
 
     public String getClusterListenAddress() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
index bf233a8..e78a423 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
@@ -33,6 +33,8 @@
 
     private int profileDumpPeriod;
 
+    private int registrationId;
+
     public ClusterControllerInfo getClusterControllerInfo() {
         return ccInfo;
     }
@@ -64,4 +66,12 @@
     public void setProfileDumpPeriod(int profileDumpPeriod) {
         this.profileDumpPeriod = profileDumpPeriod;
     }
+
+    public int getRegistrationId() {
+        return registrationId;
+    }
+
+    public void setRegistrationId(int registrationId) {
+        this.registrationId = registrationId;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index 75ef0b7..5cec0ae 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -22,6 +22,7 @@
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
@@ -74,6 +75,10 @@
 
     private final long maxJobId;
 
+    private final int registrationId;
+
+    private static final AtomicInteger nextRegistrationId = new 
AtomicInteger();
+
     public NodeRegistration(InetSocketAddress ncAddress, String nodeId, 
NCConfig ncConfig, NetworkAddress dataPort,
             NetworkAddress datasetPort, String osName, String arch, String 
osVersion, int nProcessors, String vmName,
             String vmVersion, String vmVendor, String classpath, String 
libraryPath, String bootClasspath,
@@ -101,6 +106,7 @@
         this.capacity = capacity;
         this.pid = pid;
         this.maxJobId = maxJobId;
+        this.registrationId = nextRegistrationId.getAndIncrement();
     }
 
     public InetSocketAddress getNodeControllerAddress() {
@@ -190,4 +196,8 @@
     public long getMaxJobId() {
         return maxJobId;
     }
+
+    public int getRegistrationId() {
+        return registrationId;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 0fdafe3..ae40ea3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -57,11 +57,9 @@
 
 public class ClusterControllerRemoteProxy implements IClusterController {
 
-    private final CcId ccId;
     private IIPCHandle ipcHandle;
 
-    public ClusterControllerRemoteProxy(CcId ccId, IIPCHandle ipcHandle) {
-        this.ccId = ccId;
+    public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
         this.ipcHandle = ipcHandle;
     }
 
@@ -178,12 +176,7 @@
     }
 
     @Override
-    public CcId getCcId() {
-        return ccId;
-    }
-
-    @Override
     public String toString() {
-        return getClass().getSimpleName() + " " + ccId + " [" + 
ipcHandle.getRemoteAddress() + "]";
+        return getClass().getSimpleName() + " [" + 
ipcHandle.getRemoteAddress() + "]";
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
new file mode 100644
index 0000000..cdd9f74
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -0,0 +1,62 @@
+package org.apache.hyracks.control.nc;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.control.common.base.IClusterController;
+import org.apache.hyracks.control.common.controllers.NodeParameters;
+import org.apache.hyracks.control.common.controllers.NodeRegistration;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * NC-side state for a single cluster controller connection: the controller
+ * proxy, the in-flight registration handshake, and the
+ * {@link NodeParameters} stored by the most recent registration result.
+ * <p>
+ * {@link #registerNode(NodeRegistration)} blocks until
+ * {@link #setNodeRegistrationResult(NodeParameters, Exception)} is invoked;
+ * both methods synchronize on this instance.
+ */
+public class CcConnection {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final IClusterController ccs;
+    // the two fields below are only accessed while holding this monitor
+    private boolean registrationPending;
+    private Exception registrationException;
+    // written under the monitor but read by the unsynchronized getters,
+    // hence volatile so readers observe the latest registration result
+    private volatile NodeParameters nodeParameters;
+
+    CcConnection(IClusterController ccs) {
+        this.ccs = ccs;
+    }
+
+    @Override
+    public String toString() {
+        return ccs.toString();
+    }
+
+    /**
+     * @return the cluster controller id carried by the stored node parameters
+     * @throws IllegalStateException
+     *             if no node parameters are currently stored
+     */
+    public CcId getCcId() {
+        NodeParameters parameters = getNodeParameters();
+        if (parameters == null) {
+            throw new IllegalStateException("not registered with " + ccs);
+        }
+        return parameters.getClusterControllerInfo().getCcId();
+    }
+
+    /**
+     * Records the outcome of a registration attempt and wakes every thread
+     * waiting in {@link #registerNode(NodeRegistration)}.
+     *
+     * @param parameters
+     *            the node parameters to store
+     * @param exception
+     *            the failure to rethrow from {@code registerNode}, or
+     *            {@code null}
+     */
+    synchronized void setNodeRegistrationResult(NodeParameters parameters,
+            Exception exception) {
+        nodeParameters = parameters;
+        registrationException = exception;
+        registrationPending = false;
+        notifyAll();
+    }
+
+    /**
+     * Sends the registration to the cluster controller and waits until a
+     * result is recorded through
+     * {@link #setNodeRegistrationResult(NodeParameters, Exception)}.
+     *
+     * @param nodeRegistration
+     *            the registration to submit
+     * @return the id of the cluster controller, see {@link #getCcId()}
+     * @throws Exception
+     *             the exception recorded with the result, if any, or
+     *             whatever the send or the wait itself throws
+     */
+    public synchronized CcId registerNode(NodeRegistration nodeRegistration)
+            throws Exception {
+        registrationPending = true;
+        ccs.registerNode(nodeRegistration);
+        // wait() releases the monitor, letting the result be delivered
+        while (registrationPending) {
+            wait();
+        }
+        if (registrationException != null) {
+            LOGGER.log(Level.WARN, "Registering with {} failed with exception",
+                    this, registrationException);
+            throw registrationException;
+        }
+        return getCcId();
+    }
+
+    /**
+     * @return the cluster controller this connection was created with
+     */
+    public IClusterController getClusterControllerService() {
+        return ccs;
+    }
+
+    /**
+     * @return the node parameters stored by the most recent registration
+     *         result, or {@code null} if none have been stored
+     */
+    public NodeParameters getNodeParameters() {
+        return nodeParameters;
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0e74a4c..8151004 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
@@ -108,7 +109,7 @@
     private static final double MEMORY_FUDGE_FACTOR = 0.8;
     private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1);
 
-    private NCConfig ncConfig;
+    private final NCConfig ncConfig;
 
     private final String id;
 
@@ -128,13 +129,13 @@
 
     private final Timer timer;
 
-    private boolean registrationPending;
+    private CcId primaryCcId;
 
-    private Exception registrationException;
+    private final Map<CcId, CcConnection> ccMap = 
Collections.synchronizedMap(new HashMap<>());
 
-    private IClusterController primaryCcs;
+    private final Map<InetSocketAddress, CcId> ccAddressMap = 
Collections.synchronizedMap(new HashMap<>());
 
-    private final Map<CcId, IClusterController> ccsMap = 
Collections.synchronizedMap(new HashMap<>());
+    private final Map<Integer, CcConnection> pendingRegistrations = 
Collections.synchronizedMap(new HashMap<>());
 
     private final Map<JobId, Joblet> jobletMap;
 
@@ -144,11 +145,9 @@
 
     private ExecutorService executor;
 
-    private NodeParameters nodeParameters;
+    private Map<CcId, Thread> heartbeatThreads = new ConcurrentHashMap<>();
 
-    private Map<IClusterController, Thread> heartbeatThreads = new 
ConcurrentHashMap<>();
-
-    private Map<IClusterController, Timer> ccTimers = new 
ConcurrentHashMap<>();
+    private Map<CcId, Timer> ccTimers = new ConcurrentHashMap<>();
 
     private final ServerContext serverCtx;
 
@@ -179,8 +178,6 @@
     private MessagingNetworkManager messagingNetManager;
 
     private final ConfigManager configManager;
-
-    private NodeRegistration nodeRegistration;
 
     private final AtomicLong maxJobId = new AtomicLong(-1);
 
@@ -254,7 +251,7 @@
             }
             getNodeControllerInfosAcceptor.setValue(fv);
         }
-        primaryCcs.getNodeControllerInfos();
+        getPrimaryClusterController().getNodeControllerInfos();
         return fv.get();
     }
 
@@ -302,9 +299,7 @@
             messagingNetManager.start();
         }
 
-        final InetSocketAddress ccAddress =
-                new InetSocketAddress(ncConfig.getClusterAddress(), 
ncConfig.getClusterPort());
-        this.primaryCcs = addCc(ncConfig.getClusterControllerId(), ccAddress);
+        this.primaryCcId = addCc(new 
InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()));
 
         workQueue.start();
 
@@ -315,64 +310,65 @@
         application.startupCompleted();
     }
 
-    public ClusterControllerRemoteProxy addCc(CcId ccId, InetSocketAddress 
ccAddress) throws Exception {
-        ClusterControllerRemoteProxy ccProxy;
-        synchronized (ccsMap) {
-            if (ccsMap.containsKey(ccId)) {
-                throw new IllegalStateException("cc already registered: " + 
ccId);
-            }
-            final IIPCEventListener ipcEventListener = new IIPCEventListener() 
{
-                @Override
-                public void ipcHandleRestored(IIPCHandle handle) throws 
IPCException {
-                    // we need to re-register in case of NC -> CC connection 
reset
-                    try {
-                        registerNode(ccsMap.get(ccId));
-                    } catch (Exception e) {
-                        LOGGER.log(Level.WARN, "Failed Registering with cc", 
e);
-                        throw new IPCException(e);
-                    }
+    public CcId addCc(InetSocketAddress ccAddress) throws Exception {
+        if (ccAddress.isUnresolved()) {
+            throw new IllegalArgumentException("must used resolved 
InetSocketAddress");
+        }
+        if (ccAddressMap.containsKey(ccAddress)) {
+            throw new IllegalStateException("cc already registered: " + 
ccAddress);
+        }
+        final IIPCEventListener ipcEventListener = new IIPCEventListener() {
+            @Override
+            public void ipcHandleRestored(IIPCHandle handle) throws 
IPCException {
+                // we need to re-register in case of NC -> CC connection reset
+                try {
+                    registerNode(getCcConnection(ccAddressMap.get(ccAddress)), 
ccAddress);
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARN, "Failed Registering with cc", e);
+                    throw new IPCException(e);
                 }
-            };
-            ccProxy = new ClusterControllerRemoteProxy(ccId,
-                    ipc.getHandle(ccAddress, 
ncConfig.getClusterConnectRetries(), 1, ipcEventListener));
-            registerNode(ccProxy);
-            ccsMap.put(ccId, ccProxy);
-        }
-        return ccProxy;
+            }
+        };
+        ClusterControllerRemoteProxy ccProxy = new 
ClusterControllerRemoteProxy(
+                ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 
1, ipcEventListener));
+        CcConnection ccc = new CcConnection(ccProxy);
+        return registerNode(ccc, ccAddress);
     }
 
-    public void makePrimaryCc(CcId ccId) throws Exception {
-        synchronized (ccsMap) {
-            if (!ccsMap.containsKey(ccId)) {
-                throw new IllegalArgumentException("unknown cc: " + ccId);
-            }
-            primaryCcs = ccsMap.get(ccId);
+    public void makePrimaryCc(InetSocketAddress ccAddress) throws Exception {
+        if (ccAddress.isUnresolved()) {
+            throw new IllegalArgumentException("must used resolved 
InetSocketAddress");
         }
+        CcId newPrimaryCc = ccAddressMap.get(ccAddress);
+        if (newPrimaryCc == null) {
+            throw new IllegalArgumentException("unknown cc: " + ccAddress);
+        }
+        this.primaryCcId = newPrimaryCc;
     }
 
-    public void removeCc(CcId ccId) throws Exception {
-        synchronized (ccsMap) {
-            final IClusterController ccs = ccsMap.get(ccId);
-            if (ccs == null) {
-                throw new IllegalArgumentException("unknown cc: " + ccId);
-            }
-            if (primaryCcs.equals(ccs)) {
-                throw new IllegalStateException("cannot remove primary cc: " + 
ccId);
-            }
-            // TODO(mblow): consider how to handle running jobs
-            ccs.unregisterNode(id);
-            Thread hbThread = heartbeatThreads.remove(ccs);
-            hbThread.interrupt();
-            Timer ccTimer = ccTimers.remove(ccs);
-            if (ccTimer != null) {
-                ccTimer.cancel();
-            }
+    public void removeCc(InetSocketAddress ccAddress) throws Exception {
+        if (ccAddress.isUnresolved()) {
+            throw new IllegalArgumentException("must used resolved 
InetSocketAddress");
         }
+        CcId ccId = ccAddressMap.get(ccAddress);
+        if (primaryCcId.equals(ccId)) {
+            throw new IllegalStateException("cannot remove primary cc: " + 
ccAddress);
+        }
+        final CcConnection ccc = getCcConnection(ccId);
+        // TODO(mblow): consider how to handle running jobs
+        ccc.getClusterControllerService().unregisterNode(id);
+        Thread hbThread = heartbeatThreads.remove(ccId);
+        hbThread.interrupt();
+        Timer ccTimer = ccTimers.remove(ccId);
+        if (ccTimer != null) {
+            ccTimer.cancel();
+        }
+        ccMap.remove(ccId);
+        ccAddressMap.remove(ccAddress);
     }
 
-    protected void registerNode(IClusterController ccs) throws Exception {
-        LOGGER.info("Registering with Cluster Controller {}", ccs);
-        registrationPending = true;
+    protected CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) 
throws Exception {
+        LOGGER.info("Registering with Cluster Controller {}", ccc);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new 
HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
             gcInfos[i] = new 
HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
@@ -389,54 +385,67 @@
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
         NetworkAddress messagingAddress =
                 messagingNetManager != null ? 
messagingNetManager.getPublicNetworkAddress() : null;
-        int allCores = osMXBean.getAvailableProcessors();
-        nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, 
netAddress, datasetAddress, osMXBean.getName(),
-                osMXBean.getArch(), osMXBean.getVersion(), allCores, 
runtimeMXBean.getVmName(),
-                runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), 
runtimeMXBean.getClassPath(),
-                runtimeMXBean.getLibraryPath(), 
runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
-                runtimeMXBean.getSystemProperties(), hbSchema, 
messagingAddress, application.getCapacity(),
-                PidHelper.getPid(), maxJobId.get());
+        NodeRegistration nodeRegistration = new NodeRegistration(ncAddress, 
id, ncConfig, netAddress, datasetAddress,
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), 
osMXBean.getAvailableProcessors(),
+                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), 
runtimeMXBean.getVmVendor(),
+                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), 
runtimeMXBean.getBootClassPath(),
+                runtimeMXBean.getInputArguments(), 
runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress,
+                application.getCapacity(), PidHelper.getPid(), maxJobId.get());
 
-        ccs.registerNode(nodeRegistration);
-
-        completeNodeRegistration(ccs);
+        pendingRegistrations.put(nodeRegistration.getRegistrationId(), ccc);
+        CcId ccId = ccc.registerNode(nodeRegistration);
+        ccMap.put(ccId, ccc);
+        ccAddressMap.put(ccAddress, ccId);
+        getDistributedState().put(ccId, 
ccc.getNodeParameters().getDistributedState());
+        application.onRegisterNode(ccId);
+        IClusterController ccs = ccc.getClusterControllerService();
+        NodeParameters nodeParameters = ccc.getNodeParameters();
 
         // Start heartbeat generator.
-        if (!heartbeatThreads.containsKey(ccs)) {
+        if (!heartbeatThreads.containsKey(ccId)) {
             Thread heartbeatThread =
                     new Thread(new HeartbeatTask(ccs, 
nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat");
             heartbeatThread.setPriority(Thread.MAX_PRIORITY);
             heartbeatThread.setDaemon(true);
             heartbeatThread.start();
-            heartbeatThreads.put(ccs, heartbeatThread);
+            heartbeatThreads.put(ccId, heartbeatThread);
         }
-        if (!ccTimers.containsKey(ccs) && 
nodeParameters.getProfileDumpPeriod() > 0) {
-            Timer ccTimer = new Timer("Timer-" + ccs.getCcId(), true);
+        if (!ccTimers.containsKey(ccId) && 
nodeParameters.getProfileDumpPeriod() > 0) {
+            Timer ccTimer = new Timer("Timer-" + ccId, true);
             // Schedule profile dump generator.
-            ccTimer.schedule(new ProfileDumpTask(ccs), 0, 
nodeParameters.getProfileDumpPeriod());
-            ccTimers.put(ccs, ccTimer);
+            ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, 
nodeParameters.getProfileDumpPeriod());
+            ccTimers.put(ccId, ccTimer);
         }
 
-        LOGGER.info("Registering with Cluster Controller {} complete", ccs);
+        LOGGER.info("Registering with Cluster Controller {} complete", ccc);
+        return ccId;
     }
 
-    synchronized void setNodeRegistrationResult(NodeParameters parameters, 
Exception exception) {
-        this.nodeParameters = parameters;
-        this.registrationException = exception;
-        this.registrationPending = false;
-        notifyAll();
+    void setNodeRegistrationResult(NodeParameters parameters, Exception 
exception) {
+        CcConnection ccc = getPendingNodeRegistration(parameters);
+        ccc.setNodeRegistrationResult(parameters, exception);
     }
 
-    private synchronized void completeNodeRegistration(IClusterController ccs) 
throws Exception {
-        while (registrationPending) {
-            wait();
+    private CcConnection getCcConnection(CcId ccId) {
+        CcConnection ccConnection = ccMap.get(ccId);
+        if (ccConnection == null) {
+            throw new IllegalArgumentException("unknown ccId: " + ccId);
         }
-        if (registrationException != null) {
-            LOGGER.log(Level.WARN, "Registering with Cluster Controller failed 
with exception", registrationException);
-            throw registrationException;
+        return ccConnection;
+    }
+
+    private CcConnection getPendingNodeRegistration(NodeParameters 
nodeParameters) {
+        CcConnection ccConnection = 
pendingRegistrations.remove(nodeParameters.getRegistrationId());
+        if (ccConnection == null) {
+            throw new IllegalStateException("Unknown pending node registration 
" + nodeParameters.getRegistrationId()
+                    + " for " + 
nodeParameters.getClusterControllerInfo().getCcId());
         }
-        serviceCtx.setDistributedState(nodeParameters.getDistributedState());
-        application.onRegisterNode(ccs.getCcId());
+        return ccConnection;
+    }
+
+    private ConcurrentHashMap<CcId, Serializable> getDistributedState() {
+        //noinspection unchecked
+        return (ConcurrentHashMap<CcId, Serializable>) 
serviceCtx.getDistributedState();
     }
 
     private void startApplication() throws Exception {
@@ -478,10 +487,10 @@
                 t.interrupt();
                 InvokeUtil.doUninterruptibly(() -> t.join(1000));
             });
-            synchronized (ccsMap) {
-                ccsMap.values().parallelStream().forEach(ccs -> {
+            synchronized (ccMap) {
+                ccMap.values().parallelStream().forEach(cc -> {
                     try {
-                        ccs.notifyShutdown(id);
+                        cc.getClusterControllerService().notifyShutdown(id);
                     } catch (Exception e) {
                         LOGGER.log(Level.WARN, "Exception notifying CC of 
shutdown", e);
                     }
@@ -520,13 +529,8 @@
         jobParameterByteStoreMap.remove(jobId);
     }
 
-    public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) 
throws HyracksException {
-        JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId);
-        if (jpbs == null) {
-            jpbs = new JobParameterByteStore();
-            jobParameterByteStoreMap.put(jobId, jpbs);
-        }
-        return jpbs;
+    public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) 
{
+        return jobParameterByteStoreMap.computeIfAbsent(jobId, jid -> new 
JobParameterByteStore());
     }
 
     public void storeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId, 
ActivityClusterGraph acg)
@@ -550,7 +554,7 @@
         }
     }
 
-    public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId 
deployedJobSpecId) throws HyracksException {
+    public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId 
deployedJobSpecId) {
         return 
deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId());
     }
 
@@ -566,16 +570,21 @@
         return partitionManager;
     }
 
+    public CcId getPrimaryCcId() {
+        // TODO(mblow): this can change at any time, need notification 
framework
+        return primaryCcId;
+    }
+
     public IClusterController getPrimaryClusterController() {
-        return primaryCcs;
+        return getClusterController(primaryCcId);
     }
 
     public IClusterController getClusterController(CcId ccId) {
-        return ccsMap.get(ccId);
+        return getCcConnection(ccId).getClusterControllerService();
     }
 
-    public NodeParameters getNodeParameters() {
-        return nodeParameters;
+    public NodeParameters getNodeParameters(CcId ccId) {
+        return getCcConnection(ccId).getNodeParameters();
     }
 
     @Override
@@ -691,17 +700,19 @@
     }
 
     private class ProfileDumpTask extends TimerTask {
-        private IClusterController cc;
+        private final IClusterController cc;
+        private final CcId ccId;
 
-        public ProfileDumpTask(IClusterController cc) {
+        public ProfileDumpTask(IClusterController cc, CcId ccId) {
             this.cc = cc;
+            this.ccId = ccId;
         }
 
         @Override
         public void run() {
             try {
                 FutureValue<List<JobProfile>> fv = new FutureValue<>();
-                BuildJobProfilesWork bjpw = new 
BuildJobProfilesWork(NodeControllerService.this, cc.getCcId(), fv);
+                BuildJobProfilesWork bjpw = new 
BuildJobProfilesWork(NodeControllerService.this, ccId, fv);
                 workQueue.scheduleAndSync(bjpw);
                 List<JobProfile> profiles = fv.get();
                 if (!profiles.isEmpty()) {
@@ -734,7 +745,7 @@
     }
 
     public void sendApplicationMessageToCC(CcId ccId, byte[] data, 
DeploymentId deploymentId) throws Exception {
-        ccsMap.get(ccId).sendApplicationMessageToCC(data, deploymentId, id);
+        getClusterController(ccId).sendApplicationMessageToCC(data, 
deploymentId, id);
     }
 
     public IDatasetPartitionManager getDatasetPartitionManager() {
@@ -759,4 +770,5 @@
     public Object getApplicationContext() {
         return application.getApplicationContext();
     }
+
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
index 6a75471..8733022 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IStateDumpHandler;
@@ -50,7 +51,7 @@
 
     public NCServiceContext(NodeControllerService ncs, ServerContext 
serverCtx, IOManager ioManager, String nodeId,
             MemoryManager memoryManager, ILifeCycleComponentManager 
lifeCyclecomponentManager,
-            IApplicationConfig appConfig) throws IOException {
+            IApplicationConfig appConfig) {
         super(serverCtx, appConfig, new HyracksThreadFactory(nodeId));
         this.lccm = lifeCyclecomponentManager;
         this.nodeId = nodeId;
@@ -59,6 +60,7 @@
         this.ncs = ncs;
         this.sdh = lccm::dumpState;
         this.tracer = new Tracer(nodeId, 
ncs.getConfiguration().getTraceCategories(), new TraceCategoryRegistry());
+        this.distributedState = new ConcurrentHashMap<>();
     }
 
     @Override

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2344
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iff60887bf71ce3f3ed7201afd9499612bfc83485
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <mb...@apache.org>

Reply via email to