From Janhavi Tripurwar <[email protected]>:

Janhavi Tripurwar has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21168?usp=email )

Change subject: [ASTERIXDB-3764][NET]: Avoid reusing failed connections after NC restart
......................................................................

[ASTERIXDB-3764][NET]: Avoid reusing failed connections after NC restart

Details:
- Remove failed connections from outgoingConnectionMap before
  creating new ones.
- Expose hasConnectionFailure() in MultiplexedConnection.
- Ensure only the current connection is removed from the
  outgoingConnectionMap on close.
- Add MuxDemuxReconnectTest to verify reconnection logic.

Ext-ref: MB-71644
Change-Id: If76cee14335deffa14c67982a02fe9f0f8aad944
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21168
Reviewed-by: Janhavi Tripurwar <[email protected]>
Reviewed-by: Michael Blow <[email protected]>
Tested-by: Jenkins <[email protected]>
---
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
A hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemuxReconnectTest.java
3 files changed, 84 insertions(+), 2 deletions(-)

Approvals:
  Jenkins: Verified
  Janhavi Tripurwar: Looks good to me, but someone else must approve
  Michael Blow: Looks good to me, approved

Objections:
  Anon. E. Moose #1000171: Violations found




diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 061c6eee..ee91b46 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -472,4 +472,8 @@
         state.set("channels", channels);
         return Optional.of(state);
     }
+
+    synchronized boolean hasConnectionFailure() {
+        return connectionFailure;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index 0837b29..1bbb333 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -142,11 +142,16 @@
             public void connectionClosed(TCPConnection connection) {
                 synchronized (MuxDemux.this) {
                     if (connection.getType() == TCPConnection.ConnectionType.OUTGOING) {
-                        if (outgoingConnectionMap.remove(connection.getRemoteAddress()) == null) {
+                        MultiplexedConnection current = outgoingConnectionMap.get(connection.getRemoteAddress());
+                        if (current == connection.getAttachment()) {
+                            outgoingConnectionMap.remove(connection.getRemoteAddress());
+                            LOGGER.debug("removed outgoing connection to {} on close", connection.getRemoteAddress());
+                        } else if (current == null) {
                             LOGGER.warn("outgoing connection to {} already removed on close",
                                     connection.getRemoteAddress());
                         } else {
-                            LOGGER.debug("removed outgoing connection to {} on close", connection.getRemoteAddress());
+                            LOGGER.debug("ignored close for stale outgoing connection to {}",
+                                    connection.getRemoteAddress());
                         }
                     } else if (connection.getType() == TCPConnection.ConnectionType.INCOMING) {
                         if (incomingConnectionMap.remove(connection.getRemoteAddress()) == null) {
@@ -187,6 +192,14 @@
         MultiplexedConnection mConn;
         synchronized (this) {
             mConn = outgoingConnectionMap.get(remoteAddress);
+            if (mConn != null && mConn.hasConnectionFailure()) {
+                if (outgoingConnectionMap.remove(remoteAddress) == null) {
+                    LOGGER.warn("failed connection to {} was already removed", remoteAddress);
+                } else {
+                    LOGGER.debug("removed failed connection to {} before reconnect", remoteAddress);
+                }
+                mConn = null;
+            }
             if (mConn == null) {
                 mConn = new MultiplexedConnection(this);
                 outgoingConnectionMap.put(remoteAddress, mConn);
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemuxReconnectTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemuxReconnectTest.java
new file mode 100644
index 0000000..b4ed7f7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemuxReconnectTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.net.protocols.muxdemux;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MuxDemuxReconnectTest {
+
+    @Test
+    public void testReconnectReplacesFailedCachedConnection() throws Exception {
+        MuxDemux server = createMuxDemux();
+        server.start();
+        MuxDemux client = createMuxDemux();
+        client.start();
+
+        InetSocketAddress serverAddress = server.getLocalAddress();
+        MultiplexedConnection firstConnection = client.connect(serverAddress);
+        Assert.assertSame(firstConnection, getOutgoingConnection(client, serverAddress));
+
+        firstConnection.setConnectionFailure(new IOException("forced connection failure"));
+
+        MultiplexedConnection secondConnection = client.connect(serverAddress);
+        Assert.assertNotSame("connect() should replace a failed cached connection", firstConnection, secondConnection);
+        Assert.assertSame(secondConnection, getOutgoingConnection(client, serverAddress));
+    }
+
+    private static MuxDemux createMuxDemux() {
+        return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), channel -> {
+            // This test only exercises connection caching and reconnect behavior.
+        }, 1, 5, FullFrameChannelInterfaceFactory.INSTANCE, PlainSocketChannelFactory.INSTANCE);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static MultiplexedConnection getOutgoingConnection(MuxDemux muxDemux, InetSocketAddress remoteAddress)
+            throws Exception {
+        Field outgoingConnectionMapField = MuxDemux.class.getDeclaredField("outgoingConnectionMap");
+        outgoingConnectionMapField.setAccessible(true);
+        Map<InetSocketAddress, MultiplexedConnection> outgoingConnectionMap =
+                (Map<InetSocketAddress, MultiplexedConnection>) outgoingConnectionMapField.get(muxDemux);
+        return outgoingConnectionMap.get(remoteAddress);
+    }
+}
\ No newline at end of file
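
For readers who want the two MuxDemux.java hunks in isolation, the following is a minimal sketch of the same idea and not the Hyracks code: a cached connection that reports a failure is dropped before connect() hands it out, and a close notification only removes the map entry if it still points at the connection being closed. The names ReconnectCacheSketch, Conn, markFailed, onClosed and current are hypothetical stand-ins introduced for illustration, and a String key replaces the InetSocketAddress used in the patch.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; all names here are assumptions, not Hyracks APIs.
final class ReconnectCacheSketch {

    /** Hypothetical stand-in for MultiplexedConnection; it only tracks a failure flag. */
    static final class Conn {
        private boolean failed;

        synchronized void markFailed() {
            failed = true;
        }

        synchronized boolean hasFailure() {
            return failed;
        }
    }

    private final Map<String, Conn> outgoing = new HashMap<>();

    /** Returns the cached connection, first replacing it if it has failed. */
    synchronized Conn connect(String remote) {
        Conn conn = outgoing.get(remote);
        if (conn != null && conn.hasFailure()) {
            outgoing.remove(remote); // never hand out a failed connection
            conn = null;
        }
        if (conn == null) {
            conn = new Conn();
            outgoing.put(remote, conn);
        }
        return conn;
    }

    /** Removes the entry only if 'closed' is still the current one; returns true if removed. */
    synchronized boolean onClosed(String remote, Conn closed) {
        if (outgoing.get(remote) == closed) { // identity check, as in the patch
            outgoing.remove(remote);
            return true;
        }
        return false; // stale close (or already removed): leave the map alone
    }

    synchronized Conn current(String remote) {
        return outgoing.get(remote);
    }

    public static void main(String[] args) {
        ReconnectCacheSketch cache = new ReconnectCacheSketch();
        Conn first = cache.connect("nc1");
        first.markFailed();
        Conn second = cache.connect("nc1");
        System.out.println(first != second);                // true: failed entry was replaced
        System.out.println(cache.onClosed("nc1", first));   // false: stale close is ignored
        System.out.println(cache.current("nc1") == second); // true: replacement survives
    }
}
```

The identity check in onClosed is what keeps a late close event for the old connection from evicting its replacement, which is the case the "ignored close for stale outgoing connection" log line in the patch covers.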

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21168?usp=email
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: trinity
Gerrit-Change-Id: If76cee14335deffa14c67982a02fe9f0f8aad944
Gerrit-Change-Number: 21168
Gerrit-PatchSet: 5
Gerrit-Owner: Janhavi Tripurwar <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Janhavi Tripurwar <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
