This is an automated email from the ASF dual-hosted git repository. jensdeppe pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit a5aba9165d2fab083c7c2543f38beb61c82e6c8c Author: Jens Deppe <jde...@pivotal.io> AuthorDate: Tue Jun 18 08:30:15 2019 -0700 Revert "GEODE-6588: Properly type Function execution related interfaces. (#3691)" This reverts commit 6c62540edd5637d5e2bd3a51b11279a2ba825c33. --- .../ClientFunctionTimeoutRegressionTest.java | 10 +-- .../execute/PRFunctionExecutionDUnitTest.java | 75 +++++++++--------- .../FunctionExecutionOnLonerRegressionTest.java | 14 +--- .../org/apache/geode/cache/execute/Execution.java | 21 +++--- .../geode/cache/execute/FunctionService.java | 27 +++---- .../internal/deadlock/GemFireDeadlockDetector.java | 17 ++--- .../internal/cache/execute/AbstractExecution.java | 55 ++++++++------ .../cache/execute/DefaultResultCollector.java | 9 +-- .../execute/DistributedRegionFunctionExecutor.java | 30 +++++--- .../cache/execute/FunctionExecutionService.java | 25 +++--- .../internal/cache/execute/InternalExecution.java | 8 +- .../execute/InternalFunctionExecutionService.java | 29 +++---- .../InternalFunctionExecutionServiceImpl.java | 88 ++++++++-------------- .../cache/execute/LocalResultCollectorImpl.java | 31 ++++---- .../cache/execute/MemberFunctionExecutor.java | 39 +++++----- .../cache/execute/MultiRegionFunctionExecutor.java | 59 ++++++++++----- .../execute/PartitionedRegionFunctionExecutor.java | 41 +++++++--- .../cache/execute/ServerFunctionExecutor.java | 43 ++++------- .../execute/ServerRegionFunctionExecutor.java | 59 +++++++-------- .../internal/cache/snapshot/WindowedExporter.java | 15 ++-- .../mutators/MemberConfigManager.java | 8 +- .../realizers/RegionConfigRealizer.java | 46 ++++++----- 22 files changed, 360 insertions(+), 389 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/ClientFunctionTimeoutRegressionTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/ClientFunctionTimeoutRegressionTest.java index 2fd7a6e..93c88bb 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/ClientFunctionTimeoutRegressionTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/ClientFunctionTimeoutRegressionTest.java @@ -183,14 +183,14 @@ public class ClientFunctionTimeoutRegressionTest implements Serializable { Function<Integer> function = new CheckClientReadTimeout(); FunctionService.registerFunction(function); - Execution<Integer, Boolean, List<Boolean>> execution; + Execution<Integer, Boolean, List<Boolean>> execution = null; if (functionServiceTarget == ExecutionTarget.REGION) { - execution = FunctionService.onRegion(clientCache.getRegion(regionName)); + execution = + FunctionService.onRegion(clientCache.getRegion(regionName)).setArguments(timeout); } else { - execution = FunctionService.onServer(clientCache.getDefaultPool()); + execution = FunctionService.onServer(clientCache.getDefaultPool()).setArguments(timeout); } - execution = execution.setArguments(timeout); ResultCollector<Boolean, List<Boolean>> resultCollector = execution.execute(function); @@ -212,7 +212,7 @@ public class ClientFunctionTimeoutRegressionTest implements Serializable { */ private static class CheckClientReadTimeout implements Function<Integer> { - CheckClientReadTimeout() { + public CheckClientReadTimeout() { // nothing } diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRFunctionExecutionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRFunctionExecutionDUnitTest.java index 55c14ed..273df33 100755 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRFunctionExecutionDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRFunctionExecutionDUnitTest.java @@ -121,7 +121,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * Test to validate that the function execution is successful on PR with Loner Distributed System */ @Test - public void testFunctionExecution() { + public void testFunctionExecution() throws Exception { Properties config = getDistributedSystemProperties(); config.setProperty(MCAST_PORT, "0"); config.setProperty(LOCATORS, ""); @@ -141,7 +141,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { } @Test - public void testHAFunctionExecution() { + public void testHAFunctionExecution() throws Exception { Region<String, Integer> region = createPartitionedRegion(regionName, 10, 0); Function<Void> function = new TestFunction<>(false, TestFunction.TEST_FUNCTION10); @@ -160,7 +160,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * Test remote execution by a pure accessor which doesn't have the function factory present. */ @Test - public void testRemoteSingleKeyExecution_byName() { + public void testRemoteSingleKeyExecution_byName() throws Exception { VM accessor = getHost(0).getVM(2); VM datastore = getHost(0).getVM(3); @@ -208,7 +208,8 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * function will send Boolean as last result. factory present. */ @Test - public void testLocalSingleKeyExecution_byName_FunctionInvocationTargetException() { + public void testLocalSingleKeyExecution_byName_FunctionInvocationTargetException() + throws Exception { Region<String, Integer> region = createPartitionedRegion(regionName, 10, 0); region.put(STRING_KEY, 1); @@ -230,7 +231,8 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * last result. */ @Test - public void testRemoteSingleKeyExecution_byName_FunctionInvocationTargetException() { + public void testRemoteSingleKeyExecution_byName_FunctionInvocationTargetException() + throws Exception { VM accessor = getHost(0).getVM(2); VM datastore = getHost(0).getVM(3); @@ -264,7 +266,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * Test remote execution by a pure accessor which doesn't have the function factory present. */ @Test - public void testRemoteSingleKeyExecution_byInstance() { + public void testRemoteSingleKeyExecution_byInstance() throws Exception { VM accessor = getHost(0).getVM(2); VM datastore = getHost(0).getVM(3); @@ -312,7 +314,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * Test remote execution of inline function by a pure accessor */ @Test - public void testRemoteSingleKeyExecution_byInlineFunction() { + public void testRemoteSingleKeyExecution_byInlineFunction() throws Exception { VM accessor = getHost(0).getVM(2); VM datastore = getHost(0).getVM(3); @@ -341,7 +343,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * present. ResultCollector = DefaultResultCollector haveResults = true; */ @Test - public void testRemoteMultiKeyExecution_byName() { + public void testRemoteMultiKeyExecution_byName() throws Exception { VM accessor = getHost(0).getVM(3); VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); @@ -413,7 +415,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { } @Test - public void testRemoteMultiKeyExecution_BucketMoved() { + public void testRemoteMultiKeyExecution_BucketMoved() throws Exception { VM accessor = getHost(0).getVM(3); VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); @@ -461,7 +463,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { } @Test - public void testLocalMultiKeyExecution_BucketMoved() { + public void testLocalMultiKeyExecution_BucketMoved() throws Exception { IgnoredException.addIgnoredException("BucketMovedException"); VM datastore0 = getHost(0).getVM(0); @@ -599,7 +601,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { } }); - AsyncInvocation<List<Boolean>> async = accessor.invokeAsync(this::executeFunction); + AsyncInvocation<List<Boolean>> async = accessor.invokeAsync(() -> executeFunction()); datastore0.invoke(() -> { Thread.sleep(3_000); @@ -651,7 +653,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { } }); - AsyncInvocation<List<Boolean>> async = accessor.invokeAsync(this::executeFunction); + AsyncInvocation<List<Boolean>> async = accessor.invokeAsync(() -> executeFunction()); datastore0.invoke(() -> { Thread.sleep(3000); @@ -667,7 +669,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * DefaultResultCollector haveResults = true; */ @Test - public void testRemoteMultiKeyExecution_byInlineFunction() { + public void testRemoteMultiKeyExecution_byInlineFunction() throws Exception { VM accessor = getHost(0).getVM(3); VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); @@ -718,7 +720,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * present. ResultCollector = CustomResultCollector haveResults = true; */ @Test - public void testRemoteMultiKeyExecutionWithCollector_byName() { + public void testRemoteMultiKeyExecutionWithCollector_byName() throws Exception { VM accessor = getHost(0).getVM(3); VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); @@ -775,7 +777,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * present. ResultCollector = DefaultResultCollector haveResults = false; */ @Test - public void testRemoteMultiKeyExecutionNoResult_byName() { + public void testRemoteMultiKeyExecutionNoResult_byName() throws Exception { VM accessor = getHost(0).getVM(3); VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); @@ -820,7 +822,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { ResultCollector<Void, Void> resultCollector = dataSet.withFilter(keySet).setArguments(true).execute(function.getId()); - assertThatThrownBy(resultCollector::getResult).isInstanceOf(FunctionException.class) + assertThatThrownBy(() -> resultCollector.getResult()).isInstanceOf(FunctionException.class) .hasMessageStartingWith( String.format("Cannot %s result as the Function#hasResult() is false", "return any")); @@ -833,7 +835,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * milliseconds expected result to be 0.(as the execution gets the timeout) */ @Test - public void testRemoteMultiKeyExecution_timeout() { + public void testRemoteMultiKeyExecution_timeout() throws Exception { VM accessor = getHost(0).getVM(3); VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); @@ -889,7 +891,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * present. ResultCollector = CustomResultCollector haveResults = false; */ @Test - public void testRemoteMultiKeyExecutionWithCollectorNoResult_byName() { + public void testRemoteMultiKeyExecutionWithCollectorNoResult_byName() throws Exception { VM accessor = getHost(0).getVM(3); VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); @@ -936,7 +938,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { ResultCollector<Object, List<Object>> resultCollector = dataSet.withFilter(keySet).setArguments(true).execute(function.getId()); - assertThatThrownBy(resultCollector::getResult).isInstanceOf(FunctionException.class) + assertThatThrownBy(() -> resultCollector.getResult()).isInstanceOf(FunctionException.class) .hasMessageStartingWith( String.format("Cannot %s result as the Function#hasResult() is false", "return any")); @@ -948,7 +950,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * present. */ @Test - public void testRemoteMultiKeyExecution_byInstance() { + public void testRemoteMultiKeyExecution_byInstance() throws Exception { VM accessor = getHost(0).getVM(3); VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); @@ -1022,7 +1024,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * Test bucketFilter functionality */ @Test - public void testBucketFilter_1() { + public void testBucketFilter_1() throws Exception { VM accessor = getHost(0).getVM(3); VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); @@ -1104,7 +1106,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { } @Test - public void testBucketFilterOverride() { + public void testBucketFilterOverride() throws Exception { VM accessor = getHost(0).getVM(3); VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); @@ -1160,7 +1162,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * DefaultResultCollector haveResult = true */ @Test - public void testLocalMultiKeyExecution_byName() { + public void testLocalMultiKeyExecution_byName() throws Exception { PartitionedRegion pr = createPartitionedRegion(regionName, 10, 0); Set<String> keySet = new HashSet<>(); @@ -1208,7 +1210,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * Test ability to execute a multi-key function by a local data store */ @Test - public void testLocalMultiKeyExecution_byInstance() { + public void testLocalMultiKeyExecution_byInstance() throws Exception { PartitionedRegion pr = createPartitionedRegion(regionName, 10, 0); Set<String> keySet = new HashSet<>(); @@ -1257,7 +1259,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * works correctly such that there is not extra execution */ @Test - public void testMultiKeyExecutionOnASingleBucket_byName() { + public void testMultiKeyExecutionOnASingleBucket_byName() throws Exception { VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); VM datastore2 = getHost(0).getVM(2); @@ -1336,7 +1338,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * works correctly such that there is not extra execution */ @Test - public void testMultiKeyExecutionOnASingleBucket_byInstance() { + public void testMultiKeyExecutionOnASingleBucket_byInstance() throws Exception { VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); VM datastore2 = getHost(0).getVM(2); @@ -1414,7 +1416,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * Ensure that the execution is happening all the PR as a whole */ @Test - public void testExecutionOnAllNodes_byName() { + public void testExecutionOnAllNodes_byName() throws Exception { VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); VM datastore2 = getHost(0).getVM(2); @@ -1475,7 +1477,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * Ensure that the execution is happening all the PR as a whole */ @Test - public void testExecutionOnAllNodes_byInstance() { + public void testExecutionOnAllNodes_byInstance() throws Exception { VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); VM datastore2 = getHost(0).getVM(2); @@ -1535,7 +1537,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * Ensure that the execution of inline function is happening all the PR as a whole */ @Test - public void testExecutionOnAllNodes_byInlineFunction() { + public void testExecutionOnAllNodes_byInlineFunction() throws Exception { VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); VM datastore2 = getHost(0).getVM(2); @@ -1590,7 +1592,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * LocalDataSet */ @Test - public void testExecutionOnAllNodes_LocalReadPR() { + public void testExecutionOnAllNodes_LocalReadPR() throws Exception { VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); VM datastore2 = getHost(0).getVM(2); @@ -1655,7 +1657,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * LocalDataSet */ @Test - public void testExecutionOnMultiNodes_LocalReadPR() { + public void testExecutionOnMultiNodes_LocalReadPR() throws Exception { VM datastore0 = getHost(0).getVM(0); VM datastore1 = getHost(0).getVM(1); VM datastore2 = getHost(0).getVM(2); @@ -1758,7 +1760,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * Assert the {@link RegionFunctionContext} yields the proper objects. */ @Test - public void testLocalDataContext() { + public void testLocalDataContext() throws Exception { VM accessor = getHost(0).getVM(1); VM datastore1 = getHost(0).getVM(2); VM datastore2 = getHost(0).getVM(3); @@ -1841,7 +1843,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { * Assert the {@link RegionFunctionContext} yields the proper objects. */ @Test - public void testLocalDataContextWithColocation() { + public void testLocalDataContextWithColocation() throws Exception { VM accessor = getHost(0).getVM(1); VM datastore1 = getHost(0).getVM(2); VM datastore2 = getHost(0).getVM(3); @@ -1971,9 +1973,8 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { } }; - Execution<Boolean, Boolean, List<Boolean>> execution = FunctionService.onRegion(rootRegion); ResultCollector<Boolean, List<Boolean>> resultCollector = - execution.withFilter(createKeySet(key1)).execute(function); + FunctionService.onRegion(rootRegion).withFilter(createKeySet(key1)).execute(function); assertThat(resultCollector.getResult()).hasSize(1).containsExactly(true); } @@ -2125,7 +2126,9 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase { private Set<String> createKeySet(final String... keys) { Set<String> keySet = new HashSet<>(); - Collections.addAll(keySet, keys); + for (String key : keys) { + keySet.add(key); + } return keySet; } diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionOnLonerRegressionTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionOnLonerRegressionTest.java index bfd76cf..fc6efc9 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionOnLonerRegressionTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionOnLonerRegressionTest.java @@ -26,7 +26,6 @@ import java.util.Iterator; import java.util.Properties; import java.util.Set; -import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -65,7 +64,6 @@ public class FunctionExecutionOnLonerRegressionTest { private Region<String, String> region; private Set<String> keysForGet; private Set<String> expectedValues; - private Cache cache; @Before public void setUp() { @@ -78,7 +76,7 @@ public class FunctionExecutionOnLonerRegressionTest { DistributionManager dm = ds.getDistributionManager(); assertThat(dm).isInstanceOf(LonerDistributionManager.class); - cache = CacheFactory.create(ds); + Cache cache = CacheFactory.create(ds); RegionFactory<String, String> regionFactory = cache.createRegionFactory(PARTITION); region = regionFactory.create("region"); @@ -96,11 +94,6 @@ public class FunctionExecutionOnLonerRegressionTest { } } - @After - public void tearDown() throws Exception { - cache.close(); - } - private Properties getDistributedSystemProperties() { Properties config = new Properties(); config.setProperty(MCAST_PORT, "0"); @@ -111,10 +104,9 @@ public class FunctionExecutionOnLonerRegressionTest { } @Test - public void executeFunctionOnLonerShouldNotThrowClassCastException() { + public void executeFunctionOnLonerShouldNotThrowClassCastException() throws Exception { Execution<Void, Collection<String>, Collection<String>> execution = - FunctionService.onRegion(region); - execution = execution.withFilter(keysForGet); + FunctionService.onRegion(region).withFilter(keysForGet); ResultCollector<Collection<String>, Collection<String>> resultCollector = execution.execute(new TestFunction()); assertThat(resultCollector.getResult()) diff --git a/geode-core/src/main/java/org/apache/geode/cache/execute/Execution.java b/geode-core/src/main/java/org/apache/geode/cache/execute/Execution.java index 5f89c1d..cec0bbc 100755 --- a/geode-core/src/main/java/org/apache/geode/cache/execute/Execution.java +++ b/geode-core/src/main/java/org/apache/geode/cache/execute/Execution.java @@ -26,16 +26,16 @@ import org.apache.geode.cache.LowMemoryException; * This interface is implemented by GemFire. To obtain an instance of it use * {@link FunctionService}. * - * @param <ArgumentT> The type of the argument passed into the function, if any - * @param <ReturnT> The type of results sent by the function - * @param <AggregatorT> The type of the aggregated result returned by the ResultCollector + * @param <IN> The type of the argument passed into the function, if any + * @param <OUT> The type of results sent by the function + * @param <AGG> The type of the aggregated result returned by the ResultCollector * * @since GemFire 6.0 * * @see FunctionService * @see Function */ -public interface Execution<ArgumentT, ReturnT, AggregatorT> { +public interface Execution<IN, OUT, AGG> { /** * Specifies a data filter of routing objects for selecting the GemFire members to execute the @@ -51,7 +51,7 @@ public interface Execution<ArgumentT, ReturnT, AggregatorT> { * {@link FunctionService#onRegion(org.apache.geode.cache.Region)} * @since GemFire 6.0 */ - Execution<ArgumentT, ReturnT, AggregatorT> withFilter(Set<?> filter); + Execution<IN, OUT, AGG> withFilter(Set<?> filter); /** * Specifies the user data passed to the function when it is executed. The function can retrieve @@ -63,7 +63,7 @@ public interface Execution<ArgumentT, ReturnT, AggregatorT> { * @since Geode 1.2 * */ - Execution<ArgumentT, ReturnT, AggregatorT> setArguments(ArgumentT args); + Execution<IN, OUT, AGG> setArguments(IN args); /** * Specifies the user data passed to the function when it is executed. The function can retrieve @@ -76,7 +76,7 @@ public interface Execution<ArgumentT, ReturnT, AggregatorT> { * @deprecated use {@link #setArguments(Object)} instead * */ - Execution<ArgumentT, ReturnT, AggregatorT> withArgs(ArgumentT args); + Execution<IN, OUT, AGG> withArgs(IN args); /** * Specifies the {@link ResultCollector} that will receive the results after the function has been @@ -88,8 +88,7 @@ public interface Execution<ArgumentT, ReturnT, AggregatorT> { * @see ResultCollector * @since GemFire 6.0 */ - Execution<ArgumentT, ReturnT, AggregatorT> withCollector( - ResultCollector<ReturnT, AggregatorT> rc); + Execution<IN, OUT, AGG> withCollector(ResultCollector<OUT, AGG> rc); /** * Executes the function using its {@linkplain Function#getId() id} @@ -105,7 +104,7 @@ public interface Execution<ArgumentT, ReturnT, AggregatorT> { * * @since GemFire 6.0 */ - ResultCollector<ReturnT, AggregatorT> execute(String functionId) throws FunctionException; + ResultCollector<OUT, AGG> execute(String functionId) throws FunctionException; /** * Executes the function instance provided. @@ -122,5 +121,5 @@ public interface Execution<ArgumentT, ReturnT, AggregatorT> { * * @since GemFire 6.0 */ - ResultCollector<ReturnT, AggregatorT> execute(Function function) throws FunctionException; + ResultCollector<OUT, AGG> execute(Function function) throws FunctionException; } diff --git a/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java b/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java index 9c5262f..9757caa 100755 --- a/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java +++ b/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java @@ -72,8 +72,7 @@ public class FunctionService { * @throws FunctionException if the region passed in is null * @since GemFire 6.0 */ - public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onRegion( - Region region) { + public static Execution onRegion(Region region) { return getFunctionExecutionService().onRegion(region); } @@ -88,8 +87,7 @@ public class FunctionService { * @throws FunctionException if Pool instance passed in is null * @since GemFire 6.0 */ - public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer( - Pool pool) { + public static Execution onServer(Pool pool) { return getFunctionExecutionService().onServer(pool); } @@ -102,8 +100,7 @@ public class FunctionService { * @throws FunctionException if Pool instance passed in is null * @since GemFire 6.0 */ - public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers( - Pool pool) { + public static Execution onServers(Pool pool) { return getFunctionExecutionService().onServers(pool); } @@ -120,8 +117,7 @@ public class FunctionService { * pool * @since GemFire 6.5 */ - public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer( - RegionService regionService) { + public static Execution onServer(RegionService regionService) { return getFunctionExecutionService().onServer(regionService); } @@ -136,8 +132,7 @@ public class FunctionService { * pool * @since GemFire 6.5 */ - public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers( - RegionService regionService) { + public static Execution onServers(RegionService regionService) { return getFunctionExecutionService().onServers(regionService); } @@ -151,8 +146,7 @@ public class FunctionService { * @throws FunctionException if distributedMember is null * @since GemFire 7.0 */ - public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember( - DistributedMember distributedMember) { + public static Execution onMember(DistributedMember distributedMember) { return getFunctionExecutionService().onMember(distributedMember); } @@ -171,8 +165,7 @@ public class FunctionService { * @throws FunctionException if no members are found belonging to the provided groups * @since GemFire 7.0 */ - public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers( - String... groups) { + public static Execution onMembers(String... groups) { return getFunctionExecutionService().onMembers(groups); } @@ -185,8 +178,7 @@ public class FunctionService { * @throws FunctionException if distributedMembers is null * @since GemFire 7.0 */ - public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers( - Set<DistributedMember> distributedMembers) { + public static Execution onMembers(Set<DistributedMember> distributedMembers) { return getFunctionExecutionService().onMembers(distributedMembers); } @@ -201,8 +193,7 @@ public class FunctionService { * @throws FunctionException if no members are found belonging to the provided groups * @since GemFire 7.0 */ - public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember( - String... groups) { + public static Execution onMember(String... groups) { return getFunctionExecutionService().onMember(groups); } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/deadlock/GemFireDeadlockDetector.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/deadlock/GemFireDeadlockDetector.java index 007c028..1fa4d63 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/deadlock/GemFireDeadlockDetector.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/deadlock/GemFireDeadlockDetector.java @@ -67,7 +67,7 @@ public class GemFireDeadlockDetector { @Override public synchronized Serializable getResult(long timeout, TimeUnit unit) - throws FunctionException { + throws FunctionException, InterruptedException { return null; } @@ -91,18 +91,15 @@ public class GemFireDeadlockDetector { }; - Execution<DistributedMember, HashSet<Dependency>, Serializable> onMembersExecution; - Execution<DistributedMember, HashSet<Dependency>, Serializable> withCollectorExecution; + Execution execution; if (targetMembers != null) { - onMembersExecution = FunctionService.onMembers(targetMembers); - withCollectorExecution = onMembersExecution.withCollector(collector); + execution = FunctionService.onMembers(targetMembers).withCollector(collector); } else { - onMembersExecution = FunctionService.onMembers(); - withCollectorExecution = onMembersExecution.withCollector(collector); + execution = FunctionService.onMembers().withCollector(collector); } - ((AbstractExecution) withCollectorExecution).setIgnoreDepartedMembers(true); - collector = withCollectorExecution.execute(new CollectDependencyFunction()); + ((AbstractExecution) execution).setIgnoreDepartedMembers(true); + collector = execution.execute(new CollectDependencyFunction()); // Wait for results collector.getResult(); @@ -130,7 +127,7 @@ public class GemFireDeadlockDetector { InternalDistributedMember member = instance.getDistributedMember(); Set<Dependency> dependencies = DeadlockDetector.collectAllDependencies(member); - context.getResultSender().lastResult(dependencies); + context.getResultSender().lastResult((Serializable) dependencies); } @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java index d015be6..c2378eb 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java @@ -49,8 +49,7 @@ import org.apache.geode.internal.logging.LogService; * @since GemFire 5.8LA * */ -public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT> - implements InternalExecution<ArgumentT, ReturnT, AggregatorT> { +public abstract class AbstractExecution implements InternalExecution { private static final Logger logger = LogService.getLogger(); @@ -70,7 +69,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT> protected volatile boolean isClientServerMode = false; - protected Set<String> failedNodes = new HashSet<>(); + protected Set<String> failedNodes = new HashSet<String>(); protected boolean isFnSerializationReqd; @@ -99,7 +98,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT> @MakeNotStatic private static final ConcurrentHashMap<String, byte[]> idToFunctionAttributes = - new ConcurrentHashMap<>(); + new ConcurrentHashMap<String, byte[]>(); public static final byte NO_HA_NO_HASRESULT_NO_OPTIMIZEFORWRITE = 0; @@ -206,7 +205,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT> return this.filter; } - public AbstractExecution<ArgumentT, ReturnT, AggregatorT> setIsReExecute() { + public AbstractExecution setIsReExecute() { this.isReExecute = true; if (this.executionNodesListener != null) { this.executionNodesListener.reset(); @@ -259,7 +258,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT> public void executeFunctionOnLocalPRNode(final Function fn, final FunctionContext cx, final PartitionedRegionFunctionResultSender sender, DistributionManager dm, boolean isTx) { if (dm instanceof ClusterDistributionManager && !isTx) { - if (ServerConnection.isExecuteFunctionOnLocalNodeOnly() == 1) { + if (ServerConnection.isExecuteFunctionOnLocalNodeOnly().byteValue() == 1) { ServerConnection.executeFunctionOnLocalNodeOnly((byte) 3);// executed locally executeFunctionLocally(fn, cx, sender, dm); if (!sender.isLastResultReceived() && fn.hasResult()) { @@ -270,12 +269,15 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT> } else { final ClusterDistributionManager newDM = (ClusterDistributionManager) dm; - newDM.getFunctionExecutor().execute(() -> { - executeFunctionLocally(fn, cx, sender, newDM); - if (!sender.isLastResultReceived() && fn.hasResult()) { - ((InternalResultSender) sender).setException(new FunctionException( - String.format("The function, %s, did not send last result", - fn.getId()))); + newDM.getFunctionExecutor().execute(new Runnable() { + @Override + public void run() { + executeFunctionLocally(fn, cx, sender, newDM); + if (!sender.isLastResultReceived() && fn.hasResult()) { + ((InternalResultSender) sender).setException(new FunctionException( + String.format("The function, %s, did not send last result", + fn.getId()))); + } } }); } @@ -296,12 +298,15 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT> final ResultSender sender, DistributionManager dm, final boolean isTx) { if (dm instanceof ClusterDistributionManager && !isTx) { final ClusterDistributionManager newDM = (ClusterDistributionManager) dm; - newDM.getFunctionExecutor().execute(() -> { - executeFunctionLocally(fn, cx, sender, newDM); - if (!((InternalResultSender) sender).isLastResultReceived() && fn.hasResult()) { - ((InternalResultSender) sender).setException(new FunctionException( - String.format("The function, %s, did not send last result", - fn.getId()))); + newDM.getFunctionExecutor().execute(new Runnable() { + @Override + public void run() { + executeFunctionLocally(fn, cx, sender, newDM); + if (!((InternalResultSender) sender).isLastResultReceived() && fn.hasResult()) { + ((InternalResultSender) sender).setException(new FunctionException( + String.format("The function, %s, did not send last result", + fn.getId()))); + } } }); } else { @@ -330,7 +335,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT> fn.execute(cx); stats.endFunctionExecution(start, fn.hasResult()); } catch (FunctionInvocationTargetException fite) { - FunctionException functionException; + FunctionException functionException = null; if (fn.isHA()) { functionException = new FunctionException(new InternalFunctionInvocationTargetException(fite.getMessage())); @@ -339,7 +344,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT> } handleException(functionException, fn, cx, sender, dm); } catch (BucketMovedException bme) { - FunctionException functionException; + FunctionException functionException = null; if (fn.isHA()) { functionException = new FunctionException(new InternalFunctionInvocationTargetException(bme)); @@ -357,7 +362,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT> } @Override - public ResultCollector<ReturnT, AggregatorT> execute(final String functionName) { + public ResultCollector execute(final String functionName) { if (functionName == null) { throw new FunctionException( "The input function for the execute function request is null"); @@ -374,7 +379,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT> } @Override - public ResultCollector<ReturnT, AggregatorT> execute(Function function) throws FunctionException { + public ResultCollector execute(Function function) throws FunctionException { if (function == null) { throw new FunctionException( "The input function for the execute function request is null"); @@ -424,7 +429,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT> return this.ignoreDepartedMembers; } - protected abstract ResultCollector<ReturnT, AggregatorT> executeFunction(Function fn); + protected abstract ResultCollector executeFunction(Function fn); /** * validates whether a function should execute in presence of transaction and HeapCritical @@ -437,8 +442,8 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT> */ public abstract void validateExecution(Function function, Set targetMembers); - public LocalResultCollector<ReturnT, AggregatorT> getLocalResultCollector(Function function, - final ResultCollector<ReturnT, AggregatorT> rc) { + public LocalResultCollector<?, ?> getLocalResultCollector(Function function, + final ResultCollector<?, ?> rc) { if (rc instanceof LocalResultCollector) { return (LocalResultCollector) rc; } else { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DefaultResultCollector.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DefaultResultCollector.java index ea5b8b3..2d8e167 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DefaultResultCollector.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DefaultResultCollector.java @@ -15,7 +15,6 @@ package org.apache.geode.internal.cache.execute; import java.util.ArrayList; -import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.geode.cache.execute.Function; @@ -33,9 +32,9 @@ import org.apache.geode.distributed.DistributedMember; * @since GemFire 6.0 * */ -public class DefaultResultCollector implements ResultCollector<Object, List<Object>> { +public class DefaultResultCollector implements ResultCollector { - private List<Object> resultList = new ArrayList<>(); + private ArrayList<Object> resultList = new ArrayList<Object>(); public DefaultResultCollector() {} @@ -58,7 +57,7 @@ public class DefaultResultCollector implements ResultCollector<Object, List<Obje * @throws FunctionException if something goes wrong while retrieving the result */ @Override - public List<Object> getResult() throws FunctionException { + public Object getResult() throws FunctionException { return this.resultList; // this is full result } @@ -82,7 +81,7 @@ public class DefaultResultCollector implements ResultCollector<Object, List<Obje * @throws FunctionException if something goes wrong while retrieving the result */ @Override - public List<Object> getResult(long timeout, TimeUnit unit) throws FunctionException { + public Object getResult(long timeout, TimeUnit unit) throws FunctionException { return this.resultList; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java index a745d76..18e6f84 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java @@ -45,8 +45,7 @@ import org.apache.geode.internal.cache.control.MemoryThresholds; * @since GemFire 5.8 LA * */ -public class DistributedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> - extends AbstractExecution<ArgumentT, ReturnT, AggregatorT> { +public class DistributedRegionFunctionExecutor extends AbstractExecution { private final LocalRegion region; @@ -61,6 +60,16 @@ public class DistributedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> this.region = (LocalRegion) r; } + private DistributedRegionFunctionExecutor(DistributedRegionFunctionExecutor drfe) { + super(drfe); + this.region = drfe.region; + if (drfe.filter != null) { + this.filter.clear(); + this.filter.addAll(drfe.filter); + } + this.sender = drfe.sender; + } + public DistributedRegionFunctionExecutor(DistributedRegion region, Set filter2, Object args, MemberMappedArgument memberMappedArg, ServerToClientFunctionResultSender resultSender) { if (args != null) { @@ -212,8 +221,7 @@ public class DistributedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } @Override - public InternalExecution<ArgumentT, ReturnT, AggregatorT> withBucketFilter( - Set<Integer> bucketIDs) { + public InternalExecution withBucketFilter(Set<Integer> bucketIDs) { if (bucketIDs != null && !bucketIDs.isEmpty()) { throw new IllegalArgumentException( String.format("Buckets as filter cannot be applied to a non partitioned region: %s", @@ -274,12 +282,14 @@ public class DistributedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> @Override public String toString() { - return "[ DistributedRegionFunctionExecutor:" - + "args=" - + this.args - + ";region=" - + this.region.getName() - + "]"; + final StringBuffer buf = new StringBuffer(); + buf.append("[ DistributedRegionFunctionExecutor:"); + buf.append("args="); + buf.append(this.args); + buf.append(";region="); + buf.append(this.region.getName()); + buf.append("]"); + return buf.toString(); } /* diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionExecutionService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionExecutionService.java index bcc481f..6916728 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionExecutionService.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionExecutionService.java @@ -50,8 +50,7 @@ public interface FunctionExecutionService { * @throws FunctionException if the region passed in is null * @since GemFire 6.0 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onRegion( - Region region); + Execution onRegion(Region region); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -64,7 +63,7 @@ public interface FunctionExecutionService { * @throws FunctionException if Pool instance passed in is null * @since GemFire 6.0 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer(Pool pool); + Execution onServer(Pool pool); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -75,7 +74,7 @@ public interface FunctionExecutionService { * @throws FunctionException if Pool instance passed in is null * @since GemFire 6.0 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers(Pool pool); + Execution onServers(Pool pool); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -90,8 +89,7 @@ public interface FunctionExecutionService { * pool * @since GemFire 6.5 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer( - RegionService regionService); + Execution onServer(RegionService regionService); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -104,8 +102,7 @@ public interface FunctionExecutionService { * pool * @since GemFire 6.5 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers( - RegionService regionService); + Execution onServers(RegionService regionService); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -117,8 +114,7 @@ public interface FunctionExecutionService { * @throws FunctionException if distributedMember is null * @since GemFire 7.0 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember( - DistributedMember distributedMember); + Execution onMember(DistributedMember distributedMember); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -135,8 +131,7 @@ public interface FunctionExecutionService { * @throws FunctionException if no members are found belonging to the provided groups * @since GemFire 7.0 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers( - String... groups); + Execution onMembers(String... groups); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -147,8 +142,7 @@ public interface FunctionExecutionService { * @throws FunctionException if distributedMembers is null * @since GemFire 7.0 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers( - Set<DistributedMember> distributedMembers); + Execution onMembers(Set<DistributedMember> distributedMembers); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -161,8 +155,7 @@ public interface FunctionExecutionService { * @throws FunctionException if no members are found belonging to the provided groups * @since GemFire 7.0 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember( - String... groups); + Execution onMember(String... groups); /** * Returns the {@link Function} defined by the functionId, returns null if no function is found diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalExecution.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalExecution.java index 635cd19..ecb088f 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalExecution.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalExecution.java @@ -28,11 +28,9 @@ import org.apache.geode.cache.execute.ResultCollector; * @since GemFire 5.8LA * */ -public interface InternalExecution<ArgumentT, ReturnT, AggregatorT> - extends Execution<ArgumentT, ReturnT, AggregatorT> { +public interface InternalExecution extends Execution { - InternalExecution<ArgumentT, ReturnT, AggregatorT> withMemberMappedArgument( - MemberMappedArgument argument); + InternalExecution withMemberMappedArgument(MemberMappedArgument argument); /** * Specifies a filter of bucketIDs for selecting the GemFire members to execute the function on. @@ -46,7 +44,7 @@ public interface InternalExecution<ArgumentT, ReturnT, AggregatorT> * {@link FunctionService#onRegion(org.apache.geode.cache.Region)} * @since Geode 1.0 */ - InternalExecution<ArgumentT, ReturnT, AggregatorT> withBucketFilter(Set<Integer> bucketIDs); + InternalExecution withBucketFilter(Set<Integer> bucketIDs); /** * If true, function execution waits for all exceptions from target nodes <br> diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionService.java index 24fc6f8..6fb529a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionService.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionService.java @@ -52,8 +52,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi * * @see MultiRegionFunctionContext */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onRegions( - Set<Region> regions); + Execution onRegions(Set<Region> regions); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -66,8 +65,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi * pool * @since GemFire 6.5 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers( - RegionService regionService, String... groups); + Execution onServers(RegionService regionService, String... groups); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -82,8 +80,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi * pool * @since GemFire 6.5 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer( - RegionService regionService, String... groups); + Execution onServer(RegionService regionService, String... groups); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -94,8 +91,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi * @throws FunctionException if Pool instance passed in is null * @since GemFire 6.0 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers(Pool pool, - String... groups); + Execution onServers(Pool pool, String... groups); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -108,8 +104,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi * @throws FunctionException if Pool instance passed in is null * @since GemFire 6.0 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer(Pool pool, - String... groups); + Execution onServer(Pool pool, String... groups); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -123,9 +118,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi * @since GemFire 6.0 * */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember( - DistributedSystem system, - DistributedMember distributedMember); + Execution onMember(DistributedSystem system, DistributedMember distributedMember); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -137,8 +130,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi * @throws FunctionException if DistributedSystem instance passed is null * @since GemFire 6.0 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers( - DistributedSystem system, String... groups); + Execution onMembers(DistributedSystem system, String... groups); /** * Uses {@code RANDOM_onMember} for tests. @@ -146,8 +138,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi * <p> * TODO: maybe merge with {@link #onMembers(DistributedSystem, String...)} */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember( - DistributedSystem system, String... groups); + Execution onMember(DistributedSystem system, String... groups); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -159,7 +150,5 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi * @throws FunctionException if DistributedSystem instance passed is null * @since GemFire 6.0 */ - <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers( - DistributedSystem system, - Set<DistributedMember> distributedMembers); + Execution onMembers(DistributedSystem system, Set<DistributedMember> distributedMembers); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionServiceImpl.java index 954e344..3416c93 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionServiceImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionServiceImpl.java @@ -67,50 +67,42 @@ public class InternalFunctionExecutionServiceImpl // FunctionExecutionService API ---------------------------------------------------------------- @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer( - Pool pool) { + public Execution onServer(Pool pool) { return onServer(pool, EMPTY_GROUPS); } @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers( - Pool pool) { + public Execution onServers(Pool pool) { return onServers(pool, EMPTY_GROUPS); } @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer( - RegionService regionService) { + public Execution onServer(RegionService regionService) { return onServer(regionService, EMPTY_GROUPS); } @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers( - RegionService regionService) { + public Execution onServers(RegionService regionService) { return onServers(regionService, EMPTY_GROUPS); } @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember( - DistributedMember distributedMember) { + public Execution onMember(DistributedMember distributedMember) { return onMember(getDistributedSystem(), distributedMember); } @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers( - String... groups) { + public Execution onMembers(String... groups) { return onMembers(getDistributedSystem(), groups); } @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers( - Set<DistributedMember> distributedMembers) { + public Execution onMembers(Set<DistributedMember> distributedMembers) { return onMembers(getDistributedSystem(), distributedMembers); } @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember( - String... groups) { + public Execution onMember(String... groups) { return onMember(getDistributedSystem(), groups); } @@ -119,8 +111,7 @@ public class InternalFunctionExecutionServiceImpl } @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onRegion( - Region region) { + public Execution onRegion(Region region) { if (region == null) { throw new FunctionException("Region instance passed is null"); } @@ -146,12 +137,12 @@ public class InternalFunctionExecutionServiceImpl } if (isClientRegion(region)) { - return new ServerRegionFunctionExecutor<>(region, proxyCache); + return new ServerRegionFunctionExecutor(region, proxyCache); } if (PartitionRegionHelper.isPartitionedRegion(region)) { - return new PartitionedRegionFunctionExecutor<>(region); + return new PartitionedRegionFunctionExecutor(region); } - return new DistributedRegionFunctionExecutor<>(region); + return new DistributedRegionFunctionExecutor(region); } @Override @@ -214,8 +205,7 @@ public class InternalFunctionExecutionServiceImpl // InternalFunctionExecutionService OnServerGroups API ----------------------------------------- @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer( - Pool pool, String... groups) { + public Execution onServer(Pool pool, String... groups) { if (pool == null) { throw new FunctionException( String.format("%s passed is null", "Pool instance ")); @@ -225,12 +215,11 @@ public class InternalFunctionExecutionServiceImpl throw new UnsupportedOperationException(); } - return new ServerFunctionExecutor<>(pool, false, groups); + return new ServerFunctionExecutor(pool, false, groups); } @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers( - Pool pool, String... groups) { + public Execution onServers(Pool pool, String... groups) { if (pool == null) { throw new FunctionException( String.format("%s passed is null", "Pool instance ")); @@ -240,13 +229,11 @@ public class InternalFunctionExecutionServiceImpl throw new UnsupportedOperationException(); } - return new ServerFunctionExecutor<>(pool, true, groups); + return new ServerFunctionExecutor(pool, true, groups); } @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer( - RegionService regionService, - String... groups) { + public Execution onServer(RegionService regionService, String... groups) { if (regionService == null) { throw new FunctionException(String.format("%s passed is null", "RegionService instance ")); @@ -262,16 +249,13 @@ public class InternalFunctionExecutionServiceImpl } } else { ProxyCache proxyCache = (ProxyCache) regionService; - return new ServerFunctionExecutor<>(proxyCache.getUserAttributes().getPool(), false, - proxyCache, + return new ServerFunctionExecutor(proxyCache.getUserAttributes().getPool(), false, proxyCache, groups); } } @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers( - RegionService regionService, - String... groups) { + public Execution onServers(RegionService regionService, String... groups) { if (regionService == null) { throw new FunctionException(String.format("%s passed is null", "RegionService instance ")); @@ -287,8 +271,7 @@ public class InternalFunctionExecutionServiceImpl } } else { ProxyCache proxyCache = (ProxyCache) regionService; - return new ServerFunctionExecutor<>(proxyCache.getUserAttributes().getPool(), true, - proxyCache, + return new ServerFunctionExecutor(proxyCache.getUserAttributes().getPool(), true, proxyCache, groups); } } @@ -296,9 +279,7 @@ public class InternalFunctionExecutionServiceImpl // InternalFunctionExecutionService InDistributedSystem API ------------------------------------ @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember( - DistributedSystem system, - DistributedMember distributedMember) { + public Execution onMember(DistributedSystem system, DistributedMember distributedMember) { if (system == null) { throw new FunctionException(String.format("%s passed is null", "DistributedSystem instance ")); @@ -307,19 +288,17 @@ public class InternalFunctionExecutionServiceImpl throw new FunctionException(String.format("%s passed is null", "DistributedMember instance ")); } - return new MemberFunctionExecutor<>(system, distributedMember); + return new MemberFunctionExecutor(system, distributedMember); } @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers( - DistributedSystem system, - String... groups) { + public Execution onMembers(DistributedSystem system, String... groups) { if (system == null) { throw new FunctionException(String.format("%s passed is null", "DistributedSystem instance ")); } if (groups.length == 0) { - return new MemberFunctionExecutor<>(system); + return new MemberFunctionExecutor(system); } Set<DistributedMember> members = new HashSet<>(); for (String group : groups) { @@ -329,13 +308,11 @@ public class InternalFunctionExecutionServiceImpl throw new FunctionException(String.format("No members found in group(s) %s", Arrays.toString(groups))); } - return new MemberFunctionExecutor<>(system, members); + return new MemberFunctionExecutor(system, members); } @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember( - DistributedSystem system, - String... groups) { + public Execution onMember(DistributedSystem system, String... groups) { if (system == null) { throw new FunctionException(String.format("%s passed is null", "DistributedSystem instance ")); @@ -356,13 +333,11 @@ public class InternalFunctionExecutionServiceImpl throw new FunctionException(String.format("No members found in group(s) %s", Arrays.toString(groups))); } - return new MemberFunctionExecutor<>(system, members); + return new MemberFunctionExecutor(system, members); } @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers( - DistributedSystem system, - Set<DistributedMember> distributedMembers) { + public Execution onMembers(DistributedSystem system, Set<DistributedMember> distributedMembers) { if (system == null) { throw new FunctionException(String.format("%s passed is null", "DistributedSystem instance ")); @@ -371,14 +346,13 @@ public class InternalFunctionExecutionServiceImpl throw new FunctionException(String.format("%s passed is null", "distributedMembers set ")); } - return new MemberFunctionExecutor<>(system, distributedMembers); + return new MemberFunctionExecutor(system, distributedMembers); } // InternalFunctionExecutionService OnRegions API ---------------------------------------------- @Override - public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onRegions( - Set<Region> regions) { + public Execution onRegions(Set<Region> regions) { if (regions == null) { throw new IllegalArgumentException( String.format("The input %s for the execute function request is null", @@ -398,7 +372,7 @@ public class InternalFunctionExecutionServiceImpl "FunctionService#onRegions() is not supported for cache clients in client server mode"); } } - return new MultiRegionFunctionExecutor<>(regions); + return new MultiRegionFunctionExecutor(regions); } // InternalFunctionExecutionService unregisterAllFunctions API --------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/LocalResultCollectorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/LocalResultCollectorImpl.java index 2c804dc..4bb2c6b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/LocalResultCollectorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/LocalResultCollectorImpl.java @@ -25,9 +25,9 @@ import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.ReplyProcessor21; -public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector<OUT, AGG> { +public class LocalResultCollectorImpl implements LocalResultCollector { - private final ResultCollector<OUT, AGG> userRC; + private final ResultCollector userRC; private CountDownLatch latch = new CountDownLatch(1); @@ -39,19 +39,18 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector< private FunctionException functionException = null; - private Function function; + private Function function = null; - private AbstractExecution<?, OUT, AGG> execution; + private AbstractExecution execution = null; - public LocalResultCollectorImpl(Function function, ResultCollector<OUT, AGG> rc, - Execution execution) { + public LocalResultCollectorImpl(Function function, ResultCollector rc, Execution execution) { this.function = function; this.userRC = rc; this.execution = (AbstractExecution) execution; } @Override - public synchronized void addResult(DistributedMember memberID, OUT resultOfSingleExecution) { + public synchronized void addResult(DistributedMember memberID, Object resultOfSingleExecution) { if (resultsCleared) { return; } @@ -62,7 +61,7 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector< if (t.getCause() != null) { t = t.getCause(); } - this.userRC.addResult(memberID, (OUT) t); + this.userRC.addResult(memberID, t); } else { if (!(t instanceof InternalFunctionException)) { if (this.functionException == null) { @@ -79,7 +78,7 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector< } this.functionException.addException(t); } else { - this.userRC.addResult(memberID, (OUT) t.getCause()); + this.userRC.addResult(memberID, t.getCause()); } } } else { @@ -105,7 +104,7 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector< } @Override - public AGG getResult() throws FunctionException { + public Object getResult() throws FunctionException { if (this.resultCollected) { throw new FunctionException( "Function results already collected"); @@ -124,7 +123,7 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector< .getCause() instanceof InternalFunctionInvocationTargetException) { clearResults(); this.execution = this.execution.setIsReExecute(); - ResultCollector<OUT, AGG> newRc; + ResultCollector newRc = null; if (execution.isFnSerializationReqd()) { newRc = this.execution.execute(this.function); } else { @@ -135,12 +134,13 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector< } throw this.functionException; } else { - return this.userRC.getResult(); + Object result = this.userRC.getResult(); + return result; } } @Override - public AGG getResult(long timeout, TimeUnit unit) + public Object getResult(long timeout, TimeUnit unit) throws FunctionException, InterruptedException { boolean resultReceived = false; @@ -166,7 +166,7 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector< .getCause() instanceof InternalFunctionInvocationTargetException) { clearResults(); this.execution = this.execution.setIsReExecute(); - ResultCollector<OUT, AGG> newRc; + ResultCollector newRc = null; if (execution.isFnSerializationReqd()) { newRc = this.execution.execute(this.function); } else { @@ -177,7 +177,8 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector< } throw this.functionException; } else { - return this.userRC.getResult(timeout, unit); + Object result = this.userRC.getResult(timeout, unit); + return result; } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java index 15f4a04..a18c560 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java @@ -36,9 +36,7 @@ import org.apache.geode.internal.Assert; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalCache; -public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT> - extends AbstractExecution<ArgumentT, ReturnT, AggregatorT> - implements Execution<ArgumentT, ReturnT, AggregatorT> { +public class MemberFunctionExecutor extends AbstractExecution { protected InternalDistributedSystem ds; @@ -97,7 +95,7 @@ public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } @SuppressWarnings("unchecked") - private ResultCollector<ReturnT, AggregatorT> executeFunction(final Function function, + private ResultCollector executeFunction(final Function function, ResultCollector resultCollector) { final DistributionManager dm = this.ds.getDistributionManager(); final Set dest = new HashSet(this.members); @@ -111,8 +109,7 @@ public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT> final InternalDistributedMember localVM = this.ds.getDistributionManager().getDistributionManagerId(); - final LocalResultCollector<ReturnT, AggregatorT> localRC = - getLocalResultCollector(function, resultCollector); + final LocalResultCollector<?, ?> localRC = getLocalResultCollector(function, resultCollector); boolean remoteOnly = false; boolean localOnly = false; if (!dest.contains(localVM)) { @@ -132,7 +129,7 @@ public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT> boolean isTx = false; InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null) { - isTx = cache.getTxManager().getTXState() != null; + isTx = cache.getTxManager().getTXState() == null ? false : true; } final FunctionContext context = new FunctionContextImpl(cache, function.getId(), getArgumentsForMember(localVM.getId()), resultSender); @@ -140,7 +137,8 @@ public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } if (!dest.isEmpty()) { - HashMap<InternalDistributedMember, Object> memberArgs = new HashMap<>(); + HashMap<InternalDistributedMember, Object> memberArgs = + new HashMap<InternalDistributedMember, Object>(); Iterator<DistributedMember> iter = dest.iterator(); while (iter.hasNext()) { InternalDistributedMember recip = (InternalDistributedMember) iter.next(); @@ -150,7 +148,8 @@ public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT> MemberFunctionResultWaiter resultReceiver = new MemberFunctionResultWaiter(this.ds, localRC, function, memberArgs, dest, resultSender); - return resultReceiver.getFunctionResultFrom(dest, function, this); + ResultCollector reply = resultReceiver.getFunctionResultFrom(dest, function, this); + return reply; } return localRC; } @@ -185,7 +184,7 @@ public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } @Override - protected ResultCollector<ReturnT, AggregatorT> executeFunction(Function function) { + protected ResultCollector executeFunction(Function function) { if (function.hasResult()) { ResultCollector rc = this.rc; if (rc == null) { @@ -199,56 +198,54 @@ public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } @Override - public Execution<ArgumentT, ReturnT, AggregatorT> setArguments(Object args) { + public Execution setArguments(Object args) { if (args == null) { throw new IllegalArgumentException( String.format("The input %s for the execute function request is null", "args")); } - return new MemberFunctionExecutor<>(this, args); + return new MemberFunctionExecutor(this, args); } // Changing the object!! @Override - public Execution<ArgumentT, ReturnT, AggregatorT> withArgs(Object args) { + public Execution withArgs(Object args) { return setArguments(args); } // Changing the object!! @Override - public Execution<ArgumentT, ReturnT, AggregatorT> withCollector(ResultCollector rs) { + public Execution withCollector(ResultCollector rs) { if (rs == null) { throw new IllegalArgumentException( String.format("The input %s for the execute function request is null", "Result Collector")); } - return new MemberFunctionExecutor<>(this, rs); + return new MemberFunctionExecutor(this, rs); } @Override - public Execution<ArgumentT, ReturnT, AggregatorT> withFilter(Set filter) { + public Execution withFilter(Set filter) { throw new FunctionException( String.format("Cannot specify %s for data independent functions", "filter")); } @Override - public InternalExecution<ArgumentT, ReturnT, AggregatorT> withBucketFilter( - Set<Integer> bucketIDs) { + public InternalExecution withBucketFilter(Set<Integer> bucketIDs) { throw new FunctionException( String.format("Cannot specify %s for data independent functions", "bucket as filter")); } @Override - public InternalExecution<ArgumentT, ReturnT, AggregatorT> withMemberMappedArgument( - MemberMappedArgument argument) { + public InternalExecution withMemberMappedArgument(MemberMappedArgument argument) { if (argument == null) { throw new IllegalArgumentException( String.format("The input %s for the execute function request is null", "MemberMappedArgs")); } - return new MemberFunctionExecutor<>(this, argument); + return new MemberFunctionExecutor(this, argument); } @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java index 1b1fd1f..be26cb9 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java @@ -40,8 +40,7 @@ import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; -public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> - extends AbstractExecution<ArgumentT, ReturnT, AggregatorT> { +public class MultiRegionFunctionExecutor extends AbstractExecution { private final Set<Region> regions; @@ -51,6 +50,33 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> this.regions = regions; } + private MultiRegionFunctionExecutor(MultiRegionFunctionExecutor drfe) { + super(drfe); + this.regions = drfe.regions; + if (drfe.filter != null) { + this.filter.clear(); + this.filter.addAll(drfe.filter); + } + this.sender = drfe.sender; + } + + private MultiRegionFunctionExecutor(Set<Region> regions, Set filter2, Object args, + MemberMappedArgument memberMappedArg, ServerToClientFunctionResultSender resultSender) { + if (args != null) { + this.args = args; + } else if (memberMappedArg != null) { + this.memberMappedArg = memberMappedArg; + this.isMemberMappedArgument = true; + } + this.sender = resultSender; + if (filter2 != null) { + this.filter.clear(); + this.filter.addAll(filter2); + } + this.regions = regions; + this.isClientServerMode = true; + } + private MultiRegionFunctionExecutor(MultiRegionFunctionExecutor executor, MemberMappedArgument argument) { super(executor); @@ -143,8 +169,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } @Override - public InternalExecution<ArgumentT, ReturnT, AggregatorT> withBucketFilter( - Set<Integer> bucketIDs) { + public InternalExecution withBucketFilter(Set<Integer> bucketIDs) { throw new FunctionException( String.format("Cannot specify %s for multi region function", "bucket as filter")); @@ -175,7 +200,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> final Map<InternalDistributedMember, Set<String>> memberToRegionMap = calculateMemberToRegionMap(); final Set<InternalDistributedMember> dest = - new HashSet<>(memberToRegionMap.keySet()); + new HashSet<InternalDistributedMember>(memberToRegionMap.keySet()); if (dest.isEmpty()) { throw new FunctionException( @@ -189,7 +214,6 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } setExecutionNodes(dest); - assert cache != null; final InternalDistributedMember localVM = cache.getMyId(); final LocalResultCollector<?, ?> localResultCollector = getLocalResultCollector(function, resultCollector); @@ -208,7 +232,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> // if member is local VM dest.remove(localVM); Set<String> regionPathSet = memberToRegionMap.get(localVM); - Set<Region> regions = new HashSet<>(); + Set<Region> regions = new HashSet<Region>(); if (regionPathSet != null) { InternalCache cache1 = GemFireCacheImpl.getInstance(); for (String regionPath : regionPathSet) { @@ -218,12 +242,12 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> final FunctionContextImpl context = new MultiRegionFunctionContextImpl(cache, function.getId(), getArgumentsForMember(localVM.getId()), resultSender, regions, this.isReExecute); - boolean isTx = cache.getTxManager().getTXState() != null; + boolean isTx = cache.getTxManager().getTXState() == null ? false : true; executeFunctionOnLocalNode(function, context, resultSender, dm, isTx); } if (!dest.isEmpty()) { HashMap<InternalDistributedMember, Object> memberArgs = - new HashMap<>(); + new HashMap<InternalDistributedMember, Object>(); for (InternalDistributedMember recip : dest) { memberArgs.put(recip, getArgumentsForMember(recip.getId())); } @@ -237,9 +261,10 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } private Map<InternalDistributedMember, Set<String>> calculateMemberToRegionMap() { - Map<InternalDistributedMember, Set<String>> memberToRegions = new HashMap<>(); + Map<InternalDistributedMember, Set<String>> memberToRegions = + new HashMap<InternalDistributedMember, Set<String>>(); // nodes is maintained for node pruning logic - Set<InternalDistributedMember> nodes = new HashSet<>(); + Set<InternalDistributedMember> nodes = new HashSet<InternalDistributedMember>(); for (Region region : regions) { DataPolicy dp = region.getAttributes().getDataPolicy(); if (region instanceof PartitionedRegion) { @@ -251,7 +276,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> InternalDistributedMember localVm = cache.getMyId(); Set<String> regions = memberToRegions.get(localVm); if (regions == null) { - regions = new HashSet<>(); + regions = new HashSet<String>(); } regions.add(pr.getFullPath()); memberToRegions.put(localVm, regions); @@ -260,7 +285,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> for (InternalDistributedMember member : prMembers) { Set<String> regions = memberToRegions.get(member); if (regions == null) { - regions = new HashSet<>(); + regions = new HashSet<String>(); } regions.add(pr.getFullPath()); memberToRegions.put(member, regions); @@ -280,7 +305,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> added = true; Set<String> regions = memberToRegions.get(member); if (regions == null) { - regions = new HashSet<>(); + regions = new HashSet<String>(); } regions.add(dr.getFullPath()); memberToRegions.put(member, regions); @@ -294,7 +319,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> .toArray()[new Random().nextInt(replicates.size())]); Set<String> regions = memberToRegions.get(member); if (regions == null) { - regions = new HashSet<>(); + regions = new HashSet<String>(); } regions.add(dr.getFullPath()); memberToRegions.put(member, regions); @@ -305,7 +330,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> InternalDistributedMember local = cache.getMyId(); Set<String> regions = memberToRegions.get(local); if (regions == null) { - regions = new HashSet<>(); + regions = new HashSet<String>(); } regions.add(region.getFullPath()); memberToRegions.put(local, regions); @@ -316,7 +341,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> InternalDistributedMember local = cache.getMyId(); Set<String> regions = memberToRegions.get(local); if (regions == null) { - regions = new HashSet<>(); + regions = new HashSet<String>(); } regions.add(region.getFullPath()); memberToRegions.put(local, regions); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java index f1aea8b..27728e3 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java @@ -14,6 +14,7 @@ */ package org.apache.geode.internal.cache.execute; +import java.util.Iterator; import java.util.Set; import org.apache.geode.cache.Region; @@ -28,8 +29,7 @@ import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; -public class PartitionedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> - extends AbstractExecution<ArgumentT, ReturnT, AggregatorT> { +public class PartitionedRegionFunctionExecutor extends AbstractExecution { private final PartitionedRegion pr; @@ -48,6 +48,21 @@ public class PartitionedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> this.pr = (PartitionedRegion) r; } + private PartitionedRegionFunctionExecutor(PartitionedRegionFunctionExecutor prfe) { + super(prfe); + this.pr = prfe.pr; + this.executeOnBucketSet = prfe.executeOnBucketSet; + this.isPRSingleHop = prfe.isPRSingleHop; + this.isReExecute = prfe.isReExecute; + if (prfe.filter != null) { + this.filter.clear(); + this.filter.addAll(prfe.filter); + } + if (prfe.sender != null) { + this.sender = prfe.sender; + } + } + private PartitionedRegionFunctionExecutor(PartitionedRegionFunctionExecutor prfe, MemberMappedArgument argument) { // super copies args, rc and memberMappedArgument @@ -234,7 +249,9 @@ public class PartitionedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> bucketIDs.retainAll(actualBucketSet); - for (int bid : bucketIDs) { + Iterator<Integer> it = bucketIDs.iterator(); + while (it.hasNext()) { + int bid = it.next(); if (!actualBucketSet.contains(bid)) { throw new FunctionException("Bucket " + bid + " does not exist."); } @@ -299,14 +316,16 @@ public class PartitionedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> @Override public String toString() { - return "[ PartitionedRegionFunctionExecutor:" - + "args=" - + this.args - + ";filter=" - + this.filter - + ";region=" - + this.pr.getName() - + "]"; + final StringBuffer buf = new StringBuffer(); + buf.append("[ PartitionedRegionFunctionExecutor:"); + buf.append("args="); + buf.append(this.args); + buf.append(";filter="); + buf.append(this.filter); + buf.append(";region="); + buf.append(this.pr.getName()); + buf.append("]"); + return buf.toString(); } @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerFunctionExecutor.java index 8dcb163..9cbc4dd 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerFunctionExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerFunctionExecutor.java @@ -14,7 +14,6 @@ */ package org.apache.geode.internal.cache.execute; -import java.util.List; import java.util.Set; import org.apache.geode.cache.client.Pool; @@ -33,9 +32,7 @@ import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.internal.cache.TXManagerImpl; import org.apache.geode.internal.cache.execute.util.SynchronizedResultCollector; -public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT> - extends AbstractExecution<ArgumentT, ReturnT, AggregatorT> - implements Execution<ArgumentT, ReturnT, AggregatorT> { +public class ServerFunctionExecutor extends AbstractExecution { private PoolImpl pool; @@ -82,9 +79,7 @@ public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT> this.isMemberMappedArgument = true; } - protected ResultCollector<ReturnT, AggregatorT> executeFunction(final String functionId, - boolean result, - boolean isHA, + protected ResultCollector executeFunction(final String functionId, boolean result, boolean isHA, boolean optimizeForWrite) { try { if (proxyCache != null) { @@ -113,7 +108,7 @@ public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } @Override - protected ResultCollector<ReturnT, AggregatorT> executeFunction(final Function function) { + protected ResultCollector executeFunction(final Function function) { byte hasResult = 0; try { if (proxyCache != null) { @@ -126,7 +121,7 @@ public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT> if (function.hasResult()) { // have Results hasResult = 1; if (this.rc == null) { // Default Result Collector - ResultCollector<Object, List<Object>> defaultCollector = new DefaultResultCollector(); + ResultCollector defaultCollector = new DefaultResultCollector(); return executeOnServer(function, defaultCollector, hasResult); } else { // Custome Result COllector return executeOnServer(function, this.rc, hasResult); @@ -141,9 +136,7 @@ public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } - private ResultCollector<ReturnT, AggregatorT> executeOnServer(Function function, - ResultCollector rc, - byte hasResult) { + private ResultCollector executeOnServer(Function function, ResultCollector rc, byte hasResult) { FunctionStats stats = FunctionStats.getFunctionStats(function.getId()); try { validateExecution(function, null); @@ -165,9 +158,7 @@ public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } } - private ResultCollector<ReturnT, AggregatorT> executeOnServer(String functionId, - ResultCollector rc, - byte hasResult, + private ResultCollector executeOnServer(String functionId, ResultCollector rc, byte hasResult, boolean isHA, boolean optimizeForWrite) { FunctionStats stats = FunctionStats.getFunctionStats(functionId); try { @@ -237,54 +228,52 @@ public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } @Override - public Execution<ArgumentT, ReturnT, AggregatorT> withFilter(Set filter) { + public Execution withFilter(Set filter) { throw new FunctionException( String.format("Cannot specify %s for data independent functions", "filter")); } @Override - public InternalExecution<ArgumentT, ReturnT, AggregatorT> withBucketFilter( - Set<Integer> bucketIDs) { + public InternalExecution withBucketFilter(Set<Integer> bucketIDs) { throw new FunctionException( String.format("Cannot specify %s for data independent functions", "buckets as filter")); } @Override - public Execution<ArgumentT, ReturnT, AggregatorT> setArguments(Object args) { + public Execution setArguments(Object args) { if (args == null) { throw new FunctionException( String.format("The input %s for the execute function request is null", "args")); } - return new ServerFunctionExecutor<>(this, args); + return new ServerFunctionExecutor(this, args); } @Override - public Execution<ArgumentT, ReturnT, AggregatorT> withArgs(Object args) { + public Execution withArgs(Object args) { return setArguments(args); } @Override - public Execution<ArgumentT, ReturnT, AggregatorT> withCollector(ResultCollector rs) { + public Execution withCollector(ResultCollector rs) { if (rs == null) { throw new FunctionException( String.format("The input %s for the execute function request is null", "Result Collector")); } - return new ServerFunctionExecutor<>(this, rs); + return new ServerFunctionExecutor(this, rs); } @Override - public InternalExecution<ArgumentT, ReturnT, AggregatorT> withMemberMappedArgument( - MemberMappedArgument argument) { + public InternalExecution withMemberMappedArgument(MemberMappedArgument argument) { if (argument == null) { throw new FunctionException( String.format("The input %s for the execute function request is null", "MemberMapped Args")); } - return new ServerFunctionExecutor<>(this, argument); + return new ServerFunctionExecutor(this, argument); } @Override @@ -295,7 +284,7 @@ public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } @Override - public ResultCollector<ReturnT, AggregatorT> execute(final String functionName) { + public ResultCollector execute(final String functionName) { if (functionName == null) { throw new FunctionException( "The input function for the execute function request is null"); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java index f5d5cf2..150b0ae 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java @@ -14,7 +14,6 @@ */ package org.apache.geode.internal.cache.execute; -import java.util.List; import java.util.Set; import org.apache.logging.log4j.Logger; @@ -41,8 +40,7 @@ import org.apache.geode.internal.logging.LogService; * @see FunctionService#onRegion(Region) * * @since GemFire 5.8 LA */ -public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> - extends AbstractExecution<ArgumentT, ReturnT, AggregatorT> { +public class ServerRegionFunctionExecutor extends AbstractExecution { private static final Logger logger = LogService.getLogger(); private final LocalRegion region; @@ -115,29 +113,28 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } @Override - public Execution<ArgumentT, ReturnT, AggregatorT> withFilter(Set fltr) { + public Execution withFilter(Set fltr) { if (fltr == null) { throw new FunctionException( String.format("The input %s for the execute function request is null", "filter")); } this.executeOnBucketSet = false; - return new ServerRegionFunctionExecutor<>(this, fltr); + return new ServerRegionFunctionExecutor(this, fltr); } @Override - public InternalExecution<ArgumentT, ReturnT, AggregatorT> withBucketFilter( - Set<Integer> bucketIDs) { + public InternalExecution withBucketFilter(Set<Integer> bucketIDs) { if (bucketIDs == null) { throw new FunctionException( String.format("The input %s for the execute function request is null", "buckets as filter")); } - return new ServerRegionFunctionExecutor<>(this, bucketIDs, true /* execute on bucketset */); + return new ServerRegionFunctionExecutor(this, bucketIDs, true /* execute on bucketset */); } @Override - protected ResultCollector<ReturnT, AggregatorT> executeFunction(final Function function) { + protected ResultCollector executeFunction(final Function function) { byte hasResult = 0; try { if (proxyCache != null) { @@ -150,7 +147,7 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> if (function.hasResult()) { // have Results hasResult = 1; if (this.rc == null) { // Default Result Collector - ResultCollector<Object, List<Object>> defaultCollector = new DefaultResultCollector(); + ResultCollector defaultCollector = new DefaultResultCollector(); return executeOnServer(function, defaultCollector, hasResult); } else { // Custome Result COllector return executeOnServer(function, this.rc, hasResult); @@ -164,8 +161,7 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } } - protected ResultCollector<ReturnT, AggregatorT> executeFunction(final String functionId, - boolean resultReq, + protected ResultCollector executeFunction(final String functionId, boolean resultReq, boolean isHA, boolean optimizeForWrite) { try { if (proxyCache != null) { @@ -178,7 +174,7 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> if (resultReq) { // have Results hasResult = 1; if (this.rc == null) { // Default Result Collector - ResultCollector<Object, List<Object>> defaultCollector = new DefaultResultCollector(); + ResultCollector defaultCollector = new DefaultResultCollector(); return executeOnServer(functionId, defaultCollector, hasResult, isHA, optimizeForWrite); } else { // Custome Result COllector return executeOnServer(functionId, this.rc, hasResult, isHA, optimizeForWrite); @@ -192,8 +188,7 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } } - private ResultCollector<ReturnT, AggregatorT> executeOnServer(Function function, - ResultCollector collector, + private ResultCollector executeOnServer(Function function, ResultCollector collector, byte hasResult) throws FunctionException { ServerRegionProxy srp = getServerRegionProxy(); FunctionStats stats = FunctionStats.getFunctionStats(function.getId(), this.region.getSystem()); @@ -282,10 +277,13 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } return srp; } else { - String message = srp + ": " - + "No available connection was found. Server Region Proxy is not available for this region " - + region.getName(); - throw new FunctionException(message); + StringBuilder message = new StringBuilder(); + message.append(srp).append(": "); + message + .append( + "No available connection was found. Server Region Proxy is not available for this region ") + .append(region.getName()); + throw new FunctionException(message.toString()); } } @@ -295,45 +293,44 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> @Override public String toString() { - return "[ ServerRegionExecutor:" + "args=" + this.args - + " ;filter=" + this.filter + " ;region=" + this.region.getName() - + "]"; + return new StringBuffer().append("[ ServerRegionExecutor:").append("args=").append(this.args) + .append(" ;filter=").append(this.filter).append(" ;region=").append(this.region.getName()) + .append("]").toString(); } @Override - public Execution<ArgumentT, ReturnT, AggregatorT> setArguments(Object args) { + public Execution setArguments(Object args) { if (args == null) { throw new FunctionException( String.format("The input %s for the execute function request is null", "args")); } - return new ServerRegionFunctionExecutor<>(this, args); + return new ServerRegionFunctionExecutor(this, args); } @Override - public Execution<ArgumentT, ReturnT, AggregatorT> withArgs(Object args) { + public Execution withArgs(Object args) { return setArguments(args); } @Override - public Execution<ArgumentT, ReturnT, AggregatorT> withCollector(ResultCollector rs) { + public Execution withCollector(ResultCollector rs) { if (rs == null) { throw new FunctionException( String.format("The input %s for the execute function request is null", "Result Collector")); } - return new ServerRegionFunctionExecutor<>(this, rs); + return new ServerRegionFunctionExecutor(this, rs); } @Override - public InternalExecution<ArgumentT, ReturnT, AggregatorT> withMemberMappedArgument( - MemberMappedArgument argument) { + public InternalExecution withMemberMappedArgument(MemberMappedArgument argument) { if (argument == null) { throw new FunctionException( String.format("The input %s for the execute function request is null", "MemberMappedArgument")); } - return new ServerRegionFunctionExecutor<>(this, argument); + return new ServerRegionFunctionExecutor(this, argument); } @Override @@ -347,7 +344,7 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT> } @Override - public ResultCollector<ReturnT, AggregatorT> execute(final String functionName) { + public ResultCollector execute(final String functionName) { if (functionName == null) { throw new FunctionException( "The input function for the execute function request is null"); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/WindowedExporter.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/WindowedExporter.java index 72e9105..52cd434 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/WindowedExporter.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/WindowedExporter.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.geode.cache.EntryDestroyedException; import org.apache.geode.cache.Region; -import org.apache.geode.cache.execute.Execution; import org.apache.geode.cache.execute.FunctionContext; import org.apache.geode.cache.execute.FunctionException; import org.apache.geode.cache.execute.FunctionService; @@ -77,17 +76,15 @@ public class WindowedExporter<K, V> implements Exporter<K, V> { SnapshotPacket last = new SnapshotPacket(); DistributedMember me = region.getCache().getDistributedSystem().getDistributedMember(); - WindowedArgs<K, V> args = new WindowedArgs<>(me, options); + WindowedArgs<K, V> args = new WindowedArgs<K, V>(me, options); WindowedExportCollector results = new WindowedExportCollector(local, last); try { // Since the ExportCollector already is a LocalResultsCollector it's ok not // to keep the reference to the ResultsCollector returned from execute(). // Normally discarding the reference can cause issues if GC causes the // weak ref in ProcessorKeeper21 to be collected!! - Execution<Object, Object, BlockingQueue<SnapshotPacket>> onRegion = - FunctionService.onRegion(region); - InternalExecution exec = - (InternalExecution) onRegion.setArguments(args).withCollector(results); + InternalExecution exec = (InternalExecution) FunctionService.onRegion(region) + .setArguments(args).withCollector(results); // Ensure that our collector gets all exceptions so we can shut down the // queue properly. @@ -189,7 +186,7 @@ public class WindowedExporter<K, V> implements Exporter<K, V> { try { int bufferSize = 0; - List<SnapshotRecord> buffer = new ArrayList<>(); + List<SnapshotRecord> buffer = new ArrayList<SnapshotRecord>(); DistributedMember me = region.getCache().getDistributedSystem().getDistributedMember(); for (Iterator<Entry<K, V>> iter = region.entrySet().iterator(); iter.hasNext() && !window.isAborted();) { @@ -277,10 +274,10 @@ public class WindowedExporter<K, V> implements Exporter<K, V> { this.end = end; done = new AtomicBoolean(false); - members = new ConcurrentHashMap<>(); + members = new ConcurrentHashMap<DistributedMember, Integer>(); // cannot bound queue to exert back pressure - entries = new LinkedBlockingQueue<>(); + entries = new LinkedBlockingQueue<SnapshotPacket>(); } @Override diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/mutators/MemberConfigManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/mutators/MemberConfigManager.java index e6ed0ff..eb6549a 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/mutators/MemberConfigManager.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/mutators/MemberConfigManager.java @@ -97,11 +97,9 @@ public class MemberConfigManager implements ConfigurationManager<MemberConfig> { private ArrayList<MemberInformation> getMemberInformation( Set<DistributedMember> distributedMembers) { - Execution<DistributedMember, MemberInformation, ArrayList<MemberInformation>> execution = - FunctionService.onMembers(distributedMembers); - ResultCollector<MemberInformation, ArrayList<MemberInformation>> resultCollector = - execution.execute(new GetMemberInformationFunction()); - return resultCollector.getResult(); + Execution execution = FunctionService.onMembers(distributedMembers); + ResultCollector<?, ?> resultCollector = execution.execute(new GetMemberInformationFunction()); + return (ArrayList<MemberInformation>) resultCollector.getResult(); } @VisibleForTesting diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizer.java b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizer.java index 88fc336..23eecda 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizer.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizer.java @@ -62,8 +62,7 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig> * @param regionPath this is the full path of the region */ public void create(RegionConfig regionConfig, String regionPath, Cache cache) { - RegionFactory<Object, Object> factory = - getRegionFactory(cache, regionConfig.getRegionAttributes()); + RegionFactory factory = getRegionFactory(cache, regionConfig.getRegionAttributes()); RegionPath regionPathData = new RegionPath(regionPath); String regionName = regionPathData.getName(); String parentRegionPath = regionPathData.getParent(); @@ -76,9 +75,8 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig> factory.createSubregion(parentRegion, regionName); } - private <K, V> RegionFactory<K, V> getRegionFactory(Cache cache, - RegionAttributesType regionAttributes) { - RegionFactory<K, V> factory = cache.createRegionFactory(); + private RegionFactory getRegionFactory(Cache cache, RegionAttributesType regionAttributes) { + RegionFactory factory = cache.createRegionFactory(); factory.setDataPolicy(DataPolicy.fromString(regionAttributes.getDataPolicy().name())); @@ -87,13 +85,13 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig> } if (regionAttributes.getCacheLoader() != null) { - factory + ((RegionFactory<Object, Object>) factory) .setCacheLoader(DeclarableTypeInstantiator.newInstance(regionAttributes.getCacheLoader(), cache)); } if (regionAttributes.getCacheWriter() != null) { - factory + ((RegionFactory<Object, Object>) factory) .setCacheWriter(DeclarableTypeInstantiator.newInstance(regionAttributes.getCacheWriter(), cache)); } @@ -104,25 +102,25 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig> for (int i = 0; i < configListeners.size(); i++) { listeners[i] = DeclarableTypeInstantiator.newInstance(configListeners.get(i), cache); } - factory.initCacheListeners(listeners); + ((RegionFactory<Object, Object>) factory).initCacheListeners(listeners); } final String keyConstraint = regionAttributes.getKeyConstraint(); final String valueConstraint = regionAttributes.getValueConstraint(); if (keyConstraint != null && !keyConstraint.isEmpty()) { - Class<K> keyConstraintClass = + Class<Object> keyConstraintClass = CliUtil.forName(keyConstraint, CliStrings.CREATE_REGION__KEYCONSTRAINT); - factory.setKeyConstraint(keyConstraintClass); + ((RegionFactory<Object, Object>) factory).setKeyConstraint(keyConstraintClass); } if (valueConstraint != null && !valueConstraint.isEmpty()) { - Class<V> valueConstraintClass = + Class<Object> valueConstraintClass = CliUtil.forName(valueConstraint, CliStrings.CREATE_REGION__VALUECONSTRAINT); - factory.setValueConstraint(valueConstraintClass); + ((RegionFactory<Object, Object>) factory).setValueConstraint(valueConstraintClass); } if (regionAttributes.getCompressor() != null) { - factory + ((RegionFactory<Object, Object>) factory) .setCompressor(DeclarableTypeInstantiator.newInstance(regionAttributes.getCompressor())); } @@ -134,13 +132,13 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig> if (regionAttributes.getEntryIdleTime() != null) { RegionAttributesType.ExpirationAttributesType eitl = regionAttributes.getEntryIdleTime(); - factory.setEntryIdleTimeout( + ((RegionFactory<Object, Object>) factory).setEntryIdleTimeout( new ExpirationAttributes(Integer.valueOf(eitl.getTimeout()), ExpirationAction.fromXmlString(eitl.getAction()))); if (eitl.getCustomExpiry() != null) { - factory.setCustomEntryIdleTimeout( + ((RegionFactory<Object, Object>) factory).setCustomEntryIdleTimeout( DeclarableTypeInstantiator.newInstance(eitl.getCustomExpiry(), cache)); } @@ -148,12 +146,12 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig> if (regionAttributes.getEntryTimeToLive() != null) { RegionAttributesType.ExpirationAttributesType ettl = regionAttributes.getEntryTimeToLive(); - factory.setEntryTimeToLive( + ((RegionFactory<Object, Object>) factory).setEntryTimeToLive( new ExpirationAttributes(Integer.valueOf(ettl.getTimeout()), ExpirationAction.fromXmlString(ettl.getAction()))); if (ettl.getCustomExpiry() != null) { - factory + ((RegionFactory<Object, Object>) factory) .setCustomEntryTimeToLive(DeclarableTypeInstantiator.newInstance(ettl.getCustomExpiry(), cache)); } @@ -161,14 +159,14 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig> if (regionAttributes.getRegionIdleTime() != null) { RegionAttributesType.ExpirationAttributesType ritl = regionAttributes.getRegionIdleTime(); - factory.setRegionIdleTimeout( + ((RegionFactory<Object, Object>) factory).setRegionIdleTimeout( new ExpirationAttributes(Integer.valueOf(ritl.getTimeout()), ExpirationAction.fromXmlString(ritl.getAction()))); } if (regionAttributes.getRegionTimeToLive() != null) { RegionAttributesType.ExpirationAttributesType rttl = regionAttributes.getRegionTimeToLive(); - factory.setRegionTimeToLive( + ((RegionFactory<Object, Object>) factory).setRegionTimeToLive( new ExpirationAttributes(Integer.valueOf(rttl.getTimeout()), ExpirationAction.fromXmlString(rttl.getAction()))); } @@ -209,12 +207,12 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig> if (regionAttributes.getGatewaySenderIds() != null) { Arrays.stream(regionAttributes.getGatewaySenderIds().split(",")) - .forEach(factory::addGatewaySenderId); + .forEach(gsi -> factory.addGatewaySenderId(gsi)); } if (regionAttributes.getAsyncEventQueueIds() != null) { Arrays.stream(regionAttributes.getAsyncEventQueueIds().split(",")) - .forEach(factory::addAsyncEventQueueId); + .forEach(gsi -> factory.addAsyncEventQueueId(gsi)); } factory.setConcurrencyChecksEnabled(regionAttributes.isConcurrencyChecksEnabled()); @@ -233,7 +231,7 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig> return factory; } - private PartitionAttributesImpl convertToRegionFactoryPartitionAttributes( + PartitionAttributesImpl convertToRegionFactoryPartitionAttributes( RegionAttributesType.PartitionAttributes configAttributes, Cache cache) { PartitionAttributesImpl partitionAttributes = new PartitionAttributesImpl(); if (configAttributes == null) { @@ -278,9 +276,9 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig> if (configAttributes.getPartitionListeners() != null) { List<DeclarableType> configListeners = configAttributes.getPartitionListeners(); - for (DeclarableType configListener : configListeners) { + for (int i = 0; i < configListeners.size(); i++) { partitionAttributes.addPartitionListener( - DeclarableTypeInstantiator.newInstance(configListener, cache)); + DeclarableTypeInstantiator.newInstance(configListeners.get(i), cache)); } }