This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new c300e3e  fix bug in FunctionRuntimeManager involving not cleaning up 
old invalid assignments (#2223)
c300e3e is described below

commit c300e3e5f2f6118619e7d06d5db9629bbf29ad3f
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Wed Jul 25 11:37:48 2018 -0700

    fix bug in FunctionRuntimeManager involving not cleaning up old invalid 
assignments (#2223)
    
    ### Motivation
    
    Old, invalid assignments of functions are not cleaned up and are left over 
after a function is rescheduled.
    
    Getting the status of a function relies on assignments to route the 
get-status request to the correct worker running the function. If the 
assignments are not correct, get-status requests will be routed incorrectly.
    
    Also fixed a test that was asserting an incorrect value. The test would 
have caught this problem if it had been asserting the correct value.
---
 .../pulsar/functions/worker/FunctionRuntimeManager.java      | 12 ++++++++++--
 .../pulsar/functions/worker/FunctionRuntimeManagerTest.java  |  4 +---
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 05a79d8..5e1995e 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -82,7 +82,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
     private MembershipManager membershipManager;
     private final ConnectorsManager connectorsManager;
 
-
     public FunctionRuntimeManager(WorkerConfig workerConfig,
                                   PulsarClient pulsarClient,
                                   Namespace dlogNamespace,
@@ -354,6 +353,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
      * @param assignmentsUpdate the assignment update
      */
     public synchronized void processAssignmentUpdate(MessageId messageId, 
AssignmentsUpdate assignmentsUpdate) {
+
         if (assignmentsUpdate.getVersion() > this.currentAssignmentVersion) {
 
             Map<String, Assignment> assignmentMap = new HashMap<>();
@@ -430,8 +430,16 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                         
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
                         this.insertStartAction(newFunctionRuntimeInfo);
                         this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, 
newFunctionRuntimeInfo);
-                        this.setAssignment(assignment);
                     }
+
+                    // find existing assignment
+                    Assignment existing_assignment = 
this.findAssignment(assignment);
+                    if (existing_assignment != null) {
+                        // delete old assignment that could have old data
+                        this.deleteAssignment(existing_assignment);
+                    }
+                    // set to newest assignment
+                    this.setAssignment(assignment);
                 }
             }
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 8ab7473..4f618c4 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -378,12 +378,10 @@ public class FunctionRuntimeManagerTest {
                                         .build()))));
 
         
Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 2);
-
-        
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 2);
+        
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-1").get("test-tenant/test-namespace/func-1:0"), 
assignment1);
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-1").get("test-tenant/test-namespace/func-2:0"), 
assignment3);
     }
-
 }

Reply via email to