This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 020a1d5 Fixed the behavior of Function start/stop (#3477)
020a1d5 is described below

commit 020a1d57e122582a5ad8bd043f278e4a92d4ffc1
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Thu Jan 31 16:47:39 2019 -0800

    Fixed the behavior of Function start/stop (#3477)

    * Added a state in the function metadata about what the state of the instances should be
    * Have start api for sources/sinks
    * Add missing pieces
    * more checks while handling request
    * Fixed bugs
    * Added unittests
    * Added unittest
    * Fix the all instances side logic
---
 .../pulsar/broker/admin/impl/FunctionsBase.java | 31 +++++
 .../apache/pulsar/broker/admin/impl/SinkBase.java | 31 +++++
 .../pulsar/broker/admin/impl/SourceBase.java | 29 +++++
 .../org/apache/pulsar/client/admin/Functions.java | 34 +++++
 .../java/org/apache/pulsar/client/admin/Sink.java | 34 +++++
 .../org/apache/pulsar/client/admin/Source.java | 34 +++++
 .../client/admin/internal/FunctionsImpl.java | 21 ++++
 .../pulsar/client/admin/internal/SinkImpl.java | 21 ++++
 .../pulsar/client/admin/internal/SourceImpl.java | 21 ++++
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 29 +++++
 .../org/apache/pulsar/admin/cli/CmdFunctions.java | 33 ++++-
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 28 ++++-
 .../org/apache/pulsar/admin/cli/CmdSources.java | 28 ++++-
 .../proto/src/main/proto/Function.proto | 5 +
 .../functions/worker/FunctionMetaDataManager.java | 59 +++++++++
 .../functions/worker/FunctionRuntimeManager.java | 56 ++++++---
 .../functions/worker/rest/api/ComponentImpl.java | 139 +++++++++++++++++----
 .../worker/rest/api/v3/FunctionApiV3Resource.java | 31 +++++
 .../worker/rest/api/v3/SinkApiV3Resource.java | 27 ++++
 .../worker/rest/api/v3/SourceApiV3Resource.java | 27 ++++
 .../worker/FunctionMetaDataManagerTest.java | 62 +++++++++
 .../worker/FunctionRuntimeManagerTest.java | 53 ++++++++
 22 files changed, 781 insertions(+), 52 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 73af2c5..9b88f29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -338,6 +338,37 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi } @POST + @ApiOperation(value = "Start function instance", response = Void.class) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") + }) + @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) { + functions.startFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Start all function instances", response = Void.class) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") + }) + @Path("/{tenant}/{namespace}/{functionName}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName) { + functions.startFunctionInstances(tenant, namespace, functionName); + } + + @POST @ApiOperation( value = "Uploads Pulsar Function file data", hidden = true diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java index 41c5376..2bd22a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java @@ -256,6 +256,37 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> { sink.stopFunctionInstances(tenant, namespace, sinkName); } + @POST + @ApiOperation(value = "Start sink instance", response = Void.class) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") + }) + @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @PathParam("instanceId") String instanceId) { + sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Start all sink instances", response = Void.class) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") + }) + @Path("/{tenant}/{namespace}/{sinkName}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName) { + sink.startFunctionInstances(tenant, namespace, sinkName); + } + @GET @ApiOperation( value = "Fetches a list of supported Pulsar IO sink connectors currently running in cluster mode", diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java index c4a102b..0e8348f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java @@ -251,6 +251,35 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService> source.stopFunctionInstances(tenant, namespace, sourceName); } + @POST + @ApiOperation(value = "Start source instance", response = Void.class) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName, + final @PathParam("instanceId") String instanceId) { + source.startFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Start all source instances", response = Void.class) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName) { + source.startFunctionInstances(tenant, namespace, sourceName); + } + @GET @ApiOperation( value = "Fetches a list of supported Pulsar IO source connectors currently running 
in cluster mode", diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index 2bd2e9f..481c5fd 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -285,6 +285,39 @@ public interface Functions { void stopFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException; /** + * Start all function instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * + * @throws PulsarAdminException + * Unexpected error + */ + void startFunction(String tenant, String namespace, String function) throws PulsarAdminException; + + /** + * Start function instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * + * @param instanceId + * Function instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void startFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException; + + /** * Stop all function instances * * @param tenant @@ -299,6 +332,7 @@ public interface Functions { */ void stopFunction(String tenant, String namespace, String function) throws PulsarAdminException; + /** * Triggers the function by writing to the input topic. 
* diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java index 9f2d9ab..2b924f6 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java @@ -264,6 +264,40 @@ public interface Sink { void stopSink(String tenant, String namespace, String sink) throws PulsarAdminException; /** + * Start sink instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @param instanceId + * Sink instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void startSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException; + + /** + * Start all sink instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @throws PulsarAdminException + * Unexpected error + */ + void startSink(String tenant, String namespace, String sink) throws PulsarAdminException; + + + /** * Fetches a list of supported Pulsar IO sinks currently running in cluster mode * * @throws PulsarAdminException diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java index 989598a..706150b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java @@ -264,6 +264,40 @@ public interface Source { void stopSource(String tenant, String namespace, String source) throws PulsarAdminException; /** + * Start source instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @param instanceId + * Source instanceId + * + * @throws 
PulsarAdminException + * Unexpected error + */ + void startSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException; + + /** + * Start all source instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @throws PulsarAdminException + * Unexpected error + */ + void startSource(String tenant, String namespace, String source) throws PulsarAdminException; + + + /** * Fetches a list of supported Pulsar IO sources currently running in cluster mode * * @throws PulsarAdminException diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 130704e..ac8d60d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -291,6 +291,27 @@ public class FunctionsImpl extends BaseResource implements Functions { } @Override + public void startFunction(String tenant, String namespace, String functionName, int instanceId) + throws PulsarAdminException { + try { + request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId)) + .path("start")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void startFunction(String tenant, String namespace, String functionName) throws PulsarAdminException { + try { + request(functions.path(tenant).path(namespace).path(functionName).path("start")) + .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override public void uploadFunction(String sourceFile, String path) throws PulsarAdminException { try { final 
FormDataMultiPart mp = new FormDataMultiPart(); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java index a9f99b8..48a75e4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java @@ -233,6 +233,27 @@ public class SinkImpl extends BaseResource implements Sink { } @Override + public void startSink(String tenant, String namespace, String sinkName, int instanceId) + throws PulsarAdminException { + try { + request(sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(instanceId)) + .path("start")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void startSink(String tenant, String namespace, String sinkName) throws PulsarAdminException { + try { + request(sink.path(tenant).path(namespace).path(sinkName).path("start")) + .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override public List<ConnectorDefinition> getBuiltInSinks() throws PulsarAdminException { try { Response response = request(sink.path("builtinsinks")).get(); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java index 2d066e0..1a56dc4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java @@ -233,6 +233,27 @@ public class SourceImpl extends BaseResource implements Source { } @Override + public void startSource(String tenant, String namespace, 
String sourceName, int instanceId) + throws PulsarAdminException { + try { + request(source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(instanceId)) + .path("start")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void startSource(String tenant, String namespace, String sourceName) throws PulsarAdminException { + try { + request(source.path(tenant).path(namespace).path(sourceName).path("start")) + .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override public List<ConnectorDefinition> getBuiltInSources() throws PulsarAdminException { try { Response response = request(source.path("builtinsources")).get(); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 950ee94..a64c38c 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -254,6 +254,35 @@ public class CmdFunctionsTest { } @Test + public void startFunction() throws Exception { + String fnName = TEST_NAME + "-function"; + String tenant = "sample"; + String namespace = "ns1"; + int instanceId = 0; + cmd.run(new String[] { "start", "--tenant", tenant, "--namespace", namespace, "--name", fnName, + "--instance-id", Integer.toString(instanceId)}); + + CmdFunctions.StartFunction stop = cmd.getStarter(); + assertEquals(fnName, stop.getFunctionName()); + + verify(functions, times(1)).startFunction(tenant, namespace, fnName, instanceId); + } + + @Test + public void startFunctionInstances() throws Exception { + String fnName = TEST_NAME + "-function"; + String tenant = "sample"; + String namespace = "ns1"; + 
cmd.run(new String[] { "start", "--tenant", tenant, "--namespace", namespace, "--name", fnName }); + + CmdFunctions.StartFunction stop = cmd.getStarter(); + assertEquals(fnName, stop.getFunctionName()); + + verify(functions, times(1)).startFunction(tenant, namespace, fnName); + } + + + @Test public void testGetFunctionStatus() throws Exception { String fnName = TEST_NAME + "-function"; String tenant = "sample"; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 00ce1af..b651ab1 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -66,6 +66,7 @@ public class CmdFunctions extends CmdBase { private final GetFunctionStats functionStats; private final RestartFunction restart; private final StopFunction stop; + private final StartFunction start; private final ListFunctions lister; private final StateGetter stateGetter; private final TriggerFunction triggerer; @@ -673,7 +674,7 @@ public class CmdFunctions extends CmdBase { } } - @Parameters(commandDescription = "Temporary stops function instance. 
(If worker restarts then it reassigns and starts functiona again") + @Parameters(commandDescription = "Stops function instance") class StopFunction extends FunctionCommand { @Parameter(names = "--instance-id", description = "The function instanceId (stop all instances if instance-id is not provided") @@ -690,7 +691,28 @@ public class CmdFunctions extends CmdBase { } else { admin.functions().stopFunction(tenant, namespace, functionName); } - System.out.println("Restarted successfully"); + System.out.println("Stopped successfully"); + } + } + + @Parameters(commandDescription = "Starts a stopped function instance") + class StartFunction extends FunctionCommand { + + @Parameter(names = "--instance-id", description = "The function instanceId (start all instances if instance-id is not provided") + protected String instanceId; + + @Override + void runCmd() throws Exception { + if (isNotBlank(instanceId)) { + try { + admin.functions().startFunction(tenant, namespace, functionName, Integer.parseInt(instanceId)); + } catch (NumberFormatException e) { + System.err.println("instance-id must be a number"); + } + } else { + admin.functions().startFunction(tenant, namespace, functionName); + } + System.out.println("Started successfully"); } } @@ -882,6 +904,7 @@ public class CmdFunctions extends CmdBase { downloader = new DownloadFunction(); restart = new RestartFunction(); stop = new StopFunction(); + start = new StartFunction(); jcommander.addCommand("localrun", getLocalRunner()); jcommander.addCommand("create", getCreater()); jcommander.addCommand("delete", getDeleter()); @@ -889,6 +912,7 @@ public class CmdFunctions extends CmdBase { jcommander.addCommand("get", getGetter()); jcommander.addCommand("restart", getRestarter()); jcommander.addCommand("stop", getStopper()); + jcommander.addCommand("start", getStarter()); // TODO depecreate getstatus jcommander.addCommand("status", getStatuser(), "getstatus"); jcommander.addCommand("stats", getFunctionStats()); @@ -962,6 +986,11 @@ 
public class CmdFunctions extends CmdBase { return stop; } + @VisibleForTesting + StartFunction getStarter() { + return start; + } + private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functionConfig) { String[] args = fqfn.split("/"); if (args.length != 3) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index dd533ac..00cd26f 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -64,6 +64,7 @@ public class CmdSinks extends CmdBase { private final GetSink getSink; private final GetSinkStatus getSinkStatus; private final StopSink stopSink; + private final StartSink startSink; private final RestartSink restartSink; private final LocalSinkRunner localSinkRunner; @@ -76,6 +77,7 @@ public class CmdSinks extends CmdBase { getSink = new GetSink(); getSinkStatus = new GetSinkStatus(); stopSink = new StopSink(); + startSink = new StartSink(); restartSink = new RestartSink(); localSinkRunner = new LocalSinkRunner(); @@ -87,6 +89,7 @@ public class CmdSinks extends CmdBase { // TODO deprecate getstatus jcommander.addCommand("status", getSinkStatus, "getstatus"); jcommander.addCommand("stop", stopSink); + jcommander.addCommand("start", startSink); jcommander.addCommand("restart", restartSink); jcommander.addCommand("localrun", localSinkRunner); jcommander.addCommand("available-sinks", new ListBuiltInSinks()); @@ -575,7 +578,7 @@ public class CmdSinks extends CmdBase { } } - @Parameters(commandDescription = "Temporary stops sink instance. 
(If worker restarts then it reassigns and starts sink again") + @Parameters(commandDescription = "Stops sink instance") class StopSink extends SinkCommand { @Parameter(names = "--instance-id", description = "The sink instanceId (stop all instances if instance-id is not provided") @@ -592,7 +595,28 @@ public class CmdSinks extends CmdBase { } else { admin.sink().stopSink(tenant, namespace, sinkName); } - System.out.println("Restarted successfully"); + System.out.println("Stopped successfully"); + } + } + + @Parameters(commandDescription = "Starts sink instance") + class StartSink extends SinkCommand { + + @Parameter(names = "--instance-id", description = "The sink instanceId (start all instances if instance-id is not provided") + protected String instanceId; + + @Override + void runCmd() throws Exception { + if (isNotBlank(instanceId)) { + try { + admin.sink().startSink(tenant, namespace, sinkName, Integer.parseInt(instanceId)); + } catch (NumberFormatException e) { + System.err.println("instance-id must be a number"); + } + } else { + admin.sink().startSink(tenant, namespace, sinkName); + } + System.out.println("Started successfully"); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index c61b69a..c334380 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -68,6 +68,7 @@ public class CmdSources extends CmdBase { private final UpdateSource updateSource; private final RestartSource restartSource; private final StopSource stopSource; + private final StartSource startSource; private final LocalSourceRunner localSourceRunner; public CmdSources(PulsarAdmin admin) { @@ -80,6 +81,7 @@ public class CmdSources extends CmdBase { getSourceStatus = new GetSourceStatus(); restartSource = new RestartSource(); stopSource = new 
StopSource(); + startSource = new StartSource(); localSourceRunner = new LocalSourceRunner(); jcommander.addCommand("create", createSource); @@ -90,6 +92,7 @@ public class CmdSources extends CmdBase { jcommander.addCommand("status", getSourceStatus, "getstatus"); jcommander.addCommand("list", listSources); jcommander.addCommand("stop", stopSource); + jcommander.addCommand("start", startSource); jcommander.addCommand("restart", restartSource); jcommander.addCommand("localrun", localSourceRunner); jcommander.addCommand("available-sources", new ListBuiltInSources()); @@ -529,7 +532,7 @@ public class CmdSources extends CmdBase { } } - @Parameters(commandDescription = "Temporary stops source instance. (If worker restarts then it reassigns and starts source again") + @Parameters(commandDescription = "Stop source instance") class StopSource extends SourceCommand { @Parameter(names = "--instance-id", description = "The source instanceId (stop all instances if instance-id is not provided") @@ -546,7 +549,28 @@ public class CmdSources extends CmdBase { } else { admin.source().stopSource(tenant, namespace, sourceName); } - System.out.println("Restarted successfully"); + System.out.println("Stopped successfully"); + } + } + + @Parameters(commandDescription = "Start source instance") + class StartSource extends SourceCommand { + + @Parameter(names = "--instance-id", description = "The source instanceId (start all instances if instance-id is not provided") + protected String instanceId; + + @Override + void runCmd() throws Exception { + if (isNotBlank(instanceId)) { + try { + admin.source().startSource(tenant, namespace, sourceName, Integer.parseInt(instanceId)); + } catch (NumberFormatException e) { + System.err.println("instance-id must be a number"); + } + } else { + admin.source().startSource(tenant, namespace, sourceName); + } + System.out.println("Started successfully"); } } diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto index cb5021b..74457e3 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -127,11 +127,16 @@ message PackageLocationMetaData { string originalFileName = 2; } +enum FunctionState { + RUNNING = 0; + STOPPED = 1; +} message FunctionMetaData { FunctionDetails functionDetails = 1; PackageLocationMetaData packageLocation = 2; uint64 version = 3; uint64 createTime = 4; + map<int32, FunctionState> instanceStates = 5; } message Instance { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java index 920063e..832ed5d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java @@ -224,6 +224,42 @@ public class FunctionMetaDataManager implements AutoCloseable { } /** + * Sends a start/stop function request to the FMT (Function Metadata Topic) for a function + * @param tenant the tenant the function that needs to be deregistered belongs to + * @param namespace the namespace the function that needs to be deregistered belongs to + * @param functionName the name of the function + * @param instanceId the instanceId of the function, -1 if for all instances + * @param start do we need to start or stop + * @return a completable future of when the start/stop has been applied + */ + public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String tenant, String namespace, String functionName, + Integer instanceId, boolean start) { + FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName); + + FunctionMetaData.Builder builder = functionMetaData.toBuilder() + 
.setVersion(functionMetaData.getVersion() + 1); + if (builder.getInstanceStatesMap() == null || builder.getInstanceStatesMap().isEmpty()) { + for (int i = 0; i < functionMetaData.getFunctionDetails().getParallelism(); ++i) { + builder.putInstanceStates(i, Function.FunctionState.RUNNING); + } + } + Function.FunctionState state = start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED; + if (instanceId < 0) { + for (int i = 0; i < functionMetaData.getFunctionDetails().getParallelism(); ++i) { + builder.putInstanceStates(i, state); + } + } else { + builder.putInstanceStates(instanceId, state); + } + FunctionMetaData newFunctionMetaData = builder.build(); + + Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest( + this.workerConfig.getWorkerId(), newFunctionMetaData); + + return submit(updateRequest); + } + + /** * Processes a request received from the FMT (Function Metadata Topic) * @param messageId The message id of the request * @param serviceRequest The request @@ -421,6 +457,29 @@ public class FunctionMetaDataManager implements AutoCloseable { } } + public boolean canChangeState(FunctionMetaData functionMetaData, int instanceId, Function.FunctionState newState) { + if (instanceId >= functionMetaData.getFunctionDetails().getParallelism()) { + return false; + } + if (functionMetaData.getInstanceStatesMap() == null || functionMetaData.getInstanceStatesMap().isEmpty()) { + // This means that all instances of the functions are running + return newState == Function.FunctionState.STOPPED; + } + if (instanceId >= 0) { + if (functionMetaData.getInstanceStatesMap().containsKey(instanceId)) { + return functionMetaData.getInstanceStatesMap().get(instanceId) != newState; + } else { + return false; + } + } else { + // want to change state for all instances + for (Function.FunctionState state : functionMetaData.getInstanceStatesMap().values()) { + if (state != newState) return true; + } + return false; + } + } + private 
ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException { return new ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create()); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index a5a6aa2..94009a6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -201,7 +201,9 @@ public class FunctionRuntimeManager implements AutoCloseable{ Map<String, Assignment> assignmentMap = workerIdToAssignments.get(this.workerConfig.getWorkerId()); if (assignmentMap != null) { for (Assignment assignment : assignmentMap.values()) { - startFunctionInstance(assignment); + if (needsStart(assignment)) { + startFunctionInstance(assignment); + } } } // start assignment tailer @@ -304,8 +306,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ } } - public Response stopFunctionInstance(String tenant, String namespace, String functionName, int instanceId, - boolean restart, URI uri) throws Exception { + public Response restartFunctionInstance(String tenant, String namespace, String functionName, int instanceId, + URI uri) throws Exception { if (runtimeFactory.externallyManaged()) { return Response.status(Status.NOT_IMPLEMENTED).type(MediaType.APPLICATION_JSON) .entity(new ErrorData("Externally managed schedulers can't do per instance stop")).build(); @@ -321,7 +323,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ final String workerId = this.workerConfig.getWorkerId(); if (assignedWorkerId.equals(workerId)) { - stopFunction(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), 
restart); + stopFunction(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), true); return Response.status(Status.OK).build(); } else { // query other worker @@ -346,7 +348,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ } } - public Response stopFunctionInstances(String tenant, String namespace, String functionName, boolean restart) + public Response restartFunctionInstances(String tenant, String namespace, String functionName) throws Exception { final String fullFunctionName = String.format("%s/%s/%s", tenant, namespace, functionName); Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName); @@ -361,7 +363,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ final String workerId = this.workerConfig.getWorkerId(); String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()); if (assignedWorkerId.equals(workerId)) { - stopFunction(fullyQualifiedInstanceId, restart); + stopFunction(fullyQualifiedInstanceId, true); } else { List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership(); WorkerInfo workerInfo = null; @@ -377,11 +379,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build(); } - if (restart) { - this.functionAdmin.functions().restartFunction(tenant, namespace, functionName); - } else { - this.functionAdmin.functions().stopFunction(tenant, namespace, functionName); - } + this.functionAdmin.functions().restartFunction(tenant, namespace, functionName); } } else { for (Assignment assignment : assignments) { @@ -389,7 +387,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ final String workerId = this.workerConfig.getWorkerId(); String fullyQualifiedInstanceId = 
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()); if (assignedWorkerId.equals(workerId)) { - stopFunction(fullyQualifiedInstanceId, restart); + stopFunction(fullyQualifiedInstanceId, true); } else { List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership(); WorkerInfo workerInfo = null; @@ -404,13 +402,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ } continue; } - if (restart) { - this.functionAdmin.functions().restartFunction(tenant, namespace, functionName, - assignment.getInstance().getInstanceId()); - } else { - this.functionAdmin.functions().stopFunction(tenant, namespace, functionName, + this.functionAdmin.functions().restartFunction(tenant, namespace, functionName, assignment.getInstance().getInstanceId()); - } } } } @@ -619,7 +612,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ this.insertStopAction(functionRuntimeInfo); } // still assigned to me, need to restart - if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) { + if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId()) && needsStart(assignment)) { //start again FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo(); newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance()); @@ -687,7 +680,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ this.setAssignment(assignment); //Assigned to me - if (assignment.getWorkerId().equals(workerConfig.getWorkerId())) { + if (assignment.getWorkerId().equals(workerConfig.getWorkerId()) && needsStart(assignment)) { startFunctionInstance(assignment); } } @@ -819,4 +812,27 @@ public class FunctionRuntimeManager implements AutoCloseable{ private FunctionRuntimeInfo getFunctionRuntimeInfoInternal(String fullyQualifiedInstanceId) { return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); } + + private boolean needsStart(Assignment assignment) { + boolean toStart = false; + 
Function.FunctionMetaData functionMetaData = assignment.getInstance().getFunctionMetaData(); + if (functionMetaData.getInstanceStatesMap() == null || functionMetaData.getInstanceStatesMap().isEmpty()) { + toStart = true; + } else { + if (assignment.getInstance().getInstanceId() < 0) { + // for externally managed functions, insert the start only if there is atleast one + // instance that needs to be started + for (Function.FunctionState state : functionMetaData.getInstanceStatesMap().values()) { + if (state == Function.FunctionState.RUNNING) { + toStart = true; + } + } + } else { + if (functionMetaData.getInstanceStatesOrDefault(assignment.getInstance().getInstanceId(), Function.FunctionState.RUNNING) == Function.FunctionState.RUNNING) { + toStart = true; + } + } + } + return toStart; + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 71825a6..cde2aae 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -167,8 +167,7 @@ public abstract class ComponentImpl { FunctionRuntimeInfo functionRuntimeInfo = worker().getFunctionRuntimeManager().getFunctionRuntimeInfo( org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance())); if (functionRuntimeInfo == null) { - log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, name); - throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, name)); + return notRunning(assignedWorkerId, ""); } RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); @@ -181,7 +180,8 @@ public abstract class ComponentImpl { throw new RuntimeException(e); } } else { - 
return notRunning(assignedWorkerId, functionRuntimeInfo.getStartupException().getMessage()); + String message = functionRuntimeInfo.getStartupException() != null ? functionRuntimeInfo.getStartupException().getMessage() : ""; + return notRunning(assignedWorkerId, message); } } else { // query other worker @@ -703,7 +703,62 @@ public abstract class ComponentImpl { final String componentName, final String instanceId, final URI uri) { - stopFunctionInstance(tenant, namespace, componentName, instanceId, false, uri); + changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, false, uri); + } + + public void startFunctionInstance(final String tenant, + final String namespace, + final String componentName, + final String instanceId, + final URI uri) { + changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, true, uri); + } + + public void changeFunctionInstanceStatus(final String tenant, + final String namespace, + final String componentName, + final String instanceId, + final boolean start, + final URI uri) { + + if (!isWorkerServiceAvailable()) { + throwUnavailableException(); + } + + // validate parameters + try { + validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId); + } catch (IllegalArgumentException e) { + log.error("Invalid start/stop {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); + throw new RestException(Status.BAD_REQUEST, e.getMessage()); + } + + FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); + if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { + log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName); + throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName)); + } + + FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); + 
if (!calculateSubjectType(functionMetaData).equals(componentType)) { + log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType); + throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName)); + } + + if (!functionMetaDataManager.canChangeState(functionMetaData, Integer.parseInt(instanceId), start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) { + log.error("Operation not permitted on {}/{}/{}", tenant, namespace, componentName); + throw new RestException(Status.BAD_REQUEST, String.format("Operation not permitted")); + } + + try { + functionMetaDataManager.changeFunctionInstanceStatus(tenant, namespace, componentName, + Integer.parseInt(instanceId), start); + } catch (WebApplicationException we) { + throw we; + } catch (Exception e) { + log.error("Failed to start/stop {}: {}/{}/{}/{}", componentType, tenant, namespace, componentName, instanceId, e); + throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); + } } public void restartFunctionInstance(final String tenant, @@ -711,16 +766,6 @@ public abstract class ComponentImpl { final String componentName, final String instanceId, final URI uri) { - stopFunctionInstance(tenant, namespace, componentName, instanceId, true, uri); - } - - public void stopFunctionInstance(final String tenant, - final String namespace, - final String componentName, - final String instanceId, - final boolean restart, - final URI uri) { - if (!isWorkerServiceAvailable()) { throwUnavailableException(); } @@ -747,8 +792,8 @@ public abstract class ComponentImpl { FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); try { - functionRuntimeManager.stopFunctionInstance(tenant, namespace, componentName, - Integer.parseInt(instanceId), restart, uri); + functionRuntimeManager.restartFunctionInstance(tenant, namespace, componentName, + Integer.parseInt(instanceId), uri); } catch (WebApplicationException we) { 
throw we; } catch (Exception e) { @@ -760,20 +805,62 @@ public abstract class ComponentImpl { public void stopFunctionInstances(final String tenant, final String namespace, final String componentName) { - stopFunctionInstances(tenant, namespace, componentName, false); + changeFunctionStatusAllInstances(tenant, namespace, componentName, false); } - public void restartFunctionInstances(final String tenant, - final String namespace, - final String componentName) { - stopFunctionInstances(tenant, namespace, componentName, true); + public void startFunctionInstances(final String tenant, + final String namespace, + final String componentName) { + changeFunctionStatusAllInstances(tenant, namespace, componentName, true); } - public void stopFunctionInstances(final String tenant, - final String namespace, - final String componentName, - final boolean restart) { + public void changeFunctionStatusAllInstances(final String tenant, + final String namespace, + final String componentName, + final boolean start) { + + if (!isWorkerServiceAvailable()) { + throwUnavailableException(); + } + + // validate parameters + try { + validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); + } catch (IllegalArgumentException e) { + log.error("Invalid start/stop {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); + throw new RestException(Status.BAD_REQUEST, e.getMessage()); + } + + FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); + if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { + log.error("{} in stopFunctionInstances does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName); + throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName)); + } + + FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); + if 
(!calculateSubjectType(functionMetaData).equals(componentType)) { + log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType); + throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName)); + } + + if (!functionMetaDataManager.canChangeState(functionMetaData, -1, start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) { + log.error("Operation not permitted on {}/{}/{}", tenant, namespace, componentName); + throw new RestException(Status.BAD_REQUEST, String.format("Operation not permitted")); + } + try { + functionMetaDataManager.changeFunctionInstanceStatus(tenant, namespace, componentName, -1, start); + } catch (WebApplicationException we) { + throw we; + } catch (Exception e) { + log.error("Failed to start/stop {}: {}/{}/{}", componentType, tenant, namespace, componentName, e); + throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); + } + } + + public void restartFunctionInstances(final String tenant, + final String namespace, + final String componentName) { if (!isWorkerServiceAvailable()) { throwUnavailableException(); } @@ -800,7 +887,7 @@ public abstract class ComponentImpl { FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); try { - functionRuntimeManager.stopFunctionInstances(tenant, namespace, componentName, restart); + functionRuntimeManager.restartFunctionInstances(tenant, namespace, componentName); } catch (WebApplicationException we) { throw we; } catch (Exception e) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java index 2eebf62..4d27134 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java +++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java @@ -260,6 +260,37 @@ public class FunctionApiV3Resource extends FunctionApiResource { } @POST + @ApiOperation(value = "Start function instance", response = Void.class) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") + }) + @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) { + functions.startFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Start all function instances", response = Void.class) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") + }) + @Path("/{tenant}/{namespace}/{functionName}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName) { + functions.startFunctionInstances(tenant, namespace, functionName); + } + + @POST @Path("/upload") @Consumes(MediaType.MULTIPART_FORM_DATA) public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream, diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java index 
6a8f25e..ee7d1a4 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java @@ -195,6 +195,33 @@ public class SinkApiV3Resource extends FunctionApiResource { sink.stopFunctionInstances(tenant, namespace, sinkName); } + @POST + @ApiOperation(value = "Start sink instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @PathParam("instanceId") String instanceId) { + sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Start all sink instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName) { + sink.startFunctionInstances(tenant, namespace, sinkName); + } + @GET @Path("/builtinsinks") public List<ConnectorDefinition> getSinkList() { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java index 8675cc5..c532e3a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java @@ -198,6 +198,33 @@ public class SourceApiV3Resource extends FunctionApiResource { source.stopFunctionInstances(tenant, namespace, sourceName); } + @POST + @ApiOperation(value = "Start source instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName, + final @PathParam("instanceId") String instanceId) { + source.startFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Start all source instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/start") + @Consumes(MediaType.APPLICATION_JSON) + public void startSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName) { + source.startFunctionInstances(tenant, namespace, sourceName); + } + @GET @Path("/builtinsources") public List<ConnectorDefinition> getSourceList() { diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java index 7fc7c7f..75e83bf 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java @@ -177,6 +177,68 @@ public class FunctionMetaDataManagerTest { } @Test + public void testStopFunction() throws PulsarClientException { + + long version = 5; + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setWorkerId("worker-1"); + FunctionMetaDataManager functionMetaDataManager = spy( + new FunctionMetaDataManager(workerConfig, + mock(SchedulerManager.class), + mockPulsarClient())); + + Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new HashMap<>(); + Function.FunctionMetaData f1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build(); + functionMetaDataMap1.put("func-1", f1); + + Assert.assertTrue(functionMetaDataManager.canChangeState(f1, 0, Function.FunctionState.STOPPED)); + Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 0, Function.FunctionState.RUNNING)); + Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 2, Function.FunctionState.STOPPED)); + Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 2, Function.FunctionState.RUNNING)); + + functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>()); + functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1", functionMetaDataMap1); + + Mockito.doReturn(null).when(functionMetaDataManager).submit(any(Request.ServiceRequest.class)); + + functionMetaDataManager.changeFunctionInstanceStatus("tenant-1", "namespace-1", "func-1", 0, false); + + 
verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class)); + verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() { + @Override + public boolean matches(Object o) { + if (o instanceof Request.ServiceRequest) { + Request.ServiceRequest serviceRequest = (Request.ServiceRequest) o; + if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) return false; + if (!serviceRequest.getServiceRequestType().equals( + Request.ServiceRequest.ServiceRequestType.UPDATE)) { + return false; + } + if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(f1.getFunctionDetails())) { + return false; + } + if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) { + return false; + } + Map<Integer, Function.FunctionState> stateMap = serviceRequest.getFunctionMetaData().getInstanceStatesMap(); + if (stateMap == null || stateMap.isEmpty()) { + return false; + } + if (stateMap.get(1) != Function.FunctionState.RUNNING) { + return false; + } + if (stateMap.get(0) != Function.FunctionState.STOPPED) { + return false; + } + return true; + } + return false; + } + })); + } + + @Test public void deregisterFunction() throws PulsarClientException { long version = 5; WorkerConfig workerConfig = new WorkerConfig(); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 17d6642..3ba667b 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -371,6 +371,59 @@ public class FunctionRuntimeManagerTest { .get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment1); Assert.assertEquals(functionRuntimeManager.workerIdToAssignments 
.get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment3); + + reset(functionRuntimeManager); + functionRuntimeManager.actionQueue.clear(); + + // add a stop + Function.FunctionMetaData.Builder function2StoppedBldr = function2.toBuilder(); + function2StoppedBldr.putInstanceStates(0, Function.FunctionState.STOPPED); + Function.FunctionMetaData function2Stopped = function2StoppedBldr.build(); + + Function.Assignment assignment4 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2Stopped).setInstanceId(0).build()) + .build(); + + functionRuntimeManager.processAssignment(assignment4); + + verify(functionRuntimeManager, times(1)).insertStopAction(any(FunctionRuntimeInfo.class)); + // make sure terminate is not called since this is a update operation + verify(functionRuntimeManager, times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class)); + + verify(functionRuntimeManager).insertStopAction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() { + @Override + public boolean matches(Object o) { + if (o instanceof FunctionRuntimeInfo) { + FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o; + + if (!functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)) { + return false; + } + return true; + } + return false; + } + })); + + verify(functionRuntimeManager, times(0)).insertStartAction(any(FunctionRuntimeInfo.class)); + + Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1); + Assert.assertTrue(functionRuntimeManager.actionQueue.contains( + new FunctionAction() + .setAction(FunctionAction.Action.STOP) + .setFunctionRuntimeInfo(new FunctionRuntimeInfo().setFunctionInstance( + Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0) + .build())))); + + Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 2); + 
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1); + Assert.assertEquals(functionRuntimeManager.workerIdToAssignments + .get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment1); + Assert.assertEquals(functionRuntimeManager.workerIdToAssignments + .get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment4); + } @Test