Michael Blow has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2344
Change subject: [NO ISSUE] More multi-CC support, ConfigManager updates ...................................................................... [NO ISSUE] More multi-CC support, ConfigManager updates - add the ability for OptionTypes to natively parse JsonNodes - allow all options (e.g. common and CC options), not just NC options, to be overridden at the NC level - accept the controller id from the CC instead of configuring it on NCs - update all CCs, not just the primary CC, during metadata bootstrap Change-Id: Iff60887bf71ce3f3ed7201afd9499612bfc83485 --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java M asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java 25 files changed, 359 insertions(+), 186 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/44/2344/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index c554cbd..94f688f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -23,8 +23,11 @@ import java.rmi.server.UnicastRemoteObject; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -77,6 +80,7 @@ import org.apache.hyracks.api.client.ClusterControllerInfo; import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; @@ -414,20 +418,20 @@ @Override public void initializeMetadata(boolean newUniverse) throws Exception { - IAsterixStateProxy proxy; + Collection<IAsterixStateProxy> proxies; LOGGER.info("Bootstrapping metadata"); MetadataNode.INSTANCE.initialize(this, ncExtensionManager.getMetadataTupleTranslatorProvider(), ncExtensionManager.getMetadataExtensions()); - proxy = (IAsterixStateProxy) getServiceContext().getDistributedState(); - if (proxy == null) { + proxies = ((ConcurrentHashMap<CcId, IAsterixStateProxy>) getServiceContext().getDistributedState()).values(); + if (proxies == null) { throw new IllegalStateException("Metadata node cannot access distributed state"); } // This is a special case, we just give the metadataNode directly. // This way we can delay the registration of the metadataNode until // it is completely initialized. 
- MetadataManager.initialize(proxy, MetadataNode.INSTANCE); + MetadataManager.initialize(proxies, MetadataNode.INSTANCE); MetadataBootstrap.startUniverse(getServiceContext(), newUniverse); MetadataBootstrap.startDDLRecovery(); ncExtensionManager.initializeMetadata(getServiceContext()); @@ -440,7 +444,10 @@ metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, getMetadataProperties().getMetadataPort()); } - ((IAsterixStateProxy) getServiceContext().getDistributedState()).setMetadataNode(metadataNodeStub); + for (IAsterixStateProxy proxy : ((ConcurrentMap<CcId, IAsterixStateProxy>) getServiceContext() + .getDistributedState()).values()) { + proxy.setMetadataNode(metadataNodeStub); + } } @Override @@ -472,7 +479,9 @@ if (hcc == null || !hcc.isConnected()) { try { NodeControllerService ncSrv = (NodeControllerService) ncServiceContext.getControllerService(); - ClusterControllerInfo ccInfo = ncSrv.getNodeParameters().getClusterControllerInfo(); + // TODO(mblow): multicc + CcId primaryCcId = ncSrv.getPrimaryCcId(); + ClusterControllerInfo ccInfo = ncSrv.getNodeParameters(primaryCcId).getClusterControllerInfo(); hcc = new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort()); } catch (Exception e) { throw HyracksDataException.create(e); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java index 980375d..47344cd 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java @@ -182,7 +182,7 @@ if (metadataNodeId.equals(node)) { return; } - // if current metadata node is active, we need to unbind its metadata proxy object + // if current metadata node is active, we need to unbind its metadata proxies object if 
(clusterManager.isMetadataNodeActive()) { MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(false); try { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 1cefd42..89ff077 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -152,7 +152,7 @@ appCtx.setExtensionManager(ccExtensionManager); final CCConfig ccConfig = controllerService.getCCConfig(); if (System.getProperty("java.rmi.server.hostname") == null) { - System.setProperty("java.rmi.server.hostname", ccConfig.getClusterListenAddress()); + System.setProperty("java.rmi.server.hostname", ccConfig.getClusterPublicAddress()); } MetadataProperties metadataProperties = appCtx.getMetadataProperties(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index a23a763..2d52913 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -202,16 +202,14 @@ // Request registration tasks from CC // TODO (mblow): multicc final NodeControllerService ncControllerService = (NodeControllerService) ncServiceCtx.getControllerService(); - RegistrationTasksRequestMessage.send(ncControllerService.getPrimaryClusterController().getCcId(), - ncControllerService, NodeStatus.BOOTING, state); + RegistrationTasksRequestMessage.send(ncControllerService.getPrimaryCcId(), ncControllerService, + NodeStatus.BOOTING, state); startupCompleted = true; } @Override public void onRegisterNode(CcId ccId) throws Exception { - // TODO (mblow): 
multicc - if (startupCompleted && ccId.equals(((NodeControllerService) ncServiceCtx.getControllerService()) - .getPrimaryClusterController().getCcId())) { + if (startupCompleted) { /* * If the node completed its startup before, then this is a re-registration with * the CC and therefore the system state should be HEALTHY and the node status is ACTIVE diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java index 22458d3..7a74940 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java @@ -75,7 +75,7 @@ @Override public void sendMessageToPrimaryCC(ICcAddressedMessage message) throws Exception { - sendMessageToCC(ncs.getPrimaryClusterController().getCcId(), message); + sendMessageToCC(ncs.getPrimaryCcId(), message); } @Override diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java index 4dd7463..54408d6 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java @@ -203,7 +203,7 @@ CcApplicationContext ccApplicationContext = Mockito.mock(CcApplicationContext.class); ConfigManager configManager = new ConfigManager(null); IApplicationConfig applicationConfig = new ConfigManagerApplicationConfig(configManager); - ICCServiceContext iccServiceContext = Mockito.mock(CCServiceContext.class); + ICCServiceContext iccServiceContext = Mockito.mock(ICCServiceContext.class); Mockito.when(iccServiceContext.getAppConfig()).thenReturn(applicationConfig); Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(iccServiceContext); 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java index 8578d6b..6aba9f4 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java @@ -21,6 +21,8 @@ import java.rmi.RemoteException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; @@ -91,7 +93,7 @@ */ public class MetadataManager implements IMetadataManager { private final MetadataCache cache = new MetadataCache(); - protected final IAsterixStateProxy proxy; + protected final Collection<IAsterixStateProxy> proxies; protected IMetadataNode metadataNode; private final ReadWriteLock metadataLatch; protected boolean rebindMetadataNode = false; @@ -100,19 +102,19 @@ // update field name accordingly public static IMetadataManager INSTANCE; - private MetadataManager(IAsterixStateProxy proxy, IMetadataNode metadataNode) { - this(proxy); + private MetadataManager(Collection<IAsterixStateProxy> proxies, IMetadataNode metadataNode) { + this(proxies); if (metadataNode == null) { throw new IllegalArgumentException("Null metadataNode given to MetadataManager"); } this.metadataNode = metadataNode; } - private MetadataManager(IAsterixStateProxy proxy) { - if (proxy == null) { - throw new IllegalArgumentException("Null proxy given to MetadataManager"); + private MetadataManager(Collection<IAsterixStateProxy> proxies) { + if (proxies == null || proxies.isEmpty()) { + throw new IllegalArgumentException("Null / empty list of proxies given to MetadataManager"); } - this.proxy = proxy; + this.proxies = proxies; this.metadataLatch = new ReentrantReadWriteLock(true); } @@ -1002,15 +1004,15 @@ INSTANCE = new 
CCMetadataManagerImpl(proxy, metadataProperties); } - public static void initialize(IAsterixStateProxy proxy, MetadataNode metadataNode) { - INSTANCE = new MetadataManager(proxy, metadataNode); + public static void initialize(Collection<IAsterixStateProxy> proxies, MetadataNode metadataNode) { + INSTANCE = new MetadataManager(proxies, metadataNode); } private static class CCMetadataManagerImpl extends MetadataManager { private final MetadataProperties metadataProperties; public CCMetadataManagerImpl(IAsterixStateProxy proxy, MetadataProperties metadataProperties) { - super(proxy); + super(Collections.singleton(proxy)); this.metadataProperties = metadataProperties; } @@ -1020,8 +1022,8 @@ return; } try { - metadataNode = - proxy.waitForMetadataNode(metadataProperties.getRegistrationTimeoutSecs(), TimeUnit.SECONDS); + metadataNode = proxies.iterator().next() + .waitForMetadataNode(metadataProperties.getRegistrationTimeoutSecs(), TimeUnit.SECONDS); if (metadataNode != null) { rebindMetadataNode = false; } else { diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java index 8ab9f82..5357fc8 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java @@ -58,6 +58,7 @@ @Override public IMetadataNode waitForMetadataNode(long waitFor, TimeUnit timeUnit) throws InterruptedException { synchronized (this) { + //TODO(mblow): replace with nanoTime() to avoid being affected by system clock adjustments... 
long timeToWait = TimeUnit.MILLISECONDS.convert(waitFor, timeUnit); while (metadataNode == null && timeToWait > 0) { long startTime = System.currentTimeMillis(); diff --git a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java index 406a762..201945c 100644 --- a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java +++ b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java @@ -22,27 +22,28 @@ import javax.xml.XMLConstants; import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; import javax.xml.bind.Unmarshaller; +import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.SAXParser; import javax.xml.parsers.SAXParserFactory; import javax.xml.transform.sax.SAXSource; import org.xml.sax.InputSource; +import org.xml.sax.SAXException; public class TestSuiteParser { - public TestSuiteParser() { - } - public org.apache.asterix.testframework.xml.TestSuite parse(File testSuiteCatalog) throws Exception { + public TestSuite parse(File testSuiteCatalog) throws SAXException, JAXBException, ParserConfigurationException { SAXParserFactory saxParserFactory = SAXParserFactory.newInstance(); saxParserFactory.setNamespaceAware(true); saxParserFactory.setXIncludeAware(true); SAXParser saxParser = saxParserFactory.newSAXParser(); saxParser.setProperty(XMLConstants.ACCESS_EXTERNAL_DTD, "file"); - JAXBContext ctx = JAXBContext.newInstance(org.apache.asterix.testframework.xml.TestSuite.class); + JAXBContext ctx = JAXBContext.newInstance(TestSuite.class); Unmarshaller um = ctx.createUnmarshaller(); - return (org.apache.asterix.testframework.xml.TestSuite) um.unmarshal( + return (TestSuite) um.unmarshal( new SAXSource(saxParser.getXMLReader(), new 
InputSource(testSuiteCatalog.toURI().toString()))); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java index 3d69ddb..0e04dca 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java @@ -18,10 +18,14 @@ */ package org.apache.hyracks.api.client; +import org.apache.hyracks.api.control.CcId; + import java.io.Serializable; public class ClusterControllerInfo implements Serializable { private static final long serialVersionUID = 1L; + + private final CcId ccId; private final String clientNetAddress; @@ -29,12 +33,17 @@ private final int webPort; - public ClusterControllerInfo(String clientNetAddress, int clientNetPort, int webPort) { + public ClusterControllerInfo(CcId ccId, String clientNetAddress, int clientNetPort, int webPort) { + this.ccId = ccId; this.clientNetAddress = clientNetAddress; this.clientNetPort = clientNetPort; this.webPort = webPort; } + public CcId getCcId() { + return ccId; + } + public int getWebPort() { return webPort; } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java index d2a254f..aee22c9 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.api.config; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; public interface IOptionType<T> { @@ -26,6 +27,11 @@ */ T 
parse(String s); + /** + * @throws IllegalArgumentException when the supplied JSON node cannot be interpreted + */ + T parse(JsonNode node); + Class<T> targetType(); /** diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java index c83366f..a520aef 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java @@ -80,7 +80,7 @@ @Override public String toString() { - return "JID:" + id; + return "JID:[" + getCcId() + "]" + getIdOnly(); } public static JobId parse(String str) throws HyracksDataException { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index a6edd70..f8fe77f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -215,7 +215,7 @@ clusterIPC.start(); clientIPC.start(); webServer.start(); - info = new ClusterControllerInfo(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(), + info = new ClusterControllerInfo(ccId, ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(), webServer.getListeningPort()); timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriodMillis()); jobLog.open(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java index 26245e1..a7a373a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java @@ -37,6 +37,7 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.messages.IMessageBroker; import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.application.ServiceContext; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java index 77ecbee..96f5f1b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java @@ -71,6 +71,7 @@ params.setDistributedState(ccs.getContext().getDistributedState()); params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis()); params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod()); + params.setRegistrationId(reg.getRegistrationId()); result = new CCNCFunctions.NodeRegistrationResult(params, null); } catch (Exception e) { LOGGER.log(Level.WARN, "Node registration failed", e); diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java index 9cf84dd..a95ae3d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java @@ -73,6 +73,4 @@ void getNodeControllerInfos() throws Exception; void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception; - - CcId getCcId(); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java index 986ca96..f57bd63 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java @@ -154,10 +154,7 @@ } } else { registeredOptions.add(option); - optionSetters.put(option, - (node, value, - isDefault) -> correctedMap(option.section() == Section.NC ? node : null, isDefault) - .put(option, value)); + optionSetters.put(option, (node, value, isDefault) -> correctedMap(node, isDefault).put(option, value)); if (LOGGER.isDebugEnabled()) { optionSetters.put(option, (node, value, isDefault) -> LOGGER .debug((isDefault ? 
"defaulting" : "setting ") + option.toIniString() + " to " + value)); @@ -352,17 +349,13 @@ private void applyDefaults() { LOGGER.debug("applying defaults"); sectionMap.forEach((key, value) -> { - if (key == Section.NC) { - value.values().forEach(option -> getNodeNames() - .forEach(node -> getOrDefault(getNodeEffectiveMap(node), option, node))); - for (Map.Entry<String, Map<IOption, Object>> nodeMap : nodeSpecificMap.entrySet()) { - value.values() - .forEach(option -> getOrDefault( - new CompositeMap<>(nodeMap.getValue(), definedMap, new NoOpMapMutator()), option, - nodeMap.getKey())); - } - } else { - value.values().forEach(option -> getOrDefault(configurationMap, option, null)); + value.values().forEach( + option -> getNodeNames().forEach(node -> getOrDefault(getNodeEffectiveMap(node), option, node))); + for (Map.Entry<String, Map<IOption, Object>> nodeMap : nodeSpecificMap.entrySet()) { + value.values() + .forEach(option -> getOrDefault( + new CompositeMap<>(nodeMap.getValue(), definedMap, new NoOpMapMutator()), option, + nodeMap.getKey())); } }); } @@ -473,9 +466,8 @@ } } }); - extensionOptions.forEach((extension, options) -> { - options.forEach(option -> ini.add(extension, option.getKey(), option.getValue())); - }); + extensionOptions.forEach((extension, options) -> options + .forEach(option -> ini.add(extension, option.getKey(), option.getValue()))); return ini; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java index 3807a00..d9d6798 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java 
@@ -19,7 +19,12 @@ package org.apache.hyracks.control.common.config; import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.commons.lang3.StringUtils; import org.apache.hyracks.api.config.IOptionType; import org.apache.hyracks.util.StorageUtil; @@ -40,6 +45,11 @@ throw new IllegalArgumentException("The given value: " + result1 + " is not within the int range."); } return (int) result1; + } + + @Override + public Integer parse(JsonNode node) { + return node.isNull() ? null : parse(node.asText()); } @Override @@ -65,6 +75,11 @@ } @Override + public Long parse(JsonNode node) { + return node.isNull() ? null : parse(node.asText()); + } + + @Override public Class<Long> targetType() { return Long.class; } @@ -84,10 +99,19 @@ @Override public Short parse(String s) { int value = Integer.decode(s); + return validateShort(value); + } + + private Short validateShort(int value) { if (Integer.highestOneBit(value) > 16) { - throw new IllegalArgumentException("The given value " + s + " is too big for a short"); + throw new IllegalArgumentException("The given value " + value + " is too big for a short"); } return (short) value; + } + + @Override + public Short parse(JsonNode node) { + return node.isNull() ? null : validateShort(node.asInt()); } @Override @@ -108,6 +132,11 @@ } @Override + public Integer parse(JsonNode node) { + return node.isNull() ? null : node.asInt(); + } + + @Override public Class<Integer> targetType() { return Integer.class; } @@ -122,6 +151,11 @@ @Override public Double parse(String s) { return Double.parseDouble(s); + } + + @Override + public Double parse(JsonNode node) { + return node.isNull() ? null : node.asDouble(); } @Override @@ -142,6 +176,11 @@ } @Override + public String parse(JsonNode node) { + return node.isNull() ? 
null : node.asText(); + } + + @Override public Class<String> targetType() { return String.class; } @@ -159,6 +198,11 @@ } @Override + public Long parse(JsonNode node) { + return node.isNull() ? null : node.asLong(); + } + + @Override public Class<Long> targetType() { return Long.class; } @@ -173,6 +217,11 @@ @Override public Boolean parse(String s) { return Boolean.parseBoolean(s); + } + + @Override + public Boolean parse(JsonNode node) { + return node.isNull() ? null : node.asBoolean(); } @Override @@ -197,6 +246,11 @@ throw new IllegalArgumentException("Unrecognized logging level: " + s); } return level; + } + + @Override + public Level parse(JsonNode node) { + return node.isNull() ? null : parse(node.asText()); } @Override @@ -227,6 +281,20 @@ } @Override + public String[] parse(JsonNode node) { + if (node.isNull()) { + return null; + } + List<String> strings = new ArrayList<>(); + if (node instanceof ArrayNode) { + node.elements().forEachRemaining(n -> strings.add(n.asText())); + return strings.toArray(new String[strings.size()]); + } else { + return parse(node.asText()); + } + } + + @Override public Class<String[]> targetType() { return String[].class; } @@ -253,6 +321,11 @@ } @Override + public java.net.URL parse(JsonNode node) { + return node.isNull() ? 
null : parse(node.asText()); + } + + @Override public Class<java.net.URL> targetType() { return java.net.URL.class; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java index 519bafc..75c0827 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java @@ -50,7 +50,6 @@ NCSERVICE_PORT(INTEGER, 9090), CLUSTER_ADDRESS(STRING, (String) null), CLUSTER_PORT(INTEGER, 1099), - CLUSTER_CONTROLLER_ID(SHORT, (short) 0x0000), CLUSTER_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS), CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT), NODE_ID(STRING, (String) null), @@ -144,8 +143,6 @@ return "Cluster Controller port"; case CLUSTER_LISTEN_PORT: return "IP port to bind cluster listener"; - case CLUSTER_CONTROLLER_ID: - return "16-bit (0-65535) id of the Cluster Controller"; case CLUSTER_PUBLIC_ADDRESS: return "Public IP Address to announce cluster listener"; case CLUSTER_PUBLIC_PORT: @@ -311,10 +308,6 @@ public void setClusterPort(int clusterPort) { configManager.set(nodeId, Option.CLUSTER_PORT, clusterPort); - } - - public CcId getClusterControllerId() { - return CcId.valueOf(appConfig.getShort(Option.CLUSTER_CONTROLLER_ID)); } public String getClusterListenAddress() { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java index bf233a8..e78a423 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java @@ -33,6 +33,8 @@ private int profileDumpPeriod; + private int registrationId; + public ClusterControllerInfo getClusterControllerInfo() { return ccInfo; } @@ -64,4 +66,12 @@ public void setProfileDumpPeriod(int profileDumpPeriod) { this.profileDumpPeriod = profileDumpPeriod; } + + public int getRegistrationId() { + return registrationId; + } + + public void setRegistrationId(int registrationId) { + this.registrationId = registrationId; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java index 75ef0b7..5cec0ae 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.job.resource.NodeCapacity; @@ -74,6 +75,10 @@ private final long maxJobId; + private final int registrationId; + + private static final AtomicInteger nextRegistrationId = new AtomicInteger(); + public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort, NetworkAddress datasetPort, String osName, String arch, String osVersion, 
int nProcessors, String vmName, String vmVersion, String vmVendor, String classpath, String libraryPath, String bootClasspath, @@ -101,6 +106,7 @@ this.capacity = capacity; this.pid = pid; this.maxJobId = maxJobId; + this.registrationId = nextRegistrationId.getAndIncrement(); } public InetSocketAddress getNodeControllerAddress() { @@ -190,4 +196,8 @@ public long getMaxJobId() { return maxJobId; } + + public int getRegistrationId() { + return registrationId; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index 0fdafe3..ae40ea3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -57,11 +57,9 @@ public class ClusterControllerRemoteProxy implements IClusterController { - private final CcId ccId; private IIPCHandle ipcHandle; - public ClusterControllerRemoteProxy(CcId ccId, IIPCHandle ipcHandle) { - this.ccId = ccId; + public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) { this.ipcHandle = ipcHandle; } @@ -178,12 +176,7 @@ } @Override - public CcId getCcId() { - return ccId; - } - - @Override public String toString() { - return getClass().getSimpleName() + " " + ccId + " [" + ipcHandle.getRemoteAddress() + "]"; + return getClass().getSimpleName() + " [" + ipcHandle.getRemoteAddress() + "]"; } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java new file mode 100644 index 0000000..cdd9f74 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java @@ -0,0 +1,62 @@ +package org.apache.hyracks.control.nc; + +import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.control.common.base.IClusterController; +import org.apache.hyracks.control.common.controllers.NodeParameters; +import org.apache.hyracks.control.common.controllers.NodeRegistration; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.Serializable; +import java.util.concurrent.ConcurrentHashMap; + +public class CcConnection { + private static final Logger LOGGER = LogManager.getLogger(); + + private final IClusterController ccs; + private boolean registrationPending; + private Exception registrationException; + private NodeParameters nodeParameters; + + CcConnection(IClusterController ccs) { + this.ccs = ccs; + } + + @Override + public String toString() { + return ccs.toString(); + } + + public CcId getCcId() { + return getNodeParameters().getClusterControllerInfo().getCcId(); + } + + synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) { + nodeParameters = parameters; + registrationException = exception; + registrationPending = false; + notifyAll(); + } + + public synchronized CcId registerNode(NodeRegistration nodeRegistration) throws Exception { + registrationPending = true; + ccs.registerNode(nodeRegistration); + while (registrationPending) { + wait(); + } + if (registrationException != null) { + LOGGER.log(Level.WARN, "Registering with {} failed with exception", this, registrationException); + throw registrationException; + } + return getCcId(); + } + + public IClusterController 
getClusterControllerService() { + return ccs; + } + + public NodeParameters getNodeParameters() { + return nodeParameters; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 0e74a4c..8151004 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; +import java.io.Serializable; import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; @@ -108,7 +109,7 @@ private static final double MEMORY_FUDGE_FACTOR = 0.8; private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1); - private NCConfig ncConfig; + private final NCConfig ncConfig; private final String id; @@ -128,13 +129,13 @@ private final Timer timer; - private boolean registrationPending; + private CcId primaryCcId; - private Exception registrationException; + private final Map<CcId, CcConnection> ccMap = Collections.synchronizedMap(new HashMap<>()); - private IClusterController primaryCcs; + private final Map<InetSocketAddress, CcId> ccAddressMap = Collections.synchronizedMap(new HashMap<>()); - private final Map<CcId, IClusterController> ccsMap = Collections.synchronizedMap(new HashMap<>()); + private final Map<Integer, CcConnection> pendingRegistrations = Collections.synchronizedMap(new HashMap<>()); private final Map<JobId, Joblet> jobletMap; @@ -144,11 +145,9 @@ private ExecutorService executor; - private NodeParameters nodeParameters; + private Map<CcId, Thread> heartbeatThreads = new 
ConcurrentHashMap<>(); - private Map<IClusterController, Thread> heartbeatThreads = new ConcurrentHashMap<>(); - - private Map<IClusterController, Timer> ccTimers = new ConcurrentHashMap<>(); + private Map<CcId, Timer> ccTimers = new ConcurrentHashMap<>(); private final ServerContext serverCtx; @@ -179,8 +178,6 @@ private MessagingNetworkManager messagingNetManager; private final ConfigManager configManager; - - private NodeRegistration nodeRegistration; private final AtomicLong maxJobId = new AtomicLong(-1); @@ -254,7 +251,7 @@ } getNodeControllerInfosAcceptor.setValue(fv); } - primaryCcs.getNodeControllerInfos(); + getPrimaryClusterController().getNodeControllerInfos(); return fv.get(); } @@ -302,9 +299,7 @@ messagingNetManager.start(); } - final InetSocketAddress ccAddress = - new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()); - this.primaryCcs = addCc(ncConfig.getClusterControllerId(), ccAddress); + this.primaryCcId = addCc(new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort())); workQueue.start(); @@ -315,64 +310,65 @@ application.startupCompleted(); } - public ClusterControllerRemoteProxy addCc(CcId ccId, InetSocketAddress ccAddress) throws Exception { - ClusterControllerRemoteProxy ccProxy; - synchronized (ccsMap) { - if (ccsMap.containsKey(ccId)) { - throw new IllegalStateException("cc already registered: " + ccId); - } - final IIPCEventListener ipcEventListener = new IIPCEventListener() { - @Override - public void ipcHandleRestored(IIPCHandle handle) throws IPCException { - // we need to re-register in case of NC -> CC connection reset - try { - registerNode(ccsMap.get(ccId)); - } catch (Exception e) { - LOGGER.log(Level.WARN, "Failed Registering with cc", e); - throw new IPCException(e); - } + public CcId addCc(InetSocketAddress ccAddress) throws Exception { + if (ccAddress.isUnresolved()) { + throw new IllegalArgumentException("must used resolved InetSocketAddress"); + } + if 
(ccAddressMap.containsKey(ccAddress)) { + throw new IllegalStateException("cc already registered: " + ccAddress); + } + final IIPCEventListener ipcEventListener = new IIPCEventListener() { + @Override + public void ipcHandleRestored(IIPCHandle handle) throws IPCException { + // we need to re-register in case of NC -> CC connection reset + try { + registerNode(getCcConnection(ccAddressMap.get(ccAddress)), ccAddress); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Failed Registering with cc", e); + throw new IPCException(e); } - }; - ccProxy = new ClusterControllerRemoteProxy(ccId, - ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 1, ipcEventListener)); - registerNode(ccProxy); - ccsMap.put(ccId, ccProxy); - } - return ccProxy; + } + }; + ClusterControllerRemoteProxy ccProxy = new ClusterControllerRemoteProxy( + ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 1, ipcEventListener)); + CcConnection ccc = new CcConnection(ccProxy); + return registerNode(ccc, ccAddress); } - public void makePrimaryCc(CcId ccId) throws Exception { - synchronized (ccsMap) { - if (!ccsMap.containsKey(ccId)) { - throw new IllegalArgumentException("unknown cc: " + ccId); - } - primaryCcs = ccsMap.get(ccId); + public void makePrimaryCc(InetSocketAddress ccAddress) throws Exception { + if (ccAddress.isUnresolved()) { + throw new IllegalArgumentException("must used resolved InetSocketAddress"); } + CcId newPrimaryCc = ccAddressMap.get(ccAddress); + if (newPrimaryCc == null) { + throw new IllegalArgumentException("unknown cc: " + ccAddress); + } + this.primaryCcId = newPrimaryCc; } - public void removeCc(CcId ccId) throws Exception { - synchronized (ccsMap) { - final IClusterController ccs = ccsMap.get(ccId); - if (ccs == null) { - throw new IllegalArgumentException("unknown cc: " + ccId); - } - if (primaryCcs.equals(ccs)) { - throw new IllegalStateException("cannot remove primary cc: " + ccId); - } - // TODO(mblow): consider how to handle running jobs - 
ccs.unregisterNode(id); - Thread hbThread = heartbeatThreads.remove(ccs); - hbThread.interrupt(); - Timer ccTimer = ccTimers.remove(ccs); - if (ccTimer != null) { - ccTimer.cancel(); - } + public void removeCc(InetSocketAddress ccAddress) throws Exception { + if (ccAddress.isUnresolved()) { + throw new IllegalArgumentException("must used resolved InetSocketAddress"); } + CcId ccId = ccAddressMap.get(ccAddress); + if (primaryCcId.equals(ccId)) { + throw new IllegalStateException("cannot remove primary cc: " + ccAddress); + } + final CcConnection ccc = getCcConnection(ccId); + // TODO(mblow): consider how to handle running jobs + ccc.getClusterControllerService().unregisterNode(id); + Thread hbThread = heartbeatThreads.remove(ccId); + hbThread.interrupt(); + Timer ccTimer = ccTimers.remove(ccId); + if (ccTimer != null) { + ccTimer.cancel(); + } + ccMap.remove(ccId); + ccAddressMap.remove(ccAddress); } - protected void registerNode(IClusterController ccs) throws Exception { - LOGGER.info("Registering with Cluster Controller {}", ccs); - registrationPending = true; + protected CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception { + LOGGER.info("Registering with Cluster Controller {}", ccc); HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()]; for (int i = 0; i < gcInfos.length; ++i) { gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName()); @@ -389,54 +385,67 @@ NetworkAddress netAddress = netManager.getPublicNetworkAddress(); NetworkAddress messagingAddress = messagingNetManager != null ? 
messagingNetManager.getPublicNetworkAddress() : null; - int allCores = osMXBean.getAvailableProcessors(); - nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress, osMXBean.getName(), - osMXBean.getArch(), osMXBean.getVersion(), allCores, runtimeMXBean.getVmName(), - runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), - runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(), - runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress, application.getCapacity(), - PidHelper.getPid(), maxJobId.get()); + NodeRegistration nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress, + osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(), + runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), + runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), + runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress, + application.getCapacity(), PidHelper.getPid(), maxJobId.get()); - ccs.registerNode(nodeRegistration); - - completeNodeRegistration(ccs); + pendingRegistrations.put(nodeRegistration.getRegistrationId(), ccc); + CcId ccId = ccc.registerNode(nodeRegistration); + ccMap.put(ccId, ccc); + ccAddressMap.put(ccAddress, ccId); + getDistributedState().put(ccId, ccc.getNodeParameters().getDistributedState()); + application.onRegisterNode(ccId); + IClusterController ccs = ccc.getClusterControllerService(); + NodeParameters nodeParameters = ccc.getNodeParameters(); // Start heartbeat generator. 
- if (!heartbeatThreads.containsKey(ccs)) { + if (!heartbeatThreads.containsKey(ccId)) { Thread heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat"); heartbeatThread.setPriority(Thread.MAX_PRIORITY); heartbeatThread.setDaemon(true); heartbeatThread.start(); - heartbeatThreads.put(ccs, heartbeatThread); + heartbeatThreads.put(ccId, heartbeatThread); } - if (!ccTimers.containsKey(ccs) && nodeParameters.getProfileDumpPeriod() > 0) { - Timer ccTimer = new Timer("Timer-" + ccs.getCcId(), true); + if (!ccTimers.containsKey(ccId) && nodeParameters.getProfileDumpPeriod() > 0) { + Timer ccTimer = new Timer("Timer-" + ccId, true); // Schedule profile dump generator. - ccTimer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod()); - ccTimers.put(ccs, ccTimer); + ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, nodeParameters.getProfileDumpPeriod()); + ccTimers.put(ccId, ccTimer); } - LOGGER.info("Registering with Cluster Controller {} complete", ccs); + LOGGER.info("Registering with Cluster Controller {} complete", ccc); + return ccId; } - synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) { - this.nodeParameters = parameters; - this.registrationException = exception; - this.registrationPending = false; - notifyAll(); + void setNodeRegistrationResult(NodeParameters parameters, Exception exception) { + CcConnection ccc = getPendingNodeRegistration(parameters); + ccc.setNodeRegistrationResult(parameters, exception); } - private synchronized void completeNodeRegistration(IClusterController ccs) throws Exception { - while (registrationPending) { - wait(); + private CcConnection getCcConnection(CcId ccId) { + CcConnection ccConnection = ccMap.get(ccId); + if (ccConnection == null) { + throw new IllegalArgumentException("unknown ccId: " + ccId); } - if (registrationException != null) { - LOGGER.log(Level.WARN, "Registering with Cluster Controller failed with 
exception", registrationException); - throw registrationException; + return ccConnection; + } + + private CcConnection getPendingNodeRegistration(NodeParameters nodeParameters) { + CcConnection ccConnection = pendingRegistrations.remove(nodeParameters.getRegistrationId()); + if (ccConnection == null) { + throw new IllegalStateException("Unknown pending node registration " + nodeParameters.getRegistrationId() + + " for " + nodeParameters.getClusterControllerInfo().getCcId()); } - serviceCtx.setDistributedState(nodeParameters.getDistributedState()); - application.onRegisterNode(ccs.getCcId()); + return ccConnection; + } + + private ConcurrentHashMap<CcId, Serializable> getDistributedState() { + //noinspection unchecked + return (ConcurrentHashMap<CcId, Serializable>) serviceCtx.getDistributedState(); } private void startApplication() throws Exception { @@ -478,10 +487,10 @@ t.interrupt(); InvokeUtil.doUninterruptibly(() -> t.join(1000)); }); - synchronized (ccsMap) { - ccsMap.values().parallelStream().forEach(ccs -> { + synchronized (ccMap) { + ccMap.values().parallelStream().forEach(cc -> { try { - ccs.notifyShutdown(id); + cc.getClusterControllerService().notifyShutdown(id); } catch (Exception e) { LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e); } @@ -520,13 +529,8 @@ jobParameterByteStoreMap.remove(jobId); } - public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException { - JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId); - if (jpbs == null) { - jpbs = new JobParameterByteStore(); - jobParameterByteStoreMap.put(jobId, jpbs); - } - return jpbs; + public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) { + return jobParameterByteStoreMap.computeIfAbsent(jobId, jid -> new JobParameterByteStore()); } public void storeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId, ActivityClusterGraph acg) @@ -550,7 +554,7 @@ } } - public ActivityClusterGraph 
getActivityClusterGraph(DeployedJobSpecId deployedJobSpecId) throws HyracksException { + public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId deployedJobSpecId) { return deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()); } @@ -566,16 +570,21 @@ return partitionManager; } + public CcId getPrimaryCcId() { + // TODO(mblow): this can change at any time, need notification framework + return primaryCcId; + } + public IClusterController getPrimaryClusterController() { - return primaryCcs; + return getClusterController(primaryCcId); } public IClusterController getClusterController(CcId ccId) { - return ccsMap.get(ccId); + return getCcConnection(ccId).getClusterControllerService(); } - public NodeParameters getNodeParameters() { - return nodeParameters; + public NodeParameters getNodeParameters(CcId ccId) { + return getCcConnection(ccId).getNodeParameters(); } @Override @@ -691,17 +700,19 @@ } private class ProfileDumpTask extends TimerTask { - private IClusterController cc; + private final IClusterController cc; + private final CcId ccId; - public ProfileDumpTask(IClusterController cc) { + public ProfileDumpTask(IClusterController cc, CcId ccId) { this.cc = cc; + this.ccId = ccId; } @Override public void run() { try { FutureValue<List<JobProfile>> fv = new FutureValue<>(); - BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, cc.getCcId(), fv); + BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, ccId, fv); workQueue.scheduleAndSync(bjpw); List<JobProfile> profiles = fv.get(); if (!profiles.isEmpty()) { @@ -734,7 +745,7 @@ } public void sendApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception { - ccsMap.get(ccId).sendApplicationMessageToCC(data, deploymentId, id); + getClusterController(ccId).sendApplicationMessageToCC(data, deploymentId, id); } public IDatasetPartitionManager getDatasetPartitionManager() { @@ -759,4 +770,5 @@ public Object 
getApplicationContext() { return application.getApplicationContext(); } + } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java index 6a75471..8733022 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.Serializable; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.application.IStateDumpHandler; @@ -50,7 +51,7 @@ public NCServiceContext(NodeControllerService ncs, ServerContext serverCtx, IOManager ioManager, String nodeId, MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager, - IApplicationConfig appConfig) throws IOException { + IApplicationConfig appConfig) { super(serverCtx, appConfig, new HyracksThreadFactory(nodeId)); this.lccm = lifeCyclecomponentManager; this.nodeId = nodeId; @@ -59,6 +60,7 @@ this.ncs = ncs; this.sdh = lccm::dumpState; this.tracer = new Tracer(nodeId, ncs.getConfiguration().getTraceCategories(), new TraceCategoryRegistry()); + this.distributedState = new ConcurrentHashMap<>(); } @Override -- To view, visit https://asterix-gerrit.ics.uci.edu/2344 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Iff60887bf71ce3f3ed7201afd9499612bfc83485 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <mb...@apache.org>