This is an automated email from the ASF dual-hosted git repository. jbarrett pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 18530da GEODE-6798: Refactoring of client function execution logic (#3710) 18530da is described below commit 18530da5ae33fc2c922bb4a2aeb5e00a57eaa31e Author: albertogpz <alberto.go...@est.tech> AuthorDate: Sat Jun 22 03:56:19 2019 +0200 GEODE-6798: Refactoring of client function execution logic (#3710) * Some unit tests for the ExecuteRegionFunctionOpImplTest constructors * Do not allocate failedNodes Collection unless necessary --- .../client/internal/ExecuteRegionFunctionOp.java | 335 +++++++-------------- .../internal/ExecuteRegionFunctionOpImplTest.java | 128 ++++++++ 2 files changed, 233 insertions(+), 230 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java index 6c95b02..641fd26 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java @@ -18,6 +18,7 @@ package org.apache.geode.cache.client.internal; import static org.apache.geode.internal.cache.execute.AbstractExecution.DEFAULT_CLIENT_FUNCTION_TIMEOUT; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -63,49 +64,32 @@ public class ExecuteRegionFunctionOp { // no instances allowed } - /** - * Does a execute Function on a server using connections from the given pool to communicate with - * the server. - * - * @param pool the pool to use to communicate with the server. - * @param region the name of the region to do the put on - * @param function to be executed - * @param serverRegionExecutor which will return argument and filter - * @param resultCollector is used to collect the results from the Server - * @param timeoutMs timeout in milliseconds - */ - public static void execute(ExecutablePool pool, String region, Function function, + private static void execute(ExecutablePool pool, String region, String function, ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector resultCollector, - byte hasResult, int mRetryAttempts, final int timeoutMs) { - - ExecuteRegionFunctionOpImpl op = - new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor, - resultCollector, hasResult, new HashSet<>(), timeoutMs); - boolean reexecute = false; - boolean reexecuteForServ = false; - Set<String> failedNodes = new HashSet<>(); - AbstractOp reexecOp; - - int maxRetryAttempts = mRetryAttempts; - if (!function.isHA()) { + byte hasResult, int maxRetryAttempts, boolean isHA, boolean optimizeForWrite, + ExecuteRegionFunctionOpImpl op, boolean isReexecute, Set<String> failedNodes) { + + if (!isHA) { maxRetryAttempts = 0; } do { try { - if (reexecuteForServ) { - reexecOp = new ExecuteRegionFunctionOpImpl(op, + if (isReexecute) { + failedNodes = ensureMutability(failedNodes); + op = new ExecuteRegionFunctionOpImpl(op, (byte) 1/* isReExecute */, failedNodes); - pool.execute(reexecOp, 0); - } else { - pool.execute(op, 0); } - reexecute = false; - reexecuteForServ = false; + pool.execute(op, 0); + return; } catch (InternalFunctionInvocationTargetException e) { - reexecute = true; resultCollector.clearResults(); + if (!isHA) { + return; + } + isReexecute = true; Set<String> failedNodesIds = e.getFailedNodeSet(); + failedNodes = ensureMutability(failedNodes); failedNodes.clear(); if (failedNodesIds != null) { failedNodes.addAll(failedNodesIds); @@ -125,126 +109,68 @@ public class ExecuteRegionFunctionOp { throw se; } - reexecuteForServ = true; + isReexecute = true; resultCollector.clearResults(); + failedNodes = ensureMutability(failedNodes); failedNodes.clear(); } - } while (reexecuteForServ); + } while (true); + } - if (reexecute && function.isHA()) { - ExecuteRegionFunctionOp.reexecute(pool, region, function, serverRegionExecutor, - resultCollector, hasResult, failedNodes, maxRetryAttempts, timeoutMs); + private static Set<String> ensureMutability(final Set<String> failedNodes) { + if (failedNodes == Collections.EMPTY_SET) { + return new HashSet<>(); } + return failedNodes; } - public static void execute(ExecutablePool pool, String region, String function, + /** + * Does a execute Function on a server using connections from the given pool to communicate with + * the server. + * + * @param pool the pool to use to communicate with the server. + * @param region the name of the region to do the put on + * @param function to be executed + * @param serverRegionExecutor which will return argument and filter + * @param resultCollector is used to collect the results from the Server + * @param hasResult is used to collect the results from the Server + * @param maxRetryAttempts Maximum number of retry attempts + * @param timeoutMs timeout in milliseconds + */ + static void execute(ExecutablePool pool, String region, Function function, ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector resultCollector, - byte hasResult, int mRetryAttempts, boolean isHA, boolean optimizeForWrite, - final int timeoutMs) { + byte hasResult, int maxRetryAttempts, final int timeoutMs) { - ExecuteRegionFunctionOpImpl op = - new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor, - resultCollector, hasResult, new HashSet<>(), isHA, optimizeForWrite, true, timeoutMs); - boolean reexecute = false; - boolean reexecuteForServ = false; - Set<String> failedNodes = new HashSet<>(); - AbstractOp reexecOp; + ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, function, + serverRegionExecutor, resultCollector, timeoutMs); - int maxRetryAttempts = mRetryAttempts; - if (!isHA) { - maxRetryAttempts = 0; - } - - do { - try { - if (reexecuteForServ) { - reexecOp = new ExecuteRegionFunctionOpImpl(op, - (byte) 1/* isReExecute */, failedNodes); - pool.execute(reexecOp, 0); - } else { - pool.execute(op, 0); - } - reexecute = false; - reexecuteForServ = false; - } catch (InternalFunctionInvocationTargetException e) { - reexecute = true; - resultCollector.clearResults(); - Set<String> failedNodesIds = e.getFailedNodeSet(); - failedNodes.clear(); - if (failedNodesIds != null) { - failedNodes.addAll(failedNodesIds); - } - } catch (ServerOperationException | NoAvailableServersException failedException) { - throw failedException; - } catch (ServerConnectivityException se) { - - if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) { - // If the retryAttempt is set to default(-1). Try it on all servers once. - // Calculating number of servers when function is re-executed as it involves - // messaging locator. - maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1; - } + execute(pool, region, function.getId(), serverRegionExecutor, resultCollector, hasResult, + maxRetryAttempts, function.isHA(), function.optimizeForWrite(), op, false, + Collections.emptySet()); + } - if ((maxRetryAttempts--) < 1) { - throw se; - } + static void execute(ExecutablePool pool, String region, String function, + ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector resultCollector, + byte hasResult, int maxRetryAttempts, boolean isHA, boolean optimizeForWrite, + final int timeoutMs) { - reexecuteForServ = true; - resultCollector.clearResults(); - failedNodes.clear(); - } - } while (reexecuteForServ); + ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, function, + serverRegionExecutor, resultCollector, hasResult, isHA, optimizeForWrite, + true, timeoutMs); - if (reexecute && isHA) { - ExecuteRegionFunctionOp.reexecute(pool, region, function, serverRegionExecutor, - resultCollector, hasResult, failedNodes, maxRetryAttempts, isHA, optimizeForWrite, - timeoutMs); - } + execute(pool, region, function, serverRegionExecutor, resultCollector, hasResult, + maxRetryAttempts, isHA, optimizeForWrite, op, false, Collections.emptySet()); } static void reexecute(ExecutablePool pool, String region, Function function, ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector resultCollector, byte hasResult, Set<String> failedNodes, int retryAttempts, final int timeoutMs) { - ExecuteRegionFunctionOpImpl op = - new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor, - resultCollector, hasResult, new HashSet<>(), timeoutMs); - boolean reexecute = true; - int maxRetryAttempts = retryAttempts; - - do { - AbstractOp reExecuteOp = - new ExecuteRegionFunctionOpImpl(op, (byte) 1/* isReExecute */, failedNodes); - - try { - pool.execute(reExecuteOp, 0); - reexecute = false; - } catch (InternalFunctionInvocationTargetException e) { - resultCollector.clearResults(); - Set<String> failedNodesIds = e.getFailedNodeSet(); - failedNodes.clear(); - if (failedNodesIds != null) { - failedNodes.addAll(failedNodesIds); - } - } catch (ServerOperationException | NoAvailableServersException failedException) { - throw failedException; - } catch (ServerConnectivityException se) { - - if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) { - // If the retryAttempt is set to default(-1). Try it on all servers once. - // Calculating number of servers when function is re-executed as it involves - // messaging locator. - maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1; - } - - if ((maxRetryAttempts--) < 1) { - throw se; - } + ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, function, + serverRegionExecutor, resultCollector, timeoutMs); - resultCollector.clearResults(); - failedNodes.clear(); - } - } while (reexecute); + execute(pool, region, function.getId(), serverRegionExecutor, resultCollector, hasResult, + retryAttempts, function.isHA(), function.optimizeForWrite(), op, true, failedNodes); } static void reexecute(ExecutablePool pool, String region, String function, @@ -252,45 +178,12 @@ public class ExecuteRegionFunctionOp { byte hasResult, Set<String> failedNodes, int retryAttempts, boolean isHA, boolean optimizeForWrite, final int timeoutMs) { - ExecuteRegionFunctionOpImpl op = - new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor, - resultCollector, hasResult, new HashSet<>(), isHA, optimizeForWrite, true, timeoutMs); - boolean reexecute = true; - int maxRetryAttempts = retryAttempts; - - do { - ExecuteRegionFunctionOpImpl reExecuteOp = - new ExecuteRegionFunctionOpImpl(op, (byte) 1/* isReExecute */, failedNodes); - - try { - pool.execute(reExecuteOp, 0); - reexecute = false; - } catch (InternalFunctionInvocationTargetException e) { - resultCollector.clearResults(); - Set<String> failedNodesIds = e.getFailedNodeSet(); - failedNodes.clear(); - if (failedNodesIds != null) { - failedNodes.addAll(failedNodesIds); - } - } catch (ServerOperationException | NoAvailableServersException failedException) { - throw failedException; - } catch (ServerConnectivityException se) { + ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, function, + serverRegionExecutor, resultCollector, hasResult, isHA, + optimizeForWrite, true, timeoutMs); - if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) { - // If the retryAttempt is set to default(-1). Try it on all servers once. - // Calculating number of servers when function is re-executed as it involves - // messaging locator. - maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1; - } - - if ((maxRetryAttempts--) < 1) { - throw se; - } - - resultCollector.clearResults(); - failedNodes.clear(); - } - } while (reexecute); + execute(pool, region, function, serverRegionExecutor, resultCollector, hasResult, + retryAttempts, isHA, optimizeForWrite, op, true, failedNodes); } static class ExecuteRegionFunctionOpImpl extends AbstractOpWithTimeout { @@ -319,29 +212,28 @@ public class ExecuteRegionFunctionOp { private FunctionException functionException; + private static final int PART_COUNT = 8; - ExecuteRegionFunctionOpImpl(String region, Function function, - ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc, byte hasResult, - Set<String> removedNodes, final int timeoutMs) { - super(MessageType.EXECUTE_REGION_FUNCTION, - 8 + serverRegionExecutor.getFilter().size() + removedNodes.size(), timeoutMs); + private static int getMessagePartCount(int filterSize, int removedNodesSize) { + return PART_COUNT + filterSize + removedNodesSize; + } + + private void fillMessage(String region, Function function, String functionId, + ServerRegionFunctionExecutor serverRegionExecutor, + Set<String> removedNodes, byte functionState, byte flags) { Set routingObjects = serverRegionExecutor.getFilter(); Object args = serverRegionExecutor.getArguments(); - byte functionState = AbstractExecution.getFunctionState(function.isHA(), function.hasResult(), - function.optimizeForWrite()); MemberMappedArgument memberMappedArg = serverRegionExecutor.getMemberMappedArgument(); - addBytes(functionState); getMessage().addStringPart(region, true); - if (serverRegionExecutor.isFnSerializationReqd()) { + if (function != null && serverRegionExecutor.isFnSerializationReqd()) { getMessage().addStringOrObjPart(function); } else { - getMessage().addStringOrObjPart(function.getId()); + getMessage().addStringOrObjPart(functionId); } + getMessage().addObjPart(args); getMessage().addObjPart(memberMappedArg); - executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag(); - byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute); getMessage().addBytesPart(new byte[] {flags}); getMessage().addIntPart(routingObjects.size()); @@ -352,14 +244,27 @@ public class ExecuteRegionFunctionOp { for (Object nodes : removedNodes) { getMessage().addStringOrObjPart(nodes); } + } + ExecuteRegionFunctionOpImpl(String region, Function function, + ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc, + final int timeoutMs) { + super(MessageType.EXECUTE_REGION_FUNCTION, + getMessagePartCount(serverRegionExecutor.getFilter().size(), 0), timeoutMs); + executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag(); + byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute); + byte functionState = AbstractExecution.getFunctionState(function.isHA(), function.hasResult(), + function.optimizeForWrite()); + failedNodes = Collections.emptySet(); + fillMessage(region, + function, function.getId(), + serverRegionExecutor, failedNodes, functionState, flags); resultCollector = rc; regionName = region; this.function = function; functionId = function.getId(); executor = serverRegionExecutor; - this.hasResult = functionState; - failedNodes = removedNodes; + hasResult = functionState; isHA = function.isHA(); } @@ -378,58 +283,45 @@ public class ExecuteRegionFunctionOp { isHA = true; } - ExecuteRegionFunctionOpImpl(String region, String function, + ExecuteRegionFunctionOpImpl(String region, String functionId, ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc, byte hasResult, - Set<String> removedNodes, boolean isHA, boolean optimizeForWrite, + boolean isHA, boolean optimizeForWrite, boolean calculateFnState, final int timeoutMs) { super(MessageType.EXECUTE_REGION_FUNCTION, - 8 + serverRegionExecutor.getFilter().size() + removedNodes.size(), timeoutMs); - Set routingObjects = serverRegionExecutor.getFilter(); + getMessagePartCount(serverRegionExecutor.getFilter().size(), 0), timeoutMs); + byte functionState = hasResult; if (calculateFnState) { functionState = AbstractExecution.getFunctionState(isHA, hasResult == (byte) 1, optimizeForWrite); } - Object args = serverRegionExecutor.getArguments(); - MemberMappedArgument memberMappedArg = serverRegionExecutor.getMemberMappedArgument(); - addBytes(functionState); - getMessage().addStringPart(region, true); - getMessage().addStringOrObjPart(function); - getMessage().addObjPart(args); - getMessage().addObjPart(memberMappedArg); executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag(); byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute); - getMessage().addBytesPart(new byte[] {flags}); - getMessage().addIntPart(routingObjects.size()); - for (Object key : routingObjects) { - getMessage().addStringOrObjPart(key); - } - getMessage().addIntPart(removedNodes.size()); - for (Object nodes : removedNodes) { - getMessage().addStringOrObjPart(nodes); - } + failedNodes = Collections.emptySet(); + fillMessage(region, null, functionId, serverRegionExecutor, failedNodes, functionState, + flags); resultCollector = rc; regionName = region; - functionId = function; + this.functionId = functionId; executor = serverRegionExecutor; this.hasResult = functionState; - failedNodes = removedNodes; this.isHA = isHA; } ExecuteRegionFunctionOpImpl(ExecuteRegionFunctionSingleHopOpImpl newop) { this(newop.getRegionName(), newop.getFunctionId(), newop.getExecutor(), - newop.getResultCollector(), newop.getHasResult(), new HashSet<>(), newop.isHA(), + newop.getResultCollector(), newop.getHasResult(), newop.isHA(), newop.optimizeForWrite(), false, newop.getTimeoutMs()); } ExecuteRegionFunctionOpImpl(ExecuteRegionFunctionOpImpl op, byte isReExecute, Set<String> removedNodes) { super(MessageType.EXECUTE_REGION_FUNCTION, - 8 + op.executor.getFilter().size() + removedNodes.size(), op.getTimeoutMs()); + getMessagePartCount(op.executor.getFilter().size(), removedNodes.size()), + op.getTimeoutMs()); this.isReExecute = isReExecute; resultCollector = op.resultCollector; function = op.function; @@ -445,30 +337,10 @@ public class ExecuteRegionFunctionOp { resultCollector.clearResults(); } - Set routingObjects = executor.getFilter(); - Object args = executor.getArguments(); - MemberMappedArgument memberMappedArg = executor.getMemberMappedArgument(); - getMessage().clear(); - addBytes(hasResult); - getMessage().addStringPart(regionName, true); - if (executor.isFnSerializationReqd()) { - getMessage().addStringOrObjPart(function); - } else { - getMessage().addStringOrObjPart(functionId); - } - getMessage().addObjPart(args); - getMessage().addObjPart(memberMappedArg); byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute); - getMessage().addBytesPart(new byte[] {flags}); - getMessage().addIntPart(routingObjects.size()); - for (Object key : routingObjects) { - getMessage().addStringOrObjPart(key); - } - getMessage().addIntPart(removedNodes.size()); - for (Object nodes : removedNodes) { - getMessage().addStringOrObjPart(nodes); - } + fillMessage(regionName, function, functionId, + executor, removedNodes, hasResult, flags); } private void addBytes(byte functionStateOrHasResult) { @@ -524,6 +396,7 @@ public class ExecuteRegionFunctionOp { .getCause() instanceof InternalFunctionInvocationTargetException) { InternalFunctionInvocationTargetException ifite = (InternalFunctionInvocationTargetException) ex.getCause(); + failedNodes = ensureMutability(failedNodes); failedNodes.addAll(ifite.getFailedNodeSet()); addFunctionException((FunctionException) result); } else { @@ -555,6 +428,7 @@ public class ExecuteRegionFunctionOp { if (resultResponse instanceof ArrayList) { DistributedMember memberID = (DistributedMember) ((ArrayList) resultResponse).get(1); + failedNodes = ensureMutability(failedNodes); failedNodes.add(memberID.getId()); } functionException = new FunctionException(fite); @@ -611,6 +485,7 @@ public class ExecuteRegionFunctionOp { .getCause() instanceof InternalFunctionInvocationTargetException) { InternalFunctionInvocationTargetException ifite = (InternalFunctionInvocationTargetException) ex.getCause(); + failedNodes = ensureMutability(failedNodes); failedNodes.addAll(ifite.getFailedNodeSet()); } throw ex; diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpImplTest.java new file mode 100644 index 0000000..72b5748 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpImplTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.client.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import junitparams.JUnitParamsRunner; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import org.apache.geode.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.ResultCollector; +import org.apache.geode.internal.cache.execute.ServerRegionFunctionExecutor; +import org.apache.geode.test.junit.categories.ClientServerTest; + + +/** + * Test ExecutionRegionFunctionOpImpl class + */ +@Category({ClientServerTest.class}) +@RunWith(JUnitParamsRunner.class) +public class ExecuteRegionFunctionOpImplTest { + + @Test + public void testExecuteRegionFunctionOpImplWithFunction() { + ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl op = createOpWithFunctionTwoFilters(); + + int numberOfParts = 10; + assertEquals(numberOfParts, op.getMessage().getNumberOfParts()); + for (int i = 0; i < numberOfParts; i++) { + assertNotNull(op.getMessage().getPart(i)); + } + assertNull(op.getMessage().getPart(numberOfParts)); + } + + @Test + public void testExecuteRegionFunctionOpImplWithFunctionIdCalculateFnState() { + ExecuteRegionFunctionOpImpl op = createOpWithFunctionIdOneFilter(); + + int numberOfParts = 9; + assertEquals(numberOfParts, op.getMessage().getNumberOfParts()); + for (int i = 0; i < numberOfParts; i++) { + assertNotNull(op.getMessage().getPart(i)); + } + assertNull(op.getMessage().getPart(numberOfParts)); + } + + @Test + public void testExecuteRegionFunctionOpImplWithOpAndIsReexecute() { + ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl op = createOpWithFunctionTwoFilters(); + + HashSet<String> removedNodes = new HashSet(Arrays.asList("node1", "node2", "node3")); + + ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl newOp = + new ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl(op, (byte) 1, removedNodes); + + int numberOfParts = 13; + assertEquals(numberOfParts, newOp.getMessage().getNumberOfParts()); + for (int i = 0; i < numberOfParts; i++) { + assertNotNull(newOp.getMessage().getPart(i)); + } + assertNull(newOp.getMessage().getPart(numberOfParts)); + } + + private ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl createOpWithFunctionTwoFilters() { + String region = "testRegion"; + String functionId = "testFunctionId"; + Function function = mock(Function.class); + ServerRegionFunctionExecutor serverRegionExecutor = mock(ServerRegionFunctionExecutor.class); + Set filter = new HashSet(Arrays.asList("one", "two")); + when(serverRegionExecutor.getFilter()).thenReturn(filter); + byte functionState = (byte) 1; + byte flags = (byte) 2; + ResultCollector resultCollector = mock(ResultCollector.class); + int timeoutMs = 100; + + ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl op = + new ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl(region, function, + serverRegionExecutor, resultCollector, + timeoutMs); + return op; + } + + private ExecuteRegionFunctionOpImpl createOpWithFunctionIdOneFilter() { + String region = "testRegion"; + String functionId = "testFunctionId"; + Function function = null; + ServerRegionFunctionExecutor serverRegionExecutor = mock(ServerRegionFunctionExecutor.class); + Set filter = new HashSet(Arrays.asList("one")); + when(serverRegionExecutor.getFilter()).thenReturn(filter); + byte functionState = (byte) 1; + byte flags = (byte) 1; + byte hasResult = (byte) 1; + boolean isHA = false; + ResultCollector resultCollector = mock(ResultCollector.class); + int timeoutMs = 100; + boolean optimizeForWrite = true; + boolean isReexecute = false; + + ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, functionId, + serverRegionExecutor, resultCollector, hasResult, isHA, optimizeForWrite, + true, timeoutMs); + return op; + } + +}