This is an automated email from the ASF dual-hosted git repository. mcmellawatt pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 7cd2f0c GEODE-6065: Continue event processing when hostname lookup fails (#2883) 7cd2f0c is described below commit 7cd2f0c5dfc982148352acbfbb303afaa1358c2a Author: Ryan McMahon <rmcma...@pivotal.io> AuthorDate: Tue Nov 20 18:49:51 2018 -0800 GEODE-6065: Continue event processing when hostname lookup fails (#2883) Co-authored-by: Ryan McMahon <rmcma...@pivotal.io> Co-authored-by: Bill Burcham <bburc...@pivotal.io> --- .../client/internal/pooling/PooledConnection.java | 2 +- .../membership/InternalDistributedMember.java | 22 ++- ...SenderEventRemoteDispatcherIntegrationTest.java | 189 +++++++++++++++++++++ 3 files changed, 208 insertions(+), 5 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java index cb155e6..3cfbb31 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java @@ -37,7 +37,7 @@ import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus; * @since GemFire 5.7 * */ -class PooledConnection implements Connection { +public class PooledConnection implements Connection { /* * connection is volatile because we may asynchronously destroy the pooled connection while diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java index 5aaf5c6..cb86b02 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java @@ -30,9 +30,9 @@ import java.util.List; import java.util.Set; import org.apache.geode.DataSerializer; -import org.apache.geode.GemFireConfigException; import org.apache.geode.InternalGemFireError; import org.apache.geode.cache.UnsupportedVersionException; +import org.apache.geode.cache.client.ServerConnectivityException; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DurableClientAttributes; import org.apache.geode.distributed.Role; @@ -94,6 +94,19 @@ public class InternalDistributedMember implements DistributedMember, Externaliza /** product version bit flag */ private static final int VERSION_BIT = 0x8; + @FunctionalInterface + public interface HostnameResolver { + InetAddress getInetAddress(ServerLocation location) throws UnknownHostException; + } + + public static void setHostnameResolver(final HostnameResolver hostnameResolver) { + InternalDistributedMember.hostnameResolver = hostnameResolver; + } + + /** Retrieves an InetAddress given the provided hostname */ + private static HostnameResolver hostnameResolver = + (location) -> InetAddress.getByName(location.getHostName()); + /** * Representing the host name of this member. */ @@ -213,12 +226,13 @@ public class InternalDistributedMember implements DistributedMember, Externaliza public InternalDistributedMember(ServerLocation location) { this.hostName = location.getHostName(); - InetAddress addr = null; + final InetAddress addr; try { - addr = InetAddress.getByName(this.hostName); + addr = hostnameResolver.getInetAddress(location); } catch (UnknownHostException e) { - throw new GemFireConfigException("Unable to resolve server location " + location, e); + throw new ServerConnectivityException("Unable to resolve server location " + location, e); } + netMbr = MemberFactory.newNetMember(addr, location.getPort()); netMbr.setVmKind(ClusterDistributionManager.NORMAL_DM_TYPE); versionObj = Version.CURRENT; diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherIntegrationTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherIntegrationTest.java new file mode 100644 index 0000000..a48c401 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherIntegrationTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan; + +import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.LinkedList; +import java.util.Properties; + +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.Statistics; +import org.apache.geode.cache.client.internal.Connection; +import org.apache.geode.cache.client.internal.Endpoint; +import org.apache.geode.cache.client.internal.EndpointManager; +import org.apache.geode.cache.client.internal.PoolImpl; +import org.apache.geode.cache.client.internal.pooling.PooledConnection; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.distributed.internal.membership.gms.membership.HostAddress; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.PoolFactoryImpl; +import org.apache.geode.internal.cache.PoolManagerImpl; +import org.apache.geode.internal.monitoring.ThreadsMonitoring; +import org.apache.geode.internal.net.SSLConfigurationFactory; +import org.apache.geode.internal.security.SecurableCommunicationChannel; + +public class GatewaySenderEventRemoteDispatcherIntegrationTest { + + /* + * Sometimes hostname lookup is flaky. We don't want such a failure to cripple our + * event processor. + * + * This test assumes hostname lookup (of IP number) succeeds when establishing the initial + * connection, but fails when constructing the InternalDistributedSystem object in response to a + * remote server crash. + */ + @Test + public void canProcessesEventAfterHostnameLookupFailsInNotifyServerCrashed() throws Exception { + + final PoolImpl pool = getPool(); + + final ServerLocation serverLocation = mock(ServerLocation.class); + + final AbstractGatewaySenderEventProcessor eventProcessor = + getMockedAbstractGatewaySenderEventProcessor(pool, serverLocation); + + final Endpoint endpoint = getMockedEndpoint(serverLocation); + final Connection connection = getMockedConnection(serverLocation, endpoint); + + /* + * In order for listeners to be notified, the endpoint must be referenced by the + * endpointManager so that it can be removed when the RuntimeException() is thrown by the + * connection + */ + final EndpointManager endpointManager = pool.getEndpointManager(); + endpointManager.referenceEndpoint(serverLocation, mock(InternalDistributedMember.class)); + + final GatewaySenderEventRemoteDispatcher dispatcher = + new GatewaySenderEventRemoteDispatcher(eventProcessor, connection); + + /* + * Set a HostnameResolver which simulates a failed + * hostname lookup resulting in an UnknownHostException + */ + InternalDistributedMember.setHostnameResolver(ignored -> { + throw new UnknownHostException("a.b.c"); + }); + + /* + * We have mocked our connection to throw a RuntimeException when readAcknowledgement() is + * called, then in the exception handling for that RuntimeException, the UnknownHostException + * will be thrown when trying to notify listeners of the crash. + */ + dispatcher.readAcknowledgement(); + + /* + * Need to reset the hostname resolver to a real InetAddress resolver as it is static state and + * we do not want it to throw an UnknownHostException in subsequent test runs. + */ + InternalDistributedMember + .setHostnameResolver((location) -> InetAddress.getByName(location.getHostName())); + + /* + * The handling of the UnknownHostException should not result in the event processor being + * stopped, so assert that setIsStopped(true) was never called. + */ + verify(eventProcessor, Mockito.times(0)).setIsStopped(true); + } + + private PoolImpl getPool() { + final DistributionConfig distributionConfig = mock(DistributionConfig.class); + doReturn(new SecurableCommunicationChannel[] {}).when(distributionConfig) + .getSecurableCommunicationChannels(); + + SSLConfigurationFactory.setDistributionConfig(distributionConfig); + + final Properties properties = new Properties(); + properties.put(DURABLE_CLIENT_ID, "1"); + + final Statistics statistics = mock(Statistics.class); + + final PoolFactoryImpl.PoolAttributes poolAttributes = + mock(PoolFactoryImpl.PoolAttributes.class); + /* + * These are the minimum pool attributes required + * so that basic validation and setup completes successfully. The values of + * these attributes have no importance to the assertions of the test itself. + */ + doReturn(1).when(poolAttributes).getMaxConnections(); + doReturn((long) 10e8).when(poolAttributes).getPingInterval(); + + final CancelCriterion cancelCriterion = mock(CancelCriterion.class); + + final InternalCache internalCache = mock(InternalCache.class); + doReturn(cancelCriterion).when(internalCache).getCancelCriterion(); + + final InternalDistributedSystem internalDistributedSystem = + mock(InternalDistributedSystem.class); + doReturn(distributionConfig).when(internalDistributedSystem).getConfig(); + doReturn(properties).when(internalDistributedSystem).getProperties(); + doReturn(statistics).when(internalDistributedSystem).createAtomicStatistics(any(), anyString()); + + final PoolManagerImpl poolManager = mock(PoolManagerImpl.class); + doReturn(true).when(poolManager).isNormal(); + + final ThreadsMonitoring tMonitoring = mock(ThreadsMonitoring.class); + + return PoolImpl.create(poolManager, "pool", poolAttributes, new LinkedList<HostAddress>(), + internalDistributedSystem, internalCache, tMonitoring); + } + + private Connection getMockedConnection(ServerLocation serverLocation, Endpoint endpoint) + throws Exception { + /* + * Mock the connection to throw a RuntimeException() when connection.Execute() is called, + * so that we attempt to notify listeners in the exception handling logic in + * OpExecutorImpl.executeWithPossibleReAuthentication() + */ + final Connection connection = mock(PooledConnection.class); + doReturn(serverLocation).when(connection).getServer(); + doReturn(endpoint).when(connection).getEndpoint(); + doThrow(new RuntimeException()).when(connection).execute(any()); + return connection; + } + + private AbstractGatewaySenderEventProcessor getMockedAbstractGatewaySenderEventProcessor( + PoolImpl pool, ServerLocation serverLocation) { + final AbstractGatewaySender abstractGatewaySender = mock(AbstractGatewaySender.class); + doReturn(serverLocation).when(abstractGatewaySender).getServerLocation(); + doReturn(pool).when(abstractGatewaySender).getProxy(); + + final AbstractGatewaySenderEventProcessor eventProcessor = + mock(AbstractGatewaySenderEventProcessor.class); + doReturn(abstractGatewaySender).when(eventProcessor).getSender(); + return eventProcessor; + } + + private Endpoint getMockedEndpoint(ServerLocation serverLocation) { + final Endpoint endpoint = mock(Endpoint.class); + doReturn(serverLocation).when(endpoint).getLocation(); + return endpoint; + } + +}