This is an automated email from the ASF dual-hosted git repository. jbarrett pushed a commit to branch spike/slow-functions in repository https://gitbox.apache.org/repos/asf/geode.git
commit 6396f3850099a6beb70ab6c4409b76fdb74b78d6 Author: Jacob Barrett <jbarr...@pivotal.io> AuthorDate: Wed Aug 14 14:27:41 2019 -0700 Cleanup --- .../internal/cache/control/HeapMemoryMonitor.java | 7 +- .../internal/cache/execute/AbstractExecution.java | 30 ++------ .../cache/execute/MemberFunctionExecutor.java | 87 ++++++++++------------ 3 files changed, 51 insertions(+), 73 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java index 29f0025..109dd8d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java @@ -675,7 +675,8 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor { }); } - protected Set<DistributedMember> getHeapCriticalMembersFrom(Set<DistributedMember> members) { + protected Set<DistributedMember> getHeapCriticalMembersFrom( + Set<? extends DistributedMember> members) { Set<DistributedMember> criticalMembers = getCriticalMembers(); criticalMembers.retainAll(members); return criticalMembers; @@ -694,7 +695,7 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor { checkForLowMemory(function, targetMembers); } - public void checkForLowMemory(Function function, Set<DistributedMember> dest) { + public void checkForLowMemory(Function function, Set<? extends DistributedMember> dest) { LowMemoryException exception = createLowMemoryIfNeeded(function, dest); if (exception != null) { throw exception; @@ -708,7 +709,7 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor { } public LowMemoryException createLowMemoryIfNeeded(Function function, - Set<DistributedMember> memberSet) { + Set<? extends DistributedMember> memberSet) { if (function.optimizeForWrite() && !MemoryThresholds.isLowMemoryExceptionDisabled()) { Set<DistributedMember> criticalMembersFrom = getHeapCriticalMembersFrom(memberSet); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java index b5f55cc..bafd555 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java @@ -16,7 +16,6 @@ package org.apache.geode.internal.cache.execute; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -37,6 +36,7 @@ import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.cache.execute.ResultSender; import org.apache.geode.cache.query.QueryInvalidException; +import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.DistributionManager; @@ -68,8 +68,6 @@ public abstract class AbstractExecution implements InternalExecution { protected Set filter = new HashSet(); - protected boolean hasRoutingObjects; - protected volatile boolean isReExecute = false; volatile boolean isClientServerMode = false; @@ -189,10 +187,6 @@ public abstract class AbstractExecution implements InternalExecution { this.isReExecute = isReExecute; } - public boolean isMemberMappedArgument() { - return isMemberMappedArgument; - } - public Object getArgumentsForMember(String memberId) { if (!isMemberMappedArgument) { return args; @@ -249,15 +243,6 @@ public abstract class AbstractExecution implements InternalExecution { return isFnSerializationReqd; } - public Collection<InternalDistributedMember> getExecutionNodes() { - return executionNodes; - } - - public void setRequireExecutionNodes(ExecutionNodesListener listener) { - executionNodes = Collections.emptySet(); - executionNodesListener = listener; - } - public void setExecutionNodes(Set<InternalDistributedMember> nodes) { if (executionNodes != null) { executionNodes = nodes; @@ -348,7 +333,7 @@ public abstract class AbstractExecution implements InternalExecution { } else { functionException = new FunctionException(fite); } - handleException(functionException, fn, cx, sender, dm); + handleException(functionException, fn, sender, dm); } catch (BucketMovedException bme) { FunctionException functionException; if (fn.isHA()) { @@ -357,13 +342,13 @@ public abstract class AbstractExecution implements InternalExecution { } else { functionException = new FunctionException(bme); } - handleException(functionException, fn, cx, sender, dm); + handleException(functionException, fn, sender, dm); } catch (VirtualMachineError e) { SystemFailure.initiateFailure(e); throw e; } catch (Throwable t) { SystemFailure.checkFailure(); - handleException(t, fn, cx, sender, dm); + handleException(t, fn, sender, dm); } } @@ -446,7 +431,8 @@ public abstract class AbstractExecution implements InternalExecution { * @throws TransactionException if more than one nodes are targeted within a transaction * @throws LowMemoryException if the set contains a heap critical member */ - public abstract void validateExecution(Function function, Set targetMembers); + public abstract void validateExecution(Function function, + Set<? extends DistributedMember> targetMembers); public LocalResultCollector<?, ?> getLocalResultCollector(Function function, final ResultCollector<?, ?> rc) { @@ -482,7 +468,7 @@ public abstract class AbstractExecution implements InternalExecution { } private void handleException(Throwable functionException, final Function fn, - final FunctionContext cx, final ResultSender sender, DistributionManager dm) { + final ResultSender sender, DistributionManager dm) { FunctionStats stats = FunctionStats.getFunctionStats(fn.getId(), dm.getSystem()); if (logger.isDebugEnabled()) { @@ -515,7 +501,7 @@ public abstract class AbstractExecution implements InternalExecution { * * @return timeout in milliseconds. */ - protected int getTimeoutMs() { + int getTimeoutMs() { return timeoutMs; } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java index a18c560..573c40c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java @@ -17,7 +17,6 @@ package org.apache.geode.internal.cache.execute; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.Set; import org.apache.geode.cache.TransactionDataNotColocatedException; @@ -38,67 +37,66 @@ import org.apache.geode.internal.cache.InternalCache; public class MemberFunctionExecutor extends AbstractExecution { - protected InternalDistributedSystem ds; + protected InternalDistributedSystem distributedSystem; - protected Set members; + protected Set<InternalDistributedMember> members; private ServerToClientFunctionResultSender sender; - public MemberFunctionExecutor(DistributedSystem s) { - this.ds = (InternalDistributedSystem) s; - this.members = this.ds.getDistributionManager().getNormalDistributionManagerIds(); + MemberFunctionExecutor(DistributedSystem distributedSystem) { + this(distributedSystem, ((InternalDistributedSystem) distributedSystem).getDistributionManager() + .getNormalDistributionManagerIds()); } - public MemberFunctionExecutor(DistributedSystem s, DistributedMember m) { - this.ds = (InternalDistributedSystem) s; - this.members = Collections.singleton(m); + MemberFunctionExecutor(DistributedSystem distributedSystem, DistributedMember distributedMember) { + this(distributedSystem, Collections.singleton((InternalDistributedMember) distributedMember)); } - public MemberFunctionExecutor(DistributedSystem s, Set m) { - this.ds = (InternalDistributedSystem) s; - this.members = m; + MemberFunctionExecutor(DistributedSystem distributedSystem, + Set<? extends DistributedMember> members) { + this.distributedSystem = (InternalDistributedSystem) distributedSystem; + this.members = (Set<InternalDistributedMember>) members; } - public MemberFunctionExecutor(DistributedSystem s, Set m, + public MemberFunctionExecutor(DistributedSystem distributedSystem, + Set<? extends DistributedMember> members, ServerToClientFunctionResultSender sender) { - this(s, m); + this(distributedSystem, members); this.sender = sender; } private MemberFunctionExecutor(MemberFunctionExecutor memFunctionExecutor) { super(memFunctionExecutor); - this.ds = memFunctionExecutor.ds; - this.members = new HashSet(); - this.members.addAll(memFunctionExecutor.members); - this.sender = memFunctionExecutor.sender; + distributedSystem = memFunctionExecutor.distributedSystem; + members = new HashSet<>(memFunctionExecutor.members); + sender = memFunctionExecutor.sender; } private MemberFunctionExecutor(MemberFunctionExecutor memberFunctionExecutor, MemberMappedArgument argument) { this(memberFunctionExecutor); - this.memberMappedArg = argument; - this.isMemberMappedArgument = true; + memberMappedArg = argument; + isMemberMappedArgument = true; } private MemberFunctionExecutor(MemberFunctionExecutor memberFunctionExecutor, ResultCollector rs) { this(memberFunctionExecutor); - this.rc = rs; + rc = rs; } private MemberFunctionExecutor(MemberFunctionExecutor memberFunctionExecutor, Object arguments) { this(memberFunctionExecutor); - this.args = arguments; + args = arguments; } - @SuppressWarnings("unchecked") private ResultCollector executeFunction(final Function function, ResultCollector resultCollector) { - final DistributionManager dm = this.ds.getDistributionManager(); - final Set dest = new HashSet(this.members); + final DistributionManager dm = distributedSystem.getDistributionManager(); + final Set<InternalDistributedMember> dest = new HashSet<>(members); if (dest.isEmpty()) { throw new FunctionException( String.format("No member found for executing function : %s.", @@ -108,7 +106,7 @@ public class MemberFunctionExecutor extends AbstractExecution { setExecutionNodes(dest); final InternalDistributedMember localVM = - this.ds.getDistributionManager().getDistributionManagerId(); + distributedSystem.getDistributionManager().getDistributionManagerId(); final LocalResultCollector<?, ?> localRC = getLocalResultCollector(function, resultCollector); boolean remoteOnly = false; boolean localOnly = false; @@ -129,7 +127,7 @@ public class MemberFunctionExecutor extends AbstractExecution { boolean isTx = false; InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null) { - isTx = cache.getTxManager().getTXState() == null ? false : true; + isTx = cache.getTxManager().getTXState() != null; } final FunctionContext context = new FunctionContextImpl(cache, function.getId(), getArgumentsForMember(localVM.getId()), resultSender); @@ -137,25 +135,23 @@ public class MemberFunctionExecutor extends AbstractExecution { } if (!dest.isEmpty()) { - HashMap<InternalDistributedMember, Object> memberArgs = - new HashMap<InternalDistributedMember, Object>(); - Iterator<DistributedMember> iter = dest.iterator(); - while (iter.hasNext()) { - InternalDistributedMember recip = (InternalDistributedMember) iter.next(); - memberArgs.put(recip, getArgumentsForMember(recip.getId())); + HashMap<InternalDistributedMember, Object> memberArgs = new HashMap<>(); + for (InternalDistributedMember distributedMember : dest) { + memberArgs.put(distributedMember, getArgumentsForMember(distributedMember.getId())); } Assert.assertTrue(memberArgs.size() == dest.size()); - MemberFunctionResultWaiter resultReceiver = new MemberFunctionResultWaiter(this.ds, localRC, - function, memberArgs, dest, resultSender); + MemberFunctionResultWaiter resultReceiver = + new MemberFunctionResultWaiter(distributedSystem, localRC, + function, memberArgs, dest, resultSender); - ResultCollector reply = resultReceiver.getFunctionResultFrom(dest, function, this); - return reply; + return resultReceiver.getFunctionResultFrom(dest, function, this); } return localRC; } @Override - public void validateExecution(final Function function, final Set dest) { + public void validateExecution(final Function function, + final Set<? extends DistributedMember> dest) { final InternalCache cache = GemFireCacheImpl.getInstance(); if (cache == null) { return; @@ -170,7 +166,7 @@ public class MemberFunctionExecutor extends AbstractExecution { throw new UnsupportedOperationException( "Client function execution on members is not supported with transaction"); } - DistributedMember funcTarget = (DistributedMember) dest.iterator().next(); + DistributedMember funcTarget = dest.iterator().next(); DistributedMember target = cache.getTxManager().getTXState().getTarget(); if (target == null) { cache.getTxManager().getTXState().setTarget(funcTarget); @@ -249,25 +245,20 @@ public class MemberFunctionExecutor extends AbstractExecution { } @Override - public boolean isMemberMappedArgument() { - return this.isMemberMappedArgument; - } - - @Override public Object getArgumentsForMember(String memberId) { if (!isMemberMappedArgument) { - return this.args; + return args; } else { - return this.memberMappedArg.getArgumentsForMember(memberId); + return memberMappedArg.getArgumentsForMember(memberId); } } @Override public MemberMappedArgument getMemberMappedArgument() { - return this.memberMappedArg; + return memberMappedArg; } public ServerToClientFunctionResultSender getServerResultSender() { - return this.sender; + return sender; } }