rdhabalia closed pull request #2365: Add support to restart function
URL: https://github.com/apache/incubator-pulsar/pull/2365
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
made from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 564eb18067..b8891e5a92 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -267,6 +267,31 @@ public Response triggerFunction(final @PathParam("tenant") 
String tenant,
 
     }
 
+    @POST
+    @ApiOperation(value = "Restart function instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, 
functionName, instanceId);
+    }
+
+    @POST
+    @ApiOperation(value = "Restart all function instances", response = 
Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName) {
+        return functions.restartFunctionInstances(tenant, namespace, 
functionName);
+    }
+
     @POST
     @ApiOperation(
             value = "Uploads Pulsar Function file data",
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index c57a8a0e59..2254626804 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -206,7 +206,7 @@ public void testAuthorization() throws Exception {
         String jarFilePathUrl = String.format("%s:%s", Utils.FILE,
                 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
         FunctionDetails functionDetails = 
PulsarSinkE2ETest.createSinkConfig(jarFilePathUrl, tenant, namespacePortion,
-                functionName, sinkTopic, subscriptionName);
+                functionName, "my.*", sinkTopic, subscriptionName);
 
         try {
             functionAdmin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index abc4735ea1..e5902d4d00 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -270,7 +270,7 @@ public void testE2EPulsarSink() throws Exception {
         String jarFilePathUrl = Utils.FILE + ":"
                 + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
-                sinkTopic, subscriptionName);
+                "my.*", sinkTopic, subscriptionName);
         admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
 
         // try to update function to test: update-function functionality
@@ -332,7 +332,7 @@ public void testPulsarSinkStats() throws Exception {
         String jarFilePathUrl = Utils.FILE + ":"
                 + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
-                sinkTopic, subscriptionName);
+                "my.*", sinkTopic, subscriptionName);
         admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
 
         // try to update function to test: update-function functionality
@@ -381,7 +381,7 @@ public void testPulsarSinkStats() throws Exception {
         assertEquals(ownerWorkerId, workerId);
     }
 
-    protected static FunctionDetails createSinkConfig(String jarFile, String 
tenant, String namespace, String functionName, String sinkTopic, String 
subscriptionName) {
+    protected static FunctionDetails createSinkConfig(String jarFile, String 
tenant, String namespace, String functionName, String sourceTopic, String 
sinkTopic, String subscriptionName) {
 
         File file = new File(jarFile);
         try {
@@ -389,7 +389,7 @@ protected static FunctionDetails createSinkConfig(String 
jarFile, String tenant,
         } catch (MalformedURLException e) {
             throw new RuntimeException("Failed to load user jar " + file, e);
         }
-        String sourceTopicPattern = String.format("persistent://%s/%s/my.*", 
tenant, namespace);
+        String sourceTopicPattern = String.format("persistent://%s/%s/%s", 
tenant, namespace, sourceTopic);
         Class<?> typeArg = byte[].class;
 
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
@@ -444,7 +444,7 @@ public void testAuthorization(boolean validRoleName) throws 
Exception {
         String jarFilePathUrl = Utils.FILE + ":"
                 + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
-                sinkTopic, subscriptionName);
+                "my.*", sinkTopic, subscriptionName);
         try {
             admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
             assertTrue(validRoleName);
@@ -505,4 +505,57 @@ public void testFileUrlFunctionWithoutPassingTypeArgs() 
throws Exception {
         assertEquals(functionMetadata.getSink().getTypeClassName(), 
typeArgs[1].getName());
 
     }
+    
+    @Test(timeOut = 20000)
+    public void testFunctionRestartApi() throws Exception {
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopicName = "restartFunction";
+        final String sourceTopic = "persistent://" + replNamespace + "/" + 
sourceTopicName;
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String functionName = "PulsarSink-test";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        // create source topic
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(sourceTopic).create();
+
+        String jarFilePathUrl = Utils.FILE + ":"
+                + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
+                sourceTopicName, sinkTopic, subscriptionName);
+        admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                return subStats != null && subStats.consumers.size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+        assertEquals(subStats.consumers.size(), 1);
+
+        // it should restart consumer : so, check if consumer came up again 
after restarting function
+        admin.functions().restartFunction(tenant, namespacePortion, 
functionName);
+
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStat = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                return subStat != null && subStat.consumers.size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+        assertEquals(subStats.consumers.size(), 1);
+
+        producer.close();
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index c04873da8e..4525c51b81 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -181,6 +181,39 @@
      *             Unexpected error
      */
     FunctionStatusList getFunctionStatus(String tenant, String namespace, 
String function) throws PulsarAdminException;
+    
+    /**
+     * Restart function instance
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     *
+     * @param instanceId
+     *            Function instanceId
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void restartFunction(String tenant, String namespace, String function, int 
instanceId) throws PulsarAdminException;
+    
+    /**
+     * Restart all function instances
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void restartFunction(String tenant, String namespace, String function) 
throws PulsarAdminException;
 
     /**
      * Triggers the function by writing to the input topic.
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 028da3c2b1..402b5d3971 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -215,6 +215,27 @@ public String triggerFunction(String tenant, String 
namespace, String functionNa
         }
     }
 
+    @Override
+    public void restartFunction(String tenant, String namespace, String 
functionName, int instanceId)
+            throws PulsarAdminException {
+        try {
+            
request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
+                    .path("restart")).post(Entity.entity("", 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void restartFunction(String tenant, String namespace, String 
functionName) throws PulsarAdminException {
+        try {
+            
request(functions.path(tenant).path(namespace).path(functionName).path("restart"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     @Override
     public void uploadFunction(String sourceFile, String path) throws 
PulsarAdminException {
         try {
@@ -289,4 +310,5 @@ public static void mergeJson(String json, Builder builder) 
throws IOException {
     public static String printJson(MessageOrBuilder msg) throws IOException {
         return JsonFormat.printer().print(msg);
     }
+
 }
\ No newline at end of file
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index e206e75253..11a3c7a0b9 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -31,6 +31,7 @@
 import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions;
+import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
 import org.apache.pulsar.admin.cli.CmdSinks.CreateSink;
 import org.apache.pulsar.admin.cli.CmdSources.CreateSource;
@@ -214,6 +215,34 @@ public void testCreateFunction() throws Exception {
 
     }
 
+    @Test
+    public void restartFunction() throws Exception {
+        String fnName = TEST_NAME + "-function";
+        String tenant = "sample";
+        String namespace = "ns1";
+        int instanceId = 0;
+        cmd.run(new String[] { "restart", "--tenant", tenant, "--namespace", 
namespace, "--name", fnName,
+                "--instance-id", Integer.toString(instanceId)});
+
+        RestartFunction restarter = cmd.getRestarter();
+        assertEquals(fnName, restarter.getFunctionName());
+
+        verify(functions, times(1)).restartFunction(tenant, namespace, fnName, 
instanceId);
+    }
+
+    @Test
+    public void restartFunctionInstances() throws Exception {
+        String fnName = TEST_NAME + "-function";
+        String tenant = "sample";
+        String namespace = "ns1";
+        cmd.run(new String[] { "restart", "--tenant", tenant, "--namespace", 
namespace, "--name", fnName });
+
+        RestartFunction restarter = cmd.getRestarter();
+        assertEquals(fnName, restarter.getFunctionName());
+
+        verify(functions, times(1)).restartFunction(tenant, namespace, fnName);
+    }
+
     @Test
     public void testCreateFunctionWithHttpUrl() throws Exception {
         String fnName = TEST_NAME + "-function";
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index b11dabee82..dd1ef3df5d 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -102,7 +102,8 @@
     private final DeleteFunction deleter;
     private final UpdateFunction updater;
     private final GetFunction getter;
-    private final GetFunctionStatus statuser;
+    private final GetFunctionStatus functionStatus;
+    private final RestartFunction restart;
     private final ListFunctions lister;
     private final StateGetter stateGetter;
     private final TriggerFunction triggerer;
@@ -164,7 +165,7 @@ public void processArguments() {
 
         @Parameter(names = "--name", description = "The function's name")
         protected String functionName;
-
+        
         @Override
         void processArguments() throws Exception {
             super.processArguments();
@@ -831,6 +832,27 @@ void runCmd() throws Exception {
         }
     }
 
+    @Parameters(commandDescription = "Restart function instance")
+    class RestartFunction extends FunctionCommand {
+        
+        @Parameter(names = "--instance-id", description = "The function 
instanceId (restart all instances if instance-id is not provided")
+        protected String instanceId;
+        
+        @Override
+        void runCmd() throws Exception {
+            if (isNotBlank(instanceId)) {
+                try {
+                    admin.functions().restartFunction(tenant, namespace, 
functionName, Integer.parseInt(instanceId));
+                } catch (NumberFormatException e) {
+                    System.err.println("instance-id must be a number");
+                }
+            } else {
+                admin.functions().restartFunction(tenant, namespace, 
functionName);
+            }
+            System.out.println("Restarted successfully");
+        }
+    }
+    
     @Parameters(commandDescription = "Delete a Pulsar Function that's running 
on a Pulsar cluster")
     class DeleteFunction extends FunctionCommand {
         @Override
@@ -1035,18 +1057,20 @@ public CmdFunctions(PulsarAdmin admin) throws 
PulsarClientException {
         deleter = new DeleteFunction();
         updater = new UpdateFunction();
         getter = new GetFunction();
-        statuser = new GetFunctionStatus();
+        functionStatus = new GetFunctionStatus();
         lister = new ListFunctions();
         stateGetter = new StateGetter();
         triggerer = new TriggerFunction();
         uploader = new UploadFunction();
         downloader = new DownloadFunction();
         cluster = new GetCluster();
+        restart = new RestartFunction();
         jcommander.addCommand("localrun", getLocalRunner());
         jcommander.addCommand("create", getCreater());
         jcommander.addCommand("delete", getDeleter());
         jcommander.addCommand("update", getUpdater());
         jcommander.addCommand("get", getGetter());
+        jcommander.addCommand("restart", getRestarter());
         jcommander.addCommand("getstatus", getStatuser());
         jcommander.addCommand("list", getLister());
         jcommander.addCommand("querystate", getStateGetter());
@@ -1082,7 +1106,7 @@ GetFunction getGetter() {
     }
 
     @VisibleForTesting
-    GetFunctionStatus getStatuser() { return statuser; }
+    GetFunctionStatus getStatuser() { return functionStatus; }
 
     @VisibleForTesting
     ListFunctions getLister() {
@@ -1109,6 +1133,11 @@ DownloadFunction getDownloader() {
         return downloader;
     }
 
+    @VisibleForTesting
+    RestartFunction getRestarter() {
+        return restart;
+    }
+
     private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig 
functionConfig) {
         String[] args = fqfn.split("/");
         if (args.length != 3) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 09273606ab..b3f30fdb95 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -127,7 +127,7 @@ public void join() throws InterruptedException {
     }
 
     @VisibleForTesting
-    protected void startFunction(FunctionRuntimeInfo functionRuntimeInfo) 
throws Exception {
+    public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws 
Exception {
         FunctionMetaData functionMetaData = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
         int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
 
@@ -225,7 +225,7 @@ private void downloadFile(File pkgFile, boolean 
isPkgUrlProvided, FunctionMetaDa
         }
     }
 
-    private void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
+    public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
         Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
         FunctionMetaData functionMetaData = instance.getFunctionMetaData();
         log.info("Stopping function {} - {}...",
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 5e1995e633..121a454a91 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -20,11 +20,15 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -35,9 +39,14 @@
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -317,6 +326,104 @@ public synchronized void 
removeAssignments(Collection<Assignment> assignments) {
         return functionStatus;
     }
 
+    public Response restartFunctionInstance(String tenant, String namespace, 
String functionName, int instanceId) throws Exception {
+        Assignment assignment = this.findAssignment(tenant, namespace, 
functionName, instanceId);
+        final String fullFunctionName = String.format("%s/%s/%s/%s", tenant, 
namespace, functionName, instanceId);
+        if (assignment == null) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(fullFunctionName + " doesn't 
exist")).build();
+        }
+
+        final String assignedWorkerId = assignment.getWorkerId();
+        final String workerId = this.workerConfig.getWorkerId();
+        
+        if (assignedWorkerId.equals(workerId)) {
+            
restartFunction(Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+            return Response.status(Status.OK).build();
+        } else {
+            // query other worker
+            List<WorkerInfo> workerInfoList = 
this.membershipManager.getCurrentMembership();
+            WorkerInfo workerInfo = null;
+            for (WorkerInfo entry : workerInfoList) {
+                if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+                    workerInfo = entry;
+                }
+            }
+            if (workerInfo == null) {
+                return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData(fullFunctionName + " has not 
been assigned yet")).build();
+            }
+
+            URI redirect = null;
+            final String redirectUrl = 
String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart",
+                    workerInfo.getWorkerHostname(), workerInfo.getPort(), 
tenant, namespace, functionName, instanceId);
+            try {
+                redirect = new URI(redirectUrl);
+            } catch (URISyntaxException e) {
+                log.error("Error in preparing redirect url for {}/{}/{}/{}: 
{}", tenant, namespace, functionName,
+                        instanceId, e.getMessage(), e);
+                return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData(fullFunctionName + " invalid 
redirection url")).build();
+            }
+            throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
+        }
+    }
+
+    public Response restartFunctionInstances(String tenant, String namespace, 
String functionName) throws Exception {
+        final String fullFunctionName = String.format("%s/%s/%s", tenant, 
namespace, functionName);
+        Collection<Assignment> assignments = 
this.findFunctionAssignments(tenant, namespace, functionName);
+
+        if (assignments.isEmpty()) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(fullFunctionName + " has not been 
assigned yet")).build();
+        }
+        for (Assignment assignment : assignments) {
+            final String assignedWorkerId = assignment.getWorkerId();
+            final String workerId = this.workerConfig.getWorkerId();
+            String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+            if (assignedWorkerId.equals(workerId)) {
+                restartFunction(fullyQualifiedInstanceId);
+            } else {
+                List<WorkerInfo> workerInfoList = 
this.membershipManager.getCurrentMembership();
+                WorkerInfo workerInfo = null;
+                for (WorkerInfo entry : workerInfoList) {
+                    if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+                        workerInfo = entry;
+                    }
+                }
+                if (workerInfo == null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] has not been assigned yet", 
fullyQualifiedInstanceId);
+                    }
+                    continue;
+                }
+                Client client = ClientBuilder.newClient();
+                // TODO: create and use pulsar-admin to support authorization 
and authentication and manage redirect
+                final String instanceRestartUrl = 
String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart",
+                        workerInfo.getWorkerHostname(), workerInfo.getPort(), 
tenant, namespace, functionName,
+                        assignment.getInstance().getInstanceId());
+                
client.target(instanceRestartUrl).request(MediaType.APPLICATION_JSON)
+                        .post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
+            }
+        }
+        return Response.status(Status.OK).build();
+    }
+
+    private void restartFunction(String fullyQualifiedInstanceId) throws 
Exception {
+        log.info("[{}] restarting..", fullyQualifiedInstanceId);
+        FunctionRuntimeInfo functionRuntimeInfo = 
this.getFunctionRuntimeInfo(fullyQualifiedInstanceId);
+        if (functionRuntimeInfo != null) {
+            this.functionActioner.stopFunction(functionRuntimeInfo);
+            try {
+                this.functionActioner.startFunction(functionRuntimeInfo);
+            } catch (Exception ex) {
+                log.info("{} Error starting function", 
fullyQualifiedInstanceId, ex);
+                functionRuntimeInfo.setStartupException(ex);
+                throw ex;
+            }
+        }
+    }
+
     /**
      * Get statuses of all function instances.
      * @param tenant the tenant the function belongs to
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index c35dd52e80..45bdb1426a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -385,6 +385,72 @@ public Response getFunctionInstanceStatus(final String 
tenant, final String name
         return Response.status(Status.OK).entity(jsonResponse).build();
     }
 
+    public Response restartFunctionInstance(final String tenant, final String 
namespace, final String functionName,
+            final String instanceId) {
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionInstanceRequestParams(tenant, namespace, 
functionName, instanceId);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid restart-function request @ /{}/{}/{}", tenant, 
namespace, functionName, e);
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
functionName)) {
+            log.error("Function in getFunctionStatus does not exist @ 
/{}/{}/{}", tenant, namespace, functionName);
+            return 
Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("Function %s doesn't 
exist", functionName))).build();
+        }
+
+        FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
+        try {
+            return functionRuntimeManager.restartFunctionInstance(tenant, 
namespace, functionName,
+                    Integer.parseInt(instanceId));
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("Failed to restart function: {}/{}/{}/{}", tenant, 
namespace, functionName, instanceId, e);
+            return 
Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.getMessage()).build();
+        }
+    }
+
+    public Response restartFunctionInstances(final String tenant, final String 
namespace, final String functionName) {
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionRequestParams(tenant, namespace, functionName);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid restart-Function request @ /{}/{}/{}", tenant, 
namespace, functionName, e);
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
functionName)) {
+            log.error("Function in getFunctionStatus does not exist @ 
/{}/{}/{}", tenant, namespace, functionName);
+            return 
Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("Function %s doesn't 
exist", functionName))).build();
+        }
+
+        FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
+        try {
+            return functionRuntimeManager.restartFunctionInstances(tenant, 
namespace, functionName);
+        }catch (Exception e) {
+            log.error("Failed to restart function: {}/{}/{}", tenant, 
namespace, functionName, e);
+            return 
Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.getMessage()).build();
+        }
+    }
+    
     public Response getFunctionStatus(final String tenant, final String 
namespace, final String functionName)
             throws IOException {
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 96baada967..3581453920 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -22,6 +22,7 @@
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import java.io.IOException;
 import java.io.InputStream;
@@ -45,6 +46,8 @@
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
 import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -162,6 +165,31 @@ public Response triggerFunction(final @PathParam("tenant") 
String tenant,
         return functions.triggerFunction(tenant, namespace, functionName, 
input, uploadedInputStream, topic);
     }
 
+    @POST
+    @ApiOperation(value = "Restart function instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, 
functionName, instanceId);
+    }
+
+    @POST
+    @ApiOperation(value = "Restart all function instances", response = 
Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName) {
+        return functions.restartFunctionInstances(tenant, namespace, 
functionName);
+    }
+    
     @POST
     @Path("/upload")
     @Consumes(MediaType.MULTIPART_FORM_DATA)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to