Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1828
Change subject: Attempt to reconnect IPCHandle on connection failure
......................................................................
Attempt to reconnect IPCHandle on connection failure
IPCHandles can become invalid due to a network interruption or a node
crash/restart. Automatically retry the connection when an attempt is
made to use a disconnected handle.
Change-Id: I069dcd59898021054462c8213fb623df2deec598
---
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
5 files changed, 84 insertions(+), 43 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/28/1828/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 79033d8..dc7bad0 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -54,8 +54,8 @@
CCNCFunctions.NodeRegistrationResult result;
Map<IOption, Object> ncConfiguration = new HashMap<>();
try {
- INodeController nodeController = new
NodeControllerRemoteProxy(ncIPCHandle);
- NodeControllerState state = new
NodeControllerState(nodeController, reg);
+ INodeController nc = new
NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress());
+ NodeControllerState state = new NodeControllerState(nc, reg);
INodeManager nodeManager = ccs.getNodeManager();
nodeManager.addNode(id, state);
IApplicationConfig cfg =
state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 83ef32b..1e4f786 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,7 +18,9 @@
*/
package org.apache.hyracks.control.common.ipc;
+import java.net.InetSocketAddress;
import java.util.List;
+import java.util.logging.Logger;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
@@ -34,24 +36,47 @@
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.ipc.impl.IPCSystem;
public class ClusterControllerRemoteProxy implements IClusterController {
- private final IIPCHandle ipcHandle;
+ private static final Logger LOGGER =
Logger.getLogger(ClusterControllerRemoteProxy.class.getName());
- public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
- this.ipcHandle = ipcHandle;
+ private final IPCSystem ipc;
+ private final InetSocketAddress inetSocketAddress;
+ private final int clusterConnectRetries;
+ private IIPCHandle ipcHandle;
+
+ public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress
inetSocketAddress, int clusterConnectRetries) {
+ this.ipc = ipc;
+ this.inetSocketAddress = inetSocketAddress;
+ this.clusterConnectRetries = clusterConnectRetries;
+ }
+
+ private IIPCHandle ensureIpcHandle() throws IPCException {
+ final boolean first = ipcHandle == null;
+ if (first || !ipcHandle.isConnected()) {
+ if (!first) {
+ LOGGER.warning("ipcHandle " + ipcHandle + " disconnected;
retrying connection");
+ }
+ ipcHandle = ipc.getHandle(inetSocketAddress, first ?
clusterConnectRetries : 0);
+ if (!first && ipcHandle.isConnected()) {
+ LOGGER.warning("ipcHandle " + ipcHandle + " restored");
+ }
+ }
+ return ipcHandle;
}
@Override
public void registerNode(NodeRegistration reg) throws Exception {
CCNCFunctions.RegisterNodeFunction fn = new
CCNCFunctions.RegisterNodeFunction(reg);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void unregisterNode(String nodeId) throws Exception {
CCNCFunctions.UnregisterNodeFunction fn = new
CCNCFunctions.UnregisterNodeFunction(nodeId);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
@@ -59,7 +84,7 @@
throws Exception {
CCNCFunctions.NotifyTaskCompleteFunction fn = new
CCNCFunctions.NotifyTaskCompleteFunction(jobId, taskId,
nodeId, statistics);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
@@ -67,53 +92,53 @@
throws Exception {
CCNCFunctions.NotifyTaskFailureFunction fn = new
CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
exceptions);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void notifyJobletCleanup(JobId jobId, String nodeId) throws
Exception {
CCNCFunctions.NotifyJobletCleanupFunction fn = new
CCNCFunctions.NotifyJobletCleanupFunction(jobId, nodeId);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void notifyDeployBinary(DeploymentId deploymentId, String nodeId,
DeploymentStatus status) throws Exception {
CCNCFunctions.NotifyDeployBinaryFunction fn = new
CCNCFunctions.NotifyDeployBinaryFunction(deploymentId,
nodeId, status);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void nodeHeartbeat(String id, HeartbeatData hbData) throws
Exception {
CCNCFunctions.NodeHeartbeatFunction fn = new
CCNCFunctions.NodeHeartbeatFunction(id, hbData);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void reportProfile(String id, List<JobProfile> profiles) throws
Exception {
CCNCFunctions.ReportProfileFunction fn = new
CCNCFunctions.ReportProfileFunction(id, profiles);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void registerPartitionProvider(PartitionDescriptor
partitionDescriptor) throws Exception {
CCNCFunctions.RegisterPartitionProviderFunction fn = new
CCNCFunctions.RegisterPartitionProviderFunction(
partitionDescriptor);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void registerPartitionRequest(PartitionRequest partitionRequest)
throws Exception {
CCNCFunctions.RegisterPartitionRequestFunction fn = new
CCNCFunctions.RegisterPartitionRequestFunction(
partitionRequest);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void sendApplicationMessageToCC(byte[] data, DeploymentId
deploymentId, String nodeId) throws Exception {
CCNCFunctions.SendApplicationMessageFunction fn = new
CCNCFunctions.SendApplicationMessageFunction(data,
deploymentId, nodeId);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
@@ -123,51 +148,51 @@
CCNCFunctions.RegisterResultPartitionLocationFunction fn =
new
CCNCFunctions.RegisterResultPartitionLocationFunction(jobId, rsId,
orderedResult, emptyResult,
partition, nPartitions, networkAddress);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId
rsId, int partition) throws Exception {
CCNCFunctions.ReportResultPartitionWriteCompletionFunction fn =
new
CCNCFunctions.ReportResultPartitionWriteCompletionFunction(jobId, rsId,
partition);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId,
int partition) throws Exception {
CCNCFunctions.ReportResultPartitionFailureFunction fn =
new CCNCFunctions.ReportResultPartitionFailureFunction(jobId,
rsId, partition);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws
Exception {
CCNCFunctions.ReportDistributedJobFailureFunction fn =
new CCNCFunctions.ReportDistributedJobFailureFunction(jobId,
nodeId);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void getNodeControllerInfos() throws Exception {
- ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(),
null);
+ ensureIpcHandle().send(-1, new
CCNCFunctions.GetNodeControllersInfoFunction(), null);
}
@Override
public void notifyStateDump(String nodeId, String stateDumpId, String
state) throws Exception {
CCNCFunctions.StateDumpResponseFunction fn =
new CCNCFunctions.StateDumpResponseFunction(nodeId,
stateDumpId, state);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void notifyShutdown(String nodeId) throws Exception{
CCNCFunctions.ShutdownResponseFunction sdrf = new
CCNCFunctions.ShutdownResponseFunction(nodeId);
- ipcHandle.send(-1, sdrf, null);
+ ensureIpcHandle().send(-1, sdrf, null);
}
@Override
public void notifyThreadDump(String nodeId, String requestId, String
threadDumpJSON) throws Exception {
CCNCFunctions.ThreadDumpResponseFunction tdrf =
new CCNCFunctions.ThreadDumpResponseFunction(nodeId,
requestId, threadDumpJSON);
- ipcHandle.send(-1, tdrf, null);
+ ensureIpcHandle().send(-1, tdrf, null);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 2a8464e..fa2c2d4 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -18,10 +18,12 @@
*/
package org.apache.hyracks.control.common.ipc;
+import java.net.InetSocketAddress;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.logging.Logger;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -35,12 +37,28 @@
import org.apache.hyracks.control.common.base.INodeController;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.ipc.impl.IPCSystem;
public class NodeControllerRemoteProxy implements INodeController {
- private final IIPCHandle ipcHandle;
+ private static final Logger LOGGER =
Logger.getLogger(NodeControllerRemoteProxy.class.getName());
+ private final IPCSystem ipc;
+ private final InetSocketAddress inetSocketAddress;
+ private IIPCHandle ipcHandle;
- public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
- this.ipcHandle = ipcHandle;
+ public NodeControllerRemoteProxy(IPCSystem ipc, InetSocketAddress
inetSocketAddress) {
+ this.ipc = ipc;
+ this.inetSocketAddress = inetSocketAddress;
+ }
+
+ private IIPCHandle ensureIpcHandle() throws IPCException {
+ if (ipcHandle == null || !ipcHandle.isConnected()) {
+ if (ipcHandle != null) {
+ LOGGER.warning("ipcHandle disconnected; retrying connection");
+ }
+ ipcHandle = ipc.getHandle(inetSocketAddress, 0);
+ }
+ return ipcHandle;
}
@Override
@@ -49,74 +67,74 @@
Set<JobFlag> flags) throws Exception {
CCNCFunctions.StartTasksFunction stf = new
CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes,
taskDescriptors, connectorPolicies, flags);
- ipcHandle.send(-1, stf, null);
+ ensureIpcHandle().send(-1, stf, null);
}
@Override
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws
Exception {
CCNCFunctions.AbortTasksFunction atf = new
CCNCFunctions.AbortTasksFunction(jobId, tasks);
- ipcHandle.send(-1, atf, null);
+ ensureIpcHandle().send(-1, atf, null);
}
@Override
public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
CCNCFunctions.CleanupJobletFunction cjf = new
CCNCFunctions.CleanupJobletFunction(jobId, status);
- ipcHandle.send(-1, cjf, null);
+ ensureIpcHandle().send(-1, cjf, null);
}
@Override
public void reportPartitionAvailability(PartitionId pid, NetworkAddress
networkAddress) throws Exception {
CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = new
CCNCFunctions.ReportPartitionAvailabilityFunction(
pid, networkAddress);
- ipcHandle.send(-1, rpaf, null);
+ ensureIpcHandle().send(-1, rpaf, null);
}
@Override
public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs)
throws Exception {
CCNCFunctions.DeployBinaryFunction rpaf = new
CCNCFunctions.DeployBinaryFunction(deploymentId, binaryURLs);
- ipcHandle.send(-1, rpaf, null);
+ ensureIpcHandle().send(-1, rpaf, null);
}
@Override
public void undeployBinary(DeploymentId deploymentId) throws Exception {
CCNCFunctions.UnDeployBinaryFunction rpaf = new
CCNCFunctions.UnDeployBinaryFunction(deploymentId);
- ipcHandle.send(-1, rpaf, null);
+ ensureIpcHandle().send(-1, rpaf, null);
}
@Override
public void distributeJob(JobId jobId, byte[] planBytes) throws Exception {
CCNCFunctions.DistributeJobFunction fn = new
CCNCFunctions.DistributeJobFunction(jobId, planBytes);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void destroyJob(JobId jobId) throws Exception {
CCNCFunctions.DestroyJobFunction fn = new
CCNCFunctions.DestroyJobFunction(jobId);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void dumpState(String stateDumpId) throws Exception {
CCNCFunctions.StateDumpRequestFunction dsf = new
CCNCFunctions.StateDumpRequestFunction(stateDumpId);
- ipcHandle.send(-1, dsf, null);
+ ensureIpcHandle().send(-1, dsf, null);
}
@Override
public void shutdown(boolean terminateNCService) throws Exception {
CCNCFunctions.ShutdownRequestFunction sdrf = new
CCNCFunctions.ShutdownRequestFunction(terminateNCService);
- ipcHandle.send(-1, sdrf, null);
+ ensureIpcHandle().send(-1, sdrf, null);
}
@Override
public void sendApplicationMessageToNC(byte[] data, DeploymentId
deploymentId, String nodeId) throws Exception {
CCNCFunctions.SendApplicationMessageFunction fn = new
CCNCFunctions.SendApplicationMessageFunction(data,
deploymentId, nodeId);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void takeThreadDump(String requestId) throws Exception {
CCNCFunctions.ThreadDumpRequestFunction fn = new
CCNCFunctions.ThreadDumpRequestFunction(requestId);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 2fe0e27..0587a55 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -80,7 +80,6 @@
import org.apache.hyracks.control.nc.partitions.PartitionManager;
import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
-import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
import org.apache.hyracks.ipc.impl.IPCSystem;
import
org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
@@ -279,10 +278,9 @@
if (messagingNetManager != null) {
messagingNetManager.start();
}
- IIPCHandle ccIPCHandle = ipc.getHandle(
+ this.ccs = new ClusterControllerRemoteProxy(ipc,
new InetSocketAddress(ncConfig.getClusterAddress(),
ncConfig.getClusterPort()),
ncConfig.getClusterConnectRetries());
- this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new
HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
for (int i = 0; i < gcInfos.length; ++i) {
gcInfos[i] = new
HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
diff --git
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index fe2bcae..9efd70e 100644
---
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -99,7 +99,7 @@
while (true) {
synchronized (this) {
handle = ipcHandleMap.get(remoteAddress);
- if (handle == null) {
+ if (handle == null || !handle.isConnected()) {
handle = new IPCHandle(system, remoteAddress);
pendingConnections.add(handle);
networkThread.selector.wakeup();
--
To view, visit https://asterix-gerrit.ics.uci.edu/1828
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I069dcd59898021054462c8213fb623df2deec598
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>