This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 1b0589b Update Function Semantics (#2985) 1b0589b is described below commit 1b0589bbe47211fc52430d757fa32a0cb784bcf5 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Tue Nov 20 22:09:34 2018 -0500 Update Function Semantics (#2985) * Make update functions better * Compiled * more checks * bug fix * Added tests * Tests pass * Fixed tests * Fixed tests * Added tests * Added unittests * Fixed unittest * Fixed unittest * Fixed unittest * Timeout fix * Fixed unittest * Fix unittest * Addressed feedback --- .../worker/PulsarWorkerAssignmentTest.java | 10 +- .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 7 +- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 42 ++- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 32 ++- .../org/apache/pulsar/admin/cli/CmdSources.java | 20 +- .../org/apache/pulsar/admin/cli/TestCmdSinks.java | 3 +- .../apache/pulsar/admin/cli/TestCmdSources.java | 2 +- .../pulsar/common/functions/ConsumerConfig.java | 6 +- .../pulsar/common/functions/FunctionConfig.java | 12 +- .../apache/pulsar/common/functions/Resources.java | 1 + .../pulsar/common/functions/WindowConfig.java | 5 +- .../org/apache/pulsar/common/io/SinkConfig.java | 13 +- .../org/apache/pulsar/common/io/SourceConfig.java | 9 +- .../functions/utils/FunctionConfigUtils.java | 112 +++++++- .../functions/utils/ResourceConfigUtils.java | 18 ++ .../pulsar/functions/utils/SinkConfigUtils.java | 101 ++++++- .../pulsar/functions/utils/SourceConfigUtils.java | 44 ++++ .../functions/utils/FunctionConfigUtilsTest.java | 291 +++++++++++++++++++++ .../functions/utils/SinkConfigUtilsTest.java | 207 +++++++++++++++ .../functions/utils/SourceConfigUtilsTest.java | 166 ++++++++++++ .../org/apache/pulsar/functions/worker/Utils.java | 10 +- .../functions/worker/rest/api/FunctionsImpl.java | 111 +++++++- .../rest/api/v2/FunctionApiV2ResourceTest.java | 139 +++++++--- .../worker/rest/api/v2/SinkApiV2ResourceTest.java | 175 +++++++++++-- .../rest/api/v2/SourceApiV2ResourceTest.java | 171 +++++++++--- 25 files changed, 1541 insertions(+), 166 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index ed44b98..1b4c2aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -223,6 +223,7 @@ public class PulsarWorkerAssignmentTest { final String namespacePortion = "assignment-test"; final String replNamespace = tenant + "/" + namespacePortion; final String sinkTopic = "persistent://" + replNamespace + "/my-topic1"; + final String logTopic = "persistent://" + replNamespace + "/log-topic"; final String baseFunctionName = "assign-restart"; final String subscriptionName = "test-sub"; final int totalFunctions = 5; @@ -240,8 +241,7 @@ public class PulsarWorkerAssignmentTest { functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName); functionConfig.setParallelism(parallelism); - // set-auto-ack prop =true - functionConfig.setAutoAck(true); + // don't set any log topic admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl); } retryStrategically((test) -> { @@ -263,8 +263,8 @@ public class PulsarWorkerAssignmentTest { functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName); functionConfig.setParallelism(parallelism); - // set-auto-ack prop =false - functionConfig.setAutoAck(false); + // Now set the log topic + functionConfig.setLogTopic(logTopic); admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl); } @@ -308,7 +308,7 @@ public class PulsarWorkerAssignmentTest { // validate updated function prop = auto-ack=false and instnaceid for (int i = 0; i < (totalFunctions - totalDeletedFunction); i++) { String functionName = baseFunctionName + i; - assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getAutoAck()); + assertEquals(admin.functions().getFunction(tenant, namespacePortion, functionName).getLogTopic(), logTopic); } } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 91ff834..e0a0d4e 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.admin.cli; -import com.google.gson.Gson; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import lombok.extern.slf4j.Slf4j; @@ -35,8 +34,6 @@ import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions; import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction; import org.apache.pulsar.admin.cli.CmdFunctions.StopFunction; import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction; -import org.apache.pulsar.admin.cli.CmdSinks.CreateSink; -import org.apache.pulsar.admin.cli.CmdSources.CreateSource; import org.apache.pulsar.client.admin.Functions; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -60,10 +57,8 @@ import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; import java.util.Arrays; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import static java.nio.charset.StandardCharsets.UTF_8; @@ -205,7 +200,7 @@ public class CmdFunctionsTest { assertEquals(fnName, creater.getFunctionName()); assertEquals(inputTopicName, creater.getInputs()); assertEquals(outputTopicName, creater.getOutput()); - assertEquals(false, creater.isAutoAck()); + assertEquals(new Boolean(false), creater.getAutoAck()); verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString()); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 28f9183..3a0d1e1 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -287,14 +287,14 @@ public class CmdFunctions extends CmdBase { @Parameter(names = "--autoAck", description = "Whether or not the framework will automatically acknowleges messages", hidden = true) protected Boolean DEPRECATED_autoAck = null; @Parameter(names = "--auto-ack", description = "Whether or not the framework will automatically acknowleges messages", arity = 1) - protected boolean autoAck = true; + protected Boolean autoAck; // for backwards compatibility purposes @Parameter(names = "--timeoutMs", description = "The message timeout in milliseconds", hidden = true) protected Long DEPRECATED_timeoutMs; @Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds") protected Long timeoutMs; @Parameter(names = "--max-message-retries", description = "How many times should we try to process a message before giving up") - protected Integer maxMessageRetries = -1; + protected Integer maxMessageRetries; @Parameter(names = "--dead-letter-topic", description = "The topic where all messages which could not be processed successfully are sent") protected String deadLetterTopic; protected FunctionConfig functionConfig; @@ -403,21 +403,29 @@ public class CmdFunctions extends CmdBase { } Resources resources = functionConfig.getResources(); - if (resources == null) { - resources = new Resources(); - } if (cpu != null) { + if (resources == null) { + resources = new Resources(); + } resources.setCpu(cpu); } if (ram != null) { + if (resources == null) { + resources = new Resources(); + } resources.setRam(ram); } if (disk != null) { + if (resources == null) { + resources = new Resources(); + } resources.setDisk(disk); } - functionConfig.setResources(resources); + if (resources != null) { + functionConfig.setResources(resources); + } if (timeoutMs != null) { functionConfig.setTimeoutMs(timeoutMs); @@ -452,7 +460,9 @@ public class CmdFunctions extends CmdBase { functionConfig.setWindowConfig(windowConfig); - functionConfig.setAutoAck(autoAck); + if (autoAck != null) { + functionConfig.setAutoAck(autoAck); + } if (null != maxMessageRetries) { functionConfig.setMaxMessageRetries(maxMessageRetries); @@ -711,6 +721,24 @@ public class CmdFunctions extends CmdBase { @Parameters(commandDescription = "Update a Pulsar Function that's been deployed to a Pulsar cluster") class UpdateFunction extends FunctionDetailsCommand { + + @Override + protected void validateFunctionConfigs(FunctionConfig functionConfig) { + if (StringUtils.isEmpty(functionConfig.getClassName())) { + if (StringUtils.isEmpty(functionConfig.getName())) { + throw new IllegalArgumentException("Function Name not provided"); + } + } else if (StringUtils.isEmpty(functionConfig.getName())) { + org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig); + } + if (StringUtils.isEmpty(functionConfig.getTenant())) { + org.apache.pulsar.common.functions.Utils.inferMissingTenant(functionConfig); + } + if (StringUtils.isEmpty(functionConfig.getNamespace())) { + org.apache.pulsar.common.functions.Utils.inferMissingNamespace(functionConfig); + } + } + @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(functionConfig.getJar())) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 594aa1b..662b81c 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -210,6 +210,10 @@ public class CmdSinks extends CmdBase { } print("Updated successfully"); } + + protected void validateSinkConfigs(SinkConfig sinkConfig) { + org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig); + } } abstract class SinkDetailsCommand extends BaseCommand { @@ -253,7 +257,7 @@ public class CmdSinks extends CmdBase { @Parameter(names = "--retainOrdering", description = "Sink consumes and sinks messages in order", hidden = true) protected Boolean DEPRECATED_retainOrdering; @Parameter(names = "--retain-ordering", description = "Sink consumes and sinks messages in order") - protected boolean retainOrdering; + protected Boolean retainOrdering; @Parameter(names = "--parallelism", description = "The sink's parallelism factor (i.e. the number of sink instances to run)") protected Integer parallelism; @Parameter(names = {"-a", "--archive"}, description = "Path to the archive file for the sink. It also supports url-path [http/https/file (file protocol assumes that file already exists on worker host)] from which worker can download the package.", listConverter = StringConverter.class) @@ -280,7 +284,7 @@ public class CmdSinks extends CmdBase { @Parameter(names = "--sink-config", description = "User defined configs key/values") protected String sinkConfigString; @Parameter(names = "--auto-ack", description = "Whether or not the framework will automatically acknowleges messages", arity = 1) - protected boolean autoAck = true; + protected Boolean autoAck; @Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds") protected Long timeoutMs; @@ -329,7 +333,9 @@ public class CmdSinks extends CmdBase { sinkConfig.setProcessingGuarantees(processingGuarantees); } - sinkConfig.setRetainOrdering(retainOrdering); + if (retainOrdering != null) { + sinkConfig.setRetainOrdering(retainOrdering); + } if (null != inputs) { sinkConfig.setInputs(Arrays.asList(inputs.split(","))); @@ -371,27 +377,37 @@ public class CmdSinks extends CmdBase { } Resources resources = sinkConfig.getResources(); - if (resources == null) { - resources = new Resources(); - } if (cpu != null) { + if (resources == null) { + resources = new Resources(); + } resources.setCpu(cpu); } if (ram != null) { + if (resources == null) { + resources = new Resources(); + } resources.setRam(ram); } if (disk != null) { + if (resources == null) { + resources = new Resources(); + } resources.setDisk(disk); } - sinkConfig.setResources(resources); + if (resources != null) { + sinkConfig.setResources(resources); + } if (null != sinkConfigString) { sinkConfig.setConfigs(parseConfigs(sinkConfigString)); } - sinkConfig.setAutoAck(autoAck); + if (autoAck != null) { + sinkConfig.setAutoAck(autoAck); + } if (timeoutMs != null) { sinkConfig.setTimeoutMs(timeoutMs); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 92b4f6c..3e1b6ab 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -214,6 +214,10 @@ public class CmdSources extends CmdBase { } print("Updated successfully"); } + + protected void validateSourceConfigs(SourceConfig sourceConfig) { + org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig); + } } abstract class SourceDetailsCommand extends BaseCommand { @@ -337,21 +341,29 @@ public class CmdSources extends CmdBase { } Resources resources = sourceConfig.getResources(); - if (resources == null) { - resources = new Resources(); - } if (cpu != null) { + if (resources == null) { + resources = new Resources(); + } resources.setCpu(cpu); } if (ram != null) { + if (resources == null) { + resources = new Resources(); + } resources.setRam(ram); } if (disk != null) { + if (resources == null) { + resources = new Resources(); + } resources.setDisk(disk); } - sourceConfig.setResources(resources); + if (resources != null) { + sourceConfig.setResources(resources); + } if (null != sourceConfigString) { sourceConfig.setConfigs(parseConfigs(sourceConfigString)); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java index a52b7b8..195d2e7 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java @@ -128,7 +128,6 @@ public class TestCmdSinks { sinkConfig.setTenant(TENANT); sinkConfig.setNamespace(NAMESPACE); sinkConfig.setName(NAME); - sinkConfig.setAutoAck(true); sinkConfig.setInputs(INPUTS_LIST); sinkConfig.setTopicToSerdeClassName(CUSTOM_SERDE_INPUT_MAP); @@ -450,7 +449,7 @@ public class TestCmdSinks { testSinkConfig.setResources(null); SinkConfig expectedSinkConfig = getSinkConfig(); - expectedSinkConfig.setResources(new Resources()); + expectedSinkConfig.setResources(null); testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig); } diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java index 2f63626..2eb951a 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java @@ -352,7 +352,7 @@ public class TestCmdSources { testSourceConfig.setResources(null); SourceConfig expectedSourceConfig = getSourceConfig(); - expectedSourceConfig.setResources(new Resources()); + expectedSourceConfig.setResources(null); testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java index 7652336..832c903 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java @@ -18,15 +18,13 @@ */ package org.apache.pulsar.common.functions; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; +import lombok.*; @Data @Builder @NoArgsConstructor @AllArgsConstructor +@EqualsAndHashCode public class ConsumerConfig { private String schemaType; private String serdeClassName; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java index 2bcc5a6..bf27155 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java @@ -24,20 +24,16 @@ import java.util.Collection; import java.util.Map; import java.util.TreeMap; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.apache.pulsar.common.functions.ConsumerConfig; -import org.apache.pulsar.common.functions.Resources; -import org.apache.pulsar.common.functions.WindowConfig; +import lombok.*; @Getter @Setter @Data @EqualsAndHashCode @ToString +@Builder(toBuilder=true) +@NoArgsConstructor +@AllArgsConstructor @JsonIgnoreProperties(ignoreUnknown = true) public class FunctionConfig { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java index ff8cb55..fde3d38 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java @@ -27,6 +27,7 @@ import lombok.*; @ToString @AllArgsConstructor @NoArgsConstructor +@Builder(toBuilder=true) public class Resources { // Default cpu is 1 core private Double cpu = 1d; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java index 33f742b..755201a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java @@ -18,10 +18,7 @@ */ package org.apache.pulsar.common.functions; -import lombok.Data; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; +import lombok.*; import lombok.experimental.Accessors; @Data diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java index 50fb662..b85de06 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java @@ -22,11 +22,7 @@ import java.util.Collection; import java.util.Map; import java.util.TreeMap; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; +import lombok.*; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; @@ -36,6 +32,9 @@ import org.apache.pulsar.common.functions.Resources; @Data @EqualsAndHashCode @ToString +@Builder(toBuilder=true) +@NoArgsConstructor +@AllArgsConstructor public class SinkConfig { private String tenant; @@ -63,9 +62,9 @@ public class SinkConfig { private Map<String, Object> secrets; private Integer parallelism; private FunctionConfig.ProcessingGuarantees processingGuarantees; - private boolean retainOrdering; + private Boolean retainOrdering; private Resources resources; - private boolean autoAck; + private Boolean autoAck; private Long timeoutMs; private String archive; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java index 3ca7aad..88955b8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java @@ -18,11 +18,7 @@ */ package org.apache.pulsar.common.io; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; +import lombok.*; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; @@ -33,6 +29,9 @@ import java.util.Map; @Data @EqualsAndHashCode @ToString +@Builder(toBuilder=true) +@NoArgsConstructor +@AllArgsConstructor public class SourceConfig { private String tenant; private String namespace; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 7fcf5bb..2459cfd 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -249,7 +249,9 @@ public class FunctionConfigUtils { functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); } functionConfig.setAutoAck(functionDetails.getAutoAck()); - functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs()); + if (functionDetails.getSource().getTimeoutMs() != 0) { + functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs()); + } if (!isEmpty(functionDetails.getSink().getTopic())) { functionConfig.setOutput(functionDetails.getSink().getTopic()); } @@ -566,4 +568,112 @@ public class FunctionConfigUtils { return null; } } + + public static FunctionConfig validateUpdate(FunctionConfig existingConfig, FunctionConfig newConfig) { + FunctionConfig mergedConfig = existingConfig.toBuilder().build(); + if (!existingConfig.getTenant().equals(newConfig.getTenant())) { + throw new IllegalArgumentException("Tenants differ"); + } + if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) { + throw new IllegalArgumentException("Namespaces differ"); + } + if (!existingConfig.getName().equals(newConfig.getName())) { + throw new IllegalArgumentException("Function Names differ"); + } + if (!StringUtils.isEmpty(newConfig.getClassName())) { + mergedConfig.setClassName(newConfig.getClassName()); + } + if (newConfig.getInputs() != null) { + newConfig.getInputs().forEach((topicName -> { + newConfig.getInputSpecs().put(topicName, + ConsumerConfig.builder().isRegexPattern(false).build()); + })); + } + if (newConfig.getTopicsPattern() != null && !newConfig.getTopicsPattern().isEmpty()) { + newConfig.getInputSpecs().put(newConfig.getTopicsPattern(), + ConsumerConfig.builder() + .isRegexPattern(true) + .build()); + } + if (newConfig.getCustomSerdeInputs() != null) { + newConfig.getCustomSerdeInputs().forEach((topicName, serdeClassName) -> { + newConfig.getInputSpecs().put(topicName, + ConsumerConfig.builder() + .serdeClassName(serdeClassName) + .isRegexPattern(false) + .build()); + }); + } + if (newConfig.getCustomSchemaInputs() != null) { + newConfig.getCustomSchemaInputs().forEach((topicName, schemaClassname) -> { + newConfig.getInputSpecs().put(topicName, + ConsumerConfig.builder() + .schemaType(schemaClassname) + .isRegexPattern(false) + .build()); + }); + } + if (!newConfig.getInputSpecs().isEmpty()) { + newConfig.getInputSpecs().forEach((topicName, consumerConfig) -> { + if (!existingConfig.getInputSpecs().containsKey(topicName)) { + throw new IllegalArgumentException("Input Topics cannot be altered"); + } + if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) { + throw new IllegalArgumentException("Input Specs mismatch"); + } + }); + } + if (!StringUtils.isEmpty(newConfig.getOutput()) && !newConfig.getOutput().equals(existingConfig.getOutput())) { + throw new IllegalArgumentException("Output topics differ"); + } + if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && !newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) { + throw new IllegalArgumentException("Output Serde mismatch"); + } + if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && !newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) { + throw new IllegalArgumentException("Output Schema mismatch"); + } + if (!StringUtils.isEmpty(newConfig.getLogTopic())) { + mergedConfig.setLogTopic(newConfig.getLogTopic()); + } + if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) { + throw new IllegalArgumentException("Processing Guarantess cannot be alterted"); + } + if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) { + throw new IllegalArgumentException("Retain Orderning cannot be altered"); + } + if (newConfig.getUserConfig() != null) { + mergedConfig.setUserConfig(newConfig.getUserConfig()); + } + if (newConfig.getSecrets() != null) { + mergedConfig.setSecrets(newConfig.getSecrets()); + } + if (newConfig.getRuntime() != null && !newConfig.getRuntime().equals(existingConfig.getRuntime())) { + throw new IllegalArgumentException("Runtime cannot be altered"); + } + if (newConfig.getAutoAck() != null && !newConfig.getAutoAck().equals(existingConfig.getAutoAck())) { + throw new IllegalArgumentException("AutoAck cannot be altered"); + } + if (newConfig.getMaxMessageRetries() != null) { + mergedConfig.setMaxMessageRetries(newConfig.getMaxMessageRetries()); + } + if (!StringUtils.isEmpty(newConfig.getDeadLetterTopic())) { + mergedConfig.setDeadLetterTopic(newConfig.getDeadLetterTopic()); + } + if (!StringUtils.isEmpty(newConfig.getSubName()) && !newConfig.getSubName().equals(existingConfig.getSubName())) { + throw new IllegalArgumentException("Subscription Name cannot be altered"); + } + if (newConfig.getParallelism() != null) { + mergedConfig.setParallelism(newConfig.getParallelism()); + } + if (newConfig.getResources() != null) { + mergedConfig.setResources(ResourceConfigUtils.merge(existingConfig.getResources(), newConfig.getResources())); + } + if (newConfig.getWindowConfig() != null) { + mergedConfig.setWindowConfig(newConfig.getWindowConfig()); + } + if (newConfig.getTimeoutMs() != null) { + mergedConfig.setTimeoutMs(newConfig.getTimeoutMs()); + } + return mergedConfig; + } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java index ee46ceb..5fbda31 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java @@ -33,4 +33,22 @@ public class ResourceConfigUtils { com.google.common.base.Preconditions.checkArgument(disk == null || disk > 0L, "The disk allocation for the function must be positive"); } + + public static Resources merge(Resources existingResources, Resources newResources) { + Resources mergedResources; + if (existingResources != null) { + mergedResources = existingResources.toBuilder().build(); + } else { + mergedResources = new Resources(); + } + if (newResources.getCpu() != null) { + mergedResources.setCpu(newResources.getCpu()); + }if (newResources.getRam() != null) { + mergedResources.setRam(newResources.getRam()); + } + if (newResources.getDisk() != null) { + mergedResources.setDisk(newResources.getDisk()); + } + return mergedResources; + } } \ No newline at end of file diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 3751136..d65b87f 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -42,8 +42,6 @@ import java.util.*; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; -import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; import static org.apache.pulsar.functions.utils.Utils.getSinkType; @@ -144,13 +142,17 @@ public class SinkConfigUtils { sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName()); } - Function.SubscriptionType subType = (sinkConfig.isRetainOrdering() + Function.SubscriptionType subType = ((sinkConfig.getRetainOrdering() != null && sinkConfig.getRetainOrdering()) || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees())) ? Function.SubscriptionType.FAILOVER : Function.SubscriptionType.SHARED; sourceSpecBuilder.setSubscriptionType(subType); - functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck()); + if (sinkConfig.getAutoAck() != null) { + functionDetailsBuilder.setAutoAck(sinkConfig.getAutoAck()); + } else { + functionDetailsBuilder.setAutoAck(true); + } if (sinkConfig.getTimeoutMs() != null) { sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs()); } @@ -226,7 +228,9 @@ public class SinkConfigUtils { sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); } sinkConfig.setAutoAck(functionDetails.getAutoAck()); - sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs()); + if (functionDetails.getSource().getTimeoutMs() != 0) { + sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs()); + } if (!isEmpty(functionDetails.getSink().getClassName())) { sinkConfig.setClassName(functionDetails.getSink().getClassName()); } @@ -351,4 +355,91 @@ public class SinkConfigUtils { } return retval; } + + public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig newConfig) { + SinkConfig mergedConfig = existingConfig.toBuilder().build(); + if (!existingConfig.getTenant().equals(newConfig.getTenant())) { + throw new IllegalArgumentException("Tenants differ"); + } + if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) { + throw new IllegalArgumentException("Namespaces differ"); + } + if (!existingConfig.getName().equals(newConfig.getName())) { + throw new IllegalArgumentException("Sink Names differ"); + } + if (!StringUtils.isEmpty(newConfig.getClassName())) { + mergedConfig.setClassName(newConfig.getClassName()); + } + if (!StringUtils.isEmpty(newConfig.getSourceSubscriptionName()) && !newConfig.getSourceSubscriptionName().equals(existingConfig.getSourceSubscriptionName())) { + throw new IllegalArgumentException("Subscription Name cannot be altered"); + } + if (newConfig.getInputs() != null) { + newConfig.getInputs().forEach((topicName -> { + newConfig.getInputSpecs().put(topicName, + ConsumerConfig.builder().isRegexPattern(false).build()); + })); + } + if (newConfig.getTopicsPattern() != null && !newConfig.getTopicsPattern().isEmpty()) { + newConfig.getInputSpecs().put(newConfig.getTopicsPattern(), + ConsumerConfig.builder() + .isRegexPattern(true) + .build()); + } + if (newConfig.getTopicToSerdeClassName() != null) { + newConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> { + newConfig.getInputSpecs().put(topicName, + ConsumerConfig.builder() + .serdeClassName(serdeClassName) + .isRegexPattern(false) + .build()); + }); + } + if (newConfig.getTopicToSchemaType() != null) { + newConfig.getTopicToSchemaType().forEach((topicName, schemaClassname) -> { + newConfig.getInputSpecs().put(topicName, + ConsumerConfig.builder() + .schemaType(schemaClassname) + .isRegexPattern(false) + .build()); + }); + } + if (!newConfig.getInputSpecs().isEmpty()) { + newConfig.getInputSpecs().forEach((topicName, consumerConfig) -> { + if (!existingConfig.getInputSpecs().containsKey(topicName)) { + throw new IllegalArgumentException("Input Topics cannot be altered"); + } + if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) { + throw new IllegalArgumentException("Input Specs mismatch"); + } + }); + } + if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) { + throw new IllegalArgumentException("Processing Guarantess cannot be alterted"); + } + if (newConfig.getConfigs() != null) { + mergedConfig.setConfigs(newConfig.getConfigs()); + } + if (newConfig.getSecrets() != null) { + mergedConfig.setSecrets(newConfig.getSecrets()); + } + if (newConfig.getParallelism() != null) { + mergedConfig.setParallelism(newConfig.getParallelism()); + } + if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) { + throw new IllegalArgumentException("Retain Orderning cannot be altered"); + } + if (newConfig.getAutoAck() != null && !newConfig.getAutoAck().equals(existingConfig.getAutoAck())) { + throw new IllegalArgumentException("AutoAck cannot be altered"); + } + if (newConfig.getResources() != null) { + mergedConfig.setResources(ResourceConfigUtils.merge(existingConfig.getResources(), newConfig.getResources())); + } + if (newConfig.getTimeoutMs() != null) { + mergedConfig.setTimeoutMs(newConfig.getTimeoutMs()); + } + if (!StringUtils.isEmpty(newConfig.getArchive())) { + mergedConfig.setArchive(newConfig.getArchive()); + } + return mergedConfig; + } } \ No newline at end of file diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index 2245580..a7baea2 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -240,4 +240,48 @@ public class SourceConfigUtils { return classLoader; } + public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceConfig newConfig) { + SourceConfig mergedConfig = existingConfig.toBuilder().build(); + if (!existingConfig.getTenant().equals(newConfig.getTenant())) { + throw new IllegalArgumentException("Tenants differ"); + } + if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) { + throw new IllegalArgumentException("Namespaces differ"); + } + if (!existingConfig.getName().equals(newConfig.getName())) { + throw new IllegalArgumentException("Function Names differ"); + } + if (!StringUtils.isEmpty(newConfig.getClassName())) { + mergedConfig.setClassName(newConfig.getClassName()); + } + if (!StringUtils.isEmpty(newConfig.getTopicName()) && !newConfig.getTopicName().equals(existingConfig.getTopicName())) { + throw new IllegalArgumentException("Destination topics differ"); + } + if (!StringUtils.isEmpty(newConfig.getSerdeClassName())) { + mergedConfig.setSerdeClassName(newConfig.getSerdeClassName()); + } + if (!StringUtils.isEmpty(newConfig.getSchemaType())) { + mergedConfig.setSchemaType(newConfig.getSchemaType()); + } + if (newConfig.getConfigs() != null) { + mergedConfig.setConfigs(newConfig.getConfigs()); + } + if (newConfig.getSecrets() != null) { + mergedConfig.setSecrets(newConfig.getSecrets()); + } + if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) { + throw new IllegalArgumentException("Processing Guarantess cannot be alterted"); + } + if (newConfig.getParallelism() != null) { + mergedConfig.setParallelism(newConfig.getParallelism()); + } + if (newConfig.getResources() != null) { + mergedConfig.setResources(ResourceConfigUtils.merge(existingConfig.getResources(), newConfig.getResources())); + } + if (!StringUtils.isEmpty(newConfig.getArchive())) { + mergedConfig.setArchive(newConfig.getArchive()); + } + return mergedConfig; + } + } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java index 6d71d45..0f4f400 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java @@ -22,14 +22,18 @@ import com.google.gson.Gson; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.functions.WindowConfig; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function; import org.testng.annotations.Test; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; +import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE; +import static org.apache.pulsar.common.functions.FunctionConfig.Runtime.PYTHON; import static org.testng.Assert.assertEquals; /** @@ -93,6 +97,293 @@ public class FunctionConfigUtilsTest { } @Test + public void testMergeEqual() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createFunctionConfig(); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Function Names differ") + public void testMergeDifferentName() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("name", "Different"); + FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Tenants differ") + public void testMergeDifferentTenant() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("tenant", "Different"); + FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Namespaces differ") + public void testMergeDifferentNamespace() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("namespace", "Different"); + FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test + public void testMergeDifferentClassName() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("className", "Different"); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getClassName(), + "Different" + ); + mergedConfig.setClassName(functionConfig.getClassName()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered") + public void testMergeDifferentInputs() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("topicsPattern", "Different"); + FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output topics differ") + public void testMergeDifferentOutput() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("output", "Different"); + FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test + public void testMergeDifferentLogTopic() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("logTopic", "Different"); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getLogTopic(), + "Different" + ); + mergedConfig.setLogTopic(functionConfig.getLogTopic()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted") + public void testMergeDifferentProcessingGuarantees() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("processingGuarantees", EFFECTIVELY_ONCE); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Retain Orderning cannot be altered") + public void testMergeDifferentRetainOrdering() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("retainOrdering", true); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test + public void testMergeDifferentUserConfig() { + FunctionConfig functionConfig = createFunctionConfig(); + Map<String, String> myConfig = new HashMap<>(); + myConfig.put("MyKey", "MyValue"); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("userConfig", myConfig); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getUserConfig(), + myConfig + ); + mergedConfig.setUserConfig(functionConfig.getUserConfig()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentSecrets() { + FunctionConfig functionConfig = createFunctionConfig(); + Map<String, String> mySecrets = new HashMap<>(); + mySecrets.put("MyKey", "MyValue"); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("secrets", mySecrets); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getSecrets(), + mySecrets + ); + mergedConfig.setSecrets(functionConfig.getSecrets()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Runtime cannot be altered") + public void testMergeDifferentRuntime() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("runtime", PYTHON); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "AutoAck cannot be altered") + public void testMergeDifferentAutoAck() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("autoAck", false); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test + public void testMergeDifferentMaxMessageRetries() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("maxMessageRetries", 10); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getMaxMessageRetries(), + new Integer(10) + ); + mergedConfig.setMaxMessageRetries(functionConfig.getMaxMessageRetries()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentDeadLetterTopic() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("deadLetterTopic", "Different"); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getDeadLetterTopic(), + "Different" + ); + mergedConfig.setDeadLetterTopic(functionConfig.getDeadLetterTopic()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Subscription Name cannot be altered") + public void testMergeDifferentSubname() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("subName", "Different"); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test + public void testMergeDifferentParallelism() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("parallelism", 101); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getParallelism(), + new Integer(101) + ); + mergedConfig.setParallelism(functionConfig.getParallelism()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentResources() { + FunctionConfig functionConfig = createFunctionConfig(); + Resources resources = new Resources(); + resources.setCpu(0.3); + resources.setRam(1232l); + resources.setDisk(123456l); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("resources", resources); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getResources(), + resources + ); + mergedConfig.setResources(functionConfig.getResources()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentWindowConfig() { + FunctionConfig functionConfig = createFunctionConfig(); + WindowConfig windowConfig = new WindowConfig(); + windowConfig.setSlidingIntervalCount(123); + windowConfig.setSlidingIntervalDurationMs(123l); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("windowConfig", windowConfig); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getWindowConfig(), + windowConfig + ); + mergedConfig.setWindowConfig(functionConfig.getWindowConfig()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentTimeout() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("timeoutMs", 102l); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getTimeoutMs(), + new Long(102l) + ); + mergedConfig.setTimeoutMs(functionConfig.getTimeoutMs()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + private FunctionConfig createFunctionConfig() { + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant("test-tenant"); + functionConfig.setNamespace("test-namespace"); + functionConfig.setName("test-function"); + functionConfig.setParallelism(1); + functionConfig.setClassName(IdentityFunction.class.getName()); + Map<String, ConsumerConfig> inputSpecs = new HashMap<>(); + inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build()); + functionConfig.setInputSpecs(inputSpecs); + functionConfig.setOutput("test-output"); + functionConfig.setOutputSerdeClassName("test-serde"); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + functionConfig.setRetainOrdering(false); + functionConfig.setUserConfig(new HashMap<>()); + functionConfig.setAutoAck(true); + functionConfig.setTimeoutMs(2000l); + functionConfig.setWindowConfig(new WindowConfig().setWindowLengthCount(10)); + return functionConfig; + } + + private FunctionConfig createUpdatedFunctionConfig(String fieldName, Object fieldValue) { + FunctionConfig functionConfig = createFunctionConfig(); + Class<?> fClass = FunctionConfig.class; + try { + Field chap = fClass.getDeclaredField(fieldName); + chap.setAccessible(true); + chap.set(functionConfig, fieldValue); + } catch (Exception e) { + throw new RuntimeException("Something wrong with the test", e); + } + return functionConfig; + } + + @Test public void testFunctionConfigConvertFromDetails() { String name = "test1"; String namespace = "ns1"; diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java index 0a68c85..858cd72 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java @@ -21,14 +21,18 @@ package org.apache.pulsar.functions.utils; import com.google.gson.Gson; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.SinkConfig; +import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function; import org.testng.annotations.Test; import java.io.IOException; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; +import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE; import static org.testng.Assert.assertEquals; /** @@ -60,4 +64,207 @@ public class SinkConfigUtilsTest { new Gson().toJson(convertedConfig) ); } + + @Test + public void testMergeEqual() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createSinkConfig(); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Sink Names differ") + public void testMergeDifferentName() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("name", "Different"); + SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Tenants differ") + public void testMergeDifferentTenant() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("tenant", "Different"); + SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Namespaces differ") + public void testMergeDifferentNamespace() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("namespace", "Different"); + SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test + public void testMergeDifferentClassName() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("className", "Different"); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + mergedConfig.getClassName(), + "Different" + ); + mergedConfig.setClassName(sinkConfig.getClassName()); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered") + public void testMergeDifferentInputs() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("topicsPattern", "Different"); + SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted") + public void testMergeDifferentProcessingGuarantees() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("processingGuarantees", EFFECTIVELY_ONCE); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Retain Orderning cannot be altered") + public void testMergeDifferentRetainOrdering() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("retainOrdering", true); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test + public void testMergeDifferentUserConfig() { + SinkConfig sinkConfig = createSinkConfig(); + Map<String, String> myConfig = new HashMap<>(); + myConfig.put("MyKey", "MyValue"); + SinkConfig newSinkConfig = createUpdatedSinkConfig("configs", myConfig); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + mergedConfig.getConfigs(), + myConfig + ); + mergedConfig.setConfigs(sinkConfig.getConfigs()); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentSecrets() { + SinkConfig sinkConfig = createSinkConfig(); + Map<String, String> mySecrets = new HashMap<>(); + mySecrets.put("MyKey", "MyValue"); + SinkConfig newSinkConfig = createUpdatedSinkConfig("secrets", mySecrets); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + mergedConfig.getSecrets(), + mySecrets + ); + mergedConfig.setSecrets(sinkConfig.getSecrets()); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "AutoAck cannot be altered") + public void testMergeDifferentAutoAck() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("autoAck", false); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Subscription Name cannot be altered") + public void testMergeDifferentSubname() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("sourceSubscriptionName", "Different"); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test + public void testMergeDifferentParallelism() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("parallelism", 101); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + mergedConfig.getParallelism(), + new Integer(101) + ); + mergedConfig.setParallelism(sinkConfig.getParallelism()); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentResources() { + SinkConfig sinkConfig = createSinkConfig(); + Resources resources = new Resources(); + resources.setCpu(0.3); + resources.setRam(1232l); + resources.setDisk(123456l); + SinkConfig newSinkConfig = createUpdatedSinkConfig("resources", resources); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + mergedConfig.getResources(), + resources + ); + mergedConfig.setResources(sinkConfig.getResources()); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentTimeout() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("timeoutMs", 102l); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + mergedConfig.getTimeoutMs(), + new Long(102l) + ); + mergedConfig.setTimeoutMs(sinkConfig.getTimeoutMs()); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + + private SinkConfig createSinkConfig() { + SinkConfig sinkConfig = new SinkConfig(); + sinkConfig.setTenant("test-tenant"); + sinkConfig.setNamespace("test-namespace"); + sinkConfig.setName("test-sink"); + sinkConfig.setParallelism(1); + sinkConfig.setClassName(IdentityFunction.class.getName()); + Map<String, ConsumerConfig> inputSpecs = new HashMap<>(); + inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build()); + sinkConfig.setInputSpecs(inputSpecs); + sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + sinkConfig.setRetainOrdering(false); + sinkConfig.setConfigs(new HashMap<>()); + sinkConfig.setAutoAck(true); + sinkConfig.setTimeoutMs(2000l); + sinkConfig.setArchive("DummyArchive.nar"); + return sinkConfig; + } + + private SinkConfig createUpdatedSinkConfig(String fieldName, Object fieldValue) { + SinkConfig sinkConfig = createSinkConfig(); + Class<?> fClass = SinkConfig.class; + try { + Field chap = fClass.getDeclaredField(fieldName); + chap.setAccessible(true); + chap.set(sinkConfig, fieldValue); + } catch (Exception e) { + throw new RuntimeException("Something wrong with the test", e); + } + return sinkConfig; + } } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java index dcaf2e9..7cb7d3c 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java @@ -20,13 +20,18 @@ package org.apache.pulsar.functions.utils; import com.google.gson.Gson; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.SourceConfig; +import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function; import org.testng.annotations.Test; import java.io.IOException; +import java.lang.reflect.Field; import java.util.HashMap; +import java.util.Map; +import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE; import static org.testng.Assert.assertEquals; /** @@ -53,4 +58,165 @@ public class SourceConfigUtilsTest { new Gson().toJson(convertedConfig) ); } + + @Test + public void testMergeEqual() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createSourceConfig(); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Function Names differ") + public void testMergeDifferentName() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("name", "Different"); + SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Tenants differ") + public void testMergeDifferentTenant() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("tenant", "Different"); + SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Namespaces differ") + public void testMergeDifferentNamespace() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("namespace", "Different"); + SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + } + + @Test + public void testMergeDifferentClassName() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("className", "Different"); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + assertEquals( + mergedConfig.getClassName(), + "Different" + ); + mergedConfig.setClassName(sourceConfig.getClassName()); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Destination topics differ") + public void testMergeDifferentOutput() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("topicName", "Different"); + SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted") + public void testMergeDifferentProcessingGuarantees() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("processingGuarantees", EFFECTIVELY_ONCE); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + } + + @Test + public void testMergeDifferentUserConfig() { + SourceConfig sourceConfig = createSourceConfig(); + Map<String, String> myConfig = new HashMap<>(); + myConfig.put("MyKey", "MyValue"); + SourceConfig newSourceConfig = createUpdatedSourceConfig("configs", myConfig); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + assertEquals( + mergedConfig.getConfigs(), + myConfig + ); + mergedConfig.setConfigs(sourceConfig.getConfigs()); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentSecrets() { + SourceConfig sourceConfig = createSourceConfig(); + Map<String, String> mySecrets = new HashMap<>(); + mySecrets.put("MyKey", "MyValue"); + SourceConfig newSourceConfig = createUpdatedSourceConfig("secrets", mySecrets); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + assertEquals( + mergedConfig.getSecrets(), + mySecrets + ); + mergedConfig.setSecrets(sourceConfig.getSecrets()); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentParallelism() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("parallelism", 101); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + assertEquals( + mergedConfig.getParallelism(), + new Integer(101) + ); + mergedConfig.setParallelism(sourceConfig.getParallelism()); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentResources() { + SourceConfig sourceConfig = createSourceConfig(); + Resources resources = new Resources(); + resources.setCpu(0.3); + resources.setRam(1232l); + resources.setDisk(123456l); + SourceConfig newSourceConfig = createUpdatedSourceConfig("resources", resources); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + assertEquals( + mergedConfig.getResources(), + resources + ); + mergedConfig.setResources(sourceConfig.getResources()); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(mergedConfig) + ); + } + + private SourceConfig createSourceConfig() { + SourceConfig sourceConfig = new SourceConfig(); + sourceConfig.setTenant("test-tenant"); + sourceConfig.setNamespace("test-namespace"); + sourceConfig.setName("test-source"); + sourceConfig.setParallelism(1); + sourceConfig.setClassName(IdentityFunction.class.getName()); + sourceConfig.setTopicName("test-output"); + sourceConfig.setSerdeClassName("test-serde"); + sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + sourceConfig.setConfigs(new HashMap<>()); + return sourceConfig; + } + + private SourceConfig createUpdatedSourceConfig(String fieldName, Object fieldValue) { + SourceConfig sourceConfig = createSourceConfig(); + Class<?> fClass = SourceConfig.class; + try { + Field chap = fClass.getDeclaredField(fieldName); + chap.setAccessible(true); + chap.set(sourceConfig, fieldValue); + } catch (Exception e) { + throw new RuntimeException("Something wrong with the test", e); + } + return sourceConfig; + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index 2cd51b8..7defdf6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -125,8 +125,14 @@ public final class Utils { } public static void downloadFromBookkeeper(Namespace namespace, - OutputStream outputStream, - String packagePath) throws IOException { + File outputFile, + String packagePath) throws IOException { + downloadFromBookkeeper(namespace, new FileOutputStream(outputFile), packagePath); + } + + public static void downloadFromBookkeeper(Namespace namespace, + OutputStream outputStream, + String packagePath) throws IOException { DistributedLogManager dlm = namespace.openLog(packagePath); try (InputStream in = new DLInputStream(dlm)) { int read = 0; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 18de1eb..627f5cc 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -329,6 +329,19 @@ public class FunctionsImpl { return getUnavailableResponse(); } + if (tenant == null) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Tenant is not provided")).build(); + } + if (namespace == null) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Namespace is not provided")).build(); + } + if (componentName == null) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(componentType + " Name is not provided")).build(); + } + try { if (!isAuthorizedRole(tenant, clientRole)) { log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace, @@ -349,23 +362,83 @@ public class FunctionsImpl { .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); } + String mergedComponentConfigJson; + String existingComponentConfigJson; + + FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); + if (componentType.equals(FUNCTION)) { + FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails()); + existingComponentConfigJson = new Gson().toJson(existingFunctionConfig); + FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class); + // The rest end points take precendence over whatever is there in functionconfig + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(componentName); + try { + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(existingFunctionConfig, functionConfig); + mergedComponentConfigJson = new Gson().toJson(mergedConfig); + } catch (Exception e) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + } else if (componentType.equals(SOURCE)) { + SourceConfig existingSourceConfig = SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails()); + existingComponentConfigJson = new Gson().toJson(existingSourceConfig); + SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class); + // The rest end points take precendence over whatever is there in functionconfig + sourceConfig.setTenant(tenant); + sourceConfig.setNamespace(namespace); + sourceConfig.setName(componentName); + try { + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(existingSourceConfig, sourceConfig); + mergedComponentConfigJson = new Gson().toJson(mergedConfig); + } catch (Exception e) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + } else { + SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails()); + existingComponentConfigJson = new Gson().toJson(existingSinkConfig); + SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class); + // The rest end points take precendence over whatever is there in functionconfig + sinkConfig.setTenant(tenant); + sinkConfig.setNamespace(namespace); + sinkConfig.setName(componentName); + try { + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig); + mergedComponentConfigJson = new Gson().toJson(mergedConfig); + } catch (Exception e) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + } + + if (existingComponentConfigJson.equals(mergedComponentConfigJson) && isBlank(functionPkgUrl) && uploadedInputStream == null) { + log.error("{}/{}/{} Update contains no changes", tenant, namespace, componentName); + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Update contains no change")).build(); + } + FunctionDetails functionDetails; - boolean isPkgUrlProvided = isNotBlank(functionPkgUrl); File uploadedInputStreamAsFile = null; if (uploadedInputStream != null) { uploadedInputStreamAsFile = dumpToTmpFile(uploadedInputStream); } + File existingPackageAsFile = null; + // validate parameters try { - if (isPkgUrlProvided) { + if (isNotBlank(functionPkgUrl)) { functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, componentName, functionPkgUrl, - functionDetailsJson, componentConfigJson, componentType); - } else { + functionDetailsJson, mergedComponentConfigJson, componentType); + } else if (uploadedInputStream != null) { functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile, - fileDetail, functionDetailsJson, componentConfigJson, componentType); + fileDetail, functionDetailsJson, mergedComponentConfigJson, componentType); + } else { + functionDetails = validateUpdateRequestParamsWithExistingMetadata(tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, componentType); } } catch (Exception e) { - log.error("Invalid register {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); + log.error("Invalid update {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } @@ -383,12 +456,16 @@ public class FunctionsImpl { .setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0); PackageLocationMetaData.Builder packageLocationMetaDataBuilder; - try { - packageLocationMetaDataBuilder = getFunctionPackageLocation(functionDetails, componentType, - functionPkgUrl, fileDetail, uploadedInputStreamAsFile); - } catch (Exception e) { - return Response.serverError().type(MediaType.APPLICATION_JSON).entity(new ErrorData(e.getMessage())) - .build(); + if (isNotBlank(functionPkgUrl) || uploadedInputStreamAsFile != null) { + try { + packageLocationMetaDataBuilder = getFunctionPackageLocation(functionDetails, componentType, + functionPkgUrl, fileDetail, uploadedInputStreamAsFile); + } catch (Exception e) { + return Response.serverError().type(MediaType.APPLICATION_JSON).entity(new ErrorData(e.getMessage())) + .build(); + } + } else { + packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation()); } functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); @@ -1157,6 +1234,16 @@ public class FunctionsImpl { return functionDetails; } + private FunctionDetails validateUpdateRequestParamsWithExistingMetadata(String tenant, String namespace, String componentName, + PackageLocationMetaData packageLocationMetaData, + String componentConfigJson, String componentType) throws Exception { + File tmpFile = File.createTempFile("functions", null); + tmpFile.deleteOnExit(); + Utils.downloadFromBookkeeper(worker().getDlogNamespace(), tmpFile, packageLocationMetaData.getPackagePath()); + return validateUpdateRequestParams(tenant, namespace, componentName, + null, componentConfigJson, componentType, null, tmpFile); + } + private static File dumpToTmpFile(InputStream uploadedInputStream) { try { File tmpFile = File.createTempFile("functions", null); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index e03d90d..5275cf0 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -126,6 +126,7 @@ public class FunctionApiV2ResourceTest { private FunctionsImpl resource; private InputStream mockedInputStream; private FormDataContentDisposition mockedFormData; + private FunctionMetaData mockedFunctionMetadata; @BeforeMethod public void setup() throws Exception { @@ -140,6 +141,7 @@ public class FunctionApiV2ResourceTest { this.mockedPulsarAdmin = mock(PulsarAdmin.class); this.mockedTenants = mock(Tenants.class); this.mockedNamespaces = mock(Namespaces.class); + this.mockedFunctionMetadata = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); namespaceList.add(tenant + "/" + namespace); this.mockedWorkerService = mock(WorkerService.class); @@ -153,6 +155,7 @@ public class FunctionApiV2ResourceTest { when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces); when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo); when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetadata); // worker config WorkerConfig workerConfig = new WorkerConfig() @@ -434,16 +437,7 @@ public class FunctionApiV2ResourceTest { } private Response registerDefaultFunction() { - FunctionConfig functionConfig = new FunctionConfig(); - functionConfig.setTenant(tenant); - functionConfig.setNamespace(namespace); - functionConfig.setName(function); - functionConfig.setClassName(className); - functionConfig.setParallelism(parallelism); - functionConfig.setCustomSerdeInputs(topicsToSerDeClassName); - functionConfig.setOutput(outputTopic); - functionConfig.setOutputSerdeClassName(outputSerdeClassName); - functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + FunctionConfig functionConfig = createDefaultFunctionConfig(); return resource.registerFunction( tenant, namespace, @@ -617,6 +611,9 @@ public class FunctionApiV2ResourceTest { @Test public void testUpdateFunctionMissingPackage() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); testUpdateFunctionMissingArguments( tenant, namespace, @@ -628,55 +625,104 @@ public class FunctionApiV2ResourceTest { outputSerdeClassName, className, parallelism, - "Function Package is not provided"); + "Update contains no change"); } @Test public void testUpdateFunctionMissingInputTopic() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); testUpdateFunctionMissingArguments( tenant, namespace, function, - mockedInputStream, + null, null, mockedFormData, outputTopic, outputSerdeClassName, className, parallelism, - "No input topic(s) specified for the function"); + "Update contains no change"); } @Test - public void testUpdateFunctionMissingPackageDetails() throws IOException { + public void testUpdateFunctionMissingClassName() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); testUpdateFunctionMissingArguments( tenant, namespace, function, - mockedInputStream, - topicsToSerDeClassName, null, + topicsToSerDeClassName, + mockedFormData, outputTopic, outputSerdeClassName, - className, + null, parallelism, - "Function Package is not provided"); + "Update contains no change"); } @Test - public void testUpdateFunctionMissingClassName() throws IOException { + public void testUpdateFunctionChangedParallelism() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); testUpdateFunctionMissingArguments( - tenant, - namespace, - function, - mockedInputStream, - topicsToSerDeClassName, - mockedFormData, - outputTopic, + tenant, + namespace, + function, + null, + topicsToSerDeClassName, + mockedFormData, + outputTopic, outputSerdeClassName, - null, - parallelism, - "Function classname cannot be null"); + null, + parallelism + 1, + null); + } + + @Test + public void testUpdateFunctionChangedInputs() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + testUpdateFunctionMissingArguments( + tenant, + namespace, + function, + null, + topicsToSerDeClassName, + mockedFormData, + "DifferentOutput", + outputSerdeClassName, + null, + parallelism, + "Output topics differ"); + } + + @Test + public void testUpdateFunctionChangedOutput() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + Map<String, String> someOtherInput = new HashMap<>(); + someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE); + testUpdateFunctionMissingArguments( + tenant, + namespace, + function, + null, + someOtherInput, + mockedFormData, + outputTopic, + outputSerdeClassName, + null, + parallelism, + "Input Topics cannot be altered"); } private void testUpdateFunctionMissingArguments( @@ -720,6 +766,14 @@ public class FunctionApiV2ResourceTest { } functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + if (expectedError == null) { + RequestResult rr = new RequestResult() + .setSuccess(true) + .setMessage("function registered"); + CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + } + Response response = resource.updateFunction( tenant, namespace, @@ -732,8 +786,12 @@ public class FunctionApiV2ResourceTest { FunctionsImpl.FUNCTION, null); - assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason); + if (expectedError == null) { + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } else { + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason); + } } private Response updateDefaultFunction() throws IOException { @@ -1269,4 +1327,23 @@ public class FunctionApiV2ResourceTest { assertEquals(Status.OK.getStatusCode(), response.getStatus()); } + + public static FunctionConfig createDefaultFunctionConfig() { + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(function); + functionConfig.setClassName(className); + functionConfig.setParallelism(parallelism); + functionConfig.setCustomSerdeInputs(topicsToSerDeClassName); + functionConfig.setOutput(outputTopic); + functionConfig.setOutputSerdeClassName(outputSerdeClassName); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + return functionConfig; + } + + public static FunctionDetails createDefaultFunctionDetails() { + FunctionConfig functionConfig = createDefaultFunctionConfig(); + return FunctionConfigUtils.convert(functionConfig, null); + } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java index 5e710e0..a40e0ad 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java @@ -28,6 +28,8 @@ import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Tenants; +import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; @@ -39,10 +41,12 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.source.TopicSchema; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.functions.utils.SinkConfigUtils; +import org.apache.pulsar.functions.utils.io.ConnectorUtils; import org.apache.pulsar.functions.worker.*; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; import org.apache.pulsar.io.cassandra.CassandraStringSink; +import org.apache.pulsar.io.twitter.TwitterFireHose; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.mockito.Mockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -60,12 +64,15 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URL; +import java.nio.file.Path; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE; +import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -81,7 +88,7 @@ import static org.testng.Assert.assertEquals; /** * Unit test of {@link SinkApiV2Resource}. */ -@PrepareForTest({Utils.class, SinkConfigUtils.class}) +@PrepareForTest({Utils.class, SinkConfigUtils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class}) @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" }) @Slf4j public class SinkApiV2ResourceTest { @@ -96,7 +103,7 @@ public class SinkApiV2ResourceTest { private static final String sink = "test-sink"; private static final Map<String, String> topicsToSerDeClassName = new HashMap<>(); static { - topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", TopicSchema.DEFAULT_SERDE); + topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", DEFAULT_SERDE); } private static final String subscriptionName = "test-subscription"; private static final String className = CassandraStringSink.class.getName(); @@ -119,6 +126,7 @@ public class SinkApiV2ResourceTest { private FunctionsImpl resource; private InputStream mockedInputStream; private FormDataContentDisposition mockedFormData; + private FunctionMetaData mockedFunctionMetaData; @BeforeMethod public void setup() throws Exception { @@ -372,13 +380,7 @@ public class SinkApiV2ResourceTest { } private Response registerDefaultSink() throws IOException { - SinkConfig sinkConfig = new SinkConfig(); - sinkConfig.setTenant(tenant); - sinkConfig.setNamespace(namespace); - sinkConfig.setName(sink); - sinkConfig.setClassName(className); - sinkConfig.setParallelism(parallelism); - sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName); + SinkConfig sinkConfig = createDefaultSinkConfig(); return resource.registerFunction( tenant, namespace, @@ -573,6 +575,10 @@ public class SinkApiV2ResourceTest { @Test public void testUpdateSinkMissingPackage() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + testUpdateSinkMissingArguments( tenant, namespace, @@ -582,21 +588,63 @@ public class SinkApiV2ResourceTest { topicsToSerDeClassName, className, parallelism, - "Sink Package is not provided"); + "Update contains no change"); } @Test - public void testUpdateSinkMissingPackageDetails() throws IOException { + public void testUpdateSinkMissingInputs() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + testUpdateSinkMissingArguments( - tenant, - namespace, + tenant, + namespace, sink, - mockedInputStream, - null, - topicsToSerDeClassName, - className, - parallelism, - "zip file is empty"); + null, + mockedFormData, + null, + className, + parallelism, + "Update contains no change"); + } + + @Test + public void testUpdateSinkDifferentInputs() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + + Map<String, String> inputTopics = new HashMap<>(); + inputTopics.put("DifferntTopic", DEFAULT_SERDE); + testUpdateSinkMissingArguments( + tenant, + namespace, + sink, + null, + mockedFormData, + inputTopics, + className, + parallelism, + "Input Topics cannot be altered"); + } + + @Test + public void testUpdateSinkDifferentParallelism() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + + testUpdateSinkMissingArguments( + tenant, + namespace, + sink, + null, + mockedFormData, + topicsToSerDeClassName, + className, + parallelism + 1, + null); } private void testUpdateSinkMissingArguments( @@ -609,6 +657,24 @@ public class SinkApiV2ResourceTest { String className, Integer parallelism, String expectedError) throws IOException { + mockStatic(ConnectorUtils.class); + doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class); + ConnectorUtils.getIOSinkClass(any(NarClassLoader.class)); + + mockStatic(org.apache.pulsar.functions.utils.Utils.class); + doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), any(NarClassLoader.class)); + + doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class)); + + doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + + + this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); SinkConfig sinkConfig = new SinkConfig(); @@ -631,6 +697,14 @@ public class SinkApiV2ResourceTest { sinkConfig.setParallelism(parallelism); } + if (expectedError == null) { + RequestResult rr = new RequestResult() + .setSuccess(true) + .setMessage("source registered"); + CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + } + Response response = resource.updateFunction( tenant, namespace, @@ -643,8 +717,12 @@ public class SinkApiV2ResourceTest { FunctionsImpl.SINK, null); - assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason); + if (expectedError == null) { + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } else { + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason); + } } private Response updateDefaultSink() throws IOException { @@ -656,6 +734,24 @@ public class SinkApiV2ResourceTest { sinkConfig.setParallelism(parallelism); sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName); + mockStatic(ConnectorUtils.class); + doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class); + ConnectorUtils.getIOSinkClass(any(NarClassLoader.class)); + + mockStatic(org.apache.pulsar.functions.utils.Utils.class); + doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), any(NarClassLoader.class)); + + doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class)); + + doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + + + this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); + return resource.updateFunction( tenant, namespace, @@ -730,6 +826,24 @@ public class SinkApiV2ResourceTest { sinkConfig.setParallelism(parallelism); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); + mockStatic(ConnectorUtils.class); + doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class); + ConnectorUtils.getIOSinkClass(any(NarClassLoader.class)); + + mockStatic(org.apache.pulsar.functions.utils.Utils.class); + doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), any(NarClassLoader.class)); + + doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class)); + + doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + + + this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); + RequestResult rr = new RequestResult() .setSuccess(true) .setMessage("source registered"); @@ -974,7 +1088,7 @@ public class SinkApiV2ResourceTest { .setSubscriptionType(Function.SubscriptionType.SHARED) .setSubscriptionName(subscriptionName) .putInputSpecs("input", Function.ConsumerSpec.newBuilder() - .setSerdeClassName(TopicSchema.DEFAULT_SERDE) + .setSerdeClassName(DEFAULT_SERDE) .setIsRegexPattern(false) .build()).build(); Function.SinkSpec sinkSpec = Function.SinkSpec.newBuilder() @@ -985,7 +1099,7 @@ public class SinkApiV2ResourceTest { .setSink(sinkSpec) .setName(sink) .setNamespace(namespace) - .setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE) + .setProcessingGuarantees(ATLEAST_ONCE) .setTenant(tenant) .setParallelism(parallelism) .setRuntime(FunctionDetails.Runtime.JAVA) @@ -1101,4 +1215,19 @@ public class SinkApiV2ResourceTest { assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); assertEquals(new ErrorData("Tenant does not exist").reason, ((ErrorData) response.getEntity()).reason); } + + private SinkConfig createDefaultSinkConfig() { + SinkConfig sinkConfig = new SinkConfig(); + sinkConfig.setTenant(tenant); + sinkConfig.setNamespace(namespace); + sinkConfig.setName(sink); + sinkConfig.setClassName(className); + sinkConfig.setParallelism(parallelism); + sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName); + return sinkConfig; + } + + private FunctionDetails createDefaultFunctionDetails() throws IOException { + return SinkConfigUtils.convert(createDefaultSinkConfig(), null); + } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java index 0c420a3..f60afe4 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java @@ -28,6 +28,7 @@ import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Tenants; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; @@ -37,6 +38,7 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.source.TopicSchema; import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.functions.utils.SourceConfigUtils; +import org.apache.pulsar.functions.utils.io.ConnectorUtils; import org.apache.pulsar.functions.worker.*; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; @@ -55,6 +57,7 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import java.io.*; import java.net.URL; +import java.nio.file.Path; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -72,7 +75,7 @@ import static org.testng.Assert.assertEquals; /** * Unit test of {@link SourceApiV2Resource}. */ -@PrepareForTest({Utils.class}) +@PrepareForTest({Utils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class}) @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" }) @Slf4j public class SourceApiV2ResourceTest { @@ -107,6 +110,7 @@ public class SourceApiV2ResourceTest { private FunctionsImpl resource; private InputStream mockedInputStream; private FormDataContentDisposition mockedFormData; + private FunctionMetaData mockedFunctionMetaData; @BeforeMethod public void setup() throws Exception { @@ -342,14 +346,7 @@ public class SourceApiV2ResourceTest { } private Response registerDefaultSource() throws IOException { - SourceConfig sourceConfig = new SourceConfig(); - sourceConfig.setTenant(tenant); - sourceConfig.setNamespace(namespace); - sourceConfig.setName(source); - sourceConfig.setClassName(className); - sourceConfig.setParallelism(parallelism); - sourceConfig.setTopicName(outputTopic); - sourceConfig.setSerdeClassName(outputSerdeClassName); + SourceConfig sourceConfig = createDefaultSourceConfig(); return resource.registerFunction( tenant, namespace, @@ -548,56 +545,53 @@ public class SourceApiV2ResourceTest { @Test public void testUpdateSourceMissingPackage() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + testUpdateSourceMissingArguments( tenant, namespace, source, - null, + null, mockedFormData, outputTopic, outputSerdeClassName, className, parallelism, - "Source Package is not provided"); - } - - @Test - public void testUpdateSourceMissingPackageDetails() throws IOException { - testUpdateSourceMissingArguments( - tenant, - namespace, - source, - mockedInputStream, - null, - outputTopic, - outputSerdeClassName, - className, - parallelism, - "zip file is empty"); + "Update contains no change"); } @Test public void testUpdateSourceMissingTopicName() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + testUpdateSourceMissingArguments( tenant, namespace, source, - mockedInputStream, + null, mockedFormData, null, outputSerdeClassName, className, parallelism, - "Topic name cannot be null"); + "Update contains no change"); } @Test public void testUpdateSourceNegativeParallelism() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + testUpdateSourceMissingArguments( tenant, namespace, source, - mockedInputStream, + null, mockedFormData, outputTopic, outputSerdeClassName, @@ -607,7 +601,49 @@ public class SourceApiV2ResourceTest { } @Test + public void testUpdateSourceChangedParallelism() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + + testUpdateSourceMissingArguments( + tenant, + namespace, + source, + null, + mockedFormData, + outputTopic, + outputSerdeClassName, + className, + parallelism + 1, + null); + } + + @Test + public void testUpdateSourceChangedTopic() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + + testUpdateSourceMissingArguments( + tenant, + namespace, + source, + null, + mockedFormData, + "DifferentTopic", + outputSerdeClassName, + className, + parallelism, + "Destination topics differ"); + } + + @Test public void testUpdateSourceZeroParallelism() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + testUpdateSourceMissingArguments( tenant, namespace, @@ -632,6 +668,21 @@ public class SourceApiV2ResourceTest { String className, Integer parallelism, String expectedError) throws IOException { + + mockStatic(ConnectorUtils.class); + doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class); + ConnectorUtils.getIOSourceClass(any(NarClassLoader.class)); + + mockStatic(org.apache.pulsar.functions.utils.Utils.class); + doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), any(NarClassLoader.class)); + + doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class)); + + this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); SourceConfig sourceConfig = new SourceConfig(); @@ -657,6 +708,14 @@ public class SourceApiV2ResourceTest { sourceConfig.setParallelism(parallelism); } + if (expectedError == null) { + RequestResult rr = new RequestResult() + .setSuccess(true) + .setMessage("source registered"); + CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + } + Response response = resource.updateFunction( tenant, namespace, @@ -669,8 +728,12 @@ public class SourceApiV2ResourceTest { FunctionsImpl.SOURCE, null); - assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason); + if (expectedError == null) { + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } else { + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason); + } } private Response updateDefaultSource() throws IOException { @@ -683,6 +746,21 @@ public class SourceApiV2ResourceTest { sourceConfig.setTopicName(outputTopic); sourceConfig.setSerdeClassName(outputSerdeClassName); + mockStatic(ConnectorUtils.class); + doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class); + ConnectorUtils.getIOSourceClass(any(NarClassLoader.class)); + + mockStatic(org.apache.pulsar.functions.utils.Utils.class); + doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), any(NarClassLoader.class)); + + doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class)); + + this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); + + return resource.updateFunction( tenant, namespace, @@ -757,6 +835,21 @@ public class SourceApiV2ResourceTest { sourceConfig.setClassName(className); sourceConfig.setParallelism(parallelism); + mockStatic(ConnectorUtils.class); + doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class); + ConnectorUtils.getIOSourceClass(any(NarClassLoader.class)); + + mockStatic(org.apache.pulsar.functions.utils.Utils.class); + doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), any(NarClassLoader.class)); + + doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class)); + + this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); RequestResult rr = new RequestResult() .setSuccess(true) @@ -1123,4 +1216,20 @@ public class SourceApiV2ResourceTest { assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); assertEquals(new ErrorData("Tenant does not exist").reason, ((ErrorData) response.getEntity()).reason); } + + private SourceConfig createDefaultSourceConfig() { + SourceConfig sourceConfig = new SourceConfig(); + sourceConfig.setTenant(tenant); + sourceConfig.setNamespace(namespace); + sourceConfig.setName(source); + sourceConfig.setClassName(className); + sourceConfig.setParallelism(parallelism); + sourceConfig.setTopicName(outputTopic); + sourceConfig.setSerdeClassName(outputSerdeClassName); + return sourceConfig; + } + + private FunctionDetails createDefaultFunctionDetails() throws IOException { + return SourceConfigUtils.convert(createDefaultSourceConfig(), null); + } }