This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8f294c9 Remove functions-util dependency from pulsar-client-admin (#2917) 8f294c9 is described below commit 8f294c9ff67c36deee1d84d1dbc2dab8aa612bb4 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Sat Nov 3 00:24:43 2018 -0700 Remove functions-util dependency from pulsar-client-admin (#2917) * Remove functions-util dependency from pulsar-client-admin * Simplified * reverted accidental change * fixed unittest * Updated license --- .../worker/PulsarWorkerAssignmentTest.java | 2 +- .../apache/pulsar/io/PulsarFunctionE2ETest.java | 4 +- .../apache/pulsar/io/PulsarFunctionTlsTest.java | 2 +- pulsar-client-admin/pom.xml | 6 --- .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 1 - pulsar-client-tools/pom.xml | 7 +++ .../apache/pulsar/admin/cli/CmdFunctionWorker.java | 4 +- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 5 ++- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 29 +++--------- .../org/apache/pulsar/admin/cli/CmdSources.java | 29 +++--------- .../org/apache/pulsar/admin/cli/TestCmdSinks.java | 2 +- .../org/apache/pulsar/common/functions/Utils.java | 32 +++++++++++++ .../pulsar/functions/runtime/LocalRunner.java | 52 ++++++++++++++++++++-- .../functions/utils/FunctionConfigUtils.java | 7 ++- .../pulsar/functions/utils/SinkConfigUtils.java | 4 +- .../pulsar/functions/utils/SourceConfigUtils.java | 4 +- .../org/apache/pulsar/functions/utils/Utils.java | 19 +------- .../pulsar/functions/worker/FunctionActioner.java | 7 ++- .../org/apache/pulsar/functions/worker/Utils.java | 3 -- .../functions/worker/rest/api/FunctionsImpl.java | 10 ++--- .../functions/worker/FunctionActionerTest.java | 2 +- pulsar-sql/presto-distribution/LICENSE | 50 +-------------------- 22 files changed, 128 insertions(+), 153 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index f5b346b..c0d289d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -40,11 +40,11 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.functions.Utils; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.common.functions.FunctionConfig; -import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index e425cf7..494e0db 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -43,15 +43,13 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.apache.pulsar.common.functions.Utils; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.functions.instance.JavaInstanceRunnable; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; -import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest; import org.apache.pulsar.common.functions.FunctionConfig; -import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index 19e05a9..c0d81e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -206,7 +206,7 @@ public class PulsarFunctionTlsTest { final String functionName = "PulsarSink-test"; final String subscriptionName = "test-sub"; - String jarFilePathUrl = String.format("%s:%s", Utils.FILE, + String jarFilePathUrl = String.format("%s:%s", org.apache.pulsar.common.functions.Utils.FILE, PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath()); FunctionConfig functionConfig = createFunctionConfig(jarFilePathUrl, tenant, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName); diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml index e39f362..79625bf 100644 --- a/pulsar-client-admin/pom.xml +++ b/pulsar-client-admin/pom.xml @@ -46,12 +46,6 @@ <version>${project.version}</version> </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>pulsar-functions-utils</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.glassfish.jersey.core</groupId> <artifactId>jersey-client</artifactId> diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 077a742..de7331a 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -138,7 +138,6 @@ public class CmdFunctionsTest { .thenReturn(true); when(Reflections.classImplementsIface(anyString(), any())).thenReturn(true); when(Reflections.createInstance(eq(DummyFunction.class.getName()), any(File.class))).thenReturn(new DummyFunction()); - PowerMockito.stub(PowerMockito.method(Utils.class, "fileExists")).toReturn(true); } // @Test diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml index 8e168f3..5ceb0df 100644 --- a/pulsar-client-tools/pom.xml +++ b/pulsar-client-tools/pom.xml @@ -74,6 +74,13 @@ <artifactId>stream-storage-java-client</artifactId> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-functions-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> <build> diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java index 80a7476..324a028 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java @@ -18,9 +18,9 @@ */ package org.apache.pulsar.admin.cli; +import com.google.protobuf.util.JsonFormat; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.functions.utils.Utils; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; @@ -65,7 +65,7 @@ public class CmdFunctionWorker extends CmdBase { @Override void runCmd() throws Exception { - String json = Utils.printJson(admin.worker().getFunctionsStats()); + String json = JsonFormat.printer().print(admin.worker().getFunctionsStats()); GsonBuilder gsonBuilder = new GsonBuilder(); if (indent) { gsonBuilder.setPrettyPrinting(); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 24597eb..dca4df6 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -36,6 +36,7 @@ import com.google.gson.GsonBuilder; import com.google.gson.JsonParser; import com.google.gson.reflect.TypeToken; +import com.google.protobuf.util.JsonFormat; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -64,7 +65,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.functions.WindowConfig; -import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.common.functions.Utils; @Slf4j @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)") @@ -616,7 +617,7 @@ public class CmdFunctions extends CmdBase { @Override void runCmd() throws Exception { - String json = Utils.printJson( + String json = JsonFormat.printer().print( isBlank(instanceId) ? admin.functions().getFunctionStatus(tenant, namespace, functionName) : admin.functions().getFunctionStatus(tenant, namespace, functionName, Integer.parseInt(instanceId))); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index d14c053..270a3bd 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; -import static org.apache.pulsar.functions.utils.Utils.BUILTIN; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; @@ -37,10 +36,10 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Type; -import java.nio.file.Paths; import java.util.*; import java.util.stream.Collectors; +import com.google.protobuf.util.JsonFormat; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -53,9 +52,7 @@ import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.io.SinkConfig; -import org.apache.pulsar.functions.utils.*; -import org.apache.pulsar.functions.utils.io.ConnectorUtils; -import org.apache.pulsar.functions.utils.io.Connectors; +import org.apache.pulsar.common.functions.Utils; @Getter @Parameters(commandDescription = "Interface for managing Pulsar IO sinks (egress data from Pulsar)") @@ -183,22 +180,8 @@ public class CmdSinks extends CmdBase { } @Override - protected String validateSinkType(String sinkType) throws IOException { - // Validate the connector sink type from the locally available connectors - String pulsarHome = System.getenv("PULSAR_HOME"); - if (pulsarHome == null) { - pulsarHome = Paths.get("").toAbsolutePath().toString(); - } - String connectorsDir = Paths.get(pulsarHome, "connectors").toString(); - Connectors connectors = ConnectorUtils.searchForConnectors(connectorsDir); - - if (!connectors.getSinks().containsKey(sinkType)) { - throw new ParameterException("Invalid sink type '" + sinkType + "' -- Available sinks are: " - + connectors.getSinks().keySet()); - } - - // Sink type is a valid built-in connector type. For local-run we'll fill it up with its own archive path - return connectors.getSinks().get(sinkType).toString(); + protected String validateSinkType(String sinkType) { + return sinkType; } } @@ -432,7 +415,7 @@ public class CmdSinks extends CmdBase { } if (!Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive()) && - !sinkConfig.getArchive().startsWith(BUILTIN)) { + !sinkConfig.getArchive().startsWith(Utils.BUILTIN)) { if (!new File(sinkConfig.getArchive()).exists()) { throw new IllegalArgumentException(String.format("Sink Archive file %s does not exist", sinkConfig.getArchive())); } @@ -545,7 +528,7 @@ public class CmdSinks extends CmdBase { @Override void runCmd() throws Exception { - String json = Utils.printJson( + String json = JsonFormat.printer().print( isBlank(instanceId) ? admin.sink().getSinkStatus(tenant, namespace, sinkName) : admin.sink().getSinkStatus(tenant, namespace, sinkName, Integer.parseInt(instanceId))); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 32ebfc9..069f2f8 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; -import static org.apache.pulsar.functions.utils.Utils.BUILTIN; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; @@ -37,13 +36,13 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Type; -import java.nio.file.Paths; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import com.google.protobuf.util.JsonFormat; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -56,9 +55,7 @@ import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.io.SourceConfig; -import org.apache.pulsar.functions.utils.Utils; -import org.apache.pulsar.functions.utils.io.ConnectorUtils; -import org.apache.pulsar.functions.utils.io.Connectors; +import org.apache.pulsar.common.functions.Utils; @Getter @Parameters(commandDescription = "Interface for managing Pulsar IO Sources (ingress data into Pulsar)") @@ -187,22 +184,8 @@ public class CmdSources extends CmdBase { } @Override - protected String validateSourceType(String sourceType) throws IOException { - // Validate the connector source type from the locally available connectors - String pulsarHome = System.getenv("PULSAR_HOME"); - if (pulsarHome == null) { - pulsarHome = Paths.get("").toAbsolutePath().toString(); - } - String connectorsDir = Paths.get(pulsarHome, "connectors").toString(); - Connectors connectors = ConnectorUtils.searchForConnectors(connectorsDir); - - if (!connectors.getSources().containsKey(sourceType)) { - throw new ParameterException("Invalid source type '" + sourceType + "' -- Available sources are: " - + connectors.getSources().keySet()); - } - - // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path - return connectors.getSources().get(sourceType).toString(); + protected String validateSourceType(String sourceType) { + return sourceType; } } @@ -388,7 +371,7 @@ public class CmdSources extends CmdBase { throw new ParameterException("Source archive not specfied"); } if (!Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) && - !sourceConfig.getArchive().startsWith(BUILTIN)) { + !sourceConfig.getArchive().startsWith(Utils.BUILTIN)) { if (!new File(sourceConfig.getArchive()).exists()) { throw new IllegalArgumentException(String.format("Source Archive %s does not exist", sourceConfig.getArchive())); } @@ -501,7 +484,7 @@ public class CmdSources extends CmdBase { @Override void runCmd() throws Exception { - String json = Utils.printJson( + String json = JsonFormat.printer().print( isBlank(instanceId) ? admin.source().getSourceStatus(tenant, namespace, sourceName) : admin.source().getSourceStatus(tenant, namespace, sourceName, Integer.parseInt(instanceId))); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java index cbe5296..09418df 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java @@ -43,7 +43,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.SinkConfig; -import org.apache.pulsar.functions.utils.*; +import org.apache.pulsar.functions.utils.Utils; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java new file mode 100644 index 0000000..855d2e4 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.functions; + +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +public class Utils { + public static String HTTP = "http"; + public static String FILE = "file"; + public static String BUILTIN = "builtin"; + + public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) { + return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(HTTP) + || functionPkgUrl.startsWith(FILE)); + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java index ed52a96..40a6ee7 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java @@ -25,6 +25,8 @@ import com.google.gson.GsonBuilder; import com.google.gson.JsonParser; import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -42,6 +44,8 @@ import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; import org.apache.pulsar.functions.utils.*; +import org.apache.pulsar.functions.utils.io.ConnectorUtils; +import org.apache.pulsar.functions.utils.io.Connectors; import static org.apache.pulsar.functions.utils.Utils.*; @@ -94,7 +98,7 @@ public class LocalRunner { parallelism = functionConfig.getParallelism(); if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { userCodeFile = functionConfig.getJar(); - if (isFunctionPackageUrlSupported(userCodeFile)) { + if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) { classLoader = extractClassLoader(userCodeFile); } else { File file = new File(userCodeFile); @@ -109,10 +113,14 @@ public class LocalRunner { functionDetails = FunctionConfigUtils.convert(functionConfig, classLoader); } else if (!StringUtils.isEmpty(sourceConfigString)) { SourceConfig sourceConfig = new Gson().fromJson(sourceConfigString, SourceConfig.class); + String builtInSource = isBuiltInSource(sourceConfig.getArchive()); + if (builtInSource != null) { + sourceConfig.setArchive(builtInSource); + } NarClassLoader classLoader; parallelism = sourceConfig.getParallelism(); userCodeFile = sourceConfig.getArchive(); - if (isFunctionPackageUrlSupported(userCodeFile)) { + if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) { classLoader = extractNarClassLoader(null, userCodeFile, null); } else { File file = new File(userCodeFile); @@ -124,10 +132,14 @@ public class LocalRunner { functionDetails = SourceConfigUtils.convert(sourceConfig, classLoader); } else { SinkConfig sinkConfig = new Gson().fromJson(sinkConfigString, SinkConfig.class); + String builtInSink = isBuiltInSource(sinkConfig.getArchive()); + if (builtInSink != null) { + sinkConfig.setArchive(builtInSink); + } NarClassLoader classLoader; parallelism = sinkConfig.getParallelism(); userCodeFile = sinkConfig.getArchive(); - if (isFunctionPackageUrlSupported(userCodeFile)) { + if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) { classLoader = extractNarClassLoader(null, userCodeFile, null); } else { File file = new File(userCodeFile); @@ -221,4 +233,38 @@ public class LocalRunner { } } + + private String isBuiltInSource(String sourceType) throws IOException { + // Validate the connector source type from the locally available connectors + Connectors connectors = getConnectors(); + + if (connectors.getSources().containsKey(sourceType)) { + // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path + return connectors.getSources().get(sourceType).toString(); + } else { + return null; + } + } + + private String isBuiltInSink(String sinkType) throws IOException { + // Validate the connector source type from the locally available connectors + Connectors connectors = getConnectors(); + + if (connectors.getSinks().containsKey(sinkType)) { + // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path + return connectors.getSinks().get(sinkType).toString(); + } else { + return null; + } + } + + private Connectors getConnectors() throws IOException { + // Validate the connector source type from the locally available connectors + String pulsarHome = System.getenv("PULSAR_HOME"); + if (pulsarHome == null) { + pulsarHome = Paths.get("").toAbsolutePath().toString(); + } + String connectorsDir = Paths.get(pulsarHome, "connectors").toString(); + return ConnectorUtils.searchForConnectors(connectorsDir); + } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 968e63e..8fbd676 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -35,13 +35,12 @@ import java.lang.reflect.Type; import java.net.MalformedURLException; import java.util.*; -import static java.util.Objects.isNull; import static org.apache.commons.lang.StringUtils.isNotBlank; import static org.apache.commons.lang.StringUtils.isNotEmpty; import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; -import static org.apache.pulsar.functions.utils.Utils.BUILTIN; +import static org.apache.pulsar.common.functions.Utils.BUILTIN; import static org.apache.pulsar.functions.utils.Utils.loadJar; public class FunctionConfigUtils { @@ -505,13 +504,13 @@ public class FunctionConfigUtils { throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity"); } - if (!isEmpty(functionConfig.getJar()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getJar()) + if (!isEmpty(functionConfig.getJar()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getJar()) && functionConfig.getJar().startsWith(BUILTIN)) { if (!new File(functionConfig.getJar()).exists()) { throw new IllegalArgumentException("The supplied jar file does not exist"); } } - if (!isEmpty(functionConfig.getPy()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getPy()) + if (!isEmpty(functionConfig.getPy()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getPy()) && functionConfig.getPy().startsWith(BUILTIN)) { if (!new File(functionConfig.getPy()).exists()) { throw new IllegalArgumentException("The supplied python file does not exist"); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 2cbd460..b79ee60 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -56,10 +56,10 @@ public class SinkConfigUtils { FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(Utils.BUILTIN); + boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN); if (!isBuiltin) { - if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(Utils.FILE)) { + if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { if (isBlank(sinkConfig.getClassName())) { throw new IllegalArgumentException("Class-name must be present for archive with file-url"); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index 6ce1db3..0df1aec 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -53,10 +53,10 @@ public class SourceConfigUtils { FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - boolean isBuiltin = !StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(Utils.BUILTIN); + boolean isBuiltin = !StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN); if (!isBuiltin) { - if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(Utils.FILE)) { + if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { if (org.apache.commons.lang3.StringUtils.isBlank(sourceConfig.getClassName())) { throw new IllegalArgumentException("Class-name must be present for archive with file-url"); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index 3b9c8da..8181e9f 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.functions.utils; -import static org.apache.commons.lang3.StringUtils.isNotBlank; - import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -58,10 +56,6 @@ import net.jodah.typetools.TypeResolver; @NoArgsConstructor(access = AccessLevel.PRIVATE) public class Utils { - public static String HTTP = "http"; - public static String FILE = "file"; - public static String BUILTIN = "builtin"; - public static String printJson(MessageOrBuilder msg) throws IOException { return JsonFormat.printer().print(msg); } @@ -217,15 +211,6 @@ public class Utils { return typeArg; } - public static boolean fileExists(String file) { - return new File(file).exists(); - } - - public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) { - return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(Utils.HTTP) - || functionPkgUrl.startsWith(Utils.FILE)); - } - /** * Load a jar * @param jar file of jar @@ -248,7 +233,7 @@ public class Utils { } public static File extractFileFromPkg(String destPkgUrl) throws IOException, URISyntaxException { - if (destPkgUrl.startsWith(FILE)) { + if (destPkgUrl.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { URL url = new URL(destPkgUrl); File file = new File(url.toURI()); if (!file.exists()) { @@ -309,7 +294,7 @@ public class Utils { } } if (!StringUtils.isEmpty(pkgUrl)) { - if (pkgUrl.startsWith(FILE)) { + if (pkgUrl.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { try { URL url = new URL(pkgUrl); File file = new File(url.toURI()); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 5df48de..4f8d04a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -18,11 +18,11 @@ */ package org.apache.pulsar.functions.worker; -import static org.apache.pulsar.functions.utils.Utils.FILE; -import static org.apache.pulsar.functions.utils.Utils.HTTP; +import static org.apache.pulsar.common.functions.Utils.FILE; +import static org.apache.pulsar.common.functions.Utils.HTTP; import static org.apache.pulsar.functions.utils.Utils.getSourceType; import static org.apache.pulsar.functions.utils.Utils.getSinkType; -import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported; +import static org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.MoreFiles; @@ -56,7 +56,6 @@ import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; -import org.apache.pulsar.functions.proto.Function.Instance; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.runtime.RuntimeFactory; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index d010660..41f04af 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -37,13 +37,10 @@ import org.apache.distributedlog.metadata.DLMetadata; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.worker.dlog.DLInputStream; import org.apache.pulsar.functions.worker.dlog.DLOutputStream; import org.apache.zookeeper.KeeperException.Code; import org.apache.pulsar.functions.proto.Function; -import static org.apache.pulsar.functions.utils.Utils.FILE; -import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl; @Slf4j public final class Utils { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 1e0d896..57fc331 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -933,10 +933,10 @@ public class FunctionsImpl { return Response.status(Status.OK).entity(new StreamingOutput() { @Override public void write(final OutputStream output) throws IOException { - if (path.startsWith(HTTP)) { + if (path.startsWith(org.apache.pulsar.common.functions.Utils.HTTP)) { URL url = new URL(path); IOUtils.copy(url.openStream(), output); - } else if (path.startsWith(FILE)) { + } else if (path.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { URL url = new URL(path); File file; try { @@ -1002,7 +1002,7 @@ public class FunctionsImpl { String functionPkgUrl, String functionDetailsJson, String componentConfigJson, String componentType) throws IllegalArgumentException, IOException, URISyntaxException { - if (!isFunctionPackageUrlSupported(functionPkgUrl)) { + if (!org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionPkgUrl)) { throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)"); } FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, @@ -1113,7 +1113,7 @@ public class FunctionsImpl { SourceConfigUtils.inferMissingArguments(sourceConfig); if (!StringUtils.isEmpty(sourceConfig.getArchive())) { String builtinArchive = sourceConfig.getArchive(); - if (builtinArchive.startsWith(BUILTIN)) { + if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) { builtinArchive = builtinArchive.replaceFirst("^builtin://", ""); } try { @@ -1131,7 +1131,7 @@ public class FunctionsImpl { SinkConfigUtils.inferMissingArguments(sinkConfig); if (!StringUtils.isEmpty(sinkConfig.getArchive())) { String builtinArchive = sinkConfig.getArchive(); - if (builtinArchive.startsWith(BUILTIN)) { + if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) { builtinArchive = builtinArchive.replaceFirst("^builtin://", ""); } try { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java index bf4b4aa..a4926e3 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java @@ -38,7 +38,7 @@ import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; import org.testng.annotations.Test; -import static org.apache.pulsar.functions.utils.Utils.FILE; +import static org.apache.pulsar.common.functions.Utils.FILE; /** * Unit test of {@link FunctionActioner}. diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE index 2bf467a..76d2034 100644 --- a/pulsar-sql/presto-distribution/LICENSE +++ b/pulsar-sql/presto-distribution/LICENSE @@ -228,40 +228,17 @@ The Apache Software License, Version 2.0 * Google Guice - guice-4.2.0.jar - guice-multibindings-4.2.0.jar - * Google Gson - - gson-2.8.2.jar - * Google Common Protos - - proto-google-common-protos-1.0.0.jar * Apache Commons - commons-math3-3.6.1.jar + - commons-beanutils-core-1.8.0.jar - commons-beanutils-core-1.8.3.jar - - commons-beanutils-1.7.0.jar - commons-compress-1.15.jar - commons-lang3-3.3.2.jar - commons-lang3-3.4.jar - - commons-collections-3.2.2.jar - - commons-configuration-1.6.jar - - commons-digester-1.8.jar - - commons-lang-2.4.jar - - commons-logging-1.1.1.jar * Netty - netty-3.6.2.Final.jar - - netty-all-4.1.22.Final.jar - - netty-buffer-4.1.22.Final.jar - - netty-codec-4.1.22.Final.jar - - netty-codec-http-4.1.22.Final.jar - - netty-codec-http2-4.1.22.Final.jar - - netty-codec-socks-4.1.22.Final.jar - - netty-common-4.1.22.Final.jar - - netty-handler-4.1.22.Final.jar - - netty-handler-proxy-4.1.22.Final.jar - - netty-resolver-4.1.22.Final.jar - - netty-tcnative-boringssl-static-2.0.7.Final.jar - - netty-transport-4.1.22.Final.jar * Joda Time - joda-time-2.9.9.jar - * TypeTools - - typetools-0.5.0.jar * Jetty - http2-client-9.4.11.v20180605.jar - http2-common-9.4.11.v20180605.jar @@ -277,7 +254,6 @@ The Apache Software License, Version 2.0 - jetty-server-9.4.11.v20180605.jar - jetty-servlet-9.4.11.v20180605.jar - jetty-util-9.4.11.v20180605.jar - - jetty-util-9.4.12.v20180830.jar * Javassist - javassist-3.22.0-CR2.jar * Asynchronous Http Client @@ -321,7 +297,6 @@ The Apache Software License, Version 2.0 - units-1.0.jar * Error Prone Annotations - error_prone_annotations-2.1.3.jar - - error_prone_annotations-2.1.2.jar * Esri Geometry API For Java - esri-geometry-api-2.1.0.jar * Fastutil @@ -354,7 +329,6 @@ The Apache Software License, Version 2.0 * OkHttp - okhttp-3.9.0.jar - okhttp-urlconnection-3.9.0.jar - - okhttp-2.5.0.jar * OpenCSV - opencsv-2.3.jar * Plexus @@ -380,7 +354,6 @@ The Apache Software License, Version 2.0 - jcommander-1.48.jar * FindBugs JSR305 - jsr305-3.0.2.jar - - jsr305-3.0.0.jar * Objenesis - objenesis-2.1.jar - objenesis-2.6.jar @@ -417,22 +390,6 @@ The Apache Software License, Version 2.0 - simpleclient_servlet-0.5.0.jar * LZ4 - lz4-java-1.5.0.jar - * Bookkeeper - - circe-checksum-4.7.2.jar - * GRPC - - grpc-all-1.12.0.jar - - grpc-auth-1.12.0.jar - - grpc-context-1.12.0.jar - - grpc-core-1.12.0.jar - - grpc-netty-1.12.0.jar - - grpc-okhttp-1.12.0.jar - - grpc-protobuf-1.12.0.jar - - grpc-protobuf-lite-1.12.0.jar - - grpc-protobuf-nano-1.12.0.jar - - grpc-stub-1.12.0.jar - * OpenCensus - - opencensus-api-0.11.0.jar - - opencensus-contrib-grpc-metrics-0.11.0.jar Protocol Buffers License * Protocol Buffers @@ -441,11 +398,6 @@ Protocol Buffers License BSD 3-clause "New" or "Revised" License * RE2J TD -- re2j-td-1.4.jar - * google-auth-library-credentials-0.9.0.jar - * protobuf-java-util-3.5.1.jar - -BSD 2-clause - * protobuf-javanano-3.0.0-alpha-5.jar BSD License * ANTLR 4 Runtime -- antlr4-runtime-4.6.jar