This is an automated email from the ASF dual-hosted git repository. mkevo pushed a commit to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.15 by this push: new 7838d5d521 GEODE-9484: Improve sending message to multy destinations (#7664) (#7853) 7838d5d521 is described below commit 7838d5d5210ab3afe243b2045242d7bfd6acc6ed Author: Mario Kevo <48509719+mk...@users.noreply.github.com> AuthorDate: Mon Sep 12 10:02:59 2022 +0200 GEODE-9484: Improve sending message to multy destinations (#7664) (#7853) Co-authored-by: Mario Ivanac <48509724+miva...@users.noreply.github.com> --- ....java => UpdatePropagationDistributedTest.java} | 107 ++++++++++++++++--- ...ava => UpdatePropagationPRDistributedTest.java} | 2 +- .../geode/internal/tcp/CloseConnectionTest.java | 2 +- .../geode/internal/tcp/TCPConduitDUnitTest.java | 2 +- .../distributed/internal/direct/DirectChannel.java | 44 +++++--- .../org/apache/geode/internal/tcp/Connection.java | 6 +- .../apache/geode/internal/tcp/ConnectionTable.java | 30 ++++-- .../org/apache/geode/internal/tcp/TCPConduit.java | 118 ++++++++++++++++++--- .../internal/tcp/ConnectionTransmissionTest.java | 2 +- .../apache/geode/internal/tcp/TCPConduitTest.java | 97 ++++++++++++++--- 10 files changed, 343 insertions(+), 67 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java similarity index 78% rename from geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java rename to geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java index 0b99a144e5..055780782f 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java @@ -20,6 
+20,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import java.io.IOException; @@ -50,6 +51,8 @@ import org.apache.geode.cache.server.CacheServer; import org.apache.geode.cache.util.CacheListenerAdapter; import org.apache.geode.cache30.CacheSerializableRunnable; import org.apache.geode.distributed.internal.ServerLocationAndMemberId; +import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper; +import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.NetworkUtils; @@ -68,53 +71,89 @@ import org.apache.geode.util.internal.GeodeGlossary; * the same across servers */ @Category({ClientSubscriptionTest.class}) -public class UpdatePropagationDUnitTest extends JUnit4CacheTestCase { +public class UpdatePropagationDistributedTest extends JUnit4CacheTestCase { private static final String REGION_NAME = "UpdatePropagationDUnitTest_region"; private VM server1 = null; private VM server2 = null; + private VM server3 = null; private VM client1 = null; private VM client2 = null; private int PORT1; private int PORT2; + private int PORT3; + + private final int minNumEntries = 2; + + private String hostnameServer1; + private String hostnameServer3; @Override public final void postSetUp() throws Exception { disconnectAllFromDS(); final Host host = Host.getHost(0); - // Server1 VM + server1 = host.getVM(0); - // Server2 VM server2 = host.getVM(1); - // Client 1 VM - client1 = host.getVM(2); + server3 = host.getVM(2); - // client 2 VM - client2 = host.getVM(3); + client1 = 
host.getVM(3); - PORT1 = server1.invoke(this::createServerCache); - PORT2 = server2.invoke(this::createServerCache); + client2 = host.getVM(4); - client1.invoke( - () -> createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1, PORT2)); - client2.invoke( - () -> createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1, PORT2)); + PORT1 = server1.invoke(() -> createServerCache()); + PORT2 = server2.invoke(() -> createServerCache()); + PORT3 = server3.invoke(() -> createServerCache()); + + hostnameServer1 = NetworkUtils.getServerHostName(server1.getHost()); + hostnameServer3 = NetworkUtils.getServerHostName(server3.getHost()); IgnoredException.addIgnoredException("java.net.SocketException"); IgnoredException.addIgnoredException("Unexpected IOException"); } + + + @Test + public void updatesArePropagatedToAllMembersWhenOneKilled() throws Exception { + client1.invoke( + () -> createClientCache(hostnameServer1, PORT1)); + client2.invoke( + () -> createClientCache(hostnameServer3, PORT3)); + int entries = 20; + AsyncInvocation invocation = client1.invokeAsync(() -> doPuts(entries)); + + // Wait for some entries to be put + server1.invoke(this::verifyMinEntriesInserted); + + // Simulate crash + server2.invoke(() -> { + MembershipManagerHelper.crashDistributedSystem(getSystemStatic()); + }); + + invocation.await(); + + int notNullEntriesIn1 = client1.invoke(() -> getNotNullEntriesNumber(entries)); + int notNullEntriesIn3 = client2.invoke(() -> getNotNullEntriesNumber(entries)); + assertThat(notNullEntriesIn3).isEqualTo(notNullEntriesIn1); + } + /** * This tests whether the updates are received by other clients or not , if there are situation of * Interest List fail over */ @Test public void updatesAreProgegatedAfterFailover() { + client1.invoke( + () -> createClientCache(hostnameServer1, PORT1, PORT2)); + client2.invoke( + () -> createClientCache(hostnameServer1, PORT1, PORT2)); + // First create entries on both servers via the two client 
client1.invoke(this::createEntriesK1andK2); client2.invoke(this::createEntriesK1andK2); @@ -248,6 +287,18 @@ public class UpdatePropagationDUnitTest extends JUnit4CacheTestCase { .addCacheListener(new EventTrackingCacheListener()).create(REGION_NAME); } + private void createClientCache(String host, Integer port1) { + Properties props = new Properties(); + props.setProperty(LOCATORS, ""); + ClientCacheFactory cf = new ClientCacheFactory(); + cf.addPoolServer(host, port1).setPoolSubscriptionEnabled(false) + .setPoolSubscriptionRedundancy(-1).setPoolMinConnections(4).setPoolSocketBufferSize(1000) + .setPoolReadTimeout(100).setPoolPingInterval(300); + ClientCache cache = getClientCache(cf); + cache.createClientRegionFactory(ClientRegionShortcut.PROXY) + .create(REGION_NAME); + } + private Integer createServerCache() throws Exception { Cache cache = getCache(); RegionAttributes attrs = createCacheServerAttributes(); @@ -305,6 +356,36 @@ public class UpdatePropagationDUnitTest extends JUnit4CacheTestCase { }); } + private void verifyMinEntriesInserted() { + await().untilAsserted(() -> assertThat(getCache().getRegion(SEPARATOR + REGION_NAME)) + .hasSizeGreaterThan(minNumEntries)); + } + + private void doPuts(int entries) throws Exception { + Region<String, String> r1 = getCache().getRegion(REGION_NAME); + assertThat(r1).isNotNull(); + for (int i = 0; i < entries; i++) { + try { + r1.put("" + i, "" + i); + } catch (Exception e) { + } + Thread.sleep(1000); + } + } + + private int getNotNullEntriesNumber(int entries) { + int notNullEntries = 0; + Region<String, String> r1 = getCache().getRegion(SEPARATOR + REGION_NAME); + assertThat(r1).isNotNull(); + for (int i = 0; i < entries; i++) { + Object value = r1.get("" + i, "" + i); + if (value != null) { + notNullEntries++; + } + } + return notNullEntries; + } + private static class EventTrackingCacheListener extends CacheListenerAdapter { List<EntryEvent> receivedEvents = new ArrayList<>(); diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java similarity index 93% rename from geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java rename to geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java index 47721ceb2c..77d903ee0e 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java @@ -21,7 +21,7 @@ import org.apache.geode.cache.RegionAttributes; /** * subclass of UpdatePropagationDUnitTest to exercise partitioned regions */ -public class UpdatePropagationPRDUnitTest extends UpdatePropagationDUnitTest { +public class UpdatePropagationPRDistributedTest extends UpdatePropagationDistributedTest { @Override protected RegionAttributes createCacheServerAttributes() { diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java index cdb5432399..5aeba3fac2 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java @@ -110,7 +110,7 @@ public class CloseConnectionTest implements Serializable { InternalDistributedSystem distributedSystem = getCache().getInternalDistributedSystem(); InternalDistributedMember otherMember = distributedSystem.getDistributionManager() .getOtherNormalDistributionManagerIds().iterator().next(); - Connection connection = 
conTable.getConduit().getConnection(otherMember, true, false, + Connection connection = conTable.getConduit().getConnection(otherMember, true, System.currentTimeMillis(), 15000, 0); await().untilAsserted(() -> { // grab the shared, ordered "sender" connection to vm0. It should have a residual diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java index 41d64c67f6..794d6e093d 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java @@ -110,7 +110,7 @@ public class TCPConduitDUnitTest extends DistributedTestCase { assertThat(connectionTable.hasReceiversFor(otherMember)).isTrue(); Connection sharedUnordered = connectionTable.get(otherMember, false, - System.currentTimeMillis(), 15000, 0); + System.currentTimeMillis(), 15000, 0, false); sharedUnordered.requestClose("for testing"); // the sender connection has been closed so we should only have 2 senders now assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(2); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java index a8a7bb8c20..eaac79f2b8 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java @@ -281,11 +281,17 @@ public class DirectChannel { directReply = false; } if (ce != null) { - if (failedCe != null) { - failedCe.getMembers().addAll(ce.getMembers()); - failedCe.getCauses().addAll(ce.getCauses()); + + if (!retry) { + retryInfo = ce; } else { - failedCe = ce; + + if (failedCe != null) { + failedCe.getMembers().addAll(ce.getMembers()); + 
failedCe.getCauses().addAll(ce.getCauses()); + } else { + failedCe = ce; + } } ce = null; } @@ -293,6 +299,9 @@ public class DirectChannel { if (failedCe != null) { throw failedCe; } + if (retryInfo != null) { + continue; + } return bytesWritten; } @@ -338,7 +347,12 @@ public class DirectChannel { } if (ce != null) { - retryInfo = ce; + if (retryInfo != null) { + retryInfo.getMembers().addAll(ce.getMembers()); + retryInfo.getCauses().addAll(ce.getCauses()); + } else { + retryInfo = ce; + } ce = null; } @@ -423,13 +437,13 @@ public class DirectChannel { * @param retry whether this is a retransmission * @param ackTimeout the ack warning timeout * @param ackSDTimeout the ack severe alert timeout - * @param cons a list to hold the connections + * @param connectionsList a list to hold the connections * @return null if everything went okay, or a ConnectExceptions object if some connections * couldn't be obtained */ private ConnectExceptions getConnections(Membership mgr, DistributionMessage msg, InternalDistributedMember[] destinations, boolean preserveOrder, boolean retry, - long ackTimeout, long ackSDTimeout, List cons) { + long ackTimeout, long ackSDTimeout, List<Connection> connectionsList) { ConnectExceptions ce = null; for (InternalDistributedMember destination : destinations) { if (destination == null) { @@ -458,12 +472,18 @@ public class DirectChannel { if (ackTimeout > 0) { startTime = System.currentTimeMillis(); } - Connection con = conduit.getConnection(destination, preserveOrder, retry, startTime, - ackTimeout, ackSDTimeout); + final Connection connection; + if (!retry) { + connection = conduit.getFirstScanForConnection(destination, preserveOrder, startTime, + ackTimeout, ackSDTimeout); + } else { + connection = conduit.getConnection(destination, preserveOrder, startTime, + ackTimeout, ackSDTimeout); + } - con.setInUse(true, startTime, 0, 0, null); // fix for bug#37657 - cons.add(con); - if (con.isSharedResource() && msg instanceof DirectReplyMessage) { + 
connection.setInUse(true, startTime, 0, 0, null); // fix for bug#37657 + connectionsList.add(connection); + if (connection.isSharedResource() && msg instanceof DirectReplyMessage) { DirectReplyMessage directMessage = (DirectReplyMessage) msg; directMessage.registerProcessor(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 44205d4d63..9e921d7d03 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -961,7 +961,7 @@ public class Connection implements Runnable { final ConnectionTable t, final boolean preserveOrder, final InternalDistributedMember remoteAddr, final boolean sharedResource, - final long startTime, final long ackTimeout, final long ackSATimeout) + final long startTime, final long ackTimeout, final long ackSATimeout, boolean doNotRetry) throws IOException, DistributedSystemDisconnectedException { boolean success = false; Connection conn = null; @@ -1021,7 +1021,9 @@ public class Connection implements Runnable { // do not change the text of this exception - it is looked for in exception handlers throw new IOException("Cannot form connection to alert listener " + remoteAddr); } - + if (doNotRetry) { + throw new IOException("Connection not created in first try to " + remoteAddr); + } // Wait briefly... 
interrupted = Thread.interrupted() || interrupted; try { diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java index f54f7bd9cd..f1d157d27f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java @@ -269,6 +269,7 @@ public class ConnectionTable { * @param startTime the ms clock start time for the operation * @param ackThreshold the ms ack-wait-threshold, or zero * @param ackSAThreshold the ms ack-severe_alert-threshold, or zero + * @param doNotRetry whether we should perform reattempt to create connection * @return the Connection, or null if someone else already created or closed it * @throws IOException if unable to connect */ @@ -276,13 +277,14 @@ public class ConnectionTable { boolean sharedResource, boolean preserveOrder, Map<DistributedMember, Object> m, PendingConnection pc, long startTime, long ackThreshold, - long ackSAThreshold) throws IOException, DistributedSystemDisconnectedException { + long ackSAThreshold, boolean doNotRetry) + throws IOException, DistributedSystemDisconnectedException { // handle new pending connection Connection con = null; try { long senderCreateStartTime = owner.getStats().startSenderCreate(); con = Connection.createSender(owner.getMembership(), this, preserveOrder, id, - sharedResource, startTime, ackThreshold, ackSAThreshold); + sharedResource, startTime, ackThreshold, ackSAThreshold, doNotRetry); owner.getStats().incSenders(sharedResource, preserveOrder, senderCreateStartTime); } finally { // our connection failed to notify anyone waiting for our pending con @@ -350,11 +352,14 @@ public class ConnectionTable { * @param startTime the ms clock start time for the operation * @param ackTimeout the ms ack-wait-threshold, or zero * @param ackSATimeout the ms ack-severe-alert-threshold, or zero + * @param 
doNotRetryWaitForConnection whether we should perform reattempt (or wait) to create + * connection * @return the new Connection, or null if an error * @throws IOException if unable to create the connection */ private Connection getSharedConnection(InternalDistributedMember id, boolean scheduleTimeout, - boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout) + boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout, + boolean doNotRetryWaitForConnection) throws IOException, DistributedSystemDisconnectedException { final Map<DistributedMember, Object> m = @@ -387,7 +392,7 @@ public class ConnectionTable { logger.debug("created PendingConnection {}", pc); } result = handleNewPendingConnection(id, true, preserveOrder, m, pc, - startTime, ackTimeout, ackSATimeout); + startTime, ackTimeout, ackSATimeout, doNotRetryWaitForConnection); if (!preserveOrder && scheduleTimeout) { scheduleIdleTimeout(result); } @@ -400,6 +405,10 @@ public class ConnectionTable { throw new IOException("Cannot form connection to alert listener " + id); } + if (doNotRetryWaitForConnection) { + return null; + } + result = ((PendingConnection) mEntry).waitForConnect(owner.getMembership(), startTime, ackTimeout, ackSATimeout); if (logger.isDebugEnabled()) { @@ -425,11 +434,13 @@ public class ConnectionTable { * @param startTime the ms clock start time for the operation * @param ackTimeout the ms ack-wait-threshold, or zero * @param ackSATimeout the ms ack-severe-alert-threshold, or zero + * @param doNotRetry whether we should perform reattempt to create connection * @return the connection, or null if an error * @throws IOException if the connection could not be created */ Connection getThreadOwnedConnection(InternalDistributedMember id, long startTime, long ackTimeout, - long ackSATimeout) throws IOException, DistributedSystemDisconnectedException { + long ackSATimeout, boolean doNotRetry) + throws IOException, DistributedSystemDisconnectedException { Connection 
result; // Look for result in the thread local @@ -449,7 +460,7 @@ public class ConnectionTable { // OK, we have to create a new connection. long senderCreateStartTime = owner.getStats().startSenderCreate(); result = Connection.createSender(owner.getMembership(), this, true, id, false, startTime, - ackTimeout, ackSATimeout); + ackTimeout, ackSATimeout, doNotRetry); owner.getStats().incSenders(false, true, senderCreateStartTime); if (logger.isDebugEnabled()) { logger.debug("ConnectionTable: created an ordered connection: {}", result); @@ -521,11 +532,12 @@ public class ConnectionTable { * @param startTime the ms clock start time * @param ackTimeout the ms ack-wait-threshold, or zero * @param ackSATimeout the ms ack-severe-alert-threshold, or zero + * @param doNotRetry whether we should perform reattempt to create connection * @return the new Connection, or null if a problem * @throws IOException if the connection could not be created */ protected Connection get(InternalDistributedMember id, boolean preserveOrder, long startTime, - long ackTimeout, long ackSATimeout) + long ackTimeout, long ackSATimeout, boolean doNotRetry) throws IOException, DistributedSystemDisconnectedException { if (closed) { owner.getCancelCriterion().checkCancelInProgress(null); @@ -535,9 +547,9 @@ public class ConnectionTable { boolean threadOwnsResources = threadOwnsResources(); if (!preserveOrder || !threadOwnsResources) { result = getSharedConnection(id, threadOwnsResources, preserveOrder, startTime, ackTimeout, - ackSATimeout); + ackSATimeout, doNotRetry); } else { - result = getThreadOwnedConnection(id, startTime, ackTimeout, ackSATimeout); + result = getThreadOwnedConnection(id, startTime, ackTimeout, ackSATimeout, doNotRetry); } if (result != null) { Assert.assertTrue(result.getPreserveOrder() == preserveOrder); diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java index 
4d6d9c8216..843b49c25f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java @@ -719,7 +719,6 @@ public class TCPConduit implements Runnable { * * @param memberAddress the IDS associated with the remoteId * @param preserveOrder whether this is an ordered or unordered connection - * @param retry false if this is the first attempt * @param startTime the time this operation started * @param ackTimeout the ack-wait-threshold * 1000 for the operation to be transmitted (or zero) * @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted @@ -728,7 +727,7 @@ public class TCPConduit implements Runnable { * @return the connection */ public Connection getConnection(InternalDistributedMember memberAddress, - final boolean preserveOrder, boolean retry, long startTime, long ackTimeout, + final boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout) throws IOException, DistributedSystemDisconnectedException { if (stopped) { throw new DistributedSystemDisconnectedException("The conduit is stopped"); @@ -742,7 +741,7 @@ public class TCPConduit implements Runnable { try { // If this is the second time through this loop, we had problems. // Tear down the connection so that it gets rebuilt. - if (retry || conn != null) { // not first time in loop + if (conn != null) { // not first time in loop if (!membership.memberExists(memberAddress) || membership.isShunned(memberAddress) || membership.shutdownInProgress()) { @@ -777,18 +776,15 @@ public class TCPConduit implements Runnable { // Close the connection (it will get rebuilt later). getStats().incReconnectAttempts(); - if (conn != null) { - try { - if (logger.isDebugEnabled()) { - logger.debug("Closing old connection. conn={} before retrying. 
memberInTrouble={}", - conn, memberInTrouble); - } - conn.closeForReconnect("closing before retrying"); - } catch (CancelException ex) { - throw ex; - } catch (Exception ex) { - // ignored + try { + if (logger.isDebugEnabled()) { + logger.debug("Closing old connection. conn={} before retrying. memberInTrouble={}", + conn, memberInTrouble); } + conn.closeForReconnect("closing before retrying"); + } catch (CancelException ex) { + throw ex; + } catch (Exception ignored) { } } // not first time in loop @@ -801,7 +797,7 @@ public class TCPConduit implements Runnable { do { retryForOldConnection = false; conn = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout, - ackSATimeout); + ackSATimeout, false); if (conn == null) { // conduit may be closed - otherwise an ioexception would be thrown problem = new IOException( @@ -909,6 +905,98 @@ public class TCPConduit implements Runnable { } } + /** + * Return a connection to the given member. This method performs a quick scan for a connection. + * Only one attempt is made to create a connection to the given member.
+ * + * @param memberAddress the IDS associated with the remoteId + * @param preserveOrder whether this is an ordered or unordered connection + * @param startTime the time this operation started + * @param ackTimeout the ack-wait-threshold * 1000 for the operation to be transmitted (or zero) + * @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted + * (or zero) + * + * @return the connection + */ + public Connection getFirstScanForConnection(InternalDistributedMember memberAddress, + final boolean preserveOrder, long startTime, long ackTimeout, + long ackSATimeout) throws IOException, DistributedSystemDisconnectedException { + if (stopped) { + throw new DistributedSystemDisconnectedException("The conduit is stopped"); + } + + Connection connection = null; + stopper.checkCancelInProgress(null); + boolean interrupted = Thread.interrupted(); + try { + + Exception problem = null; + try { + connection = getConnectionThatIsNotClosed(memberAddress, preserveOrder, startTime, + ackTimeout, ackSATimeout); + + // we have a connection; fall through and return it + } catch (ConnectionException e) { + // Race condition between acquiring the connection and attempting + // to use it: another thread closed it. 
+ problem = e; + // No need to retry since Connection.createSender has already + // done retries and now member is really unreachable for some reason + // even though it may be in the view + } catch (IOException e) { + problem = e; + // don't keep trying to connect to an alert listener + if (AlertingAction.isThreadAlerting()) { + if (logger.isDebugEnabled()) { + logger.debug("Giving up connecting to alert listener {}", memberAddress); + } + } + } + + if (problem != null) { + if (problem instanceof IOException) { + if (problem.getMessage() != null + && problem.getMessage().startsWith("Cannot form connection to alert listener")) { + throw new AlertingIOException((IOException) problem); + } + throw (IOException) problem; + } + throw new IOException( + String.format("Problem connecting to %s", memberAddress), problem); + } + // Success! + + return connection; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + private Connection getConnectionThatIsNotClosed(InternalDistributedMember memberAddress, + final boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout) + throws IOException, ConnectionException { + boolean debugEnabled = logger.isDebugEnabled(); + Connection connection; + while (true) { + connection = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout, + ackSATimeout, true); + if (connection == null) { + throw new IOException("Unable to reconnect to server; possible shutdown: " + memberAddress); + } + + if (!connection.isClosing() && connection.getRemoteAddress().equals(memberAddress)) { + return connection; + } + if (debugEnabled) { + logger.debug("Got an old connection for {}: {}@{}", memberAddress, connection, + connection.hashCode()); + } + connection.closeOldConnection("closing old connection"); + } + } + @Override public String toString() { return String.valueOf(id); diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java index 5a041eb167..906a021dec 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java @@ -168,7 +168,7 @@ public class ConnectionTransmissionTest { senderAddr.setDirectChannelPort(conduit.getPort()); return spy(Connection.createSender(membership, writerTable, true, remoteAddr, true, - System.currentTimeMillis(), 1000, 1000)); + System.currentTimeMillis(), 1000, 1000, false)); } private Connection createReceiverConnectionOnFirstAccept(final ServerSocketChannel acceptorSocket, diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java index 392a5993c0..e1b3ddf49e 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java @@ -94,7 +94,8 @@ public class TCPConduitTest { TCPConduit -> connectionTable, socketCreator, doNothing(), false); InternalDistributedMember member = mock(InternalDistributedMember.class); doThrow(new IOException("Cannot form connection to alert listener")) - .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong()); + .when(connectionTable) + .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); when(membership.memberExists(eq(member))) .thenReturn(true); when(membership.isShunned(same(member))) @@ -102,7 +103,7 @@ public class TCPConduitTest { AlertingAction.execute(() -> { Throwable thrown = catchThrowable(() -> { - tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); + tcpConduit.getConnection(member, false, 0L, 0L, 0L); }); assertThat(thrown) @@ -123,13 +124,14 @@ public class TCPConduitTest { doThrow(new IOException("Cannot form connection to alert 
listener")) // getConnection will loop indefinitely until connectionTable returns connection .doReturn(connection) - .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong()); + .when(connectionTable) + .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); when(membership.memberExists(eq(member))) .thenReturn(true); when(membership.isShunned(same(member))) .thenReturn(false); - Connection value = tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); + Connection value = tcpConduit.getConnection(member, false, 0L, 0L, 0L); assertThat(value) .isSameAs(connection); @@ -143,12 +145,13 @@ public class TCPConduitTest { TCPConduit -> connectionTable, socketCreator, doNothing(), false); InternalDistributedMember member = mock(InternalDistributedMember.class); doThrow(new IOException("Cannot form connection to alert listener")) - .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong()); + .when(connectionTable) + .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); when(membership.memberExists(eq(member))) .thenReturn(false); Throwable thrown = catchThrowable(() -> { - tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); + tcpConduit.getConnection(member, false, 0L, 0L, 0L); }); assertThat(thrown) @@ -164,14 +167,15 @@ public class TCPConduitTest { TCPConduit -> connectionTable, socketCreator, doNothing(), false); InternalDistributedMember member = mock(InternalDistributedMember.class); doThrow(new IOException("Cannot form connection to alert listener")) - .when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong()); + .when(connectionTable) + .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); when(membership.memberExists(same(member))) .thenReturn(true); when(membership.isShunned(same(member))) .thenReturn(true); Throwable thrown = catchThrowable(() -> { - tcpConduit.getConnection(member, false, 
false, 0L, 0L, 0L); + tcpConduit.getConnection(member, false, 0L, 0L, 0L); }); assertThat(thrown) @@ -188,7 +192,8 @@ public class TCPConduitTest { TCPConduit -> connectionTable, socketCreator, doNothing(), false); InternalDistributedMember member = mock(InternalDistributedMember.class); doThrow(new IOException("Cannot form connection to alert listener")) - .when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong()); + .when(connectionTable) + .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); when(membership.memberExists(same(member))) .thenReturn(true); when(membership.isShunned(same(member))) @@ -197,7 +202,7 @@ public class TCPConduitTest { .thenReturn(true); Throwable thrown = catchThrowable(() -> { - tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); + tcpConduit.getConnection(member, false, 0L, 0L, 0L); }); assertThat(thrown) @@ -214,7 +219,8 @@ public class TCPConduitTest { TCPConduit -> connectionTable, socketCreator, doNothing(), false); InternalDistributedMember member = mock(InternalDistributedMember.class); doThrow(new IOException("Cannot form connection to alert listener")) - .when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong()); + .when(connectionTable) + .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); when(membership.memberExists(same(member))) .thenReturn(true); when(membership.isShunned(same(member))) @@ -223,7 +229,7 @@ public class TCPConduitTest { .thenReturn(true); Throwable thrown = catchThrowable(() -> { - tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); + tcpConduit.getConnection(member, false, 0L, 0L, 0L); }); assertThat(thrown) @@ -231,6 +237,73 @@ public class TCPConduitTest { .hasMessage("Abandoned because shutdown is in progress"); } + @Test + public void getFirstScanForConnectionThrowsAlertingIOException_ifCaughtIOException_whileAlerting() + throws Exception { + TCPConduit tcpConduit = + new 
TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class), + new Properties(), + TCPConduit -> connectionTable, socketCreator, doNothing(), false); + InternalDistributedMember member = mock(InternalDistributedMember.class); + doThrow(new IOException("Cannot form connection to alert listener")) + .when(connectionTable) + .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); + + AlertingAction.execute(() -> { + Throwable thrown = catchThrowable(() -> { + tcpConduit.getFirstScanForConnection(member, false, 0L, 0L, 0L); + }); + + assertThat(thrown) + .isInstanceOf(AlertingIOException.class); + }); + } + + @Test + public void getFirstScanForConnectionRethrows_ifCaughtIOException_whileNotAlerting() + throws Exception { + TCPConduit tcpConduit = + new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class), + new Properties(), + TCPConduit -> connectionTable, socketCreator, doNothing(), false); + InternalDistributedMember member = mock(InternalDistributedMember.class); + Connection connection = mock(Connection.class); + doThrow(new IOException("Connection not created in first try")) + .doReturn(connection) + .when(connectionTable) + .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); + + Throwable thrown = catchThrowable(() -> { + tcpConduit.getFirstScanForConnection(member, false, 0L, 0L, 0L); + }); + + assertThat(thrown) + .isInstanceOf(IOException.class); + } + + + @Test + public void getFirstScanForConnectionRethrows_ifCaughtIOException_whithoutMessage() + throws Exception { + TCPConduit tcpConduit = + new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class), + new Properties(), + TCPConduit -> connectionTable, socketCreator, doNothing(), false); + InternalDistributedMember member = mock(InternalDistributedMember.class); + Connection connection = mock(Connection.class); + doThrow(new IOException()) + .doReturn(connection) + 
.when(connectionTable) + .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); + + Throwable thrown = catchThrowable(() -> { + tcpConduit.getFirstScanForConnection(member, false, 0L, 0L, 0L); + }); + + assertThat(thrown) + .isInstanceOf(IOException.class); + } + private Runnable doNothing() { return () -> { // nothing