rdhabalia closed pull request #2365: Add support to restart function URL: https://github.com/apache/incubator-pulsar/pull/2365
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 564eb18067..b8891e5a92 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -267,6 +267,31 @@ public Response triggerFunction(final @PathParam("tenant") String tenant, } + @POST + @ApiOperation(value = "Restart function instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) { + return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId); + } + + @POST + @ApiOperation(value = "Restart all function instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { + return functions.restartFunctionInstances(tenant, namespace, functionName); + } + @POST @ApiOperation( value = "Uploads Pulsar Function file data", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index c57a8a0e59..2254626804 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -206,7 +206,7 @@ public void testAuthorization() throws Exception { String jarFilePathUrl = String.format("%s:%s", Utils.FILE, PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath()); FunctionDetails functionDetails = PulsarSinkE2ETest.createSinkConfig(jarFilePathUrl, tenant, namespacePortion, - functionName, sinkTopic, subscriptionName); + functionName, "my.*", sinkTopic, subscriptionName); try { functionAdmin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index abc4735ea1..e5902d4d00 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -270,7 +270,7 @@ public void testE2EPulsarSink() throws Exception { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, - sinkTopic, subscriptionName); + "my.*", sinkTopic, subscriptionName); admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); // try to update function to test: update-function functionality @@ -332,7 +332,7 @@ public void testPulsarSinkStats() throws Exception { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, - sinkTopic, subscriptionName); + "my.*", sinkTopic, subscriptionName); admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); // try to update function to test: update-function functionality @@ -381,7 +381,7 @@ public void testPulsarSinkStats() throws Exception { assertEquals(ownerWorkerId, workerId); } - protected static FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sinkTopic, String subscriptionName) { + protected static FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) { File file = new File(jarFile); try { @@ -389,7 +389,7 @@ protected static FunctionDetails createSinkConfig(String jarFile, String tenant, } catch (MalformedURLException e) { throw new RuntimeException("Failed to load user jar " + file, e); } - String sourceTopicPattern = String.format("persistent://%s/%s/my.*", tenant, namespace); + String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic); Class<?> typeArg = byte[].class; FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); @@ -444,7 +444,7 @@ public void testAuthorization(boolean validRoleName) throws Exception { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, - sinkTopic, subscriptionName); + "my.*", sinkTopic, subscriptionName); try { admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); assertTrue(validRoleName); @@ -505,4 +505,57 @@ public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception { assertEquals(functionMetadata.getSink().getTypeClassName(), typeArgs[1].getName()); } + + @Test(timeOut = 20000) + public void testFunctionRestartApi() throws Exception { + + final String namespacePortion = "io"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sourceTopicName = "restartFunction"; + final String sourceTopic = "persistent://" + replNamespace + "/" + sourceTopicName; + final String sinkTopic = "persistent://" + replNamespace + "/output"; + final String functionName = "PulsarSink-test"; + final String subscriptionName = "test-sub"; + admin.namespaces().createNamespace(replNamespace); + Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + // create source topic + Producer<byte[]> producer = pulsarClient.newProducer().topic(sourceTopic).create(); + + String jarFilePathUrl = Utils.FILE + ":" + + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, + sourceTopicName, sinkTopic, subscriptionName); + admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); + + retryStrategically((test) -> { + try { + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + return subStats != null && subStats.consumers.size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + assertEquals(subStats.consumers.size(), 1); + + // it should restart consumer : so, check if consumer came up again after restarting function + admin.functions().restartFunction(tenant, namespacePortion, functionName); + + retryStrategically((test) -> { + try { + SubscriptionStats subStat = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + return subStat != null && subStat.consumers.size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + + subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + assertEquals(subStats.consumers.size(), 1); + + producer.close(); + } } \ No newline at end of file diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index c04873da8e..4525c51b81 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -181,6 +181,39 @@ * Unexpected error */ FunctionStatusList getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException; + + /** + * Restart function instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * + * @param instanceId + * Function instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void restartFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException; + + /** + * Restart all function instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * + * @throws PulsarAdminException + * Unexpected error + */ + void restartFunction(String tenant, String namespace, String function) throws PulsarAdminException; /** * Triggers the function by writing to the input topic. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 028da3c2b1..402b5d3971 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -215,6 +215,27 @@ public String triggerFunction(String tenant, String namespace, String functionNa } } + @Override + public void restartFunction(String tenant, String namespace, String functionName, int instanceId) + throws PulsarAdminException { + try { + request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId)) + .path("restart")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void restartFunction(String tenant, String namespace, String functionName) throws PulsarAdminException { + try { + request(functions.path(tenant).path(namespace).path(functionName).path("restart")) + .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + @Override public void uploadFunction(String sourceFile, String path) throws PulsarAdminException { try { @@ -289,4 +310,5 @@ public static void mergeJson(String json, Builder builder) throws IOException { public static String printJson(MessageOrBuilder msg) throws IOException { return JsonFormat.printer().print(msg); } + } \ No newline at end of file diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index e206e75253..11a3c7a0b9 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -31,6 +31,7 @@ import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction; import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction; import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions; +import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction; import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction; import org.apache.pulsar.admin.cli.CmdSinks.CreateSink; import org.apache.pulsar.admin.cli.CmdSources.CreateSource; @@ -214,6 +215,34 @@ public void testCreateFunction() throws Exception { } + @Test + public void restartFunction() throws Exception { + String fnName = TEST_NAME + "-function"; + String tenant = "sample"; + String namespace = "ns1"; + int instanceId = 0; + cmd.run(new String[] { "restart", "--tenant", tenant, "--namespace", namespace, "--name", fnName, + "--instance-id", Integer.toString(instanceId)}); + + RestartFunction restarter = cmd.getRestarter(); + assertEquals(fnName, restarter.getFunctionName()); + + verify(functions, times(1)).restartFunction(tenant, namespace, fnName, instanceId); + } + + @Test + public void restartFunctionInstances() throws Exception { + String fnName = TEST_NAME + "-function"; + String tenant = "sample"; + String namespace = "ns1"; + cmd.run(new String[] { "restart", "--tenant", tenant, "--namespace", namespace, "--name", fnName }); + + RestartFunction restarter = cmd.getRestarter(); + assertEquals(fnName, restarter.getFunctionName()); + + verify(functions, times(1)).restartFunction(tenant, namespace, fnName); + } + @Test public void testCreateFunctionWithHttpUrl() throws Exception { String fnName = TEST_NAME + "-function"; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index b11dabee82..dd1ef3df5d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -102,7 +102,8 @@ private final DeleteFunction deleter; private final UpdateFunction updater; private final GetFunction getter; - private final GetFunctionStatus statuser; + private final GetFunctionStatus functionStatus; + private final RestartFunction restart; private final ListFunctions lister; private final StateGetter stateGetter; private final TriggerFunction triggerer; @@ -164,7 +165,7 @@ public void processArguments() { @Parameter(names = "--name", description = "The function's name") protected String functionName; - + @Override void processArguments() throws Exception { super.processArguments(); @@ -831,6 +832,27 @@ void runCmd() throws Exception { } } + @Parameters(commandDescription = "Restart function instance") + class RestartFunction extends FunctionCommand { + + @Parameter(names = "--instance-id", description = "The function instanceId (restart all instances if instance-id is not provided") + protected String instanceId; + + @Override + void runCmd() throws Exception { + if (isNotBlank(instanceId)) { + try { + admin.functions().restartFunction(tenant, namespace, functionName, Integer.parseInt(instanceId)); + } catch (NumberFormatException e) { + System.err.println("instance-id must be a number"); + } + } else { + admin.functions().restartFunction(tenant, namespace, functionName); + } + System.out.println("Restarted successfully"); + } + } + @Parameters(commandDescription = "Delete a Pulsar Function that's running on a Pulsar cluster") class DeleteFunction extends FunctionCommand { @Override @@ -1035,18 +1057,20 @@ public CmdFunctions(PulsarAdmin admin) throws PulsarClientException { deleter = new DeleteFunction(); updater = new UpdateFunction(); getter = new GetFunction(); - statuser = new GetFunctionStatus(); + functionStatus = new GetFunctionStatus(); lister = new ListFunctions(); stateGetter = new StateGetter(); triggerer = new TriggerFunction(); uploader = new UploadFunction(); downloader = new DownloadFunction(); cluster = new GetCluster(); + restart = new RestartFunction(); jcommander.addCommand("localrun", getLocalRunner()); jcommander.addCommand("create", getCreater()); jcommander.addCommand("delete", getDeleter()); jcommander.addCommand("update", getUpdater()); jcommander.addCommand("get", getGetter()); + jcommander.addCommand("restart", getRestarter()); jcommander.addCommand("getstatus", getStatuser()); jcommander.addCommand("list", getLister()); jcommander.addCommand("querystate", getStateGetter()); @@ -1082,7 +1106,7 @@ GetFunction getGetter() { } @VisibleForTesting - GetFunctionStatus getStatuser() { return statuser; } + GetFunctionStatus getStatuser() { return functionStatus; } @VisibleForTesting ListFunctions getLister() { @@ -1109,6 +1133,11 @@ DownloadFunction getDownloader() { return downloader; } + @VisibleForTesting + RestartFunction getRestarter() { + return restart; + } + private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functionConfig) { String[] args = fqfn.split("/"); if (args.length != 3) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 09273606ab..b3f30fdb95 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -127,7 +127,7 @@ public void join() throws InterruptedException { } @VisibleForTesting - protected void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception { + public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception { FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData(); int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); @@ -225,7 +225,7 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa } } - private void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) { + public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) { Function.Instance instance = functionRuntimeInfo.getFunctionInstance(); FunctionMetaData functionMetaData = instance.getFunctionMetaData(); log.info("Stopping function {} - {}...", diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 5e1995e633..121a454a91 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -20,11 +20,15 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.InstanceCommunication; @@ -35,9 +39,14 @@ import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeSpawner; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; @@ -317,6 +326,104 @@ public synchronized void removeAssignments(Collection<Assignment> assignments) { return functionStatus; } + public Response restartFunctionInstance(String tenant, String namespace, String functionName, int instanceId) throws Exception { + Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId); + final String fullFunctionName = String.format("%s/%s/%s/%s", tenant, namespace, functionName, instanceId); + if (assignment == null) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(fullFunctionName + " doesn't exist")).build(); + } + + final String assignedWorkerId = assignment.getWorkerId(); + final String workerId = this.workerConfig.getWorkerId(); + + if (assignedWorkerId.equals(workerId)) { + restartFunction(Utils.getFullyQualifiedInstanceId(assignment.getInstance())); + return Response.status(Status.OK).build(); + } else { + // query other worker + List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership(); + WorkerInfo workerInfo = null; + for (WorkerInfo entry : workerInfoList) { + if (assignment.getWorkerId().equals(entry.getWorkerId())) { + workerInfo = entry; + } + } + if (workerInfo == null) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build(); + } + + URI redirect = null; + final String redirectUrl = String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart", + workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, instanceId); + try { + redirect = new URI(redirectUrl); + } catch (URISyntaxException e) { + log.error("Error in preparing redirect url for {}/{}/{}/{}: {}", tenant, namespace, functionName, + instanceId, e.getMessage(), e); + return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(fullFunctionName + " invalid redirection url")).build(); + } + throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); + } + } + + public Response restartFunctionInstances(String tenant, String namespace, String functionName) throws Exception { + final String fullFunctionName = String.format("%s/%s/%s", tenant, namespace, functionName); + Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName); + + if (assignments.isEmpty()) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build(); + } + for (Assignment assignment : assignments) { + final String assignedWorkerId = assignment.getWorkerId(); + final String workerId = this.workerConfig.getWorkerId(); + String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + if (assignedWorkerId.equals(workerId)) { + restartFunction(fullyQualifiedInstanceId); + } else { + List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership(); + WorkerInfo workerInfo = null; + for (WorkerInfo entry : workerInfoList) { + if (assignment.getWorkerId().equals(entry.getWorkerId())) { + workerInfo = entry; + } + } + if (workerInfo == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] has not been assigned yet", fullyQualifiedInstanceId); + } + continue; + } + Client client = ClientBuilder.newClient(); + // TODO: create and use pulsar-admin to support authorization and authentication and manage redirect + final String instanceRestartUrl = String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart", + workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, + assignment.getInstance().getInstanceId()); + client.target(instanceRestartUrl).request(MediaType.APPLICATION_JSON) + .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } + } + return Response.status(Status.OK).build(); + } + + private void restartFunction(String fullyQualifiedInstanceId) throws Exception { + log.info("[{}] restarting..", fullyQualifiedInstanceId); + FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(fullyQualifiedInstanceId); + if (functionRuntimeInfo != null) { + this.functionActioner.stopFunction(functionRuntimeInfo); + try { + this.functionActioner.startFunction(functionRuntimeInfo); + } catch (Exception ex) { + log.info("{} Error starting function", fullyQualifiedInstanceId, ex); + functionRuntimeInfo.setStartupException(ex); + throw ex; + } + } + } + /** * Get statuses of all function instances. * @param tenant the tenant the function belongs to diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index c35dd52e80..45bdb1426a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -385,6 +385,72 @@ public Response getFunctionInstanceStatus(final String tenant, final String name return Response.status(Status.OK).entity(jsonResponse).build(); } + public Response restartFunctionInstance(final String tenant, final String namespace, final String functionName, + final String instanceId) { + + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + + // validate parameters + try { + validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId); + } catch (IllegalArgumentException e) { + log.error("Invalid restart-function request @ /{}/{}/{}", tenant, namespace, functionName, e); + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + + FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); + if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { + log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName); + return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); + } + + FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); + try { + return functionRuntimeManager.restartFunctionInstance(tenant, namespace, functionName, + Integer.parseInt(instanceId)); + } catch (WebApplicationException we) { + throw we; + } catch (Exception e) { + log.error("Failed to restart function: {}/{}/{}/{}", tenant, namespace, functionName, instanceId, e); + return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build(); + } + } + + public Response restartFunctionInstances(final String tenant, final String namespace, final String functionName) { + + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + + // validate parameters + try { + validateGetFunctionRequestParams(tenant, namespace, functionName); + } catch (IllegalArgumentException e) { + log.error("Invalid restart-Function request @ /{}/{}/{}", tenant, namespace, functionName, e); + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + + FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); + if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { + log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName); + return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); + } + + FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); + try { + return functionRuntimeManager.restartFunctionInstances(tenant, namespace, functionName); + }catch (Exception e) { + log.error("Failed to restart function: {}/{}/{}", tenant, namespace, functionName, e); + return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build(); + } + } + public Response getFunctionStatus(final String tenant, final String namespace, final String functionName) throws IOException { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index 96baada967..3581453920 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -22,6 +22,7 @@ import org.apache.pulsar.functions.worker.rest.FunctionApiResource; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.common.io.ConnectorDefinition; import java.io.IOException; import java.io.InputStream; @@ -45,6 +46,8 @@ import org.glassfish.jersey.media.multipart.FormDataParam; import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -162,6 +165,31 @@ public Response triggerFunction(final @PathParam("tenant") String tenant, return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic); } + @POST + @ApiOperation(value = "Restart function instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) { + return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId); + } + + @POST + @ApiOperation(value = "Restart all function instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { + return functions.restartFunctionInstances(tenant, namespace, functionName); + } + @POST @Path("/upload") @Consumes(MediaType.MULTIPART_FORM_DATA) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services