This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 247a608  Fixed concurrent modification exception on function worker stop (#4006)
247a608 is described below

commit 247a60866dcdd1fa5e2865bebb4409bdb1ffa3e5
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Mon Apr 8 14:39:05 2019 -0700

    Fixed concurrent modification exception on function worker stop (#4006)
---
 .../pulsar/functions/worker/FunctionRuntimeManager.java    | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 4618e88..e4acc70 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -58,6 +58,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -91,9 +92,9 @@ public class FunctionRuntimeManager implements AutoCloseable{
     private RuntimeFactory runtimeFactory;
 
     private MembershipManager membershipManager;
-    
+
     private final PulsarAdmin functionAdmin;
-    
+
     @Getter
     private WorkerService workerService;
 
@@ -424,7 +425,10 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         final String workerId = this.workerConfig.getWorkerId();
         Map<String, Assignment> assignments = 
workerIdToAssignments.get(workerId);
         if (assignments != null) {
-            assignments.values().forEach(assignment -> {
+            // Take a copy of the map since the stopFunction will modify the 
same map
+            // and invalidate the iterator
+            Map<String, Assignment> copiedAssignments = new 
TreeMap<>(assignments);
+            copiedAssignments.values().forEach(assignment -> {
                 String fullyQualifiedInstanceId = 
FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
                 try {
                     stopFunction(fullyQualifiedInstanceId, false);
@@ -681,7 +685,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
             this.setAssignment(assignment);
         }
     }
-   
+
     public synchronized void deleteAssignment(String fullyQualifiedInstanceId) 
{
         FunctionRuntimeInfo functionRuntimeInfo = 
_getFunctionRuntimeInfo(fullyQualifiedInstanceId);
         if (functionRuntimeInfo != null) {
@@ -713,7 +717,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
             }
             this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
         }
-        
+
         String workerId = null;
         for(Entry<String, Map<String, Assignment>> workerAssignments : 
workerIdToAssignments.entrySet()) {
             
if(workerAssignments.getValue().remove(fullyQualifiedInstanceId)!=null) {

Reply via email to