Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1830
Change subject: Re-register NC with CC on reestablished IPCHandle
......................................................................
Re-register NC with CC on reestablished IPCHandle
If the NC -> CC IPCHandle fails due to a CC crash/restart, the NC
should re-register with the CC in order to rejoin the cluster, since the
CC ignores heartbeats from unregistered nodes. This assumes that
sending StartupTaskRequestMessage is idempotent. Also improve toString
on IPCHandle.
Change-Id: I6f93ca9ab37e56e02bafcdecd1e2d0cf664faef6
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
A
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/INodeControllerService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M
hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
M
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
9 files changed, 104 insertions(+), 18 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/30/1830/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 107b859..7c8e153 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -67,7 +67,8 @@
protected INCServiceContext ncServiceCtx;
private INcApplicationContext runtimeContext;
private String nodeId;
- private boolean stopInitiated = false;
+ private boolean stopInitiated;
+ private boolean startupCompleted;
private SystemState systemState;
protected WebManager webManager;
@@ -190,6 +191,15 @@
}
// Request startup tasks from CC
StartupTaskRequestMessage.send((NodeControllerService)
ncServiceCtx.getControllerService(), systemState);
+ startupCompleted = true;
+ }
+
+ @Override
+ public void onRegisterNode() throws Exception {
+ if (startupCompleted) {
+ // Request startup tasks from CC
+ StartupTaskRequestMessage.send((NodeControllerService)
ncServiceCtx.getControllerService(), systemState);
+ }
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
index 02416c4..64f4e29 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
@@ -33,4 +33,5 @@
*/
IFileDeviceResolver getFileDeviceResolver();
+ void onRegisterNode() throws Exception;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/INodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/INodeControllerService.java
new file mode 100644
index 0000000..b0fa9c7
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/INodeControllerService.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.service;
+
+public interface INodeControllerService extends IControllerService {
+ void registerNode() throws Exception;
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 98d258f..58ebc98 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.service.INodeControllerService;
import org.apache.hyracks.control.common.base.IClusterController;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
import org.apache.hyracks.control.common.deployment.DeploymentStatus;
@@ -37,19 +38,34 @@
import org.apache.hyracks.control.common.job.PartitionRequest;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
+import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.hyracks.ipc.impl.IPCSystem;
public class ClusterControllerRemoteProxy extends ControllerRemoteProxy
implements IClusterController {
private static final Logger LOGGER =
Logger.getLogger(ClusterControllerRemoteProxy.class.getName());
+ private final INodeControllerService ncs;
private final int clusterConnectRetries;
- public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress
inetSocketAddress, int clusterConnectRetries) {
+ public ClusterControllerRemoteProxy(INodeControllerService ncs, IPCSystem
ipc, InetSocketAddress inetSocketAddress,
+ int clusterConnectRetries) {
super(ipc, inetSocketAddress);
+ this.ncs = ncs;
this.clusterConnectRetries = clusterConnectRetries;
}
@Override
+ protected void onIpcRestored() throws IPCException {
+ super.onIpcRestored();
+ // we need to re-register in case the NC -> CC connection was lost due to a CC crash
+ try {
+ ncs.registerNode();
+ } catch (Exception e) {
+ throw new IPCException(e);
+ }
+ }
+
+ @Override
protected int getRetries(boolean first) {
return first ? clusterConnectRetries : 0;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
index 44b0e4a..6d0e1be 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
@@ -39,16 +39,24 @@
final boolean first = ipcHandle == null;
if (first || !ipcHandle.isConnected()) {
if (!first) {
- getLogger().warning("ipcHandle " + ipcHandle + " disconnected;
retrying connection");
+ onIpcDisconnected();
}
ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
if (!first && ipcHandle.isConnected()) {
- getLogger().warning("ipcHandle " + ipcHandle + " restored");
+ onIpcRestored();
}
}
return ipcHandle;
}
+ protected void onIpcDisconnected() throws IPCException {
+ getLogger().warning("ipcHandle " + ipcHandle + " disconnected;
retrying connection");
+ }
+
+ protected void onIpcRestored() throws IPCException {
+ getLogger().warning("ipcHandle " + ipcHandle + " restored");
+ }
+
protected abstract int getRetries(boolean first);
protected abstract Logger getLogger();
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 20f6378..5afc98d 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -53,6 +53,11 @@
}
@Override
+ public void onRegisterNode() throws Exception {
+ // no-op
+ }
+
+ @Override
public void stop() throws Exception {
// no-op
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0587a55..ee97ccb 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -54,7 +54,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
-import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.api.service.INodeControllerService;
import org.apache.hyracks.control.common.base.IClusterController;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.context.ServerContext;
@@ -86,7 +86,7 @@
import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
import org.kohsuke.args4j.CmdLineException;
-public class NodeControllerService implements IControllerService {
+public class NodeControllerService implements INodeControllerService {
private static final Logger LOGGER =
Logger.getLogger(NodeControllerService.class.getName());
private static final double MEMORY_FUDGE_FACTOR = 0.8;
@@ -156,6 +156,8 @@
private MessagingNetworkManager messagingNetManager;
private final ConfigManager configManager;
+
+ private NodeRegistration nodeRegistration;
public NodeControllerService(NCConfig config) throws Exception {
this(config, getApplication(config));
@@ -278,7 +280,7 @@
if (messagingNetManager != null) {
messagingNetManager.start();
}
- this.ccs = new ClusterControllerRemoteProxy(ipc,
+ this.ccs = new ClusterControllerRemoteProxy(this, ipc,
new InetSocketAddress(ncConfig.getClusterAddress(),
ncConfig.getClusterPort()),
ncConfig.getClusterConnectRetries());
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new
HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
@@ -292,21 +294,13 @@
NetworkAddress meesagingPort = messagingNetManager != null ?
messagingNetManager.getPublicNetworkAddress()
: null;
int allCores = osMXBean.getAvailableProcessors();
- ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id,
ncConfig, netAddress, datasetAddress,
+ nodeRegistration = new NodeRegistration(ipc.getSocketAddress(), id,
ncConfig, netAddress, datasetAddress,
osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(),
allCores,
runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(),
runtimeMXBean.getVmVendor(),
runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(),
runtimeMXBean.getBootClassPath(),
runtimeMXBean.getInputArguments(),
runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort,
- application.getCapacity(), PidHelper.getPid()));
-
- synchronized (this) {
- while (registrationPending) {
- wait();
- }
- }
- if (registrationException != null) {
- throw registrationException;
- }
+ application.getCapacity(), PidHelper.getPid());
+ registerNode();
serviceCtx.setDistributedState(nodeParameters.getDistributedState());
workQueue.start();
@@ -330,6 +324,25 @@
application.startupCompleted();
}
+ @Override
+ public void registerNode() throws Exception {
+ LOGGER.info("Registering with Cluster Controller");
+ ccs.registerNode(nodeRegistration);
+
+ synchronized (this) {
+ while (registrationPending) {
+ wait();
+ }
+ }
+ if (registrationException != null) {
+ LOGGER.log(Level.WARNING, "Registering with Cluster Controller
failed with exception",
+ registrationException);
+ throw registrationException;
+ }
+ application.onRegisterNode();
+ LOGGER.info("Registering with Cluster Controller complete");
+ }
+
private void startApplication() throws Exception {
serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id,
memoryManager, lccm, ncConfig.getAppConfig());
application.start(serviceCtx, ncConfig.getAppArgsArray());
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
index b7aa342..2967039 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
@@ -40,6 +40,11 @@
}
@Override
+ public void onRegisterNode() throws Exception {
+ // No-op
+ }
+
+ @Override
public void stop() throws Exception {
// No-op
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index 1fcdb3c..efd9830 100644
---
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -192,4 +192,9 @@
boolean full() {
return full;
}
+
+ @Override
+ public String toString() {
+ return "IPCHandle [addr=" + remoteAddress + " state=" + state + "]";
+ }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1830
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I6f93ca9ab37e56e02bafcdecd1e2d0cf664faef6
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>