This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ea4231  fix submit function via url (#3934)
5ea4231 is described below

commit 5ea423150ba4d879207a22899cdd6e8154e6382f
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Fri Mar 29 16:41:31 2019 -0500

    fix submit function via url (#3934)
    
    * fix submit function via url
    
    * cleaning up
    
    * add test
    
    * make method private
    
    * add additional tests
    
    * cleaning up
    
    * improving tests
---
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 119 +++++++++++++++++++--
 .../org/apache/pulsar/functions/utils/Utils.java   |  44 ++------
 2 files changed, 119 insertions(+), 44 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 042a952..abd9684 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.io;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpServer;
 import lombok.ToString;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
@@ -61,11 +63,16 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.lang.reflect.Method;
 import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
@@ -123,6 +130,9 @@ public class PulsarFunctionE2ETest {
     private final String TLS_TRUST_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/cacert.pem";
 
     private static final Logger log = 
LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
+    private Thread fileServerThread;
+    private static final int fileServerPort = PortManager.nextFreePort();
+    private HttpServer fileServer;
 
     @DataProvider(name = "validRoleName")
     public Object[][] validRoleName() {
@@ -213,12 +223,71 @@ public class PulsarFunctionE2ETest {
 
         System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, "");
 
-        Thread.sleep(100);
+        // setting up simple web sever to test submitting function via URL
+        fileServerThread = new Thread(() -> {
+            try {
+                fileServer = HttpServer.create(new 
InetSocketAddress(fileServerPort), 0);
+                fileServer.createContext("/pulsar-io-data-generator.nar", he 
-> {
+                    try {
+
+                        Headers headers = he.getResponseHeaders();
+                        headers.add("Content-Type", 
"application/octet-stream");
+
+                        File file = new 
File(getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile());
+                        byte[] bytes  = new byte [(int)file.length()];
+
+                        FileInputStream fileInputStream = new 
FileInputStream(file);
+                        BufferedInputStream bufferedInputStream = new 
BufferedInputStream(fileInputStream);
+                        bufferedInputStream.read(bytes, 0, bytes.length);
+
+                        he.sendResponseHeaders(200, file.length());
+                        OutputStream outputStream = he.getResponseBody();
+                        outputStream.write(bytes, 0, bytes.length);
+                        outputStream.close();
+
+                    } catch (Exception e) {
+                        log.error("Error when downloading: {}", e, e);
+                    }
+                });
+                fileServer.createContext("/pulsar-functions-api-examples.jar", 
he -> {
+                    try {
+
+                        Headers headers = he.getResponseHeaders();
+                        headers.add("Content-Type", 
"application/octet-stream");
+
+                        File file = new 
File(getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile());
+                        byte[] bytes  = new byte [(int)file.length()];
+
+                        FileInputStream fileInputStream = new 
FileInputStream(file);
+                        BufferedInputStream bufferedInputStream = new 
BufferedInputStream(fileInputStream);
+                        bufferedInputStream.read(bytes, 0, bytes.length);
+
+                        he.sendResponseHeaders(200, file.length());
+                        OutputStream outputStream = he.getResponseBody();
+                        outputStream.write(bytes, 0, bytes.length);
+                        outputStream.close();
+
+                    } catch (Exception e) {
+                        log.error("Error when downloading: {}", e, e);
+                    }
+                });
+                fileServer.setExecutor(null); // creates a default executor
+                log.info("Starting file server...");
+                fileServer.start();
+            } catch (Exception e) {
+                log.error("Failed to start file server: ", e);
+                fileServer.stop(0);
+            }
+
+        });
+        fileServerThread.start();
     }
 
     @AfterMethod
     void shutdown() throws Exception {
         log.info("--- Shutting down ---");
+        fileServer.stop(0);
+        fileServerThread.interrupt();
         pulsarClient.close();
         admin.close();
         functionsWorkerService.stop();
@@ -309,8 +378,7 @@ public class PulsarFunctionE2ETest {
      *
      * @throws Exception
      */
-    @Test(timeOut = 20000)
-    public void testE2EPulsarFunction() throws Exception {
+    private void testE2EPulsarFunction(String jarFilePathUrl) throws Exception 
{
 
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
@@ -328,7 +396,6 @@ public class PulsarFunctionE2ETest {
         Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
         Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();
 
-        String jarFilePathUrl = Utils.FILE + ":" + 
getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
         FunctionConfig functionConfig = createFunctionConfig(tenant, 
namespacePortion, functionName,
                 "my.*", sinkTopic, subscriptionName);
         admin.functions().createFunctionWithUrl(functionConfig, 
jarFilePathUrl);
@@ -386,7 +453,18 @@ public class PulsarFunctionE2ETest {
     }
 
     @Test(timeOut = 20000)
-    public void testPulsarSinkStats() throws Exception {
+    public void testE2EPulsarFunctionWithFile() throws Exception {
+        String jarFilePathUrl = Utils.FILE + ":" + 
getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+        testE2EPulsarFunction(jarFilePathUrl);
+    }
+
+    @Test(timeOut = 40000)
+    public void testE2EPulsarFunctionWithUrl() throws Exception {
+        String jarFilePathUrl = 
String.format("http://127.0.0.1:%d/pulsar-functions-api-examples.jar";, 
fileServerPort);
+        testE2EPulsarFunction(jarFilePathUrl);
+    }
+
+    private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sourceTopic = "persistent://" + replNamespace + "/input";
@@ -401,7 +479,6 @@ public class PulsarFunctionE2ETest {
         // create a producer that creates a topic at broker
         Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
 
-        String jarFilePathUrl = Utils.FILE + ":" + 
getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
         SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, 
functionName, sourceTopic, subscriptionName);
         admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);
 
@@ -413,7 +490,7 @@ public class PulsarFunctionE2ETest {
             } catch (PulsarAdminException e) {
                 return false;
             }
-        }, 5, 150);
+        }, 50, 150);
         // validate pulsar sink consumer has started on the topic
         
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
 
@@ -586,7 +663,18 @@ public class PulsarFunctionE2ETest {
     }
 
     @Test(timeOut = 20000)
-    public void testPulsarSourceStats() throws Exception {
+    public void testPulsarSinkStatsWithFile() throws Exception {
+        String jarFilePathUrl = Utils.FILE + ":" + 
getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+        testPulsarSinkStats(jarFilePathUrl);
+    }
+
+    @Test(timeOut = 40000)
+    public void testPulsarSinkStatsWithUrl() throws Exception {
+        String jarFilePathUrl = 
String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar";, 
fileServerPort);
+        testPulsarSinkStats(jarFilePathUrl);
+    }
+
+    private void testPulsarSourceStats(String jarFilePathUrl) throws Exception 
{
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sinkTopic = "persistent://" + replNamespace + "/output";
@@ -595,7 +683,6 @@ public class PulsarFunctionE2ETest {
         Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
 
-        String jarFilePathUrl = Utils.FILE + ":" + 
getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
         SourceConfig sourceConfig = createSourceConfig(tenant, 
namespacePortion, functionName, sinkTopic);
         admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
 
@@ -615,7 +702,7 @@ public class PulsarFunctionE2ETest {
             } catch (PulsarAdminException e) {
                 return false;
             }
-        }, 10, 150);
+        }, 50, 150);
         assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);
 
         String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
@@ -688,6 +775,18 @@ public class PulsarFunctionE2ETest {
     }
 
     @Test(timeOut = 20000)
+    public void testPulsarSourceStatsWithFile() throws Exception {
+        String jarFilePathUrl = Utils.FILE + ":" + 
getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+        testPulsarSourceStats(jarFilePathUrl);
+    }
+
+    @Test(timeOut = 40000)
+    public void testPulsarSourceStatsWithUrl() throws Exception {
+        String jarFilePathUrl = 
String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar";, 
fileServerPort);
+        testPulsarSourceStats(jarFilePathUrl);
+    }
+
+    @Test(timeOut = 20000)
     public void testPulsarFunctionStats() throws Exception {
 
         final String namespacePortion = "io";
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 2318f5a..38002a1 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -269,12 +269,11 @@ public class Utils {
             URL website = new URL(destPkgUrl);
             File tempFile = File.createTempFile("function", ".tmp");
             ReadableByteChannel rbc = 
Channels.newChannel(website.openStream());
+            log.info("Downloading function package from {} to {} ...", 
destPkgUrl, tempFile.getAbsoluteFile());
             try (FileOutputStream fos = new FileOutputStream(tempFile)) {
-                fos.getChannel().transferFrom(rbc, 0, 10);
-            }
-            if (tempFile.exists()) {
-                tempFile.delete();
+                fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
             }
+            log.info("Downloading function package from {} to {} completed!", 
destPkgUrl, tempFile.getAbsoluteFile());
             return tempFile;
         } else {
             throw new IllegalArgumentException("Unsupported url protocol "+ 
destPkgUrl +", supported url protocols: [file/http/https]");
@@ -318,39 +317,16 @@ public class Utils {
                 throw new IllegalArgumentException(String.format("The archive 
%s is corrupted", archivePath));
             }
         }
+
         if (!isEmpty(pkgUrl)) {
-            if 
(pkgUrl.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
-                try {
-                    URL url = new URL(pkgUrl);
-                    File file = new File(url.toURI());
-                    if (!file.exists()) {
-                        throw new IOException(pkgUrl + " does not exists 
locally");
-                    }
-                    return NarClassLoader.getFromArchive(file, 
Collections.emptySet());
-                } catch (Exception e) {
-                    throw new IllegalArgumentException(
-                            "Corrupt User PackageFile " + pkgUrl + " with 
error " + e.getMessage());
-                }
-            } else if (pkgUrl.startsWith("http")) {
-                try {
-                    URL website = new URL(pkgUrl);
-                    File tempFile = File.createTempFile("function", ".tmp");
-                    ReadableByteChannel rbc = 
Channels.newChannel(website.openStream());
-                    try (FileOutputStream fos = new 
FileOutputStream(tempFile)) {
-                        fos.getChannel().transferFrom(rbc, 0, 10);
-                    }
-                    if (tempFile.exists()) {
-                        tempFile.delete();
-                    }
-                    return NarClassLoader.getFromArchive(tempFile, 
Collections.emptySet());
-                } catch (Exception e) {
-                    throw new IllegalArgumentException(
-                            "Corrupt User PackageFile " + pkgUrl + " with 
error " + e.getMessage());
-                }
-            } else {
-                throw new IllegalArgumentException("Unsupported url protocol 
"+ pkgUrl +", supported url protocols: [file/http/https]");
+            try {
+                return 
NarClassLoader.getFromArchive(extractFileFromPkg(pkgUrl), 
Collections.emptySet());
+            } catch (Exception e) {
+                throw new IllegalArgumentException(
+                        "Corrupt User PackageFile " + pkgUrl + " with error " 
+ e.getMessage());
             }
         }
+
         if (uploadedInputStreamFileName != null) {
             try {
                 return 
NarClassLoader.getFromArchive(uploadedInputStreamFileName,

Reply via email to