srkukarni closed pull request #2985: Update Function Semantics URL: https://github.com/apache/pulsar/pull/2985
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index ed44b98031..1b4c2aa30c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -223,6 +223,7 @@ public void testFunctionAssignmentsWithRestart() throws Exception { final String namespacePortion = "assignment-test"; final String replNamespace = tenant + "/" + namespacePortion; final String sinkTopic = "persistent://" + replNamespace + "/my-topic1"; + final String logTopic = "persistent://" + replNamespace + "/log-topic"; final String baseFunctionName = "assign-restart"; final String subscriptionName = "test-sub"; final int totalFunctions = 5; @@ -240,8 +241,7 @@ public void testFunctionAssignmentsWithRestart() throws Exception { functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName); functionConfig.setParallelism(parallelism); - // set-auto-ack prop =true - functionConfig.setAutoAck(true); + // don't set any log topic admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl); } retryStrategically((test) -> { @@ -263,8 +263,8 @@ public void testFunctionAssignmentsWithRestart() throws Exception { functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName); functionConfig.setParallelism(parallelism); - // set-auto-ack prop =false - functionConfig.setAutoAck(false); + // Now set the log topic + functionConfig.setLogTopic(logTopic); admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl); } @@ -308,7 +308,7 @@ public void testFunctionAssignmentsWithRestart() throws Exception { // validate updated function prop = auto-ack=false and instnaceid for (int i = 0; i < (totalFunctions - totalDeletedFunction); i++) { String functionName = baseFunctionName + i; - assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getAutoAck()); + assertEquals(admin.functions().getFunction(tenant, namespacePortion, functionName).getLogTopic(), logTopic); } } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 91ff834f93..e0a0d4e0df 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.admin.cli; -import com.google.gson.Gson; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import lombok.extern.slf4j.Slf4j; @@ -35,8 +34,6 @@ import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction; import org.apache.pulsar.admin.cli.CmdFunctions.StopFunction; import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction; -import org.apache.pulsar.admin.cli.CmdSinks.CreateSink; -import org.apache.pulsar.admin.cli.CmdSources.CreateSource; import org.apache.pulsar.client.admin.Functions; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -60,10 +57,8 @@ import java.io.OutputStream; import java.io.PrintStream; import java.util.Arrays; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import static java.nio.charset.StandardCharsets.UTF_8; @@ -205,7 +200,7 @@ public void testCreateFunction() throws Exception { assertEquals(fnName, creater.getFunctionName()); assertEquals(inputTopicName, creater.getInputs()); assertEquals(outputTopicName, creater.getOutput()); - assertEquals(false, creater.isAutoAck()); + assertEquals(new Boolean(false), creater.getAutoAck()); verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString()); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 28f9183930..3a0d1e191f 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -287,14 +287,14 @@ void processArguments() throws Exception { @Parameter(names = "--autoAck", description = "Whether or not the framework will automatically acknowleges messages", hidden = true) protected Boolean DEPRECATED_autoAck = null; @Parameter(names = "--auto-ack", description = "Whether or not the framework will automatically acknowleges messages", arity = 1) - protected boolean autoAck = true; + protected Boolean autoAck; // for backwards compatibility purposes @Parameter(names = "--timeoutMs", description = "The message timeout in milliseconds", hidden = true) protected Long DEPRECATED_timeoutMs; @Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds") protected Long timeoutMs; @Parameter(names = "--max-message-retries", description = "How many times should we try to process a message before giving up") - protected Integer maxMessageRetries = -1; + protected Integer maxMessageRetries; @Parameter(names = "--dead-letter-topic", description = "The topic where all messages which could not be processed successfully are sent") protected String deadLetterTopic; protected FunctionConfig functionConfig; @@ -403,21 +403,29 @@ void processArguments() throws Exception { } Resources resources = functionConfig.getResources(); - if (resources == null) { - resources = new Resources(); - } if (cpu != null) { + if (resources == null) { + resources = new Resources(); + } resources.setCpu(cpu); } if (ram != null) { + if (resources == null) { + resources = new Resources(); + } resources.setRam(ram); } if (disk != null) { + if (resources == null) { + resources = new Resources(); + } resources.setDisk(disk); } - functionConfig.setResources(resources); + if (resources != null) { + functionConfig.setResources(resources); + } if (timeoutMs != null) { functionConfig.setTimeoutMs(timeoutMs); @@ -452,7 +460,9 @@ void processArguments() throws Exception { functionConfig.setWindowConfig(windowConfig); - functionConfig.setAutoAck(autoAck); + if (autoAck != null) { + functionConfig.setAutoAck(autoAck); + } if (null != maxMessageRetries) { functionConfig.setMaxMessageRetries(maxMessageRetries); @@ -711,6 +721,24 @@ void runCmd() throws Exception { @Parameters(commandDescription = "Update a Pulsar Function that's been deployed to a Pulsar cluster") class UpdateFunction extends FunctionDetailsCommand { + + @Override + protected void validateFunctionConfigs(FunctionConfig functionConfig) { + if (StringUtils.isEmpty(functionConfig.getClassName())) { + if (StringUtils.isEmpty(functionConfig.getName())) { + throw new IllegalArgumentException("Function Name not provided"); + } + } else if (StringUtils.isEmpty(functionConfig.getName())) { + org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig); + } + if (StringUtils.isEmpty(functionConfig.getTenant())) { + org.apache.pulsar.common.functions.Utils.inferMissingTenant(functionConfig); + } + if (StringUtils.isEmpty(functionConfig.getNamespace())) { + org.apache.pulsar.common.functions.Utils.inferMissingNamespace(functionConfig); + } + } + @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(functionConfig.getJar())) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 594aa1b4d4..662b81cd48 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -210,6 +210,10 @@ void runCmd() throws Exception { } print("Updated successfully"); } + + protected void validateSinkConfigs(SinkConfig sinkConfig) { + org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig); + } } abstract class SinkDetailsCommand extends BaseCommand { @@ -253,7 +257,7 @@ void runCmd() throws Exception { @Parameter(names = "--retainOrdering", description = "Sink consumes and sinks messages in order", hidden = true) protected Boolean DEPRECATED_retainOrdering; @Parameter(names = "--retain-ordering", description = "Sink consumes and sinks messages in order") - protected boolean retainOrdering; + protected Boolean retainOrdering; @Parameter(names = "--parallelism", description = "The sink's parallelism factor (i.e. the number of sink instances to run)") protected Integer parallelism; @Parameter(names = {"-a", "--archive"}, description = "Path to the archive file for the sink. It also supports url-path [http/https/file (file protocol assumes that file already exists on worker host)] from which worker can download the package.", listConverter = StringConverter.class) @@ -280,7 +284,7 @@ void runCmd() throws Exception { @Parameter(names = "--sink-config", description = "User defined configs key/values") protected String sinkConfigString; @Parameter(names = "--auto-ack", description = "Whether or not the framework will automatically acknowleges messages", arity = 1) - protected boolean autoAck = true; + protected Boolean autoAck; @Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds") protected Long timeoutMs; @@ -329,7 +333,9 @@ void processArguments() throws Exception { sinkConfig.setProcessingGuarantees(processingGuarantees); } - sinkConfig.setRetainOrdering(retainOrdering); + if (retainOrdering != null) { + sinkConfig.setRetainOrdering(retainOrdering); + } if (null != inputs) { sinkConfig.setInputs(Arrays.asList(inputs.split(","))); @@ -371,27 +377,37 @@ void processArguments() throws Exception { } Resources resources = sinkConfig.getResources(); - if (resources == null) { - resources = new Resources(); - } if (cpu != null) { + if (resources == null) { + resources = new Resources(); + } resources.setCpu(cpu); } if (ram != null) { + if (resources == null) { + resources = new Resources(); + } resources.setRam(ram); } if (disk != null) { + if (resources == null) { + resources = new Resources(); + } resources.setDisk(disk); } - sinkConfig.setResources(resources); + if (resources != null) { + sinkConfig.setResources(resources); + } if (null != sinkConfigString) { sinkConfig.setConfigs(parseConfigs(sinkConfigString)); } - sinkConfig.setAutoAck(autoAck); + if (autoAck != null) { + sinkConfig.setAutoAck(autoAck); + } if (timeoutMs != null) { sinkConfig.setTimeoutMs(timeoutMs); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 92b4f6c710..3e1b6ab221 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -214,6 +214,10 @@ void runCmd() throws Exception { } print("Updated successfully"); } + + protected void validateSourceConfigs(SourceConfig sourceConfig) { + org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig); + } } abstract class SourceDetailsCommand extends BaseCommand { @@ -337,21 +341,29 @@ void processArguments() throws Exception { } Resources resources = sourceConfig.getResources(); - if (resources == null) { - resources = new Resources(); - } if (cpu != null) { + if (resources == null) { + resources = new Resources(); + } resources.setCpu(cpu); } if (ram != null) { + if (resources == null) { + resources = new Resources(); + } resources.setRam(ram); } if (disk != null) { + if (resources == null) { + resources = new Resources(); + } resources.setDisk(disk); } - sourceConfig.setResources(resources); + if (resources != null) { + sourceConfig.setResources(resources); + } if (null != sourceConfigString) { sourceConfig.setConfigs(parseConfigs(sourceConfigString)); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java index a52b7b841c..195d2e7726 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java @@ -128,7 +128,6 @@ public SinkConfig getSinkConfig() { sinkConfig.setTenant(TENANT); sinkConfig.setNamespace(NAMESPACE); sinkConfig.setName(NAME); - sinkConfig.setAutoAck(true); sinkConfig.setInputs(INPUTS_LIST); sinkConfig.setTopicToSerdeClassName(CUSTOM_SERDE_INPUT_MAP); @@ -450,7 +449,7 @@ public void testCmdSinkConfigFileMissingResources() throws Exception { testSinkConfig.setResources(null); SinkConfig expectedSinkConfig = getSinkConfig(); - expectedSinkConfig.setResources(new Resources()); + expectedSinkConfig.setResources(null); testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig); } diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java index 2f63626535..2eb951a97f 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java @@ -352,7 +352,7 @@ public void testCmdSourceConfigFileMissingResources() throws Exception { testSourceConfig.setResources(null); SourceConfig expectedSourceConfig = getSourceConfig(); - expectedSourceConfig.setResources(new Resources()); + expectedSourceConfig.setResources(null); testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java index 7652336642..832c90321b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java @@ -18,15 +18,13 @@ */ package org.apache.pulsar.common.functions; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; +import lombok.*; @Data @Builder @NoArgsConstructor @AllArgsConstructor +@EqualsAndHashCode public class ConsumerConfig { private String schemaType; private String serdeClassName; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java index 2bcc5a6056..bf271556ee 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java @@ -24,20 +24,16 @@ import java.util.Map; import java.util.TreeMap; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.apache.pulsar.common.functions.ConsumerConfig; -import org.apache.pulsar.common.functions.Resources; -import org.apache.pulsar.common.functions.WindowConfig; +import lombok.*; @Getter @Setter @Data @EqualsAndHashCode @ToString +@Builder(toBuilder=true) +@NoArgsConstructor +@AllArgsConstructor @JsonIgnoreProperties(ignoreUnknown = true) public class FunctionConfig { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java index ff8cb5589a..fde3d38801 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java @@ -27,6 +27,7 @@ @ToString @AllArgsConstructor @NoArgsConstructor +@Builder(toBuilder=true) public class Resources { // Default cpu is 1 core private Double cpu = 1d; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java index 33f742bd3d..755201a033 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java @@ -18,10 +18,7 @@ */ package org.apache.pulsar.common.functions; -import lombok.Data; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; +import lombok.*; import lombok.experimental.Accessors; @Data diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java index 50fb662504..b85de0633c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java @@ -22,11 +22,7 @@ import java.util.Map; import java.util.TreeMap; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; +import lombok.*; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; @@ -36,6 +32,9 @@ @Data @EqualsAndHashCode @ToString +@Builder(toBuilder=true) +@NoArgsConstructor +@AllArgsConstructor public class SinkConfig { private String tenant; @@ -63,9 +62,9 @@ private Map<String, Object> secrets; private Integer parallelism; private FunctionConfig.ProcessingGuarantees processingGuarantees; - private boolean retainOrdering; + private Boolean retainOrdering; private Resources resources; - private boolean autoAck; + private Boolean autoAck; private Long timeoutMs; private String archive; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java index 3ca7aadfd2..88955b8611 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java @@ -18,11 +18,7 @@ */ package org.apache.pulsar.common.io; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; +import lombok.*; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; @@ -33,6 +29,9 @@ @Data @EqualsAndHashCode @ToString +@Builder(toBuilder=true) +@NoArgsConstructor +@AllArgsConstructor public class SourceConfig { private String tenant; private String namespace; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 7fcf5bb994..2459cfdf5b 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -249,7 +249,9 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails) functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); } functionConfig.setAutoAck(functionDetails.getAutoAck()); - functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs()); + if (functionDetails.getSource().getTimeoutMs() != 0) { + functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs()); + } if (!isEmpty(functionDetails.getSink().getTopic())) { functionConfig.setOutput(functionDetails.getSink().getTopic()); } @@ -566,4 +568,112 @@ public static ClassLoader validate(FunctionConfig functionConfig, String functio return null; } } + + public static FunctionConfig validateUpdate(FunctionConfig existingConfig, FunctionConfig newConfig) { + FunctionConfig mergedConfig = existingConfig.toBuilder().build(); + if (!existingConfig.getTenant().equals(newConfig.getTenant())) { + throw new IllegalArgumentException("Tenants differ"); + } + if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) { + throw new IllegalArgumentException("Namespaces differ"); + } + if (!existingConfig.getName().equals(newConfig.getName())) { + throw new IllegalArgumentException("Function Names differ"); + } + if (!StringUtils.isEmpty(newConfig.getClassName())) { + mergedConfig.setClassName(newConfig.getClassName()); + } + if (newConfig.getInputs() != null) { + newConfig.getInputs().forEach((topicName -> { + newConfig.getInputSpecs().put(topicName, + ConsumerConfig.builder().isRegexPattern(false).build()); + })); + } + if (newConfig.getTopicsPattern() != null && !newConfig.getTopicsPattern().isEmpty()) { + newConfig.getInputSpecs().put(newConfig.getTopicsPattern(), + ConsumerConfig.builder() + .isRegexPattern(true) + .build()); + } + if (newConfig.getCustomSerdeInputs() != null) { + newConfig.getCustomSerdeInputs().forEach((topicName, serdeClassName) -> { + newConfig.getInputSpecs().put(topicName, + ConsumerConfig.builder() + .serdeClassName(serdeClassName) + .isRegexPattern(false) + .build()); + }); + } + if (newConfig.getCustomSchemaInputs() != null) { + newConfig.getCustomSchemaInputs().forEach((topicName, schemaClassname) -> { + newConfig.getInputSpecs().put(topicName, + ConsumerConfig.builder() + .schemaType(schemaClassname) + .isRegexPattern(false) + .build()); + }); + } + if (!newConfig.getInputSpecs().isEmpty()) { + newConfig.getInputSpecs().forEach((topicName, consumerConfig) -> { + if (!existingConfig.getInputSpecs().containsKey(topicName)) { + throw new IllegalArgumentException("Input Topics cannot be altered"); + } + if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) { + throw new IllegalArgumentException("Input Specs mismatch"); + } + }); + } + if (!StringUtils.isEmpty(newConfig.getOutput()) && !newConfig.getOutput().equals(existingConfig.getOutput())) { + throw new IllegalArgumentException("Output topics differ"); + } + if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && !newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) { + throw new IllegalArgumentException("Output Serde mismatch"); + } + if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && !newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) { + throw new IllegalArgumentException("Output Schema mismatch"); + } + if (!StringUtils.isEmpty(newConfig.getLogTopic())) { + mergedConfig.setLogTopic(newConfig.getLogTopic()); + } + if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) { + throw new IllegalArgumentException("Processing Guarantess cannot be alterted"); + } + if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) { + throw new IllegalArgumentException("Retain Orderning cannot be altered"); + } + if (newConfig.getUserConfig() != null) { + mergedConfig.setUserConfig(newConfig.getUserConfig()); + } + if (newConfig.getSecrets() != null) { + mergedConfig.setSecrets(newConfig.getSecrets()); + } + if (newConfig.getRuntime() != null && !newConfig.getRuntime().equals(existingConfig.getRuntime())) { + throw new IllegalArgumentException("Runtime cannot be altered"); + } + if (newConfig.getAutoAck() != null && !newConfig.getAutoAck().equals(existingConfig.getAutoAck())) { + throw new IllegalArgumentException("AutoAck cannot be altered"); + } + if (newConfig.getMaxMessageRetries() != null) { + mergedConfig.setMaxMessageRetries(newConfig.getMaxMessageRetries()); + } + if (!StringUtils.isEmpty(newConfig.getDeadLetterTopic())) { + mergedConfig.setDeadLetterTopic(newConfig.getDeadLetterTopic()); + } + if (!StringUtils.isEmpty(newConfig.getSubName()) && !newConfig.getSubName().equals(existingConfig.getSubName())) { + throw new IllegalArgumentException("Subscription Name cannot be altered"); + } + if (newConfig.getParallelism() != null) { + mergedConfig.setParallelism(newConfig.getParallelism()); + } + if (newConfig.getResources() != null) { + mergedConfig.setResources(ResourceConfigUtils.merge(existingConfig.getResources(), newConfig.getResources())); + } + if (newConfig.getWindowConfig() != null) { + mergedConfig.setWindowConfig(newConfig.getWindowConfig()); + } + if (newConfig.getTimeoutMs() != null) { + mergedConfig.setTimeoutMs(newConfig.getTimeoutMs()); + } + return mergedConfig; + } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java index ee46cebf0a..5fbda31fab 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java @@ -33,4 +33,22 @@ public static void validate(Resources resources) { com.google.common.base.Preconditions.checkArgument(disk == null || disk > 0L, "The disk allocation for the function must be positive"); } + + public static Resources merge(Resources existingResources, Resources newResources) { + Resources mergedResources; + if (existingResources != null) { + mergedResources = existingResources.toBuilder().build(); + } else { + mergedResources = new Resources(); + } + if (newResources.getCpu() != null) { + mergedResources.setCpu(newResources.getCpu()); + }if (newResources.getRam() != null) { + mergedResources.setRam(newResources.getRam()); + } + if (newResources.getDisk() != null) { + mergedResources.setDisk(newResources.getDisk()); + } + return mergedResources; + } } \ No newline at end of file diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 375113697d..d65b87fd4d 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -42,8 +42,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; -import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; import static org.apache.pulsar.functions.utils.Utils.getSinkType; @@ -144,13 +142,17 @@ public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader clas sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName()); } - Function.SubscriptionType subType = (sinkConfig.isRetainOrdering() + Function.SubscriptionType subType = ((sinkConfig.getRetainOrdering() != null && sinkConfig.getRetainOrdering()) || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees())) ? Function.SubscriptionType.FAILOVER : Function.SubscriptionType.SHARED; sourceSpecBuilder.setSubscriptionType(subType); - functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck()); + if (sinkConfig.getAutoAck() != null) { + functionDetailsBuilder.setAutoAck(sinkConfig.getAutoAck()); + } else { + functionDetailsBuilder.setAutoAck(true); + } if (sinkConfig.getTimeoutMs() != null) { sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs()); } @@ -226,7 +228,9 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) { sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); } sinkConfig.setAutoAck(functionDetails.getAutoAck()); - sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs()); + if (functionDetails.getSource().getTimeoutMs() != 0) { + sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs()); + } if (!isEmpty(functionDetails.getSink().getClassName())) { sinkConfig.setClassName(functionDetails.getSink().getClassName()); } @@ -351,4 +355,91 @@ public static NarClassLoader validate(SinkConfig sinkConfig, Path archivePath, S } return retval; } + + public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig newConfig) { + SinkConfig mergedConfig = existingConfig.toBuilder().build(); + if (!existingConfig.getTenant().equals(newConfig.getTenant())) { + throw new IllegalArgumentException("Tenants differ"); + } + if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) { + throw new IllegalArgumentException("Namespaces differ"); + } + if (!existingConfig.getName().equals(newConfig.getName())) { + throw new IllegalArgumentException("Sink Names differ"); + } + if (!StringUtils.isEmpty(newConfig.getClassName())) { + mergedConfig.setClassName(newConfig.getClassName()); + } + if (!StringUtils.isEmpty(newConfig.getSourceSubscriptionName()) && !newConfig.getSourceSubscriptionName().equals(existingConfig.getSourceSubscriptionName())) { + throw new IllegalArgumentException("Subscription Name cannot be altered"); + } + if (newConfig.getInputs() != null) { + newConfig.getInputs().forEach((topicName -> { + newConfig.getInputSpecs().put(topicName, + ConsumerConfig.builder().isRegexPattern(false).build()); + })); + } + if (newConfig.getTopicsPattern() != null && !newConfig.getTopicsPattern().isEmpty()) { + newConfig.getInputSpecs().put(newConfig.getTopicsPattern(), + ConsumerConfig.builder() + .isRegexPattern(true) + .build()); + } + if (newConfig.getTopicToSerdeClassName() != null) { + newConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> { + newConfig.getInputSpecs().put(topicName, + ConsumerConfig.builder() + .serdeClassName(serdeClassName) + .isRegexPattern(false) + .build()); + }); + } + if (newConfig.getTopicToSchemaType() != null) { + newConfig.getTopicToSchemaType().forEach((topicName, schemaClassname) -> { + newConfig.getInputSpecs().put(topicName, + ConsumerConfig.builder() + .schemaType(schemaClassname) + .isRegexPattern(false) + .build()); + }); + } + if (!newConfig.getInputSpecs().isEmpty()) { + newConfig.getInputSpecs().forEach((topicName, consumerConfig) -> { + if (!existingConfig.getInputSpecs().containsKey(topicName)) { + throw new IllegalArgumentException("Input Topics cannot be altered"); + } + if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) { + throw new IllegalArgumentException("Input Specs mismatch"); + } + }); + } + if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) { + throw new IllegalArgumentException("Processing Guarantess cannot be alterted"); + } + if (newConfig.getConfigs() != null) { + mergedConfig.setConfigs(newConfig.getConfigs()); + } + if (newConfig.getSecrets() != null) { + mergedConfig.setSecrets(newConfig.getSecrets()); + } + if (newConfig.getParallelism() != null) { + mergedConfig.setParallelism(newConfig.getParallelism()); + } + if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) { + throw new IllegalArgumentException("Retain Orderning cannot be altered"); + } + if (newConfig.getAutoAck() != null && !newConfig.getAutoAck().equals(existingConfig.getAutoAck())) { + throw new IllegalArgumentException("AutoAck cannot be altered"); + } + if (newConfig.getResources() != null) { + mergedConfig.setResources(ResourceConfigUtils.merge(existingConfig.getResources(), newConfig.getResources())); + } + if (newConfig.getTimeoutMs() != null) { + mergedConfig.setTimeoutMs(newConfig.getTimeoutMs()); + } + if (!StringUtils.isEmpty(newConfig.getArchive())) { + mergedConfig.setArchive(newConfig.getArchive()); + } + return mergedConfig; + } } \ No newline at end of file diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index 2245580458..a7baea219b 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -240,4 +240,48 @@ public static NarClassLoader validate(SourceConfig sourceConfig, Path archivePat return classLoader; } + public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceConfig newConfig) { + SourceConfig mergedConfig = existingConfig.toBuilder().build(); + if (!existingConfig.getTenant().equals(newConfig.getTenant())) { + throw new IllegalArgumentException("Tenants differ"); + } + if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) { + throw new IllegalArgumentException("Namespaces differ"); + } + if (!existingConfig.getName().equals(newConfig.getName())) { + throw new IllegalArgumentException("Function Names differ"); + } + if (!StringUtils.isEmpty(newConfig.getClassName())) { + mergedConfig.setClassName(newConfig.getClassName()); + } + if (!StringUtils.isEmpty(newConfig.getTopicName()) && !newConfig.getTopicName().equals(existingConfig.getTopicName())) { + throw new IllegalArgumentException("Destination topics differ"); + } + if (!StringUtils.isEmpty(newConfig.getSerdeClassName())) { + mergedConfig.setSerdeClassName(newConfig.getSerdeClassName()); + } + if (!StringUtils.isEmpty(newConfig.getSchemaType())) { + mergedConfig.setSchemaType(newConfig.getSchemaType()); + } + if (newConfig.getConfigs() != null) { + mergedConfig.setConfigs(newConfig.getConfigs()); + } + if (newConfig.getSecrets() != null) { + mergedConfig.setSecrets(newConfig.getSecrets()); + } + if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) { + throw new IllegalArgumentException("Processing Guarantess cannot be alterted"); + } + if (newConfig.getParallelism() != null) { + mergedConfig.setParallelism(newConfig.getParallelism()); + } + if (newConfig.getResources() != null) { + mergedConfig.setResources(ResourceConfigUtils.merge(existingConfig.getResources(), newConfig.getResources())); + } + if (!StringUtils.isEmpty(newConfig.getArchive())) { + mergedConfig.setArchive(newConfig.getArchive()); + } + return mergedConfig; + } + } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java index 6d71d45292..0f4f400543 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java @@ -22,14 +22,18 @@ import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.functions.WindowConfig; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function; import org.testng.annotations.Test; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; +import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE; +import static org.apache.pulsar.common.functions.FunctionConfig.Runtime.PYTHON; import static org.testng.Assert.assertEquals; /** @@ -92,6 +96,293 @@ public void testConvertWindow() { ); } + @Test + public void testMergeEqual() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createFunctionConfig(); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Function Names differ") + public void testMergeDifferentName() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("name", "Different"); + FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Tenants differ") + public void testMergeDifferentTenant() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("tenant", "Different"); + FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Namespaces differ") + public void testMergeDifferentNamespace() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("namespace", "Different"); + FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test + public void testMergeDifferentClassName() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("className", "Different"); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getClassName(), + "Different" + ); + mergedConfig.setClassName(functionConfig.getClassName()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered") + public void testMergeDifferentInputs() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("topicsPattern", "Different"); + FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output topics differ") + public void testMergeDifferentOutput() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("output", "Different"); + FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test + public void testMergeDifferentLogTopic() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("logTopic", "Different"); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getLogTopic(), + "Different" + ); + mergedConfig.setLogTopic(functionConfig.getLogTopic()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted") + public void testMergeDifferentProcessingGuarantees() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("processingGuarantees", EFFECTIVELY_ONCE); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Retain Orderning cannot be altered") + public void testMergeDifferentRetainOrdering() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("retainOrdering", true); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test + public void testMergeDifferentUserConfig() { + FunctionConfig functionConfig = createFunctionConfig(); + Map<String, String> myConfig = new HashMap<>(); + myConfig.put("MyKey", "MyValue"); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("userConfig", myConfig); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getUserConfig(), + myConfig + ); + mergedConfig.setUserConfig(functionConfig.getUserConfig()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentSecrets() { + FunctionConfig functionConfig = createFunctionConfig(); + Map<String, String> mySecrets = new HashMap<>(); + mySecrets.put("MyKey", "MyValue"); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("secrets", mySecrets); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getSecrets(), + mySecrets + ); + mergedConfig.setSecrets(functionConfig.getSecrets()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Runtime cannot be altered") + public void testMergeDifferentRuntime() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("runtime", PYTHON); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "AutoAck cannot be altered") + public void testMergeDifferentAutoAck() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("autoAck", false); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test + public void testMergeDifferentMaxMessageRetries() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("maxMessageRetries", 10); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getMaxMessageRetries(), + new Integer(10) + ); + mergedConfig.setMaxMessageRetries(functionConfig.getMaxMessageRetries()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentDeadLetterTopic() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("deadLetterTopic", "Different"); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getDeadLetterTopic(), + "Different" + ); + mergedConfig.setDeadLetterTopic(functionConfig.getDeadLetterTopic()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Subscription Name cannot be altered") + public void testMergeDifferentSubname() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("subName", "Different"); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test + public void testMergeDifferentParallelism() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("parallelism", 101); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getParallelism(), + new Integer(101) + ); + mergedConfig.setParallelism(functionConfig.getParallelism()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentResources() { + FunctionConfig functionConfig = createFunctionConfig(); + Resources resources = new Resources(); + resources.setCpu(0.3); + resources.setRam(1232l); + resources.setDisk(123456l); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("resources", resources); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getResources(), + resources + ); + mergedConfig.setResources(functionConfig.getResources()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentWindowConfig() { + FunctionConfig functionConfig = createFunctionConfig(); + WindowConfig windowConfig = new WindowConfig(); + windowConfig.setSlidingIntervalCount(123); + windowConfig.setSlidingIntervalDurationMs(123l); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("windowConfig", windowConfig); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getWindowConfig(), + windowConfig + ); + mergedConfig.setWindowConfig(functionConfig.getWindowConfig()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentTimeout() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("timeoutMs", 102l); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getTimeoutMs(), + new Long(102l) + ); + mergedConfig.setTimeoutMs(functionConfig.getTimeoutMs()); + assertEquals( + new Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + + private FunctionConfig createFunctionConfig() { + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant("test-tenant"); + functionConfig.setNamespace("test-namespace"); + functionConfig.setName("test-function"); + functionConfig.setParallelism(1); + functionConfig.setClassName(IdentityFunction.class.getName()); + Map<String, ConsumerConfig> inputSpecs = new HashMap<>(); + inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build()); + functionConfig.setInputSpecs(inputSpecs); + functionConfig.setOutput("test-output"); + functionConfig.setOutputSerdeClassName("test-serde"); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + functionConfig.setRetainOrdering(false); + functionConfig.setUserConfig(new HashMap<>()); + functionConfig.setAutoAck(true); + functionConfig.setTimeoutMs(2000l); + functionConfig.setWindowConfig(new WindowConfig().setWindowLengthCount(10)); + return functionConfig; + } + + private FunctionConfig createUpdatedFunctionConfig(String fieldName, Object fieldValue) { + FunctionConfig functionConfig = createFunctionConfig(); + Class<?> fClass = FunctionConfig.class; + try { + Field chap = fClass.getDeclaredField(fieldName); + chap.setAccessible(true); + chap.set(functionConfig, fieldValue); + } catch (Exception e) { + throw new RuntimeException("Something wrong with the test", e); + } + return functionConfig; + } + @Test public void testFunctionConfigConvertFromDetails() { String name = "test1"; diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java index 0a68c85e9d..858cd72966 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java @@ -21,14 +21,18 @@ import com.google.gson.Gson; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.SinkConfig; +import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function; import org.testng.annotations.Test; import java.io.IOException; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; +import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE; import static org.testng.Assert.assertEquals; /** @@ -60,4 +64,207 @@ public void testConvertBackFidelity() throws IOException { new Gson().toJson(convertedConfig) ); } + + @Test + public void testMergeEqual() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createSinkConfig(); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Sink Names differ") + public void testMergeDifferentName() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("name", "Different"); + SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Tenants differ") + public void testMergeDifferentTenant() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("tenant", "Different"); + SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Namespaces differ") + public void testMergeDifferentNamespace() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("namespace", "Different"); + SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test + public void testMergeDifferentClassName() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("className", "Different"); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + mergedConfig.getClassName(), + "Different" + ); + mergedConfig.setClassName(sinkConfig.getClassName()); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered") + public void testMergeDifferentInputs() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("topicsPattern", "Different"); + SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted") + public void testMergeDifferentProcessingGuarantees() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("processingGuarantees", EFFECTIVELY_ONCE); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Retain Orderning cannot be altered") + public void testMergeDifferentRetainOrdering() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("retainOrdering", true); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test + public void testMergeDifferentUserConfig() { + SinkConfig sinkConfig = createSinkConfig(); + Map<String, String> myConfig = new HashMap<>(); + myConfig.put("MyKey", "MyValue"); + SinkConfig newSinkConfig = createUpdatedSinkConfig("configs", myConfig); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + mergedConfig.getConfigs(), + myConfig + ); + mergedConfig.setConfigs(sinkConfig.getConfigs()); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentSecrets() { + SinkConfig sinkConfig = createSinkConfig(); + Map<String, String> mySecrets = new HashMap<>(); + mySecrets.put("MyKey", "MyValue"); + SinkConfig newSinkConfig = createUpdatedSinkConfig("secrets", mySecrets); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + mergedConfig.getSecrets(), + mySecrets + ); + mergedConfig.setSecrets(sinkConfig.getSecrets()); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "AutoAck cannot be altered") + public void testMergeDifferentAutoAck() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("autoAck", false); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Subscription Name cannot be altered") + public void testMergeDifferentSubname() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("sourceSubscriptionName", "Different"); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test + public void testMergeDifferentParallelism() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("parallelism", 101); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + mergedConfig.getParallelism(), + new Integer(101) + ); + mergedConfig.setParallelism(sinkConfig.getParallelism()); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentResources() { + SinkConfig sinkConfig = createSinkConfig(); + Resources resources = new Resources(); + resources.setCpu(0.3); + resources.setRam(1232l); + resources.setDisk(123456l); + SinkConfig newSinkConfig = createUpdatedSinkConfig("resources", resources); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + mergedConfig.getResources(), + resources + ); + mergedConfig.setResources(sinkConfig.getResources()); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentTimeout() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("timeoutMs", 102l); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + mergedConfig.getTimeoutMs(), + new Long(102l) + ); + mergedConfig.setTimeoutMs(sinkConfig.getTimeoutMs()); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + + private SinkConfig createSinkConfig() { + SinkConfig sinkConfig = new SinkConfig(); + sinkConfig.setTenant("test-tenant"); + sinkConfig.setNamespace("test-namespace"); + sinkConfig.setName("test-sink"); + sinkConfig.setParallelism(1); + sinkConfig.setClassName(IdentityFunction.class.getName()); + Map<String, ConsumerConfig> inputSpecs = new HashMap<>(); + inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build()); + sinkConfig.setInputSpecs(inputSpecs); + sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + sinkConfig.setRetainOrdering(false); + sinkConfig.setConfigs(new HashMap<>()); + sinkConfig.setAutoAck(true); + sinkConfig.setTimeoutMs(2000l); + sinkConfig.setArchive("DummyArchive.nar"); + return sinkConfig; + } + + private SinkConfig createUpdatedSinkConfig(String fieldName, Object fieldValue) { + SinkConfig sinkConfig = createSinkConfig(); + Class<?> fClass = SinkConfig.class; + try { + Field chap = fClass.getDeclaredField(fieldName); + chap.setAccessible(true); + chap.set(sinkConfig, fieldValue); + } catch (Exception e) { + throw new RuntimeException("Something wrong with the test", e); + } + return sinkConfig; + } } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java index dcaf2e9f61..7cb7d3c5b5 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java @@ -20,13 +20,18 @@ import com.google.gson.Gson; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.SourceConfig; +import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function; import org.testng.annotations.Test; import java.io.IOException; +import java.lang.reflect.Field; import java.util.HashMap; +import java.util.Map; +import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE; import static org.testng.Assert.assertEquals; /** @@ -53,4 +58,165 @@ public void testConvertBackFidelity() throws IOException { new Gson().toJson(convertedConfig) ); } + + @Test + public void testMergeEqual() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createSourceConfig(); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Function Names differ") + public void testMergeDifferentName() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("name", "Different"); + SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Tenants differ") + public void testMergeDifferentTenant() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("tenant", "Different"); + SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Namespaces differ") + public void testMergeDifferentNamespace() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("namespace", "Different"); + SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + } + + @Test + public void testMergeDifferentClassName() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("className", "Different"); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + assertEquals( + mergedConfig.getClassName(), + "Different" + ); + mergedConfig.setClassName(sourceConfig.getClassName()); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Destination topics differ") + public void testMergeDifferentOutput() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("topicName", "Different"); + SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted") + public void testMergeDifferentProcessingGuarantees() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("processingGuarantees", EFFECTIVELY_ONCE); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + } + + @Test + public void testMergeDifferentUserConfig() { + SourceConfig sourceConfig = createSourceConfig(); + Map<String, String> myConfig = new HashMap<>(); + myConfig.put("MyKey", "MyValue"); + SourceConfig newSourceConfig = createUpdatedSourceConfig("configs", myConfig); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + assertEquals( + mergedConfig.getConfigs(), + myConfig + ); + mergedConfig.setConfigs(sourceConfig.getConfigs()); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentSecrets() { + SourceConfig sourceConfig = createSourceConfig(); + Map<String, String> mySecrets = new HashMap<>(); + mySecrets.put("MyKey", "MyValue"); + SourceConfig newSourceConfig = createUpdatedSourceConfig("secrets", mySecrets); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + assertEquals( + mergedConfig.getSecrets(), + mySecrets + ); + mergedConfig.setSecrets(sourceConfig.getSecrets()); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentParallelism() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("parallelism", 101); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + assertEquals( + mergedConfig.getParallelism(), + new Integer(101) + ); + mergedConfig.setParallelism(sourceConfig.getParallelism()); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(mergedConfig) + ); + } + + @Test + public void testMergeDifferentResources() { + SourceConfig sourceConfig = createSourceConfig(); + Resources resources = new Resources(); + resources.setCpu(0.3); + resources.setRam(1232l); + resources.setDisk(123456l); + SourceConfig newSourceConfig = createUpdatedSourceConfig("resources", resources); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + assertEquals( + mergedConfig.getResources(), + resources + ); + mergedConfig.setResources(sourceConfig.getResources()); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(mergedConfig) + ); + } + + private SourceConfig createSourceConfig() { + SourceConfig sourceConfig = new SourceConfig(); + sourceConfig.setTenant("test-tenant"); + sourceConfig.setNamespace("test-namespace"); + sourceConfig.setName("test-source"); + sourceConfig.setParallelism(1); + sourceConfig.setClassName(IdentityFunction.class.getName()); + sourceConfig.setTopicName("test-output"); + sourceConfig.setSerdeClassName("test-serde"); + sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + sourceConfig.setConfigs(new HashMap<>()); + return sourceConfig; + } + + private SourceConfig createUpdatedSourceConfig(String fieldName, Object fieldValue) { + SourceConfig sourceConfig = createSourceConfig(); + Class<?> fClass = SourceConfig.class; + try { + Field chap = fClass.getDeclaredField(fieldName); + chap.setAccessible(true); + chap.set(sourceConfig, fieldValue); + } catch (Exception e) { + throw new RuntimeException("Something wrong with the test", e); + } + return sourceConfig; + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index 2cd51b84ea..7defdf6ee5 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -125,8 +125,14 @@ public static void downloadFromHttpUrl(String destPkgUrl, FileOutputStream outpu } public static void downloadFromBookkeeper(Namespace namespace, - OutputStream outputStream, - String packagePath) throws IOException { + File outputFile, + String packagePath) throws IOException { + downloadFromBookkeeper(namespace, new FileOutputStream(outputFile), packagePath); + } + + public static void downloadFromBookkeeper(Namespace namespace, + OutputStream outputStream, + String packagePath) throws IOException { DistributedLogManager dlm = namespace.openLog(packagePath); try (InputStream in = new DLInputStream(dlm)) { int read = 0; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 18de1eb734..627f5ccd0f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -329,6 +329,19 @@ public Response updateFunction(final String tenant, final String namespace, fina return getUnavailableResponse(); } + if (tenant == null) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Tenant is not provided")).build(); + } + if (namespace == null) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Namespace is not provided")).build(); + } + if (componentName == null) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(componentType + " Name is not provided")).build(); + } + try { if (!isAuthorizedRole(tenant, clientRole)) { log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace, @@ -349,23 +362,83 @@ public Response updateFunction(final String tenant, final String namespace, fina .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); } + String mergedComponentConfigJson; + String existingComponentConfigJson; + + FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); + if (componentType.equals(FUNCTION)) { + FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails()); + existingComponentConfigJson = new Gson().toJson(existingFunctionConfig); + FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class); + // The rest end points take precendence over whatever is there in functionconfig + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(componentName); + try { + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(existingFunctionConfig, functionConfig); + mergedComponentConfigJson = new Gson().toJson(mergedConfig); + } catch (Exception e) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + } else if (componentType.equals(SOURCE)) { + SourceConfig existingSourceConfig = SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails()); + existingComponentConfigJson = new Gson().toJson(existingSourceConfig); + SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class); + // The rest end points take precendence over whatever is there in functionconfig + sourceConfig.setTenant(tenant); + sourceConfig.setNamespace(namespace); + sourceConfig.setName(componentName); + try { + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(existingSourceConfig, sourceConfig); + mergedComponentConfigJson = new Gson().toJson(mergedConfig); + } catch (Exception e) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + } else { + SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails()); + existingComponentConfigJson = new Gson().toJson(existingSinkConfig); + SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class); + // The rest end points take precendence over whatever is there in functionconfig + sinkConfig.setTenant(tenant); + sinkConfig.setNamespace(namespace); + sinkConfig.setName(componentName); + try { + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig); + mergedComponentConfigJson = new Gson().toJson(mergedConfig); + } catch (Exception e) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + } + + if (existingComponentConfigJson.equals(mergedComponentConfigJson) && isBlank(functionPkgUrl) && uploadedInputStream == null) { + log.error("{}/{}/{} Update contains no changes", tenant, namespace, componentName); + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Update contains no change")).build(); + } + FunctionDetails functionDetails; - boolean isPkgUrlProvided = isNotBlank(functionPkgUrl); File uploadedInputStreamAsFile = null; if (uploadedInputStream != null) { uploadedInputStreamAsFile = dumpToTmpFile(uploadedInputStream); } + File existingPackageAsFile = null; + // validate parameters try { - if (isPkgUrlProvided) { + if (isNotBlank(functionPkgUrl)) { functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, componentName, functionPkgUrl, - functionDetailsJson, componentConfigJson, componentType); - } else { + functionDetailsJson, mergedComponentConfigJson, componentType); + } else if (uploadedInputStream != null) { functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile, - fileDetail, functionDetailsJson, componentConfigJson, componentType); + fileDetail, functionDetailsJson, mergedComponentConfigJson, componentType); + } else { + functionDetails = validateUpdateRequestParamsWithExistingMetadata(tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, componentType); } } catch (Exception e) { - log.error("Invalid register {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); + log.error("Invalid update {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) .entity(new ErrorData(e.getMessage())).build(); } @@ -383,12 +456,16 @@ public Response updateFunction(final String tenant, final String namespace, fina .setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0); PackageLocationMetaData.Builder packageLocationMetaDataBuilder; - try { - packageLocationMetaDataBuilder = getFunctionPackageLocation(functionDetails, componentType, - functionPkgUrl, fileDetail, uploadedInputStreamAsFile); - } catch (Exception e) { - return Response.serverError().type(MediaType.APPLICATION_JSON).entity(new ErrorData(e.getMessage())) - .build(); + if (isNotBlank(functionPkgUrl) || uploadedInputStreamAsFile != null) { + try { + packageLocationMetaDataBuilder = getFunctionPackageLocation(functionDetails, componentType, + functionPkgUrl, fileDetail, uploadedInputStreamAsFile); + } catch (Exception e) { + return Response.serverError().type(MediaType.APPLICATION_JSON).entity(new ErrorData(e.getMessage())) + .build(); + } + } else { + packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation()); } functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); @@ -1157,6 +1234,16 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp return functionDetails; } + private FunctionDetails validateUpdateRequestParamsWithExistingMetadata(String tenant, String namespace, String componentName, + PackageLocationMetaData packageLocationMetaData, + String componentConfigJson, String componentType) throws Exception { + File tmpFile = File.createTempFile("functions", null); + tmpFile.deleteOnExit(); + Utils.downloadFromBookkeeper(worker().getDlogNamespace(), tmpFile, packageLocationMetaData.getPackagePath()); + return validateUpdateRequestParams(tenant, namespace, componentName, + null, componentConfigJson, componentType, null, tmpFile); + } + private static File dumpToTmpFile(InputStream uploadedInputStream) { try { File tmpFile = File.createTempFile("functions", null); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index e03d90d3f1..5275cf0834 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -126,6 +126,7 @@ public String process(String input, Context context) { private FunctionsImpl resource; private InputStream mockedInputStream; private FormDataContentDisposition mockedFormData; + private FunctionMetaData mockedFunctionMetadata; @BeforeMethod public void setup() throws Exception { @@ -140,6 +141,7 @@ public void setup() throws Exception { this.mockedPulsarAdmin = mock(PulsarAdmin.class); this.mockedTenants = mock(Tenants.class); this.mockedNamespaces = mock(Namespaces.class); + this.mockedFunctionMetadata = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); namespaceList.add(tenant + "/" + namespace); this.mockedWorkerService = mock(WorkerService.class); @@ -153,6 +155,7 @@ public void setup() throws Exception { when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces); when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo); when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetadata); // worker config WorkerConfig workerConfig = new WorkerConfig() @@ -434,16 +437,7 @@ private void testRegisterFunctionMissingArguments( } private Response registerDefaultFunction() { - FunctionConfig functionConfig = new FunctionConfig(); - functionConfig.setTenant(tenant); - functionConfig.setNamespace(namespace); - functionConfig.setName(function); - functionConfig.setClassName(className); - functionConfig.setParallelism(parallelism); - functionConfig.setCustomSerdeInputs(topicsToSerDeClassName); - functionConfig.setOutput(outputTopic); - functionConfig.setOutputSerdeClassName(outputSerdeClassName); - functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + FunctionConfig functionConfig = createDefaultFunctionConfig(); return resource.registerFunction( tenant, namespace, @@ -617,6 +611,9 @@ public void testUpdateFunctionMissingFunctionName() throws IOException { @Test public void testUpdateFunctionMissingPackage() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); testUpdateFunctionMissingArguments( tenant, namespace, @@ -628,55 +625,104 @@ public void testUpdateFunctionMissingPackage() throws IOException { outputSerdeClassName, className, parallelism, - "Function Package is not provided"); + "Update contains no change"); } @Test public void testUpdateFunctionMissingInputTopic() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); testUpdateFunctionMissingArguments( tenant, namespace, function, - mockedInputStream, + null, null, mockedFormData, outputTopic, outputSerdeClassName, className, parallelism, - "No input topic(s) specified for the function"); + "Update contains no change"); } @Test - public void testUpdateFunctionMissingPackageDetails() throws IOException { + public void testUpdateFunctionMissingClassName() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); testUpdateFunctionMissingArguments( tenant, namespace, function, - mockedInputStream, - topicsToSerDeClassName, null, + topicsToSerDeClassName, + mockedFormData, outputTopic, outputSerdeClassName, - className, + null, parallelism, - "Function Package is not provided"); + "Update contains no change"); } @Test - public void testUpdateFunctionMissingClassName() throws IOException { + public void testUpdateFunctionChangedParallelism() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); testUpdateFunctionMissingArguments( - tenant, - namespace, - function, - mockedInputStream, - topicsToSerDeClassName, - mockedFormData, - outputTopic, + tenant, + namespace, + function, + null, + topicsToSerDeClassName, + mockedFormData, + outputTopic, outputSerdeClassName, - null, - parallelism, - "Function classname cannot be null"); + null, + parallelism + 1, + null); + } + + @Test + public void testUpdateFunctionChangedInputs() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + testUpdateFunctionMissingArguments( + tenant, + namespace, + function, + null, + topicsToSerDeClassName, + mockedFormData, + "DifferentOutput", + outputSerdeClassName, + null, + parallelism, + "Output topics differ"); + } + + @Test + public void testUpdateFunctionChangedOutput() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + Map<String, String> someOtherInput = new HashMap<>(); + someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE); + testUpdateFunctionMissingArguments( + tenant, + namespace, + function, + null, + someOtherInput, + mockedFormData, + outputTopic, + outputSerdeClassName, + null, + parallelism, + "Input Topics cannot be altered"); } private void testUpdateFunctionMissingArguments( @@ -720,6 +766,14 @@ private void testUpdateFunctionMissingArguments( } functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + if (expectedError == null) { + RequestResult rr = new RequestResult() + .setSuccess(true) + .setMessage("function registered"); + CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + } + Response response = resource.updateFunction( tenant, namespace, @@ -732,8 +786,12 @@ private void testUpdateFunctionMissingArguments( FunctionsImpl.FUNCTION, null); - assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason); + if (expectedError == null) { + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } else { + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason); + } } private Response updateDefaultFunction() throws IOException { @@ -1269,4 +1327,23 @@ public void testRegisterFunctionWithConflictingFields() throws IOException { assertEquals(Status.OK.getStatusCode(), response.getStatus()); } + + public static FunctionConfig createDefaultFunctionConfig() { + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(function); + functionConfig.setClassName(className); + functionConfig.setParallelism(parallelism); + functionConfig.setCustomSerdeInputs(topicsToSerDeClassName); + functionConfig.setOutput(outputTopic); + functionConfig.setOutputSerdeClassName(outputSerdeClassName); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + return functionConfig; + } + + public static FunctionDetails createDefaultFunctionDetails() { + FunctionConfig functionConfig = createDefaultFunctionConfig(); + return FunctionConfigUtils.convert(functionConfig, null); + } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java index 5e710e03d5..a40e0adbad 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java @@ -28,6 +28,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Tenants; +import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; @@ -39,10 +41,12 @@ import org.apache.pulsar.functions.source.TopicSchema; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.functions.utils.SinkConfigUtils; +import org.apache.pulsar.functions.utils.io.ConnectorUtils; import org.apache.pulsar.functions.worker.*; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; import org.apache.pulsar.io.cassandra.CassandraStringSink; +import org.apache.pulsar.io.twitter.TwitterFireHose; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.mockito.Mockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -60,12 +64,15 @@ import java.io.IOException; import java.io.InputStream; import java.net.URL; +import java.nio.file.Path; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE; +import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -81,7 +88,7 @@ /** * Unit test of {@link SinkApiV2Resource}. */ -@PrepareForTest({Utils.class, SinkConfigUtils.class}) +@PrepareForTest({Utils.class, SinkConfigUtils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class}) @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" }) @Slf4j public class SinkApiV2ResourceTest { @@ -96,7 +103,7 @@ public IObjectFactory getObjectFactory() { private static final String sink = "test-sink"; private static final Map<String, String> topicsToSerDeClassName = new HashMap<>(); static { - topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", TopicSchema.DEFAULT_SERDE); + topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", DEFAULT_SERDE); } private static final String subscriptionName = "test-subscription"; private static final String className = CassandraStringSink.class.getName(); @@ -119,6 +126,7 @@ public IObjectFactory getObjectFactory() { private FunctionsImpl resource; private InputStream mockedInputStream; private FormDataContentDisposition mockedFormData; + private FunctionMetaData mockedFunctionMetaData; @BeforeMethod public void setup() throws Exception { @@ -372,13 +380,7 @@ private void testRegisterSinkMissingArguments( } private Response registerDefaultSink() throws IOException { - SinkConfig sinkConfig = new SinkConfig(); - sinkConfig.setTenant(tenant); - sinkConfig.setNamespace(namespace); - sinkConfig.setName(sink); - sinkConfig.setClassName(className); - sinkConfig.setParallelism(parallelism); - sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName); + SinkConfig sinkConfig = createDefaultSinkConfig(); return resource.registerFunction( tenant, namespace, @@ -573,6 +575,10 @@ public void testUpdateSinkMissingFunctionName() throws IOException { @Test public void testUpdateSinkMissingPackage() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + testUpdateSinkMissingArguments( tenant, namespace, @@ -582,21 +588,63 @@ public void testUpdateSinkMissingPackage() throws IOException { topicsToSerDeClassName, className, parallelism, - "Sink Package is not provided"); + "Update contains no change"); } @Test - public void testUpdateSinkMissingPackageDetails() throws IOException { + public void testUpdateSinkMissingInputs() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + testUpdateSinkMissingArguments( - tenant, - namespace, + tenant, + namespace, sink, - mockedInputStream, - null, - topicsToSerDeClassName, - className, - parallelism, - "zip file is empty"); + null, + mockedFormData, + null, + className, + parallelism, + "Update contains no change"); + } + + @Test + public void testUpdateSinkDifferentInputs() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + + Map<String, String> inputTopics = new HashMap<>(); + inputTopics.put("DifferntTopic", DEFAULT_SERDE); + testUpdateSinkMissingArguments( + tenant, + namespace, + sink, + null, + mockedFormData, + inputTopics, + className, + parallelism, + "Input Topics cannot be altered"); + } + + @Test + public void testUpdateSinkDifferentParallelism() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + + testUpdateSinkMissingArguments( + tenant, + namespace, + sink, + null, + mockedFormData, + topicsToSerDeClassName, + className, + parallelism + 1, + null); } private void testUpdateSinkMissingArguments( @@ -609,6 +657,24 @@ private void testUpdateSinkMissingArguments( String className, Integer parallelism, String expectedError) throws IOException { + mockStatic(ConnectorUtils.class); + doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class); + ConnectorUtils.getIOSinkClass(any(NarClassLoader.class)); + + mockStatic(org.apache.pulsar.functions.utils.Utils.class); + doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), any(NarClassLoader.class)); + + doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class)); + + doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + + + this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); SinkConfig sinkConfig = new SinkConfig(); @@ -631,6 +697,14 @@ private void testUpdateSinkMissingArguments( sinkConfig.setParallelism(parallelism); } + if (expectedError == null) { + RequestResult rr = new RequestResult() + .setSuccess(true) + .setMessage("source registered"); + CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + } + Response response = resource.updateFunction( tenant, namespace, @@ -643,8 +717,12 @@ private void testUpdateSinkMissingArguments( FunctionsImpl.SINK, null); - assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason); + if (expectedError == null) { + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } else { + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason); + } } private Response updateDefaultSink() throws IOException { @@ -656,6 +734,24 @@ private Response updateDefaultSink() throws IOException { sinkConfig.setParallelism(parallelism); sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName); + mockStatic(ConnectorUtils.class); + doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class); + ConnectorUtils.getIOSinkClass(any(NarClassLoader.class)); + + mockStatic(org.apache.pulsar.functions.utils.Utils.class); + doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), any(NarClassLoader.class)); + + doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class)); + + doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + + + this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); + return resource.updateFunction( tenant, namespace, @@ -730,6 +826,24 @@ public void testUpdateSinkWithUrl() throws IOException { sinkConfig.setParallelism(parallelism); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); + mockStatic(ConnectorUtils.class); + doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class); + ConnectorUtils.getIOSinkClass(any(NarClassLoader.class)); + + mockStatic(org.apache.pulsar.functions.utils.Utils.class); + doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), any(NarClassLoader.class)); + + doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class)); + + doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + + + this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); + RequestResult rr = new RequestResult() .setSuccess(true) .setMessage("source registered"); @@ -974,7 +1088,7 @@ public void testGetSinkSuccess() throws Exception { .setSubscriptionType(Function.SubscriptionType.SHARED) .setSubscriptionName(subscriptionName) .putInputSpecs("input", Function.ConsumerSpec.newBuilder() - .setSerdeClassName(TopicSchema.DEFAULT_SERDE) + .setSerdeClassName(DEFAULT_SERDE) .setIsRegexPattern(false) .build()).build(); Function.SinkSpec sinkSpec = Function.SinkSpec.newBuilder() @@ -985,7 +1099,7 @@ public void testGetSinkSuccess() throws Exception { .setSink(sinkSpec) .setName(sink) .setNamespace(namespace) - .setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE) + .setProcessingGuarantees(ATLEAST_ONCE) .setTenant(tenant) .setParallelism(parallelism) .setRuntime(FunctionDetails.Runtime.JAVA) @@ -1101,4 +1215,19 @@ public void testRegisterFunctionNonexistantTenant() throws Exception { assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); assertEquals(new ErrorData("Tenant does not exist").reason, ((ErrorData) response.getEntity()).reason); } + + private SinkConfig createDefaultSinkConfig() { + SinkConfig sinkConfig = new SinkConfig(); + sinkConfig.setTenant(tenant); + sinkConfig.setNamespace(namespace); + sinkConfig.setName(sink); + sinkConfig.setClassName(className); + sinkConfig.setParallelism(parallelism); + sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName); + return sinkConfig; + } + + private FunctionDetails createDefaultFunctionDetails() throws IOException { + return SinkConfigUtils.convert(createDefaultSinkConfig(), null); + } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java index 0c420a3e2f..f60afe4a47 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java @@ -28,6 +28,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Tenants; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; @@ -37,6 +38,7 @@ import org.apache.pulsar.functions.source.TopicSchema; import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.functions.utils.SourceConfigUtils; +import org.apache.pulsar.functions.utils.io.ConnectorUtils; import org.apache.pulsar.functions.worker.*; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; @@ -55,6 +57,7 @@ import javax.ws.rs.core.Response.Status; import java.io.*; import java.net.URL; +import java.nio.file.Path; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -72,7 +75,7 @@ /** * Unit test of {@link SourceApiV2Resource}. */ -@PrepareForTest({Utils.class}) +@PrepareForTest({Utils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class}) @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" }) @Slf4j public class SourceApiV2ResourceTest { @@ -107,6 +110,7 @@ public IObjectFactory getObjectFactory() { private FunctionsImpl resource; private InputStream mockedInputStream; private FormDataContentDisposition mockedFormData; + private FunctionMetaData mockedFunctionMetaData; @BeforeMethod public void setup() throws Exception { @@ -342,14 +346,7 @@ private void testRegisterSourceMissingArguments( } private Response registerDefaultSource() throws IOException { - SourceConfig sourceConfig = new SourceConfig(); - sourceConfig.setTenant(tenant); - sourceConfig.setNamespace(namespace); - sourceConfig.setName(source); - sourceConfig.setClassName(className); - sourceConfig.setParallelism(parallelism); - sourceConfig.setTopicName(outputTopic); - sourceConfig.setSerdeClassName(outputSerdeClassName); + SourceConfig sourceConfig = createDefaultSourceConfig(); return resource.registerFunction( tenant, namespace, @@ -548,56 +545,53 @@ public void testUpdateSourceMissingFunctionName() throws IOException { @Test public void testUpdateSourceMissingPackage() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + testUpdateSourceMissingArguments( tenant, namespace, source, - null, + null, mockedFormData, outputTopic, outputSerdeClassName, className, parallelism, - "Source Package is not provided"); - } - - @Test - public void testUpdateSourceMissingPackageDetails() throws IOException { - testUpdateSourceMissingArguments( - tenant, - namespace, - source, - mockedInputStream, - null, - outputTopic, - outputSerdeClassName, - className, - parallelism, - "zip file is empty"); + "Update contains no change"); } @Test public void testUpdateSourceMissingTopicName() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + testUpdateSourceMissingArguments( tenant, namespace, source, - mockedInputStream, + null, mockedFormData, null, outputSerdeClassName, className, parallelism, - "Topic name cannot be null"); + "Update contains no change"); } @Test public void testUpdateSourceNegativeParallelism() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + testUpdateSourceMissingArguments( tenant, namespace, source, - mockedInputStream, + null, mockedFormData, outputTopic, outputSerdeClassName, @@ -606,8 +600,50 @@ public void testUpdateSourceNegativeParallelism() throws IOException { "Source parallelism should positive number"); } + @Test + public void testUpdateSourceChangedParallelism() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + + testUpdateSourceMissingArguments( + tenant, + namespace, + source, + null, + mockedFormData, + outputTopic, + outputSerdeClassName, + className, + parallelism + 1, + null); + } + + @Test + public void testUpdateSourceChangedTopic() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + + testUpdateSourceMissingArguments( + tenant, + namespace, + source, + null, + mockedFormData, + "DifferentTopic", + outputSerdeClassName, + className, + parallelism, + "Destination topics differ"); + } + @Test public void testUpdateSourceZeroParallelism() throws IOException { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString()); + testUpdateSourceMissingArguments( tenant, namespace, @@ -632,6 +668,21 @@ private void testUpdateSourceMissingArguments( String className, Integer parallelism, String expectedError) throws IOException { + + mockStatic(ConnectorUtils.class); + doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class); + ConnectorUtils.getIOSourceClass(any(NarClassLoader.class)); + + mockStatic(org.apache.pulsar.functions.utils.Utils.class); + doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), any(NarClassLoader.class)); + + doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class)); + + this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); SourceConfig sourceConfig = new SourceConfig(); @@ -657,6 +708,14 @@ private void testUpdateSourceMissingArguments( sourceConfig.setParallelism(parallelism); } + if (expectedError == null) { + RequestResult rr = new RequestResult() + .setSuccess(true) + .setMessage("source registered"); + CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + } + Response response = resource.updateFunction( tenant, namespace, @@ -669,8 +728,12 @@ private void testUpdateSourceMissingArguments( FunctionsImpl.SOURCE, null); - assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason); + if (expectedError == null) { + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } else { + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason); + } } private Response updateDefaultSource() throws IOException { @@ -683,6 +746,21 @@ private Response updateDefaultSource() throws IOException { sourceConfig.setTopicName(outputTopic); sourceConfig.setSerdeClassName(outputSerdeClassName); + mockStatic(ConnectorUtils.class); + doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class); + ConnectorUtils.getIOSourceClass(any(NarClassLoader.class)); + + mockStatic(org.apache.pulsar.functions.utils.Utils.class); + doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), any(NarClassLoader.class)); + + doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class)); + + this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); + + return resource.updateFunction( tenant, namespace, @@ -757,6 +835,21 @@ public void testUpdateSourceWithUrl() throws IOException { sourceConfig.setClassName(className); sourceConfig.setParallelism(parallelism); + mockStatic(ConnectorUtils.class); + doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class); + ConnectorUtils.getIOSourceClass(any(NarClassLoader.class)); + + mockStatic(org.apache.pulsar.functions.utils.Utils.class); + doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), any(NarClassLoader.class)); + + doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class); + org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class)); + + this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); RequestResult rr = new RequestResult() .setSuccess(true) @@ -1123,4 +1216,20 @@ public void testRegisterFunctionNonexistantTenant() throws Exception { assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); assertEquals(new ErrorData("Tenant does not exist").reason, ((ErrorData) response.getEntity()).reason); } + + private SourceConfig createDefaultSourceConfig() { + SourceConfig sourceConfig = new SourceConfig(); + sourceConfig.setTenant(tenant); + sourceConfig.setNamespace(namespace); + sourceConfig.setName(source); + sourceConfig.setClassName(className); + sourceConfig.setParallelism(parallelism); + sourceConfig.setTopicName(outputTopic); + sourceConfig.setSerdeClassName(outputSerdeClassName); + return sourceConfig; + } + + private FunctionDetails createDefaultFunctionDetails() throws IOException { + return SourceConfigUtils.convert(createDefaultSourceConfig(), null); + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services