From Michael Blow <[email protected]>: Michael Blow has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20795?usp=email )
( 1 is the latest approved patch-set. No files were changed between the latest approved patch-set and the submitted one. )Change subject: [ASTERIXDB-3689][HYR][NET] MuxDemux negotiation hardening ...................................................................... [ASTERIXDB-3689][HYR][NET] MuxDemux negotiation hardening Ext-ref: MB-70113,MB-70114 Change-Id: I3887c992ad73f501f074ff5756f6204506043d65 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20795 Reviewed-by: Ian Maxon <[email protected]> Tested-by: Michael Blow <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java 2 files changed, 74 insertions(+), 44 deletions(-) Approvals: Michael Blow: Looks good to me, but someone else must approve; Verified Ian Maxon: Looks good to me, approved diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java index 3189ea1..0837b29 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java @@ -30,6 +30,8 @@ import org.apache.hyracks.net.protocols.tcp.TCPConnection; import org.apache.hyracks.net.protocols.tcp.TCPEndpoint; import org.apache.hyracks.util.JSONUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -43,6 +45,8 @@ * @author vinayakb */ public class MuxDemux { + private static final Logger LOGGER = LogManager.getLogger(); + private final 
InetSocketAddress localAddress; private final IChannelOpenListener channelOpenListener; @@ -56,7 +60,7 @@ private final MuxDemuxPerformanceCounters perfCounters; - private final IChannelInterfaceFactory channelInterfaceFatory; + private final IChannelInterfaceFactory channelInterfaceFactory; /** * Constructor. @@ -80,7 +84,7 @@ this.localAddress = localAddress; this.channelOpenListener = listener; this.maxConnectionAttempts = maxConnectionAttempts; - this.channelInterfaceFatory = channelInterfaceFactory; + this.channelInterfaceFactory = channelInterfaceFactory; outgoingConnectionMap = new HashMap<>(); incomingConnectionMap = new HashMap<>(); this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() { @@ -90,7 +94,10 @@ synchronized (MuxDemux.this) { mConn = outgoingConnectionMap.get(connection.getRemoteAddress()); } - assert mConn != null; + if (mConn == null) { + throw new IllegalStateException( + "connectionEstablished called for unknown connection: " + connection.getRemoteAddress()); + } mConn.setTCPConnection(connection); connection.setEventListener(mConn); connection.setAttachment(mConn); @@ -110,10 +117,19 @@ MultiplexedConnection mConn; synchronized (MuxDemux.this) { mConn = outgoingConnectionMap.get(remoteAddress); - assert mConn != null; + if (mConn == null) { + throw new IllegalStateException( + "connectionFailure called for unknown connection: " + remoteAddress); + } int nConnectionAttempts = mConn.getConnectionAttempts(); if (nConnectionAttempts > MuxDemux.this.maxConnectionAttempts) { - outgoingConnectionMap.remove(remoteAddress); + if (outgoingConnectionMap.remove(remoteAddress) == null) { + LOGGER.warn("connection to {} failed after {} attempts, but connection was already removed", + remoteAddress, nConnectionAttempts); + } else { + LOGGER.debug("connection to {} failed after {} attempts, giving up", remoteAddress, + nConnectionAttempts); + } mConn.setConnectionFailure(new IOException(remoteAddress.toString() + ": " + error, error)); } else { 
mConn.setConnectionAttempts(nConnectionAttempts + 1); @@ -126,9 +142,19 @@ public void connectionClosed(TCPConnection connection) { synchronized (MuxDemux.this) { if (connection.getType() == TCPConnection.ConnectionType.OUTGOING) { - outgoingConnectionMap.remove(connection.getRemoteAddress()); + if (outgoingConnectionMap.remove(connection.getRemoteAddress()) == null) { + LOGGER.warn("outgoing connection to {} already removed on close", + connection.getRemoteAddress()); + } else { + LOGGER.debug("removed outgoing connection to {} on close", connection.getRemoteAddress()); + } } else if (connection.getType() == TCPConnection.ConnectionType.INCOMING) { - incomingConnectionMap.remove(connection.getRemoteAddress()); + if (incomingConnectionMap.remove(connection.getRemoteAddress()) == null) { + LOGGER.warn("incoming connection from {} already removed on close", + connection.getRemoteAddress()); + } else { + LOGGER.debug("removed incoming connection from {} on close", connection.getRemoteAddress()); + } } } } @@ -200,7 +226,7 @@ * @return */ public IChannelInterfaceFactory getChannelInterfaceFactory() { - return channelInterfaceFatory; + return channelInterfaceFactory; } public synchronized JsonNode getState() { diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java index b75e3c6..d6ffe23 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java @@ -110,7 +110,7 @@ @Override public String toString() { - return "TCPEndpoint [Local Address: " + localAddress + "]"; + return "TCPEndpoint [" + (localAddress == null ? 
"<outgoing>" : "Local Address: " + localAddress) + "]"; } private class IOThread extends Thread { @@ -127,7 +127,7 @@ private final Selector selector; public IOThread() throws IOException { - super("TCPEndpoint IO Thread [" + localAddress + "]"); + super("TCPEndpoint IO Thread [" + (localAddress == null ? "<outgoing>" : localAddress) + "]"); setDaemon(true); setPriority(Thread.NORM_PRIORITY); this.pendingConnections = new ArrayList<>(); @@ -144,44 +144,44 @@ try { int n = selector.select(); collectOutstandingWork(); - if (!workingPendingConnections.isEmpty()) { - for (InetSocketAddress address : workingPendingConnections) { - SocketChannel channel = SocketChannel.open(); - register(channel); - boolean connect = false; - boolean failure = false; - try { - connect = channel.connect(address); - } catch (IOException e) { - failure = true; - synchronized (connectionListener) { - connectionListener.connectionFailure(address, e); - } - } - if (!failure) { - if (!connect) { - SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT); - key.attach(address); - } else { - socketConnected(address, channel); - } + for (Iterator<InetSocketAddress> iterator = workingPendingConnections.iterator(); iterator + .hasNext();) { + InetSocketAddress address = iterator.next(); + iterator.remove(); + SocketChannel channel = SocketChannel.open(); + register(channel); + boolean connect = false; + boolean failure = false; + try { + connect = channel.connect(address); + } catch (IOException e) { + failure = true; + synchronized (connectionListener) { + connectionListener.connectionFailure(address, e); } } - workingPendingConnections.clear(); + if (!failure) { + if (!connect) { + SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT); + key.attach(address); + } else { + socketConnected(address, channel); + } + } } - if (!workingIncomingConnections.isEmpty()) { - for (SocketChannel channel : workingIncomingConnections) { - register(channel); - 
connectionReceived(channel); - } - workingIncomingConnections.clear(); + for (Iterator<SocketChannel> iterator = workingIncomingConnections.iterator(); iterator + .hasNext();) { + SocketChannel channel = iterator.next(); + iterator.remove(); + register(channel); + connectionReceived(channel); } synchronized (handshakeCompletedConnections) { - if (!handshakeCompletedConnections.isEmpty()) { - for (final PendingHandshakeConnection conn : handshakeCompletedConnections) { - handshakeCompleted(conn); - } - handshakeCompletedConnections.clear(); + for (Iterator<PendingHandshakeConnection> iterator = + handshakeCompletedConnections.iterator(); iterator.hasNext();) { + PendingHandshakeConnection conn = iterator.next(); + iterator.remove(); + handshakeCompleted(conn); } } if (n > 0) { @@ -257,7 +257,11 @@ } } catch (Exception e) { LOGGER.error("failed to establish connection after handshake", e); - handleHandshakeFailure(conn); + try { + handleHandshakeFailure(conn); + } catch (Exception ex) { + e.addSuppressed(ex); + } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20795?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: neo Gerrit-Change-Id: I3887c992ad73f501f074ff5756f6204506043d65 Gerrit-Change-Number: 20795 Gerrit-PatchSet: 3 Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]>
