srkukarni closed pull request #2917: Remove functions-util dependency from pulsar-client-admin URL: https://github.com/apache/pulsar/pull/2917
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index 5af899a598..e5e2af7a04 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -40,11 +40,11 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.functions.Utils; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.common.functions.FunctionConfig; -import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index f76df6b1d7..9c5cdb069b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -43,15 +43,13 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.apache.pulsar.common.functions.Utils; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.functions.instance.JavaInstanceRunnable; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; -import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest; import org.apache.pulsar.common.functions.FunctionConfig; -import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index 19e05a9472..c0d81e5c80 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -206,7 +206,7 @@ public void testAuthorization() throws Exception { final String functionName = "PulsarSink-test"; final String subscriptionName = "test-sub"; - String jarFilePathUrl = String.format("%s:%s", Utils.FILE, + String jarFilePathUrl = String.format("%s:%s", org.apache.pulsar.common.functions.Utils.FILE, PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath()); FunctionConfig functionConfig = createFunctionConfig(jarFilePathUrl, tenant, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName); diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml index e39f36213f..79625bf740 100644 --- a/pulsar-client-admin/pom.xml +++ b/pulsar-client-admin/pom.xml @@ -46,12 +46,6 @@ <version>${project.version}</version> </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>pulsar-functions-utils</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.glassfish.jersey.core</groupId> <artifactId>jersey-client</artifactId> diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 077a74217b..de7331a311 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -138,7 +138,6 @@ public void setup() throws Exception { .thenReturn(true); when(Reflections.classImplementsIface(anyString(), any())).thenReturn(true); when(Reflections.createInstance(eq(DummyFunction.class.getName()), any(File.class))).thenReturn(new DummyFunction()); - PowerMockito.stub(PowerMockito.method(Utils.class, "fileExists")).toReturn(true); } // @Test diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml index 8e168f3169..5ceb0dfa45 100644 --- a/pulsar-client-tools/pom.xml +++ b/pulsar-client-tools/pom.xml @@ -74,6 +74,13 @@ <artifactId>stream-storage-java-client</artifactId> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-functions-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> <build> diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java index 80a7476320..324a028f0e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java @@ -18,9 +18,9 @@ */ package org.apache.pulsar.admin.cli; +import com.google.protobuf.util.JsonFormat; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.functions.utils.Utils; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; @@ -65,7 +65,7 @@ void processArguments() throws Exception { @Override void runCmd() throws Exception { - String json = Utils.printJson(admin.worker().getFunctionsStats()); + String json = JsonFormat.printer().print(admin.worker().getFunctionsStats()); GsonBuilder gsonBuilder = new GsonBuilder(); if (indent) { gsonBuilder.setPrettyPrinting(); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 24597eb202..dca4df6756 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -36,6 +36,7 @@ import com.google.gson.JsonParser; import com.google.gson.reflect.TypeToken; +import com.google.protobuf.util.JsonFormat; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -64,7 +65,7 @@ import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.functions.WindowConfig; -import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.common.functions.Utils; @Slf4j @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)") @@ -616,7 +617,7 @@ void runCmd() throws Exception { @Override void runCmd() throws Exception { - String json = Utils.printJson( + String json = JsonFormat.printer().print( isBlank(instanceId) ? admin.functions().getFunctionStatus(tenant, namespace, functionName) : admin.functions().getFunctionStatus(tenant, namespace, functionName, Integer.parseInt(instanceId))); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index d14c0534b0..270a3bd636 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; -import static org.apache.pulsar.functions.utils.Utils.BUILTIN; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; @@ -37,10 +36,10 @@ import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Type; -import java.nio.file.Paths; import java.util.*; import java.util.stream.Collectors; +import com.google.protobuf.util.JsonFormat; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -53,9 +52,7 @@ import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.io.SinkConfig; -import org.apache.pulsar.functions.utils.*; -import org.apache.pulsar.functions.utils.io.ConnectorUtils; -import org.apache.pulsar.functions.utils.io.Connectors; +import org.apache.pulsar.common.functions.Utils; @Getter @Parameters(commandDescription = "Interface for managing Pulsar IO sinks (egress data from Pulsar)") @@ -183,22 +180,8 @@ public void runCmd() throws Exception { } @Override - protected String validateSinkType(String sinkType) throws IOException { - // Validate the connector sink type from the locally available connectors - String pulsarHome = System.getenv("PULSAR_HOME"); - if (pulsarHome == null) { - pulsarHome = Paths.get("").toAbsolutePath().toString(); - } - String connectorsDir = Paths.get(pulsarHome, "connectors").toString(); - Connectors connectors = ConnectorUtils.searchForConnectors(connectorsDir); - - if (!connectors.getSinks().containsKey(sinkType)) { - throw new ParameterException("Invalid sink type '" + sinkType + "' -- Available sinks are: " - + connectors.getSinks().keySet()); - } - - // Sink type is a valid built-in connector type. For local-run we'll fill it up with its own archive path - return connectors.getSinks().get(sinkType).toString(); + protected String validateSinkType(String sinkType) { + return sinkType; } } @@ -432,7 +415,7 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) { } if (!Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive()) && - !sinkConfig.getArchive().startsWith(BUILTIN)) { + !sinkConfig.getArchive().startsWith(Utils.BUILTIN)) { if (!new File(sinkConfig.getArchive()).exists()) { throw new IllegalArgumentException(String.format("Sink Archive file %s does not exist", sinkConfig.getArchive())); } @@ -545,7 +528,7 @@ void runCmd() throws Exception { @Override void runCmd() throws Exception { - String json = Utils.printJson( + String json = JsonFormat.printer().print( isBlank(instanceId) ? admin.sink().getSinkStatus(tenant, namespace, sinkName) : admin.sink().getSinkStatus(tenant, namespace, sinkName, Integer.parseInt(instanceId))); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 32ebfc996b..069f2f80ba 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; -import static org.apache.pulsar.functions.utils.Utils.BUILTIN; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; @@ -37,13 +36,13 @@ import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Type; -import java.nio.file.Paths; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import com.google.protobuf.util.JsonFormat; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -56,9 +55,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.io.SourceConfig; -import org.apache.pulsar.functions.utils.Utils; -import org.apache.pulsar.functions.utils.io.ConnectorUtils; -import org.apache.pulsar.functions.utils.io.Connectors; +import org.apache.pulsar.common.functions.Utils; @Getter @Parameters(commandDescription = "Interface for managing Pulsar IO Sources (ingress data into Pulsar)") @@ -187,22 +184,8 @@ public void runCmd() throws Exception { } @Override - protected String validateSourceType(String sourceType) throws IOException { - // Validate the connector source type from the locally available connectors - String pulsarHome = System.getenv("PULSAR_HOME"); - if (pulsarHome == null) { - pulsarHome = Paths.get("").toAbsolutePath().toString(); - } - String connectorsDir = Paths.get(pulsarHome, "connectors").toString(); - Connectors connectors = ConnectorUtils.searchForConnectors(connectorsDir); - - if (!connectors.getSources().containsKey(sourceType)) { - throw new ParameterException("Invalid source type '" + sourceType + "' -- Available sources are: " - + connectors.getSources().keySet()); - } - - // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path - return connectors.getSources().get(sourceType).toString(); + protected String validateSourceType(String sourceType) { + return sourceType; } } @@ -388,7 +371,7 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) { throw new ParameterException("Source archive not specfied"); } if (!Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) && - !sourceConfig.getArchive().startsWith(BUILTIN)) { + !sourceConfig.getArchive().startsWith(Utils.BUILTIN)) { if (!new File(sourceConfig.getArchive()).exists()) { throw new IllegalArgumentException(String.format("Source Archive %s does not exist", sourceConfig.getArchive())); } @@ -501,7 +484,7 @@ void runCmd() throws Exception { @Override void runCmd() throws Exception { - String json = Utils.printJson( + String json = JsonFormat.printer().print( isBlank(instanceId) ? admin.source().getSourceStatus(tenant, namespace, sourceName) : admin.source().getSourceStatus(tenant, namespace, sourceName, Integer.parseInt(instanceId))); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java index cbe52961f9..09418dfa70 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java @@ -43,7 +43,7 @@ import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.SinkConfig; -import org.apache.pulsar.functions.utils.*; +import org.apache.pulsar.functions.utils.Utils; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java new file mode 100644 index 0000000000..855d2e4411 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.functions; + +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +public class Utils { + public static String HTTP = "http"; + public static String FILE = "file"; + public static String BUILTIN = "builtin"; + + public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) { + return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(HTTP) + || functionPkgUrl.startsWith(FILE)); + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java index ed52a96a0d..40a6ee7565 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java @@ -25,6 +25,8 @@ import com.google.gson.JsonParser; import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -42,6 +44,8 @@ import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; import org.apache.pulsar.functions.utils.*; +import org.apache.pulsar.functions.utils.io.ConnectorUtils; +import org.apache.pulsar.functions.utils.io.Connectors; import static org.apache.pulsar.functions.utils.Utils.*; @@ -94,7 +98,7 @@ void start() throws Exception { parallelism = functionConfig.getParallelism(); if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { userCodeFile = functionConfig.getJar(); - if (isFunctionPackageUrlSupported(userCodeFile)) { + if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) { classLoader = extractClassLoader(userCodeFile); } else { File file = new File(userCodeFile); @@ -109,10 +113,14 @@ void start() throws Exception { functionDetails = FunctionConfigUtils.convert(functionConfig, classLoader); } else if (!StringUtils.isEmpty(sourceConfigString)) { SourceConfig sourceConfig = new Gson().fromJson(sourceConfigString, SourceConfig.class); + String builtInSource = isBuiltInSource(sourceConfig.getArchive()); + if (builtInSource != null) { + sourceConfig.setArchive(builtInSource); + } NarClassLoader classLoader; parallelism = sourceConfig.getParallelism(); userCodeFile = sourceConfig.getArchive(); - if (isFunctionPackageUrlSupported(userCodeFile)) { + if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) { classLoader = extractNarClassLoader(null, userCodeFile, null); } else { File file = new File(userCodeFile); @@ -124,10 +132,14 @@ void start() throws Exception { functionDetails = SourceConfigUtils.convert(sourceConfig, classLoader); } else { SinkConfig sinkConfig = new Gson().fromJson(sinkConfigString, SinkConfig.class); + String builtInSink = isBuiltInSource(sinkConfig.getArchive()); + if (builtInSink != null) { + sinkConfig.setArchive(builtInSink); + } NarClassLoader classLoader; parallelism = sinkConfig.getParallelism(); userCodeFile = sinkConfig.getArchive(); - if (isFunctionPackageUrlSupported(userCodeFile)) { + if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) { classLoader = extractNarClassLoader(null, userCodeFile, null); } else { File file = new File(userCodeFile); @@ -221,4 +233,38 @@ public void run() { } } + + private String isBuiltInSource(String sourceType) throws IOException { + // Validate the connector source type from the locally available connectors + Connectors connectors = getConnectors(); + + if (connectors.getSources().containsKey(sourceType)) { + // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path + return connectors.getSources().get(sourceType).toString(); + } else { + return null; + } + } + + private String isBuiltInSink(String sinkType) throws IOException { + // Validate the connector source type from the locally available connectors + Connectors connectors = getConnectors(); + + if (connectors.getSinks().containsKey(sinkType)) { + // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path + return connectors.getSinks().get(sinkType).toString(); + } else { + return null; + } + } + + private Connectors getConnectors() throws IOException { + // Validate the connector source type from the locally available connectors + String pulsarHome = System.getenv("PULSAR_HOME"); + if (pulsarHome == null) { + pulsarHome = Paths.get("").toAbsolutePath().toString(); + } + String connectorsDir = Paths.get(pulsarHome, "connectors").toString(); + return ConnectorUtils.searchForConnectors(connectorsDir); + } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 968e63e1af..8fbd676d5b 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -35,13 +35,12 @@ import java.net.MalformedURLException; import java.util.*; -import static java.util.Objects.isNull; import static org.apache.commons.lang.StringUtils.isNotBlank; import static org.apache.commons.lang.StringUtils.isNotEmpty; import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; -import static org.apache.pulsar.functions.utils.Utils.BUILTIN; +import static org.apache.pulsar.common.functions.Utils.BUILTIN; import static org.apache.pulsar.functions.utils.Utils.loadJar; public class FunctionConfigUtils { @@ -505,13 +504,13 @@ private static void doCommonChecks(FunctionConfig functionConfig) { throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity"); } - if (!isEmpty(functionConfig.getJar()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getJar()) + if (!isEmpty(functionConfig.getJar()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getJar()) && functionConfig.getJar().startsWith(BUILTIN)) { if (!new File(functionConfig.getJar()).exists()) { throw new IllegalArgumentException("The supplied jar file does not exist"); } } - if (!isEmpty(functionConfig.getPy()) && !Utils.isFunctionPackageUrlSupported(functionConfig.getPy()) + if (!isEmpty(functionConfig.getPy()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getPy()) && functionConfig.getPy().startsWith(BUILTIN)) { if (!new File(functionConfig.getPy()).exists()) { throw new IllegalArgumentException("The supplied python file does not exist"); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 2cbd460245..b79ee60be8 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -56,10 +56,10 @@ public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader clas FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(Utils.BUILTIN); + boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN); if (!isBuiltin) { - if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(Utils.FILE)) { + if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { if (isBlank(sinkConfig.getClassName())) { throw new IllegalArgumentException("Class-name must be present for archive with file-url"); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index 6ce1db3fd0..0df1aec5ed 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -53,10 +53,10 @@ public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - boolean isBuiltin = !StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(Utils.BUILTIN); + boolean isBuiltin = !StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN); if (!isBuiltin) { - if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(Utils.FILE)) { + if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { if (org.apache.commons.lang3.StringUtils.isBlank(sourceConfig.getClassName())) { throw new IllegalArgumentException("Class-name must be present for archive with file-url"); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index 3b9c8da812..8181e9f255 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.functions.utils; -import static org.apache.commons.lang3.StringUtils.isNotBlank; - import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -58,10 +56,6 @@ @NoArgsConstructor(access = AccessLevel.PRIVATE) public class Utils { - public static String HTTP = "http"; - public static String FILE = "file"; - public static String BUILTIN = "builtin"; - public static String printJson(MessageOrBuilder msg) throws IOException { return JsonFormat.printer().print(msg); } @@ -217,15 +211,6 @@ public static Runtime convertRuntime(FunctionConfig.Runtime runtime) { return typeArg; } - public static boolean fileExists(String file) { - return new File(file).exists(); - } - - public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) { - return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(Utils.HTTP) - || functionPkgUrl.startsWith(Utils.FILE)); - } - /** * Load a jar * @param jar file of jar @@ -248,7 +233,7 @@ public static ClassLoader extractClassLoader(String destPkgUrl) throws IOExcepti } public static File extractFileFromPkg(String destPkgUrl) throws IOException, URISyntaxException { - if (destPkgUrl.startsWith(FILE)) { + if (destPkgUrl.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { URL url = new URL(destPkgUrl); File file = new File(url.toURI()); if (!file.exists()) { @@ -309,7 +294,7 @@ public static NarClassLoader extractNarClassLoader(Path archivePath, String pkgU } } if (!StringUtils.isEmpty(pkgUrl)) { - if (pkgUrl.startsWith(FILE)) { + if (pkgUrl.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { try { URL url = new URL(pkgUrl); File file = new File(url.toURI()); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 5df48de1e8..4f8d04a71d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -18,11 +18,11 @@ */ package org.apache.pulsar.functions.worker; -import static org.apache.pulsar.functions.utils.Utils.FILE; -import static org.apache.pulsar.functions.utils.Utils.HTTP; +import static org.apache.pulsar.common.functions.Utils.FILE; +import static org.apache.pulsar.common.functions.Utils.HTTP; import static org.apache.pulsar.functions.utils.Utils.getSourceType; import static org.apache.pulsar.functions.utils.Utils.getSinkType; -import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported; +import static org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.MoreFiles; @@ -56,7 +56,6 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; -import org.apache.pulsar.functions.proto.Function.Instance; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.runtime.RuntimeFactory; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index d0106602ba..41f04af445 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -37,13 +37,10 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.worker.dlog.DLInputStream; import org.apache.pulsar.functions.worker.dlog.DLOutputStream; import org.apache.zookeeper.KeeperException.Code; import org.apache.pulsar.functions.proto.Function; -import static org.apache.pulsar.functions.utils.Utils.FILE; -import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl; @Slf4j public final class Utils { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 914a691524..c850d90409 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -901,10 +901,10 @@ public Response downloadFunction(final String path) { return Response.status(Status.OK).entity(new StreamingOutput() { @Override public void write(final OutputStream output) throws IOException { - if (path.startsWith(HTTP)) { + if (path.startsWith(org.apache.pulsar.common.functions.Utils.HTTP)) { URL url = new URL(path); IOUtils.copy(url.openStream(), output); - } else if (path.startsWith(FILE)) { + } else if (path.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { URL url = new URL(path); File file; try { @@ -970,7 +970,7 @@ private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, Str String functionPkgUrl, String functionDetailsJson, String componentConfigJson, String componentType) throws IllegalArgumentException, IOException, URISyntaxException { - if (!isFunctionPackageUrlSupported(functionPkgUrl)) { + if (!org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionPkgUrl)) { throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)"); } FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, @@ -1081,7 +1081,7 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp SourceConfigUtils.inferMissingArguments(sourceConfig); if (!StringUtils.isEmpty(sourceConfig.getArchive())) { String builtinArchive = sourceConfig.getArchive(); - if (builtinArchive.startsWith(BUILTIN)) { + if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) { builtinArchive = builtinArchive.replaceFirst("^builtin://", ""); } try { @@ -1099,7 +1099,7 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp SinkConfigUtils.inferMissingArguments(sinkConfig); if (!StringUtils.isEmpty(sinkConfig.getArchive())) { String builtinArchive = sinkConfig.getArchive(); - if (builtinArchive.startsWith(BUILTIN)) { + if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) { builtinArchive = builtinArchive.replaceFirst("^builtin://", ""); } try { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java index bf4b4aafa5..a4926e304c 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java @@ -38,7 +38,7 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; import org.testng.annotations.Test; -import static org.apache.pulsar.functions.utils.Utils.FILE; +import static org.apache.pulsar.common.functions.Utils.FILE; /** * Unit test of {@link FunctionActioner}. diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE index 2bf467a3fb..76d2034174 100644 --- a/pulsar-sql/presto-distribution/LICENSE +++ b/pulsar-sql/presto-distribution/LICENSE @@ -228,40 +228,17 @@ The Apache Software License, Version 2.0 * Google Guice - guice-4.2.0.jar - guice-multibindings-4.2.0.jar - * Google Gson - - gson-2.8.2.jar - * Google Common Protos - - proto-google-common-protos-1.0.0.jar * Apache Commons - commons-math3-3.6.1.jar + - commons-beanutils-core-1.8.0.jar - commons-beanutils-core-1.8.3.jar - - commons-beanutils-1.7.0.jar - commons-compress-1.15.jar - commons-lang3-3.3.2.jar - commons-lang3-3.4.jar - - commons-collections-3.2.2.jar - - commons-configuration-1.6.jar - - commons-digester-1.8.jar - - commons-lang-2.4.jar - - commons-logging-1.1.1.jar * Netty - netty-3.6.2.Final.jar - - netty-all-4.1.22.Final.jar - - netty-buffer-4.1.22.Final.jar - - netty-codec-4.1.22.Final.jar - - netty-codec-http-4.1.22.Final.jar - - netty-codec-http2-4.1.22.Final.jar - - netty-codec-socks-4.1.22.Final.jar - - netty-common-4.1.22.Final.jar - - netty-handler-4.1.22.Final.jar - - netty-handler-proxy-4.1.22.Final.jar - - netty-resolver-4.1.22.Final.jar - - netty-tcnative-boringssl-static-2.0.7.Final.jar - - netty-transport-4.1.22.Final.jar * Joda Time - joda-time-2.9.9.jar - * TypeTools - - typetools-0.5.0.jar * Jetty - http2-client-9.4.11.v20180605.jar - http2-common-9.4.11.v20180605.jar @@ -277,7 +254,6 @@ The Apache Software License, Version 2.0 - jetty-server-9.4.11.v20180605.jar - jetty-servlet-9.4.11.v20180605.jar - jetty-util-9.4.11.v20180605.jar - - jetty-util-9.4.12.v20180830.jar * Javassist - javassist-3.22.0-CR2.jar * Asynchronous Http Client @@ -321,7 +297,6 @@ The Apache Software License, Version 2.0 - units-1.0.jar * Error Prone Annotations - error_prone_annotations-2.1.3.jar - - error_prone_annotations-2.1.2.jar * Esri Geometry API For Java - esri-geometry-api-2.1.0.jar * Fastutil @@ -354,7 +329,6 @@ The Apache Software License, Version 2.0 * OkHttp - okhttp-3.9.0.jar - okhttp-urlconnection-3.9.0.jar - - okhttp-2.5.0.jar * OpenCSV - opencsv-2.3.jar * Plexus @@ -380,7 +354,6 @@ The Apache Software License, Version 2.0 - jcommander-1.48.jar * FindBugs JSR305 - jsr305-3.0.2.jar - - jsr305-3.0.0.jar * Objenesis - objenesis-2.1.jar - objenesis-2.6.jar @@ -417,22 +390,6 @@ The Apache Software License, Version 2.0 - simpleclient_servlet-0.5.0.jar * LZ4 - lz4-java-1.5.0.jar - * Bookkeeper - - circe-checksum-4.7.2.jar - * GRPC - - grpc-all-1.12.0.jar - - grpc-auth-1.12.0.jar - - grpc-context-1.12.0.jar - - grpc-core-1.12.0.jar - - grpc-netty-1.12.0.jar - - grpc-okhttp-1.12.0.jar - - grpc-protobuf-1.12.0.jar - - grpc-protobuf-lite-1.12.0.jar - - grpc-protobuf-nano-1.12.0.jar - - grpc-stub-1.12.0.jar - * OpenCensus - - opencensus-api-0.11.0.jar - - opencensus-contrib-grpc-metrics-0.11.0.jar Protocol Buffers License * Protocol Buffers @@ -441,11 +398,6 @@ Protocol Buffers License BSD 3-clause "New" or "Revised" License * RE2J TD -- re2j-td-1.4.jar - * google-auth-library-credentials-0.9.0.jar - * protobuf-java-util-3.5.1.jar - -BSD 2-clause - * protobuf-javanano-3.0.0-alpha-5.jar BSD License * ANTLR 4 Runtime -- antlr4-runtime-4.6.jar ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services