From Michael Blow <[email protected]>:

Michael Blow has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20797?usp=email )

Change subject: [ASTERIXDB-3689][HYR][NET] MuxDemux negotiation hardening
......................................................................

[ASTERIXDB-3689][HYR][NET] MuxDemux negotiation hardening

(cherry picked from commit 0646f96bf6aa)

Ext-ref: MB-70113,MB-70114
Change-Id: I3887c992ad73f501f074ff5756f6204506043d65
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20797
Tested-by: Michael Blow <[email protected]>
Reviewed-by: Michael Blow <[email protected]>
---
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
2 files changed, 74 insertions(+), 44 deletions(-)

Approvals:
  Michael Blow: Looks good to me, approved; Verified




diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index 3189ea1..0837b29 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -30,6 +30,8 @@
 import org.apache.hyracks.net.protocols.tcp.TCPConnection;
 import org.apache.hyracks.net.protocols.tcp.TCPEndpoint;
 import org.apache.hyracks.util.JSONUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -43,6 +45,8 @@
  * @author vinayakb
  */
 public class MuxDemux {
+    private static final Logger LOGGER = LogManager.getLogger();
+
     private final InetSocketAddress localAddress;

     private final IChannelOpenListener channelOpenListener;
@@ -56,7 +60,7 @@

     private final MuxDemuxPerformanceCounters perfCounters;

-    private final IChannelInterfaceFactory channelInterfaceFatory;
+    private final IChannelInterfaceFactory channelInterfaceFactory;

     /**
      * Constructor.
@@ -80,7 +84,7 @@
         this.localAddress = localAddress;
         this.channelOpenListener = listener;
         this.maxConnectionAttempts = maxConnectionAttempts;
-        this.channelInterfaceFatory = channelInterfaceFactory;
+        this.channelInterfaceFactory = channelInterfaceFactory;
         outgoingConnectionMap = new HashMap<>();
         incomingConnectionMap = new HashMap<>();
         this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() {
@@ -90,7 +94,10 @@
                 synchronized (MuxDemux.this) {
                     mConn = 
outgoingConnectionMap.get(connection.getRemoteAddress());
                 }
-                assert mConn != null;
+                if (mConn == null) {
+                    throw new IllegalStateException(
+                            "connectionEstablished called for unknown 
connection: " + connection.getRemoteAddress());
+                }
                 mConn.setTCPConnection(connection);
                 connection.setEventListener(mConn);
                 connection.setAttachment(mConn);
@@ -110,10 +117,19 @@
                 MultiplexedConnection mConn;
                 synchronized (MuxDemux.this) {
                     mConn = outgoingConnectionMap.get(remoteAddress);
-                    assert mConn != null;
+                    if (mConn == null) {
+                        throw new IllegalStateException(
+                                "connectionFailure called for unknown 
connection: " + remoteAddress);
+                    }
                     int nConnectionAttempts = mConn.getConnectionAttempts();
                     if (nConnectionAttempts > 
MuxDemux.this.maxConnectionAttempts) {
-                        outgoingConnectionMap.remove(remoteAddress);
+                        if (outgoingConnectionMap.remove(remoteAddress) == 
null) {
+                            LOGGER.warn("connection to {} failed after {} 
attempts, but connection was already removed",
+                                    remoteAddress, nConnectionAttempts);
+                        } else {
+                            LOGGER.debug("connection to {} failed after {} 
attempts, giving up", remoteAddress,
+                                    nConnectionAttempts);
+                        }
                         mConn.setConnectionFailure(new 
IOException(remoteAddress.toString() + ": " + error, error));
                     } else {
                         mConn.setConnectionAttempts(nConnectionAttempts + 1);
@@ -126,9 +142,19 @@
             public void connectionClosed(TCPConnection connection) {
                 synchronized (MuxDemux.this) {
                     if (connection.getType() == 
TCPConnection.ConnectionType.OUTGOING) {
-                        
outgoingConnectionMap.remove(connection.getRemoteAddress());
+                        if 
(outgoingConnectionMap.remove(connection.getRemoteAddress()) == null) {
+                            LOGGER.warn("outgoing connection to {} already 
removed on close",
+                                    connection.getRemoteAddress());
+                        } else {
+                            LOGGER.debug("removed outgoing connection to {} on 
close", connection.getRemoteAddress());
+                        }
                     } else if (connection.getType() == 
TCPConnection.ConnectionType.INCOMING) {
-                        
incomingConnectionMap.remove(connection.getRemoteAddress());
+                        if 
(incomingConnectionMap.remove(connection.getRemoteAddress()) == null) {
+                            LOGGER.warn("incoming connection from {} already 
removed on close",
+                                    connection.getRemoteAddress());
+                        } else {
+                            LOGGER.debug("removed incoming connection from {} 
on close", connection.getRemoteAddress());
+                        }
                     }
                 }
             }
@@ -200,7 +226,7 @@
      * @return
      */
     public IChannelInterfaceFactory getChannelInterfaceFactory() {
-        return channelInterfaceFatory;
+        return channelInterfaceFactory;
     }

     public synchronized JsonNode getState() {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index b75e3c6..d6ffe23 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -110,7 +110,7 @@

     @Override
     public String toString() {
-        return "TCPEndpoint [Local Address: " + localAddress + "]";
+        return "TCPEndpoint [" + (localAddress == null ? "<outgoing>" : "Local 
Address: " + localAddress) + "]";
     }

     private class IOThread extends Thread {
@@ -127,7 +127,7 @@
         private final Selector selector;

         public IOThread() throws IOException {
-            super("TCPEndpoint IO Thread [" + localAddress + "]");
+            super("TCPEndpoint IO Thread [" + (localAddress == null ? 
"<outgoing>" : localAddress) + "]");
             setDaemon(true);
             setPriority(Thread.NORM_PRIORITY);
             this.pendingConnections = new ArrayList<>();
@@ -144,44 +144,44 @@
                 try {
                     int n = selector.select();
                     collectOutstandingWork();
-                    if (!workingPendingConnections.isEmpty()) {
-                        for (InetSocketAddress address : 
workingPendingConnections) {
-                            SocketChannel channel = SocketChannel.open();
-                            register(channel);
-                            boolean connect = false;
-                            boolean failure = false;
-                            try {
-                                connect = channel.connect(address);
-                            } catch (IOException e) {
-                                failure = true;
-                                synchronized (connectionListener) {
-                                    
connectionListener.connectionFailure(address, e);
-                                }
-                            }
-                            if (!failure) {
-                                if (!connect) {
-                                    SelectionKey key = 
channel.register(selector, SelectionKey.OP_CONNECT);
-                                    key.attach(address);
-                                } else {
-                                    socketConnected(address, channel);
-                                }
+                    for (Iterator<InetSocketAddress> iterator = 
workingPendingConnections.iterator(); iterator
+                            .hasNext();) {
+                        InetSocketAddress address = iterator.next();
+                        iterator.remove();
+                        SocketChannel channel = SocketChannel.open();
+                        register(channel);
+                        boolean connect = false;
+                        boolean failure = false;
+                        try {
+                            connect = channel.connect(address);
+                        } catch (IOException e) {
+                            failure = true;
+                            synchronized (connectionListener) {
+                                connectionListener.connectionFailure(address, 
e);
                             }
                         }
-                        workingPendingConnections.clear();
+                        if (!failure) {
+                            if (!connect) {
+                                SelectionKey key = channel.register(selector, 
SelectionKey.OP_CONNECT);
+                                key.attach(address);
+                            } else {
+                                socketConnected(address, channel);
+                            }
+                        }
                     }
-                    if (!workingIncomingConnections.isEmpty()) {
-                        for (SocketChannel channel : 
workingIncomingConnections) {
-                            register(channel);
-                            connectionReceived(channel);
-                        }
-                        workingIncomingConnections.clear();
+                    for (Iterator<SocketChannel> iterator = 
workingIncomingConnections.iterator(); iterator
+                            .hasNext();) {
+                        SocketChannel channel = iterator.next();
+                        iterator.remove();
+                        register(channel);
+                        connectionReceived(channel);
                     }
                     synchronized (handshakeCompletedConnections) {
-                        if (!handshakeCompletedConnections.isEmpty()) {
-                            for (final PendingHandshakeConnection conn : 
handshakeCompletedConnections) {
-                                handshakeCompleted(conn);
-                            }
-                            handshakeCompletedConnections.clear();
+                        for (Iterator<PendingHandshakeConnection> iterator =
+                                handshakeCompletedConnections.iterator(); 
iterator.hasNext();) {
+                            PendingHandshakeConnection conn = iterator.next();
+                            iterator.remove();
+                            handshakeCompleted(conn);
                         }
                     }
                     if (n > 0) {
@@ -257,7 +257,11 @@
                 }
             } catch (Exception e) {
                 LOGGER.error("failed to establish connection after handshake", 
e);
-                handleHandshakeFailure(conn);
+                try {
+                    handleHandshakeFailure(conn);
+                } catch (Exception ex) {
+                    e.addSuppressed(ex);
+                }
             }
         }


--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20797?usp=email
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: I3887c992ad73f501f074ff5756f6204506043d65
Gerrit-Change-Number: 20797
Gerrit-PatchSet: 2
Gerrit-Owner: Michael Blow <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-CC: Anon. E. Moose #1000171

Reply via email to