This is an automated email from the ASF dual-hosted git repository. mkevo pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 3927ba9 GEODE-8202: Two-step serial gw sender threads start (#5900) 3927ba9 is described below commit 3927ba9eeed5bee7e7639f926b48409c4b6dd3a7 Author: Alberto Bustamante Reyes <alb3rt...@users.noreply.github.com> AuthorDate: Mon Jan 18 13:57:31 2021 +0100 GEODE-8202: Two-step serial gw sender threads start (#5900) * GEODE-8202: Two-step serial gw sender threads start --- ...iversWithSamePortAndHostnameForSendersTest.java | 127 +++++++++++- .../geode-list-gateway-receivers-server1.gfsh | 20 ++ .../geode-list-gateway-receivers-server2.gfsh | 20 ++ .../codeAnalysis/sanctionedDataSerializables.txt | 8 +- .../geode/cache/configuration/CacheConfig.java | 23 +++ .../org/apache/geode/cache/wan/GatewaySender.java | 10 + .../geode/cache/wan/GatewaySenderFactory.java | 13 ++ .../internal/cache/wan/AbstractGatewaySender.java | 18 ++ .../wan/AbstractGatewaySenderEventProcessor.java | 8 + .../internal/cache/wan/GatewaySenderAdvisor.java | 27 ++- .../cache/wan/GatewaySenderAttributes.java | 6 + ...oncurrentSerialGatewaySenderEventProcessor.java | 57 ++++-- .../geode/internal/cache/xmlcache/CacheXml.java | 5 +- .../internal/cache/xmlcache/CacheXmlGenerator.java | 10 + .../internal/cache/xmlcache/CacheXmlParser.java | 11 ++ .../geode/management/internal/i18n/CliStrings.java | 4 + .../geode.apache.org/schema/cache/cache-1.0.xsd | 1 + .../gfsh/command-pages/create.html.md.erb | 5 + .../cli/commands/CreateGatewaySenderCommand.java | 27 ++- .../cli/functions/GatewaySenderCreateFunction.java | 7 + .../cli/functions/GatewaySenderFunctionArgs.java | 6 + .../sanctioned-geode-gfsh-serializables.txt | 2 +- .../commands/CreateGatewaySenderCommandTest.java | 68 ++++++- ...CreateDestroyGatewaySenderCommandDUnitTest.java | 2 +- .../wan/GatewaySenderEventRemoteDispatcher.java | 93 +++++++-- .../cache/wan/GatewaySenderFactoryImpl.java | 10 +- .../RemoteSerialGatewaySenderEventProcessor.java | 2 +- .../cache/wan/serial/SerialGatewaySenderImpl.java | 1 + ...atewaySenderEventRemoteDispatcherJUnitTest.java | 214 ++++++++++++++++++++- 29 files changed, 751 insertions(+), 54 deletions(-) diff --git a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java index 6be6066..7247362 100644 --- a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java +++ b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java @@ -22,13 +22,17 @@ import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.IOException; import java.net.URL; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.StringTokenizer; +import java.util.Vector; import com.palantir.docker.compose.DockerComposeRule; import org.junit.BeforeClass; @@ -71,14 +75,8 @@ import org.apache.geode.test.junit.categories.WanTest; * traffic directed to the 2324 port to the receivers in a round robin fashion. * * - Another site consisting of a 1-server, 1-locator Geode cluster. - * The server hosts a partition region (region-wan) and has a parallel gateway receiver + * The server hosts a partition region (region-wan) and has a gateway receiver * to send events to the remote site. - * - * The aim of the tests is verify that when several gateway receivers in a remote site - * share the same port and hostname-for-senders, the pings sent from the gateway senders - * reach the right gateway receiver and not just any of the receivers. Failure to do this - * may result in the closing of connections by a gateway receiver for not having - * received the ping in time. */ @Category({WanTest.class}) public class SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest { @@ -122,6 +120,13 @@ public class SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest { super(); } + /** + * The aim of this test is verify that when several gateway receivers in a remote site + * share the same port and hostname-for-senders, the pings sent from the gateway senders + * reach the right gateway receiver and not just any of the receivers. Failure to do this + * may result in the closing of connections by a gateway receiver for not having + * received the ping in time. + */ @Test public void testPingsToReceiversWithSamePortAndHostnameForSendersReachTheRightReceivers() throws InterruptedException { @@ -159,6 +164,105 @@ public class SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest { assertEquals(0, senderPoolDisconnects); } + @Test + public void testSerialGatewaySenderThreadsConnectToSameReceiver() { + String senderId = "ln"; + String regionName = "region-wan"; + final int remoteLocPort = 20334; + + int locPort = createLocator(VM.getVM(0), 1, remoteLocPort); + + VM vm1 = VM.getVM(1); + createCache(vm1, locPort); + + createGatewaySender(vm1, senderId, 2, false, 5, + 3, GatewaySender.DEFAULT_ORDER_POLICY); + + createPartitionedRegion(vm1, regionName, senderId, 0, 10); + + assertTrue(allDispatchersConnectedToSameReceiver(1)); + assertTrue(allDispatchersConnectedToSameReceiver(2)); + + } + + @Test + public void testTwoSendersWithSameIdShouldUseSameValueForEnforceThreadsConnectToSameServer() { + String senderId = "ln"; + final int remoteLocPort = 20334; + + int locPort = createLocator(VM.getVM(0), 1, remoteLocPort); + + VM vm1 = VM.getVM(1); + createCache(vm1, locPort); + + VM vm2 = VM.getVM(2); + createCache(vm2, locPort); + + createGatewaySender(vm1, senderId, 2, false, 5, + 3, GatewaySender.DEFAULT_ORDER_POLICY); + + Exception exception = + assertThrows(Exception.class, () -> createGatewaySender(vm2, senderId, 2, false, 5, + 3, GatewaySender.DEFAULT_ORDER_POLICY, false)); + assertEquals(exception.getCause().getMessage(), "Cannot create Gateway Sender " + senderId + + " with enforceThreadsConnectSameReceiver false because another cache has the same Gateway Sender defined with enforceThreadsConnectSameReceiver true"); + + } + + private boolean allDispatchersConnectedToSameReceiver(int server) { + + String gfshOutput = runListGatewayReceiversCommandInServer(server); + Vector<String> sendersConnectedToServer = parseSendersConnectedFromGfshOutput(gfshOutput); + String firstSenderId = ""; + for (String senderId : sendersConnectedToServer) { + if (firstSenderId.equals("")) { + firstSenderId = senderId; + } else { + assertEquals("Found two different senders (" + firstSenderId + " and " + senderId + + ") connected to same receiver in server " + server, firstSenderId, senderId); + } + } + return true; + } + + + private String runListGatewayReceiversCommandInServer(int serverN) { + String result = ""; + try { + result = docker.get().exec(options("-T"), "locator", + arguments("gfsh", "run", + "--file=/geode/scripts/geode-list-gateway-receivers-server" + serverN + ".gfsh")); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + return result; + } + } + + private Vector<String> parseSendersConnectedFromGfshOutput(String gfshOutput) { + String lines[] = gfshOutput.split(System.getProperty("line.separator")); + final String sendersConnectedColumnHeader = "Senders Connected"; + String receiverInfo = null; + for (int i = 0; i < lines.length; i++) { + if (lines[i].contains(sendersConnectedColumnHeader)) { + receiverInfo = lines[i + 2]; + break; + } + } + assertNotNull( + "Error parsing gfsh output. '" + sendersConnectedColumnHeader + "' column header not found", + receiverInfo); + String[] tableRow = receiverInfo.split("\\|"); + String sendersConnectedColumnValue = tableRow[3].trim(); + Vector<String> senders = new Vector<String>(); + for (String sender : sendersConnectedColumnValue.split(",")) { + senders.add(sender.trim()); + } + return senders; + } + private int createLocator(VM memberVM, int dsId, int remoteLocPort) { return memberVM.invoke("create locator", () -> { Properties props = new Properties(); @@ -182,6 +286,14 @@ public class SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest { boolean isParallel, Integer batchSize, int numDispatchers, GatewaySender.OrderPolicy orderPolicy) { + createGatewaySender(vm, dsName, remoteDsId, isParallel, batchSize, numDispatchers, orderPolicy, + true); + } + + public static void createGatewaySender(VM vm, String dsName, int remoteDsId, + boolean isParallel, Integer batchSize, + int numDispatchers, + GatewaySender.OrderPolicy orderPolicy, boolean enforceThreadsConnectToSameReceiver) { vm.invoke(() -> { final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); try { @@ -191,6 +303,7 @@ public class SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest { gateway.setBatchSize(batchSize); gateway.setDispatcherThreads(numDispatchers); gateway.setOrderPolicy(orderPolicy); + gateway.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectToSameReceiver); gateway.create(dsName, remoteDsId); } finally { diff --git a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server1.gfsh b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server1.gfsh new file mode 100644 index 0000000..a0d61bb --- /dev/null +++ b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server1.gfsh @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set variable --name=APP_RESULT_VIEWER --value=200 +connect --locator=locator[20334] +list gateways --receivers-only --member=server1 diff --git a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server2.gfsh b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server2.gfsh new file mode 100644 index 0000000..37a16dc --- /dev/null +++ b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server2.gfsh @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set variable --name=APP_RESULT_VIEWER --value=200 +connect --locator=locator[20334] +list gateways --receivers-only --member=server2 diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index 65d8796..50ce9e4 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -1910,10 +1910,12 @@ org/apache/geode/internal/cache/versions/VersionTag,2 fromData,225 toData,254 -org/apache/geode/internal/cache/wan/GatewaySenderAdvisor$GatewaySenderProfile,4 -fromData,283 +org/apache/geode/internal/cache/wan/GatewaySenderAdvisor$GatewaySenderProfile,6 +fromData,17 +fromDataPre_GEODE_1_14_0_0,293 fromDataPre_GFE_8_0_0_0,188 -toData,271 +toData,17 +toDataPre_GEODE_1_14_0_0,281 toDataPre_GFE_8_0_0_0,236 org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2 diff --git a/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java b/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java index f1b119b..85fe90e 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java +++ b/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java @@ -2656,6 +2656,8 @@ public class CacheConfig { protected String orderPolicy; @XmlAttribute(name = "group-transaction-events") protected Boolean groupTransactionEvents; + @XmlAttribute(name = "enforce-threads-connect-same-receiver") + protected Boolean enforceThreadsConnectSameReceiver; /** * Gets the value of the gatewayEventFilters property. @@ -3100,6 +3102,27 @@ public class CacheConfig { this.orderPolicy = value; } + /** + * Sets the value of the enforceThreadsConnectSameReceiver property. + * + * allowed object is + * {@link Boolean } + * + */ + public void setEnforceThreadsConnectSameReceiver(Boolean value) { + this.enforceThreadsConnectSameReceiver = value; + } + + /** + * Gets the value of the enforceThreadsConnectSameReceiver property. + * + * possible object is + * {@link Boolean } + * + */ + public Boolean getEnforceThreadsConnectSameReceiver() { + return this.enforceThreadsConnectSameReceiver; + } } } diff --git a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java index ffacf4b..5e0e9f1 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java @@ -147,6 +147,8 @@ public interface GatewaySender { boolean DEFAULT_IS_FOR_INTERNAL_USE = false; + boolean DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER = false; + /** * Retry a connection from sender to receiver after specified time interval (in milliseconds) in * case receiver is not up and running. Default is set to 1000 milliseconds i.e. 1 second. @@ -449,4 +451,12 @@ public interface GatewaySender { * */ void destroy(); + + /** + * Returns enforceThreadsConnectSameReceiver boolean property for this GatewaySender. + * + * @return enforceThreadsConnectSameReceiver boolean property for this GatewaySender + * + */ + boolean getEnforceThreadsConnectSameReceiver(); } diff --git a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java index 7c99214..6c9e92b 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java +++ b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java @@ -191,6 +191,19 @@ public interface GatewaySenderFactory { GatewaySenderFactory setGatewayEventSubstitutionFilter(GatewayEventSubstitutionFilter filter); /** + * If true, receiver member id is checked by all dispatcher threads when the connection is + * established to ensure they connect to the same receiver. Instead of starting all dispatcher + * threads in parallel, one thread is started first, and after that the rest are started in + * parallel. Default is false. + * + * @param enforceThreadsConnectSameReceiver boolean if true threads will verify if they are + * connected to the same receiver + * + */ + GatewaySenderFactory setEnforceThreadsConnectSameReceiver( + boolean enforceThreadsConnectSameReceiver); + + /** * Creates a <code>GatewaySender</code> to communicate with remote distributed system * * @param id unique id for this SerialGatewaySender diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 7f6c8a1..4ea2c6d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -171,6 +171,8 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di private ServerLocation serverLocation; + private String expectedReceiverUniqueId = ""; + protected Object queuedEventsSync = new Object(); protected volatile boolean enqueuedAllTempQueueEvents = false; @@ -237,6 +239,8 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di private final StatisticsClock statisticsClock; + protected boolean enforceThreadsConnectSameReceiver; + protected AbstractGatewaySender() { statisticsClock = disabledClock(); } @@ -275,6 +279,7 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di this.maxMemoryPerDispatcherQueue = this.queueMemory / this.dispatcherThreads; this.serialNumber = DistributionAdvisor.createSerialNumber(); this.isMetaQueue = attrs.isMetaQueue(); + this.enforceThreadsConnectSameReceiver = attrs.getEnforceThreadsConnectSameReceiver(); if (!(this.cache instanceof CacheCreation)) { this.myDSId = this.cache.getInternalDistributedSystem().getDistributionManager() .getDistributedSystemId(); @@ -500,6 +505,11 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di } @Override + public boolean getEnforceThreadsConnectSameReceiver() { + return this.enforceThreadsConnectSameReceiver; + } + + @Override public boolean equals(Object obj) { if (obj == null) { return false; @@ -1429,6 +1439,14 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di } } + public void setExpectedReceiverUniqueId(String expectedReceiverUniqueId) { + this.expectedReceiverUniqueId = expectedReceiverUniqueId; + } + + public String getExpectedReceiverUniqueId() { + return this.expectedReceiverUniqueId; + } + /** * Has a reference to a GatewayEventImpl and has a timeout value. */ diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 0609ec9..294121a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -154,6 +154,14 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread this.threadMonitoring = tMonitoring; } + public void setExpectedReceiverUniqueId(String uniqueId) { + this.sender.setExpectedReceiverUniqueId(uniqueId); + } + + public String getExpectedReceiverUniqueId() { + return this.sender.getExpectedReceiverUniqueId(); + } + public Object getRunningStateLock() { return runningStateLock; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java index adf80cb..6af0866 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java @@ -232,6 +232,15 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { "Cannot create Gateway Sender %s with isDiskSynchronous %s because another cache has the same Gateway Sender defined with isDiskSynchronous %s", sp.Id, sp.isDiskSynchronous, sender.isDiskSynchronous())); } + if (sp.getDistributedMember().getVersion().isNotOlderThan(KnownVersion.GEODE_1_14_0)) { + if (sp.enforceThreadsConnectSameReceiver != sender.getEnforceThreadsConnectSameReceiver()) { + throw new IllegalStateException( + String.format( + "Cannot create Gateway Sender %s with enforceThreadsConnectSameReceiver %s because another cache has the same Gateway Sender defined with enforceThreadsConnectSameReceiver %s", + sp.Id, sp.enforceThreadsConnectSameReceiver, + sender.getEnforceThreadsConnectSameReceiver())); + } + } } /** @@ -532,6 +541,8 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { public ServerLocation serverLocation; + public boolean enforceThreadsConnectSameReceiver = false; + public GatewaySenderProfile(InternalDistributedMember memberId, int version) { super(memberId, version); } @@ -541,6 +552,12 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { @Override public void fromData(DataInput in, DeserializationContext context) throws IOException, ClassNotFoundException { + fromDataPre_GEODE_1_14_0_0(in, context); + this.enforceThreadsConnectSameReceiver = in.readBoolean(); + } + + public void fromDataPre_GEODE_1_14_0_0(DataInput in, + DeserializationContext context) throws IOException, ClassNotFoundException { super.fromData(in, context); this.Id = DataSerializer.readString(in); this.startTime = in.readLong(); @@ -578,11 +595,18 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { this.serverLocation = new ServerLocation(); InternalDataSerializer.invokeFromData(this.serverLocation, in); } + this.enforceThreadsConnectSameReceiver = in.readBoolean(); } @Override public void toData(DataOutput out, SerializationContext context) throws IOException { + toDataPre_GEODE_1_14_0_0(out, context); + out.writeBoolean(enforceThreadsConnectSameReceiver); + } + + public void toDataPre_GEODE_1_14_0_0(DataOutput out, + SerializationContext context) throws IOException { super.toData(out, context); DataSerializer.writeString(Id, out); out.writeLong(startTime); @@ -617,6 +641,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { if (serverLocationFound) { InternalDataSerializer.invokeToData(serverLocation, out); } + out.writeBoolean(enforceThreadsConnectSameReceiver); } public void fromDataPre_GFE_8_0_0_0(DataInput in, DeserializationContext context) @@ -684,7 +709,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { @Immutable private static final KnownVersion[] serializationVersions = - new KnownVersion[] {KnownVersion.GFE_80}; + new KnownVersion[] {KnownVersion.GFE_80, KnownVersion.GEODE_1_14_0}; @Override public KnownVersion[] getSerializationVersions() { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java index 1457776..581b576 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java @@ -85,6 +85,9 @@ public class GatewaySenderAttributes { public boolean forwardExpirationDestroy = GatewaySender.DEFAULT_FORWARD_EXPIRATION_DESTROY; + public boolean enforceThreadsConnectSameReceiver = + GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER; + public int getSocketBufferSize() { return this.socketBufferSize; } @@ -205,4 +208,7 @@ public class GatewaySenderAttributes { return this.forwardExpirationDestroy; } + public boolean getEnforceThreadsConnectSameReceiver() { + return this.enforceThreadsConnectSameReceiver; + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java index 06f74ae..7adf996 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java @@ -179,12 +179,27 @@ public class ConcurrentSerialGatewaySenderEventProcessor @Override public void run() { - for (int i = 0; i < this.processors.size(); i++) { - if (logger.isDebugEnabled()) { + boolean isDebugEnabled = logger.isDebugEnabled(); + if (this.sender.getEnforceThreadsConnectSameReceiver()) { + this.processors.get(0).start(); + waitForRunningStatus(this.processors.get(0)); + String receiverUniqueId = this.processors.get(0).getExpectedReceiverUniqueId(); + if (isDebugEnabled) { + logger.debug("First dispatcher is connected to " + receiverUniqueId); + } + for (int j = 1; j < this.processors.size(); j++) { + this.processors.get(j).setExpectedReceiverUniqueId(receiverUniqueId); + } + } + + for (int i = this.sender.getEnforceThreadsConnectSameReceiver() ? 1 : 0; i < this.processors + .size(); i++) { + if (isDebugEnabled) { logger.debug("Starting the serialProcessor {}", i); } this.processors.get(i).start(); } + try { waitForRunningStatus(); } catch (GatewaySenderException e) { @@ -205,7 +220,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor try { serialProcessor.join(); } catch (InterruptedException e) { - if (logger.isDebugEnabled()) { + if (isDebugEnabled) { logger.debug("Got InterruptedException while waiting for child threads to finish."); Thread.currentThread().interrupt(); } @@ -219,24 +234,28 @@ public class ConcurrentSerialGatewaySenderEventProcessor throw new UnsupportedOperationException(); } - private void waitForRunningStatus() { - for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) { - synchronized (serialProcessor.getRunningStateLock()) { - while (serialProcessor.getException() == null && serialProcessor.isStopped()) { - try { - serialProcessor.getRunningStateLock().wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - Exception ex = serialProcessor.getException(); - if (ex != null) { - throw new GatewaySenderException( - String.format("Could not start a gateway sender %s because of exception %s", - new Object[] {this.sender.getId(), ex.getMessage()}), - ex.getCause()); + private void waitForRunningStatus(SerialGatewaySenderEventProcessor serialProcessor) { + synchronized (serialProcessor.getRunningStateLock()) { + while (serialProcessor.getException() == null && serialProcessor.isStopped()) { + try { + serialProcessor.getRunningStateLock().wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } + Exception ex = serialProcessor.getException(); + if (ex != null) { + throw new GatewaySenderException( + String.format("Could not start a gateway sender %s because of exception %s", + new Object[] {this.sender.getId(), ex.getMessage()}), + ex.getCause()); + } + } + } + + private void waitForRunningStatus() { + for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) { + waitForRunningStatus(serialProcessor); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java index 1ac84a4..9970c55 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java @@ -433,8 +433,11 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler { protected static final String ORDER_POLICY = "order-policy"; /** The name of the <code>remote-distributed-system</code> attribute */ protected static final String REMOTE_DISTRIBUTED_SYSTEM_ID = "remote-distributed-system-id"; + /** The name of the <code>group-transaction-events</code> attribute */ protected static final String GROUP_TRANSACTION_EVENTS = "group-transaction-events"; - + /** The name of the <code>enforce-threads-connect-same-receiver</code> attribute */ + protected static final String ENFORCE_THREADS_CONNECT_SAME_RECEIVER = + "enforce-threads-connect-same-receiver"; /** The name of the <code>bind-address</code> attribute */ protected static final String BIND_ADDRESS = "bind-address"; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java index ea19b88..04ac07e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java @@ -1394,6 +1394,16 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader { } } + // enforce-threads-connect-same-receiver + if (version.compareTo(CacheXmlVersion.GEODE_1_0) >= 0) { + if (generateDefaults() + || sender + .getEnforceThreadsConnectSameReceiver() != GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER) { + atts.addAttribute("", "", ENFORCE_THREADS_CONNECT_SAME_RECEIVER, "", + String.valueOf(sender.getEnforceThreadsConnectSameReceiver())); + } + } + handler.startElement("", GATEWAY_SENDER, GATEWAY_SENDER, atts); for (GatewayEventFilter gef : sender.getGatewayEventFilters()) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java index c775086..ce4d211 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java @@ -707,6 +707,17 @@ public class CacheXmlParser extends CacheXml implements ContentHandler { gatewaySenderFactory .setGroupTransactionEvents(Boolean.parseBoolean(groupTransactionEvents)); } + + String enforceThreadsConnectSameReceiver = atts.getValue(ENFORCE_THREADS_CONNECT_SAME_RECEIVER); + if (enforceThreadsConnectSameReceiver == null) { + gatewaySenderFactory + .setEnforceThreadsConnectSameReceiver( + GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER); + } else { + gatewaySenderFactory + .setEnforceThreadsConnectSameReceiver( + Boolean.parseBoolean(enforceThreadsConnectSameReceiver)); + } } private void startGatewayReceiver(Attributes atts) { diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java index 4ce4416..3b08621 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java @@ -2267,6 +2267,10 @@ public class CliStrings { "GatewaySender \"{0}\" created on \"{1}\""; public static final String CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS = "Gateway Sender cannot be created until all members are the current version"; + public static final String CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER = + "enforce-threads-connect-same-receiver"; + public static final String CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER__HELP = + "Whether or not the sender threads have to verify the receiver member id to verify if they are connected to the same server."; /* start gateway-sender */ public static final String START_GATEWAYSENDER = "start gateway-sender"; diff --git a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd index 53b57f2..db6841a 100755 --- a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd +++ b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd @@ -203,6 +203,7 @@ declarative caching XML file elements unless indicated otherwise. <xsd:attribute name="dispatcher-threads" type="xsd:string" use="optional" /> <xsd:attribute name="order-policy" type="xsd:string" use="optional" /> <xsd:attribute name="group-transaction-events" type="xsd:boolean" use="optional" /> + <xsd:attribute name="enforce-threads-connect-same-receiver" type="xsd:boolean" use="optional" /> </xsd:complexType> </xsd:element> diff --git a/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb b/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb index 30b4da1..11b4192 100644 --- a/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb +++ b/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb @@ -638,7 +638,12 @@ create gateway-sender --id=value --remote-distributed-system-id=value <p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders with the <code class="ph codeph">parallel</code> attribute set to true. Also, the <code class="ph codeph">enable-batch-conflation</code> attribute of the gateway sender must be set to false.</p> <p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p> <p><b>Note:</b> If the above condition is not fulfilled or under very high load traffic conditions, it may not be guaranteed that all the events for a transaction will be sent in the same batch, even if <code class="ph codeph">group-transaction-events</code> is enabled. The number of batches sent with incomplete transactions can be retrieved from the <code class="ph codeph">GatewaySenderMXBean</code> bean.</p></td> +<td>false</td> </td> +</tr> +<tr> +<td><span class="keyword parmname">\-\-enforce-threads-connect-same-receiver</span></td> +<td>This parameter applies only to serial gateway senders. If true, receiver member id is checked by all dispatcher threads when the connection is established to ensure they connect to the same receiver. Instead of starting all dispatcher threads in parallel, one thread is started first, and after that the rest are started in parallel.</td> <td>false</td> </tr> </tbody> diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java index a6b40b9..474153b 100644 --- a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java +++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java @@ -137,14 +137,20 @@ public class CreateGatewaySenderCommand extends SingleGfshCommand { help = CliStrings.CREATE_GATEWAYSENDER__GATEWAYEVENTFILTER__HELP) String[] gatewayEventFilters, @CliOption(key = CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER, - help = CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER__HELP) String[] gatewayTransportFilter) { + help = CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER__HELP) String[] gatewayTransportFilter, + + @CliOption(key = CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER, + specifiedDefaultValue = "true", + unspecifiedDefaultValue = "false", + help = CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER__HELP) Boolean enforceThreadsConnectSameReceiver) { CacheConfig.GatewaySender configuration = buildConfiguration(id, remoteDistributedSystemId, parallel, manualStart, socketBufferSize, socketReadTimeout, enableBatchConflation, batchSize, batchTimeInterval, enablePersistence, diskStoreName, diskSynchronous, maxQueueMemory, alertThreshold, dispatcherThreads, orderPolicy == null ? null : orderPolicy.name(), - gatewayEventFilters, gatewayTransportFilter, groupTransactionEvents); + gatewayEventFilters, gatewayTransportFilter, groupTransactionEvents, + enforceThreadsConnectSameReceiver); GatewaySenderFunctionArgs gatewaySenderFunctionArgs = new GatewaySenderFunctionArgs(configuration); @@ -228,7 +234,8 @@ public class CreateGatewaySenderCommand extends SingleGfshCommand { String orderPolicy, String[] gatewayEventFilters, String[] gatewayTransportFilters, - Boolean groupTransactionEvents) { + Boolean groupTransactionEvents, + Boolean enforceThreadsConnectSameReceiver) { CacheConfig.GatewaySender sender = new CacheConfig.GatewaySender(); sender.setId(id); sender.setRemoteDistributedSystemId(int2string(remoteDSId)); @@ -253,7 +260,7 @@ public class CreateGatewaySenderCommand extends SingleGfshCommand { if (gatewayTransportFilters != null) { sender.getGatewayTransportFilters().addAll(stringsToDeclarableTypes(gatewayTransportFilters)); } - + sender.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectSameReceiver); return sender; } @@ -284,6 +291,10 @@ public class CreateGatewaySenderCommand extends SingleGfshCommand { Boolean batchConflationEnabled = (Boolean) parseResult .getParamValue(CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION); + Boolean enforceThreadsConnectSameReceiver = + (Boolean) parseResult + .getParamValue( + CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER); if (dispatcherThreads != null && dispatcherThreads > 1 && orderPolicy == null) { return ResultModel.createError( @@ -306,6 +317,14 @@ public class CreateGatewaySenderCommand extends SingleGfshCommand { "Gateway Sender cannot be created with both --group-transaction-events and --enable-batch-conflation."); } + if (parallel && enforceThreadsConnectSameReceiver) { + return ResultModel + .createError( + "Option --" + CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER + + " only applies to serial gateway senders."); + + } + return ResultModel.createInfo(""); } } diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java index 00efaa5..9d7e75f 100644 --- a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java +++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java @@ -173,6 +173,13 @@ public class GatewaySenderCreateFunction implements InternalFunction<GatewaySend gatewayTransportFilterKlass, CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER)); } } + + Boolean enforceThreadsConnectSameReceiver = + gatewaySenderCreateArgs.getEnforceThreadsConnectSameReceiver(); + if (enforceThreadsConnectSameReceiver != null) { + gateway.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectSameReceiver); + } + return gateway.create(gatewaySenderCreateArgs.getId(), gatewaySenderCreateArgs.getRemoteDistributedSystemId()); } diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java index 2b08ef9..dd70bda 100644 --- a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java +++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java @@ -46,6 +46,7 @@ public class GatewaySenderFunctionArgs implements Serializable { // array of fully qualified class names of the filters private final List<String> gatewayEventFilters; private final List<String> gatewayTransportFilters; + private final Boolean enforceThreadsConnectSameReceiver; public GatewaySenderFunctionArgs(CacheConfig.GatewaySender sender) { this.id = sender.getId(); @@ -77,6 +78,7 @@ public class GatewaySenderFunctionArgs implements Serializable { .stream().map(DeclarableType::getClassName) .collect(Collectors.toList())) .orElse(null); + this.enforceThreadsConnectSameReceiver = sender.getEnforceThreadsConnectSameReceiver(); } private Integer string2int(String x) { @@ -158,4 +160,8 @@ public class GatewaySenderFunctionArgs implements Serializable { public List<String> getGatewayTransportFilter() { return this.gatewayTransportFilters; } + + public Boolean getEnforceThreadsConnectSameReceiver() { + return this.enforceThreadsConnectSameReceiver; + } } diff --git a/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt b/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt index c60d599..b368b1f 100644 --- a/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt +++ b/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt @@ -65,7 +65,7 @@ org/apache/geode/management/internal/cli/functions/GatewayReceiverCreateFunction org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction,true,8746830191680509335 org/apache/geode/management/internal/cli/functions/GatewaySenderDestroyFunction,true,1 org/apache/geode/management/internal/cli/functions/GatewaySenderDestroyFunctionArgs,true,3848480256348119530,id:java/lang/String,ifExists:boolean -org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs,true,4636678328980816780,alertThreshold:java/lang/Integer,batchSize:java/lang/Integer,batchTimeInterval:java/lang/Integer,diskStoreName:java/lang/String,diskSynchronous:java/lang/Boolean,dispatcherThreads:java/lang/Integer,enableBatchConflation:java/lang/Boolean,enablePersistence:java/lang/Boolean,gatewayEventFilters:java/util/List,gatewayTransportFilters:java/util/List,groupTransactionEvents:java/lang/Boolean,i [...] +org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs,true,4636678328980816780,alertThreshold:java/lang/Integer,batchSize:java/lang/Integer,batchTimeInterval:java/lang/Integer,diskStoreName:java/lang/String,diskSynchronous:java/lang/Boolean,dispatcherThreads:java/lang/Integer,enableBatchConflation:java/lang/Boolean,enablePersistence:java/lang/Boolean,gatewayEventFilters:java/util/List,gatewayTransportFilters:java/util/List,groupTransactionEvents:java/lang/Boolean,i [...] org/apache/geode/management/internal/cli/functions/GetMemberConfigInformationFunction,true,1 org/apache/geode/management/internal/cli/functions/GetRegionDescriptionFunction,true,1 org/apache/geode/management/internal/cli/functions/GetRegionsFunction,true,1 diff --git a/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java index 2784ba5..585158d 100644 --- a/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java +++ b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java @@ -275,7 +275,7 @@ public class CreateGatewaySenderCommandTest { assertThat(argsArgumentCaptor.getValue().getGatewayEventFilter()).isNotNull().isEmpty(); assertThat(argsArgumentCaptor.getValue().getGatewayTransportFilter()).isNotNull().isEmpty(); assertThat(argsArgumentCaptor.getValue().mustGroupTransactionEvents()).isNotNull(); - + assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isFalse(); } @Test @@ -347,4 +347,70 @@ public class CreateGatewaySenderCommandTest { assertThat(argsArgumentCaptor.getValue().mustGroupTransactionEvents()).isFalse(); } + + @Test + public void testEnforceThreadsConnectSameReceiverCannotBeUsedForParallelSenders() { + gfsh.executeAndAssertThat(command, + "create gateway-sender --id=1 --remote-distributed-system-id=1 --parallel --enforce-threads-connect-same-receiver") + .statusIsError() + .containsOutput( + "Option --" + CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER + + " only applies to serial gateway senders."); + } + + @Test + public void testEnforceThreadsConnectSameReceiverIsTrueWhenUsedWithoutValue() { + doReturn(mock(Set.class)).when(command).getMembers(any(), any()); + cliFunctionResult = + new CliFunctionResult("member", CliFunctionResult.StatusState.OK, "cliFunctionResult"); + functionResults.add(cliFunctionResult); + gfsh.executeAndAssertThat(command, + "create gateway-sender --id=1 --remote-distributed-system-id=1 --enforce-threads-connect-same-receiver") + .statusIsSuccess(); + verify(command).executeAndGetFunctionResult(any(), argsArgumentCaptor.capture(), any()); + + assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isTrue(); + } + + @Test + public void testEnforceThreadsConnectSameReceiverIsFalseWhenSetToFalse() { + doReturn(mock(Set.class)).when(command).getMembers(any(), any()); + cliFunctionResult = + new CliFunctionResult("member", CliFunctionResult.StatusState.OK, "cliFunctionResult"); + functionResults.add(cliFunctionResult); + gfsh.executeAndAssertThat(command, + "create gateway-sender --id=1 --remote-distributed-system-id=1 --enforce-threads-connect-same-receiver=false") + .statusIsSuccess(); + verify(command).executeAndGetFunctionResult(any(), argsArgumentCaptor.capture(), any()); + + assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isFalse(); + } + + @Test + public void testEnforceThreadsConnectSameReceiverIsTrueWhenSetToTrue() { + doReturn(mock(Set.class)).when(command).getMembers(any(), any()); + cliFunctionResult = + new CliFunctionResult("member", CliFunctionResult.StatusState.OK, "cliFunctionResult"); + functionResults.add(cliFunctionResult); + gfsh.executeAndAssertThat(command, + "create gateway-sender --id=1 --remote-distributed-system-id=1 --enforce-threads-connect-same-receiver=true") + .statusIsSuccess(); + verify(command).executeAndGetFunctionResult(any(), argsArgumentCaptor.capture(), any()); + + assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isTrue(); + } + + @Test + public void testEnforceThreadsConnectSameReceiverIsFalseByDefault() { + doReturn(mock(Set.class)).when(command).getMembers(any(), any()); + cliFunctionResult = + new CliFunctionResult("member", CliFunctionResult.StatusState.OK, "cliFunctionResult"); + functionResults.add(cliFunctionResult); + gfsh.executeAndAssertThat(command, + "create gateway-sender --id=1 --remote-distributed-system-id=1") + .statusIsSuccess(); + verify(command).executeAndGetFunctionResult(any(), argsArgumentCaptor.capture(), any()); + + assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isFalse(); + } } diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java index 734ec8e..854329e 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java @@ -112,7 +112,7 @@ public class CreateDestroyGatewaySenderCommandDUnitTest implements Serializable String xml = locator.getConfigurationPersistenceService().getConfiguration("cluster") .getCacheXmlContent(); assertThat(xml).contains( - "<gateway-sender id=\"ln\" remote-distributed-system-id=\"2\" parallel=\"false\" manual-start=\"false\" enable-batch-conflation=\"false\" enable-persistence=\"false\" disk-synchronous=\"true\" group-transaction-events=\"false\"/>"); + "<gateway-sender id=\"ln\" remote-distributed-system-id=\"2\" parallel=\"false\" manual-start=\"false\" enable-batch-conflation=\"false\" enable-persistence=\"false\" disk-synchronous=\"true\" group-transaction-events=\"false\" enforce-threads-connect-same-receiver=\"false\"/>"); }); // destroy gateway sender and verify AEQs cleaned up diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java index 1199fb9..4b7e330 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java @@ -18,8 +18,10 @@ import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.Vector; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; +import java.util.regex.Pattern; import org.apache.logging.log4j.Logger; @@ -63,6 +65,9 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis private ReentrantReadWriteLock connectionLifeCycleLock = new ReentrantReadWriteLock(); + protected static final String maxAttemptsReachedConnectingServerIdExceptionMessage = + "Reached max attempts number trying to connect to desired server id"; + /* * Called after each attempt at processing an outbound (dispatch) or inbound (ack) * message, whether the attempt is successful or not. The purpose is testability. @@ -86,7 +91,6 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor eventProcessor) { this.processor = eventProcessor; this.sender = eventProcessor.getSender(); - // this.ackReaderThread = new AckReaderThread(sender); try { initializeConnection(); } catch (GatewaySenderException e) { @@ -362,11 +366,70 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis } } + Connection retryInitializeConnection(Connection con) { + final boolean isDebugEnabled = logger.isDebugEnabled(); + String connectedServerId = con.getEndpoint().getMemberId().getUniqueId(); + String expectedServerId = this.processor.getExpectedReceiverUniqueId(); + + if (expectedServerId.equals("")) { + if (isDebugEnabled) { + logger.debug("First dispatcher connected to server " + connectedServerId); + } + this.processor.setExpectedReceiverUniqueId(connectedServerId); + return con; + } + + int attempt = 0; + final int attemptsPerServer = 5; + int maxAttempts = attemptsPerServer; + Vector<String> notExpectedServerIds = new Vector<String>(); + boolean connectedToExpectedReceiver = connectedServerId.equals(expectedServerId); + while (!connectedToExpectedReceiver) { + + if (isDebugEnabled) { + logger.debug("Dispatcher wants to connect to [" + expectedServerId + + "] but got connection to [" + connectedServerId + "]"); + } + attempt++; + if (!notExpectedServerIds.contains(connectedServerId)) { + if (isDebugEnabled) { + logger.debug( + "Increasing dispatcher connection max retries number due to connection to unknown server (" + + connectedServerId + ")"); + } + notExpectedServerIds.add(connectedServerId); + maxAttempts += attemptsPerServer; + } + + if (attempt >= maxAttempts) { + throw new ServerConnectivityException(maxAttemptsReachedConnectingServerIdExceptionMessage + + " [" + expectedServerId + "] (" + maxAttempts + " attempts)."); + } + + con.destroy(); + this.sender.getProxy().returnConnection(con); + con = this.sender.getProxy().acquireConnection(); + + connectedServerId = con.getEndpoint().getMemberId().getUniqueId(); + if (connectedServerId.equals(expectedServerId)) { + connectedToExpectedReceiver = true; + } + } + + if (isDebugEnabled) { + logger.debug("Dispatcher connected to expected endpoint " + connectedServerId + + " after " + attempt + " retries."); + } + return con; + } + /** * Initializes the <code>Connection</code>. * */ + @VisibleForTesting void initializeConnection() throws GatewaySenderException, GemFireSecurityException { + final boolean isDebugEnabled = logger.isDebugEnabled(); if (ackReaderThread != null) { ackReaderThread.shutDownAckReaderConnection(connection); } @@ -397,26 +460,24 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis synchronized (this.sender.getLockForConcurrentDispatcher()) { ServerLocation server = this.sender.getServerLocation(); if (server != null) { - if (logger.isDebugEnabled()) { + if (isDebugEnabled) { logger.debug("ServerLocation is: {}. Connecting to this serverLocation...", server); } con = this.sender.getProxy().acquireConnection(server); } else { - if (logger.isDebugEnabled()) { + if (isDebugEnabled) { logger.debug("ServerLocation is null. Creating new connection. "); } con = this.sender.getProxy().acquireConnection(); - // Acquired connection from pool!! Update the server location - // information in the sender and - // distribute the information to other senders ONLY IF THIS SENDER - // IS - // PRIMARY - if (this.sender.isPrimary()) { - if (sender.getServerLocation() == null) { - sender.setServerLocation(con.getServer()); - } - new UpdateAttributesProcessor(this.sender).distribute(false); + } + if (this.sender.getEnforceThreadsConnectSameReceiver()) { + con = retryInitializeConnection(con); + } + if (this.sender.isPrimary()) { + if (sender.getServerLocation() == null) { + sender.setServerLocation(con.getServer()); } + new UpdateAttributesProcessor(this.sender).distribute(false); } } } @@ -486,6 +547,12 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis "No available connection was found, but the following active servers exist: %s", buffer.toString()); } + if (this.sender.getEnforceThreadsConnectSameReceiver() && e.getMessage() != null) { + if (Pattern.compile(maxAttemptsReachedConnectingServerIdExceptionMessage + ".*") + .matcher(e.getMessage()).find()) { + ioMsg += " " + e.getMessage(); + } + } IOException ex = new IOException(ioMsg); gse = new GatewaySenderException( String.format("%s : Could not connect due to: %s", diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java index 2a7cfd7..c0d2051 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java @@ -199,6 +199,13 @@ public class GatewaySenderFactoryImpl implements InternalGatewaySenderFactory { } @Override + public GatewaySenderFactory setEnforceThreadsConnectSameReceiver( + boolean enforceThreadsConnectSameReceiver) { + this.attrs.enforceThreadsConnectSameReceiver = enforceThreadsConnectSameReceiver; + return this; + } + + @Override public GatewaySender create(String id, int remoteDSId) { int myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager() .getDistributedSystemId(); @@ -291,7 +298,6 @@ public class GatewaySenderFactoryImpl implements InternalGatewaySenderFactory { if (this.cache instanceof GemFireCacheImpl) { sender = new SerialGatewaySenderImpl(cache, statisticsClock, attrs); this.cache.addGatewaySender(sender); - if (!this.attrs.isManualStart()) { sender.start(); } @@ -394,5 +400,7 @@ public class GatewaySenderFactoryImpl implements InternalGatewaySenderFactory { } this.attrs.eventSubstitutionFilter = senderCreation.getGatewayEventSubstitutionFilter(); this.attrs.groupTransactionEvents = senderCreation.mustGroupTransactionEvents(); + this.attrs.enforceThreadsConnectSameReceiver = + senderCreation.getEnforceThreadsConnectSameReceiver(); } } diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java index f18ce81..ef3e599 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java @@ -35,7 +35,7 @@ public class RemoteSerialGatewaySenderEventProcessor extends SerialGatewaySender @Override public void initializeEventDispatcher() { if (logger.isDebugEnabled()) { - logger.debug(" Creating the GatewayEventRemoteDispatcher"); + logger.debug("Creating the GatewayEventRemoteDispatcher"); } // In case of serial there is a way to create gatewaySender and attach // asyncEventListener. Not sure of the use-case but there are dunit tests diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java index 3474b4a..97436a2 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java @@ -228,6 +228,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { pf.dispatcherThreads = getDispatcherThreads(); pf.orderPolicy = getOrderPolicy(); pf.serverLocation = this.getServerLocation(); + pf.enforceThreadsConnectSameReceiver = getEnforceThreadsConnectSameReceiver(); } @Override diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java index 7cfe1f5..8b35ab1 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java @@ -14,9 +14,11 @@ */ package org.apache.geode.internal.cache.wan; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -24,11 +26,64 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Before; import org.junit.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.apache.geode.cache.client.internal.Connection; +import org.apache.geode.cache.client.internal.Endpoint; +import org.apache.geode.cache.client.internal.PoolImpl; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.ServerLocationAndMemberId; +import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus; public class GatewaySenderEventRemoteDispatcherJUnitTest { + + @Mock + private AbstractGatewaySender senderMock; + + @Mock + private AbstractGatewaySenderEventProcessor eventProcessorMock; + + @InjectMocks + private GatewaySenderEventRemoteDispatcher eventDispatcher; + + @Mock + private PoolImpl poolMock; + + @Mock + private Connection connectionMock; + + @Mock + private ServerQueueStatus serverQueueStatusMock; + + @Mock + private Endpoint endpointMock; + + @Mock + private DistributedMember memberIdMock; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + when(eventProcessorMock.getSender()).thenReturn(senderMock); + + when(senderMock.isParallel()).thenReturn(false); + when(senderMock.getLockForConcurrentDispatcher()).thenReturn(new Object()); + when(senderMock.getProxy()).thenReturn(poolMock); + + when(poolMock.isDestroyed()).thenReturn(false); + when(poolMock.acquireConnection()).thenReturn(connectionMock); + + when(connectionMock.getQueueStatus()).thenReturn(serverQueueStatusMock); + } + @Test public void getConnectionShouldShutdownTheAckThreadReaderWhenEventProcessorIsShutDown() { AbstractGatewaySender sender = mock(AbstractGatewaySender.class); @@ -46,7 +101,7 @@ public class GatewaySenderEventRemoteDispatcherJUnitTest { } @Test - public void shuttingDownAckThreadReaderConnectionShouldshutdownTheAckThreadReader() { + public void shuttingDownAckThreadReaderConnectionShouldShutdownTheAckThreadReader() { AbstractGatewaySender sender = mock(AbstractGatewaySender.class); AbstractGatewaySenderEventProcessor eventProcessor = mock(AbstractGatewaySenderEventProcessor.class); @@ -77,4 +132,161 @@ public class GatewaySenderEventRemoteDispatcherJUnitTest { verify(dispatcher, times(1)).initializeConnection(); verify(dispatcher, times(2)).getConnectionLifeCycleLock(); } + + @Test + public void initializeConnectionWithParallelSenderDoesNotRetryInitializeConnection() { + when(senderMock.isParallel()).thenReturn(true); + + eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null); + GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher); + dispatcherSpy.initializeConnection(); + + verify(senderMock, times(0)).getLockForConcurrentDispatcher(); + verify(senderMock, times(1)).setServerLocation(any()); + verify(poolMock, times(1)).acquireConnection(); + verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock); + } + + @Test + public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameRecieverFalseDoesNotRetryInitializeConnection() { + when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(false); + + when(connectionMock.getEndpoint()).thenReturn(endpointMock); + when(endpointMock.getMemberId()).thenReturn(memberIdMock); + when(memberIdMock.getUniqueId()).thenReturn("receiverId"); + + eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null); + GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher); + dispatcherSpy.initializeConnection(); + + verify(senderMock, times(1)).getLockForConcurrentDispatcher(); + verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver(); + verify(poolMock, times(1)).acquireConnection(); + verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock); + } + + @Test + public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndNoExpectedReceiverIdSetsReceiverIdAndDoesNotReacquireConnection() { + + when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true); + + when(connectionMock.getEndpoint()).thenReturn(endpointMock); + when(endpointMock.getMemberId()).thenReturn(memberIdMock); + when(memberIdMock.getUniqueId()).thenReturn("receiverId"); + when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn(""); + + eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null); + GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher); + dispatcherSpy.initializeConnection(); + + verify(senderMock, times(1)).getLockForConcurrentDispatcher(); + verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver(); + verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock); + verify(poolMock, times(1)).acquireConnection(); + verify(eventProcessorMock, times(1)).setExpectedReceiverUniqueId("receiverId"); + } + + @Test + public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndConnectedToExpectedReceiverDoesNotReacquireConnection() { + + when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true); + + when(connectionMock.getEndpoint()).thenReturn(endpointMock); + when(endpointMock.getMemberId()).thenReturn(memberIdMock); + when(memberIdMock.getUniqueId()).thenReturn("expectedId"); + when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId"); + + eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null); + GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher); + dispatcherSpy.initializeConnection(); + + verify(senderMock, times(1)).getLockForConcurrentDispatcher(); + verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver(); + verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock); + verify(poolMock, times(1)).acquireConnection(); + verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any()); + } + + @Test + public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndConnectedToExpectedReceiverOnSecondTryReacquiresConnectionOnce() { + + when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true); + + when(connectionMock.getEndpoint()).thenReturn(endpointMock); + when(endpointMock.getMemberId()).thenReturn(memberIdMock); + when(memberIdMock.getUniqueId()).thenReturn("notExpectedId").thenReturn("expectedId"); + when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId"); + + eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null); + GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher); + dispatcherSpy.initializeConnection(); + + verify(senderMock, times(1)).getLockForConcurrentDispatcher(); + verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver(); + verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock); + verify(poolMock, times(2)).acquireConnection(); + verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any()); + + } + + @Test + public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndMaxRetriesExceededAndNoServersAvailableThrowsException() { + + when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true); + + when(connectionMock.getEndpoint()).thenReturn(endpointMock); + when(endpointMock.getMemberId()).thenReturn(memberIdMock); + when(memberIdMock.getUniqueId()).thenReturn("notExpectedId"); + when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId"); + + eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null); + GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher); + + String expectedExceptionMessage = + "There are no active servers. " + + GatewaySenderEventRemoteDispatcher.maxAttemptsReachedConnectingServerIdExceptionMessage + + " [expectedId] (10 attempts)"; + assertThatThrownBy(() -> { + dispatcherSpy.initializeConnection(); + }).isInstanceOf(GatewaySenderException.class).hasMessageContaining(expectedExceptionMessage); + + verify(senderMock, times(1)).getLockForConcurrentDispatcher(); + verify(senderMock, times(2)).getEnforceThreadsConnectSameReceiver(); + verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock); + verify(poolMock, times(10)).acquireConnection(); + verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any()); + } + + @Test + public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndMaxRetriesExceededAndServersAvailableThrowsException() { + + when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true); + + when(connectionMock.getEndpoint()).thenReturn(endpointMock); + when(endpointMock.getMemberId()).thenReturn(memberIdMock); + when(memberIdMock.getUniqueId()).thenReturn("notExpectedId"); + when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId"); + List<ServerLocationAndMemberId> currentServers = new ArrayList<>(); + currentServers.add(new ServerLocationAndMemberId(new ServerLocation("host1", 1), "id1")); + currentServers.add(new ServerLocationAndMemberId(new ServerLocation("host2", 2), "id2")); + when(poolMock.getCurrentServers()).thenReturn(currentServers); + + eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null); + GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher); + + String expectedExceptionMessage = + "No available connection was found, but the following active servers exist: host1:1@id1, host2:2@id2 " + + GatewaySenderEventRemoteDispatcher.maxAttemptsReachedConnectingServerIdExceptionMessage + + " [expectedId] (10 attempts)"; + assertThatThrownBy(() -> { + dispatcherSpy.initializeConnection(); + }).isInstanceOf(GatewaySenderException.class).hasMessageContaining(expectedExceptionMessage); + + verify(senderMock, times(1)).getLockForConcurrentDispatcher(); + verify(senderMock, times(2)).getEnforceThreadsConnectSameReceiver(); + verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock); + verify(poolMock, times(10)).acquireConnection(); + verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any()); + } + }