Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1828

Change subject: Attempt to reconnect IPCHandle on connection failure
......................................................................

Attempt to reconnect IPCHandle on connection failure

IPCHandles can become invalid due to a network interruption or a node
crash/restart.  Automatically retry the connection whenever an attempt
is made to use a disconnected handle.

Change-Id: I069dcd59898021054462c8213fb623df2deec598
---
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
5 files changed, 84 insertions(+), 43 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/28/1828/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 79033d8..dc7bad0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -54,8 +54,8 @@
         CCNCFunctions.NodeRegistrationResult result;
         Map<IOption, Object> ncConfiguration = new HashMap<>();
         try {
-            INodeController nodeController = new 
NodeControllerRemoteProxy(ncIPCHandle);
-            NodeControllerState state = new 
NodeControllerState(nodeController, reg);
+            INodeController nc = new 
NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress());
+            NodeControllerState state = new NodeControllerState(nc, reg);
             INodeManager nodeManager = ccs.getNodeManager();
             nodeManager.addNode(id, state);
             IApplicationConfig cfg = 
state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 83ef32b..1e4f786 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,7 +18,9 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
+import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
@@ -34,24 +36,47 @@
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
 import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.ipc.impl.IPCSystem;
 
 public class ClusterControllerRemoteProxy implements IClusterController {
-    private final IIPCHandle ipcHandle;
+    private static final Logger LOGGER = 
Logger.getLogger(ClusterControllerRemoteProxy.class.getName());
 
-    public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
-        this.ipcHandle = ipcHandle;
+    private final IPCSystem ipc;
+    private final InetSocketAddress inetSocketAddress;
+    private final int clusterConnectRetries;
+    private IIPCHandle ipcHandle;
+
+    public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress 
inetSocketAddress, int clusterConnectRetries) {
+        this.ipc = ipc;
+        this.inetSocketAddress = inetSocketAddress;
+        this.clusterConnectRetries = clusterConnectRetries;
+    }
+
+    private IIPCHandle ensureIpcHandle() throws IPCException {
+        final boolean first = ipcHandle == null;
+        if (first || !ipcHandle.isConnected()) {
+            if (!first) {
+                LOGGER.warning("ipcHandle " + ipcHandle + " disconnected; 
retrying connection");
+            }
+            ipcHandle = ipc.getHandle(inetSocketAddress, first ? 
clusterConnectRetries : 0);
+            if (!first && ipcHandle.isConnected()) {
+                LOGGER.warning("ipcHandle " + ipcHandle + " restored");
+            }
+        }
+        return ipcHandle;
     }
 
     @Override
     public void registerNode(NodeRegistration reg) throws Exception {
         CCNCFunctions.RegisterNodeFunction fn = new 
CCNCFunctions.RegisterNodeFunction(reg);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void unregisterNode(String nodeId) throws Exception {
         CCNCFunctions.UnregisterNodeFunction fn = new 
CCNCFunctions.UnregisterNodeFunction(nodeId);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
@@ -59,7 +84,7 @@
             throws Exception {
         CCNCFunctions.NotifyTaskCompleteFunction fn = new 
CCNCFunctions.NotifyTaskCompleteFunction(jobId, taskId,
                 nodeId, statistics);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
@@ -67,53 +92,53 @@
             throws Exception {
         CCNCFunctions.NotifyTaskFailureFunction fn = new 
CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
                 exceptions);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws 
Exception {
         CCNCFunctions.NotifyJobletCleanupFunction fn = new 
CCNCFunctions.NotifyJobletCleanupFunction(jobId, nodeId);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, 
DeploymentStatus status) throws Exception {
         CCNCFunctions.NotifyDeployBinaryFunction fn = new 
CCNCFunctions.NotifyDeployBinaryFunction(deploymentId,
                 nodeId, status);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws 
Exception {
         CCNCFunctions.NodeHeartbeatFunction fn = new 
CCNCFunctions.NodeHeartbeatFunction(id, hbData);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void reportProfile(String id, List<JobProfile> profiles) throws 
Exception {
         CCNCFunctions.ReportProfileFunction fn = new 
CCNCFunctions.ReportProfileFunction(id, profiles);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionProvider(PartitionDescriptor 
partitionDescriptor) throws Exception {
         CCNCFunctions.RegisterPartitionProviderFunction fn = new 
CCNCFunctions.RegisterPartitionProviderFunction(
                 partitionDescriptor);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionRequest(PartitionRequest partitionRequest) 
throws Exception {
         CCNCFunctions.RegisterPartitionRequestFunction fn = new 
CCNCFunctions.RegisterPartitionRequestFunction(
                 partitionRequest);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void sendApplicationMessageToCC(byte[] data, DeploymentId 
deploymentId, String nodeId) throws Exception {
         CCNCFunctions.SendApplicationMessageFunction fn = new 
CCNCFunctions.SendApplicationMessageFunction(data,
                 deploymentId, nodeId);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
@@ -123,51 +148,51 @@
         CCNCFunctions.RegisterResultPartitionLocationFunction fn =
                 new 
CCNCFunctions.RegisterResultPartitionLocationFunction(jobId, rsId, 
orderedResult, emptyResult,
                         partition, nPartitions, networkAddress);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId 
rsId, int partition) throws Exception {
         CCNCFunctions.ReportResultPartitionWriteCompletionFunction fn =
                 new 
CCNCFunctions.ReportResultPartitionWriteCompletionFunction(jobId, rsId, 
partition);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, 
int partition) throws Exception {
         CCNCFunctions.ReportResultPartitionFailureFunction fn =
                 new CCNCFunctions.ReportResultPartitionFailureFunction(jobId, 
rsId, partition);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws 
Exception {
         CCNCFunctions.ReportDistributedJobFailureFunction fn =
                 new CCNCFunctions.ReportDistributedJobFailureFunction(jobId, 
nodeId);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void getNodeControllerInfos() throws Exception {
-        ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), 
null);
+        ensureIpcHandle().send(-1, new 
CCNCFunctions.GetNodeControllersInfoFunction(), null);
     }
 
     @Override
     public void notifyStateDump(String nodeId, String stateDumpId, String 
state) throws Exception {
         CCNCFunctions.StateDumpResponseFunction fn =
                 new CCNCFunctions.StateDumpResponseFunction(nodeId, 
stateDumpId, state);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
     @Override
     public void notifyShutdown(String nodeId) throws Exception{
         CCNCFunctions.ShutdownResponseFunction sdrf = new 
CCNCFunctions.ShutdownResponseFunction(nodeId);
-        ipcHandle.send(-1, sdrf, null);
+        ensureIpcHandle().send(-1, sdrf, null);
     }
 
     @Override
     public void notifyThreadDump(String nodeId, String requestId, String 
threadDumpJSON) throws Exception {
         CCNCFunctions.ThreadDumpResponseFunction tdrf =
                 new CCNCFunctions.ThreadDumpResponseFunction(nodeId, 
requestId, threadDumpJSON);
-        ipcHandle.send(-1, tdrf, null);
+        ensureIpcHandle().send(-1, tdrf, null);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 2a8464e..fa2c2d4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -18,10 +18,12 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
+import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -35,12 +37,28 @@
 import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.ipc.impl.IPCSystem;
 
 public class NodeControllerRemoteProxy implements INodeController {
-    private final IIPCHandle ipcHandle;
+    private static final Logger LOGGER = 
Logger.getLogger(NodeControllerRemoteProxy.class.getName());
+    private final IPCSystem ipc;
+    private final InetSocketAddress inetSocketAddress;
+    private IIPCHandle ipcHandle;
 
-    public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
-        this.ipcHandle = ipcHandle;
+    public NodeControllerRemoteProxy(IPCSystem ipc, InetSocketAddress 
inetSocketAddress) {
+        this.ipc = ipc;
+        this.inetSocketAddress = inetSocketAddress;
+    }
+
+    private IIPCHandle ensureIpcHandle() throws IPCException {
+        if (ipcHandle == null || !ipcHandle.isConnected()) {
+            if (ipcHandle != null) {
+                LOGGER.warning("ipcHandle disconnected; retrying connection");
+            }
+            ipcHandle = ipc.getHandle(inetSocketAddress, 0);
+        }
+        return ipcHandle;
     }
 
     @Override
@@ -49,74 +67,74 @@
             Set<JobFlag> flags) throws Exception {
         CCNCFunctions.StartTasksFunction stf = new 
CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes,
                 taskDescriptors, connectorPolicies, flags);
-        ipcHandle.send(-1, stf, null);
+        ensureIpcHandle().send(-1, stf, null);
     }
 
     @Override
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws 
Exception {
         CCNCFunctions.AbortTasksFunction atf = new 
CCNCFunctions.AbortTasksFunction(jobId, tasks);
-        ipcHandle.send(-1, atf, null);
+        ensureIpcHandle().send(-1, atf, null);
     }
 
     @Override
     public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
         CCNCFunctions.CleanupJobletFunction cjf = new 
CCNCFunctions.CleanupJobletFunction(jobId, status);
-        ipcHandle.send(-1, cjf, null);
+        ensureIpcHandle().send(-1, cjf, null);
     }
 
     @Override
     public void reportPartitionAvailability(PartitionId pid, NetworkAddress 
networkAddress) throws Exception {
         CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = new 
CCNCFunctions.ReportPartitionAvailabilityFunction(
                 pid, networkAddress);
-        ipcHandle.send(-1, rpaf, null);
+        ensureIpcHandle().send(-1, rpaf, null);
     }
 
     @Override
     public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) 
throws Exception {
         CCNCFunctions.DeployBinaryFunction rpaf = new 
CCNCFunctions.DeployBinaryFunction(deploymentId, binaryURLs);
-        ipcHandle.send(-1, rpaf, null);
+        ensureIpcHandle().send(-1, rpaf, null);
     }
 
     @Override
     public void undeployBinary(DeploymentId deploymentId) throws Exception {
         CCNCFunctions.UnDeployBinaryFunction rpaf = new 
CCNCFunctions.UnDeployBinaryFunction(deploymentId);
-        ipcHandle.send(-1, rpaf, null);
+        ensureIpcHandle().send(-1, rpaf, null);
     }
 
     @Override
     public void distributeJob(JobId jobId, byte[] planBytes) throws Exception {
         CCNCFunctions.DistributeJobFunction fn = new 
CCNCFunctions.DistributeJobFunction(jobId, planBytes);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void destroyJob(JobId jobId) throws Exception {
         CCNCFunctions.DestroyJobFunction fn = new 
CCNCFunctions.DestroyJobFunction(jobId);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void dumpState(String stateDumpId) throws Exception {
         CCNCFunctions.StateDumpRequestFunction dsf = new 
CCNCFunctions.StateDumpRequestFunction(stateDumpId);
-        ipcHandle.send(-1, dsf, null);
+        ensureIpcHandle().send(-1, dsf, null);
     }
 
     @Override
     public void shutdown(boolean terminateNCService) throws Exception {
         CCNCFunctions.ShutdownRequestFunction sdrf = new 
CCNCFunctions.ShutdownRequestFunction(terminateNCService);
-        ipcHandle.send(-1, sdrf, null);
+        ensureIpcHandle().send(-1, sdrf, null);
     }
 
     @Override
     public void sendApplicationMessageToNC(byte[] data, DeploymentId 
deploymentId, String nodeId) throws Exception {
         CCNCFunctions.SendApplicationMessageFunction fn = new 
CCNCFunctions.SendApplicationMessageFunction(data,
                 deploymentId, nodeId);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void takeThreadDump(String requestId) throws Exception {
         CCNCFunctions.ThreadDumpRequestFunction fn = new 
CCNCFunctions.ThreadDumpRequestFunction(requestId);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 2fe0e27..0587a55 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -80,7 +80,6 @@
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
 import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
-import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import 
org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
@@ -279,10 +278,9 @@
         if (messagingNetManager != null) {
             messagingNetManager.start();
         }
-        IIPCHandle ccIPCHandle = ipc.getHandle(
+        this.ccs = new ClusterControllerRemoteProxy(ipc,
                 new InetSocketAddress(ncConfig.getClusterAddress(), 
ncConfig.getClusterPort()),
                 ncConfig.getClusterConnectRetries());
-        this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new 
HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
             gcInfos[i] = new 
HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index fe2bcae..9efd70e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -99,7 +99,7 @@
         while (true) {
             synchronized (this) {
                 handle = ipcHandleMap.get(remoteAddress);
-                if (handle == null) {
+                if (handle == null || !handle.isConnected()) {
                     handle = new IPCHandle(system, remoteAddress);
                     pendingConnections.add(handle);
                     networkThread.selector.wakeup();

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1828
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I069dcd59898021054462c8213fb623df2deec598
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>

Reply via email to