This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f3a027b Added stop/restart functionality in sources/sinks (#2810) f3a027b is described below commit f3a027b45f8244d19ac24709edc7f762abb5ce6f Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri Oct 19 10:00:25 2018 -0700 Added stop/restart functionality in sources/sinks (#2810) * Added Get and List source/sink functionality * Fixed compile * Removed test that doesnt make sense any more * Fixed build * Fixed logic * Return error response * Return response on error * Fix unittest * Fixed unittest * Fixed unittest * Fixed unittest * Added get/list sinks tests * Added get/list tests * Add more unittests * Added more unittests * Added TODO * Took feedback * Fix unittest * Fix unittest * Fix unittest * Fixed integration tests * Fixed integration test * Added restart/stop functionality to the sources/sinks * Added getstatus method to sources/sink * Fix integration tests --- .../pulsar/broker/admin/impl/FunctionsBase.java | 12 +- .../apache/pulsar/broker/admin/impl/SinkBase.java | 12 +- .../pulsar/broker/admin/impl/SourceBase.java | 12 +- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 69 +++++++++++ .../org/apache/pulsar/admin/cli/CmdSources.java | 91 +++++++++++++-- .../functions/worker/rest/api/FunctionsImpl.java | 129 +++++++++++++-------- .../worker/rest/api/v2/FunctionApiV2Resource.java | 12 +- .../worker/rest/api/v2/SinkApiV2Resource.java | 12 +- .../worker/rest/api/v2/SourceApiV2Resource.java | 12 +- .../integration/functions/PulsarFunctionsTest.java | 39 ++++++- 10 files changed, 299 insertions(+), 101 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index b50da21..01dd354 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -162,7 +162,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi final @PathParam("functionName") String functionName, final @PathParam("instanceId") String instanceId) throws IOException { return functions.getFunctionInstanceStatus( - tenant, namespace, functionName, instanceId, uri.getRequestUri()); + tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri()); } @GET @@ -179,7 +179,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) throws IOException { return functions.getFunctionStatus( - tenant, namespace, functionName, uri.getRequestUri()); + tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri()); } @GET @@ -256,7 +256,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi public Response restartFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, final @PathParam("instanceId") String instanceId) { - return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri()); + return functions.restartFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri()); } @POST @@ -268,7 +268,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi @Consumes(MediaType.APPLICATION_JSON) public Response restartFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { - return functions.restartFunctionInstances(tenant, namespace, functionName); + return functions.restartFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION); } @POST @@ -281,7 +281,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi public Response stopFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, final @PathParam("instanceId") String instanceId) { - return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri()); + return functions.stopFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri()); } @POST @@ -293,7 +293,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi @Consumes(MediaType.APPLICATION_JSON) public Response stopFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { - return functions.stopFunctionInstances(tenant, namespace, functionName); + return functions.stopFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java index 0f5a5c5..ba7dedc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java @@ -148,7 +148,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> { final @PathParam("sinkName") String sinkName, final @PathParam("instanceId") String instanceId) throws IOException { return functions.getFunctionInstanceStatus( - tenant, namespace, sinkName, instanceId, uri.getRequestUri()); + tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri()); } @GET @@ -164,7 +164,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> { public Response getSinkStatus(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) throws IOException { - return functions.getFunctionStatus(tenant, namespace, sinkName, uri.getRequestUri()); + return functions.getFunctionStatus(tenant, namespace, sinkName, FunctionsImpl.SINK, uri.getRequestUri()); } @GET @@ -194,7 +194,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> { public Response restartSink(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName, final @PathParam("instanceId") String instanceId) { - return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri()); + return functions.restartFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri()); } @POST @@ -206,7 +206,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> { @Consumes(MediaType.APPLICATION_JSON) public Response restartSink(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) { - return functions.restartFunctionInstances(tenant, namespace, sinkName); + return functions.restartFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK); } @POST @@ -219,7 +219,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> { public Response stopSink(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName, final @PathParam("instanceId") String instanceId) { - return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri()); + return functions.stopFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri()); } @POST @@ -231,7 +231,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> { @Consumes(MediaType.APPLICATION_JSON) public Response stopSink(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) { - return functions.stopFunctionInstances(tenant, namespace, sinkName); + return functions.stopFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java index 4bda489..c695a1a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java @@ -150,7 +150,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService> final @PathParam("sourceName") String sourceName, final @PathParam("instanceId") String instanceId) throws IOException { return functions.getFunctionInstanceStatus( - tenant, namespace, sourceName, instanceId, uri.getRequestUri()); + tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri()); } @GET @@ -166,7 +166,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService> public Response getSourceStatus(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) throws IOException { - return functions.getFunctionStatus(tenant, namespace, sourceName, uri.getRequestUri()); + return functions.getFunctionStatus(tenant, namespace, sourceName, FunctionsImpl.SOURCE, uri.getRequestUri()); } @GET @@ -197,7 +197,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService> public Response restartSource(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName, final @PathParam("instanceId") String instanceId) { - return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri()); + return functions.restartFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri()); } @POST @@ -209,7 +209,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService> @Consumes(MediaType.APPLICATION_JSON) public Response restartSource(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) { - return functions.restartFunctionInstances(tenant, namespace, sourceName); + return functions.restartFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE); } @POST @@ -222,7 +222,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService> public Response stopSource(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName, final @PathParam("instanceId") String instanceId) { - return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri()); + return functions.stopFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri()); } @POST @@ -234,7 +234,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService> @Consumes(MediaType.APPLICATION_JSON) public Response stopSource(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) { - return functions.stopFunctionInstances(tenant, namespace, sourceName); + return functions.stopFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE); } @GET diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 3b9159e..37d2d9b 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -31,6 +31,7 @@ import com.beust.jcommander.Parameters; import com.beust.jcommander.converters.StringConverter; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; import com.google.gson.reflect.TypeToken; import java.io.File; @@ -67,6 +68,9 @@ public class CmdSinks extends CmdBase { private final DeleteSink deleteSink; private final ListSinks listSinks; private final GetSink getSink; + private final GetSinkStatus getSinkStatus; + private final StopSink stopSink; + private final RestartSink restartSink; private final LocalSinkRunner localSinkRunner; public CmdSinks(PulsarAdmin admin) { @@ -76,6 +80,9 @@ public class CmdSinks extends CmdBase { deleteSink = new DeleteSink(); listSinks = new ListSinks(); getSink = new GetSink(); + getSinkStatus = new GetSinkStatus(); + stopSink = new StopSink(); + restartSink = new RestartSink(); localSinkRunner = new LocalSinkRunner(); jcommander.addCommand("create", createSink); @@ -83,6 +90,9 @@ public class CmdSinks extends CmdBase { jcommander.addCommand("delete", deleteSink); jcommander.addCommand("list", listSinks); jcommander.addCommand("get", getSink); + jcommander.addCommand("getstatus", getSinkStatus); + jcommander.addCommand("stop", stopSink); + jcommander.addCommand("restart", restartSink); jcommander.addCommand("localrun", localSinkRunner); jcommander.addCommand("available-sinks", new ListBuiltInSinks()); } @@ -590,6 +600,65 @@ public class CmdSinks extends CmdBase { } } + @Parameters(commandDescription = "Check the current status of a Pulsar Sink") + class GetSinkStatus extends SinkCommand { + + @Parameter(names = "--instance-id", description = "The sink instanceId (Get-status of all instances if instance-id is not provided") + protected String instanceId; + + @Override + void runCmd() throws Exception { + String json = Utils.printJson( + isBlank(instanceId) ? admin.sink().getSinkStatus(tenant, namespace, sinkName) + : admin.sink().getSinkStatus(tenant, namespace, sinkName, + Integer.parseInt(instanceId))); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + System.out.println(gson.toJson(new JsonParser().parse(json))); + } + } + + @Parameters(commandDescription = "Restart sink instance") + class RestartSink extends SinkCommand { + + @Parameter(names = "--instance-id", description = "The sink instanceId (restart all instances if instance-id is not provided") + protected String instanceId; + + @Override + void runCmd() throws Exception { + if (isNotBlank(instanceId)) { + try { + admin.sink().restartSink(tenant, namespace, sinkName, Integer.parseInt(instanceId)); + } catch (NumberFormatException e) { + System.err.println("instance-id must be a number"); + } + } else { + admin.sink().restartSink(tenant, namespace, sinkName); + } + System.out.println("Restarted successfully"); + } + } + + @Parameters(commandDescription = "Temporary stops sink instance. (If worker restarts then it reassigns and starts sink again") + class StopSink extends SinkCommand { + + @Parameter(names = "--instance-id", description = "The sink instanceId (stop all instances if instance-id is not provided") + protected String instanceId; + + @Override + void runCmd() throws Exception { + if (isNotBlank(instanceId)) { + try { + admin.sink().stopSink(tenant, namespace, sinkName, Integer.parseInt(instanceId)); + } catch (NumberFormatException e) { + System.err.println("instance-id must be a number"); + } + } else { + admin.sink().stopSink(tenant, namespace, sinkName); + } + System.out.println("Restarted successfully"); + } + } + @Parameters(commandDescription = "Get the list of Pulsar IO connector sinks supported by Pulsar cluster") public class ListBuiltInSinks extends BaseCommand { @Override diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index f27b0a5..5a1e9b3 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.admin.cli; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.functions.utils.Utils.fileExists; @@ -29,6 +31,7 @@ import com.beust.jcommander.Parameters; import com.beust.jcommander.converters.StringConverter; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; import com.google.gson.reflect.TypeToken; import java.io.File; @@ -69,8 +72,11 @@ public class CmdSources extends CmdBase { private final CreateSource createSource; private final DeleteSource deleteSource; private final GetSource getSource; + private final GetSourceStatus getSourceStatus; private final ListSources listSources; private final UpdateSource updateSource; + private final RestartSource restartSource; + private final StopSource stopSource; private final LocalSourceRunner localSourceRunner; public CmdSources(PulsarAdmin admin) { @@ -80,13 +86,19 @@ public class CmdSources extends CmdBase { deleteSource = new DeleteSource(); listSources = new ListSources(); getSource = new GetSource(); + getSourceStatus = new GetSourceStatus(); + restartSource = new RestartSource(); + stopSource = new StopSource(); localSourceRunner = new LocalSourceRunner(); jcommander.addCommand("create", createSource); jcommander.addCommand("update", updateSource); jcommander.addCommand("delete", deleteSource); jcommander.addCommand("get", getSource); + jcommander.addCommand("getstatus", getSourceStatus); jcommander.addCommand("list", listSources); + jcommander.addCommand("stop", stopSource); + jcommander.addCommand("restart", restartSource); jcommander.addCommand("localrun", localSourceRunner); jcommander.addCommand("available-sources", new ListBuiltInSources()); } @@ -147,13 +159,13 @@ public class CmdSources extends CmdBase { protected String tlsTrustCertFilePath; private void mergeArgs() { - if (!StringUtils.isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl; - if (!StringUtils.isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin; - if (!StringUtils.isBlank(DEPRECATED_clientAuthParams)) clientAuthParams = DEPRECATED_clientAuthParams; + if (!isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl; + if (!isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin; + if (!isBlank(DEPRECATED_clientAuthParams)) clientAuthParams = DEPRECATED_clientAuthParams; if (DEPRECATED_useTls != null) useTls = DEPRECATED_useTls; if (DEPRECATED_tlsAllowInsecureConnection != null) tlsAllowInsecureConnection = DEPRECATED_tlsAllowInsecureConnection; if (DEPRECATED_tlsHostNameVerificationEnabled != null) tlsHostNameVerificationEnabled = DEPRECATED_tlsHostNameVerificationEnabled; - if (!StringUtils.isBlank(DEPRECATED_tlsTrustCertFilePath)) tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath; + if (!isBlank(DEPRECATED_tlsTrustCertFilePath)) tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath; } @Override @@ -280,11 +292,11 @@ public class CmdSources extends CmdBase { private void mergeArgs() { if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees; - if (!StringUtils.isBlank(DEPRECATED_destinationTopicName)) destinationTopicName = DEPRECATED_destinationTopicName; - if (!StringUtils.isBlank(DEPRECATED_deserializationClassName)) deserializationClassName = DEPRECATED_deserializationClassName; - if (!StringUtils.isBlank(DEPRECATED_className)) className = DEPRECATED_className; - if (!StringUtils.isBlank(DEPRECATED_sourceConfigFile)) sourceConfigFile = DEPRECATED_sourceConfigFile; - if (!StringUtils.isBlank(DEPRECATED_sourceConfigString)) sourceConfigString = DEPRECATED_sourceConfigString; + if (!isBlank(DEPRECATED_destinationTopicName)) destinationTopicName = DEPRECATED_destinationTopicName; + if (!isBlank(DEPRECATED_deserializationClassName)) deserializationClassName = DEPRECATED_deserializationClassName; + if (!isBlank(DEPRECATED_className)) className = DEPRECATED_className; + if (!isBlank(DEPRECATED_sourceConfigFile)) sourceConfigFile = DEPRECATED_sourceConfigFile; + if (!isBlank(DEPRECATED_sourceConfigString)) sourceConfigString = DEPRECATED_sourceConfigString; } @Override @@ -382,7 +394,7 @@ public class CmdSources extends CmdBase { } protected void validateSourceConfigs(SourceConfig sourceConfig) { - if (StringUtils.isBlank(sourceConfig.getArchive())) { + if (isBlank(sourceConfig.getArchive())) { throw new ParameterException("Source archive not specfied"); } @@ -548,6 +560,65 @@ public class CmdSources extends CmdBase { } } + @Parameters(commandDescription = "Check the current status of a Pulsar Source") + class GetSourceStatus extends SourceCommand { + + @Parameter(names = "--instance-id", description = "The source instanceId (Get-status of all instances if instance-id is not provided") + protected String instanceId; + + @Override + void runCmd() throws Exception { + String json = Utils.printJson( + isBlank(instanceId) ? admin.source().getSourceStatus(tenant, namespace, sourceName) + : admin.source().getSourceStatus(tenant, namespace, sourceName, + Integer.parseInt(instanceId))); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + System.out.println(gson.toJson(new JsonParser().parse(json))); + } + } + + @Parameters(commandDescription = "Restart source instance") + class RestartSource extends SourceCommand { + + @Parameter(names = "--instance-id", description = "The source instanceId (restart all instances if instance-id is not provided") + protected String instanceId; + + @Override + void runCmd() throws Exception { + if (isNotBlank(instanceId)) { + try { + admin.source().restartSource(tenant, namespace, sourceName, Integer.parseInt(instanceId)); + } catch (NumberFormatException e) { + System.err.println("instance-id must be a number"); + } + } else { + admin.source().restartSource(tenant, namespace, sourceName); + } + System.out.println("Restarted successfully"); + } + } + + @Parameters(commandDescription = "Temporary stops source instance. (If worker restarts then it reassigns and starts source again") + class StopSource extends SourceCommand { + + @Parameter(names = "--instance-id", description = "The source instanceId (stop all instances if instance-id is not provided") + protected String instanceId; + + @Override + void runCmd() throws Exception { + if (isNotBlank(instanceId)) { + try { + admin.source().stopSource(tenant, namespace, sourceName, Integer.parseInt(instanceId)); + } catch (NumberFormatException e) { + System.err.println("instance-id must be a number"); + } + } else { + admin.source().stopSource(tenant, namespace, sourceName); + } + System.out.println("Restarted successfully"); + } + } + @Parameters(commandDescription = "Get the list of Pulsar IO connector sources supported by Pulsar cluster") public class ListBuiltInSources extends BaseCommand { @Override diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 44bb3bd..0eebe79 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -403,8 +403,8 @@ public class FunctionsImpl { return Response.status(Status.OK).entity(retval).build(); } - public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName, - final String instanceId, URI uri) throws IOException { + public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String componentName, + final String componentType, final String instanceId, URI uri) throws IOException { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -412,23 +412,28 @@ public class FunctionsImpl { // validate parameters try { - validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId); + validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId); } catch (IllegalArgumentException e) { - log.error("Invalid getFunctionStatus request @ /{}/{}/{}", tenant, namespace, functionName, e); + log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); - if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { - log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName); + if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { + log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName); return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); + .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); + } + FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); + if (!calculateSubjectType(functionMetaData).equals(componentType)) { + log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType); + return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build(); } - FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName); int instanceIdInt = Integer.parseInt(instanceId); if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) { - log.error("instanceId in getFunctionStatus out of bounds @ /{}/{}/{}", tenant, namespace, functionName); + log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(String.format("Invalid InstanceId"))).build(); } @@ -436,12 +441,12 @@ public class FunctionsImpl { FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); FunctionStatus functionStatus = null; try { - functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, functionName, + functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, componentName, Integer.parseInt(instanceId), uri); } catch (WebApplicationException we) { throw we; } catch (Exception e) { - log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, functionName, e); + log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e); return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build(); } @@ -449,18 +454,18 @@ public class FunctionsImpl { return Response.status(Status.OK).entity(jsonResponse).build(); } - public Response stopFunctionInstance(final String tenant, final String namespace, final String functionName, - final String instanceId, URI uri) { - return stopFunctionInstance(tenant, namespace, functionName, instanceId, false, uri); + public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName, + final String componentType, final String instanceId, URI uri) { + return stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, false, uri); } - public Response restartFunctionInstance(final String tenant, final String namespace, final String functionName, - final String instanceId, URI uri) { - return stopFunctionInstance(tenant, namespace, functionName, instanceId, true, uri); + public Response restartFunctionInstance(final String tenant, final String namespace, final String componentName, + final String componentType, final String instanceId, URI uri) { + return stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, true, uri); } - public Response stopFunctionInstance(final String tenant, final String namespace, final String functionName, - final String instanceId, boolean restart, URI uri) { + public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName, + final String componentType, final String instanceId, boolean restart, URI uri) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -468,42 +473,51 @@ public class FunctionsImpl { // validate parameters try { - validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId); + validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId); } catch (IllegalArgumentException e) { - log.error("Invalid restart-function request @ /{}/{}/{}", tenant, namespace, functionName, e); + log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); - if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { - log.error("Function does not exist @ /{}/{}/{}", tenant, namespace, functionName); + if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { + log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName); return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); + .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); + } + + FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); + if (!calculateSubjectType(functionMetaData).equals(componentType)) { + log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType); + return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build(); } FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); try { - return functionRuntimeManager.stopFunctionInstance(tenant, namespace, functionName, + return functionRuntimeManager.stopFunctionInstance(tenant, namespace, componentName, Integer.parseInt(instanceId), restart, uri); } catch (WebApplicationException we) { throw we; } catch (Exception e) { - log.error("Failed to restart function: {}/{}/{}/{}", tenant, namespace, functionName, instanceId, e); + log.error("Failed to restart {}: {}/{}/{}/{}", componentType, tenant, namespace, componentName, instanceId, e); return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build(); } } - public Response stopFunctionInstances(final String tenant, final String namespace, final String functionName) { - return stopFunctionInstances(tenant, namespace, functionName, false); + public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName, + final String componentType) { + return stopFunctionInstances(tenant, namespace, componentName, componentType, false); } - public Response restartFunctionInstances(final String tenant, final String namespace, final String functionName) { - return stopFunctionInstances(tenant, namespace, functionName, true); + public Response restartFunctionInstances(final String tenant, final String namespace, final String componentName, + final String componentType) { + return stopFunctionInstances(tenant, namespace, componentName, componentType, true); } - public Response stopFunctionInstances(final String tenant, final String namespace, final String functionName, - boolean restart) { + public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName, + final String componentType, boolean restart) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -511,32 +525,40 @@ public class FunctionsImpl { // validate parameters try { - validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION); + validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); } catch (IllegalArgumentException e) { - log.error("Invalid restart-Function request @ /{}/{}/{}", tenant, namespace, functionName, e); + log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); - if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { - log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName); + if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { + log.error("{} in getFunctionStatus does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName); return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); + .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); + } + + FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); + if (!calculateSubjectType(functionMetaData).equals(componentType)) { + log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType); + return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build(); } FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); try { - return functionRuntimeManager.stopFunctionInstances(tenant, namespace, functionName, restart); + return functionRuntimeManager.stopFunctionInstances(tenant, namespace, componentName, restart); } catch (WebApplicationException we) { throw we; } catch (Exception e) { - log.error("Failed to restart function: {}/{}/{}", tenant, namespace, functionName, e); + log.error("Failed to restart {}: {}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build(); } } - public Response getFunctionStatus(final String tenant, final String namespace, final String functionName, URI uri) + public Response getFunctionStatus(final String tenant, final String namespace, final String componentName, + final String componentType, URI uri) throws IOException { if (!isWorkerServiceAvailable()) { @@ -545,24 +567,31 @@ public class FunctionsImpl { // validate parameters try { - validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION); + validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); } catch (IllegalArgumentException e) { - log.error("Invalid getFunctionStatus request @ /{}/{}/{}", tenant, namespace, functionName, e); + log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); - if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { - log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName); + if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { + log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName); return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); + .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); + } + + FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); + if (!calculateSubjectType(functionMetaData).equals(componentType)) { + log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType); + return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build(); } FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); InstanceCommunication.FunctionStatusList functionStatusList = null; try { - functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, functionName, uri); + functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, componentName, uri); } catch (WebApplicationException we) { throw we; } catch (Exception e) { @@ -870,11 +899,11 @@ public class FunctionsImpl { } } - private void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String functionName, - String instanceId) throws IllegalArgumentException { - validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION); + private void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String componentName, + String componentType, String instanceId) throws IllegalArgumentException { + validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); if (instanceId == null) { - throw new IllegalArgumentException("Function Instance Id is not provided"); + throw new IllegalArgumentException(String.format("%s Instance Id is not provided", componentType)); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index 405f88f..e37a88a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -107,7 +107,7 @@ public class FunctionApiV2Resource extends FunctionApiResource { final @PathParam("functionName") String functionName, final @PathParam("instanceId") String instanceId) throws IOException { return functions.getFunctionInstanceStatus( - tenant, namespace, functionName, instanceId, uri.getRequestUri()); + tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri()); } @GET @@ -116,7 +116,7 @@ public class FunctionApiV2Resource extends FunctionApiResource { final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) throws IOException { return functions.getFunctionStatus( - tenant, namespace, functionName, uri.getRequestUri()); + tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri()); } @GET @@ -150,7 +150,7 @@ public class FunctionApiV2Resource extends FunctionApiResource { public Response restartFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, final @PathParam("instanceId") String instanceId) { - return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri()); + return functions.restartFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, this.uri.getRequestUri()); } @POST @@ -162,7 +162,7 @@ public class FunctionApiV2Resource extends FunctionApiResource { @Consumes(MediaType.APPLICATION_JSON) public Response restartFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { - return functions.restartFunctionInstances(tenant, namespace, functionName); + return functions.restartFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION); } @POST @@ -175,7 +175,7 @@ public class FunctionApiV2Resource extends FunctionApiResource { public Response stopFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, final @PathParam("instanceId") String instanceId) { - return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri()); + return functions.stopFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, this.uri.getRequestUri()); } @POST @@ -187,7 +187,7 @@ public class FunctionApiV2Resource extends FunctionApiResource { @Consumes(MediaType.APPLICATION_JSON) public Response stopFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { - return functions.stopFunctionInstances(tenant, namespace, functionName); + return functions.stopFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION); } @POST diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java index 488f47d..934b0fc 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java @@ -97,7 +97,7 @@ public class SinkApiV2Resource extends FunctionApiResource { final @PathParam("sinkName") String sinkName, final @PathParam("instanceId") String instanceId) throws IOException { return functions.getFunctionInstanceStatus( - tenant, namespace, sinkName, instanceId, uri.getRequestUri()); + tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri()); } @GET @@ -105,7 +105,7 @@ public class SinkApiV2Resource extends FunctionApiResource { public Response getSinkStatus(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) throws IOException { - return functions.getFunctionStatus(tenant, namespace, sinkName, uri.getRequestUri()); + return functions.getFunctionStatus(tenant, namespace, sinkName, FunctionsImpl.SINK, uri.getRequestUri()); } @GET @@ -126,7 +126,7 @@ public class SinkApiV2Resource extends FunctionApiResource { public Response restartSink(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName, final @PathParam("instanceId") String instanceId) { - return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri()); + return functions.restartFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, this.uri.getRequestUri()); } @POST @@ -138,7 +138,7 @@ public class SinkApiV2Resource extends FunctionApiResource { @Consumes(MediaType.APPLICATION_JSON) public Response restartSink(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) { - return functions.restartFunctionInstances(tenant, namespace, sinkName); + return functions.restartFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK); } @POST @@ -151,7 +151,7 @@ public class SinkApiV2Resource extends FunctionApiResource { public Response stopSink(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName, final @PathParam("instanceId") String instanceId) { - return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri()); + return functions.stopFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, this.uri.getRequestUri()); } @POST @@ -163,7 +163,7 @@ public class SinkApiV2Resource extends FunctionApiResource { @Consumes(MediaType.APPLICATION_JSON) public Response stopSink(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) { - return functions.stopFunctionInstances(tenant, namespace, sinkName); + return functions.stopFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK); } @GET diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java index 3b1222e..2c39344 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java @@ -97,7 +97,7 @@ public class SourceApiV2Resource extends FunctionApiResource { final @PathParam("sourceName") String sourceName, final @PathParam("instanceId") String instanceId) throws IOException { return functions.getFunctionInstanceStatus( - tenant, namespace, sourceName, instanceId, uri.getRequestUri()); + tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri()); } @GET @@ -105,7 +105,7 @@ public class SourceApiV2Resource extends FunctionApiResource { public Response getSourceStatus(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) throws IOException { - return functions.getFunctionStatus(tenant, namespace, sourceName, uri.getRequestUri()); + return functions.getFunctionStatus(tenant, namespace, sourceName, FunctionsImpl.SOURCE, uri.getRequestUri()); } @GET @@ -126,7 +126,7 @@ public class SourceApiV2Resource extends FunctionApiResource { public Response restartSource(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName, final @PathParam("instanceId") String instanceId) { - return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri()); + return functions.restartFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, this.uri.getRequestUri()); } @POST @@ -138,7 +138,7 @@ public class SourceApiV2Resource extends FunctionApiResource { @Consumes(MediaType.APPLICATION_JSON) public Response restartSource(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) { - return functions.restartFunctionInstances(tenant, namespace, sourceName); + return functions.restartFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE); } @POST @@ -151,7 +151,7 @@ public class SourceApiV2Resource extends FunctionApiResource { public Response stopSource(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName, final @PathParam("instanceId") String instanceId) { - return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri()); + return functions.stopFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, this.uri.getRequestUri()); } @POST @@ -163,7 +163,7 @@ public class SourceApiV2Resource extends FunctionApiResource { @Consumes(MediaType.APPLICATION_JSON) public Response stopSource(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) { - return functions.stopFunctionInstances(tenant, namespace, sourceName); + return functions.stopFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE); } @GET diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 6659fcf..0222bd6 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -154,7 +154,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { } // wait for sink to process messages - waitForProcessingMessages(tenant, namespace, sinkName, numMessages); + waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages); // validate the sink result tester.validateSinkResult(kvs); @@ -238,7 +238,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { protected void getSinkStatus(String tenant, String namespace, String sinkName) throws Exception { String[] commands = { PulsarCluster.ADMIN_SCRIPT, - "functions", + "sink", "getstatus", "--tenant", tenant, "--namespace", namespace, @@ -254,7 +254,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { } catch (ContainerExecException e) { // expected in early iterations } - log.info("Backoff 1 second until the function is running"); + log.info("Backoff 1 second until the sink is running"); TimeUnit.SECONDS.sleep(1); } } @@ -482,7 +482,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { protected void getSourceStatus(String tenant, String namespace, String sourceName) throws Exception { String[] commands = { PulsarCluster.ADMIN_SCRIPT, - "functions", + "source", "getstatus", "--tenant", tenant, "--namespace", namespace, @@ -518,7 +518,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { int numMessages) throws Exception { String[] commands = { PulsarCluster.ADMIN_SCRIPT, - "functions", + "source", "getstatus", "--tenant", tenant, "--namespace", namespace, @@ -541,6 +541,35 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { } } + protected void waitForProcessingSinkMessages(String tenant, + String namespace, + String sinkName, + int numMessages) throws Exception { + String[] commands = { + PulsarCluster.ADMIN_SCRIPT, + "sink", + "getstatus", + "--tenant", tenant, + "--namespace", namespace, + "--name", sinkName + }; + Stopwatch stopwatch = Stopwatch.createStarted(); + while (true) { + try { + ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands); + log.info("Get sink status : {}", result.getStdout()); + if (result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\"")) { + return; + } + } catch (ContainerExecException e) { + // expected for early iterations + } + log.info("{} ms has elapsed but the sink hasn't process {} messages, backoff to wait for another 1 second", + stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages); + TimeUnit.SECONDS.sleep(1); + } + } + protected void deleteSource(String tenant, String namespace, String sourceName) throws Exception { String[] commands = { PulsarCluster.ADMIN_SCRIPT,