This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 247a608 Fixed concurrent modification exception on function worker stop (#4006) 247a608 is described below commit 247a60866dcdd1fa5e2865bebb4409bdb1ffa3e5 Author: Matteo Merli <mme...@apache.org> AuthorDate: Mon Apr 8 14:39:05 2019 -0700 Fixed concurrent modification exception on function worker stop (#4006) --- .../pulsar/functions/worker/FunctionRuntimeManager.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 4618e88..e4acc70 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -58,6 +58,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -91,9 +92,9 @@ public class FunctionRuntimeManager implements AutoCloseable{ private RuntimeFactory runtimeFactory; private MembershipManager membershipManager; - + private final PulsarAdmin functionAdmin; - + @Getter private WorkerService workerService; @@ -424,7 +425,10 @@ public class FunctionRuntimeManager implements AutoCloseable{ final String workerId = this.workerConfig.getWorkerId(); Map<String, Assignment> assignments = workerIdToAssignments.get(workerId); if (assignments != null) { - assignments.values().forEach(assignment -> { + // Take a copy of the map since the stopFunction will modify the same map + // and invalidate the iterator + Map<String, Assignment> copiedAssignments = new TreeMap<>(assignments); + copiedAssignments.values().forEach(assignment -> { String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()); try { stopFunction(fullyQualifiedInstanceId, false); @@ -681,7 +685,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ this.setAssignment(assignment); } } - + public synchronized void deleteAssignment(String fullyQualifiedInstanceId) { FunctionRuntimeInfo functionRuntimeInfo = _getFunctionRuntimeInfo(fullyQualifiedInstanceId); if (functionRuntimeInfo != null) { @@ -713,7 +717,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ } this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); } - + String workerId = null; for(Entry<String, Map<String, Assignment>> workerAssignments : workerIdToAssignments.entrySet()) { if(workerAssignments.getValue().remove(fullyQualifiedInstanceId)!=null) {