This is an automated email from the ASF dual-hosted git repository. klund pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
     new afd082c  GEODE-6176: Make FunctionService testable with internal delegates
afd082c is described below

commit afd082c67895abf6e9467cc4fbf1bf7cc0c28995
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Mon Dec 10 13:44:24 2018 -0800

    GEODE-6176: Make FunctionService testable with internal delegates

    Introduce new internal FunctionExecutionService API interfaces:
    * FunctionExecutionService
    * InternalFunctionExecutionService

    Implement interfaces with:
    * InternalFunctionExecutionServiceImpl

    Collapse FunctionServiceManager into InternalFunctionExecutionServiceImpl

    Change the two static API classes to delegate to an instance of
    InternalFunctionExecutionServiceImpl:
    * FunctionService (User API) -- only has private internal changes
    * InternalFunctionService -- now extends FunctionService

    Geode classes that use the static API classes directly can now be changed
    to depend on one of the two interfaces using constructor injection to
    facilitate unit testing without PowerMock.
--- .../geode/cache/execute/FunctionService.java | 57 +-- .../internal/InternalDistributedSystem.java | 4 +- .../cache/execute/FunctionExecutionService.java} | 91 +---- .../execute/InternalFunctionExecutionService.java | 154 ++++++++ .../InternalFunctionExecutionServiceImpl.java} | 403 +++++++++------------ .../cache/execute/InternalFunctionService.java | 72 ++-- .../apache/geode/internal/cache/properties.html | 2 +- .../tier/sockets/command/ExecuteFunction70.java | 4 +- 8 files changed, 412 insertions(+), 375 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java b/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java index 630bfeb..3566c38 100755 --- a/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java +++ b/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java @@ -23,11 +23,10 @@ import org.apache.geode.cache.RegionService; import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.client.Pool; -import org.apache.geode.cache.execute.internal.FunctionServiceManager; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; -import org.apache.geode.distributed.DistributedSystemDisconnectedException; -import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.internal.cache.execute.FunctionExecutionService; +import org.apache.geode.internal.cache.execute.InternalFunctionExecutionServiceImpl; /** * Provides the entry point into execution of user defined {@linkplain Function}s. 
@@ -39,9 +38,18 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem; * @since GemFire 6.0 */ public class FunctionService { - private static final FunctionServiceManager functionSvcMgr = new FunctionServiceManager(); - FunctionService() {} + private static final FunctionService INSTANCE = + new FunctionService(new InternalFunctionExecutionServiceImpl()); + + private final FunctionExecutionService functionExecutionService; + + /** + * Protected visibility to allow InternalFunctionService to extend FunctionService. + */ + protected FunctionService(FunctionExecutionService functionExecutionService) { + this.functionExecutionService = functionExecutionService; + } /** * Returns an {@link Execution} object that can be used to execute a data dependent function on @@ -63,7 +71,7 @@ public class FunctionService { * @since GemFire 6.0 */ public static Execution onRegion(Region region) { - return functionSvcMgr.onRegion(region); + return getFunctionExecutionService().onRegion(region); } /** @@ -78,7 +86,7 @@ public class FunctionService { * @since GemFire 6.0 */ public static Execution onServer(Pool pool) { - return functionSvcMgr.onServer(pool); + return getFunctionExecutionService().onServer(pool); } /** @@ -91,7 +99,7 @@ public class FunctionService { * @since GemFire 6.0 */ public static Execution onServers(Pool pool) { - return functionSvcMgr.onServers(pool); + return getFunctionExecutionService().onServers(pool); } /** @@ -108,7 +116,7 @@ public class FunctionService { * @since GemFire 6.5 */ public static Execution onServer(RegionService regionService) { - return functionSvcMgr.onServer(regionService); + return getFunctionExecutionService().onServer(regionService); } /** @@ -123,7 +131,7 @@ public class FunctionService { * @since GemFire 6.5 */ public static Execution onServers(RegionService regionService) { - return functionSvcMgr.onServers(regionService); + return getFunctionExecutionService().onServers(regionService); } /** @@ -135,10 +143,9 
@@ public class FunctionService { * @param distributedMember defines a member in the distributed system * @throws FunctionException if distributedMember is null * @since GemFire 7.0 - * */ public static Execution onMember(DistributedMember distributedMember) { - return functionSvcMgr.onMember(getDistributedSystem(), distributedMember); + return getFunctionExecutionService().onMember(distributedMember); } /** @@ -157,7 +164,7 @@ public class FunctionService { * @since GemFire 7.0 */ public static Execution onMembers(String... groups) { - return functionSvcMgr.onMembers(getDistributedSystem(), groups); + return getFunctionExecutionService().onMembers(groups); } /** @@ -170,7 +177,7 @@ public class FunctionService { * @since GemFire 7.0 */ public static Execution onMembers(Set<DistributedMember> distributedMembers) { - return functionSvcMgr.onMembers(getDistributedSystem(), distributedMembers); + return getFunctionExecutionService().onMembers(distributedMembers); } /** @@ -185,7 +192,7 @@ public class FunctionService { * @since GemFire 7.0 */ public static Execution onMember(String... 
groups) { - return functionSvcMgr.onMember(getDistributedSystem(), groups); + return getFunctionExecutionService().onMember(groups); } /** @@ -196,7 +203,7 @@ public class FunctionService { * @since GemFire 6.0 */ public static Function getFunction(String functionId) { - return functionSvcMgr.getFunction(functionId); + return getFunctionExecutionService().getFunction(functionId); } /** @@ -212,7 +219,7 @@ public class FunctionService { * @since GemFire 6.0 */ public static void registerFunction(Function function) { - functionSvcMgr.registerFunction(function); + getFunctionExecutionService().registerFunction(function); } /** @@ -224,7 +231,7 @@ public class FunctionService { * @since GemFire 6.0 */ public static void unregisterFunction(String functionId) { - functionSvcMgr.unregisterFunction(functionId); + getFunctionExecutionService().unregisterFunction(functionId); } /** @@ -234,10 +241,9 @@ public class FunctionService { * @since GemFire 6.0 */ public static boolean isRegistered(String functionId) { - return functionSvcMgr.isRegistered(functionId); + return getFunctionExecutionService().isRegistered(functionId); } - /** * Returns all locally registered functions * @@ -245,15 +251,10 @@ public class FunctionService { * @since GemFire 6.0 */ public static Map<String, Function> getRegisteredFunctions() { - return functionSvcMgr.getRegisteredFunctions(); + return getFunctionExecutionService().getRegisteredFunctions(); } - private static DistributedSystem getDistributedSystem() { - DistributedSystem system = InternalDistributedSystem.getConnectedInstance(); - if (system == null) { - throw new DistributedSystemDisconnectedException( - "This connection to a distributed system has been disconnected."); - } - return system; + private static FunctionExecutionService getFunctionExecutionService() { + return INSTANCE.functionExecutionService; } } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java index 32f3d13..e2cc885 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java @@ -61,7 +61,6 @@ import org.apache.geode.SystemFailure; import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.CacheXmlException; -import org.apache.geode.cache.execute.internal.FunctionServiceManager; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; @@ -91,6 +90,7 @@ import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.execute.FunctionServiceStats; import org.apache.geode.internal.cache.execute.FunctionStats; +import org.apache.geode.internal.cache.execute.InternalFunctionService; import org.apache.geode.internal.cache.tier.sockets.EncryptorImpl; import org.apache.geode.internal.cache.xmlcache.CacheServerCreation; import org.apache.geode.internal.logging.InternalLogWriter; @@ -1432,7 +1432,7 @@ public class InternalDistributedSystem extends DistributedSystem functionstats.close(); } - (new FunctionServiceManager()).unregisterAllFunctions(); + InternalFunctionService.unregisterAllFunctions(); if (this.sampler != null) { this.sampler.stop(); diff --git a/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionExecutionService.java old mode 100755 new mode 100644 similarity index 73% copy from geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java copy to geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionExecutionService.java index 
630bfeb..6916728 --- a/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionExecutionService.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.apache.geode.cache.execute; +package org.apache.geode.internal.cache.execute; import java.util.Map; import java.util.Properties; @@ -23,25 +23,13 @@ import org.apache.geode.cache.RegionService; import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.client.Pool; -import org.apache.geode.cache.execute.internal.FunctionServiceManager; +import org.apache.geode.cache.execute.Execution; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionException; +import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.distributed.DistributedSystem; -import org.apache.geode.distributed.DistributedSystemDisconnectedException; -import org.apache.geode.distributed.internal.InternalDistributedSystem; -/** - * Provides the entry point into execution of user defined {@linkplain Function}s. - * <p> - * Function execution provides a means to route application behaviour to {@linkplain Region data} or - * more generically to peers in a {@link DistributedSystem} or servers in a {@link Pool}. 
- * </p> - * - * @since GemFire 6.0 - */ -public class FunctionService { - private static final FunctionServiceManager functionSvcMgr = new FunctionServiceManager(); - - FunctionService() {} +public interface FunctionExecutionService { /** * Returns an {@link Execution} object that can be used to execute a data dependent function on @@ -62,9 +50,7 @@ public class FunctionService { * @throws FunctionException if the region passed in is null * @since GemFire 6.0 */ - public static Execution onRegion(Region region) { - return functionSvcMgr.onRegion(region); - } + Execution onRegion(Region region); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -77,9 +63,7 @@ public class FunctionService { * @throws FunctionException if Pool instance passed in is null * @since GemFire 6.0 */ - public static Execution onServer(Pool pool) { - return functionSvcMgr.onServer(pool); - } + Execution onServer(Pool pool); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -90,9 +74,7 @@ public class FunctionService { * @throws FunctionException if Pool instance passed in is null * @since GemFire 6.0 */ - public static Execution onServers(Pool pool) { - return functionSvcMgr.onServers(pool); - } + Execution onServers(Pool pool); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -107,9 +89,7 @@ public class FunctionService { * pool * @since GemFire 6.5 */ - public static Execution onServer(RegionService regionService) { - return functionSvcMgr.onServer(regionService); - } + Execution onServer(RegionService regionService); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -122,9 +102,7 @@ public class FunctionService { * pool * @since GemFire 6.5 */ - public static Execution onServers(RegionService regionService) { - return functionSvcMgr.onServers(regionService); - } + Execution 
onServers(RegionService regionService); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -135,11 +113,8 @@ public class FunctionService { * @param distributedMember defines a member in the distributed system * @throws FunctionException if distributedMember is null * @since GemFire 7.0 - * */ - public static Execution onMember(DistributedMember distributedMember) { - return functionSvcMgr.onMember(getDistributedSystem(), distributedMember); - } + Execution onMember(DistributedMember distributedMember); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -156,9 +131,7 @@ public class FunctionService { * @throws FunctionException if no members are found belonging to the provided groups * @since GemFire 7.0 */ - public static Execution onMembers(String... groups) { - return functionSvcMgr.onMembers(getDistributedSystem(), groups); - } + Execution onMembers(String... groups); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -169,9 +142,7 @@ public class FunctionService { * @throws FunctionException if distributedMembers is null * @since GemFire 7.0 */ - public static Execution onMembers(Set<DistributedMember> distributedMembers) { - return functionSvcMgr.onMembers(getDistributedSystem(), distributedMembers); - } + Execution onMembers(Set<DistributedMember> distributedMembers); /** * Returns an {@link Execution} object that can be used to execute a data independent function on @@ -184,9 +155,7 @@ public class FunctionService { * @throws FunctionException if no members are found belonging to the provided groups * @since GemFire 7.0 */ - public static Execution onMember(String... groups) { - return functionSvcMgr.onMember(getDistributedSystem(), groups); - } + Execution onMember(String... 
groups); /** * Returns the {@link Function} defined by the functionId, returns null if no function is found @@ -195,9 +164,7 @@ public class FunctionService { * @throws FunctionException if functionID passed is null * @since GemFire 6.0 */ - public static Function getFunction(String functionId) { - return functionSvcMgr.getFunction(functionId); - } + Function getFunction(String functionId); /** * Registers the given {@link Function} with the {@link FunctionService} using @@ -211,9 +178,7 @@ public class FunctionService { * @throws FunctionException if function instance passed is null or Function.getId() returns null * @since GemFire 6.0 */ - public static void registerFunction(Function function) { - functionSvcMgr.registerFunction(function); - } + void registerFunction(Function function); /** * Unregisters the given {@link Function} with the {@link FunctionService} using @@ -223,9 +188,7 @@ public class FunctionService { * @throws FunctionException if function instance passed is null or Function.getId() returns null * @since GemFire 6.0 */ - public static void unregisterFunction(String functionId) { - functionSvcMgr.unregisterFunction(functionId); - } + void unregisterFunction(String functionId); /** * Returns true if the function is registered to FunctionService @@ -233,10 +196,7 @@ public class FunctionService { * @throws FunctionException if function instance passed is null or Function.getId() returns null * @since GemFire 6.0 */ - public static boolean isRegistered(String functionId) { - return functionSvcMgr.isRegistered(functionId); - } - + boolean isRegistered(String functionId); /** * Returns all locally registered functions @@ -244,16 +204,5 @@ public class FunctionService { * @return A view of registered functions as a Map of {@link Function#getId()} to {@link Function} * @since GemFire 6.0 */ - public static Map<String, Function> getRegisteredFunctions() { - return functionSvcMgr.getRegisteredFunctions(); - } - - private static DistributedSystem 
getDistributedSystem() { - DistributedSystem system = InternalDistributedSystem.getConnectedInstance(); - if (system == null) { - throw new DistributedSystemDisconnectedException( - "This connection to a distributed system has been disconnected."); - } - return system; - } + Map<String, Function> getRegisteredFunctions(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionService.java new file mode 100644 index 0000000..6fb529a --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionService.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache.execute; + +import java.util.Properties; +import java.util.Set; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionService; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.Pool; +import org.apache.geode.cache.execute.Execution; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionException; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.DistributedSystem; + +public interface InternalFunctionExecutionService extends FunctionExecutionService { + + void unregisterAllFunctions(); + + /** + * Returns an {@link Execution} object that can be used to execute a function on the set of {@link + * Region}s. The function would be executed on the set of members that host data for any of the + * regions in the set of regions. <br> + * If the Set provided contains region with : <br> + * DataPolicy.NORMAL, execute the function on any random member which has DataPolicy.REPLICATE . + * <br> + * DataPolicy.EMPTY, execute the function on any random member which has DataPolicy.REPLICATE . + * <br> + * DataPolicy.REPLICATE, execute the function locally or any random member which has + * DataPolicy.REPLICATE .<br> + * DataPolicy.PARTITION, it executes on members where the primary copy of data is hosted. <br> + * This API is not supported for cache clients in client server mode + * + * <p> + * For an Execution object obtained from this method, calling the withFilter method throws + * {@link UnsupportedOperationException} + * + * @see MultiRegionFunctionContext + */ + Execution onRegions(Set<Region> regions); + + /** + * Returns an {@link Execution} object that can be used to execute a data independent function on + * all the servers that the given cache is connected to. 
If one of the servers goes down while + * dispatching or executing the function on the server, an Exception will be thrown. + * + * @param regionService obtained from {@link ClientCacheFactory#create} or + * {@link ClientCache#createAuthenticatedView(Properties)} . + * @throws FunctionException if cache is null, is not on a client, or it does not have a default + * pool + * @since GemFire 6.5 + */ + Execution onServers(RegionService regionService, String... groups); + + /** + * Returns an {@link Execution} object that can be used to execute a data independent function on + * a server that the given cache is connected to. + * <p> + * If the server goes down while dispatching or executing the function, an Exception will be + * thrown. + * + * @param regionService obtained from {@link ClientCacheFactory#create} or + * {@link ClientCache#createAuthenticatedView(Properties)} . + * @throws FunctionException if cache is null, is not on a client, or it does not have a default + * pool + * @since GemFire 6.5 + */ + Execution onServer(RegionService regionService, String... groups); + + /** + * Returns an {@link Execution} object that can be used to execute a data independent function on + * all the servers in the provided {@link Pool}. If one of the servers goes down while dispatching + * or executing the function on the server, an Exception will be thrown. + * + * @param pool the set of servers to execute the function + * @throws FunctionException if Pool instance passed in is null + * @since GemFire 6.0 + */ + Execution onServers(Pool pool, String... groups); + + /** + * Returns an {@link Execution} object that can be used to execute a data independent function on + * a server in the provided {@link Pool}. + * <p> + * If the server goes down while dispatching or executing the function, an Exception will be + * thrown. 
+ * + * @param pool from which to chose a server for execution + * @throws FunctionException if Pool instance passed in is null + * @since GemFire 6.0 + */ + Execution onServer(Pool pool, String... groups); + + /** + * Returns an {@link Execution} object that can be used to execute a data independent function on + * a {@link DistributedMember} of the {@link DistributedSystem}. If the member is not found in the + * system, the function execution will throw an Exception. If the member goes down while + * dispatching or executing the function on the member, an Exception will be thrown. + * + * @param system defines the distributed system + * @param distributedMember defines a member in the distributed system + * @throws FunctionException if either input parameter is null + * @since GemFire 6.0 + * + */ + Execution onMember(DistributedSystem system, DistributedMember distributedMember); + + /** + * Returns an {@link Execution} object that can be used to execute a data independent function on + * all members of the {@link DistributedSystem}. If one of the members goes down while dispatching + * or executing the function on the member, an Exception will be thrown. + * + * @param system defines the distributed system + * + * @throws FunctionException if DistributedSystem instance passed is null + * @since GemFire 6.0 + */ + Execution onMembers(DistributedSystem system, String... groups); + + /** + * Uses {@code RANDOM_onMember} for tests. + * + * <p> + * TODO: maybe merge with {@link #onMembers(DistributedSystem, String...)} + */ + Execution onMember(DistributedSystem system, String... groups); + + /** + * Returns an {@link Execution} object that can be used to execute a data independent function on + * the set of {@link DistributedMember}s of the {@link DistributedSystem}. If one of the members + * goes down while dispatching or executing the function, an Exception will be thrown. 
+ * + * @param system defines the distributed system + * @param distributedMembers set of distributed members on which {@link Function} to be executed + * @throws FunctionException if DistributedSystem instance passed is null + * @since GemFire 6.0 + */ + Execution onMembers(DistributedSystem system, Set<DistributedMember> distributedMembers); +} diff --git a/geode-core/src/main/java/org/apache/geode/cache/execute/internal/FunctionServiceManager.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionServiceImpl.java similarity index 58% rename from geode-core/src/main/java/org/apache/geode/cache/execute/internal/FunctionServiceManager.java rename to geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionServiceImpl.java index 4ef63db..244c73c 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/execute/internal/FunctionServiceManager.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionServiceImpl.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. 
*/ -package org.apache.geode.cache.execute.internal; +package org.apache.geode.internal.cache.execute; import java.util.ArrayList; import java.util.Arrays; @@ -21,14 +21,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionService; -import org.apache.geode.cache.client.ClientCache; -import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.client.Pool; import org.apache.geode.cache.client.PoolManager; import org.apache.geode.cache.client.internal.InternalClientCache; @@ -37,36 +34,18 @@ import org.apache.geode.cache.client.internal.ProxyRegion; import org.apache.geode.cache.execute.Execution; import org.apache.geode.cache.execute.Function; import org.apache.geode.cache.execute.FunctionException; -import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.partition.PartitionRegionHelper; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.distributed.DistributedSystemDisconnectedException; import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.InternalEntity; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalRegion; -import org.apache.geode.internal.cache.execute.DistributedRegionFunctionExecutor; -import org.apache.geode.internal.cache.execute.MemberFunctionExecutor; -import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionExecutor; -import org.apache.geode.internal.cache.execute.ServerFunctionExecutor; -import org.apache.geode.internal.cache.execute.ServerRegionFunctionExecutor; - -/** - * Provides the entry point into 
execution of user defined {@linkplain Function}s. - * <p> - * Function execution provides a means to route application behaviour to {@linkplain Region data} or - * more generically to peers in a {@link DistributedSystem} or servers in a {@link Pool}. - * </p> - * - * While {@link FunctionService} is a customer facing interface to this functionality, all of the - * work is done here. In addition, internal only functionality is exposed in this class. - * - * @since GemFire 7.0 - */ -public class FunctionServiceManager { - private static final ConcurrentHashMap<String, Function> idToFunctionMap = - new ConcurrentHashMap<>(); +public class InternalFunctionExecutionServiceImpl + implements FunctionExecutionService, InternalFunctionExecutionService { /** * use when the optimization to execute onMember locally is not desired. @@ -74,29 +53,58 @@ public class FunctionServiceManager { public static final boolean RANDOM_onMember = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "randomizeOnMember"); - public FunctionServiceManager() { - // do nothing + private static final String[] EMPTY_GROUPS = new String[0]; + + private static final ConcurrentHashMap<String, Function> idToFunctionMap = + new ConcurrentHashMap<>(); + + public InternalFunctionExecutionServiceImpl() { + // nothing } - /** - * Returns an {@link Execution} object that can be used to execute a data dependent function on - * the specified Region.<br> - * When invoked from a GemFire client, the method returns an Execution instance that sends a - * message to one of the connected servers as specified by the {@link Pool} for the region. <br> - * Depending on the filters setup on the {@link Execution}, the function is executed on all - * GemFire members that define the data region, or a subset of members. - * {@link Execution#withFilter(Set)}). - * - * For DistributedRegions with DataPolicy.NORMAL, it throws UnsupportedOperationException. 
For - * DistributedRegions with DataPolicy.EMPTY, execute the function on any random member which has - * DataPolicy.REPLICATE <br> - * . For DistributedRegions with DataPolicy.REPLICATE, execute the function locally. For Regions - * with DataPolicy.PARTITION, it executes on members where the data resides as specified by the - * filter. - * - * @throws FunctionException if the region passed in is null - * @since GemFire 6.0 - */ + // FunctionExecutionService API ---------------------------------------------------------------- + + @Override + public Execution onServer(Pool pool) { + return onServer(pool, EMPTY_GROUPS); + } + + @Override + public Execution onServers(Pool pool) { + return onServers(pool, EMPTY_GROUPS); + } + + @Override + public Execution onServer(RegionService regionService) { + return onServer(regionService, EMPTY_GROUPS); + } + + @Override + public Execution onServers(RegionService regionService) { + return onServers(regionService, EMPTY_GROUPS); + } + + @Override + public Execution onMember(DistributedMember distributedMember) { + return onMember(getDistributedSystem(), distributedMember); + } + + @Override + public Execution onMembers(String... groups) { + return onMembers(getDistributedSystem(), groups); + } + + @Override + public Execution onMembers(Set<DistributedMember> distributedMembers) { + return onMembers(getDistributedSystem(), distributedMembers); + } + + @Override + public Execution onMember(String... groups) { + return onMember(getDistributedSystem(), groups); + } + + @Override public Execution onRegion(Region region) { if (region == null) { throw new FunctionException( @@ -127,17 +135,66 @@ public class FunctionServiceManager { return new DistributedRegionFunctionExecutor(region); } - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * a server in the provided {@link Pool}. 
- * <p> - * If the server goes down while dispatching or executing the function, an Exception will be - * thrown. - * - * @param pool from which to chose a server for execution - * @throws FunctionException if Pool instance passed in is null - * @since GemFire 6.0 - */ + @Override + public Function getFunction(String functionId) { + if (functionId == null) { + throw new FunctionException(String.format("%s passed is null", + "functionId instance ")); + } + return idToFunctionMap.get(functionId); + } + + @Override + public void registerFunction(Function function) { + if (function == null) { + throw new FunctionException(String.format("%s passed is null", + "function instance ")); + } + if (function.getId() == null) { + throw new FunctionException( + "function.getId() returned null, implement the Function.getId() method properly"); + } + if (function.isHA() && !function.hasResult()) { + throw new FunctionException( + "For Functions with isHA true, hasResult must also be true."); + } + + idToFunctionMap.put(function.getId(), function); + } + + @Override + public void unregisterFunction(String functionId) { + if (functionId == null) { + throw new FunctionException(String.format("%s passed is null", + "functionId instance ")); + } + idToFunctionMap.remove(functionId); + } + + @Override + public boolean isRegistered(String functionId) { + if (functionId == null) { + throw new FunctionException(String.format("%s passed is null", + "functionId instance ")); + } + return idToFunctionMap.containsKey(functionId); + } + + @Override + public Map<String, Function> getRegisteredFunctions() { + // We have to remove the internal functions before returning the map to the users + final Map<String, Function> tempIdToFunctionMap = new HashMap<>(); + for (Map.Entry<String, Function> entry : idToFunctionMap.entrySet()) { + if (!(entry.getValue() instanceof InternalEntity)) { + tempIdToFunctionMap.put(entry.getKey(), entry.getValue()); + } + } + return tempIdToFunctionMap; + } + + // 
InternalFunctionExecutionService OnServerGroups API ----------------------------------------- + + @Override public Execution onServer(Pool pool, String... groups) { if (pool == null) { throw new FunctionException( @@ -151,15 +208,7 @@ public class FunctionServiceManager { return new ServerFunctionExecutor(pool, false, groups); } - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * all the servers in the provided {@link Pool}. If one of the servers goes down while dispatching - * or executing the function on the server, an Exception will be thrown. - * - * @param pool the set of servers to execute the function - * @throws FunctionException if Pool instance passed in is null - * @since GemFire 6.0 - */ + @Override public Execution onServers(Pool pool, String... groups) { if (pool == null) { throw new FunctionException( @@ -173,19 +222,7 @@ public class FunctionServiceManager { return new ServerFunctionExecutor(pool, true, groups); } - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * a server that the given cache is connected to. - * <p> - * If the server goes down while dispatching or executing the function, an Exception will be - * thrown. - * - * @param regionService obtained from {@link ClientCacheFactory#create} or - * {@link ClientCache#createAuthenticatedView(Properties)} . - * @throws FunctionException if cache is null, is not on a client, or it does not have a default - * pool - * @since GemFire 6.5 - */ + @Override public Execution onServer(RegionService regionService, String... groups) { if (regionService == null) { throw new FunctionException(String.format("%s passed is null", @@ -207,17 +244,7 @@ public class FunctionServiceManager { } } - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * all the servers that the given cache is connected to. 
If one of the servers goes down while - * dispatching or executing the function on the server, an Exception will be thrown. - * - * @param regionService obtained from {@link ClientCacheFactory#create} or - * {@link ClientCache#createAuthenticatedView(Properties)} . - * @throws FunctionException if cache is null, is not on a client, or it does not have a default - * pool - * @since GemFire 6.5 - */ + @Override public Execution onServers(RegionService regionService, String... groups) { if (regionService == null) { throw new FunctionException(String.format("%s passed is null", @@ -239,18 +266,9 @@ public class FunctionServiceManager { } } - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * a {@link DistributedMember} of the {@link DistributedSystem}. If the member is not found in the - * system, the function execution will throw an Exception. If the member goes down while - * dispatching or executing the function on the member, an Exception will be thrown. - * - * @param system defines the distributed system - * @param distributedMember defines a member in the distributed system - * @throws FunctionException if either input parameter is null - * @since GemFire 6.0 - * - */ + // InternalFunctionExecutionService InDistributedSystem API ------------------------------------ + + @Override public Execution onMember(DistributedSystem system, DistributedMember distributedMember) { if (system == null) { throw new FunctionException(String.format("%s passed is null", @@ -263,16 +281,7 @@ public class FunctionServiceManager { return new MemberFunctionExecutor(system, distributedMember); } - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * all members of the {@link DistributedSystem}. If one of the members goes down while dispatching - * or executing the function on the member, an Exception will be thrown. 
- * - * @param system defines the distributed system - * - * @throws FunctionException if DistributedSystem instance passed is null - * @since GemFire 6.0 - */ + @Override public Execution onMembers(DistributedSystem system, String... groups) { if (system == null) { throw new FunctionException(String.format("%s passed is null", @@ -281,7 +290,7 @@ public class FunctionServiceManager { if (groups.length == 0) { return new MemberFunctionExecutor(system); } - Set<DistributedMember> members = new HashSet<DistributedMember>(); + Set<DistributedMember> members = new HashSet<>(); for (String group : groups) { members.addAll(system.getGroupMembers(group)); } @@ -292,16 +301,32 @@ public class FunctionServiceManager { return new MemberFunctionExecutor(system, members); } - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * the set of {@link DistributedMember}s of the {@link DistributedSystem}. If one of the members - * goes down while dispatching or executing the function, an Exception will be thrown. - * - * @param system defines the distributed system - * @param distributedMembers set of distributed members on which {@link Function} to be executed - * @throws FunctionException if DistributedSystem instance passed is null - * @since GemFire 6.0 - */ + @Override + public Execution onMember(DistributedSystem system, String... 
groups) { + if (system == null) { + throw new FunctionException(String.format("%s passed is null", + "DistributedSystem instance ")); + } + Set<DistributedMember> members = new HashSet<>(); + for (String group : groups) { + List<DistributedMember> grpMembers = new ArrayList<>(system.getGroupMembers(group)); + if (!grpMembers.isEmpty()) { + if (!RANDOM_onMember && grpMembers.contains(system.getDistributedMember())) { + members.add(system.getDistributedMember()); + } else { + Collections.shuffle(grpMembers); + members.add(grpMembers.get(0)); + } + } + } + if (members.isEmpty()) { + throw new FunctionException(String.format("No members found in group(s) %s", + Arrays.toString(groups))); + } + return new MemberFunctionExecutor(system, members); + } + + @Override public Execution onMembers(DistributedSystem system, Set<DistributedMember> distributedMembers) { if (system == null) { throw new FunctionException(String.format("%s passed is null", @@ -314,100 +339,37 @@ public class FunctionServiceManager { return new MemberFunctionExecutor(system, distributedMembers); } - /** - * Returns the {@link Function} defined by the functionId, returns null if no function is found - * for the specified functionId - * - * @throws FunctionException if functionID passed is null - * @since GemFire 6.0 - */ - public Function getFunction(String functionId) { - if (functionId == null) { - throw new FunctionException(String.format("%s passed is null", - "functionId instance ")); - } - return idToFunctionMap.get(functionId); - } + // InternalFunctionExecutionService OnRegions API ---------------------------------------------- - /** - * Registers the given {@link Function} with the {@link FunctionService} using - * {@link Function#getId()}. - * <p> - * Registering a function allows execution of the function using - * {@link Execution#execute(String)}. Every member that could execute a function using its - * {@link Function#getId()} should register the function. 
- * </p> - * - * @throws FunctionException if function instance passed is null or Function.getId() returns null - * @since GemFire 6.0 - */ - public void registerFunction(Function function) { - if (function == null) { - throw new FunctionException(String.format("%s passed is null", - "function instance ")); - } - if (function.getId() == null) { - throw new FunctionException( - "function.getId() returned null, implement the Function.getId() method properly"); - } - if (function.isHA() && !function.hasResult()) { - throw new FunctionException( - "For Functions with isHA true, hasResult must also be true."); + @Override + public Execution onRegions(Set<Region> regions) { + if (regions == null) { + throw new IllegalArgumentException( + String.format("The input %s for the execute function request is null", + "regions set")); } - - idToFunctionMap.put(function.getId(), function); - } - - /** - * Unregisters the given {@link Function} with the {@link FunctionService} using - * {@link Function#getId()}. 
- * <p> - * - * @throws FunctionException if function instance passed is null or Function.getId() returns null - * @since GemFire 6.0 - */ - public void unregisterFunction(String functionId) { - if (functionId == null) { - throw new FunctionException(String.format("%s passed is null", - "functionId instance ")); + if (regions.contains(null)) { + throw new IllegalArgumentException( + "One or more region references added to the regions set is(are) null"); } - idToFunctionMap.remove(functionId); - } - - /** - * Returns true if the function is registered to FunctionService - * - * @throws FunctionException if function instance passed is null or Function.getId() returns null - * @since GemFire 6.0 - */ - public boolean isRegistered(String functionId) { - if (functionId == null) { - throw new FunctionException(String.format("%s passed is null", - "functionId instance ")); + if (regions.isEmpty()) { + throw new IllegalArgumentException( + "Regions set is empty for onRegions function execution"); } - return idToFunctionMap.containsKey(functionId); - } - - /** - * Returns all locally registered functions - * - * @return A view of registered functions as a Map of {@link Function#getId()} to {@link Function} - * @since GemFire 6.0 - */ - public Map<String, Function> getRegisteredFunctions() { - // We have to remove the internal functions before returning the map to the users - final Map<String, Function> tempIdToFunctionMap = new HashMap<String, Function>(); - for (Map.Entry<String, Function> entry : idToFunctionMap.entrySet()) { - if (!(entry.getValue() instanceof InternalEntity)) { - tempIdToFunctionMap.put(entry.getKey(), entry.getValue()); + for (Region region : regions) { + if (isClientRegion(region)) { + throw new UnsupportedOperationException( + "FunctionService#onRegions() is not supported for cache clients in client server mode"); } } - return tempIdToFunctionMap; + return new MultiRegionFunctionExecutor(regions); } + // InternalFunctionExecutionService 
unregisterAllFunctions API --------------------------------- + + @Override public void unregisterAllFunctions() { // Unregistering all the functions registered with the FunctionService. - Map<String, Function> functions = new HashMap<String, Function>(idToFunctionMap); for (String functionId : idToFunctionMap.keySet()) { unregisterFunction(functionId); } @@ -421,27 +383,12 @@ public class FunctionServiceManager { return ((InternalRegion) region).hasServerProxy(); } - public Execution onMember(DistributedSystem system, String... groups) { + private static DistributedSystem getDistributedSystem() { + DistributedSystem system = InternalDistributedSystem.getConnectedInstance(); if (system == null) { - throw new FunctionException(String.format("%s passed is null", - "DistributedSystem instance ")); + throw new DistributedSystemDisconnectedException( + "This connection to a distributed system has been disconnected."); } - Set<DistributedMember> members = new HashSet<>(); - for (String group : groups) { - List<DistributedMember> grpMembers = new ArrayList<>(system.getGroupMembers(group)); - if (!grpMembers.isEmpty()) { - if (!RANDOM_onMember && grpMembers.contains(system.getDistributedMember())) { - members.add(system.getDistributedMember()); - } else { - Collections.shuffle(grpMembers); - members.add(grpMembers.get(0)); - } - } - } - if (members.isEmpty()) { - throw new FunctionException(String.format("No members found in group(s) %s", - Arrays.toString(groups))); - } - return new MemberFunctionExecutor(system, members); + return system; } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionService.java index a781f63..f433b7c 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionService.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionService.java @@ 
-22,18 +22,25 @@ import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.client.Pool; import org.apache.geode.cache.execute.Execution; -import org.apache.geode.cache.execute.internal.FunctionServiceManager; -import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.cache.execute.FunctionService; /** - * * Provides internal methods for tests * - * * @since GemFire 6.1 - * */ -public class InternalFunctionService { +public class InternalFunctionService extends FunctionService { + + private static final InternalFunctionService INSTANCE = + new InternalFunctionService(new InternalFunctionExecutionServiceImpl()); + + private final InternalFunctionExecutionService internalFunctionExecutionService; + + private InternalFunctionService( + InternalFunctionExecutionService internalFunctionExecutionService) { + super(internalFunctionExecutionService); + this.internalFunctionExecutionService = internalFunctionExecutionService; + } /** * Returns an {@link Execution} object that can be used to execute a function on the set of @@ -55,43 +62,11 @@ public class InternalFunctionService { * {@link UnsupportedOperationException} * * @see MultiRegionFunctionContext - * - * */ public static Execution onRegions(Set<Region> regions) { - if (regions == null) { - throw new IllegalArgumentException( - String.format("The input %s for the execute function request is null", - "regions set")); - } - if (regions.contains(null)) { - throw new IllegalArgumentException( - "One or more region references added to the regions set is(are) null"); - } - if (regions.isEmpty()) { - throw new IllegalArgumentException( - "Regions set is empty for onRegions function execution"); - } - for (Region region : regions) { - if (isClientRegion(region)) { - throw new UnsupportedOperationException( - "FunctionService#onRegions() is not supported for cache clients in client server mode"); - } - } - return new 
MultiRegionFunctionExecutor(regions); - } - - /** - * @return true if the method is called on a region has a {@link Pool}. - * @since GemFire 6.0 - */ - private static boolean isClientRegion(Region region) { - LocalRegion localRegion = (LocalRegion) region; - return localRegion.hasServerProxy(); + return getInternalFunctionExecutionService().onRegions(regions); } - private static final FunctionServiceManager funcServiceManager = new FunctionServiceManager(); - /** * Returns an {@link Execution} object that can be used to execute a data independent function on * all the servers that the given cache is connected to. If the optional groups parameter is @@ -108,7 +83,7 @@ public class InternalFunctionService { * @since GemFire 7.0 */ public static Execution onServers(RegionService regionService, String... groups) { - return funcServiceManager.onServers(regionService, groups); + return getInternalFunctionExecutionService().onServers(regionService, groups); } /** @@ -127,7 +102,7 @@ public class InternalFunctionService { * @since GemFire 7.0 */ public static Execution onServer(RegionService regionService, String... groups) { - return funcServiceManager.onServer(regionService, groups); + return getInternalFunctionExecutionService().onServer(regionService, groups); } /** @@ -145,7 +120,7 @@ public class InternalFunctionService { * @since GemFire 7.0 */ public static Execution onServers(Pool pool, String... groups) { - return funcServiceManager.onServers(pool, groups); + return getInternalFunctionExecutionService().onServers(pool, groups); } /** @@ -163,6 +138,17 @@ public class InternalFunctionService { * @since GemFire 7.0 */ public static Execution onServer(Pool pool, String... groups) { - return funcServiceManager.onServer(pool, groups); + return getInternalFunctionExecutionService().onServer(pool, groups); + } + + /** + * Unregisters all functions. 
+ */ + public static void unregisterAllFunctions() { + getInternalFunctionExecutionService().unregisterAllFunctions(); + } + + private static InternalFunctionExecutionService getInternalFunctionExecutionService() { + return INSTANCE.internalFunctionExecutionService; } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html b/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html index 24b9782..18b3478 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html @@ -1691,7 +1691,7 @@ Allow query on region with heterogeneous objects <p> <em>Boolean</em> (defaults to false) <p> -See <code>org.apache.geode.cache.execute.internal.FunctionServiceManager.#RANDOM_onMember</code> +See <code>org.apache.geode.internal.cache.execute.InternalFunctionExecutionServiceImpl.#RANDOM_onMember</code> <p> When set, onMember execution will be executed on a random member. 
</dd> diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction70.java index 525782a..eb2c80d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction70.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction70.java @@ -24,11 +24,11 @@ import java.util.Set; import org.apache.geode.cache.client.internal.ExecuteFunctionOp; import org.apache.geode.cache.execute.Execution; import org.apache.geode.cache.execute.Function; -import org.apache.geode.cache.execute.internal.FunctionServiceManager; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.cache.execute.AbstractExecution; +import org.apache.geode.internal.cache.execute.InternalFunctionExecutionServiceImpl; import org.apache.geode.internal.cache.execute.MemberFunctionExecutor; import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender; import org.apache.geode.internal.cache.tier.Command; @@ -106,7 +106,7 @@ public class ExecuteFunction70 extends ExecuteFunction66 { ArrayList<DistributedMember> memberList = new ArrayList<DistributedMember>(ds.getGroupMembers(group)); if (!memberList.isEmpty()) { - if (!FunctionServiceManager.RANDOM_onMember + if (!InternalFunctionExecutionServiceImpl.RANDOM_onMember && memberList.contains(ds.getDistributedMember())) { members.add(ds.getDistributedMember()); } else {