This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e63b658 add backwards compatibility to 2.2 for function admin API (#3241) e63b658 is described below commit e63b658d8d0e5863d9b77658d3b1eb8b374f0de9 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Wed Dec 26 17:29:31 2018 -0800 add backwards compatibility to 2.2 for function admin API (#3241) * add backwards compatibility to 2.2 for function admin API * add license headers * renaming * fix tests * get integrations tests to pass --- .../org/apache/pulsar/broker/PulsarService.java | 1 + .../apache/pulsar/broker/admin/v1/Functions.java | 9 +- .../apache/pulsar/broker/admin/v2/Functions.java | 303 ++++++++++- .../pulsar/broker/admin/{v2 => v3}/Functions.java | 6 +- .../pulsar/broker/admin/{v2 => v3}/Sink.java | 2 +- .../pulsar/broker/admin/{v2 => v3}/Source.java | 2 +- .../client/admin/internal/FunctionsImpl.java | 2 +- .../pulsar/client/admin/internal/SinkImpl.java | 2 +- .../pulsar/client/admin/internal/SourceImpl.java | 2 +- .../src/main/proto/InstanceCommunication.proto | 8 +- .../pulsar/functions/worker/rest/Resources.java | 19 +- .../pulsar/functions/worker/rest/WorkerServer.java | 10 +- .../functions/worker/rest/api/FunctionsImplV2.java | 210 +++++++ .../worker/rest/api/v2/FunctionApiV2Resource.java | 299 +++++----- .../FunctionApiV3Resource.java} | 6 +- .../SinkApiV3Resource.java} | 6 +- .../SourceApiV3Resource.java} | 6 +- .../rest/api/v2/FunctionApiV2ResourceTest.java | 603 ++++++++++----------- .../FunctionApiV3ResourceTest.java} | 9 +- .../SinkApiV3ResourceTest.java} | 6 +- .../SourceApiV3ResourceTest.java} | 6 +- 21 files changed, 1028 insertions(+), 489 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index f576f95..417ec21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -381,6 +381,7 @@ public class PulsarService implements AutoCloseable { this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false, attributeMap); this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true, attributeMap); this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, attributeMap); + this.webService.addRestResources("/admin/v3", "org.apache.pulsar.broker.admin.v3", true, attributeMap); this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap); this.webService.addServlet("/metrics", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java index 7eca553..0591279 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java @@ -19,10 +19,17 @@ package org.apache.pulsar.broker.admin.v1; import io.swagger.annotations.Api; + +import javax.ws.rs.Consumes; import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + import org.apache.pulsar.broker.admin.impl.FunctionsBase; @Path("/functions") @Api(value = "/functions", description = "Functions admin apis", tags = "functions", hidden = true) -public class Functions extends FunctionsBase { +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +public class Functions extends org.apache.pulsar.broker.admin.v2.Functions{ } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java index 854a365..19c3052 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java @@ -19,16 +19,311 @@ package org.apache.pulsar.broker.admin.v2; import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.functions.proto.Function.FunctionMetaData; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; +import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2; +import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.glassfish.jersey.media.multipart.FormDataParam; + import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; - -import org.apache.pulsar.broker.admin.impl.FunctionsBase; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; @Path("/functions") @Api(value = "/functions", description = "Functions admin apis", tags = "functions", hidden = true) @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class Functions extends FunctionsBase { -} +public class Functions extends AdminResource implements Supplier<WorkerService> { + + private final FunctionsImplV2 functions; + + public Functions() { + this.functions = new FunctionsImplV2(this); + } + + @Override + public WorkerService get() { + return pulsar().getWorkerService(); + } + + @POST + @ApiOperation(value = "Creates a new Pulsar Function in cluster mode") 
+ @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"), + @ApiResponse(code = 408, message = "Request timeout"), + @ApiResponse(code = 200, message = "Pulsar Function successfully created") + }) + @Path("/{tenant}/{namespace}/{functionName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response registerFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final @FormDataParam("functionDetails") String functionDetailsJson, + final @FormDataParam("functionConfig") String functionConfigJson) { + + return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, + functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId()); + } + + @PUT + @ApiOperation(value = "Updates a Pulsar Function currently running in cluster mode") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"), + @ApiResponse(code = 200, message = "Pulsar Function successfully updated") + }) + @Path("/{tenant}/{namespace}/{functionName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response updateFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final 
@FormDataParam("functionDetails") String functionDetailsJson, + final @FormDataParam("functionConfig") String functionConfigJson) { + + return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, + functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId()); + } + + + @DELETE + @ApiOperation(value = "Deletes a Pulsar Function currently running in cluster mode") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function doesn't exist"), + @ApiResponse(code = 408, message = "Request timeout"), + @ApiResponse(code = 200, message = "The function was successfully deleted") + }) + @Path("/{tenant}/{namespace}/{functionName}") + public Response deregisterFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName) { + return functions.deregisterFunction(tenant, namespace, functionName, clientAppId()); + } + + @GET + @ApiOperation( + value = "Fetches information about a Pulsar Function currently running in cluster mode", + response = FunctionMetaData.class + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 408, message = "Request timeout"), + @ApiResponse(code = 404, message = "The function doesn't exist") + }) + @Path("/{tenant}/{namespace}/{functionName}") + public Response getFunctionInfo(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName) throws IOException { + + return functions.getFunctionInfo( + tenant, namespace, functionName); + } + + @GET + @ApiOperation( + value = "Displays the status of a Pulsar Function instance", + 
response = FunctionStatus.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 404, message = "The function doesn't exist") + }) + @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status") + public Response getFunctionInstanceStatus(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) throws IOException { + + return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri()); + } + + @GET + @ApiOperation( + value = "Displays the status of a Pulsar Function running in cluster mode", + response = FunctionStatus.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + }) + @Path("/{tenant}/{namespace}/{functionName}/status") + public Response getFunctionStatus(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName) throws IOException { + return functions.getFunctionStatusV2( + tenant, namespace, functionName, uri.getRequestUri()); + } + + @GET + @ApiOperation( + value = "Lists all Pulsar Functions currently deployed in a given namespace", + response = String.class, + responseContainer = "Collection" + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + }) + @Path("/{tenant}/{namespace}") + public Response listFunctions(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace) { + return functions.listFunctions( tenant, namespace); + } + + @POST + @ApiOperation( + 
value = "Triggers a Pulsar Function with a user-specified value or file data", + response = Message.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 408, message = "Request timeout"), + @ApiResponse(code = 500, message = "Internal server error") + }) + @Path("/{tenant}/{namespace}/{functionName}/trigger") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response triggerFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @FormDataParam("data") String triggerValue, + final @FormDataParam("dataStream") InputStream triggerStream, + final @FormDataParam("topic") String topic) { + return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic); + } + + @GET + @ApiOperation( + value = "Fetch the current state associated with a Pulsar Function", + response = String.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 404, message = "The key does not exist"), + @ApiResponse(code = 500, message = "Internal server error") + }) + @Path("/{tenant}/{namespace}/{functionName}/state/{key}") + public Response getFunctionState(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @PathParam("key") String key) { + return functions.getFunctionState(tenant, namespace, functionName, key); + } + + @POST + @ApiOperation(value = "Restart function instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message 
= "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) { + return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Restart all function instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { + return functions.restartFunctionInstances(tenant, namespace, functionName); + } + + @POST + @ApiOperation(value = "Stop function instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public Response stopFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) { + return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Stop all function instances", response = Void.class) + 
@ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public Response stopFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { + return functions.stopFunctionInstances(tenant, namespace, functionName); + } + + @POST + @ApiOperation( + value = "Uploads Pulsar Function file data", + hidden = true + ) + @Path("/upload") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("path") String path) { + return functions.uploadFunction(uploadedInputStream, path); + } + + @GET + @ApiOperation( + value = "Downloads Pulsar Function file data", + hidden = true + ) + @Path("/download") + public Response downloadFunction(final @QueryParam("path") String path) { + return functions.downloadFunction(path); + } + + @GET + @ApiOperation( + value = "Fetches a list of supported Pulsar IO connectors currently running in cluster mode", + response = List.class + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 408, message = "Request timeout") + }) + @Path("/connectors") + public List<ConnectorDefinition> getConnectorsList() throws IOException { + return functions.getListOfConnectors(); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Functions.java similarity index 96% copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java 
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Functions.java index 854a365..ce634a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Functions.java @@ -16,16 +16,16 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.admin.v2; +package org.apache.pulsar.broker.admin.v3; import io.swagger.annotations.Api; +import org.apache.pulsar.broker.admin.impl.FunctionsBase; + import javax.ws.rs.Consumes; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; -import org.apache.pulsar.broker.admin.impl.FunctionsBase; - @Path("/functions") @Api(value = "/functions", description = "Functions admin apis", tags = "functions", hidden = true) @Produces(MediaType.APPLICATION_JSON) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java similarity index 96% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java index aea0ae7..e137f08 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pulsar.broker.admin.v2; +package org.apache.pulsar.broker.admin.v3; import io.swagger.annotations.Api; import org.apache.pulsar.broker.admin.impl.SinkBase; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java similarity index 96% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java index e5ef56c..24e84eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.admin.v2; +package org.apache.pulsar.broker.admin.v3; import io.swagger.annotations.Api; import org.apache.pulsar.broker.admin.impl.SourceBase; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 1493d1d..130704e 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -56,7 +56,7 @@ public class FunctionsImpl extends BaseResource implements Functions { public FunctionsImpl(WebTarget web, Authentication auth) { super(auth); - this.functions = web.path("/admin/functions"); + this.functions = web.path("/admin/v3/functions"); } @Override diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java index 1363d35..a9f99b8 100644 --- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java @@ -46,7 +46,7 @@ public class SinkImpl extends BaseResource implements Sink { public SinkImpl(WebTarget web, Authentication auth) { super(auth); - this.sink = web.path("/admin/v2/sink"); + this.sink = web.path("/admin/v3/sink"); } @Override diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java index d0d36a5..2d066e0 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java @@ -46,7 +46,7 @@ public class SourceImpl extends BaseResource implements Source { public SourceImpl(WebTarget web, Authentication auth) { super(auth); - this.source = web.path("/admin/v2/source"); + this.source = web.path("/admin/v3/source"); } @Override diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto index 1b50ea8..aec1dd3 100644 --- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto +++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto @@ -60,10 +60,10 @@ message FunctionStatus { } // Deprecated -//message FunctionStatusList { -// string error = 2; -// repeated FunctionStatus functionStatusList = 1; -//} +message FunctionStatusList { + string error = 2; + repeated FunctionStatus functionStatusList = 1; +} message MetricsData { // message DataDigest { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java index ac011db..530f528 100644 --- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java @@ -20,8 +20,9 @@ package org.apache.pulsar.functions.worker.rest; import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource; import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource; -import org.apache.pulsar.functions.worker.rest.api.v2.SinkApiV2Resource; -import org.apache.pulsar.functions.worker.rest.api.v2.SourceApiV2Resource; +import org.apache.pulsar.functions.worker.rest.api.v3.FunctionApiV3Resource; +import org.apache.pulsar.functions.worker.rest.api.v3.SinkApiV3Resource; +import org.apache.pulsar.functions.worker.rest.api.v3.SourceApiV3Resource; import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource; import org.glassfish.jersey.media.multipart.MultiPartFeature; @@ -34,17 +35,25 @@ public final class Resources { private Resources() { } - public static Set<Class<?>> getApiResources() { + public static Set<Class<?>> getApiV2Resources() { return new HashSet<>( Arrays.asList( FunctionApiV2Resource.class, - SourceApiV2Resource.class, - SinkApiV2Resource.class, WorkerApiV2Resource.class, MultiPartFeature.class )); } + public static Set<Class<?>> getApiV3Resources() { + return new HashSet<>( + Arrays.asList( + MultiPartFeature.class, + SourceApiV3Resource.class, + SinkApiV3Resource.class, + FunctionApiV3Resource.class + )); + } + public static Set<Class<?>> getRootResources() { return new HashSet<>( Arrays.asList( diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java index 5bc2f4c..35a4b7a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java +++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java @@ -76,7 +76,7 @@ public class WorkerServer { public WorkerServer(WorkerService workerService) { this.workerConfig = workerService.getWorkerConfig(); this.workerService = workerService; - this.webServerExecutor = new WebExecutorThreadPool("function-web"); + this.webServerExecutor = new WebExecutorThreadPool(8, "function-web"); init(); } @@ -93,11 +93,13 @@ public class WorkerServer { connector.setPort(this.workerConfig.getWorkerPort()); connectors.add(connector); - List<Handler> handlers = new ArrayList<>(3); + List<Handler> handlers = new ArrayList<>(4); handlers.add( - newServletContextHandler("/admin", new ResourceConfig(Resources.getApiResources()), workerService)); + newServletContextHandler("/admin", new ResourceConfig(Resources.getApiV2Resources()), workerService)); handlers.add( - newServletContextHandler("/admin/v2", new ResourceConfig(Resources.getApiResources()), workerService)); + newServletContextHandler("/admin/v2", new ResourceConfig(Resources.getApiV2Resources()), workerService)); + handlers.add( + newServletContextHandler("/admin/v3", new ResourceConfig(Resources.getApiV3Resources()), workerService)); handlers.add(newServletContextHandler("/", new ResourceConfig(Resources.getRootResources()), workerService)); RequestLogHandler requestLogHandler = new RequestLogHandler(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java new file mode 100644 index 0000000..c71e1c2 --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker.rest.api; + +import com.google.gson.Gson; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.functions.FunctionState; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.common.policies.data.FunctionStatus; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.worker.FunctionMetaDataManager; +import org.apache.pulsar.functions.worker.WorkerService; +import org.glassfish.jersey.media.multipart.FormDataContentDisposition; + +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Collection; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +@Slf4j +public class FunctionsImplV2 { + + private FunctionsImpl delegate; + public FunctionsImplV2(Supplier<WorkerService> workerServiceSupplier) { + this.delegate = new FunctionsImpl(workerServiceSupplier); + } + + // For test purposes + public FunctionsImplV2(FunctionsImpl delegate) { + this.delegate = delegate; + } + + public Response getFunctionInfo(final String tenant, final String namespace, final String 
functionName) + throws IOException { + + // run just for parameter checks + delegate.getFunctionInfo(tenant, namespace, functionName); + + FunctionMetaDataManager functionMetaDataManager = delegate.worker().getFunctionMetaDataManager(); + + Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, + functionName); + String functionDetailsJson = org.apache.pulsar.functions.utils.Utils.printJson(functionMetaData.getFunctionDetails()); + return Response.status(Response.Status.OK).entity(functionDetailsJson).build(); + } + + public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName, + final String instanceId, URI uri) throws IOException { + + org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData + functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri); + + String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(toProto(functionInstanceStatus, instanceId)); + return Response.status(Response.Status.OK).entity(jsonResponse).build(); + } + + public Response getFunctionStatusV2(String tenant, String namespace, String functionName, URI requestUri) throws + IOException { + FunctionStatus functionStatus = delegate.getFunctionStatus(tenant, namespace, functionName, requestUri); + InstanceCommunication.FunctionStatusList.Builder functionStatusList = InstanceCommunication.FunctionStatusList.newBuilder(); + functionStatus.instances.forEach(functionInstanceStatus -> functionStatusList.addFunctionStatusList( + toProto(functionInstanceStatus.getStatus(), + String.valueOf(functionInstanceStatus.getInstanceId())))); + String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(functionStatusList); + return Response.status(Response.Status.OK).entity(jsonResponse).build(); + } + + public Response registerFunction(String tenant, String namespace, String 
functionName, InputStream + uploadedInputStream, FormDataContentDisposition fileDetail, String functionPkgUrl, String + functionDetailsJson, String functionConfigJson, String clientAppId) { + delegate.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, + functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId); + return Response.ok().build(); + } + + public Response updateFunction(String tenant, String namespace, String functionName, InputStream uploadedInputStream, + FormDataContentDisposition fileDetail, String functionPkgUrl, String + functionDetailsJson, String functionConfigJson, String clientAppId) { + delegate.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, + functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId); + return Response.ok().build(); + } + + public Response deregisterFunction(String tenant, String namespace, String functionName, String clientAppId) { + delegate.deregisterFunction(tenant, namespace, functionName, clientAppId); + return Response.ok().build(); + } + + public Response listFunctions(String tenant, String namespace) { + Collection<String> functionStateList = delegate.listFunctions( tenant, namespace); + return Response.status(Response.Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build(); + } + + public Response triggerFunction(String tenant, String namespace, String functionName, String triggerValue, + InputStream triggerStream, String topic) { + String result = delegate.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic); + return Response.status(Response.Status.OK).entity(result).build(); + } + + public Response getFunctionState(String tenant, String namespace, String functionName, String key) { + FunctionState functionState = delegate.getFunctionState( + tenant, namespace, functionName, key); + + String value; + if (functionState.getNumberValue() != null) { + value = "value : " + 
functionState.getNumberValue() + ", version : " + functionState.getVersion(); + } else { + value = "value : " + functionState.getStringValue() + ", version : " + functionState.getVersion(); + } + return Response.status(Response.Status.OK) + .entity(value) + .build(); + } + + public Response restartFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI + uri) { + delegate.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri); + return Response.ok().build(); + } + + public Response restartFunctionInstances(String tenant, String namespace, String functionName) { + delegate.restartFunctionInstances(tenant, namespace, functionName); + return Response.ok().build(); + } + + public Response stopFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI + uri) { + delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri); + return Response.ok().build(); + } + + public Response stopFunctionInstances(String tenant, String namespace, String functionName) { + delegate.stopFunctionInstances(tenant, namespace, functionName); + return Response.ok().build(); + } + + public Response uploadFunction(InputStream uploadedInputStream, String path) { + delegate.uploadFunction(uploadedInputStream, path); + return Response.ok().build(); + } + + public Response downloadFunction(String path) { + return Response.status(Response.Status.OK).entity(delegate.downloadFunction(path)).build(); + } + + public List<ConnectorDefinition> getListOfConnectors() { + return delegate.getListOfConnectors(); + } + + private InstanceCommunication.FunctionStatus toProto( + org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData + functionInstanceStatus, String instanceId) { + List<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSysExceptions + = functionInstanceStatus.getLatestSystemExceptions() + .stream() + .map(exceptionInformation 
-> InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder() + .setExceptionString(exceptionInformation.getExceptionString()) + .setMsSinceEpoch(exceptionInformation.getTimestampMs()) + .build()) + .collect(Collectors.toList()); + + List<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions + = functionInstanceStatus.getLatestUserExceptions() + .stream() + .map(exceptionInformation -> InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder() + .setExceptionString(exceptionInformation.getExceptionString()) + .setMsSinceEpoch(exceptionInformation.getTimestampMs()) + .build()) + .collect(Collectors.toList()); + + + InstanceCommunication.FunctionStatus functionStatus = InstanceCommunication.FunctionStatus.newBuilder() + .setRunning(functionInstanceStatus.isRunning()) + .setFailureException(functionInstanceStatus.getError()) + .setNumRestarts(functionInstanceStatus.getNumRestarts()) + .setNumSuccessfullyProcessed(functionInstanceStatus.getNumSuccessfullyProcessed()) + .setNumUserExceptions(functionInstanceStatus.getNumUserExceptions()) + .addAllLatestUserExceptions(latestUserExceptions) + .setNumSystemExceptions(functionInstanceStatus.getNumSystemExceptions()) + .addAllLatestSystemExceptions(latestSysExceptions) + .setAverageLatency(functionInstanceStatus.getAverageLatency()) + .setLastInvocationTime(functionInstanceStatus.getLastInvocationTime()) + .setInstanceId(instanceId) + .setWorkerId(delegate.worker().getWorkerConfig().getWorkerId()) + .build(); + + return functionStatus; + } +} \ No newline at end of file diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index b620fe5..2894c05 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -22,13 +22,12 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.common.functions.FunctionConfig; -import org.apache.pulsar.common.functions.FunctionState; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.common.io.ConnectorDefinition; -import org.apache.pulsar.common.policies.data.FunctionStats; -import org.apache.pulsar.common.policies.data.FunctionStatus; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.worker.rest.FunctionApiResource; -import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; +import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; @@ -39,10 +38,9 @@ import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.StreamingOutput; +import javax.ws.rs.core.Response; import java.io.IOException; import java.io.InputStream; import java.util.List; @@ -51,242 +49,271 @@ import java.util.List; @Path("/functions") public class FunctionApiV2Resource extends FunctionApiResource { - protected final FunctionsImpl functions; + protected final FunctionsImplV2 functions; public FunctionApiV2Resource() { - this.functions = new FunctionsImpl(this); + this.functions = new FunctionsImplV2(this); } @POST + @ApiOperation(value = "Creates a new Pulsar Function in cluster mode") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin 
permissions"), + @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"), + @ApiResponse(code = 408, message = "Request timeout"), + @ApiResponse(code = 200, message = "Pulsar Function successfully created") + }) @Path("/{tenant}/{namespace}/{functionName}") @Consumes(MediaType.MULTIPART_FORM_DATA) - public void registerFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @FormDataParam("data") InputStream uploadedInputStream, - final @FormDataParam("data") FormDataContentDisposition fileDetail, - final @FormDataParam("url") String functionPkgUrl, - final @FormDataParam("functionDetails") String functionDetailsJson, - final @FormDataParam("functionConfig") String functionConfigJson) { + public Response registerFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final @FormDataParam("functionDetails") String functionDetailsJson, + final @FormDataParam("functionConfig") String functionConfigJson) { - functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, + return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId()); - } @PUT + @ApiOperation(value = "Updates a Pulsar Function currently running in cluster mode") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"), + @ApiResponse(code = 200, message = "Pulsar Function successfully updated") + }) 
@Path("/{tenant}/{namespace}/{functionName}") @Consumes(MediaType.MULTIPART_FORM_DATA) - public void updateFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @FormDataParam("data") InputStream uploadedInputStream, - final @FormDataParam("data") FormDataContentDisposition fileDetail, - final @FormDataParam("url") String functionPkgUrl, - final @FormDataParam("functionDetails") String functionDetailsJson, - final @FormDataParam("functionConfig") String functionConfigJson) { + public Response updateFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final @FormDataParam("functionDetails") String functionDetailsJson, + final @FormDataParam("functionConfig") String functionConfigJson) { - functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, + return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId()); - } + @DELETE + @ApiOperation(value = "Deletes a Pulsar Function currently running in cluster mode") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function doesn't exist"), + @ApiResponse(code = 408, message = "Request timeout"), + @ApiResponse(code = 200, message = "The function was successfully deleted") + }) @Path("/{tenant}/{namespace}/{functionName}") - public void deregisterFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") 
String namespace, - final @PathParam("functionName") String functionName) { - functions.deregisterFunction(tenant, namespace, functionName, clientAppId()); + public Response deregisterFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName) { + return functions.deregisterFunction(tenant, namespace, functionName, clientAppId()); } @GET + @ApiOperation( + value = "Fetches information about a Pulsar Function currently running in cluster mode", + response = Function.FunctionMetaData.class + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 408, message = "Request timeout"), + @ApiResponse(code = 404, message = "The function doesn't exist") + }) @Path("/{tenant}/{namespace}/{functionName}") - public FunctionConfig getFunctionInfo(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName) { - return functions.getFunctionInfo(tenant, namespace, functionName); + public Response getFunctionInfo(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName) throws IOException { + + return functions.getFunctionInfo( + tenant, namespace, functionName); } @GET @ApiOperation( value = "Displays the status of a Pulsar Function instance", - response = FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class + response = InstanceCommunication.FunctionStatus.class ) @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), @ApiResponse(code = 404, message = "The function doesn't exist") }) - @Produces(MediaType.APPLICATION_JSON) 
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status") - public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus( - final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @PathParam("instanceId") String instanceId) throws IOException { - return functions.getFunctionInstanceStatus( - tenant, namespace, functionName, instanceId, uri.getRequestUri()); + public Response getFunctionInstanceStatus(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) throws IOException { + + return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri()); } @GET @ApiOperation( - value = "Displays the status of a Pulsar Function", - response = FunctionStatus.class + value = "Displays the status of a Pulsar Function running in cluster mode", + response = InstanceCommunication.FunctionStatus.class ) @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), - @ApiResponse(code = 404, message = "The function doesn't exist") + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") }) - @Produces(MediaType.APPLICATION_JSON) @Path("/{tenant}/{namespace}/{functionName}/status") - public FunctionStatus getFunctionStatus( - final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName) throws IOException { - return functions.getFunctionStatus( + public Response getFunctionStatus(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName) throws IOException { 
+ return functions.getFunctionStatusV2( tenant, namespace, functionName, uri.getRequestUri()); } @GET @ApiOperation( - value = "Displays the stats of a Pulsar Function", - response = FunctionStats.class + value = "Lists all Pulsar Functions currently deployed in a given namespace", + response = String.class, + responseContainer = "Collection" ) @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), - @ApiResponse(code = 404, message = "The function doesn't exist") + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") }) - @Produces(MediaType.APPLICATION_JSON) - @Path("/{tenant}/{namespace}/{functionName}/stats") - public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName) throws IOException { - return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri()); + @Path("/{tenant}/{namespace}") + public Response listFunctions(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace) { + return functions.listFunctions( tenant, namespace); } - @GET + @POST @ApiOperation( - value = "Displays the stats of a Pulsar Function instance", - response = FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class + value = "Triggers a Pulsar Function with a user-specified value or file data", + response = Message.class ) @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), - @ApiResponse(code = 404, message = "The function doesn't exist") + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 408, message = "Request timeout"), + @ApiResponse(code = 500, message = "Internal server error") }) - 
@Produces(MediaType.APPLICATION_JSON) - @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats") - public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats( - final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @PathParam("instanceId") String instanceId) throws IOException { - return functions.getFunctionsInstanceStats( - tenant, namespace, functionName, instanceId, uri.getRequestUri()); - } - - @POST @Path("/{tenant}/{namespace}/{functionName}/trigger") @Consumes(MediaType.MULTIPART_FORM_DATA) - public String triggerFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @FormDataParam("data") String input, - final @FormDataParam("dataStream") InputStream uploadedInputStream, - final @FormDataParam("topic") String topic) { - return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic); + public Response triggerFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @FormDataParam("data") String triggerValue, + final @FormDataParam("dataStream") InputStream triggerStream, + final @FormDataParam("topic") String topic) { + return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic); } - @POST - @ApiOperation(value = "Restart function instance", response = Void.class) + @GET + @ApiOperation( + value = "Fetch the current state associated with a Pulsar Function", + response = String.class + ) @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + 
@ApiResponse(code = 404, message = "The key does not exist"), @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/state/{key}") + public Response getFunctionState(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @PathParam("key") String key) { + return functions.getFunctionState(tenant, namespace, functionName, key); + } + + @POST + @ApiOperation(value = "Restart function instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart") @Consumes(MediaType.APPLICATION_JSON) - public void restartFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @PathParam("instanceId") String instanceId) { - functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri()); + public Response restartFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) { + return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri()); } @POST @ApiOperation(value = "Restart all function instances", response = Void.class) - @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") - }) + @ApiResponse(code = 500, message = 
"Internal server error") }) @Path("/{tenant}/{namespace}/{functionName}/restart") @Consumes(MediaType.APPLICATION_JSON) - public void restartFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName) { - functions.restartFunctionInstances(tenant, namespace, functionName); + public Response restartFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { + return functions.restartFunctionInstances(tenant, namespace, functionName); } @POST @ApiOperation(value = "Stop function instance", response = Void.class) - @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") - }) + @ApiResponse(code = 500, message = "Internal server error") }) @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop") @Consumes(MediaType.APPLICATION_JSON) - public void stopFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @PathParam("instanceId") String instanceId) { - functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri()); + public Response stopFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) { + return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri()); } @POST @ApiOperation(value = "Stop all function instances", response = Void.class) - @ApiResponses(value = { - @ApiResponse(code = 400, message = 
"Invalid request"), + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") - }) + @ApiResponse(code = 500, message = "Internal server error") }) @Path("/{tenant}/{namespace}/{functionName}/stop") @Consumes(MediaType.APPLICATION_JSON) - public void stopFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName) { - functions.stopFunctionInstances(tenant, namespace, functionName); + public Response stopFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { + return functions.stopFunctionInstances(tenant, namespace, functionName); } @POST + @ApiOperation( + value = "Uploads Pulsar Function file data", + hidden = true + ) @Path("/upload") @Consumes(MediaType.MULTIPART_FORM_DATA) - public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream, - final @FormDataParam("path") String path) { - functions.uploadFunction(uploadedInputStream, path); + public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("path") String path) { + return functions.uploadFunction(uploadedInputStream, path); } @GET + @ApiOperation( + value = "Downloads Pulsar Function file data", + hidden = true + ) @Path("/download") - public StreamingOutput downloadFunction(final @QueryParam("path") String path) { + public Response downloadFunction(final @QueryParam("path") String path) { return functions.downloadFunction(path); } @GET + @ApiOperation( + value = "Fetches a list of supported Pulsar IO connectors currently running in cluster mode", + response = List.class + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin 
permissions"), + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 408, message = "Request timeout") + }) @Path("/connectors") public List<ConnectorDefinition> getConnectorsList() throws IOException { return functions.getListOfConnectors(); } - - @GET - @Path("/{tenant}/{namespace}/{functionName}/state/{key}") - public FunctionState getFunctionState(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @PathParam("key") String key) throws IOException { - return functions.getFunctionState(tenant, namespace, functionName, key); - } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java similarity index 98% copy from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java copy to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java index b620fe5..d25f8f9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pulsar.functions.worker.rest.api.v2; +package org.apache.pulsar.functions.worker.rest.api.v3; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; @@ -49,11 +49,11 @@ import java.util.List; @Slf4j @Path("/functions") -public class FunctionApiV2Resource extends FunctionApiResource { +public class FunctionApiV3Resource extends FunctionApiResource { protected final FunctionsImpl functions; - public FunctionApiV2Resource() { + public FunctionApiV3Resource() { this.functions = new FunctionsImpl(this); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java similarity index 98% rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java index e630db4..6a8f25e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pulsar.functions.worker.rest.api.v2; +package org.apache.pulsar.functions.worker.rest.api.v3; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; @@ -42,11 +42,11 @@ import java.util.List; @Slf4j @Path("/sink") -public class SinkApiV2Resource extends FunctionApiResource { +public class SinkApiV3Resource extends FunctionApiResource { protected final SinkImpl sink; - public SinkApiV2Resource() { + public SinkApiV3Resource() { this.sink = new SinkImpl(this); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java similarity index 98% rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java index bb49fe1..8675cc5 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pulsar.functions.worker.rest.api.v2; +package org.apache.pulsar.functions.worker.rest.api.v3; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; @@ -41,11 +41,11 @@ import java.util.List; @Slf4j @Path("/source") -public class SourceApiV2Resource extends FunctionApiResource { +public class SourceApiV3Resource extends FunctionApiResource { protected final SourceImpl source; - public SourceApiV2Resource() { + public SourceApiV3Resource() { this.source = new SourceImpl(this); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 25a3a66..aa22650 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -52,6 +52,7 @@ import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.RestException; import org.apache.pulsar.functions.worker.rest.api.ComponentImpl; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; +import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -75,6 +76,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import static org.apache.pulsar.functions.utils.Utils.mergeJson; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -131,7 +133,7 @@ public class FunctionApiV2ResourceTest { private 
FunctionRuntimeManager mockedFunctionRunTimeManager; private RuntimeFactory mockedRuntimeFactory; private Namespace mockedNamespace; - private FunctionsImpl resource; + private FunctionsImplV2 resource; private InputStream mockedInputStream; private FormDataContentDisposition mockedFormData; private FunctionMetaData mockedFunctionMetadata; @@ -167,16 +169,20 @@ public class FunctionApiV2ResourceTest { // worker config WorkerConfig workerConfig = new WorkerConfig() - .setWorkerId("test") - .setWorkerPort(8080) - .setDownloadDirectory("/tmp/pulsar/functions") - .setFunctionMetadataTopicName("pulsar/functions") - .setNumFunctionPackageReplicas(3) - .setPulsarServiceUrl("pulsar://localhost:6650/"); + .setWorkerId("test") + .setWorkerPort(8080) + .setDownloadDirectory("/tmp/pulsar/functions") + .setFunctionMetadataTopicName("pulsar/functions") + .setNumFunctionPackageReplicas(3) + .setPulsarServiceUrl("pulsar://localhost:6650/"); when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); - this.resource = spy(new FunctionsImpl(() -> mockedWorkerService)); - doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any()); + FunctionsImpl functions = spy(new FunctionsImpl(() -> mockedWorkerService)); + + doReturn(ComponentImpl.ComponentType.FUNCTION).when(functions).calculateSubjectType(any()); + + this.resource = spy(new FunctionsImplV2(functions)); + } // @@ -208,16 +214,16 @@ public class FunctionApiV2ResourceTest { public void testRegisterFunctionMissingNamespace() { try { testRegisterFunctionMissingArguments( - tenant, - null, - function, - mockedInputStream, - topicsToSerDeClassName, - mockedFormData, - outputTopic, + tenant, + null, + function, + mockedInputStream, + topicsToSerDeClassName, + mockedFormData, + outputTopic, outputSerdeClassName, - className, - parallelism, + className, + parallelism, null); } catch (RestException re){ assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); @@ -228,22 
+234,22 @@ public class FunctionApiV2ResourceTest { @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided") public void testRegisterFunctionMissingFunctionName() { try { - testRegisterFunctionMissingArguments( - tenant, - namespace, - null, - mockedInputStream, - topicsToSerDeClassName, - mockedFormData, - outputTopic, - outputSerdeClassName, - className, - parallelism, - null); - } catch (RestException re){ - assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); - throw re; - } + testRegisterFunctionMissingArguments( + tenant, + namespace, + null, + mockedInputStream, + topicsToSerDeClassName, + mockedFormData, + outputTopic, + outputSerdeClassName, + className, + parallelism, + null); + } catch (RestException re){ + assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); + throw re; + } } @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Package is not provided") @@ -292,16 +298,16 @@ public class FunctionApiV2ResourceTest { public void testRegisterFunctionMissingPackageDetails() { try { testRegisterFunctionMissingArguments( - tenant, - namespace, - function, - mockedInputStream, - topicsToSerDeClassName, - null, - outputTopic, + tenant, + namespace, + function, + mockedInputStream, + topicsToSerDeClassName, + null, + outputTopic, outputSerdeClassName, - className, - parallelism, + className, + parallelism, null); } catch (RestException re){ assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); @@ -355,17 +361,17 @@ public class FunctionApiV2ResourceTest { public void testRegisterFunctionWrongParallelism() { try { testRegisterFunctionMissingArguments( - tenant, - namespace, - function, - mockedInputStream, - topicsToSerDeClassName, - mockedFormData, - outputTopic, - outputSerdeClassName, - className, - -2, - null); + tenant, + namespace, + function, + mockedInputStream, + topicsToSerDeClassName, + 
mockedFormData, + outputTopic, + outputSerdeClassName, + className, + -2, + null); } catch (RestException re){ assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); throw re; @@ -398,17 +404,17 @@ public class FunctionApiV2ResourceTest { public void testRegisterFunctionWrongOutputTopic() { try { testRegisterFunctionMissingArguments( - tenant, - namespace, - function, - mockedInputStream, - topicsToSerDeClassName, - mockedFormData, - function + "-output-topic/test:", - outputSerdeClassName, - className, - parallelism, - null); + tenant, + namespace, + function, + mockedInputStream, + topicsToSerDeClassName, + mockedFormData, + function + "-output-topic/test:", + outputSerdeClassName, + className, + parallelism, + null); } catch (RestException re){ assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); throw re; @@ -419,17 +425,17 @@ public class FunctionApiV2ResourceTest { public void testRegisterFunctionHttpUrl() { try { testRegisterFunctionMissingArguments( - tenant, - namespace, - function, - null, - topicsToSerDeClassName, - null, - outputTopic, - outputSerdeClassName, - className, - parallelism, - "http://localhost:1234/test"); + tenant, + namespace, + function, + null, + topicsToSerDeClassName, + null, + outputTopic, + outputSerdeClassName, + className, + parallelism, + "http://localhost:1234/test"); } catch (RestException re){ assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); throw re; @@ -491,14 +497,14 @@ public class FunctionApiV2ResourceTest { private void registerDefaultFunction() { FunctionConfig functionConfig = createDefaultFunctionConfig(); resource.registerFunction( - tenant, - namespace, - function, - mockedInputStream, - mockedFormData, - null, - null, - new Gson().toJson(functionConfig), + tenant, + namespace, + function, + mockedInputStream, + mockedFormData, + null, + null, + new Gson().toJson(functionConfig), null); } @@ -523,8 +529,8 @@ public class 
FunctionApiV2ResourceTest { Utils.uploadFileToBookkeeper( anyString(), - any(File.class), - any(Namespace.class)); + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false); registerDefaultFunction(); @@ -540,15 +546,15 @@ public class FunctionApiV2ResourceTest { mockStatic(Utils.class); doNothing().when(Utils.class); Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + any(Namespace.class), + any(InputStream.class), + anyString()); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false); RequestResult rr = new RequestResult() - .setSuccess(true) - .setMessage("function registered"); + .setSuccess(true) + .setMessage("function registered"); CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); @@ -587,15 +593,15 @@ public class FunctionApiV2ResourceTest { mockStatic(Utils.class); doNothing().when(Utils.class); Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + any(Namespace.class), + any(InputStream.class), + anyString()); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false); RequestResult rr = new RequestResult() - .setSuccess(false) - .setMessage("function failed to register"); + .setSuccess(false) + .setMessage("function failed to register"); CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); @@ -612,14 +618,14 @@ public class FunctionApiV2ResourceTest { mockStatic(Utils.class); doNothing().when(Utils.class); Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + any(Namespace.class), + any(InputStream.class), + anyString()); 
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false); CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture( - new IOException("Function registeration interrupted")); + new IOException("Function registeration interrupted")); when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); registerDefaultFunction(); @@ -637,16 +643,16 @@ public class FunctionApiV2ResourceTest { public void testUpdateFunctionMissingTenant() { try { testUpdateFunctionMissingArguments( - null, - namespace, - function, - mockedInputStream, - topicsToSerDeClassName, - mockedFormData, - outputTopic, + null, + namespace, + function, + mockedInputStream, + topicsToSerDeClassName, + mockedFormData, + outputTopic, outputSerdeClassName, - className, - parallelism, + className, + parallelism, "Tenant is not provided"); } catch (RestException re) { assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); @@ -658,16 +664,16 @@ public class FunctionApiV2ResourceTest { public void testUpdateFunctionMissingNamespace() { try { testUpdateFunctionMissingArguments( - tenant, - null, - function, - mockedInputStream, - topicsToSerDeClassName, - mockedFormData, - outputTopic, + tenant, + null, + function, + mockedInputStream, + topicsToSerDeClassName, + mockedFormData, + outputTopic, outputSerdeClassName, - className, - parallelism, + className, + parallelism, "Namespace is not provided"); } catch (RestException re) { assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); @@ -679,16 +685,16 @@ public class FunctionApiV2ResourceTest { public void testUpdateFunctionMissingFunctionName() { try { testUpdateFunctionMissingArguments( - tenant, - namespace, - null, - mockedInputStream, - topicsToSerDeClassName, - mockedFormData, - outputTopic, + tenant, + namespace, + null, + mockedInputStream, + topicsToSerDeClassName, + mockedFormData, + outputTopic, outputSerdeClassName, - 
className, - parallelism, + className, + parallelism, "Function Name is not provided"); } catch (RestException re) { assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); @@ -703,16 +709,16 @@ public class FunctionApiV2ResourceTest { doNothing().when(Utils.class); Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); testUpdateFunctionMissingArguments( - tenant, - namespace, - function, - null, - topicsToSerDeClassName, - mockedFormData, - outputTopic, + tenant, + namespace, + function, + null, + topicsToSerDeClassName, + mockedFormData, + outputTopic, outputSerdeClassName, - className, - parallelism, + className, + parallelism, "Update contains no change"); } catch (RestException re) { assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); @@ -751,16 +757,16 @@ public class FunctionApiV2ResourceTest { doNothing().when(Utils.class); Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); testUpdateFunctionMissingArguments( - tenant, - namespace, - function, - null, - topicsToSerDeClassName, - mockedFormData, - outputTopic, + tenant, + namespace, + function, + null, + topicsToSerDeClassName, + mockedFormData, + outputTopic, outputSerdeClassName, - null, - parallelism, + null, + parallelism, "Update contains no change"); } catch (RestException re) { assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); @@ -775,17 +781,17 @@ public class FunctionApiV2ResourceTest { doNothing().when(Utils.class); Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); testUpdateFunctionMissingArguments( - tenant, - namespace, - function, - null, - topicsToSerDeClassName, - mockedFormData, - outputTopic, - outputSerdeClassName, - null, - parallelism + 1, - null); + tenant, + namespace, + function, + null, + topicsToSerDeClassName, + mockedFormData, + outputTopic, + outputSerdeClassName, + null, + parallelism + 1, + null); } catch 
(RestException re) { assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); throw re; @@ -799,17 +805,17 @@ public class FunctionApiV2ResourceTest { doNothing().when(Utils.class); Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); testUpdateFunctionMissingArguments( - tenant, - namespace, - function, - null, - topicsToSerDeClassName, - mockedFormData, - "DifferentOutput", - outputSerdeClassName, - null, - parallelism, - "Output topics differ"); + tenant, + namespace, + function, + null, + topicsToSerDeClassName, + mockedFormData, + "DifferentOutput", + outputSerdeClassName, + null, + parallelism, + "Output topics differ"); } catch (RestException re) { assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); throw re; @@ -825,17 +831,17 @@ public class FunctionApiV2ResourceTest { Map<String, String> someOtherInput = new HashMap<>(); someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE); testUpdateFunctionMissingArguments( - tenant, - namespace, - function, - null, - someOtherInput, - mockedFormData, - outputTopic, - outputSerdeClassName, - null, - parallelism, - "Input Topics cannot be altered"); + tenant, + namespace, + function, + null, + someOtherInput, + mockedFormData, + outputTopic, + outputSerdeClassName, + null, + parallelism, + "Input Topics cannot be altered"); } catch (RestException re) { assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); throw re; @@ -892,14 +898,14 @@ public class FunctionApiV2ResourceTest { } resource.updateFunction( - tenant, - namespace, - function, - inputStream, - details, - null, - null, - new Gson().toJson(functionConfig), + tenant, + namespace, + function, + inputStream, + details, + null, + null, + new Gson().toJson(functionConfig), null); } @@ -917,14 +923,14 @@ public class FunctionApiV2ResourceTest { functionConfig.setOutputSerdeClassName(outputSerdeClassName); resource.updateFunction( - tenant, - namespace, - 
function, - mockedInputStream, - mockedFormData, - null, - null, - new Gson().toJson(functionConfig), + tenant, + namespace, + function, + mockedInputStream, + mockedFormData, + null, + null, + new Gson().toJson(functionConfig), null); } @@ -946,8 +952,8 @@ public class FunctionApiV2ResourceTest { doThrow(new IOException("upload failure")).when(Utils.class); Utils.uploadFileToBookkeeper( anyString(), - any(File.class), - any(Namespace.class)); + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); @@ -963,15 +969,15 @@ public class FunctionApiV2ResourceTest { mockStatic(Utils.class); doNothing().when(Utils.class); Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + any(Namespace.class), + any(InputStream.class), + anyString()); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); RequestResult rr = new RequestResult() - .setSuccess(true) - .setMessage("function registered"); + .setSuccess(true) + .setMessage("function registered"); CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); @@ -1000,18 +1006,18 @@ public class FunctionApiV2ResourceTest { RequestResult rr = new RequestResult() .setSuccess(true) .setMessage("function registered"); - CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); - when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); resource.updateFunction( - tenant, - namespace, - function, - null, - null, - filePackageUrl, - null, - new Gson().toJson(functionConfig), + tenant, + namespace, + function, + null, 
+ null, + filePackageUrl, + null, + new Gson().toJson(functionConfig), null); } @@ -1022,15 +1028,15 @@ public class FunctionApiV2ResourceTest { mockStatic(Utils.class); doNothing().when(Utils.class); Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + any(Namespace.class), + any(InputStream.class), + anyString()); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); RequestResult rr = new RequestResult() - .setSuccess(false) - .setMessage("function failed to register"); + .setSuccess(false) + .setMessage("function failed to register"); CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); @@ -1047,14 +1053,14 @@ public class FunctionApiV2ResourceTest { mockStatic(Utils.class); doNothing().when(Utils.class); Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + any(Namespace.class), + any(InputStream.class), + anyString()); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture( - new IOException("Function registeration interrupted")); + new IOException("Function registeration interrupted")); when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); updateDefaultFunction(); @@ -1073,9 +1079,9 @@ public class FunctionApiV2ResourceTest { try { testDeregisterFunctionMissingArguments( - null, - namespace, - function + null, + namespace, + function ); } catch (RestException re) { assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); @@ -1087,9 +1093,9 @@ public class FunctionApiV2ResourceTest { public void testDeregisterFunctionMissingNamespace() { try { testDeregisterFunctionMissingArguments( - tenant, - null, - function + tenant, + null, + function ); } 
catch (RestException re) { assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); @@ -1100,10 +1106,10 @@ public class FunctionApiV2ResourceTest { @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided") public void testDeregisterFunctionMissingFunctionName() { try { - testDeregisterFunctionMissingArguments( - tenant, - namespace, - null + testDeregisterFunctionMissingArguments( + tenant, + namespace, + null ); } catch (RestException re) { assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); @@ -1117,17 +1123,17 @@ public class FunctionApiV2ResourceTest { String function ) { resource.deregisterFunction( - tenant, - namespace, - function, + tenant, + namespace, + function, null); } private void deregisterDefaultFunction() { resource.deregisterFunction( - tenant, - namespace, - function, + tenant, + namespace, + function, null); } @@ -1147,8 +1153,8 @@ public class FunctionApiV2ResourceTest { when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); RequestResult rr = new RequestResult() - .setSuccess(true) - .setMessage("function deregistered"); + .setSuccess(true) + .setMessage("function deregistered"); CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult); @@ -1161,8 +1167,8 @@ public class FunctionApiV2ResourceTest { when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); RequestResult rr = new RequestResult() - .setSuccess(false) - .setMessage("function failed to deregister"); + .setSuccess(false) + .setMessage("function failed to deregister"); CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult); @@ 
-1195,12 +1201,12 @@ public class FunctionApiV2ResourceTest { // @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided") - public void testGetFunctionMissingTenant() { + public void testGetFunctionMissingTenant() throws IOException { try { testGetFunctionMissingArguments( - null, - namespace, - function + null, + namespace, + function ); } catch (RestException re) { @@ -1210,12 +1216,12 @@ public class FunctionApiV2ResourceTest { } @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided") - public void testGetFunctionMissingNamespace() { + public void testGetFunctionMissingNamespace() throws IOException { try { testGetFunctionMissingArguments( - tenant, - null, - function + tenant, + null, + function ); } catch (RestException re) { @@ -1225,12 +1231,12 @@ public class FunctionApiV2ResourceTest { } @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided") - public void testGetFunctionMissingFunctionName() { + public void testGetFunctionMissingFunctionName() throws IOException { try { testGetFunctionMissingArguments( - tenant, - namespace, - null + tenant, + namespace, + null ); } catch (RestException re) { @@ -1243,25 +1249,28 @@ public class FunctionApiV2ResourceTest { String tenant, String namespace, String function - ) { + ) throws IOException { resource.getFunctionInfo( - tenant, - namespace, - function + tenant, + namespace, + function ); } - private FunctionConfig getDefaultFunctionInfo() { - return resource.getFunctionInfo( - tenant, - namespace, - function - ); + private FunctionDetails getDefaultFunctionInfo() throws IOException { + String json = (String) resource.getFunctionInfo( + tenant, + namespace, + function + ).getEntity(); + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + mergeJson(json, functionDetailsBuilder); + return functionDetailsBuilder.build(); } 
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist") - public void testGetNotExistedFunction() { + public void testGetNotExistedFunction() throws IOException { try { when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false); getDefaultFunctionInfo(); @@ -1272,7 +1281,7 @@ public class FunctionApiV2ResourceTest { } @Test - public void testGetFunctionSuccess() { + public void testGetFunctionSuccess() throws IOException { when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); SinkSpec sinkSpec = SinkSpec.newBuilder() @@ -1289,17 +1298,17 @@ public class FunctionApiV2ResourceTest { .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType) .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build(); FunctionMetaData metaData = FunctionMetaData.newBuilder() - .setCreateTime(System.currentTimeMillis()) - .setFunctionDetails(functionDetails) - .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package")) - .setVersion(1234) - .build(); + .setCreateTime(System.currentTimeMillis()) + .setFunctionDetails(functionDetails) + .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package")) + .setVersion(1234) + .build(); when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData); - FunctionConfig functionConfig = getDefaultFunctionInfo(); + FunctionDetails actual = getDefaultFunctionInfo(); assertEquals( - FunctionConfigUtils.convertFromDetails(functionDetails), - functionConfig); + functionDetails, + actual); } // @@ -1310,8 +1319,8 @@ public class FunctionApiV2ResourceTest { public void testListFunctionsMissingTenant() { try { testListFunctionsMissingArguments( - null, - namespace + null, + namespace ); } catch (RestException re) { assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); @@ 
-1323,8 +1332,8 @@ public class FunctionApiV2ResourceTest { public void testListFunctionsMissingNamespace() { try { testListFunctionsMissingArguments( - tenant, - null + tenant, + null ); } catch (RestException re) { assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); @@ -1337,17 +1346,17 @@ public class FunctionApiV2ResourceTest { String namespace ) { resource.listFunctions( - tenant, - namespace + tenant, + namespace ); } private List<String> listDefaultFunctions() { - return resource.listFunctions( - tenant, - namespace - ); + return new Gson().fromJson((String) resource.listFunctions( + tenant, + namespace + ).getEntity(), List.class); } @Test @@ -1369,33 +1378,11 @@ public class FunctionApiV2ResourceTest { } @Test - public void testOnlyGetSources() { - List<String> functions = Lists.newArrayList("test-2"); - List<FunctionMetaData> functionMetaDataList = new LinkedList<>(); - FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails( - FunctionDetails.newBuilder().setName("test-1").build()).build(); - functionMetaDataList.add(f1); - FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails( - FunctionDetails.newBuilder().setName("test-2").build()).build(); - functionMetaDataList.add(f2); - FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails( - FunctionDetails.newBuilder().setName("test-3").build()).build(); - functionMetaDataList.add(f3); - when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList); - doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1); - doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2); - doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3); - - List<String> functionList = listDefaultFunctions(); - assertEquals(functions, functionList); - } - - @Test public void testDownloadFunctionHttpUrl() throws Exception { String 
jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar"; String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - FunctionsImpl function = new FunctionsImpl(null); - StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl); + FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService); + StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction(jarHttpUrl).getEntity(); File pkgFile = new File(testDir, UUID.randomUUID().toString()); OutputStream output = new FileOutputStream(pkgFile); streamOutput.write(output); @@ -1409,8 +1396,8 @@ public class FunctionApiV2ResourceTest { public void testDownloadFunctionFile() throws Exception { String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - FunctionsImpl function = new FunctionsImpl(null); - StreamingOutput streamOutput = function.downloadFunction("file://" + fileLocation); + FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService); + StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction("file://" + fileLocation).getEntity(); File pkgFile = new File(testDir, UUID.randomUUID().toString()); OutputStream output = new FileOutputStream(pkgFile); streamOutput.write(output); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java similarity index 99% copy from pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java copy to 
pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java index 25a3a66..580345e 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.worker.rest.api.v2; +package org.apache.pulsar.functions.worker.rest.api.v3; import com.google.common.collect.Lists; import com.google.gson.Gson; @@ -52,6 +52,7 @@ import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.RestException; import org.apache.pulsar.functions.worker.rest.api.ComponentImpl; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; +import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -93,7 +94,7 @@ import static org.testng.Assert.assertEquals; @PrepareForTest(Utils.class) @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" }) @Slf4j -public class FunctionApiV2ResourceTest { +public class FunctionApiV3ResourceTest { @ObjectFactory public IObjectFactory getObjectFactory() { @@ -1393,7 +1394,7 @@ public class FunctionApiV2ResourceTest { @Test public void testDownloadFunctionHttpUrl() throws Exception { String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar"; - String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + String testDir = 
FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionsImpl function = new FunctionsImpl(null); StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl); File pkgFile = new File(testDir, UUID.randomUUID().toString()); @@ -1408,7 +1409,7 @@ public class FunctionApiV2ResourceTest { @Test public void testDownloadFunctionFile() throws Exception { String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionsImpl function = new FunctionsImpl(null); StreamingOutput streamOutput = function.downloadFunction("file://" + fileLocation); File pkgFile = new File(testDir, UUID.randomUUID().toString()); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java similarity index 99% rename from pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java rename to pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java index ee9a014..bec3ca9 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pulsar.functions.worker.rest.api.v2; +package org.apache.pulsar.functions.worker.rest.api.v3; import com.google.common.collect.Lists; import com.google.gson.Gson; @@ -87,12 +87,12 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.testng.Assert.assertEquals; /** - * Unit test of {@link SinkApiV2Resource}. + * Unit test of {@link SinkApiV3Resource}. */ @PrepareForTest({Utils.class, SinkConfigUtils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class}) @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" }) @Slf4j -public class SinkApiV2ResourceTest { +public class SinkApiV3ResourceTest { @ObjectFactory public IObjectFactory getObjectFactory() { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java similarity index 99% rename from pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java rename to pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java index 194f624..72d23e2 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pulsar.functions.worker.rest.api.v2; +package org.apache.pulsar.functions.worker.rest.api.v3; import com.google.common.collect.Lists; import com.google.gson.Gson; @@ -86,12 +86,12 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.testng.Assert.assertEquals; /** - * Unit test of {@link SourceApiV2Resource}. + * Unit test of {@link SourceApiV3Resource}. */ @PrepareForTest({Utils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class}) @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" }) @Slf4j -public class SourceApiV2ResourceTest { +public class SourceApiV3ResourceTest { @ObjectFactory public IObjectFactory getObjectFactory() {