This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5ea4231 fix submit function via url (#3934) 5ea4231 is described below commit 5ea423150ba4d879207a22899cdd6e8154e6382f Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Fri Mar 29 16:41:31 2019 -0500 fix submit function via url (#3934) * fix submit function via url * cleaning up * add test * make method private * add additional tests * cleaning up * improving tests --- .../apache/pulsar/io/PulsarFunctionE2ETest.java | 119 +++++++++++++++++++-- .../org/apache/pulsar/functions/utils/Utils.java | 44 ++------ 2 files changed, 119 insertions(+), 44 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index 042a952..abd9684 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -20,6 +20,8 @@ package org.apache.pulsar.io; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.sun.net.httpserver.Headers; +import com.sun.net.httpserver.HttpServer; import lombok.ToString; import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.PulsarService; @@ -61,11 +63,16 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.BufferedInputStream; import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStream; import java.lang.reflect.Method; import java.net.HttpURLConnection; +import java.net.InetSocketAddress; import java.net.URL; import java.util.Arrays; import java.util.Collections; @@ -123,6 +130,9 @@ public class PulsarFunctionE2ETest { private final String TLS_TRUST_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/cacert.pem"; private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class); + private Thread fileServerThread; + private static final int fileServerPort = PortManager.nextFreePort(); + private HttpServer fileServer; @DataProvider(name = "validRoleName") public Object[][] validRoleName() { @@ -213,12 +223,71 @@ public class PulsarFunctionE2ETest { System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, ""); - Thread.sleep(100); + // setting up simple web sever to test submitting function via URL + fileServerThread = new Thread(() -> { + try { + fileServer = HttpServer.create(new InetSocketAddress(fileServerPort), 0); + fileServer.createContext("/pulsar-io-data-generator.nar", he -> { + try { + + Headers headers = he.getResponseHeaders(); + headers.add("Content-Type", "application/octet-stream"); + + File file = new File(getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile()); + byte[] bytes = new byte [(int)file.length()]; + + FileInputStream fileInputStream = new FileInputStream(file); + BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream); + bufferedInputStream.read(bytes, 0, bytes.length); + + he.sendResponseHeaders(200, file.length()); + OutputStream outputStream = he.getResponseBody(); + outputStream.write(bytes, 0, bytes.length); + outputStream.close(); + + } catch (Exception e) { + log.error("Error when downloading: {}", e, e); + } + }); + fileServer.createContext("/pulsar-functions-api-examples.jar", he -> { + try { + + Headers headers = he.getResponseHeaders(); + headers.add("Content-Type", "application/octet-stream"); + + File file = new File(getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile()); + byte[] bytes = new byte [(int)file.length()]; + + FileInputStream fileInputStream = new FileInputStream(file); + BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream); + 
bufferedInputStream.read(bytes, 0, bytes.length); + + he.sendResponseHeaders(200, file.length()); + OutputStream outputStream = he.getResponseBody(); + outputStream.write(bytes, 0, bytes.length); + outputStream.close(); + + } catch (Exception e) { + log.error("Error when downloading: {}", e, e); + } + }); + fileServer.setExecutor(null); // creates a default executor + log.info("Starting file server..."); + fileServer.start(); + } catch (Exception e) { + log.error("Failed to start file server: ", e); + fileServer.stop(0); + } + + }); + fileServerThread.start(); } @AfterMethod void shutdown() throws Exception { log.info("--- Shutting down ---"); + fileServer.stop(0); + fileServerThread.interrupt(); pulsarClient.close(); admin.close(); functionsWorkerService.stop(); @@ -309,8 +378,7 @@ public class PulsarFunctionE2ETest { * * @throws Exception */ - @Test(timeOut = 20000) - public void testE2EPulsarFunction() throws Exception { + private void testE2EPulsarFunction(String jarFilePathUrl) throws Exception { final String namespacePortion = "io"; final String replNamespace = tenant + "/" + namespacePortion; @@ -328,7 +396,6 @@ public class PulsarFunctionE2ETest { Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create(); Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe(); - String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile(); FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName); admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl); @@ -386,7 +453,18 @@ public class PulsarFunctionE2ETest { } @Test(timeOut = 20000) - public void testPulsarSinkStats() throws Exception { + public void testE2EPulsarFunctionWithFile() throws Exception { + String jarFilePathUrl = Utils.FILE + ":" + 
getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile(); + testE2EPulsarFunction(jarFilePathUrl); + } + + @Test(timeOut = 40000) + public void testE2EPulsarFunctionWithUrl() throws Exception { + String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-functions-api-examples.jar", fileServerPort); + testE2EPulsarFunction(jarFilePathUrl); + } + + private void testPulsarSinkStats(String jarFilePathUrl) throws Exception { final String namespacePortion = "io"; final String replNamespace = tenant + "/" + namespacePortion; final String sourceTopic = "persistent://" + replNamespace + "/input"; @@ -401,7 +479,6 @@ public class PulsarFunctionE2ETest { // create a producer that creates a topic at broker Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create(); - String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile(); SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, functionName, sourceTopic, subscriptionName); admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl); @@ -413,7 +490,7 @@ public class PulsarFunctionE2ETest { } catch (PulsarAdminException e) { return false; } - }, 5, 150); + }, 50, 150); // validate pulsar sink consumer has started on the topic assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); @@ -586,7 +663,18 @@ public class PulsarFunctionE2ETest { } @Test(timeOut = 20000) - public void testPulsarSourceStats() throws Exception { + public void testPulsarSinkStatsWithFile() throws Exception { + String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile(); + testPulsarSinkStats(jarFilePathUrl); + } + + @Test(timeOut = 40000) + public void testPulsarSinkStatsWithUrl() throws Exception { + String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort); + 
testPulsarSinkStats(jarFilePathUrl); + } + + private void testPulsarSourceStats(String jarFilePathUrl) throws Exception { final String namespacePortion = "io"; final String replNamespace = tenant + "/" + namespacePortion; final String sinkTopic = "persistent://" + replNamespace + "/output"; @@ -595,7 +683,6 @@ public class PulsarFunctionE2ETest { Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); - String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile(); SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, functionName, sinkTopic); admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl); @@ -615,7 +702,7 @@ public class PulsarFunctionE2ETest { } catch (PulsarAdminException e) { return false; } - }, 10, 150); + }, 50, 150); assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1); String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort); @@ -688,6 +775,18 @@ public class PulsarFunctionE2ETest { } @Test(timeOut = 20000) + public void testPulsarSourceStatsWithFile() throws Exception { + String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile(); + testPulsarSourceStats(jarFilePathUrl); + } + + @Test(timeOut = 40000) + public void testPulsarSourceStatsWithUrl() throws Exception { + String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort); + testPulsarSourceStats(jarFilePathUrl); + } + + @Test(timeOut = 20000) public void testPulsarFunctionStats() throws Exception { final String namespacePortion = "io"; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index 2318f5a..38002a1 100644 --- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -269,12 +269,11 @@ public class Utils { URL website = new URL(destPkgUrl); File tempFile = File.createTempFile("function", ".tmp"); ReadableByteChannel rbc = Channels.newChannel(website.openStream()); + log.info("Downloading function package from {} to {} ...", destPkgUrl, tempFile.getAbsoluteFile()); try (FileOutputStream fos = new FileOutputStream(tempFile)) { - fos.getChannel().transferFrom(rbc, 0, 10); - } - if (tempFile.exists()) { - tempFile.delete(); + fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); } + log.info("Downloading function package from {} to {} completed!", destPkgUrl, tempFile.getAbsoluteFile()); return tempFile; } else { throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]"); @@ -318,39 +317,16 @@ public class Utils { throw new IllegalArgumentException(String.format("The archive %s is corrupted", archivePath)); } } + if (!isEmpty(pkgUrl)) { - if (pkgUrl.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { - try { - URL url = new URL(pkgUrl); - File file = new File(url.toURI()); - if (!file.exists()) { - throw new IOException(pkgUrl + " does not exists locally"); - } - return NarClassLoader.getFromArchive(file, Collections.emptySet()); - } catch (Exception e) { - throw new IllegalArgumentException( - "Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage()); - } - } else if (pkgUrl.startsWith("http")) { - try { - URL website = new URL(pkgUrl); - File tempFile = File.createTempFile("function", ".tmp"); - ReadableByteChannel rbc = Channels.newChannel(website.openStream()); - try (FileOutputStream fos = new FileOutputStream(tempFile)) { - fos.getChannel().transferFrom(rbc, 0, 10); - } - if (tempFile.exists()) { - tempFile.delete(); - } - return 
NarClassLoader.getFromArchive(tempFile, Collections.emptySet()); - } catch (Exception e) { - throw new IllegalArgumentException( - "Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage()); - } - } else { - throw new IllegalArgumentException("Unsupported url protocol "+ pkgUrl +", supported url protocols: [file/http/https]"); + try { + return NarClassLoader.getFromArchive(extractFileFromPkg(pkgUrl), Collections.emptySet()); + } catch (Exception e) { + throw new IllegalArgumentException( + "Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage()); } } + if (uploadedInputStreamFileName != null) { try { return NarClassLoader.getFromArchive(uploadedInputStreamFileName,