This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 020a1d5  Fixed the behavior of Function start/stop (#3477)
020a1d5 is described below

commit 020a1d57e122582a5ad8bd043f278e4a92d4ffc1
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Thu Jan 31 16:47:39 2019 -0800

    Fixed the behavior of Function start/stop (#3477)
    
    * Added a state in the function metadata about what the state of the 
instances should be
    
    * Have start api for sources/sinks
    
    * Add missing pieces
    
    * more checks while handling request
    
    * Fixed bugs
    
    * Added unittests
    
    * Added unittest
    
    * Fix the all instances side logic
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  31 +++++
 .../apache/pulsar/broker/admin/impl/SinkBase.java  |  31 +++++
 .../pulsar/broker/admin/impl/SourceBase.java       |  29 +++++
 .../org/apache/pulsar/client/admin/Functions.java  |  34 +++++
 .../java/org/apache/pulsar/client/admin/Sink.java  |  34 +++++
 .../org/apache/pulsar/client/admin/Source.java     |  34 +++++
 .../client/admin/internal/FunctionsImpl.java       |  21 ++++
 .../pulsar/client/admin/internal/SinkImpl.java     |  21 ++++
 .../pulsar/client/admin/internal/SourceImpl.java   |  21 ++++
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  |  29 +++++
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |  33 ++++-
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |  28 ++++-
 .../org/apache/pulsar/admin/cli/CmdSources.java    |  28 ++++-
 .../proto/src/main/proto/Function.proto            |   5 +
 .../functions/worker/FunctionMetaDataManager.java  |  59 +++++++++
 .../functions/worker/FunctionRuntimeManager.java   |  56 ++++++---
 .../functions/worker/rest/api/ComponentImpl.java   | 139 +++++++++++++++++----
 .../worker/rest/api/v3/FunctionApiV3Resource.java  |  31 +++++
 .../worker/rest/api/v3/SinkApiV3Resource.java      |  27 ++++
 .../worker/rest/api/v3/SourceApiV3Resource.java    |  27 ++++
 .../worker/FunctionMetaDataManagerTest.java        |  62 +++++++++
 .../worker/FunctionRuntimeManagerTest.java         |  53 ++++++++
 22 files changed, 781 insertions(+), 52 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 73af2c5..9b88f29 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -338,6 +338,37 @@ public class FunctionsBase extends AdminResource 
implements Supplier<WorkerServi
     }
 
     @POST
+    @ApiOperation(value = "Start function instance", response = Void.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/start")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void startFunction(final @PathParam("tenant") String tenant,
+                              final @PathParam("namespace") String namespace,
+                              final @PathParam("functionName") String 
functionName,
+                              final @PathParam("instanceId") String 
instanceId) {
+        functions.startFunctionInstance(tenant, namespace, functionName, 
instanceId, uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Start all function instances", response = 
Void.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/start")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void startFunction(final @PathParam("tenant") String tenant,
+                              final @PathParam("namespace") String namespace,
+                              final @PathParam("functionName") String 
functionName) {
+        functions.startFunctionInstances(tenant, namespace, functionName);
+    }
+
+    @POST
     @ApiOperation(
             value = "Uploads Pulsar Function file data",
             hidden = true
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
index 41c5376..2bd22a4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -256,6 +256,37 @@ public class SinkBase extends AdminResource implements 
Supplier<WorkerService> {
         sink.stopFunctionInstances(tenant, namespace, sinkName);
     }
 
+    @POST
+    @ApiOperation(value = "Start sink instance", response = Void.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void startSink(final @PathParam("tenant") String tenant,
+                          final @PathParam("namespace") String namespace,
+                          final @PathParam("sinkName") String sinkName,
+                          final @PathParam("instanceId") String instanceId) {
+        sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, 
uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Start all sink instances", response = Void.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{sinkName}/start")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void startSink(final @PathParam("tenant") String tenant,
+                          final @PathParam("namespace") String namespace,
+                          final @PathParam("sinkName") String sinkName) {
+        sink.startFunctionInstances(tenant, namespace, sinkName);
+    }
+
     @GET
     @ApiOperation(
             value = "Fetches a list of supported Pulsar IO sink connectors 
currently running in cluster mode",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
index c4a102b..0e8348f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -251,6 +251,35 @@ public class SourceBase extends AdminResource implements 
Supplier<WorkerService>
         source.stopFunctionInstances(tenant, namespace, sourceName);
     }
 
+    @POST
+    @ApiOperation(value = "Start source instance", response = Void.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/start")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void startSource(final @PathParam("tenant") String tenant,
+                            final @PathParam("namespace") String namespace,
+                            final @PathParam("sourceName") String sourceName,
+                            final @PathParam("instanceId") String instanceId) {
+        source.startFunctionInstance(tenant, namespace, sourceName, 
instanceId, uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Start all source instances", response = Void.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/start")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void startSource(final @PathParam("tenant") String tenant,
+                            final @PathParam("namespace") String namespace,
+                            final @PathParam("sourceName") String sourceName) {
+        source.startFunctionInstances(tenant, namespace, sourceName);
+    }
+
     @GET
     @ApiOperation(
             value = "Fetches a list of supported Pulsar IO source connectors 
currently running in cluster mode",
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 2bd2e9f..481c5fd 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -285,6 +285,39 @@ public interface Functions {
     void stopFunction(String tenant, String namespace, String function, int 
instanceId) throws PulsarAdminException;
 
     /**
+     * Start all function instances
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void startFunction(String tenant, String namespace, String function) 
throws PulsarAdminException;
+
+    /**
+     * Start function instance
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     *
+     * @param instanceId
+     *            Function instanceId
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void startFunction(String tenant, String namespace, String function, int 
instanceId) throws PulsarAdminException;
+
+    /**
      * Stop all function instances
      *
      * @param tenant
@@ -299,6 +332,7 @@ public interface Functions {
      */
     void stopFunction(String tenant, String namespace, String function) throws 
PulsarAdminException;
 
+
     /**
      * Triggers the function by writing to the input topic.
      *
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
index 9f2d9ab..2b924f6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
@@ -264,6 +264,40 @@ public interface Sink {
     void stopSink(String tenant, String namespace, String sink) throws 
PulsarAdminException;
 
     /**
+     * Start sink instance
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @param instanceId
+     *            Sink instanceId
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void startSink(String tenant, String namespace, String sink, int 
instanceId) throws PulsarAdminException;
+
+    /**
+     * Start all sink instances
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void startSink(String tenant, String namespace, String sink) throws 
PulsarAdminException;
+
+
+    /**
      * Fetches a list of supported Pulsar IO sinks currently running in 
cluster mode
      *
      * @throws PulsarAdminException
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
index 989598a..706150b 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
@@ -264,6 +264,40 @@ public interface Source {
     void stopSource(String tenant, String namespace, String source) throws 
PulsarAdminException;
 
     /**
+     * Start source instance
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @param instanceId
+     *            Source instanceId
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void startSource(String tenant, String namespace, String source, int 
instanceId) throws PulsarAdminException;
+
+    /**
+     * Start all source instances
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void startSource(String tenant, String namespace, String source) throws 
PulsarAdminException;
+
+
+    /**
      * Fetches a list of supported Pulsar IO sources currently running in 
cluster mode
      *
      * @throws PulsarAdminException
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 130704e..ac8d60d 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -291,6 +291,27 @@ public class FunctionsImpl extends BaseResource implements 
Functions {
     }
 
     @Override
+    public void startFunction(String tenant, String namespace, String 
functionName, int instanceId)
+            throws PulsarAdminException {
+        try {
+            
request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
+                    .path("start")).post(Entity.entity("", 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void startFunction(String tenant, String namespace, String 
functionName) throws PulsarAdminException {
+        try {
+            
request(functions.path(tenant).path(namespace).path(functionName).path("start"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
     public void uploadFunction(String sourceFile, String path) throws 
PulsarAdminException {
         try {
             final FormDataMultiPart mp = new FormDataMultiPart();
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
index a9f99b8..48a75e4 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -233,6 +233,27 @@ public class SinkImpl extends BaseResource implements Sink 
{
     }
 
     @Override
+    public void startSink(String tenant, String namespace, String sinkName, 
int instanceId)
+            throws PulsarAdminException {
+        try {
+            
request(sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(instanceId))
+                    .path("start")).post(Entity.entity("", 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void startSink(String tenant, String namespace, String sinkName) 
throws PulsarAdminException {
+        try {
+            
request(sink.path(tenant).path(namespace).path(sinkName).path("start"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
     public List<ConnectorDefinition> getBuiltInSinks() throws 
PulsarAdminException {
         try {
             Response response = request(sink.path("builtinsinks")).get();
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
index 2d066e0..1a56dc4 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -233,6 +233,27 @@ public class SourceImpl extends BaseResource implements 
Source {
     }
 
     @Override
+    public void startSource(String tenant, String namespace, String 
sourceName, int instanceId)
+            throws PulsarAdminException {
+        try {
+            
request(source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(instanceId))
+                    .path("start")).post(Entity.entity("", 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void startSource(String tenant, String namespace, String 
sourceName) throws PulsarAdminException {
+        try {
+            
request(source.path(tenant).path(namespace).path(sourceName).path("start"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
     public List<ConnectorDefinition> getBuiltInSources() throws 
PulsarAdminException {
         try {
             Response response = request(source.path("builtinsources")).get();
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 950ee94..a64c38c 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -254,6 +254,35 @@ public class CmdFunctionsTest {
     }
 
     @Test
+    public void startFunction() throws Exception {
+        String fnName = TEST_NAME + "-function";
+        String tenant = "sample";
+        String namespace = "ns1";
+        int instanceId = 0;
+        cmd.run(new String[] { "start", "--tenant", tenant, "--namespace", 
namespace, "--name", fnName,
+                "--instance-id", Integer.toString(instanceId)});
+
+        CmdFunctions.StartFunction stop = cmd.getStarter();
+        assertEquals(fnName, stop.getFunctionName());
+
+        verify(functions, times(1)).startFunction(tenant, namespace, fnName, 
instanceId);
+    }
+
+    @Test
+    public void startFunctionInstances() throws Exception {
+        String fnName = TEST_NAME + "-function";
+        String tenant = "sample";
+        String namespace = "ns1";
+        cmd.run(new String[] { "start", "--tenant", tenant, "--namespace", 
namespace, "--name", fnName });
+
+        CmdFunctions.StartFunction stop = cmd.getStarter();
+        assertEquals(fnName, stop.getFunctionName());
+
+        verify(functions, times(1)).startFunction(tenant, namespace, fnName);
+    }
+
+
+    @Test
     public void testGetFunctionStatus() throws Exception {
         String fnName = TEST_NAME + "-function";
         String tenant = "sample";
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 00ce1af..b651ab1 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -66,6 +66,7 @@ public class CmdFunctions extends CmdBase {
     private final GetFunctionStats functionStats;
     private final RestartFunction restart;
     private final StopFunction stop;
+    private final StartFunction start;
     private final ListFunctions lister;
     private final StateGetter stateGetter;
     private final TriggerFunction triggerer;
@@ -673,7 +674,7 @@ public class CmdFunctions extends CmdBase {
         }
     }
 
-    @Parameters(commandDescription = "Temporary stops function instance. (If 
worker restarts then it reassigns and starts functiona again")
+    @Parameters(commandDescription = "Stops function instance")
     class StopFunction extends FunctionCommand {
 
         @Parameter(names = "--instance-id", description = "The function 
instanceId (stop all instances if instance-id is not provided")
@@ -690,7 +691,28 @@ public class CmdFunctions extends CmdBase {
             } else {
                 admin.functions().stopFunction(tenant, namespace, 
functionName);
             }
-            System.out.println("Restarted successfully");
+            System.out.println("Stopped successfully");
+        }
+    }
+
+    @Parameters(commandDescription = "Starts a stopped function instance")
+    class StartFunction extends FunctionCommand {
+
+        @Parameter(names = "--instance-id", description = "The function 
instanceId (start all instances if instance-id is not provided")
+        protected String instanceId;
+
+        @Override
+        void runCmd() throws Exception {
+            if (isNotBlank(instanceId)) {
+                try {
+                    admin.functions().startFunction(tenant, namespace, 
functionName, Integer.parseInt(instanceId));
+                } catch (NumberFormatException e) {
+                    System.err.println("instance-id must be a number");
+                }
+            } else {
+                admin.functions().startFunction(tenant, namespace, 
functionName);
+            }
+            System.out.println("Started successfully");
         }
     }
 
@@ -882,6 +904,7 @@ public class CmdFunctions extends CmdBase {
         downloader = new DownloadFunction();
         restart = new RestartFunction();
         stop = new StopFunction();
+        start = new StartFunction();
         jcommander.addCommand("localrun", getLocalRunner());
         jcommander.addCommand("create", getCreater());
         jcommander.addCommand("delete", getDeleter());
@@ -889,6 +912,7 @@ public class CmdFunctions extends CmdBase {
         jcommander.addCommand("get", getGetter());
         jcommander.addCommand("restart", getRestarter());
         jcommander.addCommand("stop", getStopper());
+        jcommander.addCommand("start", getStarter());
         // TODO depecreate getstatus
         jcommander.addCommand("status", getStatuser(), "getstatus");
         jcommander.addCommand("stats", getFunctionStats());
@@ -962,6 +986,11 @@ public class CmdFunctions extends CmdBase {
         return stop;
     }
 
+    @VisibleForTesting
+    StartFunction getStarter() {
+        return start;
+    }
+
     private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig 
functionConfig) {
         String[] args = fqfn.split("/");
         if (args.length != 3) {
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index dd533ac..00cd26f 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -64,6 +64,7 @@ public class CmdSinks extends CmdBase {
     private final GetSink getSink;
     private final GetSinkStatus getSinkStatus;
     private final StopSink stopSink;
+    private final StartSink startSink;
     private final RestartSink restartSink;
     private final LocalSinkRunner localSinkRunner;
 
@@ -76,6 +77,7 @@ public class CmdSinks extends CmdBase {
         getSink = new GetSink();
         getSinkStatus = new GetSinkStatus();
         stopSink = new StopSink();
+        startSink = new StartSink();
         restartSink = new RestartSink();
         localSinkRunner = new LocalSinkRunner();
 
@@ -87,6 +89,7 @@ public class CmdSinks extends CmdBase {
         // TODO deprecate getstatus
         jcommander.addCommand("status", getSinkStatus, "getstatus");
         jcommander.addCommand("stop", stopSink);
+        jcommander.addCommand("start", startSink);
         jcommander.addCommand("restart", restartSink);
         jcommander.addCommand("localrun", localSinkRunner);
         jcommander.addCommand("available-sinks", new ListBuiltInSinks());
@@ -575,7 +578,7 @@ public class CmdSinks extends CmdBase {
         }
     }
 
-    @Parameters(commandDescription = "Temporary stops sink instance. (If 
worker restarts then it reassigns and starts sink again")
+    @Parameters(commandDescription = "Stops sink instance")
     class StopSink extends SinkCommand {
 
         @Parameter(names = "--instance-id", description = "The sink instanceId 
(stop all instances if instance-id is not provided")
@@ -592,7 +595,28 @@ public class CmdSinks extends CmdBase {
             } else {
                 admin.sink().stopSink(tenant, namespace, sinkName);
             }
-            System.out.println("Restarted successfully");
+            System.out.println("Stopped successfully");
+        }
+    }
+
+    @Parameters(commandDescription = "Starts sink instance")
+    class StartSink extends SinkCommand {
+
+        @Parameter(names = "--instance-id", description = "The sink instanceId 
(start all instances if instance-id is not provided")
+        protected String instanceId;
+
+        @Override
+        void runCmd() throws Exception {
+            if (isNotBlank(instanceId)) {
+                try {
+                    admin.sink().startSink(tenant, namespace, sinkName, 
Integer.parseInt(instanceId));
+                } catch (NumberFormatException e) {
+                    System.err.println("instance-id must be a number");
+                }
+            } else {
+                admin.sink().startSink(tenant, namespace, sinkName);
+            }
+            System.out.println("Started successfully");
         }
     }
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index c61b69a..c334380 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -68,6 +68,7 @@ public class CmdSources extends CmdBase {
     private final UpdateSource updateSource;
     private final RestartSource restartSource;
     private final StopSource stopSource;
+    private final StartSource startSource;
     private final LocalSourceRunner localSourceRunner;
 
     public CmdSources(PulsarAdmin admin) {
@@ -80,6 +81,7 @@ public class CmdSources extends CmdBase {
         getSourceStatus = new GetSourceStatus();
         restartSource = new RestartSource();
         stopSource = new StopSource();
+        startSource = new StartSource();
         localSourceRunner = new LocalSourceRunner();
 
         jcommander.addCommand("create", createSource);
@@ -90,6 +92,7 @@ public class CmdSources extends CmdBase {
         jcommander.addCommand("status", getSourceStatus, "getstatus");
         jcommander.addCommand("list", listSources);
         jcommander.addCommand("stop", stopSource);
+        jcommander.addCommand("start", startSource);
         jcommander.addCommand("restart", restartSource);
         jcommander.addCommand("localrun", localSourceRunner);
         jcommander.addCommand("available-sources", new ListBuiltInSources());
@@ -529,7 +532,7 @@ public class CmdSources extends CmdBase {
         }
     }
 
-    @Parameters(commandDescription = "Temporary stops source instance. (If 
worker restarts then it reassigns and starts source again")
+    @Parameters(commandDescription = "Stop source instance")
     class StopSource extends SourceCommand {
 
         @Parameter(names = "--instance-id", description = "The source 
instanceId (stop all instances if instance-id is not provided")
@@ -546,7 +549,28 @@ public class CmdSources extends CmdBase {
             } else {
                 admin.source().stopSource(tenant, namespace, sourceName);
             }
-            System.out.println("Restarted successfully");
+            System.out.println("Stopped successfully");
+        }
+    }
+
+    @Parameters(commandDescription = "Start source instance")
+    class StartSource extends SourceCommand {
+
+        @Parameter(names = "--instance-id", description = "The source 
instanceId (start all instances if instance-id is not provided")
+        protected String instanceId;
+
+        @Override
+        void runCmd() throws Exception {
+            if (isNotBlank(instanceId)) {
+                try {
+                    admin.source().startSource(tenant, namespace, sourceName, 
Integer.parseInt(instanceId));
+                } catch (NumberFormatException e) {
+                    System.err.println("instance-id must be a number");
+                }
+            } else {
+                admin.source().startSource(tenant, namespace, sourceName);
+            }
+            System.out.println("Started successfully");
         }
     }
 
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index cb5021b..74457e3 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -127,11 +127,16 @@ message PackageLocationMetaData {
     string originalFileName = 2;
 }
 
+enum FunctionState {
+    RUNNING = 0;
+    STOPPED = 1;
+}
 message FunctionMetaData {
     FunctionDetails functionDetails = 1;
     PackageLocationMetaData packageLocation = 2;
     uint64 version = 3;
     uint64 createTime = 4;
+    map<int32, FunctionState> instanceStates = 5;
 }
 
 message Instance {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 920063e..832ed5d 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -224,6 +224,42 @@ public class FunctionMetaDataManager implements 
AutoCloseable {
     }
 
     /**
+     * Sends a start/stop function request to the FMT (Function Metadata 
Topic) for a function
+     * @param tenant the tenant the function that needs to be deregistered 
belongs to
+     * @param namespace the namespace the function that needs to be 
deregistered belongs to
+     * @param functionName the name of the function
+     * @param instanceId the instanceId of the function, -1 if for all 
instances
+     * @param start do we need to start or stop
+     * @return a completable future of when the start/stop has been applied
+     */
+    public synchronized CompletableFuture<RequestResult> 
changeFunctionInstanceStatus(String tenant, String namespace, String 
functionName,
+                                                                               
       Integer instanceId, boolean start) {
+        FunctionMetaData functionMetaData = 
this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
+
+        FunctionMetaData.Builder builder = functionMetaData.toBuilder()
+                .setVersion(functionMetaData.getVersion() + 1);
+        if (builder.getInstanceStatesMap() == null || 
builder.getInstanceStatesMap().isEmpty()) {
+            for (int i = 0; i < 
functionMetaData.getFunctionDetails().getParallelism(); ++i) {
+                builder.putInstanceStates(i, Function.FunctionState.RUNNING);
+            }
+        }
+        Function.FunctionState state = start ? Function.FunctionState.RUNNING 
: Function.FunctionState.STOPPED;
+        if (instanceId < 0) {
+            for (int i = 0; i < 
functionMetaData.getFunctionDetails().getParallelism(); ++i) {
+                builder.putInstanceStates(i, state);
+            }
+        } else {
+            builder.putInstanceStates(instanceId, state);
+        }
+        FunctionMetaData newFunctionMetaData = builder.build();
+
+        Request.ServiceRequest updateRequest = 
ServiceRequestUtils.getUpdateRequest(
+                this.workerConfig.getWorkerId(), newFunctionMetaData);
+
+        return submit(updateRequest);
+    }
+
+    /**
      * Processes a request received from the FMT (Function Metadata Topic)
      * @param messageId The message id of the request
      * @param serviceRequest The request
@@ -421,6 +457,29 @@ public class FunctionMetaDataManager implements 
AutoCloseable {
         }
     }
 
+    public boolean canChangeState(FunctionMetaData functionMetaData, int 
instanceId, Function.FunctionState newState) {
+        if (instanceId >= 
functionMetaData.getFunctionDetails().getParallelism()) {
+            return false;
+        }
+        if (functionMetaData.getInstanceStatesMap() == null || 
functionMetaData.getInstanceStatesMap().isEmpty()) {
+            // This means that all instances of the functions are running
+            return newState == Function.FunctionState.STOPPED;
+        }
+        if (instanceId >= 0) {
+            if 
(functionMetaData.getInstanceStatesMap().containsKey(instanceId)) {
+                return functionMetaData.getInstanceStatesMap().get(instanceId) 
!= newState;
+            } else {
+                return false;
+            }
+        } else {
+            // want to change state for all instances
+            for (Function.FunctionState state : 
functionMetaData.getInstanceStatesMap().values()) {
+                if (state != newState) return true;
+            }
+            return false;
+        }
+    }
+
     private ServiceRequestManager getServiceRequestManager(PulsarClient 
pulsarClient, String functionMetadataTopic) throws PulsarClientException {
         return new 
ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create());
     }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index a5a6aa2..94009a6 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -201,7 +201,9 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
             Map<String, Assignment> assignmentMap = 
workerIdToAssignments.get(this.workerConfig.getWorkerId());
             if (assignmentMap != null) {
                 for (Assignment assignment : assignmentMap.values()) {
-                    startFunctionInstance(assignment);
+                    if (needsStart(assignment)) {
+                        startFunctionInstance(assignment);
+                    }
                 }
             }
             // start assignment tailer
@@ -304,8 +306,8 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         }
     }
 
-    public Response stopFunctionInstance(String tenant, String namespace, 
String functionName, int instanceId,
-            boolean restart, URI uri) throws Exception {
+    public Response restartFunctionInstance(String tenant, String namespace, 
String functionName, int instanceId,
+            URI uri) throws Exception {
         if (runtimeFactory.externallyManaged()) {
             return 
Response.status(Status.NOT_IMPLEMENTED).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData("Externally managed schedulers can't 
do per instance stop")).build();
@@ -321,7 +323,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         final String workerId = this.workerConfig.getWorkerId();
 
         if (assignedWorkerId.equals(workerId)) {
-            
stopFunction(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
 restart);
+            
stopFunction(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
 true);
             return Response.status(Status.OK).build();
         } else {
             // query other worker
@@ -346,7 +348,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         }
     }
 
-    public Response stopFunctionInstances(String tenant, String namespace, 
String functionName, boolean restart)
+    public Response restartFunctionInstances(String tenant, String namespace, 
String functionName)
             throws Exception {
         final String fullFunctionName = String.format("%s/%s/%s", tenant, 
namespace, functionName);
         Collection<Assignment> assignments = 
this.findFunctionAssignments(tenant, namespace, functionName);
@@ -361,7 +363,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
             final String workerId = this.workerConfig.getWorkerId();
             String fullyQualifiedInstanceId = 
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
             if (assignedWorkerId.equals(workerId)) {
-                stopFunction(fullyQualifiedInstanceId, restart);
+                stopFunction(fullyQualifiedInstanceId, true);
             } else {
                 List<WorkerInfo> workerInfoList = 
this.membershipManager.getCurrentMembership();
                 WorkerInfo workerInfo = null;
@@ -377,11 +379,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                     return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                             .entity(new ErrorData(fullFunctionName + " has not 
been assigned yet")).build();
                 }
-                if (restart) {
-                    this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName);
-                } else {
-                    this.functionAdmin.functions().stopFunction(tenant, 
namespace, functionName);
-                }
+                this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName);
             }
         } else {
             for (Assignment assignment : assignments) {
@@ -389,7 +387,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                 final String workerId = this.workerConfig.getWorkerId();
                 String fullyQualifiedInstanceId = 
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
                 if (assignedWorkerId.equals(workerId)) {
-                    stopFunction(fullyQualifiedInstanceId, restart);
+                    stopFunction(fullyQualifiedInstanceId, true);
                 } else {
                     List<WorkerInfo> workerInfoList = 
this.membershipManager.getCurrentMembership();
                     WorkerInfo workerInfo = null;
@@ -404,13 +402,8 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                         }
                         continue;
                     }
-                    if (restart) {
-                        this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName,
-                                assignment.getInstance().getInstanceId());
-                    } else {
-                        this.functionAdmin.functions().stopFunction(tenant, 
namespace, functionName,
+                    this.functionAdmin.functions().restartFunction(tenant, 
namespace, functionName,
                                 assignment.getInstance().getInstanceId());
-                    }
                 }
             }
         }
@@ -619,7 +612,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                 this.insertStopAction(functionRuntimeInfo);
             }
             // still assigned to me, need to restart
-            if 
(assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
+            if 
(assignment.getWorkerId().equals(this.workerConfig.getWorkerId()) && 
needsStart(assignment)) {
                 //start again
                 FunctionRuntimeInfo newFunctionRuntimeInfo = new 
FunctionRuntimeInfo();
                 
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
@@ -687,7 +680,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         this.setAssignment(assignment);
 
         //Assigned to me
-        if (assignment.getWorkerId().equals(workerConfig.getWorkerId())) {
+        if (assignment.getWorkerId().equals(workerConfig.getWorkerId()) && 
needsStart(assignment)) {
             startFunctionInstance(assignment);
         }
     }
@@ -819,4 +812,27 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     private FunctionRuntimeInfo getFunctionRuntimeInfoInternal(String 
fullyQualifiedInstanceId) {
         return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
     }
+
+    private boolean needsStart(Assignment assignment) {
+        boolean toStart = false;
+        Function.FunctionMetaData functionMetaData = 
assignment.getInstance().getFunctionMetaData();
+        if (functionMetaData.getInstanceStatesMap() == null || 
functionMetaData.getInstanceStatesMap().isEmpty()) {
+            toStart = true;
+        } else {
+            if (assignment.getInstance().getInstanceId() < 0) {
+                // for externally managed functions, insert the start only if 
there is atleast one
+                // instance that needs to be started
+                for (Function.FunctionState state : 
functionMetaData.getInstanceStatesMap().values()) {
+                    if (state == Function.FunctionState.RUNNING) {
+                        toStart = true;
+                    }
+                }
+            } else {
+                if 
(functionMetaData.getInstanceStatesOrDefault(assignment.getInstance().getInstanceId(),
 Function.FunctionState.RUNNING) == Function.FunctionState.RUNNING) {
+                    toStart = true;
+                }
+            }
+        }
+        return toStart;
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 71825a6..cde2aae 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -167,8 +167,7 @@ public abstract class ComponentImpl {
                 FunctionRuntimeInfo functionRuntimeInfo = 
worker().getFunctionRuntimeManager().getFunctionRuntimeInfo(
                         
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
                 if (functionRuntimeInfo == null) {
-                    log.error("{} in get {} Status does not exist @ 
/{}/{}/{}", componentType, componentType, tenant, namespace, name);
-                    throw new RestException(Status.NOT_FOUND, 
String.format("%s %s doesn't exist", componentType, name));
+                    return notRunning(assignedWorkerId, "");
                 }
                 RuntimeSpawner runtimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
 
@@ -181,7 +180,8 @@ public abstract class ComponentImpl {
                         throw new RuntimeException(e);
                     }
                 } else {
-                    return notRunning(assignedWorkerId, 
functionRuntimeInfo.getStartupException().getMessage());
+                    String message = functionRuntimeInfo.getStartupException() 
!= null ? functionRuntimeInfo.getStartupException().getMessage() : "";
+                    return notRunning(assignedWorkerId, message);
                 }
             } else {
                 // query other worker
@@ -703,7 +703,62 @@ public abstract class ComponentImpl {
                                      final String componentName,
                                      final String instanceId,
                                      final URI uri) {
-        stopFunctionInstance(tenant, namespace, componentName, instanceId, 
false, uri);
+        changeFunctionInstanceStatus(tenant, namespace, componentName, 
instanceId, false, uri);
+    }
+
+    public void startFunctionInstance(final String tenant,
+                                      final String namespace,
+                                      final String componentName,
+                                      final String instanceId,
+                                      final URI uri) {
+        changeFunctionInstanceStatus(tenant, namespace, componentName, 
instanceId, true, uri);
+    }
+
+    public void changeFunctionInstanceStatus(final String tenant,
+                                             final String namespace,
+                                             final String componentName,
+                                             final String instanceId,
+                                             final boolean start,
+                                             final URI uri) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionInstanceRequestParams(tenant, namespace, 
componentName, componentType, instanceId);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid start/stop {} request @ /{}/{}/{}", 
componentType, tenant, namespace, componentName, e);
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
componentName)) {
+            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, 
namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
+        }
+
+        FunctionMetaData functionMetaData = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, 
componentName, componentType);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
+        }
+
+        if (!functionMetaDataManager.canChangeState(functionMetaData, 
Integer.parseInt(instanceId), start ? Function.FunctionState.RUNNING : 
Function.FunctionState.STOPPED)) {
+            log.error("Operation not permitted on {}/{}/{}", tenant, 
namespace, componentName);
+            throw new RestException(Status.BAD_REQUEST, 
String.format("Operation not permitted"));
+        }
+
+        try {
+            functionMetaDataManager.changeFunctionInstanceStatus(tenant, 
namespace, componentName,
+                    Integer.parseInt(instanceId), start);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("Failed to start/stop {}: {}/{}/{}/{}", componentType, 
tenant, namespace, componentName, instanceId, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
     }
 
     public void restartFunctionInstance(final String tenant,
@@ -711,16 +766,6 @@ public abstract class ComponentImpl {
                                         final String componentName,
                                         final String instanceId,
                                         final URI uri) {
-        stopFunctionInstance(tenant, namespace, componentName, instanceId, 
true, uri);
-    }
-
-    public void stopFunctionInstance(final String tenant,
-                                     final String namespace,
-                                     final String componentName,
-                                     final String instanceId,
-                                     final boolean restart,
-                                     final URI uri) {
-
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
@@ -747,8 +792,8 @@ public abstract class ComponentImpl {
 
         FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
         try {
-            functionRuntimeManager.stopFunctionInstance(tenant, namespace, 
componentName,
-                    Integer.parseInt(instanceId), restart, uri);
+            functionRuntimeManager.restartFunctionInstance(tenant, namespace, 
componentName,
+                    Integer.parseInt(instanceId), uri);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
@@ -760,20 +805,62 @@ public abstract class ComponentImpl {
     public void stopFunctionInstances(final String tenant,
                                       final String namespace,
                                       final String componentName) {
-        stopFunctionInstances(tenant, namespace, componentName, false);
+        changeFunctionStatusAllInstances(tenant, namespace, componentName, 
false);
     }
 
-    public void restartFunctionInstances(final String tenant,
-                                         final String namespace,
-                                         final String componentName) {
-        stopFunctionInstances(tenant, namespace, componentName, true);
+    public void startFunctionInstances(final String tenant,
+                                       final String namespace,
+                                       final String componentName) {
+        changeFunctionStatusAllInstances(tenant, namespace, componentName, 
true);
     }
 
-    public void stopFunctionInstances(final String tenant,
-                                      final String namespace,
-                                      final String componentName,
-                                      final boolean restart) {
+    public void changeFunctionStatusAllInstances(final String tenant,
+                                                 final String namespace,
+                                                 final String componentName,
+                                                 final boolean start) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionRequestParams(tenant, namespace, componentName, 
componentType);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid start/stop {} request @ /{}/{}/{}", 
componentType, tenant, namespace, componentName, e);
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
componentName)) {
+            log.error("{} in stopFunctionInstances does not exist @ 
/{}/{}/{}", componentType, tenant, namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
+        }
+
+        FunctionMetaData functionMetaData = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, 
componentName, componentType);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", componentType, componentName));
+        }
+
+        if (!functionMetaDataManager.canChangeState(functionMetaData, -1, 
start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
+            log.error("Operation not permitted on {}/{}/{}", tenant, 
namespace, componentName);
+            throw new RestException(Status.BAD_REQUEST, 
String.format("Operation not permitted"));
+        }
 
+        try {
+            functionMetaDataManager.changeFunctionInstanceStatus(tenant, 
namespace, componentName, -1, start);
+        } catch (WebApplicationException we) {
+            throw we;
+        } catch (Exception e) {
+            log.error("Failed to start/stop {}: {}/{}/{}", componentType, 
tenant, namespace, componentName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
+    }
+
+    public void restartFunctionInstances(final String tenant,
+                                         final String namespace,
+                                         final String componentName) {
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
@@ -800,7 +887,7 @@ public abstract class ComponentImpl {
 
         FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
         try {
-            functionRuntimeManager.stopFunctionInstances(tenant, namespace, 
componentName, restart);
+            functionRuntimeManager.restartFunctionInstances(tenant, namespace, 
componentName);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
index 2eebf62..4d27134 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
@@ -260,6 +260,37 @@ public class FunctionApiV3Resource extends 
FunctionApiResource {
     }
 
     @POST
+    @ApiOperation(value = "Start function instance", response = Void.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/start")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void startFunction(final @PathParam("tenant") String tenant,
+                              final @PathParam("namespace") String namespace,
+                              final @PathParam("functionName") String 
functionName,
+                              final @PathParam("instanceId") String 
instanceId) {
+        functions.startFunctionInstance(tenant, namespace, functionName, 
instanceId, this.uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Start all function instances", response = 
Void.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/start")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void startFunction(final @PathParam("tenant") String tenant,
+                              final @PathParam("namespace") String namespace,
+                              final @PathParam("functionName") String 
functionName) {
+        functions.startFunctionInstances(tenant, namespace, functionName);
+    }
+
+    @POST
     @Path("/upload")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public void uploadFunction(final @FormDataParam("data") InputStream 
uploadedInputStream,
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
index 6a8f25e..ee7d1a4 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
@@ -195,6 +195,33 @@ public class SinkApiV3Resource extends FunctionApiResource 
{
         sink.stopFunctionInstances(tenant, namespace, sinkName);
     }
 
+    @POST
+    @ApiOperation(value = "Start sink instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void startSink(final @PathParam("tenant") String tenant,
+                          final @PathParam("namespace") String namespace,
+                          final @PathParam("sinkName") String sinkName,
+                          final @PathParam("instanceId") String instanceId) {
+        sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, 
this.uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Start all sink instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/start")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void startSink(final @PathParam("tenant") String tenant,
+                          final @PathParam("namespace") String namespace,
+                          final @PathParam("sinkName") String sinkName) {
+        sink.startFunctionInstances(tenant, namespace, sinkName);
+    }
+
     @GET
     @Path("/builtinsinks")
     public List<ConnectorDefinition> getSinkList() {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
index 8675cc5..c532e3a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
@@ -198,6 +198,33 @@ public class SourceApiV3Resource extends 
FunctionApiResource {
         source.stopFunctionInstances(tenant, namespace, sourceName);
     }
 
+    @POST
+    @ApiOperation(value = "Start source instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/start")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void startSource(final @PathParam("tenant") String tenant,
+                            final @PathParam("namespace") String namespace,
+                            final @PathParam("sourceName") String sourceName,
+                            final @PathParam("instanceId") String instanceId) {
+        source.startFunctionInstance(tenant, namespace, sourceName, 
instanceId, this.uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Start all source instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/start")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void startSource(final @PathParam("tenant") String tenant,
+                            final @PathParam("namespace") String namespace,
+                            final @PathParam("sourceName") String sourceName) {
+        source.startFunctionInstances(tenant, namespace, sourceName);
+    }
+
     @GET
     @Path("/builtinsources")
     public List<ConnectorDefinition> getSourceList() {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 7fc7c7f..75e83bf 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -177,6 +177,68 @@ public class FunctionMetaDataManagerTest {
     }
 
     @Test
+    public void testStopFunction() throws PulsarClientException {
+
+        long version = 5;
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        FunctionMetaDataManager functionMetaDataManager = spy(
+                new FunctionMetaDataManager(workerConfig,
+                        mock(SchedulerManager.class),
+                        mockPulsarClient()));
+
+        Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new 
HashMap<>();
+        Function.FunctionMetaData f1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                
Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();
+        functionMetaDataMap1.put("func-1", f1);
+
+        Assert.assertTrue(functionMetaDataManager.canChangeState(f1, 0, 
Function.FunctionState.STOPPED));
+        Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 0, 
Function.FunctionState.RUNNING));
+        Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 2, 
Function.FunctionState.STOPPED));
+        Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 2, 
Function.FunctionState.RUNNING));
+
+        functionMetaDataManager.functionMetaDataMap.put("tenant-1", new 
HashMap<>());
+        
functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1", 
functionMetaDataMap1);
+
+        
Mockito.doReturn(null).when(functionMetaDataManager).submit(any(Request.ServiceRequest.class));
+
+        functionMetaDataManager.changeFunctionInstanceStatus("tenant-1", 
"namespace-1", "func-1", 0, false);
+
+        verify(functionMetaDataManager, 
times(1)).submit(any(Request.ServiceRequest.class));
+        verify(functionMetaDataManager).submit(argThat(new 
ArgumentMatcher<Request.ServiceRequest>() {
+            @Override
+            public boolean matches(Object o) {
+                if (o instanceof Request.ServiceRequest) {
+                    Request.ServiceRequest serviceRequest = 
(Request.ServiceRequest) o;
+                    if 
(!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) return false;
+                    if (!serviceRequest.getServiceRequestType().equals(
+                            Request.ServiceRequest.ServiceRequestType.UPDATE)) 
{
+                        return false;
+                    }
+                    if 
(!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(f1.getFunctionDetails()))
 {
+                        return false;
+                    }
+                    if (serviceRequest.getFunctionMetaData().getVersion() != 
(version + 1)) {
+                        return false;
+                    }
+                    Map<Integer, Function.FunctionState> stateMap = 
serviceRequest.getFunctionMetaData().getInstanceStatesMap();
+                    if (stateMap == null || stateMap.isEmpty()) {
+                        return false;
+                    }
+                    if (stateMap.get(1) != Function.FunctionState.RUNNING) {
+                        return false;
+                    }
+                    if (stateMap.get(0) != Function.FunctionState.STOPPED) {
+                        return false;
+                    }
+                    return true;
+                }
+                return false;
+            }
+        }));
+    }
+
+    @Test
     public void deregisterFunction() throws PulsarClientException {
         long version = 5;
         WorkerConfig workerConfig = new WorkerConfig();
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 17d6642..3ba667b 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -371,6 +371,59 @@ public class FunctionRuntimeManagerTest {
                 .get("worker-1").get("test-tenant/test-namespace/func-1:0"), 
assignment1);
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-1").get("test-tenant/test-namespace/func-2:0"), 
assignment3);
+
+        reset(functionRuntimeManager);
+        functionRuntimeManager.actionQueue.clear();
+
+        // add a stop
+        Function.FunctionMetaData.Builder function2StoppedBldr = 
function2.toBuilder();
+        function2StoppedBldr.putInstanceStates(0, 
Function.FunctionState.STOPPED);
+        Function.FunctionMetaData function2Stopped = 
function2StoppedBldr.build();
+
+        Function.Assignment assignment4 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        
.setFunctionMetaData(function2Stopped).setInstanceId(0).build())
+                .build();
+
+        functionRuntimeManager.processAssignment(assignment4);
+
+        verify(functionRuntimeManager, 
times(1)).insertStopAction(any(FunctionRuntimeInfo.class));
+        // make sure terminate is not called since this is a update operation
+        verify(functionRuntimeManager, 
times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class));
+
+        verify(functionRuntimeManager).insertStopAction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
+            @Override
+            public boolean matches(Object o) {
+                if (o instanceof FunctionRuntimeInfo) {
+                    FunctionRuntimeInfo functionRuntimeInfo = 
(FunctionRuntimeInfo) o;
+
+                    if 
(!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2))
 {
+                        return false;
+                    }
+                    return true;
+                }
+                return false;
+            }
+        }));
+
+        verify(functionRuntimeManager, 
times(0)).insertStartAction(any(FunctionRuntimeInfo.class));
+
+        Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1);
+        Assert.assertTrue(functionRuntimeManager.actionQueue.contains(
+                new FunctionAction()
+                        .setAction(FunctionAction.Action.STOP)
+                        .setFunctionRuntimeInfo(new 
FunctionRuntimeInfo().setFunctionInstance(
+                                
Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0)
+                                        .build()))));
+
+        
Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 2);
+        
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
+        Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
+                .get("worker-1").get("test-tenant/test-namespace/func-1:0"), 
assignment1);
+        Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
+                .get("worker-1").get("test-tenant/test-namespace/func-2:0"), 
assignment4);
+
     }
 
     @Test

Reply via email to