From Janhavi Tripurwar <[email protected]>: Janhavi Tripurwar has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21168?usp=email )
Change subject: [ASTERIXDB-3764][NET]: Avoid reusing failed connections after NC restart ...................................................................... [ASTERIXDB-3764][NET]: Avoid reusing failed connections after NC restart Details: - Remove failed connections from outgoingConnectionMap before creating new ones. - Expose hasConnectionFailure() in MultiplexedConnection. - Ensure only the current connection is removed from the outgoingConnectionMap on close. - Add MuxDemuxReconnectTest to verify reconnection logic. Ext-ref: MB-71644 Change-Id: If76cee14335deffa14c67982a02fe9f0f8aad944 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21168 Reviewed-by: Janhavi Tripurwar <[email protected]> Reviewed-by: Michael Blow <[email protected]> Tested-by: Jenkins <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java A hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemuxReconnectTest.java 3 files changed, 84 insertions(+), 2 deletions(-) Approvals: Jenkins: Verified Janhavi Tripurwar: Looks good to me, but someone else must approve Michael Blow: Looks good to me, approved Objections: Anon. E. 
Moose #1000171: Violations found diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java index 061c6eee..ee91b46 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java @@ -472,4 +472,8 @@ state.set("channels", channels); return Optional.of(state); } + + synchronized boolean hasConnectionFailure() { + return connectionFailure; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java index 0837b29..1bbb333 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java @@ -142,11 +142,16 @@ public void connectionClosed(TCPConnection connection) { synchronized (MuxDemux.this) { if (connection.getType() == TCPConnection.ConnectionType.OUTGOING) { - if (outgoingConnectionMap.remove(connection.getRemoteAddress()) == null) { + MultiplexedConnection current = outgoingConnectionMap.get(connection.getRemoteAddress()); + if (current == connection.getAttachment()) { + outgoingConnectionMap.remove(connection.getRemoteAddress()); + LOGGER.debug("removed outgoing connection to {} on close", connection.getRemoteAddress()); + } else if (current == null) { LOGGER.warn("outgoing connection to {} already removed on close", connection.getRemoteAddress()); } else { - LOGGER.debug("removed outgoing connection to {} on close", 
connection.getRemoteAddress()); + LOGGER.debug("ignored close for stale outgoing connection to {}", + connection.getRemoteAddress()); } } else if (connection.getType() == TCPConnection.ConnectionType.INCOMING) { if (incomingConnectionMap.remove(connection.getRemoteAddress()) == null) { @@ -187,6 +192,14 @@ MultiplexedConnection mConn; synchronized (this) { mConn = outgoingConnectionMap.get(remoteAddress); + if (mConn != null && mConn.hasConnectionFailure()) { + if (outgoingConnectionMap.remove(remoteAddress) == null) { + LOGGER.warn("failed connection to {} was already removed", remoteAddress); + } else { + LOGGER.debug("removed failed connection to {} before reconnect", remoteAddress); + } + mConn = null; + } if (mConn == null) { mConn = new MultiplexedConnection(this); outgoingConnectionMap.put(remoteAddress, mConn); diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemuxReconnectTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemuxReconnectTest.java new file mode 100644 index 0000000..b4ed7f7 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemuxReconnectTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.net.protocols.muxdemux; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.Map; + +import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory; +import org.junit.Assert; +import org.junit.Test; + +public class MuxDemuxReconnectTest { + + @Test + public void testReconnectReplacesFailedCachedConnection() throws Exception { + MuxDemux server = createMuxDemux(); + server.start(); + MuxDemux client = createMuxDemux(); + client.start(); + + InetSocketAddress serverAddress = server.getLocalAddress(); + MultiplexedConnection firstConnection = client.connect(serverAddress); + Assert.assertSame(firstConnection, getOutgoingConnection(client, serverAddress)); + + firstConnection.setConnectionFailure(new IOException("forced connection failure")); + + MultiplexedConnection secondConnection = client.connect(serverAddress); + Assert.assertNotSame("connect() should replace a failed cached connection", firstConnection, secondConnection); + Assert.assertSame(secondConnection, getOutgoingConnection(client, serverAddress)); + } + + private static MuxDemux createMuxDemux() { + return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), channel -> { + // This test only exercises connection caching and reconnect behavior. 
+ }, 1, 5, FullFrameChannelInterfaceFactory.INSTANCE, PlainSocketChannelFactory.INSTANCE); + } + + @SuppressWarnings("unchecked") + private static MultiplexedConnection getOutgoingConnection(MuxDemux muxDemux, InetSocketAddress remoteAddress) + throws Exception { + Field outgoingConnectionMapField = MuxDemux.class.getDeclaredField("outgoingConnectionMap"); + outgoingConnectionMapField.setAccessible(true); + Map<InetSocketAddress, MultiplexedConnection> outgoingConnectionMap = + (Map<InetSocketAddress, MultiplexedConnection>) outgoingConnectionMapField.get(muxDemux); + return outgoingConnectionMap.get(remoteAddress); + } +} \ No newline at end of file -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21168?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: trinity Gerrit-Change-Id: If76cee14335deffa14c67982a02fe9f0f8aad944 Gerrit-Change-Number: 21168 Gerrit-PatchSet: 5 Gerrit-Owner: Janhavi Tripurwar <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Janhavi Tripurwar <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]>
