This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e8025d5 fix issue when submitting NAR via file url (#4577) e8025d5 is described below commit e8025d50c5d2cf0a632ad1573308b676d0607923 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Wed Jul 3 14:31:23 2019 -0700 fix issue when submitting NAR via file url (#4577) * fix issue when submitting NAR via file url * fix unit tests * add more specific errors * fix test --- .../worker/PulsarFunctionLocalRunTest.java | 21 ++- .../org/apache/pulsar/functions/LocalRunner.java | 1 - .../pulsar/functions/utils/SinkConfigUtils.java | 145 +++++++++++++-------- .../pulsar/functions/utils/SourceConfigUtils.java | 122 ++++++++++------- .../functions/worker/rest/api/FunctionsImpl.java | 1 - .../functions/worker/rest/api/SinksImpl.java | 1 - .../functions/worker/rest/api/SourcesImpl.java | 1 - .../worker/rest/api/v3/SinkApiV3ResourceTest.java | 17 ++- .../rest/api/v3/SourceApiV3ResourceTest.java | 5 +- 9 files changed, 192 insertions(+), 122 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java index bdcd27d..db7dbf2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java @@ -599,12 +599,11 @@ public class PulsarFunctionLocalRunTest { testPulsarSourceLocalRun(null); } - // TODO bug to fix involving submitting a NAR via URI file:///tmp/pulsar-io-twitter-0.0.1.nar -// @Test(timeOut = 20000) -// public void testPulsarSourceLocalRunWithFile() throws Exception { -// String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile(); -// testPulsarSourceStats(jarFilePathUrl); -// } + @Test(timeOut = 20000) + public void testPulsarSourceLocalRunWithFile() throws Exception { + String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile(); + testPulsarSourceLocalRun(jarFilePathUrl); + } @Test(timeOut = 40000) public void testPulsarSourceLocalRunWithUrl() throws Exception { @@ -705,11 +704,11 @@ public class PulsarFunctionLocalRunTest { testPulsarSinkStats(null); } -// @Test(timeOut = 20000) -// public void testPulsarSinkStatsWithFile() throws Exception { -// String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile(); -// testPulsarSinkStats(jarFilePathUrl); -// } + @Test(timeOut = 20000) + public void testPulsarSinkStatsWithFile() throws Exception { + String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile(); + testPulsarSinkStats(jarFilePathUrl); + } @Test(timeOut = 40000) public void testPulsarSinkStatsWithUrl() throws Exception { diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java index 1540744..f4c6eca 100644 --- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java +++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java @@ -248,7 +248,6 @@ public class LocalRunner { .loadClass(LocalRunner.class.getName()) .getProtectionDomain().getCodeSource().getLocation().getFile(); } - log.info("userCodeFile: {}", userCodeFile); String builtInSource = isBuiltInSource(userCodeFile); if (builtInSource != null) { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 42eedc3..f0a39b9 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -42,7 +42,11 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Type; import java.nio.file.Path; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -283,7 +287,7 @@ public class SinkConfigUtils { } public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archivePath, - File uploadedInputStreamAsFile) { + File sinkPackageFile) { if (isEmpty(sinkConfig.getTenant())) { throw new IllegalArgumentException("Sink tenant cannot be null"); } @@ -318,79 +322,112 @@ public class SinkConfigUtils { throw new IllegalArgumentException("Sink timeout must be a positive number"); } - String sinkClassName; - final Class<?> typeArg; - final ClassLoader classLoader; - if (!isEmpty(sinkConfig.getClassName())) { - sinkClassName = sinkConfig.getClassName(); - // We really don't know if we should use nar class loader or regular classloader - ClassLoader jarClassLoader = null; - ClassLoader narClassLoader = null; - try { - jarClassLoader = FunctionCommon.extractClassLoader(archivePath, uploadedInputStreamAsFile); - } catch (Exception e) { + if (archivePath == null && sinkPackageFile == null) { + throw new IllegalArgumentException("Sink package is not provided"); + } + + Class<?> typeArg; + ClassLoader classLoader; + String sinkClassName = sinkConfig.getClassName(); + ClassLoader jarClassLoader = null; + ClassLoader narClassLoader = null; + + Exception jarClassLoaderException = null; + Exception narClassLoaderException = null; + + try { + jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sinkPackageFile); + } catch (Exception e) { + jarClassLoaderException = e; + } + try { + narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sinkPackageFile); + } catch (Exception e) { + narClassLoaderException = e; + } + + // if sink class name is not provided, we can only try to load archive as a NAR + if (isEmpty(sinkClassName)) { + if (narClassLoader == null) { + throw new IllegalArgumentException("Sink package does not have the correct format. " + + "Pulsar cannot determine if the package is a NAR package or JAR package." + + "Sink classname is not provided and attempts to load it as a NAR package produced error: " + + narClassLoaderException.getMessage()); } try { - narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, uploadedInputStreamAsFile); - } catch (Exception e) { - } - if (jarClassLoader == null && narClassLoader == null) { - throw new IllegalArgumentException("Invalid Sink Package"); + sinkClassName = ConnectorUtils.getIOSinkClass(narClassLoader); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to extract Sink class from archive", e); } - // We use typeArg and classLoader as arguments for lambda functions that require them to be final - // Thus we use these tmp vars - Class<?> tmptypeArg; - ClassLoader tmpclassLoader; try { - tmptypeArg = getSinkType(sinkClassName, narClassLoader); - tmpclassLoader = narClassLoader; - } catch (Exception e) { + typeArg = getSinkType(sinkClassName, narClassLoader); + classLoader = narClassLoader; + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException( + String.format("Sink class %s must be in class path", sinkClassName), e); + } + + } else { + // if sink class name is provided, we need to try to load it as a JAR and as a NAR. + if (jarClassLoader != null) { + try { + typeArg = getSinkType(sinkClassName, jarClassLoader); + classLoader = jarClassLoader; + } catch (ClassNotFoundException e) { + // class not found in JAR try loading as a NAR and searching for the class + if (narClassLoader != null) { + try { + typeArg = getSinkType(sinkClassName, narClassLoader); + classLoader = narClassLoader; + } catch (ClassNotFoundException e1) { + throw new IllegalArgumentException( + String.format("Sink class %s must be in class path", sinkClassName), e1); + } + } else { + throw new IllegalArgumentException( + String.format("Sink class %s must be in class path", sinkClassName), e); + } + } + } else if (narClassLoader != null) { try { - tmptypeArg = getSinkType(sinkClassName, jarClassLoader); + typeArg = getSinkType(sinkClassName, narClassLoader); + classLoader = narClassLoader; } catch (ClassNotFoundException e1) { throw new IllegalArgumentException( String.format("Sink class %s must be in class path", sinkClassName), e1); } - tmpclassLoader = jarClassLoader; - } - typeArg = tmptypeArg; - classLoader = tmpclassLoader; - } else if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { - throw new IllegalArgumentException("Class-name must be present for archive with file-url"); - } else { - classLoader = FunctionCommon.extractNarClassLoader(archivePath, uploadedInputStreamAsFile); - if (classLoader == null) { - throw new IllegalArgumentException("Sink Package is not provided"); - } - try { - sinkClassName = ConnectorUtils.getIOSinkClass(classLoader); - } catch (IOException e1) { - throw new IllegalArgumentException("Failed to extract sink class from archive", e1); - } - try { - typeArg = getSinkType(sinkClassName, classLoader); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException( - String.format("Sink class %s must be in class path", sinkClassName), e); + } else { + StringBuilder errorMsg = new StringBuilder("Sink package does not have the correct format." + + " Pulsar cannot determine if the package is a NAR package or JAR package."); + + if (jarClassLoaderException != null) { + errorMsg.append("Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage()); + } + + if (narClassLoaderException != null) { + errorMsg.append("Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage()); + } + + throw new IllegalArgumentException(errorMsg.toString()); } } if (sinkConfig.getTopicToSerdeClassName() != null) { - sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> { - ValidatorUtils.validateSerde(serdeClassName, typeArg, classLoader, true); - }); + for (String serdeClassName : sinkConfig.getTopicToSerdeClassName().values()) { + ValidatorUtils.validateSerde(serdeClassName, typeArg, classLoader, true); + } } if (sinkConfig.getTopicToSchemaType() != null) { - sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> { + for (String schemaType : sinkConfig.getTopicToSchemaType().values()) { ValidatorUtils.validateSchema(schemaType, typeArg, classLoader, true); - }); + } } // topicsPattern does not need checks if (sinkConfig.getInputSpecs() != null) { - sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> { + for (ConsumerConfig consumerSpec : sinkConfig.getInputSpecs().values()) { // Only one is set if (!isEmpty(consumerSpec.getSerdeClassName()) && !isEmpty(consumerSpec.getSchemaType())) { throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set"); @@ -401,7 +438,7 @@ public class SinkConfigUtils { if (!isEmpty(consumerSpec.getSchemaType())) { ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg, classLoader, true); } - }); + } } return new ExtractedSinkDetails(sinkClassName, typeArg.getName()); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index edbb6d7..149204c 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -222,61 +222,93 @@ public class SourceConfigUtils { if (sourceConfig.getResources() != null) { ResourceConfigUtils.validate(sourceConfig.getResources()); } + if (archivePath == null && sourcePackageFile == null) { + throw new IllegalArgumentException("Source package is not provided"); + } - String sourceClassName; - final Class<?> typeArg; - final ClassLoader classLoader; - if (!isEmpty(sourceConfig.getClassName())) { - sourceClassName = sourceConfig.getClassName(); - // We really don't know if we should use nar class loader or regular classloader - ClassLoader jarClassLoader = null; - ClassLoader narClassLoader = null; - try { - jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sourcePackageFile); - } catch (Exception e) { + Class<?> typeArg; + ClassLoader classLoader; + String sourceClassName = sourceConfig.getClassName(); + ClassLoader jarClassLoader = null; + ClassLoader narClassLoader = null; + + Exception jarClassLoaderException = null; + Exception narClassLoaderException = null; + + try { + jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sourcePackageFile); + } catch (Exception e) { + jarClassLoaderException = e; + } + try { + narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile); + } catch (Exception e) { + narClassLoaderException = e; + } + + // if source class name is not provided, we can only try to load archive as a NAR + if (isEmpty(sourceClassName)) { + if (narClassLoader == null) { + throw new IllegalArgumentException("Source package does not have the correct format. " + + "Pulsar cannot determine if the package is a NAR package or JAR package." + + "Source classname is not provided and attempts to load it as a NAR package produced the following error.", + narClassLoaderException); } try { - narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile); - } catch (Exception e) { - } - if (jarClassLoader == null && narClassLoader == null) { - throw new IllegalArgumentException("Invalid Source Package"); + sourceClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to extract source class from archive", e); } - // We use typeArg and classLoader as arguments for lambda functions that require them to be final - // Thus we use these tmp vars - Class<?> tmptypeArg; - ClassLoader tmpclassLoader; try { - tmptypeArg = getSourceType(sourceClassName, narClassLoader); - tmpclassLoader = narClassLoader; - } catch (Exception e) { + typeArg = getSourceType(sourceClassName, narClassLoader); + classLoader = narClassLoader; + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException( + String.format("Source class %s must be in class path", sourceClassName), e); + } + + } else { + // if source class name is provided, we need to try to load it as a JAR and as a NAR. + if (jarClassLoader != null) { try { - tmptypeArg = getSourceType(sourceClassName, jarClassLoader); + typeArg = getSourceType(sourceClassName, jarClassLoader); + classLoader = jarClassLoader; + } catch (ClassNotFoundException e) { + // class not found in JAR try loading as a NAR and searching for the class + if (narClassLoader != null) { + try { + typeArg = getSourceType(sourceClassName, narClassLoader); + classLoader = narClassLoader; + } catch (ClassNotFoundException e1) { + throw new IllegalArgumentException( + String.format("Source class %s must be in class path", sourceClassName), e1); + } + } else { + throw new IllegalArgumentException( + String.format("Source class %s must be in class path", sourceClassName), e); + } + } + } else if (narClassLoader != null) { + try { + typeArg = getSourceType(sourceClassName, narClassLoader); + classLoader = narClassLoader; } catch (ClassNotFoundException e1) { throw new IllegalArgumentException( String.format("Source class %s must be in class path", sourceClassName), e1); } - tmpclassLoader = jarClassLoader; - } - typeArg = tmptypeArg; - classLoader = tmpclassLoader; - } else if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { - throw new IllegalArgumentException("Class-name must be present for archive with file-url"); - } else { - classLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile); - if (classLoader == null) { - throw new IllegalArgumentException("Source Package is not provided"); - } - try { - sourceClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) classLoader); - } catch (IOException e1) { - throw new IllegalArgumentException("Failed to extract source class from archive", e1); - } - try { - typeArg = getSourceType(sourceClassName, classLoader); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException( - String.format("Source class %s must be in class path", sourceClassName), e); + } else { + StringBuilder errorMsg = new StringBuilder("Source package does not have the correct format." + + " Pulsar cannot determine if the package is a NAR package or JAR package."); + + if (jarClassLoaderException != null) { + errorMsg.append("Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage()); + } + + if (narClassLoaderException != null) { + errorMsg.append("Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage()); + } + + throw new IllegalArgumentException(errorMsg.toString()); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index af23496..d96ed30 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -339,7 +339,6 @@ public class FunctionsImpl extends ComponentImpl { componentPackageFile = FunctionCommon.createPkgTempFile(); componentPackageFile.deleteOnExit(); - log.info("componentPackageFile: {}", componentPackageFile); WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath()); functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java index 9052266..659cb66 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java @@ -344,7 +344,6 @@ public class SinksImpl extends ComponentImpl { componentPackageFile = FunctionCommon.createPkgTempFile(); componentPackageFile.deleteOnExit(); - log.info("componentPackageFile: {}", componentPackageFile); WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath()); functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName, diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java index e14f167..e9cfeb0 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java @@ -341,7 +341,6 @@ public class SourcesImpl extends ComponentImpl { componentPackageFile = FunctionCommon.createPkgTempFile(); componentPackageFile.deleteOnExit(); - log.info("componentPackageFile: {}", componentPackageFile); WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath()); functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName, diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java index 793e9cd..41b3204 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.functions.worker.rest.api.v3; import com.google.common.collect.Lists; -import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.logging.log4j.Level; @@ -243,7 +242,7 @@ public class SinkApiV3ResourceTest { } } - @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink Package is not provided") + @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink package is not provided") public void testRegisterSinkMissingPackage() { try { testRegisterSinkMissingArguments( @@ -283,7 +282,10 @@ public class SinkApiV3ResourceTest { } } - @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "zip file is empty") + @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink package does not have the" + + " correct format. Pulsar cannot determine if the package is a NAR package" + + " or JAR package.Sink classname is not provided and attempts to load it as a NAR package produced error: " + + "zip file is empty") public void testRegisterSinkMissingPackageDetails() { try { testRegisterSinkMissingArguments( @@ -303,7 +305,7 @@ public class SinkApiV3ResourceTest { } } - @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Failed to extract sink class from archive") + @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Failed to extract Sink class from archive") public void testRegisterSinkInvalidJarNoSink() throws IOException { try { FileInputStream inputStream = new FileInputStream(INVALID_JAR_FILE_PATH); @@ -948,6 +950,7 @@ public class SinkApiV3ResourceTest { anyString(), any(File.class), any(Namespace.class)); + PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod(); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); @@ -961,7 +964,7 @@ public class SinkApiV3ResourceTest { } @Test - public void testUpdateSinkWithUrl() throws IOException, ClassNotFoundException { + public void testUpdateSinkWithUrl() throws Exception { Configurator.setRootLevel(Level.DEBUG); String filePackageUrl = "file://" + JAR_FILE_PATH; @@ -982,6 +985,7 @@ public class SinkApiV3ResourceTest { mockStatic(FunctionCommon.class); doReturn(String.class).when(FunctionCommon.class); FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class)); + PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod(); doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class); FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class)); @@ -989,7 +993,6 @@ public class SinkApiV3ResourceTest { doReturn(ATLEAST_ONCE).when(FunctionCommon.class); FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); - this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); @@ -1019,6 +1022,7 @@ public class SinkApiV3ResourceTest { anyString(), any(File.class), any(Namespace.class)); + PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod(); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); @@ -1044,6 +1048,7 @@ public class SinkApiV3ResourceTest { anyString(), any(File.class), any(Namespace.class)); + PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod(); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java index 17b4595..276f3c0 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java @@ -263,7 +263,7 @@ public class SourceApiV3ResourceTest { } } - @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source Package is not provided") + @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source package is not provided") public void testRegisterSourceMissingPackage() { try { testRegisterSourceMissingArguments( @@ -979,7 +979,7 @@ public class SourceApiV3ResourceTest { } @Test - public void testUpdateSourceWithUrl() throws IOException, ClassNotFoundException { + public void testUpdateSourceWithUrl() throws Exception { Configurator.setRootLevel(Level.DEBUG); String filePackageUrl = "file://" + JAR_FILE_PATH; @@ -1000,6 +1000,7 @@ public class SourceApiV3ResourceTest { mockStatic(FunctionCommon.class); doReturn(String.class).when(FunctionCommon.class); FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class)); + PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod(); doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class); FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));