From Michael Blow <[email protected]>:
Michael Blow has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20795?usp=email )
Change subject: [NO ISSUE][HYR][NET] MuxDemux negotiation hardening
......................................................................
[NO ISSUE][HYR][NET] MuxDemux negotiation hardening
Ext-ref: MB-70113,MB-70114
Change-Id: I3887c992ad73f501f074ff5756f6204506043d65
---
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
2 files changed, 74 insertions(+), 44 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/95/20795/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index 3189ea1..0837b29 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -30,6 +30,8 @@
import org.apache.hyracks.net.protocols.tcp.TCPConnection;
import org.apache.hyracks.net.protocols.tcp.TCPEndpoint;
import org.apache.hyracks.util.JSONUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -43,6 +45,8 @@
* @author vinayakb
*/
public class MuxDemux {
+ private static final Logger LOGGER = LogManager.getLogger();
+
private final InetSocketAddress localAddress;
private final IChannelOpenListener channelOpenListener;
@@ -56,7 +60,7 @@
private final MuxDemuxPerformanceCounters perfCounters;
- private final IChannelInterfaceFactory channelInterfaceFatory;
+ private final IChannelInterfaceFactory channelInterfaceFactory;
/**
* Constructor.
@@ -80,7 +84,7 @@
this.localAddress = localAddress;
this.channelOpenListener = listener;
this.maxConnectionAttempts = maxConnectionAttempts;
- this.channelInterfaceFatory = channelInterfaceFactory;
+ this.channelInterfaceFactory = channelInterfaceFactory;
outgoingConnectionMap = new HashMap<>();
incomingConnectionMap = new HashMap<>();
this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() {
@@ -90,7 +94,10 @@
synchronized (MuxDemux.this) {
mConn =
outgoingConnectionMap.get(connection.getRemoteAddress());
}
- assert mConn != null;
+ if (mConn == null) {
+ throw new IllegalStateException(
+ "connectionEstablished called for unknown
connection: " + connection.getRemoteAddress());
+ }
mConn.setTCPConnection(connection);
connection.setEventListener(mConn);
connection.setAttachment(mConn);
@@ -110,10 +117,19 @@
MultiplexedConnection mConn;
synchronized (MuxDemux.this) {
mConn = outgoingConnectionMap.get(remoteAddress);
- assert mConn != null;
+ if (mConn == null) {
+ throw new IllegalStateException(
+ "connectionFailure called for unknown
connection: " + remoteAddress);
+ }
int nConnectionAttempts = mConn.getConnectionAttempts();
if (nConnectionAttempts >
MuxDemux.this.maxConnectionAttempts) {
- outgoingConnectionMap.remove(remoteAddress);
+ if (outgoingConnectionMap.remove(remoteAddress) ==
null) {
+ LOGGER.warn("connection to {} failed after {}
attempts, but connection was already removed",
+ remoteAddress, nConnectionAttempts);
+ } else {
+ LOGGER.debug("connection to {} failed after {}
attempts, giving up", remoteAddress,
+ nConnectionAttempts);
+ }
mConn.setConnectionFailure(new
IOException(remoteAddress.toString() + ": " + error, error));
} else {
mConn.setConnectionAttempts(nConnectionAttempts + 1);
@@ -126,9 +142,19 @@
public void connectionClosed(TCPConnection connection) {
synchronized (MuxDemux.this) {
if (connection.getType() ==
TCPConnection.ConnectionType.OUTGOING) {
-
outgoingConnectionMap.remove(connection.getRemoteAddress());
+ if
(outgoingConnectionMap.remove(connection.getRemoteAddress()) == null) {
+ LOGGER.warn("outgoing connection to {} already
removed on close",
+ connection.getRemoteAddress());
+ } else {
+ LOGGER.debug("removed outgoing connection to {} on
close", connection.getRemoteAddress());
+ }
} else if (connection.getType() ==
TCPConnection.ConnectionType.INCOMING) {
-
incomingConnectionMap.remove(connection.getRemoteAddress());
+ if
(incomingConnectionMap.remove(connection.getRemoteAddress()) == null) {
+ LOGGER.warn("incoming connection from {} already
removed on close",
+ connection.getRemoteAddress());
+ } else {
+ LOGGER.debug("removed incoming connection from {}
on close", connection.getRemoteAddress());
+ }
}
}
}
@@ -200,7 +226,7 @@
* @return
*/
public IChannelInterfaceFactory getChannelInterfaceFactory() {
- return channelInterfaceFatory;
+ return channelInterfaceFactory;
}
public synchronized JsonNode getState() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index b75e3c6..d6ffe23 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -110,7 +110,7 @@
@Override
public String toString() {
- return "TCPEndpoint [Local Address: " + localAddress + "]";
+ return "TCPEndpoint [" + (localAddress == null ? "<outgoing>" : "Local
Address: " + localAddress) + "]";
}
private class IOThread extends Thread {
@@ -127,7 +127,7 @@
private final Selector selector;
public IOThread() throws IOException {
- super("TCPEndpoint IO Thread [" + localAddress + "]");
+ super("TCPEndpoint IO Thread [" + (localAddress == null ?
"<outgoing>" : localAddress) + "]");
setDaemon(true);
setPriority(Thread.NORM_PRIORITY);
this.pendingConnections = new ArrayList<>();
@@ -144,44 +144,44 @@
try {
int n = selector.select();
collectOutstandingWork();
- if (!workingPendingConnections.isEmpty()) {
- for (InetSocketAddress address :
workingPendingConnections) {
- SocketChannel channel = SocketChannel.open();
- register(channel);
- boolean connect = false;
- boolean failure = false;
- try {
- connect = channel.connect(address);
- } catch (IOException e) {
- failure = true;
- synchronized (connectionListener) {
-
connectionListener.connectionFailure(address, e);
- }
- }
- if (!failure) {
- if (!connect) {
- SelectionKey key =
channel.register(selector, SelectionKey.OP_CONNECT);
- key.attach(address);
- } else {
- socketConnected(address, channel);
- }
+ for (Iterator<InetSocketAddress> iterator =
workingPendingConnections.iterator(); iterator
+ .hasNext();) {
+ InetSocketAddress address = iterator.next();
+ iterator.remove();
+ SocketChannel channel = SocketChannel.open();
+ register(channel);
+ boolean connect = false;
+ boolean failure = false;
+ try {
+ connect = channel.connect(address);
+ } catch (IOException e) {
+ failure = true;
+ synchronized (connectionListener) {
+ connectionListener.connectionFailure(address,
e);
}
}
- workingPendingConnections.clear();
+ if (!failure) {
+ if (!connect) {
+ SelectionKey key = channel.register(selector,
SelectionKey.OP_CONNECT);
+ key.attach(address);
+ } else {
+ socketConnected(address, channel);
+ }
+ }
}
- if (!workingIncomingConnections.isEmpty()) {
- for (SocketChannel channel :
workingIncomingConnections) {
- register(channel);
- connectionReceived(channel);
- }
- workingIncomingConnections.clear();
+ for (Iterator<SocketChannel> iterator =
workingIncomingConnections.iterator(); iterator
+ .hasNext();) {
+ SocketChannel channel = iterator.next();
+ iterator.remove();
+ register(channel);
+ connectionReceived(channel);
}
synchronized (handshakeCompletedConnections) {
- if (!handshakeCompletedConnections.isEmpty()) {
- for (final PendingHandshakeConnection conn :
handshakeCompletedConnections) {
- handshakeCompleted(conn);
- }
- handshakeCompletedConnections.clear();
+ for (Iterator<PendingHandshakeConnection> iterator =
+ handshakeCompletedConnections.iterator();
iterator.hasNext();) {
+ PendingHandshakeConnection conn = iterator.next();
+ iterator.remove();
+ handshakeCompleted(conn);
}
}
if (n > 0) {
@@ -257,7 +257,11 @@
}
} catch (Exception e) {
LOGGER.error("failed to establish connection after handshake",
e);
- handleHandshakeFailure(conn);
+ try {
+ handleHandshakeFailure(conn);
+ } catch (Exception ex) {
+ e.addSuppressed(ex);
+ }
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20795?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: neo
Gerrit-Change-Id: I3887c992ad73f501f074ff5756f6204506043d65
Gerrit-Change-Number: 20795
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Blow <[email protected]>