srkukarni closed pull request #2985: Update Function Semantics
URL: https://github.com/apache/pulsar/pull/2985
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index ed44b98031..1b4c2aa30c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -223,6 +223,7 @@ public void testFunctionAssignmentsWithRestart() throws 
Exception {
         final String namespacePortion = "assignment-test";
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sinkTopic = "persistent://" + replNamespace + 
"/my-topic1";
+        final String logTopic = "persistent://" + replNamespace + "/log-topic";
         final String baseFunctionName = "assign-restart";
         final String subscriptionName = "test-sub";
         final int totalFunctions = 5;
@@ -240,8 +241,7 @@ public void testFunctionAssignmentsWithRestart() throws 
Exception {
             functionConfig = createFunctionConfig(tenant, namespacePortion, 
functionName,
                     "my.*", sinkTopic, subscriptionName);
             functionConfig.setParallelism(parallelism);
-            // set-auto-ack prop =true
-            functionConfig.setAutoAck(true);
+            // don't set any log topic
             admin.functions().createFunctionWithUrl(functionConfig, 
jarFilePathUrl);
         }
         retryStrategically((test) -> {
@@ -263,8 +263,8 @@ public void testFunctionAssignmentsWithRestart() throws 
Exception {
             functionConfig = createFunctionConfig(tenant, namespacePortion, 
functionName,
                     "my.*", sinkTopic, subscriptionName);
             functionConfig.setParallelism(parallelism);
-            // set-auto-ack prop =false
-            functionConfig.setAutoAck(false);
+            // Now set the log topic
+            functionConfig.setLogTopic(logTopic);
             admin.functions().updateFunctionWithUrl(functionConfig, 
jarFilePathUrl);
         }
 
@@ -308,7 +308,7 @@ public void testFunctionAssignmentsWithRestart() throws 
Exception {
         // validate updated function prop = auto-ack=false and instnaceid
         for (int i = 0; i < (totalFunctions - totalDeletedFunction); i++) {
             String functionName = baseFunctionName + i;
-            assertFalse(admin.functions().getFunction(tenant, 
namespacePortion, functionName).getAutoAck());
+            assertEquals(admin.functions().getFunction(tenant, 
namespacePortion, functionName).getLogTopic(), logTopic);
         }
     }
 
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 91ff834f93..e0a0d4e0df 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.admin.cli;
 
-import com.google.gson.Gson;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import lombok.extern.slf4j.Slf4j;
@@ -35,8 +34,6 @@
 import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.StopFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
-import org.apache.pulsar.admin.cli.CmdSinks.CreateSink;
-import org.apache.pulsar.admin.cli.CmdSources.CreateSource;
 import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -60,10 +57,8 @@
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -205,7 +200,7 @@ public void testCreateFunction() throws Exception {
         assertEquals(fnName, creater.getFunctionName());
         assertEquals(inputTopicName, creater.getInputs());
         assertEquals(outputTopicName, creater.getOutput());
-        assertEquals(false, creater.isAutoAck());
+        assertEquals(new Boolean(false), creater.getAutoAck());
 
         verify(functions, times(1)).createFunction(any(FunctionConfig.class), 
anyString());
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 28f9183930..3a0d1e191f 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -287,14 +287,14 @@ void processArguments() throws Exception {
         @Parameter(names = "--autoAck", description = "Whether or not the 
framework will automatically acknowleges messages", hidden = true)
         protected Boolean DEPRECATED_autoAck = null;
         @Parameter(names = "--auto-ack", description = "Whether or not the 
framework will automatically acknowleges messages", arity = 1)
-        protected boolean autoAck = true;
+        protected Boolean autoAck;
         // for backwards compatibility purposes
         @Parameter(names = "--timeoutMs", description = "The message timeout 
in milliseconds", hidden = true)
         protected Long DEPRECATED_timeoutMs;
         @Parameter(names = "--timeout-ms", description = "The message timeout 
in milliseconds")
         protected Long timeoutMs;
         @Parameter(names = "--max-message-retries", description = "How many 
times should we try to process a message before giving up")
-        protected Integer maxMessageRetries = -1;
+        protected Integer maxMessageRetries;
         @Parameter(names = "--dead-letter-topic", description = "The topic 
where all messages which could not be processed successfully are sent")
         protected String deadLetterTopic;
         protected FunctionConfig functionConfig;
@@ -403,21 +403,29 @@ void processArguments() throws Exception {
             }
 
             Resources resources = functionConfig.getResources();
-            if (resources == null) {
-                resources = new Resources();
-            }
             if (cpu != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setCpu(cpu);
             }
 
             if (ram != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setRam(ram);
             }
 
             if (disk != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setDisk(disk);
             }
-            functionConfig.setResources(resources);
+            if (resources != null) {
+                functionConfig.setResources(resources);
+            }
 
             if (timeoutMs != null) {
                 functionConfig.setTimeoutMs(timeoutMs);
@@ -452,7 +460,9 @@ void processArguments() throws Exception {
 
             functionConfig.setWindowConfig(windowConfig);
 
-            functionConfig.setAutoAck(autoAck);
+            if (autoAck != null) {
+                functionConfig.setAutoAck(autoAck);
+            }
 
             if (null != maxMessageRetries) {
                 functionConfig.setMaxMessageRetries(maxMessageRetries);
@@ -711,6 +721,24 @@ void runCmd() throws Exception {
 
     @Parameters(commandDescription = "Update a Pulsar Function that's been 
deployed to a Pulsar cluster")
     class UpdateFunction extends FunctionDetailsCommand {
+
+        @Override
+        protected void validateFunctionConfigs(FunctionConfig functionConfig) {
+            if (StringUtils.isEmpty(functionConfig.getClassName())) {
+                if (StringUtils.isEmpty(functionConfig.getName())) {
+                    throw new IllegalArgumentException("Function Name not 
provided");
+                }
+            } else if (StringUtils.isEmpty(functionConfig.getName())) {
+                
org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig);
+            }
+            if (StringUtils.isEmpty(functionConfig.getTenant())) {
+                
org.apache.pulsar.common.functions.Utils.inferMissingTenant(functionConfig);
+            }
+            if (StringUtils.isEmpty(functionConfig.getNamespace())) {
+                
org.apache.pulsar.common.functions.Utils.inferMissingNamespace(functionConfig);
+            }
+        }
+
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(functionConfig.getJar())) {
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 594aa1b4d4..662b81cd48 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -210,6 +210,10 @@ void runCmd() throws Exception {
             }
             print("Updated successfully");
         }
+
+        protected void validateSinkConfigs(SinkConfig sinkConfig) {
+            
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
+        }
     }
 
     abstract class SinkDetailsCommand extends BaseCommand {
@@ -253,7 +257,7 @@ void runCmd() throws Exception {
         @Parameter(names = "--retainOrdering", description = "Sink consumes 
and sinks messages in order", hidden = true)
         protected Boolean DEPRECATED_retainOrdering;
         @Parameter(names = "--retain-ordering", description = "Sink consumes 
and sinks messages in order")
-        protected boolean retainOrdering;
+        protected Boolean retainOrdering;
         @Parameter(names = "--parallelism", description = "The sink's 
parallelism factor (i.e. the number of sink instances to run)")
         protected Integer parallelism;
         @Parameter(names = {"-a", "--archive"}, description = "Path to the 
archive file for the sink. It also supports url-path [http/https/file (file 
protocol assumes that file already exists on worker host)] from which worker 
can download the package.", listConverter = StringConverter.class)
@@ -280,7 +284,7 @@ void runCmd() throws Exception {
         @Parameter(names = "--sink-config", description = "User defined 
configs key/values")
         protected String sinkConfigString;
         @Parameter(names = "--auto-ack", description = "Whether or not the 
framework will automatically acknowleges messages", arity = 1)
-        protected boolean autoAck = true;
+        protected Boolean autoAck;
         @Parameter(names = "--timeout-ms", description = "The message timeout 
in milliseconds")
         protected Long timeoutMs;
 
@@ -329,7 +333,9 @@ void processArguments() throws Exception {
                 sinkConfig.setProcessingGuarantees(processingGuarantees);
             }
 
-            sinkConfig.setRetainOrdering(retainOrdering);
+            if (retainOrdering != null) {
+                sinkConfig.setRetainOrdering(retainOrdering);
+            }
 
             if (null != inputs) {
                 sinkConfig.setInputs(Arrays.asList(inputs.split(",")));
@@ -371,27 +377,37 @@ void processArguments() throws Exception {
             }
 
             Resources resources = sinkConfig.getResources();
-            if (resources == null) {
-                resources = new Resources();
-            }
             if (cpu != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setCpu(cpu);
             }
 
             if (ram != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setRam(ram);
             }
 
             if (disk != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setDisk(disk);
             }
-            sinkConfig.setResources(resources);
+            if (resources != null) {
+                sinkConfig.setResources(resources);
+            }
 
             if (null != sinkConfigString) {
                 sinkConfig.setConfigs(parseConfigs(sinkConfigString));
             }
 
-            sinkConfig.setAutoAck(autoAck);
+            if (autoAck != null) {
+                sinkConfig.setAutoAck(autoAck);
+            }
             if (timeoutMs != null) {
                 sinkConfig.setTimeoutMs(timeoutMs);
             }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 92b4f6c710..3e1b6ab221 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -214,6 +214,10 @@ void runCmd() throws Exception {
             }
             print("Updated successfully");
         }
+
+        protected void validateSourceConfigs(SourceConfig sourceConfig) {
+            
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
+        }
     }
 
     abstract class SourceDetailsCommand extends BaseCommand {
@@ -337,21 +341,29 @@ void processArguments() throws Exception {
             }
 
             Resources resources = sourceConfig.getResources();
-            if (resources == null) {
-                resources = new Resources();
-            }
             if (cpu != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setCpu(cpu);
             }
 
             if (ram != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setRam(ram);
             }
 
             if (disk != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setDisk(disk);
             }
-            sourceConfig.setResources(resources);
+            if (resources != null) {
+                sourceConfig.setResources(resources);
+            }
 
             if (null != sourceConfigString) {
                 sourceConfig.setConfigs(parseConfigs(sourceConfigString));
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index a52b7b841c..195d2e7726 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -128,7 +128,6 @@ public SinkConfig getSinkConfig() {
         sinkConfig.setTenant(TENANT);
         sinkConfig.setNamespace(NAMESPACE);
         sinkConfig.setName(NAME);
-        sinkConfig.setAutoAck(true);
 
         sinkConfig.setInputs(INPUTS_LIST);
         sinkConfig.setTopicToSerdeClassName(CUSTOM_SERDE_INPUT_MAP);
@@ -450,7 +449,7 @@ public void testCmdSinkConfigFileMissingResources() throws 
Exception {
         testSinkConfig.setResources(null);
 
         SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setResources(new Resources());
+        expectedSinkConfig.setResources(null);
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 2f63626535..2eb951a97f 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -352,7 +352,7 @@ public void testCmdSourceConfigFileMissingResources() 
throws Exception {
         testSourceConfig.setResources(null);
 
         SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setResources(new Resources());
+        expectedSourceConfig.setResources(null);
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java
index 7652336642..832c90321b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java
@@ -18,15 +18,13 @@
  */
 package org.apache.pulsar.common.functions;
 
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.*;
 
 @Data
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
+@EqualsAndHashCode
 public class ConsumerConfig {
     private String schemaType;
     private String serdeClassName;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 2bcc5a6056..bf271556ee 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -24,20 +24,16 @@
 import java.util.Map;
 import java.util.TreeMap;
 
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.apache.pulsar.common.functions.ConsumerConfig;
-import org.apache.pulsar.common.functions.Resources;
-import org.apache.pulsar.common.functions.WindowConfig;
+import lombok.*;
 
 @Getter
 @Setter
 @Data
 @EqualsAndHashCode
 @ToString
+@Builder(toBuilder=true)
+@NoArgsConstructor
+@AllArgsConstructor
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class FunctionConfig {
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java
index ff8cb5589a..fde3d38801 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java
@@ -27,6 +27,7 @@
 @ToString
 @AllArgsConstructor
 @NoArgsConstructor
+@Builder(toBuilder=true)
 public class Resources {
     // Default cpu is 1 core
     private Double cpu = 1d;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java
index 33f742bd3d..755201a033 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java
@@ -18,10 +18,7 @@
  */
 package org.apache.pulsar.common.functions;
 
-import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+import lombok.*;
 import lombok.experimental.Accessors;
 
 @Data
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index 50fb662504..b85de0633c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -22,11 +22,7 @@
 import java.util.Map;
 import java.util.TreeMap;
 
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+import lombok.*;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
@@ -36,6 +32,9 @@
 @Data
 @EqualsAndHashCode
 @ToString
+@Builder(toBuilder=true)
+@NoArgsConstructor
+@AllArgsConstructor
 public class SinkConfig {
 
     private String tenant;
@@ -63,9 +62,9 @@
     private Map<String, Object> secrets;
     private Integer parallelism;
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
-    private boolean retainOrdering;
+    private Boolean retainOrdering;
     private Resources resources;
-    private boolean autoAck;
+    private Boolean autoAck;
     private Long timeoutMs;
 
     private String archive;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
index 3ca7aadfd2..88955b8611 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
@@ -18,11 +18,7 @@
  */
 package org.apache.pulsar.common.io;
 
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+import lombok.*;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 
@@ -33,6 +29,9 @@
 @Data
 @EqualsAndHashCode
 @ToString
+@Builder(toBuilder=true)
+@NoArgsConstructor
+@AllArgsConstructor
 public class SourceConfig {
     private String tenant;
     private String namespace;
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 7fcf5bb994..2459cfdf5b 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -249,7 +249,9 @@ public static FunctionConfig 
convertFromDetails(FunctionDetails functionDetails)
             
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         }
         functionConfig.setAutoAck(functionDetails.getAutoAck());
-        
functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
+        if (functionDetails.getSource().getTimeoutMs() != 0) {
+            
functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
+        }
         if (!isEmpty(functionDetails.getSink().getTopic())) {
             functionConfig.setOutput(functionDetails.getSink().getTopic());
         }
@@ -566,4 +568,112 @@ public static ClassLoader validate(FunctionConfig 
functionConfig, String functio
             return null;
         }
     }
+
+    public static FunctionConfig validateUpdate(FunctionConfig existingConfig, 
FunctionConfig newConfig) {
+        FunctionConfig mergedConfig = existingConfig.toBuilder().build();
+        if (!existingConfig.getTenant().equals(newConfig.getTenant())) {
+            throw new IllegalArgumentException("Tenants differ");
+        }
+        if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) {
+            throw new IllegalArgumentException("Namespaces differ");
+        }
+        if (!existingConfig.getName().equals(newConfig.getName())) {
+            throw new IllegalArgumentException("Function Names differ");
+        }
+        if (!StringUtils.isEmpty(newConfig.getClassName())) {
+            mergedConfig.setClassName(newConfig.getClassName());
+        }
+        if (newConfig.getInputs() != null) {
+            newConfig.getInputs().forEach((topicName -> {
+                newConfig.getInputSpecs().put(topicName,
+                        
ConsumerConfig.builder().isRegexPattern(false).build());
+            }));
+        }
+        if (newConfig.getTopicsPattern() != null && 
!newConfig.getTopicsPattern().isEmpty()) {
+            newConfig.getInputSpecs().put(newConfig.getTopicsPattern(),
+                    ConsumerConfig.builder()
+                            .isRegexPattern(true)
+                            .build());
+        }
+        if (newConfig.getCustomSerdeInputs() != null) {
+            newConfig.getCustomSerdeInputs().forEach((topicName, 
serdeClassName) -> {
+                newConfig.getInputSpecs().put(topicName,
+                        ConsumerConfig.builder()
+                                .serdeClassName(serdeClassName)
+                                .isRegexPattern(false)
+                                .build());
+            });
+        }
+        if (newConfig.getCustomSchemaInputs() != null) {
+            newConfig.getCustomSchemaInputs().forEach((topicName, 
schemaClassname) -> {
+                newConfig.getInputSpecs().put(topicName,
+                        ConsumerConfig.builder()
+                                .schemaType(schemaClassname)
+                                .isRegexPattern(false)
+                                .build());
+            });
+        }
+        if (!newConfig.getInputSpecs().isEmpty()) {
+            newConfig.getInputSpecs().forEach((topicName, consumerConfig) -> {
+                if (!existingConfig.getInputSpecs().containsKey(topicName)) {
+                    throw new IllegalArgumentException("Input Topics cannot be 
altered");
+                }
+                if 
(!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) {
+                    throw new IllegalArgumentException("Input Specs mismatch");
+                }
+            });
+        }
+        if (!StringUtils.isEmpty(newConfig.getOutput()) && 
!newConfig.getOutput().equals(existingConfig.getOutput())) {
+            throw new IllegalArgumentException("Output topics differ");
+        }
+        if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && 
!newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
+            throw new IllegalArgumentException("Output Serde mismatch");
+        }
+        if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && 
!newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
+            throw new IllegalArgumentException("Output Schema mismatch");
+        }
+        if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
+            mergedConfig.setLogTopic(newConfig.getLogTopic());
+        }
+        if (newConfig.getProcessingGuarantees() != null && 
!newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees()))
 {
+            throw new IllegalArgumentException("Processing Guarantess cannot 
be alterted");
+        }
+        if (newConfig.getRetainOrdering() != null && 
!newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
+            throw new IllegalArgumentException("Retain Orderning cannot be 
altered");
+        }
+        if (newConfig.getUserConfig() != null) {
+            mergedConfig.setUserConfig(newConfig.getUserConfig());
+        }
+        if (newConfig.getSecrets() != null) {
+            mergedConfig.setSecrets(newConfig.getSecrets());
+        }
+        if (newConfig.getRuntime() != null && 
!newConfig.getRuntime().equals(existingConfig.getRuntime())) {
+            throw new IllegalArgumentException("Runtime cannot be altered");
+        }
+        if (newConfig.getAutoAck() != null && 
!newConfig.getAutoAck().equals(existingConfig.getAutoAck())) {
+            throw new IllegalArgumentException("AutoAck cannot be altered");
+        }
+        if (newConfig.getMaxMessageRetries() != null) {
+            
mergedConfig.setMaxMessageRetries(newConfig.getMaxMessageRetries());
+        }
+        if (!StringUtils.isEmpty(newConfig.getDeadLetterTopic())) {
+            mergedConfig.setDeadLetterTopic(newConfig.getDeadLetterTopic());
+        }
+        if (!StringUtils.isEmpty(newConfig.getSubName()) && 
!newConfig.getSubName().equals(existingConfig.getSubName())) {
+            throw new IllegalArgumentException("Subscription Name cannot be 
altered");
+        }
+        if (newConfig.getParallelism() != null) {
+            mergedConfig.setParallelism(newConfig.getParallelism());
+        }
+        if (newConfig.getResources() != null) {
+            
mergedConfig.setResources(ResourceConfigUtils.merge(existingConfig.getResources(),
 newConfig.getResources()));
+        }
+        if (newConfig.getWindowConfig() != null) {
+            mergedConfig.setWindowConfig(newConfig.getWindowConfig());
+        }
+        if (newConfig.getTimeoutMs() != null) {
+            mergedConfig.setTimeoutMs(newConfig.getTimeoutMs());
+        }
+        return mergedConfig;
+    }
 }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java
index ee46cebf0a..5fbda31fab 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java
@@ -33,4 +33,22 @@ public static void validate(Resources resources) {
         com.google.common.base.Preconditions.checkArgument(disk == null || 
disk > 0L,
                 "The disk allocation for the function must be positive");
     }
+
+    public static Resources merge(Resources existingResources, Resources 
newResources) {
+        Resources mergedResources;
+        if (existingResources != null) {
+            mergedResources = existingResources.toBuilder().build();
+        } else {
+            mergedResources = new Resources();
+        }
+        if (newResources.getCpu() != null) {
+            mergedResources.setCpu(newResources.getCpu());
+        }if (newResources.getRam() != null) {
+            mergedResources.setRam(newResources.getRam());
+        }
+        if (newResources.getDisk() != null) {
+            mergedResources.setDisk(newResources.getDisk());
+        }
+        return mergedResources;
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 375113697d..d65b87fd4d 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -42,8 +42,6 @@
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static 
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.Utils.getSinkType;
 
@@ -144,13 +142,17 @@ public static FunctionDetails convert(SinkConfig 
sinkConfig, NarClassLoader clas
             
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
         }
 
-        Function.SubscriptionType subType = (sinkConfig.isRetainOrdering()
+        Function.SubscriptionType subType = ((sinkConfig.getRetainOrdering() 
!= null && sinkConfig.getRetainOrdering())
                 || 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees()))
                 ? Function.SubscriptionType.FAILOVER
                 : Function.SubscriptionType.SHARED;
         sourceSpecBuilder.setSubscriptionType(subType);
 
-        functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck());
+        if (sinkConfig.getAutoAck() != null) {
+            functionDetailsBuilder.setAutoAck(sinkConfig.getAutoAck());
+        } else {
+            functionDetailsBuilder.setAutoAck(true);
+        }
         if (sinkConfig.getTimeoutMs() != null) {
             sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
         }
@@ -226,7 +228,9 @@ public static SinkConfig convertFromDetails(FunctionDetails 
functionDetails) {
             
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         }
         sinkConfig.setAutoAck(functionDetails.getAutoAck());
-        sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
+        if (functionDetails.getSource().getTimeoutMs() != 0) {
+            
sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
+        }
         if (!isEmpty(functionDetails.getSink().getClassName())) {
             sinkConfig.setClassName(functionDetails.getSink().getClassName());
         }
@@ -351,4 +355,91 @@ public static NarClassLoader validate(SinkConfig 
sinkConfig, Path archivePath, S
         }
         return retval;
     }
+
+    public static SinkConfig validateUpdate(SinkConfig existingConfig, 
SinkConfig newConfig) {
+        SinkConfig mergedConfig = existingConfig.toBuilder().build();
+        if (!existingConfig.getTenant().equals(newConfig.getTenant())) {
+            throw new IllegalArgumentException("Tenants differ");
+        }
+        if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) {
+            throw new IllegalArgumentException("Namespaces differ");
+        }
+        if (!existingConfig.getName().equals(newConfig.getName())) {
+            throw new IllegalArgumentException("Sink Names differ");
+        }
+        if (!StringUtils.isEmpty(newConfig.getClassName())) {
+            mergedConfig.setClassName(newConfig.getClassName());
+        }
+        if (!StringUtils.isEmpty(newConfig.getSourceSubscriptionName()) && 
!newConfig.getSourceSubscriptionName().equals(existingConfig.getSourceSubscriptionName()))
 {
+            throw new IllegalArgumentException("Subscription Name cannot be 
altered");
+        }
+        if (newConfig.getInputs() != null) {
+            newConfig.getInputs().forEach((topicName -> {
+                newConfig.getInputSpecs().put(topicName,
+                        
ConsumerConfig.builder().isRegexPattern(false).build());
+            }));
+        }
+        if (newConfig.getTopicsPattern() != null && 
!newConfig.getTopicsPattern().isEmpty()) {
+            newConfig.getInputSpecs().put(newConfig.getTopicsPattern(),
+                    ConsumerConfig.builder()
+                            .isRegexPattern(true)
+                            .build());
+        }
+        if (newConfig.getTopicToSerdeClassName() != null) {
+            newConfig.getTopicToSerdeClassName().forEach((topicName, 
serdeClassName) -> {
+                newConfig.getInputSpecs().put(topicName,
+                        ConsumerConfig.builder()
+                                .serdeClassName(serdeClassName)
+                                .isRegexPattern(false)
+                                .build());
+            });
+        }
+        if (newConfig.getTopicToSchemaType() != null) {
+            newConfig.getTopicToSchemaType().forEach((topicName, 
schemaClassname) -> {
+                newConfig.getInputSpecs().put(topicName,
+                        ConsumerConfig.builder()
+                                .schemaType(schemaClassname)
+                                .isRegexPattern(false)
+                                .build());
+            });
+        }
+        if (!newConfig.getInputSpecs().isEmpty()) {
+            newConfig.getInputSpecs().forEach((topicName, consumerConfig) -> {
+                if (!existingConfig.getInputSpecs().containsKey(topicName)) {
+                    throw new IllegalArgumentException("Input Topics cannot be 
altered");
+                }
+                if 
(!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) {
+                    throw new IllegalArgumentException("Input Specs mismatch");
+                }
+            });
+        }
+        if (newConfig.getProcessingGuarantees() != null && 
!newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees()))
 {
+            throw new IllegalArgumentException("Processing Guarantess cannot 
be alterted");
+        }
+        if (newConfig.getConfigs() != null) {
+            mergedConfig.setConfigs(newConfig.getConfigs());
+        }
+        if (newConfig.getSecrets() != null) {
+            mergedConfig.setSecrets(newConfig.getSecrets());
+        }
+        if (newConfig.getParallelism() != null) {
+            mergedConfig.setParallelism(newConfig.getParallelism());
+        }
+        if (newConfig.getRetainOrdering() != null && 
!newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
+            throw new IllegalArgumentException("Retain Orderning cannot be 
altered");
+        }
+        if (newConfig.getAutoAck() != null && 
!newConfig.getAutoAck().equals(existingConfig.getAutoAck())) {
+            throw new IllegalArgumentException("AutoAck cannot be altered");
+        }
+        if (newConfig.getResources() != null) {
+            
mergedConfig.setResources(ResourceConfigUtils.merge(existingConfig.getResources(),
 newConfig.getResources()));
+        }
+        if (newConfig.getTimeoutMs() != null) {
+            mergedConfig.setTimeoutMs(newConfig.getTimeoutMs());
+        }
+        if (!StringUtils.isEmpty(newConfig.getArchive())) {
+            mergedConfig.setArchive(newConfig.getArchive());
+        }
+        return mergedConfig;
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 2245580458..a7baea219b 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -240,4 +240,48 @@ public static NarClassLoader validate(SourceConfig 
sourceConfig, Path archivePat
         return classLoader;
     }
 
+    public static SourceConfig validateUpdate(SourceConfig existingConfig, 
SourceConfig newConfig) {
+        SourceConfig mergedConfig = existingConfig.toBuilder().build();
+        if (!existingConfig.getTenant().equals(newConfig.getTenant())) {
+            throw new IllegalArgumentException("Tenants differ");
+        }
+        if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) {
+            throw new IllegalArgumentException("Namespaces differ");
+        }
+        if (!existingConfig.getName().equals(newConfig.getName())) {
+            throw new IllegalArgumentException("Function Names differ");
+        }
+        if (!StringUtils.isEmpty(newConfig.getClassName())) {
+            mergedConfig.setClassName(newConfig.getClassName());
+        }
+        if (!StringUtils.isEmpty(newConfig.getTopicName()) && 
!newConfig.getTopicName().equals(existingConfig.getTopicName())) {
+            throw new IllegalArgumentException("Destination topics differ");
+        }
+        if (!StringUtils.isEmpty(newConfig.getSerdeClassName())) {
+            mergedConfig.setSerdeClassName(newConfig.getSerdeClassName());
+        }
+        if (!StringUtils.isEmpty(newConfig.getSchemaType())) {
+            mergedConfig.setSchemaType(newConfig.getSchemaType());
+        }
+        if (newConfig.getConfigs() != null) {
+            mergedConfig.setConfigs(newConfig.getConfigs());
+        }
+        if (newConfig.getSecrets() != null) {
+            mergedConfig.setSecrets(newConfig.getSecrets());
+        }
+        if (newConfig.getProcessingGuarantees() != null && 
!newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees()))
 {
+            throw new IllegalArgumentException("Processing Guarantess cannot 
be alterted");
+        }
+        if (newConfig.getParallelism() != null) {
+            mergedConfig.setParallelism(newConfig.getParallelism());
+        }
+        if (newConfig.getResources() != null) {
+            
mergedConfig.setResources(ResourceConfigUtils.merge(existingConfig.getResources(),
 newConfig.getResources()));
+        }
+        if (!StringUtils.isEmpty(newConfig.getArchive())) {
+            mergedConfig.setArchive(newConfig.getArchive());
+        }
+        return mergedConfig;
+    }
+
 }
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 6d71d45292..0f4f400543 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -22,14 +22,18 @@
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.WindowConfig;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.testng.annotations.Test;
 
+import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 
+import static 
org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
+import static org.apache.pulsar.common.functions.FunctionConfig.Runtime.PYTHON;
 import static org.testng.Assert.assertEquals;
 
 /**
@@ -92,6 +96,293 @@ public void testConvertWindow() {
         );
     }
 
+    @Test
+    public void testMergeEqual() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = createFunctionConfig();
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Function Names differ")
+    public void testMergeDifferentName() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("name", 
"Different");
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Tenants differ")
+    public void testMergeDifferentTenant() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("tenant", "Different");
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Namespaces differ")
+    public void testMergeDifferentNamespace() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("namespace", "Different");
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test
+    public void testMergeDifferentClassName() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("className", "Different");
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertEquals(
+                mergedConfig.getClassName(),
+                "Different"
+        );
+        mergedConfig.setClassName(functionConfig.getClassName());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
+    public void testMergeDifferentInputs() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("topicsPattern", "Different");
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Output topics differ")
+    public void testMergeDifferentOutput() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("output", "Different");
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test
+    public void testMergeDifferentLogTopic() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("logTopic", "Different");
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertEquals(
+                mergedConfig.getLogTopic(),
+                "Different"
+        );
+        mergedConfig.setLogTopic(functionConfig.getLogTopic());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
+    public void testMergeDifferentProcessingGuarantees() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("processingGuarantees", EFFECTIVELY_ONCE);
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Retain Orderning cannot be altered")
+    public void testMergeDifferentRetainOrdering() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("retainOrdering", true);
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test
+    public void testMergeDifferentUserConfig() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        Map<String, String> myConfig = new HashMap<>();
+        myConfig.put("MyKey", "MyValue");
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("userConfig", myConfig);
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertEquals(
+                mergedConfig.getUserConfig(),
+                myConfig
+        );
+        mergedConfig.setUserConfig(functionConfig.getUserConfig());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentSecrets() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        Map<String, String> mySecrets = new HashMap<>();
+        mySecrets.put("MyKey", "MyValue");
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("secrets", mySecrets);
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertEquals(
+                mergedConfig.getSecrets(),
+                mySecrets
+        );
+        mergedConfig.setSecrets(functionConfig.getSecrets());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Runtime cannot be altered")
+    public void testMergeDifferentRuntime() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("runtime", PYTHON);
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "AutoAck cannot be altered")
+    public void testMergeDifferentAutoAck() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("autoAck", false);
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test
+    public void testMergeDifferentMaxMessageRetries() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("maxMessageRetries", 10);
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertEquals(
+                mergedConfig.getMaxMessageRetries(),
+                new Integer(10)
+        );
+        
mergedConfig.setMaxMessageRetries(functionConfig.getMaxMessageRetries());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentDeadLetterTopic() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("deadLetterTopic", "Different");
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertEquals(
+                mergedConfig.getDeadLetterTopic(),
+                "Different"
+        );
+        mergedConfig.setDeadLetterTopic(functionConfig.getDeadLetterTopic());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Subscription Name cannot be altered")
+    public void testMergeDifferentSubname() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("subName", "Different");
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test
+    public void testMergeDifferentParallelism() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("parallelism", 101);
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertEquals(
+                mergedConfig.getParallelism(),
+                new Integer(101)
+        );
+        mergedConfig.setParallelism(functionConfig.getParallelism());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentResources() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        Resources resources = new Resources();
+        resources.setCpu(0.3);
+        resources.setRam(1232l);
+        resources.setDisk(123456l);
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("resources", resources);
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertEquals(
+                mergedConfig.getResources(),
+                resources
+        );
+        mergedConfig.setResources(functionConfig.getResources());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentWindowConfig() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        WindowConfig windowConfig = new WindowConfig();
+        windowConfig.setSlidingIntervalCount(123);
+        windowConfig.setSlidingIntervalDurationMs(123l);
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("windowConfig", windowConfig);
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertEquals(
+                mergedConfig.getWindowConfig(),
+                windowConfig
+        );
+        mergedConfig.setWindowConfig(functionConfig.getWindowConfig());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentTimeout() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("timeoutMs", 102l);
+        FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertEquals(
+                mergedConfig.getTimeoutMs(),
+                new Long(102l)
+        );
+        mergedConfig.setTimeoutMs(functionConfig.getTimeoutMs());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    private FunctionConfig createFunctionConfig() {
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant("test-tenant");
+        functionConfig.setNamespace("test-namespace");
+        functionConfig.setName("test-function");
+        functionConfig.setParallelism(1);
+        functionConfig.setClassName(IdentityFunction.class.getName());
+        Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+        inputSpecs.put("test-input", 
ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
+        functionConfig.setInputSpecs(inputSpecs);
+        functionConfig.setOutput("test-output");
+        functionConfig.setOutputSerdeClassName("test-serde");
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        functionConfig.setRetainOrdering(false);
+        functionConfig.setUserConfig(new HashMap<>());
+        functionConfig.setAutoAck(true);
+        functionConfig.setTimeoutMs(2000l);
+        functionConfig.setWindowConfig(new 
WindowConfig().setWindowLengthCount(10));
+        return functionConfig;
+    }
+
+    private FunctionConfig createUpdatedFunctionConfig(String fieldName, 
Object fieldValue) {
+        FunctionConfig functionConfig = createFunctionConfig();
+        Class<?> fClass = FunctionConfig.class;
+        try {
+            Field chap = fClass.getDeclaredField(fieldName);
+            chap.setAccessible(true);
+            chap.set(functionConfig, fieldValue);
+        } catch (Exception e) {
+            throw new RuntimeException("Something wrong with the test", e);
+        }
+        return functionConfig;
+    }
+
     @Test
     public void testFunctionConfigConvertFromDetails() {
         String name = "test1";
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 0a68c85e9d..858cd72966 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -21,14 +21,18 @@
 import com.google.gson.Gson;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 
+import static 
org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
 import static org.testng.Assert.assertEquals;
 
 /**
@@ -60,4 +64,207 @@ public void testConvertBackFidelity() throws IOException  {
                 new Gson().toJson(convertedConfig)
         );
     }
+
+    @Test
+    public void testMergeEqual() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createSinkConfig();
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Sink Names differ")
+    public void testMergeDifferentName() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("name", 
"Different");
+        SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Tenants differ")
+    public void testMergeDifferentTenant() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("tenant", 
"Different");
+        SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Namespaces differ")
+    public void testMergeDifferentNamespace() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("namespace", 
"Different");
+        SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+    }
+
+    @Test
+    public void testMergeDifferentClassName() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("className", 
"Different");
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(
+                mergedConfig.getClassName(),
+                "Different"
+        );
+        mergedConfig.setClassName(sinkConfig.getClassName());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
+    public void testMergeDifferentInputs() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("topicsPattern", 
"Different");
+        SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
+    public void testMergeDifferentProcessingGuarantees() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = 
createUpdatedSinkConfig("processingGuarantees", EFFECTIVELY_ONCE);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Retain Orderning cannot be altered")
+    public void testMergeDifferentRetainOrdering() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("retainOrdering", 
true);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+    }
+
+    @Test
+    public void testMergeDifferentUserConfig() {
+        SinkConfig sinkConfig = createSinkConfig();
+        Map<String, String> myConfig = new HashMap<>();
+        myConfig.put("MyKey", "MyValue");
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("configs", 
myConfig);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(
+                mergedConfig.getConfigs(),
+                myConfig
+        );
+        mergedConfig.setConfigs(sinkConfig.getConfigs());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentSecrets() {
+        SinkConfig sinkConfig = createSinkConfig();
+        Map<String, String> mySecrets = new HashMap<>();
+        mySecrets.put("MyKey", "MyValue");
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("secrets", 
mySecrets);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(
+                mergedConfig.getSecrets(),
+                mySecrets
+        );
+        mergedConfig.setSecrets(sinkConfig.getSecrets());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "AutoAck cannot be altered")
+    public void testMergeDifferentAutoAck() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("autoAck", false);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Subscription Name cannot be altered")
+    public void testMergeDifferentSubname() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = 
createUpdatedSinkConfig("sourceSubscriptionName", "Different");
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+    }
+
+    @Test
+    public void testMergeDifferentParallelism() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("parallelism", 101);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(
+                mergedConfig.getParallelism(),
+                new Integer(101)
+        );
+        mergedConfig.setParallelism(sinkConfig.getParallelism());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentResources() {
+        SinkConfig sinkConfig = createSinkConfig();
+        Resources resources = new Resources();
+        resources.setCpu(0.3);
+        resources.setRam(1232l);
+        resources.setDisk(123456l);
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("resources", 
resources);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(
+                mergedConfig.getResources(),
+                resources
+        );
+        mergedConfig.setResources(sinkConfig.getResources());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentTimeout() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("timeoutMs", 102l);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(
+                mergedConfig.getTimeoutMs(),
+                new Long(102l)
+        );
+        mergedConfig.setTimeoutMs(sinkConfig.getTimeoutMs());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    private SinkConfig createSinkConfig() {
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setTenant("test-tenant");
+        sinkConfig.setNamespace("test-namespace");
+        sinkConfig.setName("test-sink");
+        sinkConfig.setParallelism(1);
+        sinkConfig.setClassName(IdentityFunction.class.getName());
+        Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+        inputSpecs.put("test-input", 
ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
+        sinkConfig.setInputSpecs(inputSpecs);
+        
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        sinkConfig.setRetainOrdering(false);
+        sinkConfig.setConfigs(new HashMap<>());
+        sinkConfig.setAutoAck(true);
+        sinkConfig.setTimeoutMs(2000l);
+        sinkConfig.setArchive("DummyArchive.nar");
+        return sinkConfig;
+    }
+
+    private SinkConfig createUpdatedSinkConfig(String fieldName, Object 
fieldValue) {
+        SinkConfig sinkConfig = createSinkConfig();
+        Class<?> fClass = SinkConfig.class;
+        try {
+            Field chap = fClass.getDeclaredField(fieldName);
+            chap.setAccessible(true);
+            chap.set(sinkConfig, fieldValue);
+        } catch (Exception e) {
+            throw new RuntimeException("Something wrong with the test", e);
+        }
+        return sinkConfig;
+    }
 }
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index dcaf2e9f61..7cb7d3c5b5 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -20,13 +20,18 @@
 
 import com.google.gson.Gson;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.HashMap;
+import java.util.Map;
 
+import static 
org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
 import static org.testng.Assert.assertEquals;
 
 /**
@@ -53,4 +58,165 @@ public void testConvertBackFidelity() throws IOException  {
                 new Gson().toJson(convertedConfig)
         );
     }
+
+    @Test
+    public void testMergeEqual() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = createSourceConfig();
+        SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Function Names differ")
+    public void testMergeDifferentName() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("name", 
"Different");
+        SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Tenants differ")
+    public void testMergeDifferentTenant() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("tenant", 
"Different");
+        SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Namespaces differ")
+    public void testMergeDifferentNamespace() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("namespace", 
"Different");
+        SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+    }
+
+    @Test
+    public void testMergeDifferentClassName() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("className", 
"Different");
+        SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+        assertEquals(
+                mergedConfig.getClassName(),
+                "Different"
+        );
+        mergedConfig.setClassName(sourceConfig.getClassName());
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Destination topics differ")
+    public void testMergeDifferentOutput() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("topicName", 
"Different");
+        SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
+    public void testMergeDifferentProcessingGuarantees() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = 
createUpdatedSourceConfig("processingGuarantees", EFFECTIVELY_ONCE);
+        SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+    }
+
+    @Test
+    public void testMergeDifferentUserConfig() {
+        SourceConfig sourceConfig = createSourceConfig();
+        Map<String, String> myConfig = new HashMap<>();
+        myConfig.put("MyKey", "MyValue");
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("configs", 
myConfig);
+        SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+        assertEquals(
+                mergedConfig.getConfigs(),
+                myConfig
+        );
+        mergedConfig.setConfigs(sourceConfig.getConfigs());
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentSecrets() {
+        SourceConfig sourceConfig = createSourceConfig();
+        Map<String, String> mySecrets = new HashMap<>();
+        mySecrets.put("MyKey", "MyValue");
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("secrets", 
mySecrets);
+        SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+        assertEquals(
+                mergedConfig.getSecrets(),
+                mySecrets
+        );
+        mergedConfig.setSecrets(sourceConfig.getSecrets());
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentParallelism() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = 
createUpdatedSourceConfig("parallelism", 101);
+        SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+        assertEquals(
+                mergedConfig.getParallelism(),
+                new Integer(101)
+        );
+        mergedConfig.setParallelism(sourceConfig.getParallelism());
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentResources() {
+        SourceConfig sourceConfig = createSourceConfig();
+        Resources resources = new Resources();
+        resources.setCpu(0.3);
+        resources.setRam(1232l);
+        resources.setDisk(123456l);
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("resources", 
resources);
+        SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+        assertEquals(
+                mergedConfig.getResources(),
+                resources
+        );
+        mergedConfig.setResources(sourceConfig.getResources());
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    private SourceConfig createSourceConfig() {
+        SourceConfig sourceConfig = new SourceConfig();
+        sourceConfig.setTenant("test-tenant");
+        sourceConfig.setNamespace("test-namespace");
+        sourceConfig.setName("test-source");
+        sourceConfig.setParallelism(1);
+        sourceConfig.setClassName(IdentityFunction.class.getName());
+        sourceConfig.setTopicName("test-output");
+        sourceConfig.setSerdeClassName("test-serde");
+        
sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        sourceConfig.setConfigs(new HashMap<>());
+        return sourceConfig;
+    }
+
+    private SourceConfig createUpdatedSourceConfig(String fieldName, Object 
fieldValue) {
+        SourceConfig sourceConfig = createSourceConfig();
+        Class<?> fClass = SourceConfig.class;
+        try {
+            Field chap = fClass.getDeclaredField(fieldName);
+            chap.setAccessible(true);
+            chap.set(sourceConfig, fieldValue);
+        } catch (Exception e) {
+            throw new RuntimeException("Something wrong with the test", e);
+        }
+        return sourceConfig;
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index 2cd51b84ea..7defdf6ee5 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -125,8 +125,14 @@ public static void downloadFromHttpUrl(String destPkgUrl, 
FileOutputStream outpu
     }
 
     public static void downloadFromBookkeeper(Namespace namespace,
-                                                 OutputStream outputStream,
-                                                 String packagePath) throws 
IOException {
+                                              File outputFile,
+                                              String packagePath) throws 
IOException {
+        downloadFromBookkeeper(namespace, new FileOutputStream(outputFile), 
packagePath);
+    }
+
+    public static void downloadFromBookkeeper(Namespace namespace,
+                                              OutputStream outputStream,
+                                              String packagePath) throws 
IOException {
         DistributedLogManager dlm = namespace.openLog(packagePath);
         try (InputStream in = new DLInputStream(dlm)) {
             int read = 0;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 18de1eb734..627f5ccd0f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -329,6 +329,19 @@ public Response updateFunction(final String tenant, final 
String namespace, fina
             return getUnavailableResponse();
         }
 
+        if (tenant == null) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Tenant is not provided")).build();
+        }
+        if (namespace == null) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Namespace is not 
provided")).build();
+        }
+        if (componentName == null) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(componentType + " Name is not 
provided")).build();
+        }
+
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to 
update {}", tenant, namespace,
@@ -349,23 +362,83 @@ public Response updateFunction(final String tenant, final 
String namespace, fina
                     .entity(new ErrorData(String.format("%s %s doesn't exist", 
componentType, componentName))).build();
         }
 
+        String mergedComponentConfigJson;
+        String existingComponentConfigJson;
+
+        FunctionMetaData existingComponent = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (componentType.equals(FUNCTION)) {
+            FunctionConfig existingFunctionConfig = 
FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+            existingComponentConfigJson = new 
Gson().toJson(existingFunctionConfig);
+            FunctionConfig functionConfig = new 
Gson().fromJson(componentConfigJson, FunctionConfig.class);
+            // The rest end points take precendence over whatever is there in 
functionconfig
+            functionConfig.setTenant(tenant);
+            functionConfig.setNamespace(namespace);
+            functionConfig.setName(componentName);
+            try {
+                FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(existingFunctionConfig, functionConfig);
+                mergedComponentConfigJson = new Gson().toJson(mergedConfig);
+            } catch (Exception e) {
+                return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData(e.getMessage())).build();
+            }
+        } else if (componentType.equals(SOURCE)) {
+            SourceConfig existingSourceConfig = 
SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+            existingComponentConfigJson = new 
Gson().toJson(existingSourceConfig);
+            SourceConfig sourceConfig = new 
Gson().fromJson(componentConfigJson, SourceConfig.class);
+            // The rest end points take precendence over whatever is there in 
functionconfig
+            sourceConfig.setTenant(tenant);
+            sourceConfig.setNamespace(namespace);
+            sourceConfig.setName(componentName);
+            try {
+                SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(existingSourceConfig, sourceConfig);
+                mergedComponentConfigJson = new Gson().toJson(mergedConfig);
+            } catch (Exception e) {
+                return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData(e.getMessage())).build();
+            }
+        } else {
+            SinkConfig existingSinkConfig = 
SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+            existingComponentConfigJson = new 
Gson().toJson(existingSinkConfig);
+            SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, 
SinkConfig.class);
+            // The rest end points take precendence over whatever is there in 
functionconfig
+            sinkConfig.setTenant(tenant);
+            sinkConfig.setNamespace(namespace);
+            sinkConfig.setName(componentName);
+            try {
+                SinkConfig mergedConfig = 
SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig);
+                mergedComponentConfigJson = new Gson().toJson(mergedConfig);
+            } catch (Exception e) {
+                return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData(e.getMessage())).build();
+            }
+        }
+
+        if (existingComponentConfigJson.equals(mergedComponentConfigJson) && 
isBlank(functionPkgUrl) && uploadedInputStream == null) {
+            log.error("{}/{}/{} Update contains no changes", tenant, 
namespace, componentName);
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Update contains no 
change")).build();
+        }
+
         FunctionDetails functionDetails;
-        boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
         File uploadedInputStreamAsFile = null;
         if (uploadedInputStream != null) {
             uploadedInputStreamAsFile = dumpToTmpFile(uploadedInputStream);
         }
+        File existingPackageAsFile = null;
+
         // validate parameters
         try {
-            if (isPkgUrlProvided) {
+            if (isNotBlank(functionPkgUrl)) {
                 functionDetails = 
validateUpdateRequestParamsWithPkgUrl(tenant, namespace, componentName, 
functionPkgUrl,
-                        functionDetailsJson, componentConfigJson, 
componentType);
-            } else {
+                        functionDetailsJson, mergedComponentConfigJson, 
componentType);
+            } else if (uploadedInputStream != null) {
                 functionDetails = validateUpdateRequestParams(tenant, 
namespace, componentName, uploadedInputStreamAsFile,
-                        fileDetail, functionDetailsJson, componentConfigJson, 
componentType);
+                        fileDetail, functionDetailsJson, 
mergedComponentConfigJson, componentType);
+            } else {
+                functionDetails = 
validateUpdateRequestParamsWithExistingMetadata(tenant, namespace, 
componentName, existingComponent.getPackageLocation(), 
mergedComponentConfigJson, componentType);
             }
         } catch (Exception e) {
-            log.error("Invalid register {} request @ /{}/{}/{}", 
componentType, tenant, namespace, componentName, e);
+            log.error("Invalid update {} request @ /{}/{}/{}", componentType, 
tenant, namespace, componentName, e);
             return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
@@ -383,12 +456,16 @@ public Response updateFunction(final String tenant, final 
String namespace, fina
                 
.setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
 
         PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
-        try {
-            packageLocationMetaDataBuilder = 
getFunctionPackageLocation(functionDetails, componentType,
-                    functionPkgUrl, fileDetail, uploadedInputStreamAsFile);
-        } catch (Exception e) {
-            return 
Response.serverError().type(MediaType.APPLICATION_JSON).entity(new 
ErrorData(e.getMessage()))
-                    .build();
+        if (isNotBlank(functionPkgUrl) || uploadedInputStreamAsFile != null) {
+            try {
+                packageLocationMetaDataBuilder = 
getFunctionPackageLocation(functionDetails, componentType,
+                        functionPkgUrl, fileDetail, uploadedInputStreamAsFile);
+            } catch (Exception e) {
+                return 
Response.serverError().type(MediaType.APPLICATION_JSON).entity(new 
ErrorData(e.getMessage()))
+                        .build();
+            }
+        } else {
+            packageLocationMetaDataBuilder = 
PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
         }
 
         
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
@@ -1157,6 +1234,16 @@ private FunctionDetails 
validateUpdateRequestParams(String tenant, String namesp
         return functionDetails;
     }
 
+    private FunctionDetails 
validateUpdateRequestParamsWithExistingMetadata(String tenant, String 
namespace, String componentName,
+                                                            
PackageLocationMetaData packageLocationMetaData,
+                                                            String 
componentConfigJson, String componentType) throws Exception {
+        File tmpFile = File.createTempFile("functions", null);
+        tmpFile.deleteOnExit();
+        Utils.downloadFromBookkeeper(worker().getDlogNamespace(), tmpFile, 
packageLocationMetaData.getPackagePath());
+        return validateUpdateRequestParams(tenant, namespace, componentName,
+                null, componentConfigJson, componentType, null, tmpFile);
+    }
+
     private static File dumpToTmpFile(InputStream uploadedInputStream) {
         try {
             File tmpFile = File.createTempFile("functions", null);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index e03d90d3f1..5275cf0834 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -126,6 +126,7 @@ public String process(String input, Context context) {
     private FunctionsImpl resource;
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
+    private FunctionMetaData mockedFunctionMetadata;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -140,6 +141,7 @@ public void setup() throws Exception {
         this.mockedPulsarAdmin = mock(PulsarAdmin.class);
         this.mockedTenants = mock(Tenants.class);
         this.mockedNamespaces = mock(Namespaces.class);
+        this.mockedFunctionMetadata = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         namespaceList.add(tenant + "/" + namespace);
 
         this.mockedWorkerService = mock(WorkerService.class);
@@ -153,6 +155,7 @@ public void setup() throws Exception {
         when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
         when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
         when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetadata);
 
         // worker config
         WorkerConfig workerConfig = new WorkerConfig()
@@ -434,16 +437,7 @@ private void testRegisterFunctionMissingArguments(
     }
 
     private Response registerDefaultFunction() {
-        FunctionConfig functionConfig = new FunctionConfig();
-        functionConfig.setTenant(tenant);
-        functionConfig.setNamespace(namespace);
-        functionConfig.setName(function);
-        functionConfig.setClassName(className);
-        functionConfig.setParallelism(parallelism);
-        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
-        functionConfig.setOutput(outputTopic);
-        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
-        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        FunctionConfig functionConfig = createDefaultFunctionConfig();
         return resource.registerFunction(
             tenant,
             namespace,
@@ -617,6 +611,9 @@ public void testUpdateFunctionMissingFunctionName() throws 
IOException {
 
     @Test
     public void testUpdateFunctionMissingPackage() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
         testUpdateFunctionMissingArguments(
             tenant,
             namespace,
@@ -628,55 +625,104 @@ public void testUpdateFunctionMissingPackage() throws 
IOException {
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                "Update contains no change");
     }
 
     @Test
     public void testUpdateFunctionMissingInputTopic() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
         testUpdateFunctionMissingArguments(
                 tenant,
                 namespace,
                 function,
-                mockedInputStream,
+                null,
                 null,
                 mockedFormData,
                 outputTopic,
                 outputSerdeClassName,
                 className,
                 parallelism,
-                "No input topic(s) specified for the function");
+                "Update contains no change");
     }
 
     @Test
-    public void testUpdateFunctionMissingPackageDetails() throws IOException {
+    public void testUpdateFunctionMissingClassName() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
         testUpdateFunctionMissingArguments(
             tenant,
             namespace,
             function,
-            mockedInputStream,
-            topicsToSerDeClassName,
             null,
+            topicsToSerDeClassName,
+            mockedFormData,
             outputTopic,
                 outputSerdeClassName,
-            className,
+            null,
             parallelism,
-                "Function Package is not provided");
+                "Update contains no change");
     }
 
     @Test
-    public void testUpdateFunctionMissingClassName() throws IOException {
+    public void testUpdateFunctionChangedParallelism() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
         testUpdateFunctionMissingArguments(
-            tenant,
-            namespace,
-            function,
-            mockedInputStream,
-            topicsToSerDeClassName,
-            mockedFormData,
-            outputTopic,
+                tenant,
+                namespace,
+                function,
+                null,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
                 outputSerdeClassName,
-            null,
-            parallelism,
-                "Function classname cannot be null");
+                null,
+                parallelism + 1,
+                null);
+    }
+
+    @Test
+    public void testUpdateFunctionChangedInputs() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+        testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                topicsToSerDeClassName,
+                mockedFormData,
+                "DifferentOutput",
+                outputSerdeClassName,
+                null,
+                parallelism,
+                "Output topics differ");
+    }
+
+    @Test
+    public void testUpdateFunctionChangedOutput() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+        Map<String, String> someOtherInput = new HashMap<>();
+        someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE);
+        testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                someOtherInput,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                null,
+                parallelism,
+                "Input Topics cannot be altered");
     }
 
     private void testUpdateFunctionMissingArguments(
@@ -720,6 +766,14 @@ private void testUpdateFunctionMissingArguments(
         }
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
 
+        if (expectedError == null) {
+            RequestResult rr = new RequestResult()
+                    .setSuccess(true)
+                    .setMessage("function registered");
+            CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
+            
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        }
+
         Response response = resource.updateFunction(
             tenant,
             namespace,
@@ -732,8 +786,12 @@ private void testUpdateFunctionMissingArguments(
             FunctionsImpl.FUNCTION,
                 null);
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new 
ErrorData(expectedError).reason);
+        if (expectedError == null) {
+            assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        } else {
+            assertEquals(Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+            Assert.assertEquals(((ErrorData) response.getEntity()).reason, new 
ErrorData(expectedError).reason);
+        }
     }
 
     private Response updateDefaultFunction() throws IOException {
@@ -1269,4 +1327,23 @@ public void testRegisterFunctionWithConflictingFields() 
throws IOException {
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
+
+    public static FunctionConfig createDefaultFunctionConfig() {
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(function);
+        functionConfig.setClassName(className);
+        functionConfig.setParallelism(parallelism);
+        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+        functionConfig.setOutput(outputTopic);
+        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        return functionConfig;
+    }
+
+    public static FunctionDetails createDefaultFunctionDetails() {
+        FunctionConfig functionConfig = createDefaultFunctionConfig();
+        return FunctionConfigUtils.convert(functionConfig, null);
+    }
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 5e710e03d5..a40e0adbad 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -28,6 +28,8 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -39,10 +41,12 @@
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.apache.pulsar.io.cassandra.CassandraStringSink;
+import org.apache.pulsar.io.twitter.TwitterFireHose;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -60,12 +64,15 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
+import static 
org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE;
+import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -81,7 +88,7 @@
 /**
  * Unit test of {@link SinkApiV2Resource}.
  */
-@PrepareForTest({Utils.class, SinkConfigUtils.class})
+@PrepareForTest({Utils.class, SinkConfigUtils.class, ConnectorUtils.class, 
org.apache.pulsar.functions.utils.Utils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", 
"org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 @Slf4j
 public class SinkApiV2ResourceTest {
@@ -96,7 +103,7 @@ public IObjectFactory getObjectFactory() {
     private static final String sink = "test-sink";
     private static final Map<String, String> topicsToSerDeClassName = new 
HashMap<>();
     static {
-        
topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", 
TopicSchema.DEFAULT_SERDE);
+        
topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", 
DEFAULT_SERDE);
     }
     private static final String subscriptionName = "test-subscription";
     private static final String className = 
CassandraStringSink.class.getName();
@@ -119,6 +126,7 @@ public IObjectFactory getObjectFactory() {
     private FunctionsImpl resource;
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
+    private FunctionMetaData mockedFunctionMetaData;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -372,13 +380,7 @@ private void testRegisterSinkMissingArguments(
     }
 
     private Response registerDefaultSink() throws IOException {
-        SinkConfig sinkConfig = new SinkConfig();
-        sinkConfig.setTenant(tenant);
-        sinkConfig.setNamespace(namespace);
-        sinkConfig.setName(sink);
-        sinkConfig.setClassName(className);
-        sinkConfig.setParallelism(parallelism);
-        sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+        SinkConfig sinkConfig = createDefaultSinkConfig();
         return resource.registerFunction(
             tenant,
             namespace,
@@ -573,6 +575,10 @@ public void testUpdateSinkMissingFunctionName() throws 
IOException {
 
     @Test
     public void testUpdateSinkMissingPackage() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
         testUpdateSinkMissingArguments(
             tenant,
             namespace,
@@ -582,21 +588,63 @@ public void testUpdateSinkMissingPackage() throws 
IOException {
             topicsToSerDeClassName,
             className,
             parallelism,
-                "Sink Package is not provided");
+                "Update contains no change");
     }
 
     @Test
-    public void testUpdateSinkMissingPackageDetails() throws IOException {
+    public void testUpdateSinkMissingInputs() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
         testUpdateSinkMissingArguments(
-            tenant,
-            namespace,
+                tenant,
+                namespace,
                 sink,
-            mockedInputStream,
-            null,
-            topicsToSerDeClassName,
-            className,
-            parallelism,
-                "zip file is empty");
+                null,
+                mockedFormData,
+                null,
+                className,
+                parallelism,
+                "Update contains no change");
+    }
+
+    @Test
+    public void testUpdateSinkDifferentInputs() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
+        Map<String, String> inputTopics = new HashMap<>();
+        inputTopics.put("DifferntTopic", DEFAULT_SERDE);
+        testUpdateSinkMissingArguments(
+                tenant,
+                namespace,
+                sink,
+                null,
+                mockedFormData,
+                inputTopics,
+                className,
+                parallelism,
+                "Input Topics cannot be altered");
+    }
+
+    @Test
+    public void testUpdateSinkDifferentParallelism() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
+        testUpdateSinkMissingArguments(
+                tenant,
+                namespace,
+                sink,
+                null,
+                mockedFormData,
+                topicsToSerDeClassName,
+                className,
+                parallelism + 1,
+                null);
     }
 
     private void testUpdateSinkMissingArguments(
@@ -609,6 +657,24 @@ private void testUpdateSinkMissingArguments(
             String className,
             Integer parallelism,
             String expectedError) throws IOException {
+        mockStatic(ConnectorUtils.class);
+        
doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
+        ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
+
+        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
+        
doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
+        org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), 
any(NarClassLoader.class));
+
+        
doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), 
anyString(), any(File.class));
+
+        
doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+
+
+        this.mockedFunctionMetaData = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
+
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
 
         SinkConfig sinkConfig = new SinkConfig();
@@ -631,6 +697,14 @@ private void testUpdateSinkMissingArguments(
             sinkConfig.setParallelism(parallelism);
         }
 
+        if (expectedError == null) {
+            RequestResult rr = new RequestResult()
+                    .setSuccess(true)
+                    .setMessage("source registered");
+            CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
+            
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        }
+
         Response response = resource.updateFunction(
             tenant,
             namespace,
@@ -643,8 +717,12 @@ private void testUpdateSinkMissingArguments(
                 FunctionsImpl.SINK,
                 null);
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new 
ErrorData(expectedError).reason);
+        if (expectedError == null) {
+            assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        } else {
+            assertEquals(Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+            Assert.assertEquals(((ErrorData) response.getEntity()).reason, new 
ErrorData(expectedError).reason);
+        }
     }
 
     private Response updateDefaultSink() throws IOException {
@@ -656,6 +734,24 @@ private Response updateDefaultSink() throws IOException {
         sinkConfig.setParallelism(parallelism);
         sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
 
+        mockStatic(ConnectorUtils.class);
+        
doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
+        ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
+
+        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
+        
doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
+        org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), 
any(NarClassLoader.class));
+
+        
doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), 
anyString(), any(File.class));
+
+        
doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+
+
+        this.mockedFunctionMetaData = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
+
         return resource.updateFunction(
             tenant,
             namespace,
@@ -730,6 +826,24 @@ public void testUpdateSinkWithUrl() throws IOException {
         sinkConfig.setParallelism(parallelism);
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
+        mockStatic(ConnectorUtils.class);
+        
doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
+        ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
+
+        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
+        
doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
+        org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), 
any(NarClassLoader.class));
+
+        
doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), 
anyString(), any(File.class));
+
+        
doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+
+
+        this.mockedFunctionMetaData = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
+
         RequestResult rr = new RequestResult()
                 .setSuccess(true)
                 .setMessage("source registered");
@@ -974,7 +1088,7 @@ public void testGetSinkSuccess() throws Exception {
                 .setSubscriptionType(Function.SubscriptionType.SHARED)
                 .setSubscriptionName(subscriptionName)
                 .putInputSpecs("input", Function.ConsumerSpec.newBuilder()
-                .setSerdeClassName(TopicSchema.DEFAULT_SERDE)
+                .setSerdeClassName(DEFAULT_SERDE)
                 .setIsRegexPattern(false)
                 .build()).build();
         Function.SinkSpec sinkSpec = Function.SinkSpec.newBuilder()
@@ -985,7 +1099,7 @@ public void testGetSinkSuccess() throws Exception {
                 .setSink(sinkSpec)
                 .setName(sink)
                 .setNamespace(namespace)
-                
.setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE)
+                .setProcessingGuarantees(ATLEAST_ONCE)
                 .setTenant(tenant)
                 .setParallelism(parallelism)
                 .setRuntime(FunctionDetails.Runtime.JAVA)
@@ -1101,4 +1215,19 @@ public void testRegisterFunctionNonexistantTenant() 
throws Exception {
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData("Tenant does not exist").reason, 
((ErrorData) response.getEntity()).reason);
     }
+
+    private SinkConfig createDefaultSinkConfig() {
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setTenant(tenant);
+        sinkConfig.setNamespace(namespace);
+        sinkConfig.setName(sink);
+        sinkConfig.setClassName(className);
+        sinkConfig.setParallelism(parallelism);
+        sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+        return sinkConfig;
+    }
+
+    private FunctionDetails createDefaultFunctionDetails() throws IOException {
+        return SinkConfigUtils.convert(createDefaultSinkConfig(), null);
+    }
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index 0c420a3e2f..f60afe4a47 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -28,6 +28,7 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -37,6 +38,7 @@
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
@@ -55,6 +57,7 @@
 import javax.ws.rs.core.Response.Status;
 import java.io.*;
 import java.net.URL;
+import java.nio.file.Path;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -72,7 +75,7 @@
 /**
  * Unit test of {@link SourceApiV2Resource}.
  */
-@PrepareForTest({Utils.class})
+@PrepareForTest({Utils.class, ConnectorUtils.class, 
org.apache.pulsar.functions.utils.Utils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", 
"org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 @Slf4j
 public class SourceApiV2ResourceTest {
@@ -107,6 +110,7 @@ public IObjectFactory getObjectFactory() {
     private FunctionsImpl resource;
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
+    private FunctionMetaData mockedFunctionMetaData;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -342,14 +346,7 @@ private void testRegisterSourceMissingArguments(
     }
 
     private Response registerDefaultSource() throws IOException {
-        SourceConfig sourceConfig = new SourceConfig();
-        sourceConfig.setTenant(tenant);
-        sourceConfig.setNamespace(namespace);
-        sourceConfig.setName(source);
-        sourceConfig.setClassName(className);
-        sourceConfig.setParallelism(parallelism);
-        sourceConfig.setTopicName(outputTopic);
-        sourceConfig.setSerdeClassName(outputSerdeClassName);
+        SourceConfig sourceConfig = createDefaultSourceConfig();
         return resource.registerFunction(
             tenant,
             namespace,
@@ -548,56 +545,53 @@ public void testUpdateSourceMissingFunctionName() throws 
IOException {
 
     @Test
     public void testUpdateSourceMissingPackage() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
         testUpdateSourceMissingArguments(
             tenant,
             namespace,
                 source,
-            null,
+                null,
             mockedFormData,
             outputTopic,
                 outputSerdeClassName,
             className,
             parallelism,
-                "Source Package is not provided");
-    }
-
-    @Test
-    public void testUpdateSourceMissingPackageDetails() throws IOException {
-        testUpdateSourceMissingArguments(
-            tenant,
-            namespace,
-                source,
-            mockedInputStream,
-            null,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                "zip file is empty");
+                "Update contains no change");
     }
 
     @Test
     public void testUpdateSourceMissingTopicName() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
         testUpdateSourceMissingArguments(
                 tenant,
                 namespace,
                 source,
-                mockedInputStream,
+                null,
                 mockedFormData,
                 null,
                 outputSerdeClassName,
                 className,
                 parallelism,
-                "Topic name cannot be null");
+                "Update contains no change");
     }
 
     @Test
     public void testUpdateSourceNegativeParallelism() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
         testUpdateSourceMissingArguments(
                 tenant,
                 namespace,
                 source,
-                mockedInputStream,
+                null,
                 mockedFormData,
                 outputTopic,
                 outputSerdeClassName,
@@ -606,8 +600,50 @@ public void testUpdateSourceNegativeParallelism() throws 
IOException {
                 "Source parallelism should positive number");
     }
 
+    @Test
+    public void testUpdateSourceChangedParallelism() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
+        testUpdateSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                null,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                parallelism + 1,
+                null);
+    }
+
+    @Test
+    public void testUpdateSourceChangedTopic() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
+        testUpdateSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                null,
+                mockedFormData,
+                "DifferentTopic",
+                outputSerdeClassName,
+                className,
+                parallelism,
+                "Destination topics differ");
+    }
+
     @Test
     public void testUpdateSourceZeroParallelism() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
         testUpdateSourceMissingArguments(
                 tenant,
                 namespace,
@@ -632,6 +668,21 @@ private void testUpdateSourceMissingArguments(
             String className,
             Integer parallelism,
             String expectedError) throws IOException {
+
+        mockStatic(ConnectorUtils.class);
+        doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
+        ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
+
+        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
+        
doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
+        org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), 
any(NarClassLoader.class));
+
+        
doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), 
anyString(), any(File.class));
+
+        this.mockedFunctionMetaData = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
+
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(function))).thenReturn(true);
 
         SourceConfig sourceConfig = new SourceConfig();
@@ -657,6 +708,14 @@ private void testUpdateSourceMissingArguments(
             sourceConfig.setParallelism(parallelism);
         }
 
+        if (expectedError == null) {
+            RequestResult rr = new RequestResult()
+                    .setSuccess(true)
+                    .setMessage("source registered");
+            CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
+            
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        }
+
         Response response = resource.updateFunction(
             tenant,
             namespace,
@@ -669,8 +728,12 @@ private void testUpdateSourceMissingArguments(
                 FunctionsImpl.SOURCE,
                 null);
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new 
ErrorData(expectedError).reason);
+        if (expectedError == null) {
+            assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        } else {
+            assertEquals(Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+            Assert.assertEquals(((ErrorData) response.getEntity()).reason, new 
ErrorData(expectedError).reason);
+        }
     }
 
     private Response updateDefaultSource() throws IOException {
@@ -683,6 +746,21 @@ private Response updateDefaultSource() throws IOException {
         sourceConfig.setTopicName(outputTopic);
         sourceConfig.setSerdeClassName(outputSerdeClassName);
 
+        mockStatic(ConnectorUtils.class);
+        doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
+        ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
+
+        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
+        
doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
+        org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), 
any(NarClassLoader.class));
+
+        
doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), 
anyString(), any(File.class));
+
+        this.mockedFunctionMetaData = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
+
+
         return resource.updateFunction(
             tenant,
             namespace,
@@ -757,6 +835,21 @@ public void testUpdateSourceWithUrl() throws IOException {
         sourceConfig.setClassName(className);
         sourceConfig.setParallelism(parallelism);
 
+        mockStatic(ConnectorUtils.class);
+        doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
+        ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
+
+        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
+        
doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
+        org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), 
any(NarClassLoader.class));
+
+        
doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), 
anyString(), any(File.class));
+
+        this.mockedFunctionMetaData = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
+
+
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(true);
         RequestResult rr = new RequestResult()
                 .setSuccess(true)
@@ -1123,4 +1216,20 @@ public void testRegisterFunctionNonexistantTenant() 
throws Exception {
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData("Tenant does not exist").reason, 
((ErrorData) response.getEntity()).reason);
     }
+
+    private SourceConfig createDefaultSourceConfig() {
+        SourceConfig sourceConfig = new SourceConfig();
+        sourceConfig.setTenant(tenant);
+        sourceConfig.setNamespace(namespace);
+        sourceConfig.setName(source);
+        sourceConfig.setClassName(className);
+        sourceConfig.setParallelism(parallelism);
+        sourceConfig.setTopicName(outputTopic);
+        sourceConfig.setSerdeClassName(outputSerdeClassName);
+        return sourceConfig;
+    }
+
+    private FunctionDetails createDefaultFunctionDetails() throws IOException {
+        return SourceConfigUtils.convert(createDefaultSourceConfig(), null);
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to