From Janhavi Tripurwar <[email protected]>:
Janhavi Tripurwar has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21168?usp=email )
Change subject: [ASTERIXDB-3764][NET]: Avoid reusing failed connections after
NC restart
......................................................................
[ASTERIXDB-3764][NET]: Avoid reusing failed connections after NC restart
Details:
- Remove failed connections from outgoingConnectionMap before creating new ones.
- Add a package-private hasConnectionFailure() accessor to MultiplexedConnection
and make setConnectionFailure() public so the new test can force a failure.
- Ensure only the current connection is removed from outgoingConnectionMap on
close.
- Add MuxDemuxReconnectTest to verify reconnection logic.
Change-Id: If76cee14335deffa14c67982a02fe9f0f8aad944
---
M
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
M
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
A
hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/MuxDemuxReconnectTest.java
3 files changed, 88 insertions(+), 3 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/68/21168/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 061c6eee..87e743d 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -97,7 +97,7 @@
notifyAll();
}
- synchronized void setConnectionFailure(Exception e) {
+ public synchronized void setConnectionFailure(Exception e) {
this.connectionFailure = true;
this.error = e;
notifyAll();
@@ -472,4 +472,8 @@
state.set("channels", channels);
return Optional.of(state);
}
+
+ synchronized boolean hasConnectionFailure() {
+ return connectionFailure;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index 0837b29..1bbb333 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -142,11 +142,16 @@
public void connectionClosed(TCPConnection connection) {
synchronized (MuxDemux.this) {
if (connection.getType() ==
TCPConnection.ConnectionType.OUTGOING) {
- if
(outgoingConnectionMap.remove(connection.getRemoteAddress()) == null) {
+ MultiplexedConnection current =
outgoingConnectionMap.get(connection.getRemoteAddress());
+ if (current == connection.getAttachment()) {
+
outgoingConnectionMap.remove(connection.getRemoteAddress());
+ LOGGER.debug("removed outgoing connection to {} on
close", connection.getRemoteAddress());
+ } else if (current == null) {
LOGGER.warn("outgoing connection to {} already
removed on close",
connection.getRemoteAddress());
} else {
- LOGGER.debug("removed outgoing connection to {} on
close", connection.getRemoteAddress());
+ LOGGER.debug("ignored close for stale outgoing
connection to {}",
+ connection.getRemoteAddress());
}
} else if (connection.getType() ==
TCPConnection.ConnectionType.INCOMING) {
if
(incomingConnectionMap.remove(connection.getRemoteAddress()) == null) {
@@ -187,6 +192,14 @@
MultiplexedConnection mConn;
synchronized (this) {
mConn = outgoingConnectionMap.get(remoteAddress);
+ if (mConn != null && mConn.hasConnectionFailure()) {
+ if (outgoingConnectionMap.remove(remoteAddress) == null) {
+ LOGGER.warn("failed connection to {} was already removed",
remoteAddress);
+ } else {
+ LOGGER.debug("removed failed connection to {} before
reconnect", remoteAddress);
+ }
+ mConn = null;
+ }
if (mConn == null) {
mConn = new MultiplexedConnection(this);
outgoingConnectionMap.put(remoteAddress, mConn);
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/MuxDemuxReconnectTest.java
b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/MuxDemuxReconnectTest.java
new file mode 100644
index 0000000..33031ab
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/MuxDemuxReconnectTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.net.tests;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
+import
org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
+import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import org.apache.hyracks.net.protocols.muxdemux.MuxDemux;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MuxDemuxReconnectTest {
+
+ @Test
+ public void testReconnectReplacesFailedCachedConnection() throws Exception
{
+ MuxDemux server = createMuxDemux();
+ server.start();
+ MuxDemux client = createMuxDemux();
+ client.start();
+
+ InetSocketAddress serverAddress = server.getLocalAddress();
+ MultiplexedConnection firstConnection = client.connect(serverAddress);
+ Assert.assertSame(firstConnection, getOutgoingConnection(client,
serverAddress));
+
+ firstConnection.setConnectionFailure(new IOException("forced
connection failure"));
+
+ MultiplexedConnection secondConnection = client.connect(serverAddress);
+ Assert.assertNotSame("connect() should replace a failed cached
connection", firstConnection, secondConnection);
+ Assert.assertSame(secondConnection, getOutgoingConnection(client,
serverAddress));
+ }
+
+ private static MuxDemux createMuxDemux() {
+ return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), channel -> {
+ // This test only exercises connection caching and reconnect
behavior.
+ }, 1, 5, FullFrameChannelInterfaceFactory.INSTANCE,
PlainSocketChannelFactory.INSTANCE);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static MultiplexedConnection getOutgoingConnection(MuxDemux
muxDemux, InetSocketAddress remoteAddress)
+ throws Exception {
+ Field outgoingConnectionMapField =
MuxDemux.class.getDeclaredField("outgoingConnectionMap");
+ outgoingConnectionMapField.setAccessible(true);
+ Map<InetSocketAddress, MultiplexedConnection> outgoingConnectionMap =
+ (Map<InetSocketAddress, MultiplexedConnection>)
outgoingConnectionMapField.get(muxDemux);
+ return outgoingConnectionMap.get(remoteAddress);
+ }
+}
\ No newline at end of file
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21168?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: If76cee14335deffa14c67982a02fe9f0f8aad944
Gerrit-Change-Number: 21168
Gerrit-PatchSet: 1
Gerrit-Owner: Janhavi Tripurwar <[email protected]>