Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1830

Change subject: Re-register NC with CC on reestablished IPCHandle
......................................................................

Re-register NC with CC on reestablished IPCHandle

If the NC -> CC IPCHandle fails due to a CC crash/restart, the NC
should re-register with the CC in order to rejoin the cluster.  The CC
will ignore heartbeats from unregistered nodes.  This assumes that
sending StartupTaskRequestMessage is idempotent.  Also improve toString
on IPCHandle.

Change-Id: I6f93ca9ab37e56e02bafcdecd1e2d0cf664faef6
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
A 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/INodeControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
M 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
9 files changed, 104 insertions(+), 18 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/30/1830/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 107b859..7c8e153 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -67,7 +67,9 @@
     protected INCServiceContext ncServiceCtx;
     private INcApplicationContext runtimeContext;
     private String nodeId;
-    private boolean stopInitiated = false;
+    private boolean stopInitiated;
+    // volatile: written in startupCompleted() but read in onRegisterNode(), which may run on another thread
+    private volatile boolean startupCompleted;
     private SystemState systemState;
     protected WebManager webManager;
 
@@ -190,6 +192,15 @@
         }
         // Request startup tasks from CC
         StartupTaskRequestMessage.send((NodeControllerService) 
ncServiceCtx.getControllerService(), systemState);
+        startupCompleted = true;
+    }
+
+    @Override
+    public void onRegisterNode() throws Exception {
+        if (startupCompleted) {
+            // Re-registered after the CC connection was restored: request startup tasks again (assumed idempotent)
+            StartupTaskRequestMessage.send((NodeControllerService) 
ncServiceCtx.getControllerService(), systemState);
+        }
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
index 02416c4..64f4e29 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
@@ -33,4 +33,12 @@
      */
     IFileDeviceResolver getFileDeviceResolver();
 
+    /**
+     * Called by the node controller each time this node successfully registers with the cluster controller: once
+     * during startup (before {@code startupCompleted()} is invoked), and again whenever the node re-registers after
+     * its connection to the cluster controller has been re-established.
+     *
+     * @throws Exception if the application fails to handle the registration
+     */
+    void onRegisterNode() throws Exception;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/INodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/INodeControllerService.java
new file mode 100644
index 0000000..b0fa9c7
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/INodeControllerService.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.service;
+
+/**
+ * The {@link IControllerService} of a node controller.
+ */
+public interface INodeControllerService extends IControllerService {
+    /**
+     * Registers this node with the cluster controller. Besides the initial registration at startup, this may be
+     * invoked again to re-register once the connection to the cluster controller has been re-established (e.g. after
+     * a cluster controller restart), since the cluster controller ignores heartbeats from unregistered nodes.
+     *
+     * @throws Exception if registration fails
+     */
+    void registerNode() throws Exception;
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 98d258f..58ebc98 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.service.INodeControllerService;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.deployment.DeploymentStatus;
@@ -37,19 +38,42 @@
 import org.apache.hyracks.control.common.job.PartitionRequest;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
+import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 
 public class ClusterControllerRemoteProxy extends ControllerRemoteProxy 
implements IClusterController {
     private static final Logger LOGGER = 
Logger.getLogger(ClusterControllerRemoteProxy.class.getName());
 
+    private final INodeControllerService ncs;
     private final int clusterConnectRetries;
 
-    public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress 
inetSocketAddress, int clusterConnectRetries) {
+    public ClusterControllerRemoteProxy(INodeControllerService ncs, IPCSystem 
ipc, InetSocketAddress inetSocketAddress,
+            int clusterConnectRetries) {
         super(ipc, inetSocketAddress);
+        this.ncs = ncs;
         this.clusterConnectRetries = clusterConnectRetries;
     }
 
     @Override
+    protected void onIpcRestored() throws IPCException {
+        super.onIpcRestored();
+        // Re-register in case the NC -> CC connection was lost due to a CC crash/restart: the CC ignores
+        // heartbeats from nodes that are not registered with it.
+        // NOTE(review): the restored handle is cached before this hook runs, so if registration fails here it is
+        // presumably not retried on subsequent calls; confirm this is acceptable.
+        try {
+            ncs.registerNode();
+        } catch (IPCException e) {
+            throw e;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IPCException(e);
+        } catch (Exception e) {
+            throw new IPCException(e);
+        }
+    }
+
+    @Override
     protected int getRetries(boolean first) {
         return first ? clusterConnectRetries : 0;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
index 44b0e4a..6d0e1be 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
@@ -39,16 +39,26 @@
         final boolean first = ipcHandle == null;
         if (first || !ipcHandle.isConnected()) {
             if (!first) {
-                getLogger().warning("ipcHandle " + ipcHandle + " disconnected; 
retrying connection");
+                onIpcDisconnected();
             }
             ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
             if (!first && ipcHandle.isConnected()) {
-                getLogger().warning("ipcHandle " + ipcHandle + " restored");
+                onIpcRestored();
             }
         }
         return ipcHandle;
     }
 
+    /** Invoked when a previously obtained handle is found disconnected, before a new handle is requested. */
+    protected void onIpcDisconnected() throws IPCException {
+        getLogger().warning("ipcHandle " + ipcHandle + " disconnected; 
retrying connection");
+    }
+
+    /** Invoked after a disconnected handle has been replaced by a newly obtained, connected handle. */
+    protected void onIpcRestored() throws IPCException {
+        getLogger().warning("ipcHandle " + ipcHandle + " restored");
+    }
+
     protected abstract int getRetries(boolean first);
 
     protected abstract Logger getLogger();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 20f6378..5afc98d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -53,6 +53,11 @@
     }
 
     @Override
+    public void onRegisterNode() throws Exception {
+        // no-op by default; applications that must act on (re-)registration with the CC override this
+    }
+
+    @Override
     public void stop() throws Exception {
         // no-op
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0587a55..ee97ccb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -54,7 +54,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
-import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.api.service.INodeControllerService;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.context.ServerContext;
@@ -86,7 +86,7 @@
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 import org.kohsuke.args4j.CmdLineException;
 
-public class NodeControllerService implements IControllerService {
+public class NodeControllerService implements INodeControllerService {
     private static final Logger LOGGER = 
Logger.getLogger(NodeControllerService.class.getName());
 
     private static final double MEMORY_FUDGE_FACTOR = 0.8;
@@ -156,6 +156,8 @@
     private MessagingNetworkManager messagingNetManager;
 
     private final ConfigManager configManager;
+
+    private NodeRegistration nodeRegistration;
 
     public NodeControllerService(NCConfig config) throws Exception {
         this(config, getApplication(config));
@@ -278,7 +280,7 @@
         if (messagingNetManager != null) {
             messagingNetManager.start();
         }
-        this.ccs = new ClusterControllerRemoteProxy(ipc,
+        this.ccs = new ClusterControllerRemoteProxy(this, ipc,
                 new InetSocketAddress(ncConfig.getClusterAddress(), 
ncConfig.getClusterPort()),
                 ncConfig.getClusterConnectRetries());
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new 
HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
@@ -292,21 +294,13 @@
         NetworkAddress meesagingPort = messagingNetManager != null ? 
messagingNetManager.getPublicNetworkAddress()
                 : null;
         int allCores = osMXBean.getAvailableProcessors();
-        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, 
ncConfig, netAddress, datasetAddress,
+        nodeRegistration = new NodeRegistration(ipc.getSocketAddress(), id, 
ncConfig, netAddress, datasetAddress,
                 osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), 
allCores,
                 runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), 
runtimeMXBean.getVmVendor(),
                 runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), 
runtimeMXBean.getBootClassPath(),
                 runtimeMXBean.getInputArguments(), 
runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort,
-                application.getCapacity(), PidHelper.getPid()));
-
-        synchronized (this) {
-            while (registrationPending) {
-                wait();
-            }
-        }
-        if (registrationException != null) {
-            throw registrationException;
-        }
+                application.getCapacity(), PidHelper.getPid());
+        registerNode();
         serviceCtx.setDistributedState(nodeParameters.getDistributedState());
 
         workQueue.start();
@@ -330,6 +324,31 @@
         application.startupCompleted();
     }
 
+    @Override
+    public void registerNode() throws Exception {
+        LOGGER.info("Registering with Cluster Controller");
+        synchronized (this) {
+            // Re-arm the wait below: after a previous registration registrationPending has already been cleared, so
+            // without this reset a re-registration would not wait for the CC's response (or could see a stale error).
+            registrationPending = true;
+            registrationException = null;
+        }
+        ccs.registerNode(nodeRegistration);
+
+        synchronized (this) {
+            while (registrationPending) {
+                wait();
+            }
+        }
+        if (registrationException != null) {
+            LOGGER.log(Level.WARNING, "Registering with Cluster Controller 
failed with exception",
+                    registrationException);
+            throw registrationException;
+        }
+        application.onRegisterNode();
+        LOGGER.info("Registering with Cluster Controller complete");
+    }
+
     private void startApplication() throws Exception {
         serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id, 
memoryManager, lccm, ncConfig.getAppConfig());
         application.start(serviceCtx, ncConfig.getAppArgsArray());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
index b7aa342..2967039 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
@@ -40,6 +40,11 @@
     }
 
     @Override
+    public void onRegisterNode() throws Exception {
+        // No-op: this example application has nothing to do on (re-)registration with the CC
+    }
+
+    @Override
     public void stop() throws Exception {
         // No-op
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index 1fcdb3c..efd9830 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -192,4 +192,9 @@
     boolean full() {
         return full;
     }
+
+    @Override
+    public String toString() { // remote address + state, e.g. for ControllerRemoteProxy's connection log messages
+        return "IPCHandle [addr=" + remoteAddress + " state=" + state + "]";
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1830
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I6f93ca9ab37e56e02bafcdecd1e2d0cf664faef6
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>

Reply via email to