srkukarni closed pull request #2784: add initialize routine to FunctionRuntimeManager
URL: https://github.com/apache/pulsar/pull/2784
 
 
   

This is a pull request merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork-based) pull
request once it is merged, the diff is reproduced below for the sake
of provenance:

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 3ad6c7c6ee..3f8bec3310 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -36,13 +36,11 @@
         private final FunctionRuntimeManager functionRuntimeManager;
         private final Reader<byte[]> reader;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager 
functionRuntimeManager)
+    public FunctionAssignmentTailer(FunctionRuntimeManager 
functionRuntimeManager, Reader<byte[]> reader)
             throws PulsarClientException {
         this.functionRuntimeManager = functionRuntimeManager;
 
-        this.reader = 
functionRuntimeManager.getWorkerService().getClient().newReader()
-                
.topic(functionRuntimeManager.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
-                .startMessageId(MessageId.earliest).create();
+        this.reader = reader;
     }
 
     public void start() {
@@ -66,8 +64,7 @@ public void close() {
         log.info("Stopped function state consumer");
     }
 
-    @Override
-    public void accept(Message<byte[]> msg) {
+    public void processAssignment(Message<byte[]> msg) {
         if(msg.getData()==null || (msg.getData().length==0)) {
             log.info("Received assignment delete: {}", msg.getKey());
             this.functionRuntimeManager.deleteAssignment(msg.getKey());
@@ -82,8 +79,13 @@ public void accept(Message<byte[]> msg) {
                 throw new RuntimeException(e);
             }
             log.info("Received assignment update: {}", assignment);
-            this.functionRuntimeManager.processAssignment(assignment);    
+            this.functionRuntimeManager.processAssignment(assignment);
         }
+    }
+
+    @Override
+    public void accept(Message<byte[]> msg) {
+        processAssignment(msg);
         // receive next request
         receiveOne();
     }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 44ff807125..4faed11a38 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -82,13 +82,11 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
 
     /**
      * Initializes the FunctionMetaDataManager.  Does the following:
-     * 1. Restores from snapshot if one exists
-     * 2. Sends out initialize marker to FMT and consume messages until the 
initialize marker is consumed
+     * 1. Consume all existing function meta data upon start to establish 
existing state
      */
     public void initialize() {
         log.info("/** Initializing Function Metadata Manager **/");
         try {
-
             Reader<byte[]> reader = pulsarClient.newReader()
                     .topic(this.workerConfig.getFunctionMetadataTopic())
                     .startMessageId(MessageId.earliest)
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 47d317fc94..8c0b50b849 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -36,10 +36,13 @@
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 
+import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function.Assignment;
@@ -65,6 +68,8 @@
 
     // All the runtime info related to functions executed by this worker
     // Fully Qualified InstanceId - > FunctionRuntimeInfo
+    // NOTE: please use setFunctionRuntimeInfo and deleteFunctionRuntimeInfo 
methods to modify this data structure
+    // Since during initialization phase nothing should be modified
     @VisibleForTesting
     Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new 
ConcurrentHashMap<>();
 
@@ -75,7 +80,7 @@
     @VisibleForTesting
     LinkedBlockingQueue<FunctionAction> actionQueue;
 
-    private final FunctionAssignmentTailer functionAssignmentTailer;
+    private FunctionAssignmentTailer functionAssignmentTailer;
 
     private FunctionActioner functionActioner;
 
@@ -89,14 +94,16 @@
     @Getter
     private WorkerService workerService;
 
+    @Setter
+    @Getter
+    boolean isInitializePhase = false;
+
     public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService 
workerService, Namespace dlogNamespace,
             MembershipManager membershipManager, ConnectorsManager 
connectorsManager) throws Exception {
         this.workerConfig = workerConfig;
         this.workerService = workerService;
         this.functionAdmin = workerService.getFunctionAdmin();
 
-        this.functionAssignmentTailer = new FunctionAssignmentTailer(this);
-
         AuthenticationConfig authConfig = AuthenticationConfig.builder()
                 
.clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
                 
.clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters())
@@ -143,6 +150,41 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, 
WorkerService workerSer
         this.membershipManager = membershipManager;
     }
 
+    /**
+     * Initializes the FunctionRuntimeManager.  Does the following:
+     * 1. Consume all existing assignments to establish existing/latest set of 
assignments
+     * 2. After current assignments are read, assignments belonging to this 
worker will be processed
+     */
+    public void initialize() {
+        log.info("/** Initializing Runtime Manager **/");
+        try {
+            Reader<byte[]> reader = 
this.getWorkerService().getClient().newReader()
+                    
.topic(this.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
+                    .startMessageId(MessageId.earliest).create();
+
+            this.functionAssignmentTailer = new FunctionAssignmentTailer(this, 
reader);
+            // read all existing messages
+            this.setInitializePhase(true);
+            while (reader.hasMessageAvailable()) {
+                
this.functionAssignmentTailer.processAssignment(reader.readNext());
+            }
+            this.setInitializePhase(false);
+            // realize existing assignments
+            Map<String, Assignment> assignmentMap = 
workerIdToAssignments.get(this.workerConfig.getWorkerId());
+            if (assignmentMap != null) {
+                for (Assignment assignment : assignmentMap.values()) {
+                    startFunctionInstance(assignment);
+                }
+            }
+            // start assignment tailer
+            this.functionAssignmentTailer.start();
+
+        } catch (Exception e) {
+            log.error("Failed to initialize function runtime manager: ", 
e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * Starts the function runtime manager
      */
@@ -622,27 +664,29 @@ void deleteAssignment(Assignment assignment) {
     }
 
     private void addAssignment(Assignment assignment) {
-        String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
-
         //add new function
         this.setAssignment(assignment);
 
         //Assigned to me
         if (assignment.getWorkerId().equals(workerConfig.getWorkerId())) {
-            if 
(!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) {
-                this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new 
FunctionRuntimeInfo()
-                        .setFunctionInstance(assignment.getInstance()));
+            startFunctionInstance(assignment);
+        }
+    }
 
-            } else {
-                //Somehow this function is already started
-                log.warn("Function {} already running. Going to restart 
function.",
-                        
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
-                
this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
-            }
-            FunctionRuntimeInfo functionRuntimeInfo = 
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
-            this.insertStartAction(functionRuntimeInfo);
+    private void startFunctionInstance(Assignment assignment) {
+        String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+        if 
(!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) {
+            this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new 
FunctionRuntimeInfo()
+                    .setFunctionInstance(assignment.getInstance()));
+
+        } else {
+            //Somehow this function is already started
+            log.warn("Function {} already running. Going to restart function.",
+                    this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
+            
this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
         }
-        
+        FunctionRuntimeInfo functionRuntimeInfo = 
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+        this.insertStartAction(functionRuntimeInfo);
     }
 
     public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
@@ -670,26 +714,29 @@ public void updateRates() {
 
     @VisibleForTesting
     void insertStopAction(FunctionRuntimeInfo functionRuntimeInfo) {
-        FunctionAction functionAction = new FunctionAction();
-        functionAction.setAction(FunctionAction.Action.STOP);
-        functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
-        try {
-            actionQueue.put(functionAction);
-        } catch (InterruptedException ex) {
-            throw new RuntimeException("Interrupted while putting action");
+        if (!this.isInitializePhase) {
+            FunctionAction functionAction = new FunctionAction();
+            functionAction.setAction(FunctionAction.Action.STOP);
+            functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
+            try {
+                actionQueue.put(functionAction);
+            } catch (InterruptedException ex) {
+                throw new RuntimeException("Interrupted while putting action");
+            }
         }
-
     }
 
     @VisibleForTesting
     void insertStartAction(FunctionRuntimeInfo functionRuntimeInfo) {
-        FunctionAction functionAction = new FunctionAction();
-        functionAction.setAction(FunctionAction.Action.START);
-        functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
-        try {
-            actionQueue.put(functionAction);
-        } catch (InterruptedException ex) {
-            throw new RuntimeException("Interrupted while putting action");
+        if (!this.isInitializePhase) {
+            FunctionAction functionAction = new FunctionAction();
+            functionAction.setAction(FunctionAction.Action.START);
+            functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
+            try {
+                actionQueue.put(functionAction);
+            } catch (InterruptedException ex) {
+                throw new RuntimeException("Interrupted while putting action");
+            }
         }
     }
 
@@ -726,11 +773,16 @@ void setAssignment(Assignment assignment) {
     }
 
     private void deleteFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
-        this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
+        if (!this.isInitializePhase) {
+            this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
+        }
     }
 
     private void setFunctionRuntimeInfo(String fullyQualifiedInstanceId, 
FunctionRuntimeInfo functionRuntimeInfo) {
-        this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, 
functionRuntimeInfo);
+        // Don't modify Function Runtime Infos when initializing
+        if (!this.isInitializePhase) {
+            this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, 
functionRuntimeInfo);
+        }
     }
 
     @Override
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 488bcd775d..2b9c632479 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -150,6 +150,9 @@ public void start(URI dlogUri) throws InterruptedException {
             this.functionRuntimeManager = new FunctionRuntimeManager(
                     this.workerConfig, this, this.dlogNamespace, 
this.membershipManager, connectorsManager);
 
+            // initialize function runtime manager
+            this.functionRuntimeManager.initialize();
+
             // Setting references to managers in scheduler
             
this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
             
this.schedulerManager.setFunctionRuntimeManager(this.functionRuntimeManager);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 5e1ed023aa..490be77732 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -18,23 +18,31 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import io.netty.buffer.Unpooled;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.functions.metrics.MetricsSink;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.proto.Request;
-import org.apache.pulsar.functions.metrics.MetricsSink;
 import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
@@ -46,7 +54,9 @@
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+@Slf4j
 public class FunctionRuntimeManagerTest {
 
     public static class TestSink implements MetricsSink {
@@ -384,4 +394,122 @@ public boolean matches(Object o) {
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-1").get("test-tenant/test-namespace/func-2:0"), 
assignment3);
     }
+
+    @Test
+    public void testRuntimeManagerInitialize() throws Exception {
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        workerConfig.setThreadContainerFactory(new 
WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setStateStorageServiceUrl("foo");
+        workerConfig.setFunctionAssignmentTopicName("assignments");
+
+        Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
+
+        Function.FunctionMetaData function2 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder()
+                        
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
+
+        Function.Assignment assignment1 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        
.setFunctionMetaData(function1).setInstanceId(0).build())
+                .build();
+        Function.Assignment assignment2 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        
.setFunctionMetaData(function2).setInstanceId(0).build())
+                .build();
+
+        Function.Assignment assignment3 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        
.setFunctionMetaData(function2).setInstanceId(0).build())
+                .build();
+
+        List<Message<byte[]>> messageList = new LinkedList<>();
+        Message message1 = spy(new MessageImpl("foo", 
MessageId.latest.toString(),
+                        new HashMap<>(), 
Unpooled.copiedBuffer(assignment1.toByteArray()), null));
+        
doReturn(Utils.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
+
+        Message message2 = spy(new MessageImpl("foo", 
MessageId.latest.toString(),
+                new HashMap<>(), 
Unpooled.copiedBuffer(assignment2.toByteArray()), null));
+        
doReturn(Utils.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
+
+        // delete function2
+        Message message3 = spy(new MessageImpl("foo", 
MessageId.latest.toString(),
+                new HashMap<>(), Unpooled.copiedBuffer("".getBytes()), null));
+        
doReturn(Utils.getFullyQualifiedInstanceId(assignment3.getInstance())).when(message3).getKey();
+
+        messageList.add(message1);
+        messageList.add(message2);
+        messageList.add(message3);
+
+        PulsarClient pulsarClient = mock(PulsarClient.class);
+
+        Reader<byte[]> reader = mock(Reader.class);
+
+        Iterator<Message<byte[]>> it = messageList.iterator();
+
+        when(reader.readNext()).thenAnswer(new Answer<Message<byte[]>>() {
+            @Override
+            public Message<byte[]> answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                return it.next();
+            }
+        });
+
+        when(reader.readNextAsync()).thenAnswer(new 
Answer<CompletableFuture<Message<byte[]>>>() {
+            @Override
+            public CompletableFuture<Message<byte[]>> answer(InvocationOnMock 
invocationOnMock) throws Throwable {
+                return new CompletableFuture<>();
+            }
+        });
+
+
+        when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() {
+            @Override
+            public Boolean answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                return it.hasNext();
+            }
+        });
+
+
+
+        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+        doReturn(readerBuilder).when(pulsarClient).newReader();
+        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
+
+        doReturn(reader).when(readerBuilder).create();
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(pulsarClient).when(workerService).getClient();
+        
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+
+        // test new assignment add functions
+        FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
+                workerConfig,
+                workerService,
+                mock(Namespace.class),
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
+        ));
+
+
+        functionRuntimeManager.initialize();
+
+        
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
+        log.info("actionQueue: {}", functionRuntimeManager.actionQueue);
+        Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1);
+
+        FunctionAction functionAction = 
functionRuntimeManager.actionQueue.poll();
+
+        // only actually start function1
+        Assert.assertEquals(functionAction.getAction(), 
FunctionAction.Action.START);
+        
Assert.assertEquals(functionAction.getFunctionRuntimeInfo().getFunctionInstance(),
 assignment1.getInstance());
+
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to