This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ea23cdf  Don't create new instances of user classes during validation 
(#4281)
ea23cdf is described below

commit ea23cdff87bc18b3486a970e4d047210aabfe844
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Thu May 16 22:55:31 2019 -0700

    Don't create new instances of user classes during validation (#4281)
    
    * Don't create new instances of user classes during validation
    
    * fixing tests
    
    * uncomment functions
---
 .../pulsar/functions/utils/FunctionCommon.java     | 61 +++++++---------------
 .../functions/utils/FunctionConfigUtils.java       | 17 +++++-
 .../pulsar/functions/utils/SinkConfigUtils.java    | 14 ++++-
 .../pulsar/functions/utils/SourceConfigUtils.java  | 14 ++++-
 .../pulsar/functions/utils/ValidatorUtils.java     | 14 +++--
 .../pulsar/functions/utils/io/ConnectorUtils.java  |  8 +--
 .../pulsar/functions/worker/FunctionActioner.java  |  6 +--
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |  2 +-
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |  6 +--
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  | 26 +++++++--
 .../rest/api/v3/SourceApiV3ResourceTest.java       | 26 ++++++++-
 11 files changed, 127 insertions(+), 67 deletions(-)

diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 2ca9134..571a614 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -86,46 +86,33 @@ public class FunctionCommon {
         }
     }
 
-    public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig, 
ClassLoader classLoader) {
-        Object userClass = createInstance(functionConfig.getClassName(), 
classLoader);
+    public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig, 
ClassLoader classLoader) throws ClassNotFoundException {
         boolean isWindowConfigPresent = functionConfig.getWindowConfig() != 
null;
-        return getFunctionTypes(userClass, isWindowConfigPresent);
+        Class functionClass = 
classLoader.loadClass(functionConfig.getClassName());
+        return getFunctionTypes(functionClass, isWindowConfigPresent);
     }
     
-    public static Class<?>[] getFunctionTypes(Object userClass, boolean 
isWindowConfigPresent) {
-
+    public static Class<?>[] getFunctionTypes(Class userClass, boolean 
isWindowConfigPresent) {
         Class<?>[] typeArgs;
         // if window function
         if (isWindowConfigPresent) {
-            if (userClass instanceof WindowFunction) {
-                WindowFunction function = (WindowFunction) userClass;
-                if (function == null) {
-                    throw new IllegalArgumentException(
-                            String.format("The WindowFunction class %s could 
not be instantiated", userClass));
-                }
-                typeArgs = 
TypeResolver.resolveRawArguments(WindowFunction.class, function.getClass());
+            if (WindowFunction.class.isAssignableFrom(userClass)) {
+                typeArgs = 
TypeResolver.resolveRawArguments(WindowFunction.class, userClass);
             } else {
-                java.util.function.Function function = 
(java.util.function.Function) userClass;
-                if (function == null) {
-                    throw new IllegalArgumentException(
-                            String.format("The Java util function class %s 
could not be instantiated", userClass));
-                }
-                typeArgs = 
TypeResolver.resolveRawArguments(java.util.function.Function.class, 
function.getClass());
+                typeArgs = 
TypeResolver.resolveRawArguments(java.util.function.Function.class, userClass);
                 if (!typeArgs[0].equals(Collection.class)) {
                     throw new IllegalArgumentException("Window function must 
take a collection as input");
                 }
-                Type type = 
TypeResolver.resolveGenericType(java.util.function.Function.class, 
function.getClass());
+                Type type = 
TypeResolver.resolveGenericType(java.util.function.Function.class, userClass);
                 Type collectionType = ((ParameterizedType) 
type).getActualTypeArguments()[0];
                 Type actualInputType = ((ParameterizedType) 
collectionType).getActualTypeArguments()[0];
                 typeArgs[0] = (Class<?>) actualInputType;
             }
         } else {
-            if (userClass instanceof Function) {
-                Function pulsarFunction = (Function) userClass;
-                typeArgs = TypeResolver.resolveRawArguments(Function.class, 
pulsarFunction.getClass());
+            if (Function.class.isAssignableFrom(userClass)) {
+                typeArgs = TypeResolver.resolveRawArguments(Function.class, 
userClass);
             } else {
-                java.util.function.Function function = 
(java.util.function.Function) userClass;
-                typeArgs = 
TypeResolver.resolveRawArguments(java.util.function.Function.class, 
function.getClass());
+                typeArgs = 
TypeResolver.resolveRawArguments(java.util.function.Function.class, userClass);
             }
         }
 
@@ -200,30 +187,20 @@ public class FunctionCommon {
     }
 
 
-    public static Class<?> getSourceType(String className, ClassLoader 
classloader) {
+    public static Class<?> getSourceType(String className, ClassLoader 
classLoader) throws ClassNotFoundException {
 
-        Object userClass = Reflections.createInstance(className, classloader);
-        Class<?> typeArg;
-        Source source = (Source) userClass;
-        if (source == null) {
-            throw new IllegalArgumentException(String.format("The Pulsar 
source class %s could not be instantiated",
-                    className));
-        }
-        typeArg = TypeResolver.resolveRawArgument(Source.class, 
source.getClass());
+        Class userClass = classLoader.loadClass(className);
+
+        Class<?> typeArg = TypeResolver.resolveRawArgument(Source.class, 
userClass);
 
         return typeArg;
     }
 
-    public static Class<?> getSinkType(String className, ClassLoader 
classLoader) {
+    public static Class<?> getSinkType(String className, ClassLoader 
classLoader) throws ClassNotFoundException {
 
-        Object userClass = Reflections.createInstance(className, classLoader);
-        Class<?> typeArg;
-        Sink sink = (Sink) userClass;
-        if (sink == null) {
-            throw new IllegalArgumentException(String.format("The Pulsar sink 
class %s could not be instantiated",
-                    className));
-        }
-        typeArg = TypeResolver.resolveRawArgument(Sink.class, sink.getClass());
+        Class userClass = classLoader.loadClass(className);
+
+        Class<?> typeArg = TypeResolver.resolveRawArgument(Sink.class, 
userClass);
 
         return typeArg;
     }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index e207e1e..b96c7c4 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -50,7 +50,12 @@ public class FunctionConfigUtils {
         Class<?>[] typeArgs = null;
         if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
             if (classLoader != null) {
-                typeArgs = FunctionCommon.getFunctionTypes(functionConfig, 
classLoader);
+                try {
+                    typeArgs = FunctionCommon.getFunctionTypes(functionConfig, 
classLoader);
+                } catch (ClassNotFoundException e) {
+                    throw new IllegalArgumentException(
+                            String.format("Function class %s must be in class 
path", functionConfig.getClassName()), e);
+                }
             }
         }
 
@@ -363,7 +368,14 @@ public class FunctionConfigUtils {
     }
 
     private static void doJavaChecks(FunctionConfig functionConfig, 
ClassLoader clsLoader) {
-        Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(functionConfig, 
clsLoader);
+
+        Class<?>[] typeArgs;
+        try {
+            typeArgs = FunctionCommon.getFunctionTypes(functionConfig, 
clsLoader);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException(
+                    String.format("Function class %s must be in class path", 
functionConfig.getClassName()), e);
+        }
         // inputs use default schema, so there is no check needed there
 
         // Check if the Input serialization/deserialization class exists in 
jar or already loaded and that it
@@ -614,6 +626,7 @@ public class FunctionConfigUtils {
             } else {
                 throw new IllegalArgumentException("Function Package is not 
provided");
             }
+
             doJavaChecks(functionConfig, classLoader);
             return classLoader;
         } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 40e96b2..e59bf46 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -345,7 +345,12 @@ public class SinkConfigUtils {
                 tmptypeArg = getSinkType(sinkClassName, narClassLoader);
                 tmpclassLoader = narClassLoader;
             } catch (Exception e) {
-                tmptypeArg = getSinkType(sinkClassName, jarClassLoader);
+                try {
+                    tmptypeArg = getSinkType(sinkClassName, jarClassLoader);
+                } catch (ClassNotFoundException e1) {
+                    throw new IllegalArgumentException(
+                            String.format("Sink class %s must be in class 
path", sinkClassName), e1);
+                }
                 tmpclassLoader = jarClassLoader;
             }
             typeArg = tmptypeArg;
@@ -362,7 +367,12 @@ public class SinkConfigUtils {
             } catch (IOException e1) {
                 throw new IllegalArgumentException("Failed to extract sink 
class from archive", e1);
             }
-            typeArg = getSinkType(sinkClassName, classLoader);
+            try {
+                typeArg = getSinkType(sinkClassName, classLoader);
+            } catch (ClassNotFoundException e) {
+                throw new IllegalArgumentException(
+                        String.format("Sink class %s must be in class path", 
sinkClassName), e);
+            }
         }
 
         if (sinkConfig.getTopicToSerdeClassName() != null) {
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 90d63df..e02bb8b 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -250,7 +250,12 @@ public class SourceConfigUtils {
                 tmptypeArg = getSourceType(sourceClassName, narClassLoader);
                 tmpclassLoader = narClassLoader;
             } catch (Exception e) {
-                tmptypeArg = getSourceType(sourceClassName, jarClassLoader);
+                try {
+                    tmptypeArg = getSourceType(sourceClassName, 
jarClassLoader);
+                } catch (ClassNotFoundException e1) {
+                    throw new IllegalArgumentException(
+                            String.format("Source class %s must be in class 
path", sourceClassName), e1);
+                }
                 tmpclassLoader = jarClassLoader;
             }
             typeArg = tmptypeArg;
@@ -267,7 +272,12 @@ public class SourceConfigUtils {
             } catch (IOException e1) {
                 throw new IllegalArgumentException("Failed to extract source 
class from archive", e1);
             }
-            typeArg = getSourceType(sourceClassName, classLoader);
+            try {
+                typeArg = getSourceType(sourceClassName, classLoader);
+            } catch (ClassNotFoundException e) {
+                throw new IllegalArgumentException(
+                        String.format("Source class %s must be in class path", 
sourceClassName), e);
+            }
         }
 
         // Only one of serdeClassName or schemaType should be set
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
index 3a2209a..4cb102f 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
@@ -144,11 +144,17 @@ public class ValidatorUtils {
         }
 
         // validate function class-type
-        Object functionObject = 
createInstance(functionDetailsBuilder.getClassName(), classLoader);
-        Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(functionObject, 
false);
+        Class functionClass;
+        try {
+            functionClass = 
classLoader.loadClass(functionDetailsBuilder.getClassName());
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException(
+                    String.format("Function class %s must be in class path", 
functionDetailsBuilder.getClassName()), e);
+        }
+        Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(functionClass, 
false);
 
-        if (!(functionObject instanceof 
org.apache.pulsar.functions.api.Function)
-                && !(functionObject instanceof java.util.function.Function)) {
+        if 
(!(org.apache.pulsar.functions.api.Function.class.isAssignableFrom(functionClass))
+                && 
!(java.util.function.Function.class.isAssignableFrom(functionClass))) {
             throw new RuntimeException("User class must either be Function or 
java.util.Function");
         }
 
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index 8780671..2b78bf0 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -58,8 +58,8 @@ public class ConnectorUtils {
 
         try {
             // Try to load source class and check it implements Source 
interface
-            Object instance = 
ncl.loadClass(conf.getSourceClass()).newInstance();
-            if (!(instance instanceof Source)) {
+            Class sourceClass = ncl.loadClass(conf.getSourceClass());
+            if (!(Source.class.isAssignableFrom(sourceClass))) {
                 throw new IOException("Class " + conf.getSourceClass() + " 
does not implement interface "
                         + Source.class.getName());
             }
@@ -86,8 +86,8 @@ public class ConnectorUtils {
 
         try {
             // Try to load source class and check it implements Sink interface
-            Object instance = ncl.loadClass(conf.getSinkClass()).newInstance();
-            if (!(instance instanceof Sink)) {
+            Class sinkClass = ncl.loadClass(conf.getSinkClass());
+            if (!(Sink.class.isAssignableFrom(sinkClass))) {
                 throw new IOException(
                         "Class " + conf.getSinkClass() + " does not implement 
interface " + Sink.class.getName());
             }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 1fea200..597cff7 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -390,7 +390,7 @@ public class FunctionActioner {
                 File.separatorChar);
     }
 
-    private File getBuiltinArchive(FunctionDetails.Builder functionDetails) 
throws IOException {
+    private File getBuiltinArchive(FunctionDetails.Builder functionDetails) 
throws IOException, ClassNotFoundException {
         if (functionDetails.hasSource()) {
             SourceSpec sourceSpec = functionDetails.getSource();
             if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
@@ -423,7 +423,7 @@ public class FunctionActioner {
     }
 
     private void fillSourceTypeClass(FunctionDetails.Builder functionDetails, 
File archive, String className)
-            throws IOException {
+            throws IOException, ClassNotFoundException {
         try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, 
Collections.emptySet())) {
             String typeArg = getSourceType(className, ncl).getName();
 
@@ -441,7 +441,7 @@ public class FunctionActioner {
     }
 
     private void fillSinkTypeClass(FunctionDetails.Builder functionDetails, 
File archive, String className)
-            throws IOException {
+            throws IOException, ClassNotFoundException {
         try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, 
Collections.emptySet())) {
             String typeArg = getSinkType(className, ncl).getName();
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 17b0234..053b19d 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -339,7 +339,7 @@ public class FunctionApiV2ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "User class must be in class path")
+    @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Function class UnknownClass must be in class 
path")
     public void testRegisterFunctionWrongClassName() {
         try {
             testRegisterFunctionMissingArguments(
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 696bd19..1eff4e8 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -46,9 +46,9 @@ import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.RestException;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
@@ -92,7 +92,7 @@ import static org.testng.Assert.assertEquals;
  * Unit test of {@link FunctionApiV2Resource}.
  */
 @PrepareForTest({WorkerUtils.class, InstanceUtils.class})
-@PowerMockIgnore({ "javax.management.*", "javax.ws.*", 
"org.apache.logging.log4j.*" })
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*", 
"org.apache.logging.log4j.*", "org.apache.pulsar.functions.api.*" })
 @Slf4j
 public class FunctionApiV3ResourceTest {
 
@@ -332,7 +332,7 @@ public class FunctionApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "User class must be in class path")
+    @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Function class UnknownClass must be in class 
path")
     public void testRegisterFunctionWrongClassName() {
         try {
             testRegisterFunctionMissingArguments(
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 59ee08b..2988e3a 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -225,7 +225,7 @@ public class SinkApiV3ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Sink Name is not provided")
-    public void testRegisterSinkMissingFunctionName() {
+    public void testRegisterSinkMissingSinkName() {
         try {
             testRegisterSinkMissingArguments(
                 tenant,
@@ -263,6 +263,26 @@ public class SinkApiV3ResourceTest {
         }
     }
 
+    @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Sink class UnknownClass must be in class 
path")
+    public void testRegisterSinkWrongClassName() {
+            try {
+                testRegisterSinkMissingArguments(
+                        tenant,
+                        namespace,
+                        sink,
+                        mockedInputStream,
+                        mockedFormData,
+                        topicsToSerDeClassName,
+                        "UnknownClass",
+                        parallelism,
+                        null
+                );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), 
Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
     @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "zip file is empty")
     public void testRegisterSinkMissingPackageDetails() {
         try {
@@ -489,7 +509,7 @@ public class SinkApiV3ResourceTest {
 
         RequestResult rr = new RequestResult()
             .setSuccess(true)
-            .setMessage("source registered");
+            .setMessage("sink registered");
         CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
         
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
@@ -914,7 +934,7 @@ public class SinkApiV3ResourceTest {
     }
 
     @Test
-    public void testUpdateSinkWithUrl() throws IOException {
+    public void testUpdateSinkWithUrl() throws IOException, 
ClassNotFoundException {
         Configurator.setRootLevel(Level.DEBUG);
 
         String filePackageUrl = "file://" + JAR_FILE_PATH;
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index c56066b..0381af7 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -242,6 +242,27 @@ public class SourceApiV3ResourceTest {
         }
     }
 
+    @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Source class UnknownClass must be in class 
path")
+    public void testRegisterSourceWrongClassName() {
+        try {
+            testRegisterSourceMissingArguments(
+                    tenant,
+                    namespace,
+                    source,
+                    mockedInputStream,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    "UnknownClass",
+                    parallelism,
+                    null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), 
Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
     @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Source Package is not provided")
     public void testRegisterSourceMissingPackage() {
         try {
@@ -443,6 +464,7 @@ public class SourceApiV3ResourceTest {
         }
     }
 
+    @Test
     public void testRegisterSourceSuccess() throws Exception {
         mockStatic(WorkerUtils.class);
         doNothing().when(WorkerUtils.class);
@@ -451,6 +473,8 @@ public class SourceApiV3ResourceTest {
                 any(File.class),
                 any(Namespace.class));
 
+        PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", 
any()).thenCallRealMethod();
+
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(false);
 
         RequestResult rr = new RequestResult()
@@ -928,7 +952,7 @@ public class SourceApiV3ResourceTest {
     }
 
     @Test
-    public void testUpdateSourceWithUrl() throws IOException {
+    public void testUpdateSourceWithUrl() throws IOException, 
ClassNotFoundException {
         Configurator.setRootLevel(Level.DEBUG);
 
         String filePackageUrl = "file://" + JAR_FILE_PATH;

Reply via email to