This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new b4658e2 Made upload/download generic so that it doesnt have to be function specific (#1629) b4658e2 is described below commit b4658e2f94ba150e00ffe01ec92b358d8c443400 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Sat Apr 21 14:13:21 2018 -0700 Made upload/download generic so that it doesnt have to be function specific (#1629) --- .../pulsar/broker/admin/impl/FunctionsBase.java | 21 +++------ .../org/apache/pulsar/client/admin/Functions.java | 30 +++++------- .../client/admin/internal/FunctionsImpl.java | 16 +++---- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 49 +++++++++----------- .../functions/worker/rest/api/FunctionsImpl.java | 53 +++++----------------- .../worker/rest/api/v2/FunctionApiV2Resource.java | 21 +++------ 6 files changed, 67 insertions(+), 123 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 5e0747b..832cb66 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -155,24 +155,17 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi } @POST - @Path("/{tenant}/{namespace}/{functionName}/upload") + @Path("/upload") @Consumes(MediaType.MULTIPART_FORM_DATA) - public Response uploadFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @FormDataParam("data") InputStream uploadedInputStream, - final @FormDataParam("data") FormDataContentDisposition fileDetail) { - return functions.uploadFunction( - tenant, namespace, functionName, uploadedInputStream, fileDetail); + public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("path") String path) { + return functions.uploadFunction(uploadedInputStream, path); } @GET - @Path("/{tenant}/{namespace}/{functionName}/download") - public Response downloadFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @QueryParam("filename") String fileName) { - return functions.downloadFunction(tenant, namespace, functionName, fileName); + @Path("/download") + public Response downloadFunction(final @QueryParam("path") String path) { + return functions.downloadFunction(path); } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index 795683d..8f058c7 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -160,38 +160,30 @@ public interface Functions { String triggerFunction(String tenant, String namespace, String function, String triggerValue, String triggerFile) throws PulsarAdminException; /** - * Upload Function Code. + * Upload Data. * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param function - * Function name - * @param fileName - * Code where file is located + * @param sourceFile + * dataFile that needs to be uploaded + * @param path + * Path where data should be stored * * @throws PulsarAdminException * Unexpected error */ - void uploadFunction(String tenant, String namespace, String function, String fileName) throws PulsarAdminException; + void uploadFunction(String sourceFile, String path) throws PulsarAdminException; /** * Download Function Code. * - * @param tenant - * Tenant name - * @param namespace - * Namespace name - * @param function - * Function name - * @param fileName - * Code where function should be downloaded + * @param destinationFile + * file where data should be downloaded to + * @param path + * Path where data is located * * @throws PulsarAdminException * Unexpected error */ - void downloadFunction(String tenant, String namespace, String function, String fileName) throws PulsarAdminException; + void downloadFunction(String destinationFile, String path) throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index f831b9e..30e6bd9 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -169,13 +169,14 @@ public class FunctionsImpl extends BaseResource implements Functions { } @Override - public void uploadFunction(String tenant, String namespace, String functionName, String fileName) throws PulsarAdminException { + public void uploadFunction(String sourceFile, String path) throws PulsarAdminException { try { final FormDataMultiPart mp = new FormDataMultiPart(); - mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE)); + mp.bodyPart(new FileDataBodyPart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM_TYPE)); - request(functions.path(tenant).path(namespace).path(functionName).path("upload")) + mp.bodyPart(new FormDataBodyPart("path", path, MediaType.TEXT_PLAIN_TYPE)); + request(functions.path("upload")) .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class); } catch (Exception e) { throw getApiException(e); @@ -183,13 +184,12 @@ public class FunctionsImpl extends BaseResource implements Functions { } @Override - public void downloadFunction(String tenant, String namespace, String function, String path) throws PulsarAdminException { + public void downloadFunction(String destinationPath, String path) throws PulsarAdminException { try { - Path pathToFile = Paths.get(path); - InputStream response = request(functions.path(tenant).path(namespace).path(function).path("download") - .queryParam("filename", pathToFile.getFileName().toString())).get(InputStream.class); + InputStream response = request(functions.path("download") + .queryParam("path", path)).get(InputStream.class); if (response != null) { - File targetFile = new File(path); + File targetFile = new File(destinationPath); java.nio.file.Files.copy( response, targetFile.toPath(), diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 339b947..52feab9 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -693,46 +693,41 @@ public class CmdFunctions extends CmdBase { } } - @Parameters(commandDescription = "Upload Pulsar Function code to Pulsar") - class UploadFunction extends FunctionCommand { + @Parameters(commandDescription = "Upload File Data to Pulsar") + class UploadFunction extends BaseCommand { @Parameter( - names = "--jar", - description = "Path to the jar file for the function (if the function is written in Java)", - listConverter = StringConverter.class) - protected String jarFile; + names = "--sourceFile", + description = "The file whose contents need to be uploaded", + listConverter = StringConverter.class, required = true) + protected String sourceFile; @Parameter( - names = "--py", - description = "Path to the main Python file for the function (if the function is written in Python)", - listConverter = StringConverter.class) - protected String pyFile; - + names = "--path", + description = "Path where the contents need to be stored", + listConverter = StringConverter.class, required = true) + protected String path; @Override void runCmd() throws Exception { - if (jarFile == null && pyFile == null) { - throw new RuntimeException("Either a jar File or a python file needs to be specified"); - } - String userCodeFile; - if (jarFile != null) { - userCodeFile = jarFile; - } else { - userCodeFile = pyFile; - } - admin.functions().uploadFunction(tenant, namespace, functionName, userCodeFile); + admin.functions().uploadFunction(sourceFile, path); print("Uploaded successfully"); } } - @Parameters(commandDescription = "Download Pulsar Function code from Pulsar") - class DownloadFunction extends FunctionCommand { + @Parameters(commandDescription = "Download File Data from Pulsar") + class DownloadFunction extends BaseCommand { + @Parameter( + names = "--destinationFile", + description = "The file where downloaded contents need to be stored", + listConverter = StringConverter.class, required = true) + protected String destinationFile; @Parameter( - names = "--downloadPath", - description = "Path where the file needs to be downloaded)", + names = "--path", + description = "Path where the contents are to be stored", listConverter = StringConverter.class, required = true) - protected String downloadPath; + protected String path; @Override void runCmd() throws Exception { - admin.functions().downloadFunction(tenant, namespace, functionName, downloadPath); + admin.functions().downloadFunction(destinationFile, path); print("Downloaded successfully"); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index b870619..6f98a42 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -529,52 +529,32 @@ public class FunctionsImpl { } @POST - @Path("/{tenant}/{namespace}/{functionName}/upload") + @Path("/upload") @Consumes(MediaType.MULTIPART_FORM_DATA) - public Response uploadFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @FormDataParam("data") InputStream uploadedInputStream, - final @FormDataParam("data") FormDataContentDisposition fileDetail) { + public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("path") String path) { // validate parameters try { - if (tenant == null) { - throw new IllegalArgumentException("Tenant is not provided"); - } - if (namespace == null) { - throw new IllegalArgumentException("Namespace is not provided"); - } - if (functionName == null) { - throw new IllegalArgumentException("Function Name is not provided"); - } - if (uploadedInputStream == null || fileDetail == null) { - throw new IllegalArgumentException("Function Package is not provided"); + if (uploadedInputStream == null || path == null) { + throw new IllegalArgumentException("Function Package is not provided " + path); } } catch (IllegalArgumentException e) { - log.error("Invalid upload function request @ /{}/{}/{}", - tenant, namespace, functionName, e); + log.error("Invalid upload function request @ /{}", path, e); return Response.status(Status.BAD_REQUEST) .type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } - String packageLocation = String.format( - "%s/%s/%s/%s", - tenant, - namespace, - functionName, - fileDetail.getFileName()); - // Upload to bookkeeper try { - log.info("Uploading function package to {}", packageLocation); + log.info("Uploading function package to {}", path); Utils.uploadToBookeeper( worker().getDlogNamespace(), uploadedInputStream, - packageLocation); + path); } catch (IOException e) { - log.error("Error uploading file {}", packageLocation, e); + log.error("Error uploading file {}", path, e); return Response.serverError() .type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())) @@ -585,23 +565,14 @@ public class FunctionsImpl { } @GET - @Path("/{tenant}/{namespace}/{functionName}/download") - public Response downloadFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @QueryParam("filename") String fileName) { - String packageLocation = String.format( - "%s/%s/%s/%s", - tenant, - namespace, - functionName, - fileName); + @Path("/download") + public Response downloadFunction(final @QueryParam("path") String path) { return Response.status(Status.OK).entity( new StreamingOutput() { @Override public void write(final OutputStream output) throws IOException { Utils.downloadFromBookkeeper(worker().getDlogNamespace(), - output, packageLocation); + output, path); } }).build(); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index a2e7c0a..9ed70a4 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -142,24 +142,17 @@ public class FunctionApiV2Resource extends FunctionApiResource { } @POST - @Path("/{tenant}/{namespace}/{functionName}/upload") + @Path("/upload") @Consumes(MediaType.MULTIPART_FORM_DATA) - public Response uploadFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @FormDataParam("data") InputStream uploadedInputStream, - final @FormDataParam("data") FormDataContentDisposition fileDetail) { - return functions.uploadFunction( - tenant, namespace, functionName, uploadedInputStream, fileDetail); + public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("path") String path) { + return functions.uploadFunction(uploadedInputStream, path); } @GET - @Path("/{tenant}/{namespace}/{functionName}/download") - public Response downloadFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @QueryParam("filename") String fileName) { - return functions.downloadFunction(tenant, namespace, functionName, fileName); + @Path("/download") + public Response downloadFunction(final @QueryParam("path") String path) { + return functions.downloadFunction(path); } } -- To stop receiving notification emails like this one, please contact si...@apache.org.