sijie closed pull request #2415: Add admin api to support stop function URL: https://github.com/apache/incubator-pulsar/pull/2415
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index b8891e5a92..4384f50688 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -292,6 +292,31 @@ public Response restartFunction(final @PathParam("tenant") String tenant, return functions.restartFunctionInstances(tenant, namespace, functionName); } + @POST + @ApiOperation(value = "Stop function instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public Response stopFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) { + return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId); + } + + @POST + @ApiOperation(value = "Stop all function instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public Response stopFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { + return functions.stopFunctionInstances(tenant, namespace, functionName); + } + @POST @ApiOperation( value = "Uploads Pulsar Function file data", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 1306f13335..730296341b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -509,7 +509,7 @@ public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception { } @Test(timeOut = 20000) - public void testFunctionRestartApi() throws Exception { + public void testFunctionStopAndRestartApi() throws Exception { final String namespacePortion = "io"; final String replNamespace = tenant + "/" + namespacePortion; @@ -543,6 +543,21 @@ public void testFunctionRestartApi() throws Exception { SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); assertEquals(subStats.consumers.size(), 1); + // it should stop consumer : so, check none of the consumer connected on subscription + admin.functions().stopFunction(tenant, namespacePortion, functionName); + + retryStrategically((test) -> { + try { + SubscriptionStats subStat = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + return subStat != null && subStat.consumers.size() == 0; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + + subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + assertEquals(subStats.consumers.size(), 0); + // it should restart consumer : so, check if consumer came up again after restarting function admin.functions().restartFunction(tenant, namespacePortion, functionName); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index 4525c51b81..7d16ff52b8 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -215,6 +215,40 @@ */ void restartFunction(String tenant, String namespace, String function) throws PulsarAdminException; + + /** + * Stop function instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * + * @param instanceId + * Function instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void stopFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException; + + /** + * Stop all function instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * + * @throws PulsarAdminException + * Unexpected error + */ + void stopFunction(String tenant, String namespace, String function) throws PulsarAdminException; + /** * Triggers the function by writing to the input topic. * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 402b5d3971..4552ebae9a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -236,6 +236,27 @@ public void restartFunction(String tenant, String namespace, String functionName } } + @Override + public void stopFunction(String tenant, String namespace, String functionName, int instanceId) + throws PulsarAdminException { + try { + request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId)) + .path("stop")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void stopFunction(String tenant, String namespace, String functionName) throws PulsarAdminException { + try { + request(functions.path(tenant).path(namespace).path(functionName).path("stop")) + .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + @Override public void uploadFunction(String sourceFile, String path) throws PulsarAdminException { try { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 11a3c7a0b9..97e8633fb0 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -32,6 +32,7 @@ import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction; import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions; import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction; +import org.apache.pulsar.admin.cli.CmdFunctions.StopFunction; import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction; import org.apache.pulsar.admin.cli.CmdSinks.CreateSink; import org.apache.pulsar.admin.cli.CmdSources.CreateSource; @@ -243,6 +244,34 @@ public void restartFunctionInstances() throws Exception { verify(functions, times(1)).restartFunction(tenant, namespace, fnName); } + @Test + public void stopFunction() throws Exception { + String fnName = TEST_NAME + "-function"; + String tenant = "sample"; + String namespace = "ns1"; + int instanceId = 0; + cmd.run(new String[] { "stop", "--tenant", tenant, "--namespace", namespace, "--name", fnName, + "--instance-id", Integer.toString(instanceId)}); + + StopFunction stop = cmd.getStopper(); + assertEquals(fnName, stop.getFunctionName()); + + verify(functions, times(1)).stopFunction(tenant, namespace, fnName, instanceId); + } + + @Test + public void stopFunctionInstances() throws Exception { + String fnName = TEST_NAME + "-function"; + String tenant = "sample"; + String namespace = "ns1"; + cmd.run(new String[] { "stop", "--tenant", tenant, "--namespace", namespace, "--name", fnName }); + + StopFunction stop = cmd.getStopper(); + assertEquals(fnName, stop.getFunctionName()); + + verify(functions, times(1)).stopFunction(tenant, namespace, fnName); + } + @Test public void testCreateFunctionWithHttpUrl() throws Exception { String fnName = TEST_NAME + "-function"; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 5f52313d57..a23bc7b14b 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -104,6 +104,7 @@ private final GetFunction getter; private final GetFunctionStatus functionStatus; private final RestartFunction restart; + private final StopFunction stop; private final ListFunctions lister; private final StateGetter stateGetter; private final TriggerFunction triggerer; @@ -853,7 +854,28 @@ void runCmd() throws Exception { System.out.println("Restarted successfully"); } } - + + @Parameters(commandDescription = "Temporary stops function instance. (If worker restarts then it reassigns and starts functiona again") + class StopFunction extends FunctionCommand { + + @Parameter(names = "--instance-id", description = "The function instanceId (stop all instances if instance-id is not provided") + protected String instanceId; + + @Override + void runCmd() throws Exception { + if (isNotBlank(instanceId)) { + try { + admin.functions().stopFunction(tenant, namespace, functionName, Integer.parseInt(instanceId)); + } catch (NumberFormatException e) { + System.err.println("instance-id must be a number"); + } + } else { + admin.functions().stopFunction(tenant, namespace, functionName); + } + System.out.println("Restarted successfully"); + } + } + @Parameters(commandDescription = "Delete a Pulsar Function that's running on a Pulsar cluster") class DeleteFunction extends FunctionCommand { @Override @@ -1066,12 +1088,14 @@ public CmdFunctions(PulsarAdmin admin) throws PulsarClientException { downloader = new DownloadFunction(); cluster = new GetCluster(); restart = new RestartFunction(); + stop = new StopFunction(); jcommander.addCommand("localrun", getLocalRunner()); jcommander.addCommand("create", getCreater()); jcommander.addCommand("delete", getDeleter()); jcommander.addCommand("update", getUpdater()); jcommander.addCommand("get", getGetter()); jcommander.addCommand("restart", getRestarter()); + jcommander.addCommand("stop", getStopper()); jcommander.addCommand("getstatus", getStatuser()); jcommander.addCommand("list", getLister()); jcommander.addCommand("querystate", getStateGetter()); @@ -1139,6 +1163,11 @@ RestartFunction getRestarter() { return restart; } + @VisibleForTesting + StopFunction getStopper() { + return stop; + } + private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functionConfig) { String[] args = fqfn.split("/"); if (args.length != 3) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 121a454a91..1016171502 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -90,7 +90,7 @@ private MembershipManager membershipManager; private final ConnectorsManager connectorsManager; - + public FunctionRuntimeManager(WorkerConfig workerConfig, PulsarClient pulsarClient, Namespace dlogNamespace, @@ -326,7 +326,8 @@ public synchronized void removeAssignments(Collection<Assignment> assignments) { return functionStatus; } - public Response restartFunctionInstance(String tenant, String namespace, String functionName, int instanceId) throws Exception { + public Response stopFunctionInstance(String tenant, String namespace, String functionName, int instanceId, + boolean restart) throws Exception { Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId); final String fullFunctionName = String.format("%s/%s/%s/%s", tenant, namespace, functionName, instanceId); if (assignment == null) { @@ -336,9 +337,9 @@ public Response restartFunctionInstance(String tenant, String namespace, String final String assignedWorkerId = assignment.getWorkerId(); final String workerId = this.workerConfig.getWorkerId(); - + if (assignedWorkerId.equals(workerId)) { - restartFunction(Utils.getFullyQualifiedInstanceId(assignment.getInstance())); + stopFunction(Utils.getFullyQualifiedInstanceId(assignment.getInstance()), restart); return Response.status(Status.OK).build(); } else { // query other worker @@ -355,8 +356,10 @@ public Response restartFunctionInstance(String tenant, String namespace, String } URI redirect = null; - final String redirectUrl = String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart", - workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, instanceId); + String action = restart ? "restart" : "stop"; + final String redirectUrl = String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/%s", + workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, instanceId, + action); try { redirect = new URI(redirectUrl); } catch (URISyntaxException e) { @@ -369,7 +372,8 @@ public Response restartFunctionInstance(String tenant, String namespace, String } } - public Response restartFunctionInstances(String tenant, String namespace, String functionName) throws Exception { + public Response stopFunctionInstances(String tenant, String namespace, String functionName, boolean restart) + throws Exception { final String fullFunctionName = String.format("%s/%s/%s", tenant, namespace, functionName); Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName); @@ -382,7 +386,7 @@ public Response restartFunctionInstances(String tenant, String namespace, String final String workerId = this.workerConfig.getWorkerId(); String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); if (assignedWorkerId.equals(workerId)) { - restartFunction(fullyQualifiedInstanceId); + stopFunction(fullyQualifiedInstanceId, restart); } else { List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership(); WorkerInfo workerInfo = null; @@ -398,10 +402,11 @@ public Response restartFunctionInstances(String tenant, String namespace, String continue; } Client client = ClientBuilder.newClient(); + String action = restart ? "restart" : "stop"; // TODO: create and use pulsar-admin to support authorization and authentication and manage redirect - final String instanceRestartUrl = String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart", + final String instanceRestartUrl = String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/%s", workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, - assignment.getInstance().getInstanceId()); + assignment.getInstance().getInstanceId(), action); client.target(instanceRestartUrl).request(MediaType.APPLICATION_JSON) .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); } @@ -409,15 +414,17 @@ public Response restartFunctionInstances(String tenant, String namespace, String return Response.status(Status.OK).build(); } - private void restartFunction(String fullyQualifiedInstanceId) throws Exception { - log.info("[{}] restarting..", fullyQualifiedInstanceId); + private void stopFunction(String fullyQualifiedInstanceId, boolean restart) throws Exception { + log.info("[{}] {}..", restart ? "restarting" : "stopping", fullyQualifiedInstanceId); FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(fullyQualifiedInstanceId); if (functionRuntimeInfo != null) { this.functionActioner.stopFunction(functionRuntimeInfo); try { - this.functionActioner.startFunction(functionRuntimeInfo); + if(restart) { + this.functionActioner.startFunction(functionRuntimeInfo); + } } catch (Exception ex) { - log.info("{} Error starting function", fullyQualifiedInstanceId, ex); + log.info("{} Error re-starting function", fullyQualifiedInstanceId, ex); functionRuntimeInfo.setStartupException(ex); throw ex; } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index a9e03de9e3..c2747f4320 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -79,7 +79,6 @@ import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; -import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.Builder; import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics; import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.RuntimeSpawner; @@ -386,8 +385,18 @@ public Response getFunctionInstanceStatus(final String tenant, final String name return Response.status(Status.OK).entity(jsonResponse).build(); } + public Response stopFunctionInstance(final String tenant, final String namespace, final String functionName, + final String instanceId) { + return stopFunctionInstance(tenant, namespace, functionName, instanceId, false); + } + public Response restartFunctionInstance(final String tenant, final String namespace, final String functionName, final String instanceId) { + return stopFunctionInstance(tenant, namespace, functionName, instanceId, true); + } + + public Response stopFunctionInstance(final String tenant, final String namespace, final String functionName, + final String instanceId, boolean restart) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -411,8 +420,8 @@ public Response restartFunctionInstance(final String tenant, final String namesp FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); try { - return functionRuntimeManager.restartFunctionInstance(tenant, namespace, functionName, - Integer.parseInt(instanceId)); + return functionRuntimeManager.stopFunctionInstance(tenant, namespace, functionName, + Integer.parseInt(instanceId), restart); } catch (WebApplicationException we) { throw we; } catch (Exception e) { @@ -421,7 +430,16 @@ public Response restartFunctionInstance(final String tenant, final String namesp } } + public Response stopFunctionInstances(final String tenant, final String namespace, final String functionName) { + return stopFunctionInstances(tenant, namespace, functionName, false); + } + public Response restartFunctionInstances(final String tenant, final String namespace, final String functionName) { + return stopFunctionInstances(tenant, namespace, functionName, true); + } + + public Response stopFunctionInstances(final String tenant, final String namespace, final String functionName, + boolean restart) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -445,8 +463,8 @@ public Response restartFunctionInstances(final String tenant, final String names FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); try { - return functionRuntimeManager.restartFunctionInstances(tenant, namespace, functionName); - }catch (Exception e) { + return functionRuntimeManager.stopFunctionInstances(tenant, namespace, functionName, restart); + } catch (Exception e) { log.error("Failed to restart function: {}/{}/{}", tenant, namespace, functionName, e); return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build(); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index 3581453920..ddea22ace0 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -22,7 +22,6 @@ import org.apache.pulsar.functions.worker.rest.FunctionApiResource; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; -import org.apache.pulsar.client.api.Message; import org.apache.pulsar.common.io.ConnectorDefinition; import java.io.IOException; import java.io.InputStream; @@ -41,9 +40,6 @@ import javax.ws.rs.core.Response; import org.apache.pulsar.functions.worker.WorkerInfo; -import org.apache.pulsar.functions.worker.rest.FunctionApiResource; -import org.glassfish.jersey.media.multipart.FormDataContentDisposition; -import org.glassfish.jersey.media.multipart.FormDataParam; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; @@ -189,7 +185,32 @@ public Response restartFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { return functions.restartFunctionInstances(tenant, namespace, functionName); } - + + @POST + @ApiOperation(value = "Stop function instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public Response stopFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) { + return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId); + } + + @POST + @ApiOperation(value = "Stop all function instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public Response stopFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { + return functions.stopFunctionInstances(tenant, namespace, functionName); + } + @POST @Path("/upload") @Consumes(MediaType.MULTIPART_FORM_DATA) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services