This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 1026edc Handle builtin/functionpkgurl properly in externally managed schedulers (#2840) 1026edc is described below commit 1026edcbe0eefefb61acb7a30731efdb6865b1a1 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Thu Oct 25 06:46:52 2018 -0700 Handle builtin/functionpkgurl properly in externally managed schedulers (#2840) * Handle builtin/functionpkgurl properly in externally managed schedulers * Correct filename * Handle giving out the right location for function pacakge * Reverted changes * Fixed unittest --- .../org/apache/pulsar/functions/utils/Utils.java | 19 ++-- .../pulsar/functions/worker/FunctionActioner.java | 36 +++---- .../org/apache/pulsar/functions/worker/Utils.java | 12 +-- .../functions/worker/rest/api/FunctionsImpl.java | 109 ++++++++++++++------- .../rest/api/v2/FunctionApiV2ResourceTest.java | 16 +-- .../worker/rest/api/v2/SinkApiV2ResourceTest.java | 65 ++++++------ .../rest/api/v2/SourceApiV2ResourceTest.java | 64 ++++++------ 7 files changed, 180 insertions(+), 141 deletions(-) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index 92f3de6..3b9c8da 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -238,18 +238,23 @@ public class Utils { } public static ClassLoader extractClassLoader(String destPkgUrl) throws IOException, URISyntaxException { + File file = extractFileFromPkg(destPkgUrl); + try { + return loadJar(file); + } catch (MalformedURLException e) { + throw new IllegalArgumentException( + "Corrupt User PackageFile " + file + " with error " + e.getMessage()); + } + } + + public static File extractFileFromPkg(String destPkgUrl) throws IOException, URISyntaxException { if (destPkgUrl.startsWith(FILE)) { URL url = new URL(destPkgUrl); File file = new File(url.toURI()); if (!file.exists()) { throw new IOException(destPkgUrl + " does not exists locally"); } - try { - return loadJar(file); - } catch (MalformedURLException e) { - throw new IllegalArgumentException( - "Corrupt User PackageFile " + file + " with error " + e.getMessage()); - } + return file; } else if (destPkgUrl.startsWith("http")) { URL website = new URL(destPkgUrl); File tempFile = File.createTempFile("function", ".tmp"); @@ -260,7 +265,7 @@ public class Utils { if (tempFile.exists()) { tempFile.delete(); } - return loadJar(tempFile); + return tempFile; } else { throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]"); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 52af689..5df48de 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -143,26 +143,26 @@ public class FunctionActioner implements AutoCloseable { String pkgLocation = functionMetaData.getPackageLocation().getPackagePath(); boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation); - if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) { - URL url = new URL(pkgLocation); - File pkgFile = new File(url.toURI()); - packageFile = pkgFile.getAbsolutePath(); - } else if (isFunctionCodeBuiltin(functionDetails)) { - File pkgFile = getBuiltinArchive(functionDetails); - packageFile = pkgFile.getAbsolutePath(); - } else if (runtimeFactory.externallyManaged()) { + if (runtimeFactory.externallyManaged()) { packageFile = pkgLocation; } else { - File pkgDir = new File( - workerConfig.getDownloadDirectory(), - getDownloadPackagePath(functionMetaData, instanceId)); - pkgDir.mkdirs(); - - File pkgFile = new File( - pkgDir, - new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName()); - downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId); - packageFile = pkgFile.getAbsolutePath(); + if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) { + URL url = new URL(pkgLocation); + File pkgFile = new File(url.toURI()); + packageFile = pkgFile.getAbsolutePath(); + } else if (isFunctionCodeBuiltin(functionDetails)) { + File pkgFile = getBuiltinArchive(functionDetails); + packageFile = pkgFile.getAbsolutePath(); + } else { + File pkgDir = new File(workerConfig.getDownloadDirectory(), + getDownloadPackagePath(functionMetaData, instanceId)); + pkgDir.mkdirs(); + File pkgFile = new File( + pkgDir, + new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName()); + downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId); + packageFile = pkgFile.getAbsolutePath(); + } } InstanceConfig instanceConfig = new InstanceConfig(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index ebb68d5..d010660 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -18,12 +18,7 @@ */ package org.apache.pulsar.functions.worker; -import java.io.ByteArrayOutputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectOutputStream; -import java.io.OutputStream; +import java.io.*; import java.net.URI; import java.net.URL; import java.nio.channels.Channels; @@ -80,6 +75,11 @@ public final class Utils { return String.format("%s-%s", UUID.randomUUID().toString(), packageName); } + public static void uploadFileToBookkeeper(String packagePath, File sourceFile, Namespace dlogNamespace) throws IOException { + FileInputStream uploadedInputStream = new FileInputStream(sourceFile); + uploadToBookeeper(dlogNamespace, uploadedInputStream, packagePath); + } + public static void uploadToBookeeper(Namespace dlogNamespace, InputStream uploadedInputStream, String destPkgPath) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 11a10ab..087238c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -194,23 +194,77 @@ public class FunctionsImpl { FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder() .setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0); + + PackageLocationMetaData.Builder packageLocationMetaDataBuilder; + try { + packageLocationMetaDataBuilder = getFunctionPackageLocation(functionDetails, componentType, + functionPkgUrl, fileDetail, uploadedInputStreamAsFile); + } catch (Exception e) { + return Response.serverError().type(MediaType.APPLICATION_JSON).entity(new ErrorData(e.getMessage())) + .build(); + } + + functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + return updateRequest(functionMetaDataBuilder.build()); + } + + private PackageLocationMetaData.Builder getFunctionPackageLocation(FunctionDetails functionDetails, + String componentType, String functionPkgUrl, + final FormDataContentDisposition fileDetail, + File uploadedInputStreamAsFile) throws Exception { + String tenant = functionDetails.getTenant(); + String namespace = functionDetails.getNamespace(); + String componentName = functionDetails.getName(); PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder(); boolean isBuiltin = isFunctionCodeBuiltin(functionDetails); - if (isBuiltin) { - packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails)); + boolean isPkgUrlProvided = isNotBlank(functionPkgUrl); + if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) { + // For externally managed schedulers, the pkgUrl/builtin stuff should be copied to bk + if (isBuiltin) { + File sinkOrSource; + if (componentType.equals(SOURCE)) { + String archiveName = functionDetails.getSource().getBuiltin(); + sinkOrSource = worker().getConnectorsManager().getSourceArchive(archiveName).toFile(); + } else { + String archiveName = functionDetails.getSink().getBuiltin(); + sinkOrSource = worker().getConnectorsManager().getSinkArchive(archiveName).toFile(); + } + packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, + sinkOrSource.getName())); + packageLocationMetaDataBuilder.setOriginalFileName(sinkOrSource.getName()); + log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath()); + Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), sinkOrSource, worker().getDlogNamespace()); + } else if (isPkgUrlProvided) { + File file = extractFileFromPkg(functionPkgUrl); + packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, + file.getName())); + packageLocationMetaDataBuilder.setOriginalFileName(file.getName()); + log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath()); + Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), file, worker().getDlogNamespace()); + } else { + packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, + fileDetail.getName())); + packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getName()); + log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath()); + Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace()); + } } else { - packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl - : createPackagePath(tenant, namespace, componentName, fileDetail.getFileName())); - if (!isPkgUrlProvided) { + // For pulsar managed schedulers, the pkgUrl/builtin stuff should be copied to bk + if (isBuiltin) { + packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails)); + } else if (isPkgUrlProvided) { + packageLocationMetaDataBuilder.setPackagePath(functionPkgUrl); + } else { + packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, fileDetail.getFileName())); packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName()); + log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath()); + Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace()); } } - - functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); - return (isPkgUrlProvided || isBuiltin) ? updateRequest(functionMetaDataBuilder.build()) - : updateRequest(functionMetaDataBuilder.build(), uploadedInputStreamAsFile); + return packageLocationMetaDataBuilder; } + public Response updateFunction(final String tenant, final String namespace, final String componentName, final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail, final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson, @@ -273,22 +327,17 @@ public class FunctionsImpl { FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder() .setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0); - PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder(); - - boolean isBuiltin = isFunctionCodeBuiltin(functionDetails); - if (isBuiltin) { - packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails)); - } else { - packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl - : createPackagePath(tenant, namespace, componentName, fileDetail.getFileName())); - if (!isPkgUrlProvided) { - packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName()); - } + PackageLocationMetaData.Builder packageLocationMetaDataBuilder; + try { + packageLocationMetaDataBuilder = getFunctionPackageLocation(functionDetails, componentType, + functionPkgUrl, fileDetail, uploadedInputStreamAsFile); + } catch (Exception e) { + return Response.serverError().type(MediaType.APPLICATION_JSON).entity(new ErrorData(e.getMessage())) + .build(); } functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); - return (isPkgUrlProvided || isBuiltin) ? updateRequest(functionMetaDataBuilder.build()) - : updateRequest(functionMetaDataBuilder.build(), uploadedInputStreamAsFile); + return updateRequest(functionMetaDataBuilder.build()); } public Response deregisterFunction(final String tenant, final String namespace, final String componentName, @@ -633,22 +682,6 @@ public class FunctionsImpl { return Response.status(Status.OK).entity(new Gson().toJson(retval.toArray())).build(); } - private Response updateRequest(FunctionMetaData functionMetaData, File uploadedInputStreamAsFile) { - // Upload to bookkeeper - try { - log.info("Uploading function package to {}", functionMetaData.getPackageLocation()); - FileInputStream uploadedInputStream = new FileInputStream(uploadedInputStreamAsFile); - - Utils.uploadToBookeeper(worker().getDlogNamespace(), uploadedInputStream, - functionMetaData.getPackageLocation().getPackagePath()); - } catch (IOException e) { - log.error("Error uploading file {}", functionMetaData.getPackageLocation(), e); - return Response.serverError().type(MediaType.APPLICATION_JSON).entity(new ErrorData(e.getMessage())) - .build(); - } - return updateRequest(functionMetaData); - } - private Response updateRequest(FunctionMetaData functionMetaData) { // Submit to FMT diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 6c5e299..cd55916 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -358,10 +358,10 @@ public class FunctionApiV2ResourceTest { public void testRegisterFunctionUploadFailure() throws Exception { mockStatic(Utils.class); doThrow(new IOException("upload failure")).when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false); @@ -644,10 +644,10 @@ public class FunctionApiV2ResourceTest { public void testUpdateFunctionUploadFailure() throws Exception { mockStatic(Utils.class); doThrow(new IOException("upload failure")).when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java index 52ad7f2..7bf1a46 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java @@ -52,6 +52,7 @@ import org.testng.annotations.Test; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.HashMap; @@ -307,10 +308,10 @@ public class SinkApiV2ResourceTest { public void testRegisterSinkUploadFailure() throws Exception { mockStatic(Utils.class); doThrow(new IOException("upload failure")).when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false); @@ -323,10 +324,10 @@ public class SinkApiV2ResourceTest { public void testRegisterSinkSuccess() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false); @@ -344,10 +345,10 @@ public class SinkApiV2ResourceTest { public void testRegisterSinkFailure() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false); @@ -366,10 +367,10 @@ public class SinkApiV2ResourceTest { public void testRegisterSinkInterrupted() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false); @@ -539,10 +540,10 @@ public class SinkApiV2ResourceTest { public void testUpdateSinkUploadFailure() throws Exception { mockStatic(Utils.class); doThrow(new IOException("upload failure")).when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); @@ -555,10 +556,10 @@ public class SinkApiV2ResourceTest { public void testUpdateSinkSuccess() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); @@ -613,10 +614,10 @@ public class SinkApiV2ResourceTest { public void testUpdateSinkFailure() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); @@ -635,10 +636,10 @@ public class SinkApiV2ResourceTest { public void testUpdateSinkInterrupted() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java index f2ada5c..2002b3a 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java @@ -307,10 +307,10 @@ public class SourceApiV2ResourceTest { public void testRegisterSourceUploadFailure() throws Exception { mockStatic(Utils.class); doThrow(new IOException("upload failure")).when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false); @@ -323,10 +323,10 @@ public class SourceApiV2ResourceTest { public void testRegisterSourceSuccess() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false); @@ -344,10 +344,10 @@ public class SourceApiV2ResourceTest { public void testRegisterSourceFailure() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false); @@ -366,10 +366,10 @@ public class SourceApiV2ResourceTest { public void testRegisterSourceInterrupted() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false); @@ -549,10 +549,10 @@ public class SourceApiV2ResourceTest { public void testUpdateSourceUploadFailure() throws Exception { mockStatic(Utils.class); doThrow(new IOException("upload failure")).when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); @@ -565,10 +565,10 @@ public class SourceApiV2ResourceTest { public void testUpdateSourceSuccess() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); @@ -624,10 +624,10 @@ public class SourceApiV2ResourceTest { public void testUpdateSourceFailure() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); @@ -646,10 +646,10 @@ public class SourceApiV2ResourceTest { public void testUpdateSourceInterrupted() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class); - Utils.uploadToBookeeper( - any(Namespace.class), - any(InputStream.class), - anyString()); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);