This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-7258 in repository https://gitbox.apache.org/repos/asf/geode.git
commit e9ebda3b4fb2b64ed708019f987e6ffc8c8202d2 Author: zhouxh <gz...@pivotal.io> AuthorDate: Fri Oct 18 10:27:46 2019 -0700 GEODE-7258: The function retry logic is modified to handle the exception thrown while trying to connect to a server that's shut down/closed. Co-authored-by: Anil <aging...@pivotal.io> Co-authored-by: Xiaojian Zhou <gz...@pivotal.io> --- .../cache/client/internal/ExecuteFunctionOp.java | 10 ++--- .../client/internal/ExecuteRegionFunctionOp.java | 15 ++----- .../internal/ExecuteRegionFunctionSingleHopOp.java | 3 +- .../geode/cache/client/internal/PoolImpl.java | 24 +++++++++++ .../cache/client/internal/ServerRegionProxy.java | 8 ++-- .../client/internal/SingleHopClientExecutor.java | 17 ++------ .../internal/pooling/ConnectionManagerImpl.java | 4 +- .../internal/ExecuteFunctionOpRetryTest.java | 5 +-- .../internal/ExecuteFunctionTestSupport.java | 8 ++-- .../internal/ExecuteRegionFunctionOpRetryTest.java | 5 ++- .../ExecuteRegionFunctionSingleHopOpRetryTest.java | 9 ++--- .../LuceneSearchWithRollingUpgradeDUnit.java | 46 +++++++++++++++------- ...tResultsAfterClientAndServersAreRolledOver.java | 5 ++- ...ntAndServersAreRolledOverAllBucketsCreated.java | 5 +-- 14 files changed, 93 insertions(+), 71 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java index 09f926c..91baa4d 100755 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java @@ -24,7 +24,6 @@ import java.util.function.Supplier; import org.apache.logging.log4j.Logger; import org.apache.geode.InternalGemFireError; -import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.ServerConnectivityException; import org.apache.geode.cache.client.ServerOperationException; import 
org.apache.geode.cache.execute.Function; @@ -83,8 +82,8 @@ public class ExecuteFunctionOp { } else { boolean reexecute = false; + int maxRetryAttempts = -1; - int maxRetryAttempts = pool.getRetryAttempts(); if (!isHA) { maxRetryAttempts = 0; } @@ -107,11 +106,8 @@ public class ExecuteFunctionOp { } catch (ServerConnectivityException se) { - if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) { - // If the retryAttempt is set to default(-1). Try it on all servers once. - // Calculating number of servers when function is re-executed as it involves - // messaging locator. - maxRetryAttempts = pool.getConnectionSource().getAllServers().size() - 1; + if (maxRetryAttempts == -1) { + maxRetryAttempts = pool.calculateRetryAttempts(se); } if ((maxRetryAttempts--) < 1) { diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java index eba38a6..776ea2e 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java @@ -27,7 +27,6 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.InternalGemFireError; import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.client.NoAvailableServersException; -import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.ServerConnectivityException; import org.apache.geode.cache.client.ServerOperationException; import org.apache.geode.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl; @@ -67,17 +66,14 @@ public class ExecuteRegionFunctionOp { /** * Does a execute Function on a server using connections from the given pool to communicate with * the server. - * - * @param pool the pool to use to communicate with the server. 
- * @param resultCollector is used to collect the results from the Server - * @param maxRetryAttempts Maximum number of retry attempts */ static void execute(ExecutablePool pool, ResultCollector resultCollector, - int maxRetryAttempts, boolean isHA, + int retryAttempts, boolean isHA, ExecuteRegionFunctionOpImpl op, boolean isReexecute, Set<String> failedNodes) { + int maxRetryAttempts = retryAttempts > 0 ? retryAttempts : -1; if (!isHA) { maxRetryAttempts = 0; } @@ -107,11 +103,8 @@ public class ExecuteRegionFunctionOp { throw failedException; } catch (ServerConnectivityException se) { - if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) { - // If the retryAttempt is set to default(-1). Try it on all servers once. - // Calculating number of servers when function is re-executed as it involves - // messaging locator. - maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1; + if (maxRetryAttempts == -1) { + maxRetryAttempts = ((PoolImpl) pool).calculateRetryAttempts(se); } if ((maxRetryAttempts--) < 1) { diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java index eb162829..8fa8510 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java @@ -66,7 +66,6 @@ public class ExecuteRegionFunctionSingleHopOp { ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector resultCollector, Map<ServerLocation, ? 
extends HashSet> serverToFilterMap, - int mRetryAttempts, boolean isHA, final java.util.function.Function<ServerRegionFunctionExecutor, AbstractOp> regionFunctionSingleHopOpFunction, final Supplier<AbstractOp> executeRegionFunctionOpSupplier) { @@ -87,7 +86,7 @@ public class ExecuteRegionFunctionSingleHopOp { final int retryAttempts = SingleHopClientExecutor.submitAllHA(callableTasks, (LocalRegion) region, isHA, - resultCollector, failedNodes, mRetryAttempts, ((PoolImpl) pool)); + resultCollector, failedNodes, ((PoolImpl) pool)); if (isDebugEnabled) { logger.debug( diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java index 3dbd99f..bb12253 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java @@ -41,6 +41,7 @@ import org.apache.geode.cache.NoSubscriptionServersAvailableException; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionService; import org.apache.geode.cache.client.Pool; +import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.ServerConnectivityException; import org.apache.geode.cache.client.SubscriptionNotEnabledException; import org.apache.geode.cache.client.internal.pooling.ConnectionManager; @@ -1581,4 +1582,27 @@ public class PoolImpl implements InternalPool { public int getSubscriptionTimeoutMultiplier() { return subscriptionTimeoutMultiplier; } + + public int calculateRetryAttempts(Throwable cause) { + + int maxRetryAttempts = getRetryAttempts(); + + if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) { + // If the retryAttempt is set to default(-1). Try executing on all servers once. + // As calculating number of servers involves sending message to locator, it is + // done only when there is an exception. 
+ if (cause instanceof ServerConnectivityException + && cause.getMessage().contains("Could not create a new connection")) { + // The client was unable to establish a connection before sending the + // request. + maxRetryAttempts = getConnectionSource().getAllServers().size(); + } else { + // The request was sent once. + maxRetryAttempts = getConnectionSource().getAllServers().size() - 1; + } + } + + return maxRetryAttempts; + } + } diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java index 9481cb4..cd15529 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java @@ -699,7 +699,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc hasResult, emptySet(), true, timeoutMs); ExecuteRegionFunctionSingleHopOp.execute(pool, region, serverRegionExecutor, - resultCollector, serverToBuckets, retryAttempts, function.isHA(), + resultCollector, serverToBuckets, function.isHA(), regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier); } } else { @@ -725,7 +725,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc hasResult, emptySet(), isBucketFilter, timeoutMs); ExecuteRegionFunctionSingleHopOp.execute(pool, region, - serverRegionExecutor, resultCollector, serverToFilterMap, retryAttempts, + serverRegionExecutor, resultCollector, serverToFilterMap, function.isHA(), regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier); } @@ -786,7 +786,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc emptySet(), true, isHA, optimizeForWrite, timeoutMs); ExecuteRegionFunctionSingleHopOp.execute(pool, region, - serverRegionExecutor, resultCollector, serverToBuckets, retryAttempts, isHA, + 
serverRegionExecutor, resultCollector, serverToBuckets, isHA, regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier); } @@ -810,7 +810,7 @@ public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAc emptySet(), isBucketsAsFilter, isHA, optimizeForWrite, timeoutMs); ExecuteRegionFunctionSingleHopOp.execute(pool, region, - serverRegionExecutor, resultCollector, serverToFilterMap, retryAttempts, + serverRegionExecutor, resultCollector, serverToFilterMap, isHA, regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier); } } diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java index 40a128c..c4c3eb6 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java @@ -30,7 +30,6 @@ import org.apache.geode.GemFireException; import org.apache.geode.InternalGemFireException; import org.apache.geode.annotations.internal.MakeNotStatic; import org.apache.geode.cache.CacheClosedException; -import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.ServerConnectivityException; import org.apache.geode.cache.client.ServerOperationException; import org.apache.geode.cache.client.internal.GetAllOp.GetAllOpImpl; @@ -89,11 +88,10 @@ public class SingleHopClientExecutor { static int submitAllHA(List callableTasks, LocalRegion region, boolean isHA, ResultCollector rc, Set<String> failedNodes, - final int retryAttemptsArg, final PoolImpl pool) { - ClientMetadataService cms = region.getCache().getClientMetadataService(); - int maxRetryAttempts = 0; + ClientMetadataService cms; + int maxRetryAttempts = -1; if (callableTasks != null && !callableTasks.isEmpty()) { List futures = null; @@ -120,15 +118,8 @@ public class 
SingleHopClientExecutor { throw new InternalGemFireException(e.getMessage()); } catch (ExecutionException ee) { - if (maxRetryAttempts == 0) { - maxRetryAttempts = retryAttemptsArg; - } - - if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) { - // If the retryAttempt is set to default(-1). Try it on all servers once. - // Calculating number of servers when function is re-executed as it involves - // messaging locator. - maxRetryAttempts = pool.getConnectionSource().getAllServers().size() - 1; + if (maxRetryAttempts == -1) { + maxRetryAttempts = pool.calculateRetryAttempts(ee.getCause()); } if (ee.getCause() instanceof InternalFunctionInvocationTargetException) { diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java index a2a343f..e37899d 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java @@ -70,6 +70,7 @@ import org.apache.geode.security.GemFireSecurityException; public class ConnectionManagerImpl implements ConnectionManager { private static final Logger logger = LogService.getLogger(); private static final int NOT_WAITING = -1; + public static final String BORROW_CONN_ERROR_MSG = "Could not create a new connection to server "; private final String poolName; private final PoolStats poolStats; @@ -321,8 +322,7 @@ public class ConnectionManagerImpl implements ConnectionManager { return connection; } - throw new ServerConnectivityException( - "Could not create a new connection to server " + server); + throw new ServerConnectivityException(BORROW_CONN_ERROR_MSG + server); } @Override diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java 
b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java index ae9ae13..bb11a04 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java @@ -228,9 +228,8 @@ public class ExecuteFunctionOpRetryTest { testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg, (pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(pool .execute(ArgumentMatchers.<AbstractOp>any(), ArgumentMatchers.anyInt())), - failureMode)); - - when(testSupport.getExecutablePool().getRetryAttempts()).thenReturn(retryAttempts); + failureMode), + retryAttempts); args = null; memberMappedArg = mock(MemberMappedArgument.class); diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java index c91816f..1d0cad5 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java @@ -15,6 +15,7 @@ package org.apache.geode.cache.client.internal; import static org.apache.geode.cache.client.internal.ExecuteFunctionTestSupport.HAStatus.HA; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -91,8 +92,6 @@ class ExecuteFunctionTestSupport { * This method has to be {@code static} because it is called before * {@link ExecuteFunctionTestSupport} is constructed. 
* - * @param whenPoolExecute is the {@link OngoingStubbing} for (one of the ) {@code execute()} - * methods on {@link PoolImpl} * @param failureMode is the {@link FailureMode} that determines the kind of exception * to {@code throw} */ @@ -149,7 +148,7 @@ class ExecuteFunctionTestSupport { ExecuteFunctionTestSupport( final HAStatus haStatus, final FailureMode failureMode, - final BiConsumer<PoolImpl, FailureMode> addPoolMockBehavior) { + final BiConsumer<PoolImpl, FailureMode> addPoolMockBehavior, Integer retryAttempts) { final List<ServerLocation> servers = (List<ServerLocation>) mock(List.class); when(servers.size()).thenReturn(ExecuteFunctionTestSupport.NUMBER_OF_SERVERS); @@ -174,6 +173,9 @@ class ExecuteFunctionTestSupport { executablePool = mock(PoolImpl.class); when(executablePool.getConnectionSource()).thenReturn(connectionSource); + when(executablePool.getRetryAttempts()).thenReturn(retryAttempts); + when(executablePool.calculateRetryAttempts(any(ServerConnectivityException.class))) + .thenCallRealMethod(); addPoolMockBehavior.accept(executablePool, failureMode); } diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java index 74f6748..e92bad1 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java @@ -309,7 +309,7 @@ public class ExecuteRegionFunctionOpRetryTest { default: throw new AssertionError("unknown FailureMode type: " + failureMode); } - }); + }, retryAttempts); executeFunctionMultiHopAndValidate(haStatus, functionIdentifierType, retryAttempts, testSupport.getExecutablePool(), @@ -325,7 +325,8 @@ public class ExecuteRegionFunctionOpRetryTest { testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg, (pool, 
failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(pool .execute(ArgumentMatchers.<AbstractOp>any(), ArgumentMatchers.anyInt())), - failureMode)); + failureMode), + retryAttempts); reExecuteFunctionMultiHopAndValidate(haStatus, functionIdentifierType, retryAttempts, testSupport.getExecutablePool(), diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java index aef00fb..5649b1b 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java @@ -137,7 +137,7 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest { } private void createMocks(final HAStatus haStatus, - final FailureMode failureModeArg) { + final FailureMode failureModeArg, Integer retryAttempts) { testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg, (pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when( @@ -146,7 +146,8 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest { ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean())), - failureMode)); + failureMode), + retryAttempts); serverToFilterMap = new HashMap<>(); serverToFilterMap.put(new ServerLocation("host1", 10), new HashSet<>()); @@ -158,7 +159,7 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest { final int retryAttempts, final int expectTries, final FailureMode failureMode) { - createMocks(haStatus, failureMode); + createMocks(haStatus, failureMode, retryAttempts); executeFunctionSingleHopAndValidate(haStatus, functionIdentifierType, retryAttempts, testSupport.getExecutablePool(), @@ -182,7 +183,6 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest { () -> ignoreServerConnectivityException(() -> 
ExecuteRegionFunctionSingleHopOp.execute( executablePool, testSupport.getRegion(), executor, resultCollector, serverToFilterMap, - retryAttempts, testSupport.toBoolean(haStatus), executor1 -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl( testSupport.getRegion().getFullPath(), FUNCTION_NAME, @@ -199,7 +199,6 @@ public class ExecuteRegionFunctionSingleHopOpRetryTest { ignoreServerConnectivityException( () -> ExecuteRegionFunctionSingleHopOp.execute(executablePool, testSupport.getRegion(), executor, resultCollector, serverToFilterMap, - retryAttempts, function.isHA(), executor1 -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl( testSupport.getRegion().getFullPath(), function, diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java index 2a8cf9f..020961d 100644 --- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java +++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java @@ -68,14 +68,16 @@ import org.apache.geode.test.version.VersionManager; public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4DistributedTestCase { - @Parameterized.Parameters(name = "from_v{0}, with reindex={1}") + @Parameterized.Parameters(name = "from_v{0}, with reindex={1}, singleHopEnabled={2}") public static Collection<Object[]> data() { Collection<String> luceneVersions = getLuceneVersions(); Collection<Object[]> rval = new ArrayList<>(); luceneVersions.forEach(v -> { - rval.add(new Object[] {v, true}); - rval.add(new Object[] {v, false}); + rval.add(new Object[] {v, true, true}); + rval.add(new Object[] {v, false, true}); }); + rval.add(new Object[] {VersionManager.CURRENT_VERSION, true, true}); + rval.add(new Object[] 
{VersionManager.CURRENT_VERSION, true, false}); return rval; } @@ -84,6 +86,10 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu // Lucene Compatibility checks start with Apache Geode v1.2.0 // Removing the versions older than v1.2.0 result.removeIf(s -> TestVersion.compare(s, "1.2.0") < 0); + + // The changes relating to GEODE-7258 is not applied on 1.10.0, skipping rolling + // upgrade for 1.10.0. The change was verified by rolling from develop to develop. + result.removeIf(s -> TestVersion.compare(s, "1.10.0") == 0); if (result.size() < 1) { throw new RuntimeException("No older versions of Geode were found to test against"); } else { @@ -108,6 +114,9 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu @Parameterized.Parameter(1) public Boolean reindex; + @Parameterized.Parameter(2) + public Boolean singleHopEnabled; + private void deleteVMFiles() { System.out.println("deleting files in vm" + VM.getCurrentVMNum()); File pwd = new File("."); @@ -145,20 +154,24 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu return props; } - VM rollClientToCurrentAndCreateRegion(VM oldClient, ClientRegionShortcut shortcut, - String regionName, String[] hostNames, int[] locatorPorts, boolean subscriptionEnabled) { - VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled); + VM rollClientToCurrentAndCreateRegion(VM oldClient, + ClientRegionShortcut shortcut, + String regionName, String[] hostNames, int[] locatorPorts, + boolean subscriptionEnabled, boolean singleHopEnabled) { + VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled, + singleHopEnabled); // recreate region on "rolled" client invokeRunnableInVMs(invokeCreateClientRegion(regionName, shortcut), rollClient); return rollClient; } - private VM rollClientToCurrent(VM oldClient, String[] hostNames, int[] locatorPorts, - boolean subscriptionEnabled) { + 
private VM rollClientToCurrent(VM oldClient, String[] hostNames, + int[] locatorPorts, + boolean subscriptionEnabled, boolean singleHopEnabled) { oldClient.invoke(invokeCloseCache()); VM rollClient = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldClient.getId()); rollClient.invoke(invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, - subscriptionEnabled)); + subscriptionEnabled, singleHopEnabled)); rollClient.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL)); return rollClient; } @@ -203,14 +216,17 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu cacheServer.start(); } - CacheSerializableRunnable invokeCreateClientCache(final Properties systemProperties, - final String[] hosts, final int[] ports, boolean subscriptionEnabled) { + CacheSerializableRunnable invokeCreateClientCache( + final Properties systemProperties, + final String[] hosts, final int[] ports, boolean subscriptionEnabled, + boolean singleHopEnabled) { return new CacheSerializableRunnable("execute: createClientCache") { @Override public void run2() { try { LuceneSearchWithRollingUpgradeDUnit.cache = - createClientCache(systemProperties, hosts, ports, subscriptionEnabled); + createClientCache(systemProperties, hosts, ports, subscriptionEnabled, + singleHopEnabled); } catch (Exception e) { fail("Error creating client cache", e); } @@ -225,13 +241,15 @@ public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4Distribu } - private static ClientCache createClientCache(Properties systemProperties, String[] hosts, - int[] ports, boolean subscriptionEnabled) { + private static ClientCache createClientCache(Properties systemProperties, + String[] hosts, + int[] ports, boolean subscriptionEnabled, boolean singleHopEnabled) { ClientCacheFactory cf = new ClientCacheFactory(systemProperties); if (subscriptionEnabled) { cf.setPoolSubscriptionEnabled(true); cf.setPoolSubscriptionRedundancy(-1); } + 
cf.setPoolPRSingleHopEnabled(singleHopEnabled); int hostsLength = hosts.length; for (int i = 0; i < hostsLength; i++) { cf.addPoolLocator(hosts[i], ports[i]); diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java index 95c0498..9ef5ca8 100644 --- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java +++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java @@ -70,7 +70,8 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server3); invokeRunnableInVMs( - invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false), + invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false, + singleHopEnabled), client); server2.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME)); server3.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME)); @@ -107,7 +108,7 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol 60, server3); client = rollClientToCurrentAndCreateRegion(client, ClientRegionShortcut.PROXY, regionName, - hostNames, locatorPorts, false); + hostNames, locatorPorts, false, singleHopEnabled); expectedRegionSize += 10; putSerializableObjectAndVerifyLuceneQueryResult(client, regionName, expectedRegionSize, 60, 70, server2, server3); diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java 
b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java index 3acba6c..1d7670c 100644 --- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java +++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java @@ -17,7 +17,6 @@ package org.apache.geode.cache.lucene; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.junit.Assert.assertTrue; -import org.junit.Ignore; import org.junit.Test; import org.apache.geode.cache.RegionShortcut; @@ -32,7 +31,6 @@ import org.apache.geode.test.dunit.VM; public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated extends LuceneSearchWithRollingUpgradeDUnit { - @Ignore("Disabled until GEODE-7258 is fixed") @Test public void test() throws Exception { @@ -85,7 +83,8 @@ public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRol invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server1); invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server2); invokeRunnableInVMs( - invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false), + invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false, + singleHopEnabled), client); // Create the index on the servers