srkukarni closed pull request #2917: Remove functions-util dependency from 
pulsar-client-admin
URL: https://github.com/apache/pulsar/pull/2917
 
 
   

This pull request was merged from a forked repository. Because GitHub
does not show the original diff of such a pull request once it has been
merged, the diff is reproduced below for the sake of provenance:

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 5af899a598..e5e2af7a04 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -40,11 +40,11 @@
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index f76df6b1d7..9c5cdb069b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -43,15 +43,13 @@
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
-import 
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest;
 import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 19e05a9472..c0d81e5c80 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -206,7 +206,7 @@ public void testAuthorization() throws Exception {
         final String functionName = "PulsarSink-test";
         final String subscriptionName = "test-sub";
 
-        String jarFilePathUrl = String.format("%s:%s", Utils.FILE,
+        String jarFilePathUrl = String.format("%s:%s", 
org.apache.pulsar.common.functions.Utils.FILE,
                 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
         FunctionConfig functionConfig = createFunctionConfig(jarFilePathUrl, 
tenant, namespacePortion,
                 functionName, "my.*", sinkTopic, subscriptionName);
diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml
index e39f36213f..79625bf740 100644
--- a/pulsar-client-admin/pom.xml
+++ b/pulsar-client-admin/pom.xml
@@ -46,12 +46,6 @@
       <version>${project.version}</version>
     </dependency>
 
-      <dependency>
-          <groupId>${project.groupId}</groupId>
-          <artifactId>pulsar-functions-utils</artifactId>
-          <version>${project.version}</version>
-      </dependency>
-
     <dependency>
       <groupId>org.glassfish.jersey.core</groupId>
       <artifactId>jersey-client</artifactId>
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 077a74217b..de7331a311 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -138,7 +138,6 @@ public void setup() throws Exception {
             .thenReturn(true);
         when(Reflections.classImplementsIface(anyString(), 
any())).thenReturn(true);
         when(Reflections.createInstance(eq(DummyFunction.class.getName()), 
any(File.class))).thenReturn(new DummyFunction());
-        PowerMockito.stub(PowerMockito.method(Utils.class, 
"fileExists")).toReturn(true);
     }
 
 //    @Test
diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index 8e168f3169..5ceb0dfa45 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -74,6 +74,13 @@
       <artifactId>stream-storage-java-client</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-utils</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
index 80a7476320..324a028f0e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import com.google.protobuf.util.JsonFormat;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.utils.Utils;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
@@ -65,7 +65,7 @@ void processArguments() throws Exception {
 
         @Override
         void runCmd() throws Exception {
-            String json = Utils.printJson(admin.worker().getFunctionsStats());
+            String json = 
JsonFormat.printer().print(admin.worker().getFunctionsStats());
             GsonBuilder gsonBuilder = new GsonBuilder();
             if (indent) {
                 gsonBuilder.setPrettyPrinting();
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 24597eb202..dca4df6756 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -36,6 +36,7 @@
 import com.google.gson.JsonParser;
 import com.google.gson.reflect.TypeToken;
 
+import com.google.protobuf.util.JsonFormat;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
@@ -64,7 +65,7 @@
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.WindowConfig;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.common.functions.Utils;
 
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions 
(lightweight, Lambda-style compute processes that work with Pulsar)")
@@ -616,7 +617,7 @@ void runCmd() throws Exception {
 
         @Override
         void runCmd() throws Exception {
-            String json = Utils.printJson(
+            String json = JsonFormat.printer().print(
                     isBlank(instanceId) ? 
admin.functions().getFunctionStatus(tenant, namespace, functionName)
                             : admin.functions().getFunctionStatus(tenant, 
namespace, functionName,
                                     Integer.parseInt(instanceId)));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index d14c0534b0..270a3bd636 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -22,7 +22,6 @@
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.apache.pulsar.functions.utils.Utils.BUILTIN;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
@@ -37,10 +36,10 @@
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Type;
-import java.nio.file.Paths;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import com.google.protobuf.util.JsonFormat;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -53,9 +52,7 @@
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.functions.utils.*;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-import org.apache.pulsar.functions.utils.io.Connectors;
+import org.apache.pulsar.common.functions.Utils;
 
 @Getter
 @Parameters(commandDescription = "Interface for managing Pulsar IO sinks 
(egress data from Pulsar)")
@@ -183,22 +180,8 @@ public void runCmd() throws Exception {
         }
 
         @Override
-        protected String validateSinkType(String sinkType) throws IOException {
-            // Validate the connector sink type from the locally available 
connectors
-            String pulsarHome = System.getenv("PULSAR_HOME");
-            if (pulsarHome == null) {
-                pulsarHome = Paths.get("").toAbsolutePath().toString();
-            }
-            String connectorsDir = Paths.get(pulsarHome, 
"connectors").toString();
-            Connectors connectors = 
ConnectorUtils.searchForConnectors(connectorsDir);
-
-            if (!connectors.getSinks().containsKey(sinkType)) {
-                throw new ParameterException("Invalid sink type '" + sinkType 
+ "' -- Available sinks are: "
-                        + connectors.getSinks().keySet());
-            }
-
-            // Sink type is a valid built-in connector type. For local-run 
we'll fill it up with its own archive path
-            return connectors.getSinks().get(sinkType).toString();
+        protected String validateSinkType(String sinkType) {
+            return sinkType;
         }
     }
 
@@ -432,7 +415,7 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) {
             }
 
             if (!Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive()) 
&&
-                    !sinkConfig.getArchive().startsWith(BUILTIN)) {
+                    !sinkConfig.getArchive().startsWith(Utils.BUILTIN)) {
                 if (!new File(sinkConfig.getArchive()).exists()) {
                     throw new IllegalArgumentException(String.format("Sink 
Archive file %s does not exist", sinkConfig.getArchive()));
                 }
@@ -545,7 +528,7 @@ void runCmd() throws Exception {
 
         @Override
         void runCmd() throws Exception {
-            String json = Utils.printJson(
+            String json = JsonFormat.printer().print(
                     isBlank(instanceId) ? admin.sink().getSinkStatus(tenant, 
namespace, sinkName)
                             : admin.sink().getSinkStatus(tenant, namespace, 
sinkName,
                             Integer.parseInt(instanceId)));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 32ebfc996b..069f2f80ba 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -22,7 +22,6 @@
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.apache.pulsar.functions.utils.Utils.BUILTIN;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
@@ -37,13 +36,13 @@
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Type;
-import java.nio.file.Paths;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.google.protobuf.util.JsonFormat;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -56,9 +55,7 @@
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-import org.apache.pulsar.functions.utils.io.Connectors;
+import org.apache.pulsar.common.functions.Utils;
 
 @Getter
 @Parameters(commandDescription = "Interface for managing Pulsar IO Sources 
(ingress data into Pulsar)")
@@ -187,22 +184,8 @@ public void runCmd() throws Exception {
         }
 
         @Override
-        protected String validateSourceType(String sourceType) throws 
IOException {
-            // Validate the connector source type from the locally available 
connectors
-            String pulsarHome = System.getenv("PULSAR_HOME");
-            if (pulsarHome == null) {
-                pulsarHome = Paths.get("").toAbsolutePath().toString();
-            }
-            String connectorsDir = Paths.get(pulsarHome, 
"connectors").toString();
-            Connectors connectors = 
ConnectorUtils.searchForConnectors(connectorsDir);
-
-            if (!connectors.getSources().containsKey(sourceType)) {
-                throw new ParameterException("Invalid source type '" + 
sourceType + "' -- Available sources are: "
-                        + connectors.getSources().keySet());
-            }
-
-            // Source type is a valid built-in connector type. For local-run 
we'll fill it up with its own archive path
-            return connectors.getSources().get(sourceType).toString();
+        protected String validateSourceType(String sourceType) {
+            return sourceType;
         }
     }
 
@@ -388,7 +371,7 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) {
                 throw new ParameterException("Source archive not specfied");
             }
             if 
(!Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) &&
-                !sourceConfig.getArchive().startsWith(BUILTIN)) {
+                !sourceConfig.getArchive().startsWith(Utils.BUILTIN)) {
                 if (!new File(sourceConfig.getArchive()).exists()) {
                     throw new IllegalArgumentException(String.format("Source 
Archive %s does not exist", sourceConfig.getArchive()));
                 }
@@ -501,7 +484,7 @@ void runCmd() throws Exception {
 
         @Override
         void runCmd() throws Exception {
-            String json = Utils.printJson(
+            String json = JsonFormat.printer().print(
                     isBlank(instanceId) ? 
admin.source().getSourceStatus(tenant, namespace, sourceName)
                             : admin.source().getSourceStatus(tenant, 
namespace, sourceName,
                             Integer.parseInt(instanceId)));
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index cbe52961f9..09418dfa70 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -43,7 +43,7 @@
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.functions.utils.*;
+import org.apache.pulsar.functions.utils.Utils;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
new file mode 100644
index 0000000000..855d2e4411
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.functions;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+public class Utils {
+    public static String HTTP = "http";
+    public static String FILE = "file";
+    public static String BUILTIN = "builtin";
+
+    public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) 
{
+        return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(HTTP)
+                || functionPkgUrl.startsWith(FILE));
+    }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
index ed52a96a0d..40a6ee7565 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -25,6 +25,8 @@
 import com.google.gson.JsonParser;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +44,8 @@
 import org.apache.pulsar.functions.proto.Function;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.*;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.apache.pulsar.functions.utils.io.Connectors;
 
 import static org.apache.pulsar.functions.utils.Utils.*;
 
@@ -94,7 +98,7 @@ void start() throws Exception {
             parallelism = functionConfig.getParallelism();
             if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
                 userCodeFile = functionConfig.getJar();
-                if (isFunctionPackageUrlSupported(userCodeFile)) {
+                if 
(org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile))
 {
                     classLoader = extractClassLoader(userCodeFile);
                 } else {
                     File file = new File(userCodeFile);
@@ -109,10 +113,14 @@ void start() throws Exception {
             functionDetails = FunctionConfigUtils.convert(functionConfig, 
classLoader);
         } else if (!StringUtils.isEmpty(sourceConfigString)) {
             SourceConfig sourceConfig = new 
Gson().fromJson(sourceConfigString, SourceConfig.class);
+            String builtInSource = isBuiltInSource(sourceConfig.getArchive());
+            if (builtInSource != null) {
+                sourceConfig.setArchive(builtInSource);
+            }
             NarClassLoader classLoader;
             parallelism = sourceConfig.getParallelism();
             userCodeFile = sourceConfig.getArchive();
-            if (isFunctionPackageUrlSupported(userCodeFile)) {
+            if 
(org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile))
 {
                 classLoader = extractNarClassLoader(null, userCodeFile, null);
             } else {
                 File file = new File(userCodeFile);
@@ -124,10 +132,14 @@ void start() throws Exception {
             functionDetails = SourceConfigUtils.convert(sourceConfig, 
classLoader);
         } else {
             SinkConfig sinkConfig = new Gson().fromJson(sinkConfigString, 
SinkConfig.class);
+            String builtInSink = isBuiltInSource(sinkConfig.getArchive());
+            if (builtInSink != null) {
+                sinkConfig.setArchive(builtInSink);
+            }
             NarClassLoader classLoader;
             parallelism = sinkConfig.getParallelism();
             userCodeFile = sinkConfig.getArchive();
-            if (isFunctionPackageUrlSupported(userCodeFile)) {
+            if 
(org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile))
 {
                 classLoader = extractNarClassLoader(null, userCodeFile, null);
             } else {
                 File file = new File(userCodeFile);
@@ -221,4 +233,38 @@ public void run() {
 
         }
     }
+
+    private String isBuiltInSource(String sourceType) throws IOException {
+        // Validate the connector source type from the locally available 
connectors
+        Connectors connectors = getConnectors();
+
+        if (connectors.getSources().containsKey(sourceType)) {
+            // Source type is a valid built-in connector type. For local-run 
we'll fill it up with its own archive path
+            return connectors.getSources().get(sourceType).toString();
+        } else {
+            return null;
+        }
+    }
+
+    private String isBuiltInSink(String sinkType) throws IOException {
+        // Validate the connector source type from the locally available 
connectors
+        Connectors connectors = getConnectors();
+
+        if (connectors.getSinks().containsKey(sinkType)) {
+            // Source type is a valid built-in connector type. For local-run 
we'll fill it up with its own archive path
+            return connectors.getSinks().get(sinkType).toString();
+        } else {
+            return null;
+        }
+    }
+
+    private Connectors getConnectors() throws IOException {
+        // Validate the connector source type from the locally available 
connectors
+        String pulsarHome = System.getenv("PULSAR_HOME");
+        if (pulsarHome == null) {
+            pulsarHome = Paths.get("").toAbsolutePath().toString();
+        }
+        String connectorsDir = Paths.get(pulsarHome, "connectors").toString();
+        return ConnectorUtils.searchForConnectors(connectorsDir);
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 968e63e1af..8fbd676d5b 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -35,13 +35,12 @@
 import java.net.MalformedURLException;
 import java.util.*;
 
-import static java.util.Objects.isNull;
 import static org.apache.commons.lang.StringUtils.isNotBlank;
 import static org.apache.commons.lang.StringUtils.isNotEmpty;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.apache.pulsar.functions.utils.Utils.BUILTIN;
+import static org.apache.pulsar.common.functions.Utils.BUILTIN;
 import static org.apache.pulsar.functions.utils.Utils.loadJar;
 
 public class FunctionConfigUtils {
@@ -505,13 +504,13 @@ private static void doCommonChecks(FunctionConfig functionConfig) {
             throw new IllegalArgumentException("Dead Letter Topic specified, 
however max retries is set to infinity");
         }
 
-        if (!isEmpty(functionConfig.getJar()) && 
!Utils.isFunctionPackageUrlSupported(functionConfig.getJar())
+        if (!isEmpty(functionConfig.getJar()) && 
!org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getJar())
                 && functionConfig.getJar().startsWith(BUILTIN)) {
             if (!new File(functionConfig.getJar()).exists()) {
                 throw new IllegalArgumentException("The supplied jar file does 
not exist");
             }
         }
-        if (!isEmpty(functionConfig.getPy()) && 
!Utils.isFunctionPackageUrlSupported(functionConfig.getPy())
+        if (!isEmpty(functionConfig.getPy()) && 
!org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getPy())
                 && functionConfig.getPy().startsWith(BUILTIN)) {
             if (!new File(functionConfig.getPy()).exists()) {
                 throw new IllegalArgumentException("The supplied python file 
does not exist");
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 2cbd460245..b79ee60be8 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -56,10 +56,10 @@ public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader clas
 
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
 
-        boolean isBuiltin = 
!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && 
sinkConfig.getArchive().startsWith(Utils.BUILTIN);
+        boolean isBuiltin = 
!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && 
sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
 
         if (!isBuiltin) {
-            if 
(!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && 
sinkConfig.getArchive().startsWith(Utils.FILE)) {
+            if 
(!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && 
sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE))
 {
                 if (isBlank(sinkConfig.getClassName())) {
                     throw new IllegalArgumentException("Class-name must be 
present for archive with file-url");
                 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 6ce1db3fd0..0df1aec5ed 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -53,10 +53,10 @@ public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader
 
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
 
-        boolean isBuiltin = !StringUtils.isEmpty(sourceConfig.getArchive()) && 
sourceConfig.getArchive().startsWith(Utils.BUILTIN);
+        boolean isBuiltin = !StringUtils.isEmpty(sourceConfig.getArchive()) && 
sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
 
         if (!isBuiltin) {
-            if (!StringUtils.isEmpty(sourceConfig.getArchive()) && 
sourceConfig.getArchive().startsWith(Utils.FILE)) {
+            if (!StringUtils.isEmpty(sourceConfig.getArchive()) && 
sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE))
 {
                 if 
(org.apache.commons.lang3.StringUtils.isBlank(sourceConfig.getClassName())) {
                     throw new IllegalArgumentException("Class-name must be 
present for archive with file-url");
                 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 3b9c8da812..8181e9f255 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.functions.utils;
 
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -58,10 +56,6 @@
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class Utils {
 
-    public static String HTTP = "http";
-    public static String FILE = "file";
-    public static String BUILTIN = "builtin";
-
     public static String printJson(MessageOrBuilder msg) throws IOException {
         return JsonFormat.printer().print(msg);
     }
@@ -217,15 +211,6 @@ public static Runtime convertRuntime(FunctionConfig.Runtime runtime) {
         return typeArg;
     }
 
-    public static boolean fileExists(String file) {
-        return new File(file).exists();
-    }
-
-    public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) 
{
-        return isNotBlank(functionPkgUrl) && 
(functionPkgUrl.startsWith(Utils.HTTP)
-                || functionPkgUrl.startsWith(Utils.FILE));
-    }
-
     /**
      * Load a jar
      * @param jar file of jar
@@ -248,7 +233,7 @@ public static ClassLoader extractClassLoader(String destPkgUrl) throws IOExcepti
     }
 
     public static File extractFileFromPkg(String destPkgUrl) throws 
IOException, URISyntaxException {
-        if (destPkgUrl.startsWith(FILE)) {
+        if 
(destPkgUrl.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
             URL url = new URL(destPkgUrl);
             File file = new File(url.toURI());
             if (!file.exists()) {
@@ -309,7 +294,7 @@ public static NarClassLoader extractNarClassLoader(Path archivePath, String pkgU
             }
         }
         if (!StringUtils.isEmpty(pkgUrl)) {
-            if (pkgUrl.startsWith(FILE)) {
+            if 
(pkgUrl.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
                 try {
                     URL url = new URL(pkgUrl);
                     File file = new File(url.toURI());
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 5df48de1e8..4f8d04a71d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -18,11 +18,11 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import static org.apache.pulsar.functions.utils.Utils.FILE;
-import static org.apache.pulsar.functions.utils.Utils.HTTP;
+import static org.apache.pulsar.common.functions.Utils.FILE;
+import static org.apache.pulsar.common.functions.Utils.HTTP;
 import static org.apache.pulsar.functions.utils.Utils.getSourceType;
 import static org.apache.pulsar.functions.utils.Utils.getSinkType;
-import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported;
+import static org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.MoreFiles;
@@ -56,7 +56,6 @@
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
-import org.apache.pulsar.functions.proto.Function.Instance;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index d0106602ba..41f04af445 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -37,13 +37,10 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.worker.dlog.DLInputStream;
 import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.pulsar.functions.proto.Function;
-import static org.apache.pulsar.functions.utils.Utils.FILE;
-import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl;
 
 @Slf4j
 public final class Utils {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 914a691524..c850d90409 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -901,10 +901,10 @@ public Response downloadFunction(final String path) {
         return Response.status(Status.OK).entity(new StreamingOutput() {
             @Override
             public void write(final OutputStream output) throws IOException {
-                if (path.startsWith(HTTP)) {
+                if (path.startsWith(org.apache.pulsar.common.functions.Utils.HTTP)) {
                     URL url = new URL(path);
                     IOUtils.copy(url.openStream(), output);
-                } else if (path.startsWith(FILE)) {
+                } else if (path.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
                     URL url = new URL(path);
                     File file;
                     try {
@@ -970,7 +970,7 @@ private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, Str
             String functionPkgUrl, String functionDetailsJson, String componentConfigJson,
             String componentType)
             throws IllegalArgumentException, IOException, URISyntaxException {
-        if (!isFunctionPackageUrlSupported(functionPkgUrl)) {
+        if (!org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
             throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
         }
         FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
@@ -1081,7 +1081,7 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
             SourceConfigUtils.inferMissingArguments(sourceConfig);
             if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
                 String builtinArchive = sourceConfig.getArchive();
-                if (builtinArchive.startsWith(BUILTIN)) {
+                if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
                     builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
                 }
                 try {
@@ -1099,7 +1099,7 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
             SinkConfigUtils.inferMissingArguments(sinkConfig);
             if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
                 String builtinArchive = sinkConfig.getArchive();
-                if (builtinArchive.startsWith(BUILTIN)) {
+                if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
                     builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
                 }
                 try {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index bf4b4aafa5..a4926e304c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -38,7 +38,7 @@
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.testng.annotations.Test;
-import static org.apache.pulsar.functions.utils.Utils.FILE;
+import static org.apache.pulsar.common.functions.Utils.FILE;
 
 /**
  * Unit test of {@link FunctionActioner}.
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 2bf467a3fb..76d2034174 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -228,40 +228,17 @@ The Apache Software License, Version 2.0
  * Google Guice
     - guice-4.2.0.jar
     - guice-multibindings-4.2.0.jar
- * Google Gson
-    - gson-2.8.2.jar
- * Google Common Protos
-    - proto-google-common-protos-1.0.0.jar
  * Apache Commons
     - commons-math3-3.6.1.jar
+    - commons-beanutils-core-1.8.0.jar
     - commons-beanutils-core-1.8.3.jar
-    - commons-beanutils-1.7.0.jar
     - commons-compress-1.15.jar
     - commons-lang3-3.3.2.jar
     - commons-lang3-3.4.jar
-    - commons-collections-3.2.2.jar
-    - commons-configuration-1.6.jar
-    - commons-digester-1.8.jar
-    - commons-lang-2.4.jar
-    - commons-logging-1.1.1.jar
  * Netty
     - netty-3.6.2.Final.jar
-    - netty-all-4.1.22.Final.jar
-    - netty-buffer-4.1.22.Final.jar
-    - netty-codec-4.1.22.Final.jar
-    - netty-codec-http-4.1.22.Final.jar
-    - netty-codec-http2-4.1.22.Final.jar
-    - netty-codec-socks-4.1.22.Final.jar
-    - netty-common-4.1.22.Final.jar
-    - netty-handler-4.1.22.Final.jar
-    - netty-handler-proxy-4.1.22.Final.jar
-    - netty-resolver-4.1.22.Final.jar
-    - netty-tcnative-boringssl-static-2.0.7.Final.jar
-    - netty-transport-4.1.22.Final.jar
  * Joda Time
     - joda-time-2.9.9.jar
- * TypeTools
-    - typetools-0.5.0.jar
  * Jetty
     - http2-client-9.4.11.v20180605.jar
     - http2-common-9.4.11.v20180605.jar
@@ -277,7 +254,6 @@ The Apache Software License, Version 2.0
     - jetty-server-9.4.11.v20180605.jar
     - jetty-servlet-9.4.11.v20180605.jar
     - jetty-util-9.4.11.v20180605.jar
-    - jetty-util-9.4.12.v20180830.jar
   * Javassist
     - javassist-3.22.0-CR2.jar
   * Asynchronous Http Client
@@ -321,7 +297,6 @@ The Apache Software License, Version 2.0
     - units-1.0.jar
   * Error Prone Annotations
     - error_prone_annotations-2.1.3.jar
-    - error_prone_annotations-2.1.2.jar
   * Esri Geometry API For Java
     - esri-geometry-api-2.1.0.jar
   * Fastutil
@@ -354,7 +329,6 @@ The Apache Software License, Version 2.0
   * OkHttp
     - okhttp-3.9.0.jar
     - okhttp-urlconnection-3.9.0.jar
-    - okhttp-2.5.0.jar
   * OpenCSV
     - opencsv-2.3.jar
   * Plexus
@@ -380,7 +354,6 @@ The Apache Software License, Version 2.0
     - jcommander-1.48.jar
   * FindBugs JSR305
     - jsr305-3.0.2.jar
-    - jsr305-3.0.0.jar
   * Objenesis
     - objenesis-2.1.jar
     - objenesis-2.6.jar
@@ -417,22 +390,6 @@ The Apache Software License, Version 2.0
     - simpleclient_servlet-0.5.0.jar
   * LZ4
     - lz4-java-1.5.0.jar
-  * Bookkeeper
-    - circe-checksum-4.7.2.jar
-  * GRPC
-    - grpc-all-1.12.0.jar
-    - grpc-auth-1.12.0.jar
-    - grpc-context-1.12.0.jar
-    - grpc-core-1.12.0.jar
-    - grpc-netty-1.12.0.jar
-    - grpc-okhttp-1.12.0.jar
-    - grpc-protobuf-1.12.0.jar
-    - grpc-protobuf-lite-1.12.0.jar
-    - grpc-protobuf-nano-1.12.0.jar
-    - grpc-stub-1.12.0.jar
-  * OpenCensus
-    - opencensus-api-0.11.0.jar
-    - opencensus-contrib-grpc-metrics-0.11.0.jar
 
 Protocol Buffers License
  * Protocol Buffers
@@ -441,11 +398,6 @@ Protocol Buffers License
 
 BSD 3-clause "New" or "Revised" License
   *  RE2J TD -- re2j-td-1.4.jar
-  * google-auth-library-credentials-0.9.0.jar
-  * protobuf-java-util-3.5.1.jar
-
-BSD 2-clause
-  * protobuf-javanano-3.0.0-alpha-5.jar
 
 BSD License
  * ANTLR 4 Runtime -- antlr4-runtime-4.6.jar


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to