This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d2546f0 [REST API Doc]Sink API refinement (#4520) d2546f0 is described below commit d2546f0c38e378ddf668a27330d7f9c5412a04b7 Author: Yijie Shen <henry.yijies...@gmail.com> AuthorDate: Tue Jun 18 03:35:02 2019 +0800 [REST API Doc]Sink API refinement (#4520) * revisit documentation for pulsar admin api * format * update json body * sinkConfig as doc base --- .../apache/pulsar/broker/admin/impl/SinksBase.java | 294 ++++++++++++++++----- pulsar-common/pom.xml | 5 + .../pulsar/common/functions/UpdateOptions.java | 6 + 3 files changed, 246 insertions(+), 59 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java index 50839d8..c527bd3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java @@ -18,9 +18,7 @@ */ package org.apache.pulsar.broker.admin.impl; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.*; import org.apache.commons.lang.StringUtils; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.common.functions.UpdateOptions; @@ -63,19 +61,81 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService> @POST @ApiOperation(value = "Creates a new Pulsar Sink in cluster mode") @ApiResponses(value = { - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), - @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"), - @ApiResponse(code = 408, message = "Request timeout"), - @ApiResponse(code = 200, message = "Pulsar Function successfully created") + @ApiResponse(code = 400, message = "Invalid request (sink already exists, etc.)"), + @ApiResponse(code = 200, message = "Pulsar Sink successfully created"), + @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to get tenant data, failed to process package, etc.)"), + @ApiResponse(code = 401, message = "Client is not authorized to perform operation"), + @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.") }) @Path("/{tenant}/{namespace}/{sinkName}") @Consumes(MediaType.MULTIPART_FORM_DATA) - public void registerSink(final @PathParam("tenant") String tenant, + public void registerSink(@ApiParam(value = "The sink's tenant") + final @PathParam("tenant") String tenant, + @ApiParam(value = "The sink's namespace") final @PathParam("namespace") String namespace, + @ApiParam(value = "The sink's name") final @PathParam("sinkName") String sinkName, final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("data") FormDataContentDisposition fileDetail, final @FormDataParam("url") String functionPkgUrl, + @ApiParam( + value = + "A JSON value presenting a sink config playload. All available configuration options are: \n" + + "classname \n" + + " The sink's class name if archive is file-url-path (file://) \n" + + "sourceSubscriptionName \n" + + " Pulsar source subscription name if user wants a specific \n" + + " subscription-name for input-topic consumer \n" + + "inputs \n" + + " The sink's input topic or topics (specified as a JSON array) \n" + + "topicsPattern \n" + + " TopicsPattern to consume from list of topics under a namespace that " + + " match the pattern. [input] and [topicsPattern] are mutually " + + " exclusive. Add SerDe class name for a pattern in customSerdeInputs " + + " (supported for java fun only)" + + "topicToSerdeClassName \n" + + " The map of input topics to SerDe class names (specified as a JSON object) \n" + + "topicToSchemaType \n" + + " The map of input topics to Schema types or class names (specified as a JSON object) \n" + + "inputSpecs \n" + + " The map of input topics to its consumer configuration, each configuration has schema of " + + " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5} \n" + + "configs \n" + + " The map of configs (specified as a JSON object) \n" + + "secrets \n" + + " a map of secretName(aka how the secret is going to be \n" + + " accessed in the function via context) to an object that \n" + + " encapsulates how the secret is fetched by the underlying \n" + + " secrets provider. The type of an value here can be found by the \n" + + " SecretProviderConfigurator.getSecretObjectType() method. (specified as a JSON object) \n" + + "parallelism \n" + + " The sink's parallelism factor (i.e. the number of sink instances to run \n" + + "processingGuarantees \n" + + " The processing guarantees (aka delivery semantics) applied to the sink. Possible Values: \"ATLEAST_ONCE\", \"ATMOST_ONCE\", \"EFFECTIVELY_ONCE\" \n" + + "retainOrdering \n" + + " Boolean denotes whether sink consumes and sinks messages in order \n" + + "resources \n" + + " {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores), RAM (in bytes) and disk (in bytes) that needs to be allocated per sink instance (applicable only to Docker runtime) \n" + + "autoAck \n" + + " Boolean denotes whether or not the framework will automatically acknowledge messages \n" + + "timeoutMs \n" + + " Long denotes the message timeout in milliseconds \n" + + "cleanupSubscription \n" + + " Boolean denotes whether the subscriptions the functions created/used should be deleted when the functions is deleted \n" + + "runtimeFlags \n" + + " Any flags that you want to pass to the runtime as a single string \n", + examples = @Example( + value = @ExampleProperty( + mediaType = MediaType.APPLICATION_JSON, + value = "{ \n" + + "\t\"classname\": \"org.example.MySinkTest\",\n" + + "\t\"inputs\": [\"persistent://public/default/sink-input\"],\n" + + "\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n" + + "\t\"parallelism\": 10\n" + + "}" + ) + ) + ) final @FormDataParam("sinkConfig") String sinkConfigJson) { sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, @@ -85,19 +145,84 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService> @PUT @ApiOperation(value = "Updates a Pulsar Sink currently running in cluster mode") @ApiResponses(value = { - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), - @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"), - @ApiResponse(code = 200, message = "Pulsar Function successfully updated") + @ApiResponse(code = 400, message = "Invalid request (sink doesn't exist, update contains no change, etc.)"), + @ApiResponse(code = 200, message = "Pulsar Sink successfully updated"), + @ApiResponse(code = 401, message = "Client is not authorized to perform operation"), + @ApiResponse(code = 404, message = "The sink does not exist"), + @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to process package, etc.)"), + @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.") }) @Path("/{tenant}/{namespace}/{sinkName}") @Consumes(MediaType.MULTIPART_FORM_DATA) - public void updateSink(final @PathParam("tenant") String tenant, + public void updateSink(@ApiParam(value = "The sink's tenant") + final @PathParam("tenant") String tenant, + @ApiParam(value = "The sink's namespace") final @PathParam("namespace") String namespace, + @ApiParam(value = "The sink's name") final @PathParam("sinkName") String sinkName, final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("data") FormDataContentDisposition fileDetail, final @FormDataParam("url") String functionPkgUrl, + @ApiParam( + value = + "A JSON value presenting a sink config playload. All available configuration options are: \n" + + "classname \n" + + " The sink's class name if archive is file-url-path (file://) \n" + + "sourceSubscriptionName \n" + + " Pulsar source subscription name if user wants a specific \n" + + " subscription-name for input-topic consumer \n" + + "inputs \n" + + " The sink's input topic or topics (specified as a JSON array) \n" + + "topicsPattern \n" + + " TopicsPattern to consume from list of topics under a namespace that " + + " match the pattern. [input] and [topicsPattern] are mutually " + + " exclusive. Add SerDe class name for a pattern in customSerdeInputs " + + " (supported for java fun only)" + + "topicToSerdeClassName \n" + + " The map of input topics to SerDe class names (specified as a JSON object) \n" + + "topicToSchemaType \n" + + " The map of input topics to Schema types or class names (specified as a JSON object) \n" + + "inputSpecs \n" + + " The map of input topics to its consumer configuration, each configuration has schema of " + + " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5} \n" + + "configs \n" + + " The map of configs (specified as a JSON object) \n" + + "secrets \n" + + " a map of secretName(aka how the secret is going to be \n" + + " accessed in the function via context) to an object that \n" + + " encapsulates how the secret is fetched by the underlying \n" + + " secrets provider. The type of an value here can be found by the \n" + + " SecretProviderConfigurator.getSecretObjectType() method. (specified as a JSON object) \n" + + "parallelism \n" + + " The sink's parallelism factor (i.e. the number of sink instances to run \n" + + "processingGuarantees \n" + + " The processing guarantees (aka delivery semantics) applied to the sink. Possible Values: \"ATLEAST_ONCE\", \"ATMOST_ONCE\", \"EFFECTIVELY_ONCE\" \n" + + "retainOrdering \n" + + " Boolean denotes whether sink consumes and sinks messages in order \n" + + "resources \n" + + " {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores), RAM (in bytes) and disk (in bytes) that needs to be allocated per sink instance (applicable only to Docker runtime) \n" + + "autoAck \n" + + " Boolean denotes whether or not the framework will automatically acknowledge messages \n" + + "timeoutMs \n" + + " Long denotes the message timeout in milliseconds \n" + + "cleanupSubscription \n" + + " Boolean denotes whether the subscriptions the functions created/used should be deleted when the functions is deleted \n" + + "runtimeFlags \n" + + " Any flags that you want to pass to the runtime as a single string \n", + examples = @Example( + value = @ExampleProperty( + mediaType = MediaType.APPLICATION_JSON, + value = "{ \n" + + "\t\"classname\": \"org.example.SinkStressTest\", \n" + + "\t\"inputs\": [\"persistent://public/default/sink-input\"],\n" + + "\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n" + + "\t\"parallelism\": 5\n" + + "}" + ) + ) + ) final @FormDataParam("sinkConfig") String sinkConfigJson, + @ApiParam() final @FormDataParam("updateOptions") UpdateOptions updateOptions) { sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, @@ -109,15 +234,20 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService> @DELETE @ApiOperation(value = "Deletes a Pulsar Sink currently running in cluster mode") @ApiResponses(value = { - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function doesn't exist"), - @ApiResponse(code = 408, message = "Request timeout"), - @ApiResponse(code = 200, message = "The function was successfully deleted") + @ApiResponse(code = 400, message = "Invalid deregister request"), + @ApiResponse(code = 404, message = "The sink does not exist"), + @ApiResponse(code = 200, message = "The sink was successfully deleted"), + @ApiResponse(code = 401, message = "Client is not authorized to perform operation"), + @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to deregister, etc.)"), + @ApiResponse(code = 408, message = "Got InterruptedException while deregistering the sink"), + @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.") }) @Path("/{tenant}/{namespace}/{sinkName}") - public void deregisterSink(final @PathParam("tenant") String tenant, + public void deregisterSink(@ApiParam(value = "The sink's tenant") + final @PathParam("tenant") String tenant, + @ApiParam(value = "The sink's namespace") final @PathParam("namespace") String namespace, + @ApiParam(value = "The sink's name") final @PathParam("sinkName") String sinkName) { sink.deregisterFunction(tenant, namespace, sinkName, clientAppId(), clientAuthData()); } @@ -128,14 +258,16 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService> response = SinkConfig.class ) @ApiResponses(value = { - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 408, message = "Request timeout"), - @ApiResponse(code = 404, message = "The function doesn't exist") + @ApiResponse(code = 404, message = "The sink does not exist"), + @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.") }) @Path("/{tenant}/{namespace}/{sinkName}") - public SinkConfig getSinkInfo(final @PathParam("tenant") String tenant, + public SinkConfig getSinkInfo(@ApiParam(value = "The sink's tenant") + final @PathParam("tenant") String tenant, + @ApiParam(value = "The sink's namespace") final @PathParam("namespace") String namespace, + @ApiParam(value = "The sink's name") final @PathParam("sinkName") String sinkName) throws IOException { return sink.getSinkInfo(tenant, namespace, sinkName); } @@ -146,16 +278,21 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService> response = SinkStatus.SinkInstanceStatus.SinkInstanceStatusData.class ) @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), - @ApiResponse(code = 404, message = "The sink doesn't exist") + @ApiResponse(code = 400, message = "The sink instance does not exist"), + @ApiResponse(code = 404, message = "The sink does not exist"), + @ApiResponse(code = 500, message = "Internal Server Error (got exception while getting status, etc.)"), + @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.") }) @Produces(MediaType.APPLICATION_JSON) @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status") public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus( + @ApiParam(value = "The sink's tenant") final @PathParam("tenant") String tenant, + @ApiParam(value = "The sink's namespace") final @PathParam("namespace") String namespace, + @ApiParam(value = "The sink's name") final @PathParam("sinkName") String sinkName, + @ApiParam(value = "The sink instanceId") final @PathParam("instanceId") String instanceId) throws IOException { return sink.getSinkInstanceStatus( tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData()); @@ -167,14 +304,18 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService> response = SinkStatus.class ) @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), - @ApiResponse(code = 404, message = "The sink doesn't exist") + @ApiResponse(code = 400, message = "Invalid get status request"), + @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), + @ApiResponse(code = 404, message = "The sink does not exist"), + @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later."), }) @Produces(MediaType.APPLICATION_JSON) @Path("/{tenant}/{namespace}/{sinkName}/status") - public SinkStatus getSinkStatus(final @PathParam("tenant") String tenant, + public SinkStatus getSinkStatus(@ApiParam(value = "The sink's tenant") + final @PathParam("tenant") String tenant, + @ApiParam(value = "The sink's namespace") final @PathParam("namespace") String namespace, + @ApiParam(value = "The sink's name") final @PathParam("sinkName") String sinkName) throws IOException { return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri(), clientAppId(), clientAuthData()); } @@ -186,11 +327,15 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService> responseContainer = "Collection" ) @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + @ApiResponse(code = 400, message = "Invalid list request"), + @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), + @ApiResponse(code = 500, message = "Internal server error (failed to authorize, etc.)"), + @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.") }) @Path("/{tenant}/{namespace}") - public List<String> listSinks(final @PathParam("tenant") String tenant, + public List<String> listSinks(@ApiParam(value = "The sink's tenant") + final @PathParam("tenant") String tenant, + @ApiParam(value = "The sink's namespace") final @PathParam("namespace") String namespace) { return sink.listFunctions(tenant, namespace, clientAppId(), clientAuthData()); } @@ -198,15 +343,21 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService> @POST @ApiOperation(value = "Restart sink instance", response = Void.class) @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") + @ApiResponse(code = 400, message = "Invalid restart request"), + @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), + @ApiResponse(code = 404, message = "The sink does not exist"), + @ApiResponse(code = 500, message = "Internal server error (failed to restart the sink instance, failed to authorize, etc.)"), + @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.") }) @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart") @Consumes(MediaType.APPLICATION_JSON) - public void restartSink(final @PathParam("tenant") String tenant, + public void restartSink(@ApiParam(value = "The sink's tenant") + final @PathParam("tenant") String tenant, + @ApiParam(value = "The sink's namespace") final @PathParam("namespace") String namespace, + @ApiParam(value = "The sink's name") final @PathParam("sinkName") String sinkName, + @ApiParam(value = "The sink instanceId") final @PathParam("instanceId") String instanceId) { sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData()); } @@ -214,14 +365,19 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService> @POST @ApiOperation(value = "Restart all sink instances", response = Void.class) @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") + @ApiResponse(code = 400, message = "Invalid restart request"), + @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), + @ApiResponse(code = 404, message = "The sink does not exist"), + @ApiResponse(code = 500, message = "Internal server error (failed to restart the sink, failed to authorize, etc.)"), + @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.") }) @Path("/{tenant}/{namespace}/{sinkName}/restart") @Consumes(MediaType.APPLICATION_JSON) - public void restartSink(final @PathParam("tenant") String tenant, + public void restartSink(@ApiParam(value = "The sink's tenant") + final @PathParam("tenant") String tenant, + @ApiParam(value = "The sink's namespace") final @PathParam("namespace") String namespace, + @ApiParam(value = "The sink's name") final @PathParam("sinkName") String sinkName) { sink.restartFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData()); } @@ -229,15 +385,21 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService> @POST @ApiOperation(value = "Stop sink instance", response = Void.class) @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") + @ApiResponse(code = 400, message = "Invalid stop request"), + @ApiResponse(code = 404, message = "The sink instance does not exist"), + @ApiResponse(code = 500, message = "Internal server error (failed to stop the sink, failed to authorize, etc.)"), + @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), + @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.") }) @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop") @Consumes(MediaType.APPLICATION_JSON) - public void stopSink(final @PathParam("tenant") String tenant, + public void stopSink(@ApiParam(value = "The sink's tenant") + final @PathParam("tenant") String tenant, + @ApiParam(value = "The sink's namespace") final @PathParam("namespace") String namespace, + @ApiParam(value = "The sink's name") final @PathParam("sinkName") String sinkName, + @ApiParam(value = "The sink instanceId") final @PathParam("instanceId") String instanceId) { sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData()); } @@ -245,14 +407,19 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService> @POST @ApiOperation(value = "Stop all sink instances", response = Void.class) @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") + @ApiResponse(code = 400, message = "Invalid stop request"), + @ApiResponse(code = 404, message = "The sink does not exist"), + @ApiResponse(code = 500, message = "Internal server error (failed to stop the sink, failed to authorize, etc.)"), + @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), + @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.") }) @Path("/{tenant}/{namespace}/{sinkName}/stop") @Consumes(MediaType.APPLICATION_JSON) - public void stopSink(final @PathParam("tenant") String tenant, + public void stopSink(@ApiParam(value = "The sink's tenant") + final @PathParam("tenant") String tenant, + @ApiParam(value = "The sink's namespace") final @PathParam("namespace") String namespace, + @ApiParam(value = "The sink's name") final @PathParam("sinkName") String sinkName) { sink.stopFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData()); } @@ -260,15 +427,21 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService> @POST @ApiOperation(value = "Start sink instance", response = Void.class) @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") + @ApiResponse(code = 400, message = "Invalid start request"), + @ApiResponse(code = 404, message = "The sink does not exist"), + @ApiResponse(code = 500, message = "Internal server error (failed to start the sink, failed to authorize, etc.)"), + @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), + @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.") }) @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start") @Consumes(MediaType.APPLICATION_JSON) - public void startSink(final @PathParam("tenant") String tenant, + public void startSink(@ApiParam(value = "The sink's tenant") + final @PathParam("tenant") String tenant, + @ApiParam(value = "The sink's namespace") final @PathParam("namespace") String namespace, + @ApiParam(value = "The sink's name") final @PathParam("sinkName") String sinkName, + @ApiParam(value = "The sink instanceId") final @PathParam("instanceId") String instanceId) { sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData()); } @@ -276,14 +449,19 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService> @POST @ApiOperation(value = "Start all sink instances", response = Void.class) @ApiResponses(value = { - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 404, message = "The function does not exist"), - @ApiResponse(code = 500, message = "Internal server error") + @ApiResponse(code = 400, message = "Invalid start request"), + @ApiResponse(code = 404, message = "The sink does not exist"), + @ApiResponse(code = 500, message = "Internal server error (failed to start the sink, failed to authorize, etc.)"), + @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), + @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.") }) @Path("/{tenant}/{namespace}/{sinkName}/start") @Consumes(MediaType.APPLICATION_JSON) - public void startSink(final @PathParam("tenant") String tenant, + public void startSink(@ApiParam(value = "The sink's tenant") + final @PathParam("tenant") String tenant, + @ApiParam(value = "The sink's namespace") final @PathParam("namespace") String namespace, + @ApiParam(value = "The sink's name") final @PathParam("sinkName") String sinkName) { sink.startFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData()); } @@ -294,9 +472,7 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService> response = List.class ) @ApiResponses(value = { - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), - @ApiResponse(code = 400, message = "Invalid request"), - @ApiResponse(code = 408, message = "Request timeout") + @ApiResponse(code = 200, message = "Get builtin sinks successfully.") }) @Path("/builtinsinks") public List<ConnectorDefinition> getSinkList() { diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index 9b40e00..74ff8aa 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -104,6 +104,11 @@ </dependency> <dependency> + <groupId>io.swagger</groupId> + <artifactId>swagger-annotations</artifactId> + </dependency> + + <dependency> <groupId>org.apache.bookkeeper</groupId> <artifactId>circe-checksum</artifactId> <version>${bookkeeper.version}</version> diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java index b5d956d..d1186ca 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java @@ -18,12 +18,18 @@ */ package org.apache.pulsar.common.functions; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor +@ApiModel(value = "UpdateOptions", description = "Options while updating the sink") public class UpdateOptions { + @ApiModelProperty( + value = "Whether or not to update the auth data", + name = "update-auth-data") private boolean updateAuthData = false; }