This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bcd893  Add support to restart function (#2365)
7bcd893 is described below

commit 7bcd8934a0a53ab7a62b3c9d77fbdec94ab497d2
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Mon Aug 13 22:49:11 2018 -0700

    Add support to restart function (#2365)
    
    * Add support to restart function
    
    fix: pulsar function restart
    
    * add support to restart all function instances
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  25 +++++
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |   2 +-
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    |  63 +++++++++++-
 .../org/apache/pulsar/client/admin/Functions.java  |  33 +++++++
 .../client/admin/internal/FunctionsImpl.java       |  22 +++++
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  |  29 ++++++
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |  37 ++++++-
 .../pulsar/functions/worker/FunctionActioner.java  |   4 +-
 .../functions/worker/FunctionRuntimeManager.java   | 107 +++++++++++++++++++++
 .../functions/worker/rest/api/FunctionsImpl.java   |  66 +++++++++++++
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |  28 ++++++
 11 files changed, 404 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 564eb18..b8891e5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -268,6 +268,31 @@ public class FunctionsBase extends AdminResource 
implements Supplier<WorkerServi
     }
 
     @POST
+    @ApiOperation(value = "Restart function instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, 
functionName, instanceId);
+    }
+
+    @POST
+    @ApiOperation(value = "Restart all function instances", response = 
Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName) {
+        return functions.restartFunctionInstances(tenant, namespace, 
functionName);
+    }
+
+    @POST
     @ApiOperation(
             value = "Uploads Pulsar Function file data",
             hidden = true
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index c57a8a0..2254626 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -206,7 +206,7 @@ public class PulsarFunctionTlsTest {
         String jarFilePathUrl = String.format("%s:%s", Utils.FILE,
                 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
         FunctionDetails functionDetails = 
PulsarSinkE2ETest.createSinkConfig(jarFilePathUrl, tenant, namespacePortion,
-                functionName, sinkTopic, subscriptionName);
+                functionName, "my.*", sinkTopic, subscriptionName);
 
         try {
             functionAdmin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 5db9a0a..1306f13 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -271,7 +271,7 @@ public class PulsarSinkE2ETest {
         String jarFilePathUrl = Utils.FILE + ":"
                 + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
-                sinkTopic, subscriptionName);
+                "my.*", sinkTopic, subscriptionName);
         admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
 
         // try to update function to test: update-function functionality
@@ -333,7 +333,7 @@ public class PulsarSinkE2ETest {
         String jarFilePathUrl = Utils.FILE + ":"
                 + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
-                sinkTopic, subscriptionName);
+                "my.*", sinkTopic, subscriptionName);
         admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
 
         // try to update function to test: update-function functionality
@@ -382,7 +382,7 @@ public class PulsarSinkE2ETest {
         assertEquals(ownerWorkerId, workerId);
     }
 
-    protected static FunctionDetails createSinkConfig(String jarFile, String 
tenant, String namespace, String functionName, String sinkTopic, String 
subscriptionName) {
+    protected static FunctionDetails createSinkConfig(String jarFile, String 
tenant, String namespace, String functionName, String sourceTopic, String 
sinkTopic, String subscriptionName) {
 
         File file = new File(jarFile);
         try {
@@ -390,7 +390,7 @@ public class PulsarSinkE2ETest {
         } catch (MalformedURLException e) {
             throw new RuntimeException("Failed to load user jar " + file, e);
         }
-        String sourceTopicPattern = String.format("persistent://%s/%s/my.*", 
tenant, namespace);
+        String sourceTopicPattern = String.format("persistent://%s/%s/%s", 
tenant, namespace, sourceTopic);
         Class<?> typeArg = byte[].class;
 
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
@@ -446,7 +446,7 @@ public class PulsarSinkE2ETest {
         String jarFilePathUrl = Utils.FILE + ":"
                 + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
-                sinkTopic, subscriptionName);
+                "my.*", sinkTopic, subscriptionName);
         try {
             admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
             assertTrue(validRoleName);
@@ -507,4 +507,57 @@ public class PulsarSinkE2ETest {
         assertEquals(functionMetadata.getSink().getTypeClassName(), 
typeArgs[1].getName());
 
     }
+    
+    @Test(timeOut = 20000)
+    public void testFunctionRestartApi() throws Exception {
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopicName = "restartFunction";
+        final String sourceTopic = "persistent://" + replNamespace + "/" + 
sourceTopicName;
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String functionName = "PulsarSink-test";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        // create source topic
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(sourceTopic).create();
+
+        String jarFilePathUrl = Utils.FILE + ":"
+                + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
+                sourceTopicName, sinkTopic, subscriptionName);
+        admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                return subStats != null && subStats.consumers.size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+        assertEquals(subStats.consumers.size(), 1);
+
+        // it should restart consumer : so, check if consumer came up again 
after restarting function
+        admin.functions().restartFunction(tenant, namespacePortion, 
functionName);
+
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStat = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                return subStat != null && subStat.consumers.size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+        assertEquals(subStats.consumers.size(), 1);
+
+        producer.close();
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index c04873d..4525c51 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -181,6 +181,39 @@ public interface Functions {
      *             Unexpected error
      */
     FunctionStatusList getFunctionStatus(String tenant, String namespace, 
String function) throws PulsarAdminException;
+    
+    /**
+     * Restart function instance
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     *
+     * @param instanceId
+     *            Function instanceId
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void restartFunction(String tenant, String namespace, String function, int 
instanceId) throws PulsarAdminException;
+    
+    /**
+     * Restart all function instances
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void restartFunction(String tenant, String namespace, String function) 
throws PulsarAdminException;
 
     /**
      * Triggers the function by writing to the input topic.
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 028da3c..402b5d3 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -216,6 +216,27 @@ public class FunctionsImpl extends BaseResource implements 
Functions {
     }
 
     @Override
+    public void restartFunction(String tenant, String namespace, String 
functionName, int instanceId)
+            throws PulsarAdminException {
+        try {
+            
request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
+                    .path("restart")).post(Entity.entity("", 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void restartFunction(String tenant, String namespace, String 
functionName) throws PulsarAdminException {
+        try {
+            
request(functions.path(tenant).path(namespace).path(functionName).path("restart"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
     public void uploadFunction(String sourceFile, String path) throws 
PulsarAdminException {
         try {
             final FormDataMultiPart mp = new FormDataMultiPart();
@@ -289,4 +310,5 @@ public class FunctionsImpl extends BaseResource implements 
Functions {
     public static String printJson(MessageOrBuilder msg) throws IOException {
         return JsonFormat.printer().print(msg);
     }
+
 }
\ No newline at end of file
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index e206e75..11a3c7a 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -31,6 +31,7 @@ import 
org.apache.pulsar.admin.cli.CmdFunctions.CreateFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions;
+import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
 import org.apache.pulsar.admin.cli.CmdSinks.CreateSink;
 import org.apache.pulsar.admin.cli.CmdSources.CreateSource;
@@ -215,6 +216,34 @@ public class CmdFunctionsTest {
     }
 
     @Test
+    public void restartFunction() throws Exception {
+        String fnName = TEST_NAME + "-function";
+        String tenant = "sample";
+        String namespace = "ns1";
+        int instanceId = 0;
+        cmd.run(new String[] { "restart", "--tenant", tenant, "--namespace", 
namespace, "--name", fnName,
+                "--instance-id", Integer.toString(instanceId)});
+
+        RestartFunction restarter = cmd.getRestarter();
+        assertEquals(fnName, restarter.getFunctionName());
+
+        verify(functions, times(1)).restartFunction(tenant, namespace, fnName, 
instanceId);
+    }
+
+    @Test
+    public void restartFunctionInstances() throws Exception {
+        String fnName = TEST_NAME + "-function";
+        String tenant = "sample";
+        String namespace = "ns1";
+        cmd.run(new String[] { "restart", "--tenant", tenant, "--namespace", 
namespace, "--name", fnName });
+
+        RestartFunction restarter = cmd.getRestarter();
+        assertEquals(fnName, restarter.getFunctionName());
+
+        verify(functions, times(1)).restartFunction(tenant, namespace, fnName);
+    }
+
+    @Test
     public void testCreateFunctionWithHttpUrl() throws Exception {
         String fnName = TEST_NAME + "-function";
         String inputTopicName = TEST_NAME + "-input-topic";
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index b11dabe..dd1ef3d 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -102,7 +102,8 @@ public class CmdFunctions extends CmdBase {
     private final DeleteFunction deleter;
     private final UpdateFunction updater;
     private final GetFunction getter;
-    private final GetFunctionStatus statuser;
+    private final GetFunctionStatus functionStatus;
+    private final RestartFunction restart;
     private final ListFunctions lister;
     private final StateGetter stateGetter;
     private final TriggerFunction triggerer;
@@ -164,7 +165,7 @@ public class CmdFunctions extends CmdBase {
 
         @Parameter(names = "--name", description = "The function's name")
         protected String functionName;
-
+        
         @Override
         void processArguments() throws Exception {
             super.processArguments();
@@ -831,6 +832,27 @@ public class CmdFunctions extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Restart function instance")
+    class RestartFunction extends FunctionCommand {
+        
+        @Parameter(names = "--instance-id", description = "The function 
instanceId (restart all instances if instance-id is not provided")
+        protected String instanceId;
+        
+        @Override
+        void runCmd() throws Exception {
+            if (isNotBlank(instanceId)) {
+                try {
+                    admin.functions().restartFunction(tenant, namespace, 
functionName, Integer.parseInt(instanceId));
+                } catch (NumberFormatException e) {
+                    System.err.println("instance-id must be a number");
+                }
+            } else {
+                admin.functions().restartFunction(tenant, namespace, 
functionName);
+            }
+            System.out.println("Restarted successfully");
+        }
+    }
+    
     @Parameters(commandDescription = "Delete a Pulsar Function that's running 
on a Pulsar cluster")
     class DeleteFunction extends FunctionCommand {
         @Override
@@ -1035,18 +1057,20 @@ public class CmdFunctions extends CmdBase {
         deleter = new DeleteFunction();
         updater = new UpdateFunction();
         getter = new GetFunction();
-        statuser = new GetFunctionStatus();
+        functionStatus = new GetFunctionStatus();
         lister = new ListFunctions();
         stateGetter = new StateGetter();
         triggerer = new TriggerFunction();
         uploader = new UploadFunction();
         downloader = new DownloadFunction();
         cluster = new GetCluster();
+        restart = new RestartFunction();
         jcommander.addCommand("localrun", getLocalRunner());
         jcommander.addCommand("create", getCreater());
         jcommander.addCommand("delete", getDeleter());
         jcommander.addCommand("update", getUpdater());
         jcommander.addCommand("get", getGetter());
+        jcommander.addCommand("restart", getRestarter());
         jcommander.addCommand("getstatus", getStatuser());
         jcommander.addCommand("list", getLister());
         jcommander.addCommand("querystate", getStateGetter());
@@ -1082,7 +1106,7 @@ public class CmdFunctions extends CmdBase {
     }
 
     @VisibleForTesting
-    GetFunctionStatus getStatuser() { return statuser; }
+    GetFunctionStatus getStatuser() { return functionStatus; }
 
     @VisibleForTesting
     ListFunctions getLister() {
@@ -1109,6 +1133,11 @@ public class CmdFunctions extends CmdBase {
         return downloader;
     }
 
+    @VisibleForTesting
+    RestartFunction getRestarter() {
+        return restart;
+    }
+
     private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig 
functionConfig) {
         String[] args = fqfn.split("/");
         if (args.length != 3) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 0927360..b3f30fd 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -127,7 +127,7 @@ public class FunctionActioner implements AutoCloseable {
     }
 
     @VisibleForTesting
-    protected void startFunction(FunctionRuntimeInfo functionRuntimeInfo) 
throws Exception {
+    public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws 
Exception {
         FunctionMetaData functionMetaData = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
         int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
 
@@ -225,7 +225,7 @@ public class FunctionActioner implements AutoCloseable {
         }
     }
 
-    private void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
+    public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
         Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
         FunctionMetaData functionMetaData = instance.getFunctionMetaData();
         log.info("Stopping function {} - {}...",
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 5e1995e..121a454 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -20,11 +20,15 @@ package org.apache.pulsar.functions.worker;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -35,9 +39,14 @@ import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -317,6 +326,104 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         return functionStatus;
     }
 
+    public Response restartFunctionInstance(String tenant, String namespace, 
String functionName, int instanceId) throws Exception {
+        Assignment assignment = this.findAssignment(tenant, namespace, 
functionName, instanceId);
+        final String fullFunctionName = String.format("%s/%s/%s/%s", tenant, 
namespace, functionName, instanceId);
+        if (assignment == null) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(fullFunctionName + " doesn't 
exist")).build();
+        }
+
+        final String assignedWorkerId = assignment.getWorkerId();
+        final String workerId = this.workerConfig.getWorkerId();
+        
+        if (assignedWorkerId.equals(workerId)) {
+            
restartFunction(Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+            return Response.status(Status.OK).build();
+        } else {
+            // query other worker
+            List<WorkerInfo> workerInfoList = 
this.membershipManager.getCurrentMembership();
+            WorkerInfo workerInfo = null;
+            for (WorkerInfo entry : workerInfoList) {
+                if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+                    workerInfo = entry;
+                }
+            }
+            if (workerInfo == null) {
+                return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData(fullFunctionName + " has not 
been assigned yet")).build();
+            }
+
+            URI redirect = null;
+            final String redirectUrl = 
String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart",
+                    workerInfo.getWorkerHostname(), workerInfo.getPort(), 
tenant, namespace, functionName, instanceId);
+            try {
+                redirect = new URI(redirectUrl);
+            } catch (URISyntaxException e) {
+                log.error("Error in preparing redirect url for {}/{}/{}/{}: 
{}", tenant, namespace, functionName,
+                        instanceId, e.getMessage(), e);
+                return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData(fullFunctionName + " invalid 
redirection url")).build();
+            }
+            throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
+        }
+    }
+
+    public Response restartFunctionInstances(String tenant, String namespace, 
String functionName) throws Exception {
+        final String fullFunctionName = String.format("%s/%s/%s", tenant, 
namespace, functionName);
+        Collection<Assignment> assignments = 
this.findFunctionAssignments(tenant, namespace, functionName);
+
+        if (assignments.isEmpty()) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(fullFunctionName + " has not been 
assigned yet")).build();
+        }
+        for (Assignment assignment : assignments) {
+            final String assignedWorkerId = assignment.getWorkerId();
+            final String workerId = this.workerConfig.getWorkerId();
+            String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+            if (assignedWorkerId.equals(workerId)) {
+                restartFunction(fullyQualifiedInstanceId);
+            } else {
+                List<WorkerInfo> workerInfoList = 
this.membershipManager.getCurrentMembership();
+                WorkerInfo workerInfo = null;
+                for (WorkerInfo entry : workerInfoList) {
+                    if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+                        workerInfo = entry;
+                    }
+                }
+                if (workerInfo == null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] has not been assigned yet", 
fullyQualifiedInstanceId);
+                    }
+                    continue;
+                }
+                Client client = ClientBuilder.newClient();
+                // TODO: create and use pulsar-admin to support authorization 
and authentication and manage redirect
+                final String instanceRestartUrl = 
String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart",
+                        workerInfo.getWorkerHostname(), workerInfo.getPort(), 
tenant, namespace, functionName,
+                        assignment.getInstance().getInstanceId());
+                
client.target(instanceRestartUrl).request(MediaType.APPLICATION_JSON)
+                        .post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
+            }
+        }
+        return Response.status(Status.OK).build();
+    }
+
+    private void restartFunction(String fullyQualifiedInstanceId) throws 
Exception {
+        log.info("[{}] restarting..", fullyQualifiedInstanceId);
+        FunctionRuntimeInfo functionRuntimeInfo = 
this.getFunctionRuntimeInfo(fullyQualifiedInstanceId);
+        if (functionRuntimeInfo != null) {
+            this.functionActioner.stopFunction(functionRuntimeInfo);
+            try {
+                this.functionActioner.startFunction(functionRuntimeInfo);
+            } catch (Exception ex) {
+                log.info("{} Error starting function", 
fullyQualifiedInstanceId, ex);
+                functionRuntimeInfo.setStartupException(ex);
+                throw ex;
+            }
+        }
+    }
+
     /**
      * Get statuses of all function instances.
      * @param tenant the tenant the function belongs to
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 4bb6e49..a9e03de 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -386,6 +386,72 @@ public class FunctionsImpl {
         return Response.status(Status.OK).entity(jsonResponse).build();
     }
 
+    public Response restartFunctionInstance(final String tenant, final String 
namespace, final String functionName,
+            final String instanceId) {
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionInstanceRequestParams(tenant, namespace, 
functionName, instanceId);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid restart-function request @ /{}/{}/{}", tenant, 
namespace, functionName, e);
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
functionName)) {
+            log.error("Function in getFunctionStatus does not exist @ 
/{}/{}/{}", tenant, namespace, functionName);
+            return 
Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("Function %s doesn't 
exist", functionName))).build();
+        }
+
+        FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
+        try {
+            return functionRuntimeManager.restartFunctionInstance(tenant, 
namespace, functionName,
+                    Integer.parseInt(instanceId));
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("Failed to restart function: {}/{}/{}/{}", tenant, 
namespace, functionName, instanceId, e);
+            return 
Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.getMessage()).build();
+        }
+    }
+
+    public Response restartFunctionInstances(final String tenant, final String 
namespace, final String functionName) {
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionRequestParams(tenant, namespace, functionName);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid restart-Function request @ /{}/{}/{}", tenant, 
namespace, functionName, e);
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
functionName)) {
+            log.error("Function in getFunctionStatus does not exist @ 
/{}/{}/{}", tenant, namespace, functionName);
+            return 
Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("Function %s doesn't 
exist", functionName))).build();
+        }
+
+        FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
+        try {
+            return functionRuntimeManager.restartFunctionInstances(tenant, 
namespace, functionName);
+        }catch (Exception e) {
+            log.error("Failed to restart function: {}/{}/{}", tenant, 
namespace, functionName, e);
+            return 
Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.getMessage()).build();
+        }
+    }
+    
     public Response getFunctionStatus(final String tenant, final String 
namespace, final String functionName)
             throws IOException {
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 96baada..3581453 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import java.io.IOException;
 import java.io.InputStream;
@@ -45,6 +46,8 @@ import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
 import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -163,6 +166,31 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     }
 
     @POST
+    @ApiOperation(value = "Restart function instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, 
functionName, instanceId);
+    }
+
+    @POST
+    @ApiOperation(value = "Restart all function instances", response = 
Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartFunction(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName) {
+        return functions.restartFunctionInstances(tenant, namespace, 
functionName);
+    }
+    
+    @POST
     @Path("/upload")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public Response uploadFunction(final @FormDataParam("data") InputStream 
uploadedInputStream,

Reply via email to