This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1026edc  Handle builtin/functionpkgurl properly in externally managed schedulers (#2840)
1026edc is described below

commit 1026edcbe0eefefb61acb7a30731efdb6865b1a1
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Thu Oct 25 06:46:52 2018 -0700

    Handle builtin/functionpkgurl properly in externally managed schedulers (#2840)
    
    * Handle builtin/functionpkgurl properly in externally managed schedulers
    
    * Correct filename
    
    * Handle giving out the right location for function package
    
    * Reverted changes
    
    * Fixed unittest
---
 .../org/apache/pulsar/functions/utils/Utils.java   |  19 ++--
 .../pulsar/functions/worker/FunctionActioner.java  |  36 +++----
 .../org/apache/pulsar/functions/worker/Utils.java  |  12 +--
 .../functions/worker/rest/api/FunctionsImpl.java   | 109 ++++++++++++++-------
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |  16 +--
 .../worker/rest/api/v2/SinkApiV2ResourceTest.java  |  65 ++++++------
 .../rest/api/v2/SourceApiV2ResourceTest.java       |  64 ++++++------
 7 files changed, 180 insertions(+), 141 deletions(-)

diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 92f3de6..3b9c8da 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -238,18 +238,23 @@ public class Utils {
     }
 
     public static ClassLoader extractClassLoader(String destPkgUrl) throws 
IOException, URISyntaxException {
+        File file = extractFileFromPkg(destPkgUrl);
+        try {
+            return loadJar(file);
+        } catch (MalformedURLException e) {
+            throw new IllegalArgumentException(
+                    "Corrupt User PackageFile " + file + " with error " + 
e.getMessage());
+        }
+    }
+
+    public static File extractFileFromPkg(String destPkgUrl) throws 
IOException, URISyntaxException {
         if (destPkgUrl.startsWith(FILE)) {
             URL url = new URL(destPkgUrl);
             File file = new File(url.toURI());
             if (!file.exists()) {
                 throw new IOException(destPkgUrl + " does not exists locally");
             }
-            try {
-                return loadJar(file);
-            } catch (MalformedURLException e) {
-                throw new IllegalArgumentException(
-                        "Corrupt User PackageFile " + file + " with error " + 
e.getMessage());
-            }
+            return file;
         } else if (destPkgUrl.startsWith("http")) {
             URL website = new URL(destPkgUrl);
             File tempFile = File.createTempFile("function", ".tmp");
@@ -260,7 +265,7 @@ public class Utils {
             if (tempFile.exists()) {
                 tempFile.delete();
             }
-            return loadJar(tempFile);
+            return tempFile;
         } else {
             throw new IllegalArgumentException("Unsupported url protocol "+ 
destPkgUrl +", supported url protocols: [file/http/https]");
         }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 52af689..5df48de 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -143,26 +143,26 @@ public class FunctionActioner implements AutoCloseable {
         String pkgLocation = 
functionMetaData.getPackageLocation().getPackagePath();
         boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation);
 
-        if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
-            URL url = new URL(pkgLocation);
-            File pkgFile = new File(url.toURI());
-            packageFile = pkgFile.getAbsolutePath();
-        } else if (isFunctionCodeBuiltin(functionDetails)) {
-            File pkgFile = getBuiltinArchive(functionDetails);
-            packageFile = pkgFile.getAbsolutePath();
-        } else if (runtimeFactory.externallyManaged()) {
+        if (runtimeFactory.externallyManaged()) {
             packageFile = pkgLocation;
         } else {
-            File pkgDir = new File(
-                    workerConfig.getDownloadDirectory(),
-                    getDownloadPackagePath(functionMetaData, instanceId));
-            pkgDir.mkdirs();
-
-            File pkgFile = new File(
-                    pkgDir,
-                    new 
File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails(),
 functionMetaData.getPackageLocation())).getName());
-            downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, 
instanceId);
-            packageFile = pkgFile.getAbsolutePath();
+            if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
+                URL url = new URL(pkgLocation);
+                File pkgFile = new File(url.toURI());
+                packageFile = pkgFile.getAbsolutePath();
+            } else if (isFunctionCodeBuiltin(functionDetails)) {
+                File pkgFile = getBuiltinArchive(functionDetails);
+                packageFile = pkgFile.getAbsolutePath();
+            } else {
+                File pkgDir = new File(workerConfig.getDownloadDirectory(),
+                        getDownloadPackagePath(functionMetaData, instanceId));
+                pkgDir.mkdirs();
+                File pkgFile = new File(
+                        pkgDir,
+                        new 
File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails(),
 functionMetaData.getPackageLocation())).getName());
+                downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, 
instanceId);
+                packageFile = pkgFile.getAbsolutePath();
+            }
         }
 
         InstanceConfig instanceConfig = new InstanceConfig();
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index ebb68d5..d010660 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -18,12 +18,7 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.ByteArrayOutputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
+import java.io.*;
 import java.net.URI;
 import java.net.URL;
 import java.nio.channels.Channels;
@@ -80,6 +75,11 @@ public final class Utils {
         return String.format("%s-%s", UUID.randomUUID().toString(), 
packageName);
     }
 
+    public static void uploadFileToBookkeeper(String packagePath, File 
sourceFile, Namespace dlogNamespace) throws IOException {
+        FileInputStream uploadedInputStream = new FileInputStream(sourceFile);
+        uploadToBookeeper(dlogNamespace, uploadedInputStream, packagePath);
+    }
+
     public static void uploadToBookeeper(Namespace dlogNamespace,
                                          InputStream uploadedInputStream,
                                          String destPkgPath)
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 11a10ab..087238c 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -194,23 +194,77 @@ public class FunctionsImpl {
         FunctionMetaData.Builder functionMetaDataBuilder = 
FunctionMetaData.newBuilder()
                 
.setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
 
+
+        PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+        try {
+            packageLocationMetaDataBuilder = 
getFunctionPackageLocation(functionDetails, componentType,
+                functionPkgUrl, fileDetail, uploadedInputStreamAsFile);
+        } catch (Exception e) {
+            return 
Response.serverError().type(MediaType.APPLICATION_JSON).entity(new 
ErrorData(e.getMessage()))
+                    .build();
+        }
+
+        
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+        return updateRequest(functionMetaDataBuilder.build());
+    }
+
+    private PackageLocationMetaData.Builder 
getFunctionPackageLocation(FunctionDetails functionDetails,
+                                                                       String 
componentType, String functionPkgUrl,
+                                                                       final 
FormDataContentDisposition fileDetail,
+                                                                       File 
uploadedInputStreamAsFile) throws Exception {
+        String tenant = functionDetails.getTenant();
+        String namespace = functionDetails.getNamespace();
+        String componentName = functionDetails.getName();
         PackageLocationMetaData.Builder packageLocationMetaDataBuilder = 
PackageLocationMetaData.newBuilder();
         boolean isBuiltin = isFunctionCodeBuiltin(functionDetails);
-        if (isBuiltin) {
-            packageLocationMetaDataBuilder.setPackagePath("builtin://" + 
getFunctionCodeBuiltin(functionDetails));
+        boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
+        if 
(worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
+            // For externally managed schedulers, the pkgUrl/builtin stuff 
should be copied to bk
+            if (isBuiltin) {
+                File sinkOrSource;
+                if (componentType.equals(SOURCE)) {
+                    String archiveName = 
functionDetails.getSource().getBuiltin();
+                    sinkOrSource = 
worker().getConnectorsManager().getSourceArchive(archiveName).toFile();
+                } else {
+                    String archiveName = 
functionDetails.getSink().getBuiltin();
+                    sinkOrSource = 
worker().getConnectorsManager().getSinkArchive(archiveName).toFile();
+                }
+                
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, 
namespace, componentName,
+                        sinkOrSource.getName()));
+                
packageLocationMetaDataBuilder.setOriginalFileName(sinkOrSource.getName());
+                log.info("Uploading {} package to {}", componentType, 
packageLocationMetaDataBuilder.getPackagePath());
+                
Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), 
sinkOrSource, worker().getDlogNamespace());
+            } else if (isPkgUrlProvided) {
+                File file = extractFileFromPkg(functionPkgUrl);
+                
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, 
namespace, componentName,
+                        file.getName()));
+                
packageLocationMetaDataBuilder.setOriginalFileName(file.getName());
+                log.info("Uploading {} package to {}", componentType, 
packageLocationMetaDataBuilder.getPackagePath());
+                
Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), 
file, worker().getDlogNamespace());
+            } else {
+                
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, 
namespace, componentName,
+                        fileDetail.getName()));
+                
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getName());
+                log.info("Uploading {} package to {}", componentType, 
packageLocationMetaDataBuilder.getPackagePath());
+                
Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), 
uploadedInputStreamAsFile, worker().getDlogNamespace());
+            }
         } else {
-            packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? 
functionPkgUrl
-                    : createPackagePath(tenant, namespace, componentName, 
fileDetail.getFileName()));
-            if (!isPkgUrlProvided) {
+            // For pulsar managed schedulers, the pkgUrl/builtin stuff should 
be copied to bk
+            if (isBuiltin) {
+                packageLocationMetaDataBuilder.setPackagePath("builtin://" + 
getFunctionCodeBuiltin(functionDetails));
+            } else if (isPkgUrlProvided) {
+                packageLocationMetaDataBuilder.setPackagePath(functionPkgUrl);
+            } else {
+                
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, 
namespace, componentName, fileDetail.getFileName()));
                 
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
+                log.info("Uploading {} package to {}", componentType, 
packageLocationMetaDataBuilder.getPackagePath());
+                
Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), 
uploadedInputStreamAsFile, worker().getDlogNamespace());
             }
         }
-
-        
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-        return (isPkgUrlProvided || isBuiltin) ? 
updateRequest(functionMetaDataBuilder.build())
-                : updateRequest(functionMetaDataBuilder.build(), 
uploadedInputStreamAsFile);
+        return packageLocationMetaDataBuilder;
     }
 
+
     public Response updateFunction(final String tenant, final String 
namespace, final String componentName,
             final InputStream uploadedInputStream, final 
FormDataContentDisposition fileDetail,
             final String functionPkgUrl, final String functionDetailsJson, 
final String componentConfigJson,
@@ -273,22 +327,17 @@ public class FunctionsImpl {
         FunctionMetaData.Builder functionMetaDataBuilder = 
FunctionMetaData.newBuilder()
                 
.setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
 
-        PackageLocationMetaData.Builder packageLocationMetaDataBuilder = 
PackageLocationMetaData.newBuilder();
-
-        boolean isBuiltin = isFunctionCodeBuiltin(functionDetails);
-        if (isBuiltin) {
-            packageLocationMetaDataBuilder.setPackagePath("builtin://" + 
getFunctionCodeBuiltin(functionDetails));
-        } else {
-            packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? 
functionPkgUrl
-                    : createPackagePath(tenant, namespace, componentName, 
fileDetail.getFileName()));
-            if (!isPkgUrlProvided) {
-                
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
-            }
+        PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+        try {
+            packageLocationMetaDataBuilder = 
getFunctionPackageLocation(functionDetails, componentType,
+                    functionPkgUrl, fileDetail, uploadedInputStreamAsFile);
+        } catch (Exception e) {
+            return 
Response.serverError().type(MediaType.APPLICATION_JSON).entity(new 
ErrorData(e.getMessage()))
+                    .build();
         }
 
         
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-        return (isPkgUrlProvided || isBuiltin) ? 
updateRequest(functionMetaDataBuilder.build())
-                : updateRequest(functionMetaDataBuilder.build(), 
uploadedInputStreamAsFile);
+        return updateRequest(functionMetaDataBuilder.build());
     }
 
     public Response deregisterFunction(final String tenant, final String 
namespace, final String componentName,
@@ -633,22 +682,6 @@ public class FunctionsImpl {
         return Response.status(Status.OK).entity(new 
Gson().toJson(retval.toArray())).build();
     }
 
-    private Response updateRequest(FunctionMetaData functionMetaData, File 
uploadedInputStreamAsFile) {
-        // Upload to bookkeeper
-        try {
-            log.info("Uploading function package to {}", 
functionMetaData.getPackageLocation());
-            FileInputStream uploadedInputStream = new 
FileInputStream(uploadedInputStreamAsFile);
-
-            Utils.uploadToBookeeper(worker().getDlogNamespace(), 
uploadedInputStream,
-                    functionMetaData.getPackageLocation().getPackagePath());
-        } catch (IOException e) {
-            log.error("Error uploading file {}", 
functionMetaData.getPackageLocation(), e);
-            return 
Response.serverError().type(MediaType.APPLICATION_JSON).entity(new 
ErrorData(e.getMessage()))
-                    .build();
-        }
-        return updateRequest(functionMetaData);
-    }
-
     private Response updateRequest(FunctionMetaData functionMetaData) {
 
         // Submit to FMT
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 6c5e299..cd55916 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -358,10 +358,10 @@ public class FunctionApiV2ResourceTest {
     public void testRegisterFunctionUploadFailure() throws Exception {
         mockStatic(Utils.class);
         doThrow(new IOException("upload failure")).when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+            any(File.class),
+            any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(function))).thenReturn(false);
 
@@ -644,10 +644,10 @@ public class FunctionApiV2ResourceTest {
     public void testUpdateFunctionUploadFailure() throws Exception {
         mockStatic(Utils.class);
         doThrow(new IOException("upload failure")).when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+            any(File.class),
+            any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(function))).thenReturn(true);
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 52ad7f2..7bf1a46 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -52,6 +52,7 @@ import org.testng.annotations.Test;
 
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
@@ -307,10 +308,10 @@ public class SinkApiV2ResourceTest {
     public void testRegisterSinkUploadFailure() throws Exception {
         mockStatic(Utils.class);
         doThrow(new IOException("upload failure")).when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+            any(File.class),
+            any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(false);
 
@@ -323,10 +324,10 @@ public class SinkApiV2ResourceTest {
     public void testRegisterSinkSuccess() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(false);
 
@@ -344,10 +345,10 @@ public class SinkApiV2ResourceTest {
     public void testRegisterSinkFailure() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(false);
 
@@ -366,10 +367,10 @@ public class SinkApiV2ResourceTest {
     public void testRegisterSinkInterrupted() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(false);
 
@@ -539,10 +540,10 @@ public class SinkApiV2ResourceTest {
     public void testUpdateSinkUploadFailure() throws Exception {
         mockStatic(Utils.class);
         doThrow(new IOException("upload failure")).when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
 
@@ -555,10 +556,10 @@ public class SinkApiV2ResourceTest {
     public void testUpdateSinkSuccess() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
 
@@ -613,10 +614,10 @@ public class SinkApiV2ResourceTest {
     public void testUpdateSinkFailure() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
 
@@ -635,10 +636,10 @@ public class SinkApiV2ResourceTest {
     public void testUpdateSinkInterrupted() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index f2ada5c..2002b3a 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -307,10 +307,10 @@ public class SourceApiV2ResourceTest {
     public void testRegisterSourceUploadFailure() throws Exception {
         mockStatic(Utils.class);
         doThrow(new IOException("upload failure")).when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(false);
 
@@ -323,10 +323,10 @@ public class SourceApiV2ResourceTest {
     public void testRegisterSourceSuccess() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(false);
 
@@ -344,10 +344,10 @@ public class SourceApiV2ResourceTest {
     public void testRegisterSourceFailure() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(false);
 
@@ -366,10 +366,10 @@ public class SourceApiV2ResourceTest {
     public void testRegisterSourceInterrupted() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(false);
 
@@ -549,10 +549,10 @@ public class SourceApiV2ResourceTest {
     public void testUpdateSourceUploadFailure() throws Exception {
         mockStatic(Utils.class);
         doThrow(new IOException("upload failure")).when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(true);
 
@@ -565,10 +565,10 @@ public class SourceApiV2ResourceTest {
     public void testUpdateSourceSuccess() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(true);
 
@@ -624,10 +624,10 @@ public class SourceApiV2ResourceTest {
     public void testUpdateSourceFailure() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(true);
 
@@ -646,10 +646,10 @@ public class SourceApiV2ResourceTest {
     public void testUpdateSourceInterrupted() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(true);
 

Reply via email to