Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2617

Change subject: [NO ISSUE][NET] IPC Connections Improvements
......................................................................

[NO ISSUE][NET] IPC Connections Improvements

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Refactor IPCConnectionManager.
- Halt on IPC message serialization failures.
- Ensure channels and handles are closed on connection
  failures.
- Remove the unneeded IPCHandle CONNECT_FAILED state.
- Fix RegisterNodeWork failure handling.
- Consistently use NodeControllerRemoteProxy for NC RPC.

Change-Id: I4049b16573c13fcdb1b12c0b6b2a97ee1fcc709e
---
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
M 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HandleState.java
M 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
M 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
M 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
M 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
M 
hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
M 
hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
13 files changed, 254 insertions(+), 199 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/17/2617/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index b44a6bb..2b03324 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -105,9 +105,7 @@
             failNode(nodeId);
         }
         try {
-            // TODO(mblow): it seems we should close IPC handles when we're 
done with them (like here)
-            IIPCHandle ncIPCHandle = 
ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
-            ncIPCHandle.send(-1, new AbortCCJobsFunction(ccConfig.getCcId()), 
null);
+            ncState.getNodeController().abortJobs(ccs.getCcId());
         } catch (IPCException e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index de7d941..00693df 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -28,11 +28,8 @@
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.controllers.NodeParameters;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
-import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
-import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -52,36 +49,33 @@
     @Override
     protected void doRun() throws Exception {
         String id = reg.getNodeId();
-        // TODO(mblow): it seems we should close IPC handles when we're done 
with them (like here)
-        IIPCHandle ncIPCHandle = 
ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
-        CCNCFunctions.NodeRegistrationResult result;
-        Map<IOption, Object> ncConfiguration = new HashMap<>();
+        LOGGER.warn("Registering node: {}", id);
+        NodeControllerRemoteProxy nc = new 
NodeControllerRemoteProxy(ccs.getCcId(),
+                
ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
+        INodeManager nodeManager = ccs.getNodeManager();
         try {
-            LOGGER.log(Level.WARN, "Registering INodeController: id = " + id);
-            NodeControllerRemoteProxy nc = new 
NodeControllerRemoteProxy(ccs.getCcId(),
-                    
ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
             NodeControllerState state = new NodeControllerState(nc, reg);
-            INodeManager nodeManager = ccs.getNodeManager();
             nodeManager.addNode(id, state);
             IApplicationConfig cfg = 
state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
+            final Map<IOption, Object> ncConfiguration = new HashMap<>();
             for (IOption option : cfg.getOptions()) {
                 ncConfiguration.put(option, cfg.get(option));
             }
-            LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
+            LOGGER.warn("Registered node: {}", id);
             NodeParameters params = new NodeParameters();
             params.setClusterControllerInfo(ccs.getClusterControllerInfo());
             params.setDistributedState(ccs.getContext().getDistributedState());
             
params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
             
params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
             params.setRegistrationId(registrationId);
-            result = new CCNCFunctions.NodeRegistrationResult(params, null);
+            LOGGER.warn("sending registration response to node {}", id);
+            nc.sendRegistrationResult(params, null);
+            LOGGER.warn("notifying node {} joined", id);
+            ccs.getContext().notifyNodeJoin(id, ncConfiguration);
         } catch (Exception e) {
-            LOGGER.log(Level.WARN, "Node registration failed", e);
-            result = new CCNCFunctions.NodeRegistrationResult(null, e);
+            LOGGER.error("Node {} registration failed", id, e);
+            nodeManager.removeNode(id);
+            nc.sendRegistrationResult(null, e);
         }
-        LOGGER.warn("sending registration response to node");
-        ncIPCHandle.send(-1, result, null);
-        LOGGER.warn("notifying node join");
-        ccs.getContext().notifyNodeJoin(id, ncConfiguration);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index 9959e34..cc2ed46 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -134,7 +134,6 @@
         IPCSystem ipcSystem = Mockito.mock(IPCSystem.class);
         IIPCHandle ipcHandle = Mockito.mock(IIPCHandle.class);
         Mockito.when(ccs.getClusterIPC()).thenReturn(ipcSystem);
-        Mockito.when(ipcSystem.getHandle(Mockito.any())).thenReturn(ipcHandle);
         Mockito.when(ipcSystem.getHandle(Mockito.any(), 
Mockito.anyInt())).thenReturn(ipcHandle);
         
Mockito.when(ccs.getExecutor()).thenReturn(Executors.newCachedThreadPool());
         return ccs;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 92764a7..78cd44d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -24,6 +24,7 @@
 import java.util.Set;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -33,7 +34,9 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.control.common.controllers.NodeParameters;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.apache.hyracks.ipc.exceptions.IPCException;
 
 public interface INodeController {
     void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
@@ -62,4 +65,22 @@
     void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, 
String nodeId) throws Exception;
 
     void takeThreadDump(String requestId) throws Exception;
+
+    /**
+     * Sends a request to this {@link INodeController} to abort all jobs
+     * started by the cluster controller with id {@code ccId}.
+     *
+     * @param ccId the id of the cluster controller whose jobs should be aborted
+     * @throws IPCException if sending the request over IPC fails
+     */
+    void abortJobs(CcId ccId) throws IPCException;
+
+    /**
+     * Sends the node registration result to this {@link INodeController}.
+     *
+     * @param parameters the node parameters when registration succeeded; {@code null} on failure
+     * @param regFailure the cause when registration failed; {@code null} on success
+     * @throws IPCException if sending the result over IPC fails
+     */
+    void sendRegistrationResult(NodeParameters parameters, Exception 
regFailure) throws IPCException;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index b78e53f..d6867eb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -36,6 +36,7 @@
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.common.base.INodeController;
+import org.apache.hyracks.control.common.controllers.NodeParameters;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortTasksFunction;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.CleanupJobletFunction;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.DeployBinaryFunction;
@@ -50,6 +51,7 @@
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.UndeployJobSpecFunction;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
 
 public class NodeControllerRemoteProxy implements INodeController {
     private final CcId ccId;
@@ -136,6 +138,16 @@
         ipcHandle.send(-1, fn, null);
     }
 
+    @Override
+    public void abortJobs(CcId ccId) throws IPCException {
+        ipcHandle.send(-1, new CCNCFunctions.AbortCCJobsFunction(ccId), null);
+    }
+
+    @Override
+    public void sendRegistrationResult(NodeParameters parameters, Exception 
regFailure) throws IPCException {
+        ipcHandle.send(-1, new 
CCNCFunctions.NodeRegistrationResult(parameters, regFailure), null);
+    }
+
     public InetSocketAddress getAddress() {
         return ipcHandle.getRemoteAddress();
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
index 33d1d60..6d4f173 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -46,7 +46,7 @@
     public void run() {
         NCServiceContext ctx = ncs.getContext();
         try {
-            IMessage data = (IMessage) DeploymentUtils.deserialize(message, 
deploymentId, ctx);;
+            IMessage data = (IMessage) DeploymentUtils.deserialize(message, 
deploymentId, ctx);
             if (ctx.getMessageBroker() != null) {
                 ctx.getMessageBroker().receivedMessage(data, nodeId);
             } else {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HandleState.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HandleState.java
index 912c267..39a29af 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HandleState.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HandleState.java
@@ -23,6 +23,5 @@
     CONNECT_SENT,
     CONNECT_RECEIVED,
     CONNECTED,
-    CONNECT_FAILED,
     CLOSED,
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 3e6c64b..040fe03 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -18,12 +18,13 @@
  */
 package org.apache.hyracks.ipc.impl;
 
+import static org.apache.hyracks.util.ExitUtil.EC_IMMEDIATE_HALT;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channel;
-import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -39,7 +40,7 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.util.ExitUtil;
 import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -100,7 +101,7 @@
 
     void stop() {
         stopped = true;
-        IOUtils.closeQuietly(serverSocketChannel);
+        NetworkUtil.closeQuietly(serverSocketChannel);
         networkThread.selector.wakeup();
     }
 
@@ -121,8 +122,10 @@
                 return handle;
             }
             if (maxRetries < 0 || retries++ < maxRetries) {
-                LOGGER.warn("Connection to " + remoteAddress + " failed; 
retrying" + (maxRetries <= 0 ? ""
-                        : " (retry attempt " + retries + " of " + maxRetries + 
") after " + delay + "ms"));
+                if (LOGGER.isWarnEnabled()) {
+                    LOGGER.warn("Connection to " + remoteAddress + " failed; 
retrying" + (maxRetries <= 0 ? ""
+                            : " (retry attempt " + retries + " of " + 
maxRetries + ") after " + delay + "ms"));
+                }
                 Thread.sleep(delay);
                 delay = Math.min(MAX_RETRY_DELAY_MILLIS, (int) (delay * 1.5));
             } else {
@@ -144,24 +147,6 @@
         networkThread.selector.wakeup();
     }
 
-    private synchronized void collectOutstandingWork() {
-        if (!pendingConnections.isEmpty()) {
-            moveAll(pendingConnections, workingPendingConnections);
-        }
-        if (!sendList.isEmpty()) {
-            moveAll(sendList, workingSendList);
-        }
-    }
-
-    private Message createInitialReqMessage(IPCHandle handle) {
-        Message msg = new Message(handle);
-        msg.setMessageId(system.createMessageId());
-        msg.setRequestMessageId(-1);
-        msg.setFlag(Message.INITIAL_REQ);
-        msg.setPayload(address);
-        return msg;
-    }
-
     private Message createInitialAckMessage(IPCHandle handle, Message req) {
         Message msg = new Message(handle);
         msg.setMessageId(system.createMessageId());
@@ -177,16 +162,18 @@
 
     private class NetworkThread extends Thread {
         private final Selector selector;
-
         private final Set<SocketChannel> openChannels = new HashSet<>();
+        private final BitSet unsentMessagesBitmap = new BitSet();
+        private final List<Message> tempUnsentMessages = new ArrayList<>();
 
-        public NetworkThread() {
+        NetworkThread() {
             super("IPC Network Listener Thread [" + address + "]");
             setDaemon(true);
             try {
                 selector = Selector.open();
+                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
             } catch (IOException e) {
-                throw new RuntimeException(e);
+                throw new IllegalStateException(e);
             }
         }
 
@@ -200,105 +187,19 @@
         }
 
         private void doRun() {
-            try {
-                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
-            } catch (ClosedChannelException e) {
-                throw new RuntimeException(e);
-            }
-            BitSet unsentMessagesBitmap = new BitSet();
-            List<Message> tempUnsentMessages = new ArrayList<>();
             int failingLoops = 0;
             while (!stopped) {
                 try {
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("Starting Select");
-                    }
                     int n = selector.select();
                     collectOutstandingWork();
                     if (!workingPendingConnections.isEmpty()) {
-                        for (IPCHandle handle : workingPendingConnections) {
-                            SocketChannel channel = SocketChannel.open();
-                            register(channel);
-                            SelectionKey cKey;
-                            if (channel.connect(handle.getRemoteAddress())) {
-                                cKey = channel.register(selector, 
SelectionKey.OP_READ);
-                                handle.setState(HandleState.CONNECT_SENT);
-                                
IPCConnectionManager.this.write(createInitialReqMessage(handle));
-                            } else {
-                                cKey = channel.register(selector, 
SelectionKey.OP_CONNECT);
-                            }
-                            handle.setKey(cKey);
-                            cKey.attach(handle);
-                        }
-                        workingPendingConnections.clear();
+                        establishPendingConnections();
                     }
                     if (!workingSendList.isEmpty()) {
-                        unsentMessagesBitmap.clear();
-                        int len = workingSendList.size();
-                        for (int i = 0; i < len; ++i) {
-                            Message msg = workingSendList.get(i);
-                            LOGGER.debug(() -> "Processing send of message: " 
+ msg);
-                            IPCHandle handle = msg.getIPCHandle();
-                            if (handle.getState() != HandleState.CLOSED) {
-                                if (!handle.full()) {
-                                    while (true) {
-                                        ByteBuffer buffer = 
handle.getOutBuffer();
-                                        buffer.compact();
-                                        boolean success = msg.write(buffer);
-                                        buffer.flip();
-                                        if (success) {
-                                            
system.getPerformanceCounters().addMessageSentCount(1);
-                                            SelectionKey key = handle.getKey();
-                                            key.interestOps(key.interestOps() 
| SelectionKey.OP_WRITE);
-                                        } else {
-                                            if (!buffer.hasRemaining()) {
-                                                handle.resizeOutBuffer();
-                                                continue;
-                                            }
-                                            handle.markFull();
-                                            unsentMessagesBitmap.set(i);
-                                        }
-                                        break;
-                                    }
-                                } else {
-                                    unsentMessagesBitmap.set(i);
-                                }
-                            }
-                        }
-                        copyUnsentMessages(unsentMessagesBitmap, 
tempUnsentMessages);
+                        sendPendingMessages();
                     }
                     if (n > 0) {
-                        for (Iterator<SelectionKey> i = 
selector.selectedKeys().iterator(); i.hasNext();) {
-                            SelectionKey key = i.next();
-                            i.remove();
-                            final SelectableChannel sc = key.channel();
-                            if (key.isReadable()) {
-                                read(key);
-                            } else if (key.isWritable()) {
-                                write(key);
-                            } else if (key.isAcceptable()) {
-                                assert sc == serverSocketChannel;
-                                SocketChannel channel = 
serverSocketChannel.accept();
-                                register(channel);
-                                IPCHandle handle = new IPCHandle(system, null);
-                                SelectionKey cKey = channel.register(selector, 
SelectionKey.OP_READ);
-                                handle.setKey(cKey);
-                                cKey.attach(handle);
-                                handle.setState(HandleState.CONNECT_RECEIVED);
-                            } else if (key.isConnectable()) {
-                                SocketChannel channel = (SocketChannel) sc;
-                                IPCHandle handle = (IPCHandle) 
key.attachment();
-                                if (!finishConnect(channel)) {
-                                    
handle.setState(HandleState.CONNECT_FAILED);
-                                    continue;
-                                }
-
-                                handle.setState(HandleState.CONNECT_SENT);
-                                registerHandle(handle);
-                                key.interestOps(SelectionKey.OP_READ);
-                                
IPCConnectionManager.this.write(createInitialReqMessage(handle));
-                            }
-                        }
+                        processSelectedKeys();
                     }
                     // reset failingLoops on a good loop
                     failingLoops = 0;
@@ -314,25 +215,146 @@
             }
         }
 
-        private void cleanup() {
-            for (Channel channel : openChannels) {
-                IOUtils.closeQuietly(channel);
+        private void processSelectedKeys() {
+            for (Iterator<SelectionKey> i = 
selector.selectedKeys().iterator(); i.hasNext();) {
+                SelectionKey key = i.next();
+                i.remove();
+                final SelectableChannel sc = key.channel();
+                if (key.isReadable()) {
+                    read(key);
+                } else if (key.isWritable()) {
+                    write(key);
+                } else if (key.isAcceptable()) {
+                    assert sc == serverSocketChannel;
+                    accept();
+                } else if (key.isConnectable()) {
+                    finishConnect(key);
+                }
             }
-            openChannels.clear();
-            IOUtils.closeQuietly(selector);
         }
 
-        private boolean finishConnect(SocketChannel channel) {
-            boolean connectFinished = false;
+        private void finishConnect(SelectionKey connectableKey) {
+            SocketChannel channel = (SocketChannel) connectableKey.channel();
+            IPCHandle handle = (IPCHandle) connectableKey.attachment();
+            boolean connected = false;
             try {
-                connectFinished = channel.finishConnect();
-                if (!connectFinished) {
-                    LOGGER.log(Level.WARN, "Channel connect did not finish");
+                connected = channel.finishConnect();
+                if (connected) {
+                    connectableKey.interestOps(SelectionKey.OP_READ);
+                    connectionEstablished(handle);
                 }
             } catch (IOException e) {
-                LOGGER.log(Level.WARN, "Exception finishing channel connect", 
e);
+                LOGGER.warn("Exception finishing connect", e);
+            } finally {
+                if (!connected) {
+                    LOGGER.warn("Failed to finish connect to {}", 
handle.getRemoteAddress());
+                    close(connectableKey, channel);
+                }
             }
-            return connectFinished;
+        }
+
+        private void accept() {
+            SocketChannel channel = null;
+            SelectionKey channelKey = null;
+            try {
+                channel = serverSocketChannel.accept();
+                register(channel);
+                channelKey = channel.register(selector, SelectionKey.OP_READ);
+                IPCHandle handle = new IPCHandle(system, null);
+                handle.setKey(channelKey);
+                channelKey.attach(handle);
+                handle.setState(HandleState.CONNECT_RECEIVED);
+            } catch (IOException e) {
+                LOGGER.error("Failed to accept channel ", e);
+                close(channelKey, channel);
+            }
+        }
+
+        /**
+         * Starts a connection for every queued pending handle and registers its channel with the selector.
+         * On failure, both the channel and the handle are closed.
+         */
+        private void establishPendingConnections() {
+            for (IPCHandle handle : workingPendingConnections) {
+                SocketChannel channel = null;
+                SelectionKey channelKey = null;
+                try {
+                    channel = SocketChannel.open();
+                    register(channel);
+                    if (channel.connect(handle.getRemoteAddress())) {
+                        channelKey = channel.register(selector, 
SelectionKey.OP_READ);
+                        connectionEstablished(handle);
+                    } else {
+                        channelKey = channel.register(selector, 
SelectionKey.OP_CONNECT);
+                    }
+                    handle.setKey(channelKey);
+                    channelKey.attach(handle);
+                } catch (IOException e) {
+                    LOGGER.error("Failed to connect to {}", handle.getRemoteAddress(), e);
+                    // the handle is not attached to a key yet, so close(key, channel) cannot close it
+                    handle.close();
+                    close(channelKey, channel);
+                }
+            }
+            workingPendingConnections.clear();
+        }
+
+        private void connectionEstablished(IPCHandle handle) {
+            handle.setState(HandleState.CONNECT_SENT);
+            registerHandle(handle);
+            IPCConnectionManager.this.write(createInitialReqMessage(handle));
+        }
+
+        private void sendPendingMessages() {
+            unsentMessagesBitmap.clear();
+            int len = workingSendList.size();
+            for (int i = 0; i < len; ++i) {
+                Message msg = workingSendList.get(i);
+                final boolean sent = sendMessage(msg);
+                if (!sent) {
+                    unsentMessagesBitmap.set(i);
+                }
+            }
+            copyUnsentMessages(unsentMessagesBitmap, tempUnsentMessages);
+        }
+
+        private boolean sendMessage(Message msg) {
+            LOGGER.debug("Processing send of message: {}", msg);
+            IPCHandle handle = msg.getIPCHandle();
+            if (handle.getState() == HandleState.CLOSED) {
+                // message will never be sent
+                return true;
+            }
+            if (handle.full()) {
+                return false;
+            }
+            try {
+                while (true) {
+                    ByteBuffer buffer = handle.getOutBuffer();
+                    buffer.compact();
+                    boolean success = msg.write(buffer);
+                    buffer.flip();
+                    if (success) {
+                        system.getPerformanceCounters().addMessageSentCount(1);
+                        SelectionKey key = handle.getKey();
+                        key.interestOps(key.interestOps() | 
SelectionKey.OP_WRITE);
+                        return true;
+                    } else {
+                        if (!buffer.hasRemaining()) {
+                            handle.resizeOutBuffer();
+                            continue;
+                        }
+                        handle.markFull();
+                        return false;
+                    }
+                }
+            } catch (Exception e) {
+                LOGGER.fatal("Unrecoverable networking failure; Halting...", 
e);
+                ExitUtil.halt(EC_IMMEDIATE_HALT);
+            }
+            return false;
+        }
+
+        private void cleanup() {
+            for (Channel channel : openChannels) {
+                NetworkUtil.closeQuietly(channel);
+            }
+            openChannels.clear();
+            NetworkUtil.closeQuietly(selector);
         }
 
         private void copyUnsentMessages(BitSet unsentMessagesBitmap, 
List<Message> tempUnsentMessages) {
@@ -396,19 +418,42 @@
         }
 
         private void close(SelectionKey key, SocketChannel sc) {
-            key.cancel();
-            NetworkUtil.closeQuietly(sc);
-            openChannels.remove(sc);
-            final IPCHandle handle = (IPCHandle) key.attachment();
-            handle.close();
+            if (key != null) {
+                final Object attachment = key.attachment();
+                if (attachment != null) {
+                    ((IPCHandle) attachment).close();
+                }
+                key.cancel();
+            }
+            if (sc != null) {
+                NetworkUtil.closeQuietly(sc);
+                openChannels.remove(sc);
+            }
         }
-    }
 
-    private <T> void moveAll(List<T> source, List<T> target) {
-        int len = source.size();
-        for (int i = 0; i < len; ++i) {
-            target.add(source.get(i));
+        private void collectOutstandingWork() {
+            synchronized (IPCConnectionManager.this) {
+                if (!pendingConnections.isEmpty()) {
+                    moveAll(pendingConnections, workingPendingConnections);
+                }
+                if (!sendList.isEmpty()) {
+                    moveAll(sendList, workingSendList);
+                }
+            }
         }
-        source.clear();
+
+        private Message createInitialReqMessage(IPCHandle handle) {
+            Message msg = new Message(handle);
+            msg.setMessageId(system.createMessageId());
+            msg.setRequestMessageId(-1);
+            msg.setFlag(Message.INITIAL_REQ);
+            msg.setPayload(address);
+            return msg;
+        }
+
+        private <T> void moveAll(List<T> source, List<T> target) {
+            target.addAll(source);
+            source.clear();
+        }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index d63bfbd..09c7c97 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -62,10 +62,6 @@
         return system;
     }
 
-    void setRemoteAddress(InetSocketAddress remoteAddress) {
-        this.remoteAddress = remoteAddress;
-    }
-
     @Override
     public long send(long requestId, Object req, Exception exception) throws 
IPCException {
         if (!isConnected()) {
@@ -127,7 +123,6 @@
                     wait();
                     break;
                 case CONNECTED:
-                case CONNECT_FAILED:
                 case CLOSED:
                     return state == HandleState.CONNECTED;
                 default:
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index b36e645..b7dcf05 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -66,26 +66,12 @@
         cMgr.stop();
     }
 
-    public IIPCHandle getHandle(InetSocketAddress remoteAddress) throws 
IPCException {
-        return getHandle(remoteAddress, 0);
-    }
-
     public IIPCHandle getHandle(InetSocketAddress remoteAddress, int 
maxRetries) throws IPCException {
         return getHandle(remoteAddress, maxRetries, 0);
     }
 
     public IIPCHandle getReconnectingHandle(InetSocketAddress remoteAddress) 
throws IPCException {
         return getReconnectingHandle(remoteAddress, 1);
-    }
-
-    public IIPCHandle getReconnectingHandle(InetSocketAddress remoteAddress, 
int reconnectAttempts)
-            throws IPCException {
-        return getHandle(remoteAddress, 0, reconnectAttempts, 
NoOpIPCEventListener.INSTANCE);
-    }
-
-    public IIPCHandle getHandle(InetSocketAddress remoteAddress, int 
maxRetries, int reconnectAttempts)
-            throws IPCException {
-        return getHandle(remoteAddress, maxRetries, reconnectAttempts, 
NoOpIPCEventListener.INSTANCE);
     }
 
     public IIPCHandle getHandle(InetSocketAddress remoteAddress, int 
maxRetries, int reconnectAttempts,
@@ -132,4 +118,14 @@
     public IPCPerformanceCounters getPerformanceCounters() {
         return perfCounters;
     }
+
+    private IIPCHandle getReconnectingHandle(InetSocketAddress remoteAddress, 
int reconnectAttempts)
+            throws IPCException {
+        return getHandle(remoteAddress, 0, reconnectAttempts, 
NoOpIPCEventListener.INSTANCE);
+    }
+
+    private IIPCHandle getHandle(InetSocketAddress remoteAddress, int 
maxRetries, int reconnectAttempts)
+            throws IPCException {
+        return getHandle(remoteAddress, maxRetries, reconnectAttempts, 
NoOpIPCEventListener.INSTANCE);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
index db0ed6b..49f439f 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
@@ -68,21 +68,16 @@
         return delegate.isConnected();
     }
 
-    private IIPCHandle ensureConnected() throws IPCException {
+    private synchronized IIPCHandle ensureConnected() throws IPCException {
         if (delegate.isConnected()) {
             return delegate;
         }
-        synchronized (this) {
-            if (delegate.isConnected()) {
-                return delegate;
-            }
-            LOGGER.warn("ipcHandle {} disconnected; will attempt to reconnect 
{} times", delegate, reconnectAttempts);
-            listener.ipcHandleDisconnected(delegate);
-            delegate = ipc.getHandle(getRemoteAddress(), reconnectAttempts);
-            LOGGER.warn("ipcHandle {} restored", delegate);
-            listener.ipcHandleRestored(delegate);
-            return delegate;
-        }
+        LOGGER.warn("ipcHandle {} disconnected; will attempt to reconnect {} 
times", delegate, reconnectAttempts);
+        listener.ipcHandleDisconnected(delegate);
+        delegate = ipc.getHandle(getRemoteAddress(), reconnectAttempts);
+        LOGGER.warn("ipcHandle {} restored", delegate);
+        listener.ipcHandleRestored(delegate);
+        return delegate;
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
index cca3abe..70a0e18 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
@@ -43,7 +43,7 @@
         IPCSystem client = createClientIPCSystem(rpci);
         client.start();
 
-        IIPCHandle handle = client.getHandle(serverAddr);
+        IIPCHandle handle = client.getHandle(serverAddr, 0);
 
         for (int i = 0; i < 100; ++i) {
             Assert.assertEquals(rpci.call(handle, Integer.valueOf(i)), 
Integer.valueOf(2 * i));
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
index 06aeef5..1863b28 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.util;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.StandardSocketOptions;
@@ -45,12 +46,12 @@
         sc.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
     }
 
-    public static void closeQuietly(SocketChannel sc) {
-        if (sc.isOpen()) {
+    public static void closeQuietly(Closeable closeable) {
+        if (closeable != null) {
             try {
-                sc.close();
+                closeable.close();
             } catch (IOException e) {
-                LOGGER.warn("Failed to close socket", e);
+                LOGGER.warn("Failed to close", e);
             }
         }
     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2617
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4049b16573c13fcdb1b12c0b6b2a97ee1fcc709e
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>

Reply via email to