This is an automated email from the ASF dual-hosted git repository. mkevo pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new c92a91e GEODE-8656: Fix ping only sent to one gateway receiver when several r… (#5670) c92a91e is described below commit c92a91eae587800c9d24c973e536cdfe8da6f38a Author: Alberto Gomez <alberto.go...@est.tech> AuthorDate: Tue Nov 24 07:37:20 2020 +0100 GEODE-8656: Fix ping only sent to one gateway receiver when several r… (#5670) * GEODE-8656: Fix ping only sent to one gateway receiver when several receivers share the same hostname-for-senders When serveral receivers have the same hostname-for-senders only one of them received pings. The reason was that the structure holding the endpoints was a Map whose key was the server location. As all the endpoints for the remote gateway senders shared the same server location, only one received the ping. The structure holding endpoints in the endpoint manager has been updated to use as key a ServerLocationAndMemberId class instead of ServerLocation. --- ...iversWithSamePortAndHostnameForSendersTest.java | 258 +++++++++++++++++++++ .../org/apache/geode/cache/wan/docker-compose.yml | 73 ++++++ .../org/apache/geode/cache/wan/haproxy.cfg | 39 ++++ .../org/apache/geode/cache/wan/scripts/forever | 20 ++ .../cache/wan/scripts/geode-starter-create.gfsh | 21 ++ .../cache/wan/scripts/geode-starter-locator.gfsh | 19 ++ .../cache/wan/scripts/geode-starter-server1.gfsh | 19 ++ .../cache/wan/scripts/geode-starter-server2.gfsh | 19 ++ .../internal/AutoConnectionSourceDUnitTest.java | 8 +- .../tier/sockets/UpdatePropagationDUnitTest.java | 7 +- .../cache/client/internal/ConnectionImpl.java | 7 +- .../geode/cache/client/internal/Endpoint.java | 28 ++- .../cache/client/internal/EndpointManager.java | 5 +- .../cache/client/internal/EndpointManagerImpl.java | 28 ++- .../cache/client/internal/LiveServerPinger.java | 1 + .../cache/client/internal/OpExecutorImpl.java | 5 +- .../apache/geode/cache/client/internal/PingOp.java | 4 + .../geode/cache/client/internal/PoolImpl.java | 15 +- .../geode/internal/cache/GemFireCacheImpl.java | 10 +- .../cache/tier/InternalClientMembership.java | 5 +- .../client/internal/OpExecutorImplJUnitTest.java | 5 +- .../internal/ServerLocationAndMemberIdTest.java | 47 ++++ .../tier/sockets/DurableClientCQDUnitTest.java | 9 +- .../geode/internal/cache/wan/WANTestBase.java | 18 -- ...SenderEventRemoteDispatcherIntegrationTest.java | 14 +- .../wan/GatewaySenderEventRemoteDispatcher.java | 5 +- 26 files changed, 623 insertions(+), 66 deletions(-) diff --git a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java new file mode 100644 index 0000000..6be6066 --- /dev/null +++ b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.wan; + +import static com.palantir.docker.compose.execution.DockerComposeExecArgument.arguments; +import static com.palantir.docker.compose.execution.DockerComposeExecOption.options; +import static org.apache.geode.cache.Region.SEPARATOR; +import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID; +import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; +import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.File; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.StringTokenizer; + +import com.palantir.docker.compose.DockerComposeRule; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.persistence.PartitionOfflineException; +import org.apache.geode.client.sni.NotOnWindowsDockerRule; +import org.apache.geode.distributed.Locator; +import org.apache.geode.internal.cache.ForceReattemptException; +import org.apache.geode.internal.cache.PoolStats; +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory; +import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.DistributedRule; +import org.apache.geode.test.junit.categories.WanTest; + + +/** + * These tests use two Geode sites: + * + * - One site (the remote one) consisting of a 2-server, 1-locator Geode cluster. + * The servers host a partition region (region-wan) and have gateway senders to receive events + * from the other site with the same value for hostname-for-senders and listening on the + * same port (2324). + * The servers and locator run each inside a Docker container and are not route-able + * from the host (where this JUnit test is running). + * Another Docker container is running the HAProxy image and it's set up as a TCP load balancer. + * The other site connects to the locator and to the gateway receivers via the + * TCP load balancer that forwards traffic directed to the 20334 port to the locator and + * traffic directed to the 2324 port to the receivers in a round robin fashion. + * + * - Another site consisting of a 1-server, 1-locator Geode cluster. + * The server hosts a partition region (region-wan) and has a parallel gateway receiver + * to send events to the remote site. + * + * The aim of the tests is verify that when several gateway receivers in a remote site + * share the same port and hostname-for-senders, the pings sent from the gateway senders + * reach the right gateway receiver and not just any of the receivers. Failure to do this + * may result in the closing of connections by a gateway receiver for not having + * received the ping in time. + */ +@Category({WanTest.class}) +public class SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest { + + private static final int NUM_SERVERS = 2; + + private static Cache cache; + + private static final URL DOCKER_COMPOSE_PATH = + SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.class + .getResource("docker-compose.yml"); + + // Docker compose does not work on windows in CI. Ignore this test on windows + // Using a RuleChain to make sure we ignore the test before the rule comes into play + @ClassRule + public static NotOnWindowsDockerRule docker = + new NotOnWindowsDockerRule(() -> DockerComposeRule.builder() + .file(DOCKER_COMPOSE_PATH.getPath()).build()); + + @Rule + public DistributedRule distributedRule = + DistributedRule.builder().withVMCount(NUM_SERVERS + 1).build(); + + @BeforeClass + public static void beforeClass() throws Exception { + // Start locator + docker.get().exec(options("-T"), "locator", + arguments("gfsh", "run", "--file=/geode/scripts/geode-starter-locator.gfsh")); + // Start server1 + docker.get().exec(options("-T"), "server1", + arguments("gfsh", "run", "--file=/geode/scripts/geode-starter-server1.gfsh")); + // Start server2 + docker.get().exec(options("-T"), "server2", + arguments("gfsh", "run", "--file=/geode/scripts/geode-starter-server2.gfsh")); + // Create partition region and gateway receiver + docker.get().exec(options("-T"), "locator", + arguments("gfsh", "run", "--file=/geode/scripts/geode-starter-create.gfsh")); + } + + public SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest() { + super(); + } + + @Test + public void testPingsToReceiversWithSamePortAndHostnameForSendersReachTheRightReceivers() + throws InterruptedException { + String senderId = "ln"; + String regionName = "region-wan"; + final int remoteLocPort = 20334; + + int locPort = createLocator(VM.getVM(0), 1, remoteLocPort); + + VM vm1 = VM.getVM(1); + createCache(vm1, locPort); + + // We must use more than one dispatcher thread. With just one dispatcher thread, only one + // connection will be created by the sender towards one of the receivers and it will be + // monitored by the one ping thread for that remote receiver. + // With more than one thread, several connections will be opened and there should be one ping + // thread per remote receiver. + createGatewaySender(vm1, senderId, 2, true, 5, + 5, GatewaySender.DEFAULT_ORDER_POLICY); + + createPartitionedRegion(vm1, regionName, senderId, 0, 10); + + int NUM_PUTS = 10; + + putKeyValues(vm1, NUM_PUTS, regionName); + + // Wait longer than the value set in the receivers for + // maximum-time-between-pings: 10000 (see geode-starter-create.gfsh) + // to verify that connections are not closed + // by the receivers because each has received the pings timely. + int maxTimeBetweenPingsInReceiver = 15000; + Thread.sleep(maxTimeBetweenPingsInReceiver); + + int senderPoolDisconnects = getSenderPoolDisconnects(vm1, senderId); + assertEquals(0, senderPoolDisconnects); + } + + private int createLocator(VM memberVM, int dsId, int remoteLocPort) { + return memberVM.invoke("create locator", () -> { + Properties props = new Properties(); + props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId); + props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); + return Locator.startLocatorAndDS(0, new File(""), props) + .getPort(); + }); + } + + private static void createCache(VM vm, Integer locPort) { + vm.invoke(() -> { + Properties props = new Properties(); + props.setProperty(LOCATORS, "localhost[" + locPort + "]"); + CacheFactory cacheFactory = new CacheFactory(props); + cache = cacheFactory.create(); + }); + } + + public static void createGatewaySender(VM vm, String dsName, int remoteDsId, + boolean isParallel, Integer batchSize, + int numDispatchers, + GatewaySender.OrderPolicy orderPolicy) { + vm.invoke(() -> { + final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + try { + InternalGatewaySenderFactory gateway = + (InternalGatewaySenderFactory) cache.createGatewaySenderFactory(); + gateway.setParallel(isParallel); + gateway.setBatchSize(batchSize); + gateway.setDispatcherThreads(numDispatchers); + gateway.setOrderPolicy(orderPolicy); + gateway.create(dsName, remoteDsId); + + } finally { + exln.remove(); + } + }); + } + + private static void createPartitionedRegion(VM vm, String regionName, String senderIds, + Integer redundantCopies, Integer totalNumBuckets) { + vm.invoke(() -> { + IgnoredException exp = + IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + IgnoredException exp1 = + IgnoredException.addIgnoredException(PartitionOfflineException.class.getName()); + try { + RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION); + if (senderIds != null) { + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()) { + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(totalNumBuckets); + pfact.setRedundantCopies(redundantCopies); + pfact.setRecoveryDelay(0); + fact.setPartitionAttributes(pfact.create()); + Region r = fact.create(regionName); + assertNotNull(r); + } finally { + exp.remove(); + exp1.remove(); + } + }); + } + + private static int getSenderPoolDisconnects(VM vm, String senderId) { + return vm.invoke(() -> { + AbstractGatewaySender sender = + (AbstractGatewaySender) CacheFactory.getAnyInstance().getGatewaySender(senderId); + assertNotNull(sender); + PoolStats poolStats = sender.getProxy().getStats(); + return poolStats.getDisConnects(); + }); + } + + private static void putKeyValues(VM vm, int numPuts, String region) { + final HashMap<Integer, Integer> keyValues = new HashMap<>(); + for (int i = 0; i < numPuts; i++) { + keyValues.put(i, i); + } + vm.invoke(() -> putGivenKeyValue(region, keyValues)); + } + + private static void putGivenKeyValue(String regionName, Map<Integer, Integer> keyValues) { + Region<Integer, Integer> r = cache.getRegion(SEPARATOR + regionName); + assertNotNull(r); + for (Object key : keyValues.keySet()) { + r.put((Integer) key, keyValues.get(key)); + } + } + +} diff --git a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/docker-compose.yml b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/docker-compose.yml new file mode 100644 index 0000000..817e770 --- /dev/null +++ b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/docker-compose.yml @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +version: '3' +services: + locator: + container_name: 'locator' + image: 'geode:develop' + hostname: locator + expose: + - '20334' + - '1099' + entrypoint: 'sh' + command: '-c /geode/scripts/forever' + networks: + geode-wan-test: + volumes: + - ./geode-config:/geode/config:ro + - ./scripts:/geode/scripts + server1: + container_name: 'server1' + image: 'geode:develop' + hostname: server1 + expose: + - '40404' + - '2324' + entrypoint: 'sh' + command: '-c /geode/scripts/forever' + networks: + geode-wan-test: + volumes: + - ./geode-config:/geode/config:ro + - ./scripts:/geode/scripts + server2: + container_name: 'server2' + image: 'geode:develop' + hostname: server2 + expose: + - '40404' + - '2324' + entrypoint: 'sh' + command: '-c /geode/scripts/forever' + networks: + geode-wan-test: + volumes: + - ./geode-config:/geode/config:ro + - ./scripts:/geode/scripts + haproxy: + container_name: 'haproxy' + image: 'haproxy:2.1' + ports: + - "20334:20334" + - "2324:2324" + networks: + geode-wan-test: + volumes: + - ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro +networks: + geode-wan-test: + diff --git a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/haproxy.cfg b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/haproxy.cfg new file mode 100644 index 0000000..2af867d --- /dev/null +++ b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/haproxy.cfg @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +frontend locator + bind *:20334 + mode tcp + default_backend locators + log stdout format raw local0 debug + +frontend receiver + bind *:2324 + mode tcp + default_backend receivers + log stdout format raw local0 debug + +backend locators + mode tcp + server locator locator:20334 + +backend receivers + mode tcp + balance roundrobin + server server1 server1:2324 + server server2 server2:2324 + diff --git a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/forever b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/forever new file mode 100755 index 0000000..4fecfa8 --- /dev/null +++ b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/forever @@ -0,0 +1,20 @@ +#!/usr/bin/env sh + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +while true; do sleep 600; done diff --git a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-starter-create.gfsh b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-starter-create.gfsh new file mode 100644 index 0000000..c7ee35e --- /dev/null +++ b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-starter-create.gfsh @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +connect --locator=locator[20334] +create region --name=region-wan --type=PARTITION +create gateway-receiver --hostname-for-senders=localhost --start-port=2324 --end-port=2324 --maximum-time-between-pings=10000 + diff --git a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-starter-locator.gfsh b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-starter-locator.gfsh new file mode 100644 index 0000000..b339617 --- /dev/null +++ b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-starter-locator.gfsh @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +start locator --name=locator --port=20334 --connect=false --redirect-output --enable-cluster-configuration=true --hostname-for-clients=localhost --J=-Dgemfire.distributed-system-id=2 + diff --git a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-starter-server1.gfsh b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-starter-server1.gfsh new file mode 100644 index 0000000..b69eb38 --- /dev/null +++ b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-starter-server1.gfsh @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +start server --name=server1 --locators=locator[20334] + diff --git a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-starter-server2.gfsh b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-starter-server2.gfsh new file mode 100644 index 0000000..85019c5 --- /dev/null +++ b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-starter-server2.gfsh @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +start server --name=server2 --locators=locator[20334] + diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/AutoConnectionSourceDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/AutoConnectionSourceDUnitTest.java index d90bf65..fe56f4e 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/AutoConnectionSourceDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/AutoConnectionSourceDUnitTest.java @@ -41,7 +41,7 @@ import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.client.Pool; import org.apache.geode.cache.client.PoolManager; import org.apache.geode.cache.server.CacheServer; -import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.ServerLocationAndMemberId; import org.apache.geode.internal.AvailablePort; import org.apache.geode.management.membership.ClientMembership; import org.apache.geode.management.membership.ClientMembershipEvent; @@ -496,12 +496,12 @@ public class AutoConnectionSourceDUnitTest extends LocatorTestBase { expectedEndpointPorts.add(expectedPort); } await().untilAsserted(() -> { - List<ServerLocation> endpoints; + List<ServerLocationAndMemberId> endpoints; HashSet<Integer> actualEndpointPorts; endpoints = pool.getCurrentServers(); actualEndpointPorts = new HashSet<>(); - for (ServerLocation sl : endpoints) { - actualEndpointPorts.add(sl.getPort()); + for (ServerLocationAndMemberId slAndMemberId : endpoints) { + actualEndpointPorts.add(slAndMemberId.getServerLocation().getPort()); } assertEquals(expectedEndpointPorts, actualEndpointPorts); }); diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java index 84ac6a7..b4c717e 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java @@ -48,7 +48,7 @@ import org.apache.geode.cache.client.internal.PoolImpl; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.cache.util.CacheListenerAdapter; import org.apache.geode.cache30.CacheSerializableRunnable; -import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.ServerLocationAndMemberId; import org.apache.geode.internal.AvailablePort; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.IgnoredException; @@ -176,8 +176,9 @@ public class UpdatePropagationDUnitTest extends JUnit4CacheTestCase { */ private boolean hasEndPointWithPort(final PoolImpl pool, final int port) { EndpointManager endpointManager = pool.getEndpointManager(); - final Set<ServerLocation> servers = endpointManager.getEndpointMap().keySet(); - return servers.stream().anyMatch(location -> location.getPort() == port); + final Set<ServerLocationAndMemberId> slAndMemberIds = endpointManager.getEndpointMap().keySet(); + return slAndMemberIds.stream() + .anyMatch(slAndMemberId -> slAndMemberId.getServerLocation().getPort() == port); } private void acquireConnectionsAndPutonK1andK2(String host) { diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java index bb98f2a..aa0d475 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java @@ -110,6 +110,7 @@ public class ConnectionImpl implements Connection { commBufferForAsyncRead = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket); } theSocket.setSoTimeout(readTimeout); + endpoint = endpointManager.referenceEndpoint(location, status.getMemberId()); connectFinished = true; endpoint.getStats().incConnections(1); @@ -282,7 +283,11 @@ public class ConnectionImpl implements Connection { synchronized (this) { result = op.attempt(this); } - endpoint.updateLastExecute(); + // Do not call endpoint.updateLastExecute here because it should have been + // called on the final destination endpoint inside LiverServerPinger + if (!(op instanceof PingOp.PingOpImpl)) { + endpoint.updateLastExecute(); + } return result; } diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/Endpoint.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/Endpoint.java index a26076d..bb49837 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/Endpoint.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/Endpoint.java @@ -97,12 +97,38 @@ public class Endpoint { @Override public String toString() { - return location.toString(); + return location.toString() + "," + getMemberId(); } public DistributedMember getMemberId() { return memberId; } + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (!(obj instanceof Endpoint)) + return false; + final Endpoint other = (Endpoint) obj; + + if (!this.location.equals(other.getLocation())) { + return false; + } + + return this.memberId.equals(other.getMemberId()); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = + prime * result + location.hashCode() + memberId.hashCode(); + return result; + } + } diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/EndpointManager.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/EndpointManager.java index 47740cc..be5b049 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/EndpointManager.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/EndpointManager.java @@ -18,6 +18,7 @@ import java.util.Map; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.ServerLocationAndMemberId; /** * The endpoint manager keeps track of which servers we are connected to. Other parts of the client @@ -46,7 +47,7 @@ public interface EndpointManager { * * @return a map for ServerLocation->Endpoint */ - Map<ServerLocation, Endpoint> getEndpointMap(); + Map<ServerLocationAndMemberId, Endpoint> getEndpointMap(); void close(); @@ -65,7 +66,7 @@ public interface EndpointManager { * * @return a map of ServerLocation-> ConnectionStats */ - Map<ServerLocation, ConnectionStats> getAllStats(); + Map<ServerLocationAndMemberId, ConnectionStats> getAllStats(); /** * Test hook that returns the number of servers we currently have connections to. diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/EndpointManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/EndpointManagerImpl.java index d41ca20..e9d6fdc 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/EndpointManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/EndpointManagerImpl.java @@ -30,6 +30,7 @@ import org.apache.geode.cache.client.PoolManager; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.ServerLocationAndMemberId; import org.apache.geode.internal.cache.PoolStats; import org.apache.geode.internal.cache.tier.InternalClientMembership; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -37,8 +38,8 @@ import org.apache.geode.logging.internal.log4j.api.LogService; public class EndpointManagerImpl implements EndpointManager { private static final Logger logger = LogService.getLogger(); - private volatile Map<ServerLocation, Endpoint> endpointMap = Collections.emptyMap(); - private final Map<ServerLocation, ConnectionStats> statMap = new HashMap<>(); + private volatile Map<ServerLocationAndMemberId, Endpoint> endpointMap = Collections.emptyMap(); + private final Map<ServerLocationAndMemberId, ConnectionStats> statMap = new HashMap<>(); private final DistributedSystem ds; private final String poolName; private final EndpointListenerBroadcaster listener = new EndpointListenerBroadcaster(); @@ -56,17 +57,19 @@ public class EndpointManagerImpl implements EndpointManager { @Override public Endpoint referenceEndpoint(ServerLocation server, DistributedMember memberId) { - Endpoint endpoint = endpointMap.get(server); + ServerLocationAndMemberId serverLocationAndMemberId = + new ServerLocationAndMemberId(server, memberId.getUniqueId()); + Endpoint endpoint = endpointMap.get(serverLocationAndMemberId); boolean addedEndpoint = false; if (endpoint == null || endpoint.isClosed()) { synchronized (this) { - endpoint = endpointMap.get(server); + endpoint = endpointMap.get(serverLocationAndMemberId); if (endpoint == null || endpoint.isClosed()) { - ConnectionStats stats = getStats(server); - Map<ServerLocation, Endpoint> endpointMapTemp = new HashMap<>(endpointMap); + ConnectionStats stats = getStats(serverLocationAndMemberId); + Map<ServerLocationAndMemberId, Endpoint> endpointMapTemp = new HashMap<>(endpointMap); endpoint = new Endpoint(this, ds, server, stats, memberId); listener.clearPdxRegistry(endpoint); - endpointMapTemp.put(server, endpoint); + endpointMapTemp.put(serverLocationAndMemberId, endpoint); endpointMap = Collections.unmodifiableMap(endpointMapTemp); addedEndpoint = true; poolStats.setServerCount(endpointMap.size()); @@ -97,8 +100,9 @@ public class EndpointManagerImpl implements EndpointManager { endpoint.close(); boolean removedEndpoint = false; synchronized (this) { - Map<ServerLocation, Endpoint> endpointMapTemp = new HashMap<>(endpointMap); - endpoint = endpointMapTemp.remove(endpoint.getLocation()); + Map<ServerLocationAndMemberId, Endpoint> endpointMapTemp = new HashMap<>(endpointMap); + endpoint = endpointMapTemp.remove(new ServerLocationAndMemberId(endpoint.getLocation(), + endpoint.getMemberId().getUniqueId())); if (endpoint != null) { endpointMap = Collections.unmodifiableMap(endpointMapTemp); removedEndpoint = true; @@ -152,7 +156,7 @@ public class EndpointManagerImpl implements EndpointManager { @Override - public Map<ServerLocation, Endpoint> getEndpointMap() { + public Map<ServerLocationAndMemberId, Endpoint> getEndpointMap() { return endpointMap; } @@ -177,7 +181,7 @@ public class EndpointManagerImpl implements EndpointManager { this.listener.removeListener(listener); } - private synchronized ConnectionStats getStats(ServerLocation location) { + private synchronized ConnectionStats getStats(ServerLocationAndMemberId location) { ConnectionStats stats = statMap.get(location); if (stats == null) { PoolImpl pool = (PoolImpl) PoolManager.find(poolName); @@ -198,7 +202,7 @@ public class EndpointManagerImpl implements EndpointManager { } @Override - public synchronized Map<ServerLocation, ConnectionStats> getAllStats() { + public synchronized Map<ServerLocationAndMemberId, ConnectionStats> getAllStats() { return new HashMap<>(statMap); } diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java index 623d976..692e3af 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java @@ -87,6 +87,7 @@ public class LiveServerPinger extends EndpointListenerAdapter { public void run2() { if (endpoint.timeToPing(pingIntervalNanos)) { try { + endpoint.updateLastExecute(); PingOp.execute(pool, endpoint.getLocation(), endpoint.getMemberId()); } catch (Exception e) { if (logger.isDebugEnabled()) { diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java index c73bee0..e061e56 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java @@ -49,6 +49,7 @@ import org.apache.geode.cache.client.internal.pooling.ConnectionManager; import org.apache.geode.cache.execute.FunctionException; import org.apache.geode.cache.execute.FunctionInvocationTargetException; import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.ServerLocationAndMemberId; import org.apache.geode.internal.cache.PoolManagerImpl; import org.apache.geode.internal.cache.PutAllPartialResultException; import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException; @@ -314,7 +315,9 @@ public class OpExecutorImpl implements ExecutablePool { if (queueManager != null) { // see if our QueueManager has a connection to this server that we can send // the ping on. - Endpoint ep = endpointManager.getEndpointMap().get(p_server); + ServerLocationAndMemberId slAndMId = new ServerLocationAndMemberId(p_server, + ((PingOp.PingOpImpl) op).getServerID().getUniqueId()); + Endpoint ep = endpointManager.getEndpointMap().get(slAndMId); if (ep != null) { QueueConnections qcs = queueManager.getAllConnectionsNoWait(); conn = qcs.getConnection(ep); diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java index 4140fe8..372fefc 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java @@ -99,5 +99,9 @@ public class PingOp { protected void endAttempt(ConnectionStats stats, long start) { stats.endPing(start, hasTimedOut(), hasFailed()); } + + public DistributedMember getServerID() { + return serverID; + } } } diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java index 2fe2365..51d5192 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java @@ -56,6 +56,7 @@ import org.apache.geode.distributed.PoolCancelledException; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.ServerLocationAndMemberId; import org.apache.geode.distributed.internal.tcpserver.HostAndPort; import org.apache.geode.internal.admin.ClientStatsManager; import org.apache.geode.internal.cache.EventID; @@ -889,7 +890,7 @@ public class PoolImpl implements InternalPool { } @Override - public Map<ServerLocation, Endpoint> getEndpointMap() { + public Map<ServerLocationAndMemberId, Endpoint> getEndpointMap() { return endpointManager.getEndpointMap(); } @@ -1169,8 +1170,8 @@ public class PoolImpl implements InternalPool { /** * Returns a list of ServerLocation instances; one for each server we are currently connected to. */ - public List<ServerLocation> getCurrentServers() { - Map<ServerLocation, Endpoint> endpointMap = endpointManager.getEndpointMap(); + public List<ServerLocationAndMemberId> getCurrentServers() { + Map<ServerLocationAndMemberId, Endpoint> endpointMap = endpointManager.getEndpointMap(); return new ArrayList<>(endpointMap.keySet()); } @@ -1179,10 +1180,10 @@ public class PoolImpl implements InternalPool { * connected to. */ public List<String> getCurrentServerNames() { - List<ServerLocation> servers = getCurrentServers(); + List<ServerLocationAndMemberId> servers = getCurrentServers(); ArrayList<String> result = new ArrayList<>(servers.size()); - for (ServerLocation sl : servers) { - String name = sl.getHostName() + sl.getPort(); + for (ServerLocationAndMemberId sl : servers) { + String name = sl.getServerLocation().getHostName() + sl.getServerLocation().getPort(); result.add(name); } return result; @@ -1218,7 +1219,7 @@ public class PoolImpl implements InternalPool { // do nothing. } - Map<ServerLocation, Endpoint> endpoints = endpointManager.getEndpointMap(); + Map<ServerLocationAndMemberId, Endpoint> endpoints = endpointManager.getEndpointMap(); for (Endpoint endpoint : endpoints.values()) { logger.debug("PoolImpl Simulating crash of endpoint {}", endpoint); endpointManager.serverCrashed(endpoint); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index e644a06..4605c57 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -202,7 +202,7 @@ import org.apache.geode.distributed.internal.ReplyException; import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.distributed.internal.ResourceEvent; import org.apache.geode.distributed.internal.ResourceEventsListener; -import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.ServerLocationAndMemberId; import org.apache.geode.distributed.internal.locks.DLockService; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.i18n.LogWriterI18n; @@ -2773,12 +2773,14 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has Set<InetSocketAddress> result = null; for (Pool pool : pools.values()) { PoolImpl poolImpl = (PoolImpl) pool; - for (ServerLocation serverLocation : poolImpl.getCurrentServers()) { + for (ServerLocationAndMemberId serverLocationAndMemberId : poolImpl.getCurrentServers()) { if (result == null) { result = new HashSet<>(); } - result.add(InetSocketAddress.createUnresolved(serverLocation.getHostName(), - serverLocation.getPort())); + result.add( + InetSocketAddress.createUnresolved( + serverLocationAndMemberId.getServerLocation().getHostName(), + serverLocationAndMemberId.getServerLocation().getPort())); } } if (result == null) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java index 920c86c..50d99c9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java @@ -39,6 +39,7 @@ import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystemDisconnectedException; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.ServerLocationAndMemberId; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.InternalCacheServer; @@ -316,11 +317,11 @@ public class InternalClientMembership { Iterator pools = poolMap.values().iterator(); while (pools.hasNext()) { PoolImpl pi = (PoolImpl) pools.next(); - Map/* <ServerLocation,Endpoint> */ eps = pi.getEndpointMap(); + Map/* <ServerLocationAndMemberId,Endpoint> */ eps = pi.getEndpointMap(); Iterator it = eps.entrySet().iterator(); while (it.hasNext()) { Map.Entry entry = (Map.Entry) it.next(); - ServerLocation loc = (ServerLocation) entry.getKey(); + ServerLocation loc = ((ServerLocationAndMemberId) entry.getKey()).getServerLocation(); org.apache.geode.cache.client.internal.Endpoint ep = (org.apache.geode.cache.client.internal.Endpoint) entry.getValue(); String server = loc.getHostName() + "[" + loc.getPort() + "]"; diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java index 8f10d4d..e5a5d34 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java @@ -48,6 +48,7 @@ import org.apache.geode.cache.client.internal.pooling.ConnectionManager; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.ServerLocationAndMemberId; import org.apache.geode.internal.cache.tier.sockets.Message; import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus; import org.apache.geode.internal.logging.InternalLogWriter; @@ -570,7 +571,7 @@ public class OpExecutorImplJUnitTest { } @Override - public Map<ServerLocation, Endpoint> getEndpointMap() { + public Map<ServerLocationAndMemberId, Endpoint> getEndpointMap() { return null; } @@ -590,7 +591,7 @@ public class OpExecutorImplJUnitTest { } @Override - public Map<ServerLocation, ConnectionStats> getAllStats() { + public Map<ServerLocationAndMemberId, ConnectionStats> getAllStats() { return null; } diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/ServerLocationAndMemberIdTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/ServerLocationAndMemberIdTest.java index 955874d..2e2d3ce 100644 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/ServerLocationAndMemberIdTest.java +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/ServerLocationAndMemberIdTest.java @@ -17,6 +17,8 @@ package org.apache.geode.distributed.internal; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import java.util.HashMap; + import org.junit.Test; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; @@ -66,4 +68,49 @@ public class ServerLocationAndMemberIdTest { assertNotEquals(serverLocationAndMemberId1, serverLocationAndMemberId2); } + + @Test + public void givenTwoObjectsWithSameHostAndPortAndSameDistributedMemberId_whenCompared_thenAreEqual() { + + final ServerLocation serverLocation1 = new ServerLocation("localhost", 1); + InternalDistributedMember idmWithView1 = new InternalDistributedMember("localhost", 1); + idmWithView1.setVmViewId(1); + InternalDistributedMember idmWithView2 = new InternalDistributedMember("localhost", 1); + idmWithView2.setVmViewId(1); + + ServerLocationAndMemberId serverLocationAndMemberId1 = + new ServerLocationAndMemberId(serverLocation1, idmWithView1.getUniqueId()); + ServerLocationAndMemberId serverLocationAndMemberId2 = + new ServerLocationAndMemberId(serverLocation1, idmWithView2.getUniqueId()); + + assertEquals(serverLocationAndMemberId1, serverLocationAndMemberId2); + } + + @Test + public void givenTwoObjectsWithSameHostAndPortAndSameDistributedMemberId_CannotBeAddedTwiceToHashMap() { + + final ServerLocation serverLocation1 = new ServerLocation("localhost", 1); + InternalDistributedMember idmWithView1 = new InternalDistributedMember("localhost", 1); + idmWithView1.setVmViewId(1); + InternalDistributedMember idmWithView2 = new InternalDistributedMember("localhost", 1); + idmWithView2.setVmViewId(1); + + ServerLocationAndMemberId serverLocationAndMemberId1 = + new ServerLocationAndMemberId(serverLocation1, idmWithView1.getUniqueId()); + ServerLocationAndMemberId serverLocationAndMemberId2 = + new ServerLocationAndMemberId(serverLocation1, idmWithView2.getUniqueId()); + + HashMap map = new HashMap<ServerLocationAndMemberId, Integer>(); + map.put(serverLocationAndMemberId1, new Integer(1)); + Integer i = (Integer) map.get(serverLocationAndMemberId2); + assertNotEquals(null, i); + assertEquals(new Integer(1), i); + + map.put(serverLocationAndMemberId2, new Integer(2)); + i = (Integer) map.get(serverLocationAndMemberId1); + assertNotEquals(null, i); + assertEquals(new Integer(2), i); + + assertEquals(serverLocationAndMemberId1, serverLocationAndMemberId2); + } } diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java index 9a35897..4b24be9 100644 --- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java +++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCQDUnitTest.java @@ -48,6 +48,7 @@ import org.apache.geode.cache.query.RegionNotFoundException; import org.apache.geode.cache30.CacheSerializableRunnable; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.ServerLocationAndMemberId; import org.apache.geode.internal.cache.ClientServerObserverAdapter; import org.apache.geode.internal.cache.ClientServerObserverHolder; import org.apache.geode.test.dunit.AsyncInvocation; @@ -837,9 +838,11 @@ public class DurableClientCQDUnitTest extends DurableClientTestBase { ServerLocation primaryServerLocation = pool.getPrimary(); // Verify the primary server was used and no other server was used - Map<ServerLocation, ConnectionStats> statistics = pool.getEndpointManager().getAllStats(); - for (Map.Entry<ServerLocation, ConnectionStats> entry : statistics.entrySet()) { - int expectedGetDurableCqInvocations = entry.getKey().equals(primaryServerLocation) ? 1 : 0; + Map<ServerLocationAndMemberId, ConnectionStats> statistics = + pool.getEndpointManager().getAllStats(); + for (Map.Entry<ServerLocationAndMemberId, ConnectionStats> entry : statistics.entrySet()) { + int expectedGetDurableCqInvocations = + entry.getKey().getServerLocation().equals(primaryServerLocation) ? 1 : 0; assertEquals(expectedGetDurableCqInvocations, entry.getValue().getGetDurableCqs()); } } diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java index 620b50c..b75e068 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -1265,24 +1265,6 @@ public class WANTestBase extends DistributedTestCase { return poolStats.getDisConnects(); } - protected static int getTotalBucketQueueSize(PartitionedRegion prQ, boolean isPrimary) { - int size = 0; - if (prQ != null) { - Set<Map.Entry<Integer, BucketRegion>> allBuckets = prQ.getDataStore().getAllLocalBuckets(); - List<Integer> thisProcessorBuckets = new ArrayList<Integer>(); - - for (Map.Entry<Integer, BucketRegion> bucketEntry : allBuckets) { - BucketRegion bucket = bucketEntry.getValue(); - int bId = bucket.getId(); - if ((isPrimary && bucket.getBucketAdvisor().isPrimary()) - || (!isPrimary && !bucket.getBucketAdvisor().isPrimary())) { - size += bucket.size(); - } - } - } - return size; - } - public static List<Integer> getSenderStatsForDroppedEvents(String senderId) { AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); GatewaySenderStats statistics = sender.getStatistics(); diff --git a/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherIntegrationTest.java b/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherIntegrationTest.java index 1b17e6d..7938286 100644 --- a/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherIntegrationTest.java +++ b/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherIntegrationTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import java.net.InetAddress; import java.util.LinkedList; import java.util.Properties; @@ -35,6 +36,7 @@ import org.apache.geode.cache.client.internal.Endpoint; import org.apache.geode.cache.client.internal.EndpointManager; import org.apache.geode.cache.client.internal.PoolImpl; import org.apache.geode.cache.client.internal.pooling.PooledConnection; +import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ServerLocation; @@ -61,12 +63,15 @@ public class GatewaySenderEventRemoteDispatcherIntegrationTest { final PoolImpl pool = getPool(); - final ServerLocation serverLocation = mock(ServerLocation.class); + final ServerLocation serverLocation = new ServerLocation("127.0.0.1", 2); final AbstractGatewaySenderEventProcessor eventProcessor = getMockedAbstractGatewaySenderEventProcessor(pool, serverLocation); - final Endpoint endpoint = getMockedEndpoint(serverLocation); + InternalDistributedMember member = + new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1); + + final Endpoint endpoint = getMockedEndpoint(serverLocation, member); final Connection connection = getMockedConnection(serverLocation, endpoint); /* @@ -75,7 +80,7 @@ public class GatewaySenderEventRemoteDispatcherIntegrationTest { * connection */ final EndpointManager endpointManager = pool.getEndpointManager(); - endpointManager.referenceEndpoint(serverLocation, mock(InternalDistributedMember.class)); + endpointManager.referenceEndpoint(serverLocation, member); final GatewaySenderEventRemoteDispatcher dispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessor, connection); @@ -162,9 +167,10 @@ public class GatewaySenderEventRemoteDispatcherIntegrationTest { return eventProcessor; } - private Endpoint getMockedEndpoint(ServerLocation serverLocation) { + private Endpoint getMockedEndpoint(ServerLocation serverLocation, DistributedMember member) { final Endpoint endpoint = mock(Endpoint.class); doReturn(serverLocation).when(endpoint).getLocation(); + doReturn(member).when(endpoint).getMemberId(); return endpoint; } diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java index 4d4453a..1199fb9 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java @@ -34,6 +34,7 @@ import org.apache.geode.cache.client.internal.SenderProxy; import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException; import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.ServerLocationAndMemberId; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.UpdateAttributesProcessor; import org.apache.geode.internal.cache.tier.sockets.MessageTooLargeException; @@ -467,13 +468,13 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis if (e.getCause() instanceof GemFireSecurityException) { gse = new GatewaySenderException(e.getCause()); } else { - List<ServerLocation> servers = this.sender.getProxy().getCurrentServers(); + List<ServerLocationAndMemberId> servers = this.sender.getProxy().getCurrentServers(); String ioMsg; if (servers.size() == 0) { ioMsg = "There are no active servers."; } else { final StringBuilder buffer = new StringBuilder(); - for (ServerLocation server : servers) { + for (ServerLocationAndMemberId server : servers) { String endpointName = String.valueOf(server); if (buffer.length() > 0) { buffer.append(", ");