This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b0589b  Update Function Semantics (#2985)
1b0589b is described below

commit 1b0589bbe47211fc52430d757fa32a0cb784bcf5
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Tue Nov 20 22:09:34 2018 -0500

    Update Function Semantics (#2985)
    
    * Make update functions better
    
    * Compiled
    
    * more checks
    
    * bug fix
    
    * Added tests
    
    * Tests pass
    
    * Fixed tests
    
    * Fixed tests
    
    * Added tests
    
    * Added unittests
    
    * Fixed unittest
    
    * Fixed unittest
    
    * Fixed unittest
    
    * Timeout fix
    
    * Fixed unittest
    
    * Fix unittest
    
    * Addressed feedback
---
 .../worker/PulsarWorkerAssignmentTest.java         |  10 +-
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  |   7 +-
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |  42 ++-
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |  32 ++-
 .../org/apache/pulsar/admin/cli/CmdSources.java    |  20 +-
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  |   3 +-
 .../apache/pulsar/admin/cli/TestCmdSources.java    |   2 +-
 .../pulsar/common/functions/ConsumerConfig.java    |   6 +-
 .../pulsar/common/functions/FunctionConfig.java    |  12 +-
 .../apache/pulsar/common/functions/Resources.java  |   1 +
 .../pulsar/common/functions/WindowConfig.java      |   5 +-
 .../org/apache/pulsar/common/io/SinkConfig.java    |  13 +-
 .../org/apache/pulsar/common/io/SourceConfig.java  |   9 +-
 .../functions/utils/FunctionConfigUtils.java       | 112 +++++++-
 .../functions/utils/ResourceConfigUtils.java       |  18 ++
 .../pulsar/functions/utils/SinkConfigUtils.java    | 101 ++++++-
 .../pulsar/functions/utils/SourceConfigUtils.java  |  44 ++++
 .../functions/utils/FunctionConfigUtilsTest.java   | 291 +++++++++++++++++++++
 .../functions/utils/SinkConfigUtilsTest.java       | 207 +++++++++++++++
 .../functions/utils/SourceConfigUtilsTest.java     | 166 ++++++++++++
 .../org/apache/pulsar/functions/worker/Utils.java  |  10 +-
 .../functions/worker/rest/api/FunctionsImpl.java   | 111 +++++++-
 .../rest/api/v2/FunctionApiV2ResourceTest.java     | 139 +++++++---
 .../worker/rest/api/v2/SinkApiV2ResourceTest.java  | 175 +++++++++++--
 .../rest/api/v2/SourceApiV2ResourceTest.java       | 171 +++++++++---
 25 files changed, 1541 insertions(+), 166 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index ed44b98..1b4c2aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -223,6 +223,7 @@ public class PulsarWorkerAssignmentTest {
         final String namespacePortion = "assignment-test";
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sinkTopic = "persistent://" + replNamespace + 
"/my-topic1";
+        final String logTopic = "persistent://" + replNamespace + "/log-topic";
         final String baseFunctionName = "assign-restart";
         final String subscriptionName = "test-sub";
         final int totalFunctions = 5;
@@ -240,8 +241,7 @@ public class PulsarWorkerAssignmentTest {
             functionConfig = createFunctionConfig(tenant, namespacePortion, 
functionName,
                     "my.*", sinkTopic, subscriptionName);
             functionConfig.setParallelism(parallelism);
-            // set-auto-ack prop =true
-            functionConfig.setAutoAck(true);
+            // don't set any log topic
             admin.functions().createFunctionWithUrl(functionConfig, 
jarFilePathUrl);
         }
         retryStrategically((test) -> {
@@ -263,8 +263,8 @@ public class PulsarWorkerAssignmentTest {
             functionConfig = createFunctionConfig(tenant, namespacePortion, 
functionName,
                     "my.*", sinkTopic, subscriptionName);
             functionConfig.setParallelism(parallelism);
-            // set-auto-ack prop =false
-            functionConfig.setAutoAck(false);
+            // Now set the log topic
+            functionConfig.setLogTopic(logTopic);
             admin.functions().updateFunctionWithUrl(functionConfig, 
jarFilePathUrl);
         }
 
@@ -308,7 +308,7 @@ public class PulsarWorkerAssignmentTest {
         // validate updated function prop = auto-ack=false and instnaceid
         for (int i = 0; i < (totalFunctions - totalDeletedFunction); i++) {
             String functionName = baseFunctionName + i;
-            assertFalse(admin.functions().getFunction(tenant, 
namespacePortion, functionName).getAutoAck());
+            assertEquals(admin.functions().getFunction(tenant, 
namespacePortion, functionName).getLogTopic(), logTopic);
         }
     }
 
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 91ff834..e0a0d4e 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.admin.cli;
 
-import com.google.gson.Gson;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import lombok.extern.slf4j.Slf4j;
@@ -35,8 +34,6 @@ import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions;
 import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.StopFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
-import org.apache.pulsar.admin.cli.CmdSinks.CreateSink;
-import org.apache.pulsar.admin.cli.CmdSources.CreateSource;
 import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -60,10 +57,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -205,7 +200,7 @@ public class CmdFunctionsTest {
         assertEquals(fnName, creater.getFunctionName());
         assertEquals(inputTopicName, creater.getInputs());
         assertEquals(outputTopicName, creater.getOutput());
-        assertEquals(false, creater.isAutoAck());
+        assertEquals(new Boolean(false), creater.getAutoAck());
 
         verify(functions, times(1)).createFunction(any(FunctionConfig.class), 
anyString());
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 28f9183..3a0d1e1 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -287,14 +287,14 @@ public class CmdFunctions extends CmdBase {
         @Parameter(names = "--autoAck", description = "Whether or not the 
framework will automatically acknowleges messages", hidden = true)
         protected Boolean DEPRECATED_autoAck = null;
         @Parameter(names = "--auto-ack", description = "Whether or not the 
framework will automatically acknowleges messages", arity = 1)
-        protected boolean autoAck = true;
+        protected Boolean autoAck;
         // for backwards compatibility purposes
         @Parameter(names = "--timeoutMs", description = "The message timeout 
in milliseconds", hidden = true)
         protected Long DEPRECATED_timeoutMs;
         @Parameter(names = "--timeout-ms", description = "The message timeout 
in milliseconds")
         protected Long timeoutMs;
         @Parameter(names = "--max-message-retries", description = "How many 
times should we try to process a message before giving up")
-        protected Integer maxMessageRetries = -1;
+        protected Integer maxMessageRetries;
         @Parameter(names = "--dead-letter-topic", description = "The topic 
where all messages which could not be processed successfully are sent")
         protected String deadLetterTopic;
         protected FunctionConfig functionConfig;
@@ -403,21 +403,29 @@ public class CmdFunctions extends CmdBase {
             }
 
             Resources resources = functionConfig.getResources();
-            if (resources == null) {
-                resources = new Resources();
-            }
             if (cpu != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setCpu(cpu);
             }
 
             if (ram != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setRam(ram);
             }
 
             if (disk != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setDisk(disk);
             }
-            functionConfig.setResources(resources);
+            if (resources != null) {
+                functionConfig.setResources(resources);
+            }
 
             if (timeoutMs != null) {
                 functionConfig.setTimeoutMs(timeoutMs);
@@ -452,7 +460,9 @@ public class CmdFunctions extends CmdBase {
 
             functionConfig.setWindowConfig(windowConfig);
 
-            functionConfig.setAutoAck(autoAck);
+            if (autoAck != null) {
+                functionConfig.setAutoAck(autoAck);
+            }
 
             if (null != maxMessageRetries) {
                 functionConfig.setMaxMessageRetries(maxMessageRetries);
@@ -711,6 +721,24 @@ public class CmdFunctions extends CmdBase {
 
     @Parameters(commandDescription = "Update a Pulsar Function that's been 
deployed to a Pulsar cluster")
     class UpdateFunction extends FunctionDetailsCommand {
+
+        @Override
+        protected void validateFunctionConfigs(FunctionConfig functionConfig) {
+            if (StringUtils.isEmpty(functionConfig.getClassName())) {
+                if (StringUtils.isEmpty(functionConfig.getName())) {
+                    throw new IllegalArgumentException("Function Name not 
provided");
+                }
+            } else if (StringUtils.isEmpty(functionConfig.getName())) {
+                
org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig);
+            }
+            if (StringUtils.isEmpty(functionConfig.getTenant())) {
+                
org.apache.pulsar.common.functions.Utils.inferMissingTenant(functionConfig);
+            }
+            if (StringUtils.isEmpty(functionConfig.getNamespace())) {
+                
org.apache.pulsar.common.functions.Utils.inferMissingNamespace(functionConfig);
+            }
+        }
+
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(functionConfig.getJar())) {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 594aa1b..662b81c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -210,6 +210,10 @@ public class CmdSinks extends CmdBase {
             }
             print("Updated successfully");
         }
+
+        protected void validateSinkConfigs(SinkConfig sinkConfig) {
+            
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
+        }
     }
 
     abstract class SinkDetailsCommand extends BaseCommand {
@@ -253,7 +257,7 @@ public class CmdSinks extends CmdBase {
         @Parameter(names = "--retainOrdering", description = "Sink consumes 
and sinks messages in order", hidden = true)
         protected Boolean DEPRECATED_retainOrdering;
         @Parameter(names = "--retain-ordering", description = "Sink consumes 
and sinks messages in order")
-        protected boolean retainOrdering;
+        protected Boolean retainOrdering;
         @Parameter(names = "--parallelism", description = "The sink's 
parallelism factor (i.e. the number of sink instances to run)")
         protected Integer parallelism;
         @Parameter(names = {"-a", "--archive"}, description = "Path to the 
archive file for the sink. It also supports url-path [http/https/file (file 
protocol assumes that file already exists on worker host)] from which worker 
can download the package.", listConverter = StringConverter.class)
@@ -280,7 +284,7 @@ public class CmdSinks extends CmdBase {
         @Parameter(names = "--sink-config", description = "User defined 
configs key/values")
         protected String sinkConfigString;
         @Parameter(names = "--auto-ack", description = "Whether or not the 
framework will automatically acknowleges messages", arity = 1)
-        protected boolean autoAck = true;
+        protected Boolean autoAck;
         @Parameter(names = "--timeout-ms", description = "The message timeout 
in milliseconds")
         protected Long timeoutMs;
 
@@ -329,7 +333,9 @@ public class CmdSinks extends CmdBase {
                 sinkConfig.setProcessingGuarantees(processingGuarantees);
             }
 
-            sinkConfig.setRetainOrdering(retainOrdering);
+            if (retainOrdering != null) {
+                sinkConfig.setRetainOrdering(retainOrdering);
+            }
 
             if (null != inputs) {
                 sinkConfig.setInputs(Arrays.asList(inputs.split(",")));
@@ -371,27 +377,37 @@ public class CmdSinks extends CmdBase {
             }
 
             Resources resources = sinkConfig.getResources();
-            if (resources == null) {
-                resources = new Resources();
-            }
             if (cpu != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setCpu(cpu);
             }
 
             if (ram != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setRam(ram);
             }
 
             if (disk != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setDisk(disk);
             }
-            sinkConfig.setResources(resources);
+            if (resources != null) {
+                sinkConfig.setResources(resources);
+            }
 
             if (null != sinkConfigString) {
                 sinkConfig.setConfigs(parseConfigs(sinkConfigString));
             }
 
-            sinkConfig.setAutoAck(autoAck);
+            if (autoAck != null) {
+                sinkConfig.setAutoAck(autoAck);
+            }
             if (timeoutMs != null) {
                 sinkConfig.setTimeoutMs(timeoutMs);
             }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 92b4f6c..3e1b6ab 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -214,6 +214,10 @@ public class CmdSources extends CmdBase {
             }
             print("Updated successfully");
         }
+
+        protected void validateSourceConfigs(SourceConfig sourceConfig) {
+            
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
+        }
     }
 
     abstract class SourceDetailsCommand extends BaseCommand {
@@ -337,21 +341,29 @@ public class CmdSources extends CmdBase {
             }
 
             Resources resources = sourceConfig.getResources();
-            if (resources == null) {
-                resources = new Resources();
-            }
             if (cpu != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setCpu(cpu);
             }
 
             if (ram != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setRam(ram);
             }
 
             if (disk != null) {
+                if (resources == null) {
+                    resources = new Resources();
+                }
                 resources.setDisk(disk);
             }
-            sourceConfig.setResources(resources);
+            if (resources != null) {
+                sourceConfig.setResources(resources);
+            }
 
             if (null != sourceConfigString) {
                 sourceConfig.setConfigs(parseConfigs(sourceConfigString));
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index a52b7b8..195d2e7 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -128,7 +128,6 @@ public class TestCmdSinks {
         sinkConfig.setTenant(TENANT);
         sinkConfig.setNamespace(NAMESPACE);
         sinkConfig.setName(NAME);
-        sinkConfig.setAutoAck(true);
 
         sinkConfig.setInputs(INPUTS_LIST);
         sinkConfig.setTopicToSerdeClassName(CUSTOM_SERDE_INPUT_MAP);
@@ -450,7 +449,7 @@ public class TestCmdSinks {
         testSinkConfig.setResources(null);
 
         SinkConfig expectedSinkConfig = getSinkConfig();
-        expectedSinkConfig.setResources(new Resources());
+        expectedSinkConfig.setResources(null);
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 2f63626..2eb951a 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -352,7 +352,7 @@ public class TestCmdSources {
         testSourceConfig.setResources(null);
 
         SourceConfig expectedSourceConfig = getSourceConfig();
-        expectedSourceConfig.setResources(new Resources());
+        expectedSourceConfig.setResources(null);
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java
index 7652336..832c903 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java
@@ -18,15 +18,13 @@
  */
 package org.apache.pulsar.common.functions;
 
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.*;
 
 @Data
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
+@EqualsAndHashCode
 public class ConsumerConfig {
     private String schemaType;
     private String serdeClassName;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 2bcc5a6..bf27155 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -24,20 +24,16 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.TreeMap;
 
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.apache.pulsar.common.functions.ConsumerConfig;
-import org.apache.pulsar.common.functions.Resources;
-import org.apache.pulsar.common.functions.WindowConfig;
+import lombok.*;
 
 @Getter
 @Setter
 @Data
 @EqualsAndHashCode
 @ToString
+@Builder(toBuilder=true)
+@NoArgsConstructor
+@AllArgsConstructor
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class FunctionConfig {
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java
index ff8cb55..fde3d38 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java
@@ -27,6 +27,7 @@ import lombok.*;
 @ToString
 @AllArgsConstructor
 @NoArgsConstructor
+@Builder(toBuilder=true)
 public class Resources {
     // Default cpu is 1 core
     private Double cpu = 1d;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java
index 33f742b..755201a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WindowConfig.java
@@ -18,10 +18,7 @@
  */
 package org.apache.pulsar.common.functions;
 
-import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+import lombok.*;
 import lombok.experimental.Accessors;
 
 @Data
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index 50fb662..b85de06 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -22,11 +22,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.TreeMap;
 
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+import lombok.*;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
@@ -36,6 +32,9 @@ import org.apache.pulsar.common.functions.Resources;
 @Data
 @EqualsAndHashCode
 @ToString
+@Builder(toBuilder=true)
+@NoArgsConstructor
+@AllArgsConstructor
 public class SinkConfig {
 
     private String tenant;
@@ -63,9 +62,9 @@ public class SinkConfig {
     private Map<String, Object> secrets;
     private Integer parallelism;
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
-    private boolean retainOrdering;
+    private Boolean retainOrdering;
     private Resources resources;
-    private boolean autoAck;
+    private Boolean autoAck;
     private Long timeoutMs;
 
     private String archive;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
index 3ca7aad..88955b8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
@@ -18,11 +18,7 @@
  */
 package org.apache.pulsar.common.io;
 
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+import lombok.*;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 
@@ -33,6 +29,9 @@ import java.util.Map;
 @Data
 @EqualsAndHashCode
 @ToString
+@Builder(toBuilder=true)
+@NoArgsConstructor
+@AllArgsConstructor
 public class SourceConfig {
     private String tenant;
     private String namespace;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 7fcf5bb..2459cfd 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -249,7 +249,9 @@ public class FunctionConfigUtils {
             
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         }
         functionConfig.setAutoAck(functionDetails.getAutoAck());
-        
functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
+        if (functionDetails.getSource().getTimeoutMs() != 0) {
+            
functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
+        }
         if (!isEmpty(functionDetails.getSink().getTopic())) {
             functionConfig.setOutput(functionDetails.getSink().getTopic());
         }
@@ -566,4 +568,112 @@ public class FunctionConfigUtils {
             return null;
         }
     }
+
+    public static FunctionConfig validateUpdate(FunctionConfig existingConfig, 
FunctionConfig newConfig) {
+        FunctionConfig mergedConfig = existingConfig.toBuilder().build();
+        if (!existingConfig.getTenant().equals(newConfig.getTenant())) {
+            throw new IllegalArgumentException("Tenants differ");
+        }
+        if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) {
+            throw new IllegalArgumentException("Namespaces differ");
+        }
+        if (!existingConfig.getName().equals(newConfig.getName())) {
+            throw new IllegalArgumentException("Function Names differ");
+        }
+        if (!StringUtils.isEmpty(newConfig.getClassName())) {
+            mergedConfig.setClassName(newConfig.getClassName());
+        }
+        if (newConfig.getInputs() != null) {
+            newConfig.getInputs().forEach((topicName -> {
+                newConfig.getInputSpecs().put(topicName,
+                        
ConsumerConfig.builder().isRegexPattern(false).build());
+            }));
+        }
+        if (newConfig.getTopicsPattern() != null && 
!newConfig.getTopicsPattern().isEmpty()) {
+            newConfig.getInputSpecs().put(newConfig.getTopicsPattern(),
+                    ConsumerConfig.builder()
+                            .isRegexPattern(true)
+                            .build());
+        }
+        if (newConfig.getCustomSerdeInputs() != null) {
+            newConfig.getCustomSerdeInputs().forEach((topicName, 
serdeClassName) -> {
+                newConfig.getInputSpecs().put(topicName,
+                        ConsumerConfig.builder()
+                                .serdeClassName(serdeClassName)
+                                .isRegexPattern(false)
+                                .build());
+            });
+        }
+        if (newConfig.getCustomSchemaInputs() != null) {
+            newConfig.getCustomSchemaInputs().forEach((topicName, 
schemaClassname) -> {
+                newConfig.getInputSpecs().put(topicName,
+                        ConsumerConfig.builder()
+                                .schemaType(schemaClassname)
+                                .isRegexPattern(false)
+                                .build());
+            });
+        }
+        if (!newConfig.getInputSpecs().isEmpty()) {
+            newConfig.getInputSpecs().forEach((topicName, consumerConfig) -> {
+                if (!existingConfig.getInputSpecs().containsKey(topicName)) {
+                    throw new IllegalArgumentException("Input Topics cannot be 
altered");
+                }
+                if 
(!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) {
+                    throw new IllegalArgumentException("Input Specs mismatch");
+                }
+            });
+        }
+        if (!StringUtils.isEmpty(newConfig.getOutput()) && 
!newConfig.getOutput().equals(existingConfig.getOutput())) {
+            throw new IllegalArgumentException("Output topics differ");
+        }
+        if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && 
!newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
+            throw new IllegalArgumentException("Output Serde mismatch");
+        }
+        if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && 
!newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
+            throw new IllegalArgumentException("Output Schema mismatch");
+        }
+        if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
+            mergedConfig.setLogTopic(newConfig.getLogTopic());
+        }
+        if (newConfig.getProcessingGuarantees() != null && 
!newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees()))
 {
+            throw new IllegalArgumentException("Processing Guarantess cannot 
be alterted");
+        }
+        if (newConfig.getRetainOrdering() != null && 
!newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
+            throw new IllegalArgumentException("Retain Orderning cannot be 
altered");
+        }
+        if (newConfig.getUserConfig() != null) {
+            mergedConfig.setUserConfig(newConfig.getUserConfig());
+        }
+        if (newConfig.getSecrets() != null) {
+            mergedConfig.setSecrets(newConfig.getSecrets());
+        }
+        if (newConfig.getRuntime() != null && 
!newConfig.getRuntime().equals(existingConfig.getRuntime())) {
+            throw new IllegalArgumentException("Runtime cannot be altered");
+        }
+        if (newConfig.getAutoAck() != null && 
!newConfig.getAutoAck().equals(existingConfig.getAutoAck())) {
+            throw new IllegalArgumentException("AutoAck cannot be altered");
+        }
+        if (newConfig.getMaxMessageRetries() != null) {
+            
mergedConfig.setMaxMessageRetries(newConfig.getMaxMessageRetries());
+        }
+        if (!StringUtils.isEmpty(newConfig.getDeadLetterTopic())) {
+            mergedConfig.setDeadLetterTopic(newConfig.getDeadLetterTopic());
+        }
+        if (!StringUtils.isEmpty(newConfig.getSubName()) && 
!newConfig.getSubName().equals(existingConfig.getSubName())) {
+            throw new IllegalArgumentException("Subscription Name cannot be 
altered");
+        }
+        if (newConfig.getParallelism() != null) {
+            mergedConfig.setParallelism(newConfig.getParallelism());
+        }
+        if (newConfig.getResources() != null) {
+            
mergedConfig.setResources(ResourceConfigUtils.merge(existingConfig.getResources(),
 newConfig.getResources()));
+        }
+        if (newConfig.getWindowConfig() != null) {
+            mergedConfig.setWindowConfig(newConfig.getWindowConfig());
+        }
+        if (newConfig.getTimeoutMs() != null) {
+            mergedConfig.setTimeoutMs(newConfig.getTimeoutMs());
+        }
+        return mergedConfig;
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java
index ee46ceb..5fbda31 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ResourceConfigUtils.java
@@ -33,4 +33,22 @@ public class ResourceConfigUtils {
         com.google.common.base.Preconditions.checkArgument(disk == null || 
disk > 0L,
                 "The disk allocation for the function must be positive");
     }
+
+    /**
+     * Overlays the non-null fields of {@code newResources} onto the existing resources.
+     *
+     * @param existingResources the currently stored resources; when {@code null} the merge
+     *                          starts from a fresh {@link Resources}
+     * @param newResources      the requested changes; a {@code null} argument or {@code null}
+     *                          fields leave the corresponding existing values as they are
+     * @return the merged resources, built from {@code existingResources.toBuilder()} or a new
+     *         instance, so the setters below are not applied to {@code existingResources} itself
+     */
+    public static Resources merge(Resources existingResources, Resources newResources) {
+        Resources mergedResources = existingResources != null
+                ? existingResources.toBuilder().build()
+                : new Resources();
+        if (newResources == null) {
+            // Nothing was requested; dereferencing it below would throw NullPointerException.
+            return mergedResources;
+        }
+        if (newResources.getCpu() != null) {
+            mergedResources.setCpu(newResources.getCpu());
+        }
+        if (newResources.getRam() != null) {
+            mergedResources.setRam(newResources.getRam());
+        }
+        if (newResources.getDisk() != null) {
+            mergedResources.setDisk(newResources.getDisk());
+        }
+        return mergedResources;
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 3751136..d65b87f 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -42,8 +42,6 @@ import java.util.*;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static 
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.Utils.getSinkType;
 
@@ -144,13 +142,17 @@ public class SinkConfigUtils {
             
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
         }
 
-        Function.SubscriptionType subType = (sinkConfig.isRetainOrdering()
+        Function.SubscriptionType subType = ((sinkConfig.getRetainOrdering() 
!= null && sinkConfig.getRetainOrdering())
                 || 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees()))
                 ? Function.SubscriptionType.FAILOVER
                 : Function.SubscriptionType.SHARED;
         sourceSpecBuilder.setSubscriptionType(subType);
 
-        functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck());
+        if (sinkConfig.getAutoAck() != null) {
+            functionDetailsBuilder.setAutoAck(sinkConfig.getAutoAck());
+        } else {
+            functionDetailsBuilder.setAutoAck(true);
+        }
         if (sinkConfig.getTimeoutMs() != null) {
             sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
         }
@@ -226,7 +228,9 @@ public class SinkConfigUtils {
             
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         }
         sinkConfig.setAutoAck(functionDetails.getAutoAck());
-        sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
+        if (functionDetails.getSource().getTimeoutMs() != 0) {
+            
sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
+        }
         if (!isEmpty(functionDetails.getSink().getClassName())) {
             sinkConfig.setClassName(functionDetails.getSink().getClassName());
         }
@@ -351,4 +355,91 @@ public class SinkConfigUtils {
         }
         return retval;
     }
+
+    /**
+     * Checks that {@code newConfig} is a permissible update of {@code existingConfig} and returns
+     * the configuration that results from applying it.
+     *
+     * <p>Tenant, namespace and name must equal the existing ones. Source subscription name,
+     * input topics/specs, processing guarantees, retain-ordering and auto-ack must match the
+     * existing sink when the update specifies them. Class name, configs, secrets, parallelism,
+     * resources, timeout and archive are taken from {@code newConfig} when present.
+     *
+     * <p>{@code newConfig} is only read: its input declarations are normalised into a private
+     * map instead of being written back into its {@code inputSpecs}.
+     *
+     * @param existingConfig the sink as currently registered
+     * @param newConfig      the requested update
+     * @return a config built from {@code existingConfig.toBuilder()} with the updatable fields
+     *         overridden
+     * @throws IllegalArgumentException if the update touches a field that cannot be altered
+     */
+    public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig newConfig) {
+        SinkConfig mergedConfig = existingConfig.toBuilder().build();
+        if (!existingConfig.getTenant().equals(newConfig.getTenant())) {
+            throw new IllegalArgumentException("Tenants differ");
+        }
+        if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) {
+            throw new IllegalArgumentException("Namespaces differ");
+        }
+        if (!existingConfig.getName().equals(newConfig.getName())) {
+            throw new IllegalArgumentException("Sink Names differ");
+        }
+        if (!StringUtils.isEmpty(newConfig.getClassName())) {
+            mergedConfig.setClassName(newConfig.getClassName());
+        }
+        if (!StringUtils.isEmpty(newConfig.getSourceSubscriptionName())
+                && !newConfig.getSourceSubscriptionName().equals(existingConfig.getSourceSubscriptionName())) {
+            throw new IllegalArgumentException("Subscription Name cannot be altered");
+        }
+
+        // Every requested input must already exist with an identical consumer spec.
+        Map<String, ConsumerConfig> existingSpecs = existingConfig.getInputSpecs() != null
+                ? existingConfig.getInputSpecs()
+                : Collections.emptyMap();
+        collectRequestedInputSpecs(newConfig).forEach((topicName, consumerConfig) -> {
+            if (!existingSpecs.containsKey(topicName)) {
+                throw new IllegalArgumentException("Input Topics cannot be altered");
+            }
+            if (!consumerConfig.equals(existingSpecs.get(topicName))) {
+                throw new IllegalArgumentException("Input Specs mismatch");
+            }
+        });
+
+        // NOTE(review): the message texts below contain typos but are kept verbatim because
+        // tests (and possibly callers) match on the exact wording -- confirm before changing.
+        if (newConfig.getProcessingGuarantees() != null
+                && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
+            throw new IllegalArgumentException("Processing Guarantess cannot be alterted");
+        }
+        if (newConfig.getConfigs() != null) {
+            mergedConfig.setConfigs(newConfig.getConfigs());
+        }
+        if (newConfig.getSecrets() != null) {
+            mergedConfig.setSecrets(newConfig.getSecrets());
+        }
+        if (newConfig.getParallelism() != null) {
+            mergedConfig.setParallelism(newConfig.getParallelism());
+        }
+        if (newConfig.getRetainOrdering() != null
+                && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
+            throw new IllegalArgumentException("Retain Orderning cannot be altered");
+        }
+        if (newConfig.getAutoAck() != null && !newConfig.getAutoAck().equals(existingConfig.getAutoAck())) {
+            throw new IllegalArgumentException("AutoAck cannot be altered");
+        }
+        if (newConfig.getResources() != null) {
+            mergedConfig.setResources(
+                    ResourceConfigUtils.merge(existingConfig.getResources(), newConfig.getResources()));
+        }
+        if (newConfig.getTimeoutMs() != null) {
+            mergedConfig.setTimeoutMs(newConfig.getTimeoutMs());
+        }
+        if (!StringUtils.isEmpty(newConfig.getArchive())) {
+            mergedConfig.setArchive(newConfig.getArchive());
+        }
+        return mergedConfig;
+    }
+
+    /**
+     * Builds the topic -> consumer spec view implied by all the ways a sink config can declare
+     * its inputs, without modifying {@code config}. Later sources override earlier ones for the
+     * same topic, in this order: explicit input specs, plain inputs, topics pattern, per-topic
+     * serde class names, per-topic schema types.
+     */
+    private static Map<String, ConsumerConfig> collectRequestedInputSpecs(SinkConfig config) {
+        // Sorted by topic name so the first violation reported by the caller is deterministic.
+        Map<String, ConsumerConfig> specs = new TreeMap<>();
+        if (config.getInputSpecs() != null) {
+            specs.putAll(config.getInputSpecs());
+        }
+        if (config.getInputs() != null) {
+            config.getInputs().forEach(topicName ->
+                    specs.put(topicName, ConsumerConfig.builder().isRegexPattern(false).build()));
+        }
+        if (!StringUtils.isEmpty(config.getTopicsPattern())) {
+            specs.put(config.getTopicsPattern(), ConsumerConfig.builder().isRegexPattern(true).build());
+        }
+        if (config.getTopicToSerdeClassName() != null) {
+            config.getTopicToSerdeClassName().forEach((topicName, serdeClassName) ->
+                    specs.put(topicName,
+                            ConsumerConfig.builder()
+                                    .serdeClassName(serdeClassName)
+                                    .isRegexPattern(false)
+                                    .build()));
+        }
+        if (config.getTopicToSchemaType() != null) {
+            config.getTopicToSchemaType().forEach((topicName, schemaType) ->
+                    specs.put(topicName,
+                            ConsumerConfig.builder()
+                                    .schemaType(schemaType)
+                                    .isRegexPattern(false)
+                                    .build()));
+        }
+        return specs;
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 2245580..a7baea2 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -240,4 +240,48 @@ public class SourceConfigUtils {
         return classLoader;
     }
 
+    /**
+     * Validates a requested source update and returns the resulting configuration.
+     *
+     * <p>Tenant, namespace and name must equal the existing ones; topic name and processing
+     * guarantees must match when the update specifies them. Class name, serde class name, schema
+     * type, configs, secrets, parallelism, resources and archive are taken from
+     * {@code newConfig} when present.
+     *
+     * @param existingConfig the source as currently registered
+     * @param newConfig      the requested update
+     * @return a config built from {@code existingConfig.toBuilder()} with the updatable fields
+     *         overridden
+     * @throws IllegalArgumentException if the update touches a field that cannot be altered
+     */
+    public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceConfig newConfig) {
+        SourceConfig updated = existingConfig.toBuilder().build();
+
+        // Step 1: reject changes to immutable fields (same order of checks as before).
+        if (!existingConfig.getTenant().equals(newConfig.getTenant())) {
+            throw new IllegalArgumentException("Tenants differ");
+        }
+        if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) {
+            throw new IllegalArgumentException("Namespaces differ");
+        }
+        if (!existingConfig.getName().equals(newConfig.getName())) {
+            throw new IllegalArgumentException("Function Names differ");
+        }
+        String requestedTopic = newConfig.getTopicName();
+        if (!StringUtils.isEmpty(requestedTopic) && !requestedTopic.equals(existingConfig.getTopicName())) {
+            throw new IllegalArgumentException("Destination topics differ");
+        }
+        if (newConfig.getProcessingGuarantees() != null
+                && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
+            throw new IllegalArgumentException("Processing Guarantess cannot be alterted");
+        }
+
+        // Step 2: copy over every updatable field that the request actually sets.
+        if (!StringUtils.isEmpty(newConfig.getClassName())) {
+            updated.setClassName(newConfig.getClassName());
+        }
+        if (!StringUtils.isEmpty(newConfig.getSerdeClassName())) {
+            updated.setSerdeClassName(newConfig.getSerdeClassName());
+        }
+        if (!StringUtils.isEmpty(newConfig.getSchemaType())) {
+            updated.setSchemaType(newConfig.getSchemaType());
+        }
+        if (newConfig.getConfigs() != null) {
+            updated.setConfigs(newConfig.getConfigs());
+        }
+        if (newConfig.getSecrets() != null) {
+            updated.setSecrets(newConfig.getSecrets());
+        }
+        if (newConfig.getParallelism() != null) {
+            updated.setParallelism(newConfig.getParallelism());
+        }
+        if (newConfig.getResources() != null) {
+            updated.setResources(
+                    ResourceConfigUtils.merge(existingConfig.getResources(), newConfig.getResources()));
+        }
+        if (!StringUtils.isEmpty(newConfig.getArchive())) {
+            updated.setArchive(newConfig.getArchive());
+        }
+        return updated;
+    }
+
 }
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 6d71d45..0f4f400 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -22,14 +22,18 @@ import com.google.gson.Gson;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.WindowConfig;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.testng.annotations.Test;
 
+import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 
+import static 
org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
+import static org.apache.pulsar.common.functions.FunctionConfig.Runtime.PYTHON;
 import static org.testng.Assert.assertEquals;
 
 /**
@@ -93,6 +97,293 @@ public class FunctionConfigUtilsTest {
     }
 
     @Test
+    public void testMergeEqual() {
+        // Updating with an identical config must yield a config that serialises the same.
+        FunctionConfig existing = createFunctionConfig();
+        FunctionConfig identical = createFunctionConfig();
+        FunctionConfig merged = FunctionConfigUtils.validateUpdate(existing, identical);
+        Gson gson = new Gson();
+        assertEquals(gson.toJson(existing), gson.toJson(merged));
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Function Names differ")
+    public void testMergeDifferentName() {
+        // An update carrying a different function name must be rejected.
+        FunctionConfig existing = createFunctionConfig();
+        FunctionConfig renamed = createUpdatedFunctionConfig("name", "Different");
+        FunctionConfigUtils.validateUpdate(existing, renamed);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Tenants differ")
+    public void testMergeDifferentTenant() {
+        // An update carrying a different tenant must be rejected.
+        FunctionConfig existing = createFunctionConfig();
+        FunctionConfig otherTenant = createUpdatedFunctionConfig("tenant", "Different");
+        FunctionConfigUtils.validateUpdate(existing, otherTenant);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Namespaces differ")
+    public void testMergeDifferentNamespace() {
+        // An update carrying a different namespace must be rejected.
+        FunctionConfig existing = createFunctionConfig();
+        FunctionConfig otherNamespace = createUpdatedFunctionConfig("namespace", "Different");
+        FunctionConfigUtils.validateUpdate(existing, otherNamespace);
+    }
+
+    @Test
+    public void testMergeDifferentClassName() {
+        FunctionConfig existing = createFunctionConfig();
+        FunctionConfig update = createUpdatedFunctionConfig("className", "Different");
+        FunctionConfig merged = FunctionConfigUtils.validateUpdate(existing, update);
+        // The new class name is taken over ...
+        assertEquals(merged.getClassName(), "Different");
+        // ... and once it is reset, nothing else may differ from the existing config.
+        merged.setClassName(existing.getClassName());
+        Gson gson = new Gson();
+        assertEquals(gson.toJson(existing), gson.toJson(merged));
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
+    public void testMergeDifferentInputs() {
+        // A topics pattern that is not among the existing inputs must be rejected.
+        FunctionConfig existing = createFunctionConfig();
+        FunctionConfig otherInputs = createUpdatedFunctionConfig("topicsPattern", "Different");
+        FunctionConfigUtils.validateUpdate(existing, otherInputs);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output topics differ")
+    public void testMergeDifferentOutput() {
+        // An update carrying a different output topic must be rejected.
+        FunctionConfig existing = createFunctionConfig();
+        FunctionConfig otherOutput = createUpdatedFunctionConfig("output", "Different");
+        FunctionConfigUtils.validateUpdate(existing, otherOutput);
+    }
+
+    @Test
+    public void testMergeDifferentLogTopic() {
+        FunctionConfig existing = createFunctionConfig();
+        FunctionConfig update = createUpdatedFunctionConfig("logTopic", "Different");
+        FunctionConfig merged = FunctionConfigUtils.validateUpdate(existing, update);
+        // The new log topic is taken over ...
+        assertEquals(merged.getLogTopic(), "Different");
+        // ... and once it is reset, nothing else may differ from the existing config.
+        merged.setLogTopic(existing.getLogTopic());
+        Gson gson = new Gson();
+        assertEquals(gson.toJson(existing), gson.toJson(merged));
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
+    public void testMergeDifferentProcessingGuarantees() {
+        // Switching processing guarantees on update must be rejected. The call is expected to
+        // throw, so its result is deliberately not captured.
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("processingGuarantees", EFFECTIVELY_ONCE);
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Retain Orderning cannot be altered")
+    public void testMergeDifferentRetainOrdering() {
+        // Flipping retainOrdering on update must be rejected. The call is expected to throw,
+        // so its result is deliberately not captured.
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("retainOrdering", true);
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test
+    public void testMergeDifferentUserConfig() {
+        FunctionConfig existing = createFunctionConfig();
+        Map<String, String> requestedUserConfig = new HashMap<>();
+        requestedUserConfig.put("MyKey", "MyValue");
+        FunctionConfig update = createUpdatedFunctionConfig("userConfig", requestedUserConfig);
+        FunctionConfig merged = FunctionConfigUtils.validateUpdate(existing, update);
+        // The new user config is taken over ...
+        assertEquals(merged.getUserConfig(), requestedUserConfig);
+        // ... and once it is reset, nothing else may differ from the existing config.
+        merged.setUserConfig(existing.getUserConfig());
+        Gson gson = new Gson();
+        assertEquals(gson.toJson(existing), gson.toJson(merged));
+    }
+
+    @Test
+    public void testMergeDifferentSecrets() {
+        FunctionConfig existing = createFunctionConfig();
+        Map<String, String> requestedSecrets = new HashMap<>();
+        requestedSecrets.put("MyKey", "MyValue");
+        FunctionConfig update = createUpdatedFunctionConfig("secrets", requestedSecrets);
+        FunctionConfig merged = FunctionConfigUtils.validateUpdate(existing, update);
+        // The new secrets are taken over ...
+        assertEquals(merged.getSecrets(), requestedSecrets);
+        // ... and once they are reset, nothing else may differ from the existing config.
+        merged.setSecrets(existing.getSecrets());
+        Gson gson = new Gson();
+        assertEquals(gson.toJson(existing), gson.toJson(merged));
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Runtime cannot be altered")
+    public void testMergeDifferentRuntime() {
+        // Switching the runtime on update must be rejected. The call is expected to throw,
+        // so its result is deliberately not captured.
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("runtime", PYTHON);
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "AutoAck cannot be altered")
+    public void testMergeDifferentAutoAck() {
+        // Flipping autoAck on update must be rejected. The call is expected to throw,
+        // so its result is deliberately not captured.
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("autoAck", false);
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test
+    public void testMergeDifferentMaxMessageRetries() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("maxMessageRetries", 10);
+        FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        // The new retry limit is taken over. Integer.valueOf replaces the deprecated
+        // Integer(int) constructor.
+        assertEquals(
+                mergedConfig.getMaxMessageRetries(),
+                Integer.valueOf(10)
+        );
+        // Once the field is reset, nothing else may differ from the existing config.
+        mergedConfig.setMaxMessageRetries(functionConfig.getMaxMessageRetries());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentDeadLetterTopic() {
+        FunctionConfig existing = createFunctionConfig();
+        FunctionConfig update = createUpdatedFunctionConfig("deadLetterTopic", "Different");
+        FunctionConfig merged = FunctionConfigUtils.validateUpdate(existing, update);
+        // The new dead-letter topic is taken over ...
+        assertEquals(merged.getDeadLetterTopic(), "Different");
+        // ... and once it is reset, nothing else may differ from the existing config.
+        merged.setDeadLetterTopic(existing.getDeadLetterTopic());
+        Gson gson = new Gson();
+        assertEquals(gson.toJson(existing), gson.toJson(merged));
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Subscription Name cannot be altered")
+    public void testMergeDifferentSubname() {
+        // Changing the subscription name on update must be rejected. The call is expected to
+        // throw, so its result is deliberately not captured.
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("subName", "Different");
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
+    @Test
+    public void testMergeDifferentParallelism() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("parallelism", 101);
+        FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        // The new parallelism is taken over. Integer.valueOf replaces the deprecated
+        // Integer(int) constructor.
+        assertEquals(
+                mergedConfig.getParallelism(),
+                Integer.valueOf(101)
+        );
+        // Once the field is reset, nothing else may differ from the existing config.
+        mergedConfig.setParallelism(functionConfig.getParallelism());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentResources() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        Resources resources = new Resources();
+        resources.setCpu(0.3);
+        // Upper-case L suffix: a lower-case 'l' is easily misread as the digit 1.
+        resources.setRam(1232L);
+        resources.setDisk(123456L);
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("resources", resources);
+        FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        // The requested resources are taken over ...
+        assertEquals(
+                mergedConfig.getResources(),
+                resources
+        );
+        // ... and once they are reset, nothing else may differ from the existing config.
+        mergedConfig.setResources(functionConfig.getResources());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentWindowConfig() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        WindowConfig windowConfig = new WindowConfig();
+        windowConfig.setSlidingIntervalCount(123);
+        // Upper-case L suffix: a lower-case 'l' is easily misread as the digit 1.
+        windowConfig.setSlidingIntervalDurationMs(123L);
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("windowConfig", windowConfig);
+        FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        // The requested window config is taken over ...
+        assertEquals(
+                mergedConfig.getWindowConfig(),
+                windowConfig
+        );
+        // ... and once it is reset, nothing else may differ from the existing config.
+        mergedConfig.setWindowConfig(functionConfig.getWindowConfig());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentTimeout() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("timeoutMs", 102L);
+        FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        // The new timeout is taken over. Long.valueOf replaces the deprecated Long(long)
+        // constructor, and the literal uses the unambiguous upper-case L suffix.
+        assertEquals(
+                mergedConfig.getTimeoutMs(),
+                Long.valueOf(102L)
+        );
+        // Once the field is reset, nothing else may differ from the existing config.
+        mergedConfig.setTimeoutMs(functionConfig.getTimeoutMs());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    /**
+     * Builds the baseline function config shared by the merge tests: a Java identity function
+     * with one regex input, one output, at-least-once guarantees, auto-ack on, a 2000 ms
+     * timeout and a count-based window of length 10.
+     */
+    private FunctionConfig createFunctionConfig() {
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant("test-tenant");
+        functionConfig.setNamespace("test-namespace");
+        functionConfig.setName("test-function");
+        functionConfig.setParallelism(1);
+        functionConfig.setClassName(IdentityFunction.class.getName());
+        Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+        inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
+        functionConfig.setInputSpecs(inputSpecs);
+        functionConfig.setOutput("test-output");
+        functionConfig.setOutputSerdeClassName("test-serde");
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        functionConfig.setRetainOrdering(false);
+        functionConfig.setUserConfig(new HashMap<>());
+        functionConfig.setAutoAck(true);
+        // Upper-case L suffix: a lower-case 'l' is easily misread as the digit 1.
+        functionConfig.setTimeoutMs(2000L);
+        functionConfig.setWindowConfig(new WindowConfig().setWindowLengthCount(10));
+        return functionConfig;
+    }
+
+    /**
+     * Returns the baseline config with one field overwritten through reflection.
+     *
+     * @param fieldName  name of the field declared on {@link FunctionConfig}
+     * @param fieldValue value to store in that field
+     * @throws RuntimeException if the field cannot be looked up or assigned
+     */
+    private FunctionConfig createUpdatedFunctionConfig(String fieldName, Object fieldValue) {
+        FunctionConfig updated = createFunctionConfig();
+        try {
+            Field target = FunctionConfig.class.getDeclaredField(fieldName);
+            // The config fields are not public, so access has to be forced.
+            target.setAccessible(true);
+            target.set(updated, fieldValue);
+        } catch (Exception e) {
+            throw new RuntimeException("Something wrong with the test", e);
+        }
+        return updated;
+    }
+
+    @Test
     public void testFunctionConfigConvertFromDetails() {
         String name = "test1";
         String namespace = "ns1";
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 0a68c85..858cd72 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -21,14 +21,18 @@ package org.apache.pulsar.functions.utils;
 import com.google.gson.Gson;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 
+import static 
org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
 import static org.testng.Assert.assertEquals;
 
 /**
@@ -60,4 +64,207 @@ public class SinkConfigUtilsTest {
                 new Gson().toJson(convertedConfig)
         );
     }
+
+    @Test
+    public void testMergeEqual() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createSinkConfig();
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Sink Names differ")
+    public void testMergeDifferentName() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("name", 
"Different");
+        SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Tenants differ")
+    public void testMergeDifferentTenant() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("tenant", 
"Different");
+        SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Namespaces differ")
+    public void testMergeDifferentNamespace() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("namespace", 
"Different");
+        SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+    }
+
+    @Test
+    public void testMergeDifferentClassName() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("className", 
"Different");
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(
+                mergedConfig.getClassName(),
+                "Different"
+        );
+        mergedConfig.setClassName(sinkConfig.getClassName());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
+    public void testMergeDifferentInputs() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("topicsPattern", 
"Different");
+        SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
+    public void testMergeDifferentProcessingGuarantees() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = 
createUpdatedSinkConfig("processingGuarantees", EFFECTIVELY_ONCE);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Retain Orderning cannot be altered")
+    public void testMergeDifferentRetainOrdering() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("retainOrdering", 
true);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+    }
+
+    @Test
+    public void testMergeDifferentUserConfig() {
+        SinkConfig sinkConfig = createSinkConfig();
+        Map<String, String> myConfig = new HashMap<>();
+        myConfig.put("MyKey", "MyValue");
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("configs", 
myConfig);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(
+                mergedConfig.getConfigs(),
+                myConfig
+        );
+        mergedConfig.setConfigs(sinkConfig.getConfigs());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentSecrets() {
+        SinkConfig sinkConfig = createSinkConfig();
+        Map<String, String> mySecrets = new HashMap<>();
+        mySecrets.put("MyKey", "MyValue");
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("secrets", 
mySecrets);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(
+                mergedConfig.getSecrets(),
+                mySecrets
+        );
+        mergedConfig.setSecrets(sinkConfig.getSecrets());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "AutoAck cannot be altered")
+    public void testMergeDifferentAutoAck() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("autoAck", false);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Subscription Name cannot be altered")
+    public void testMergeDifferentSubname() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = 
createUpdatedSinkConfig("sourceSubscriptionName", "Different");
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+    }
+
+    @Test
+    public void testMergeDifferentParallelism() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("parallelism", 101);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(
+                mergedConfig.getParallelism(),
+                new Integer(101)
+        );
+        mergedConfig.setParallelism(sinkConfig.getParallelism());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentResources() {
+        SinkConfig sinkConfig = createSinkConfig();
+        Resources resources = new Resources();
+        resources.setCpu(0.3);
+        resources.setRam(1232l);
+        resources.setDisk(123456l);
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("resources", 
resources);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(
+                mergedConfig.getResources(),
+                resources
+        );
+        mergedConfig.setResources(sinkConfig.getResources());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentTimeout() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("timeoutMs", 102l);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(
+                mergedConfig.getTimeoutMs(),
+                new Long(102l)
+        );
+        mergedConfig.setTimeoutMs(sinkConfig.getTimeoutMs());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    private SinkConfig createSinkConfig() {
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setTenant("test-tenant");
+        sinkConfig.setNamespace("test-namespace");
+        sinkConfig.setName("test-sink");
+        sinkConfig.setParallelism(1);
+        sinkConfig.setClassName(IdentityFunction.class.getName());
+        Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+        inputSpecs.put("test-input", 
ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
+        sinkConfig.setInputSpecs(inputSpecs);
+        
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        sinkConfig.setRetainOrdering(false);
+        sinkConfig.setConfigs(new HashMap<>());
+        sinkConfig.setAutoAck(true);
+        sinkConfig.setTimeoutMs(2000l);
+        sinkConfig.setArchive("DummyArchive.nar");
+        return sinkConfig;
+    }
+
+    private SinkConfig createUpdatedSinkConfig(String fieldName, Object 
fieldValue) {
+        SinkConfig sinkConfig = createSinkConfig();
+        Class<?> fClass = SinkConfig.class;
+        try {
+            Field chap = fClass.getDeclaredField(fieldName);
+            chap.setAccessible(true);
+            chap.set(sinkConfig, fieldValue);
+        } catch (Exception e) {
+            throw new RuntimeException("Something wrong with the test", e);
+        }
+        return sinkConfig;
+    }
 }
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index dcaf2e9..7cb7d3c 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -20,13 +20,18 @@ package org.apache.pulsar.functions.utils;
 
 import com.google.gson.Gson;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.HashMap;
+import java.util.Map;
 
+import static 
org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
 import static org.testng.Assert.assertEquals;
 
 /**
@@ -53,4 +58,165 @@ public class SourceConfigUtilsTest {
                 new Gson().toJson(convertedConfig)
         );
     }
+
+    @Test
+    public void testMergeEqual() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = createSourceConfig();
+        SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Function Names differ")
+    public void testMergeDifferentName() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("name", 
"Different");
+        SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Tenants differ")
+    public void testMergeDifferentTenant() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("tenant", 
"Different");
+        SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Namespaces differ")
+    public void testMergeDifferentNamespace() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("namespace", 
"Different");
+        SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+    }
+
+    @Test
+    public void testMergeDifferentClassName() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("className", 
"Different");
+        SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+        assertEquals(
+                mergedConfig.getClassName(),
+                "Different"
+        );
+        mergedConfig.setClassName(sourceConfig.getClassName());
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Destination topics differ")
+    public void testMergeDifferentOutput() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("topicName", 
"Different");
+        SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
+    public void testMergeDifferentProcessingGuarantees() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = 
createUpdatedSourceConfig("processingGuarantees", EFFECTIVELY_ONCE);
+        SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+    }
+
+    @Test
+    public void testMergeDifferentUserConfig() {
+        SourceConfig sourceConfig = createSourceConfig();
+        Map<String, String> myConfig = new HashMap<>();
+        myConfig.put("MyKey", "MyValue");
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("configs", 
myConfig);
+        SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+        assertEquals(
+                mergedConfig.getConfigs(),
+                myConfig
+        );
+        mergedConfig.setConfigs(sourceConfig.getConfigs());
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentSecrets() {
+        SourceConfig sourceConfig = createSourceConfig();
+        Map<String, String> mySecrets = new HashMap<>();
+        mySecrets.put("MyKey", "MyValue");
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("secrets", 
mySecrets);
+        SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+        assertEquals(
+                mergedConfig.getSecrets(),
+                mySecrets
+        );
+        mergedConfig.setSecrets(sourceConfig.getSecrets());
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentParallelism() {
+        SourceConfig sourceConfig = createSourceConfig();
+        SourceConfig newSourceConfig = 
createUpdatedSourceConfig("parallelism", 101);
+        SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+        assertEquals(
+                mergedConfig.getParallelism(),
+                new Integer(101)
+        );
+        mergedConfig.setParallelism(sourceConfig.getParallelism());
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentResources() {
+        SourceConfig sourceConfig = createSourceConfig();
+        Resources resources = new Resources();
+        resources.setCpu(0.3);
+        resources.setRam(1232l);
+        resources.setDisk(123456l);
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("resources", 
resources);
+        SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+        assertEquals(
+                mergedConfig.getResources(),
+                resources
+        );
+        mergedConfig.setResources(sourceConfig.getResources());
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    private SourceConfig createSourceConfig() {
+        SourceConfig sourceConfig = new SourceConfig();
+        sourceConfig.setTenant("test-tenant");
+        sourceConfig.setNamespace("test-namespace");
+        sourceConfig.setName("test-source");
+        sourceConfig.setParallelism(1);
+        sourceConfig.setClassName(IdentityFunction.class.getName());
+        sourceConfig.setTopicName("test-output");
+        sourceConfig.setSerdeClassName("test-serde");
+        
sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        sourceConfig.setConfigs(new HashMap<>());
+        return sourceConfig;
+    }
+
+    private SourceConfig createUpdatedSourceConfig(String fieldName, Object 
fieldValue) {
+        SourceConfig sourceConfig = createSourceConfig();
+        Class<?> fClass = SourceConfig.class;
+        try {
+            Field chap = fClass.getDeclaredField(fieldName);
+            chap.setAccessible(true);
+            chap.set(sourceConfig, fieldValue);
+        } catch (Exception e) {
+            throw new RuntimeException("Something wrong with the test", e);
+        }
+        return sourceConfig;
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index 2cd51b8..7defdf6 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -125,8 +125,14 @@ public final class Utils {
     }
 
     public static void downloadFromBookkeeper(Namespace namespace,
-                                                 OutputStream outputStream,
-                                                 String packagePath) throws 
IOException {
+                                              File outputFile,
+                                              String packagePath) throws 
IOException {
+        downloadFromBookkeeper(namespace, new FileOutputStream(outputFile), 
packagePath);
+    }
+
+    public static void downloadFromBookkeeper(Namespace namespace,
+                                              OutputStream outputStream,
+                                              String packagePath) throws 
IOException {
         DistributedLogManager dlm = namespace.openLog(packagePath);
         try (InputStream in = new DLInputStream(dlm)) {
             int read = 0;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 18de1eb..627f5cc 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -329,6 +329,19 @@ public class FunctionsImpl {
             return getUnavailableResponse();
         }
 
+        if (tenant == null) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Tenant is not provided")).build();
+        }
+        if (namespace == null) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Namespace is not 
provided")).build();
+        }
+        if (componentName == null) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(componentType + " Name is not 
provided")).build();
+        }
+
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to 
update {}", tenant, namespace,
@@ -349,23 +362,83 @@ public class FunctionsImpl {
                     .entity(new ErrorData(String.format("%s %s doesn't exist", 
componentType, componentName))).build();
         }
 
+        String mergedComponentConfigJson;
+        String existingComponentConfigJson;
+
+        FunctionMetaData existingComponent = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (componentType.equals(FUNCTION)) {
+            FunctionConfig existingFunctionConfig = 
FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+            existingComponentConfigJson = new 
Gson().toJson(existingFunctionConfig);
+            FunctionConfig functionConfig = new 
Gson().fromJson(componentConfigJson, FunctionConfig.class);
+            // The REST endpoint parameters take precedence over whatever is in 
the function config
+            functionConfig.setTenant(tenant);
+            functionConfig.setNamespace(namespace);
+            functionConfig.setName(componentName);
+            try {
+                FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(existingFunctionConfig, functionConfig);
+                mergedComponentConfigJson = new Gson().toJson(mergedConfig);
+            } catch (Exception e) {
+                return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData(e.getMessage())).build();
+            }
+        } else if (componentType.equals(SOURCE)) {
+            SourceConfig existingSourceConfig = 
SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+            existingComponentConfigJson = new 
Gson().toJson(existingSourceConfig);
+            SourceConfig sourceConfig = new 
Gson().fromJson(componentConfigJson, SourceConfig.class);
+            // The REST endpoint parameters take precedence over whatever is in 
the source config
+            sourceConfig.setTenant(tenant);
+            sourceConfig.setNamespace(namespace);
+            sourceConfig.setName(componentName);
+            try {
+                SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(existingSourceConfig, sourceConfig);
+                mergedComponentConfigJson = new Gson().toJson(mergedConfig);
+            } catch (Exception e) {
+                return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData(e.getMessage())).build();
+            }
+        } else {
+            SinkConfig existingSinkConfig = 
SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+            existingComponentConfigJson = new 
Gson().toJson(existingSinkConfig);
+            SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, 
SinkConfig.class);
+            // The REST endpoint parameters take precedence over whatever is in 
the sink config
+            sinkConfig.setTenant(tenant);
+            sinkConfig.setNamespace(namespace);
+            sinkConfig.setName(componentName);
+            try {
+                SinkConfig mergedConfig = 
SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig);
+                mergedComponentConfigJson = new Gson().toJson(mergedConfig);
+            } catch (Exception e) {
+                return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData(e.getMessage())).build();
+            }
+        }
+
+        if (existingComponentConfigJson.equals(mergedComponentConfigJson) && 
isBlank(functionPkgUrl) && uploadedInputStream == null) {
+            log.error("{}/{}/{} Update contains no changes", tenant, 
namespace, componentName);
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Update contains no 
change")).build();
+        }
+
         FunctionDetails functionDetails;
-        boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
         File uploadedInputStreamAsFile = null;
         if (uploadedInputStream != null) {
             uploadedInputStreamAsFile = dumpToTmpFile(uploadedInputStream);
         }
+        File existingPackageAsFile = null;
+
         // validate parameters
         try {
-            if (isPkgUrlProvided) {
+            if (isNotBlank(functionPkgUrl)) {
                 functionDetails = 
validateUpdateRequestParamsWithPkgUrl(tenant, namespace, componentName, 
functionPkgUrl,
-                        functionDetailsJson, componentConfigJson, 
componentType);
-            } else {
+                        functionDetailsJson, mergedComponentConfigJson, 
componentType);
+            } else if (uploadedInputStream != null) {
                 functionDetails = validateUpdateRequestParams(tenant, 
namespace, componentName, uploadedInputStreamAsFile,
-                        fileDetail, functionDetailsJson, componentConfigJson, 
componentType);
+                        fileDetail, functionDetailsJson, 
mergedComponentConfigJson, componentType);
+            } else {
+                functionDetails = 
validateUpdateRequestParamsWithExistingMetadata(tenant, namespace, 
componentName, existingComponent.getPackageLocation(), 
mergedComponentConfigJson, componentType);
             }
         } catch (Exception e) {
-            log.error("Invalid register {} request @ /{}/{}/{}", 
componentType, tenant, namespace, componentName, e);
+            log.error("Invalid update {} request @ /{}/{}/{}", componentType, 
tenant, namespace, componentName, e);
             return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
@@ -383,12 +456,16 @@ public class FunctionsImpl {
                 
.setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
 
         PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
-        try {
-            packageLocationMetaDataBuilder = 
getFunctionPackageLocation(functionDetails, componentType,
-                    functionPkgUrl, fileDetail, uploadedInputStreamAsFile);
-        } catch (Exception e) {
-            return 
Response.serverError().type(MediaType.APPLICATION_JSON).entity(new 
ErrorData(e.getMessage()))
-                    .build();
+        if (isNotBlank(functionPkgUrl) || uploadedInputStreamAsFile != null) {
+            try {
+                packageLocationMetaDataBuilder = 
getFunctionPackageLocation(functionDetails, componentType,
+                        functionPkgUrl, fileDetail, uploadedInputStreamAsFile);
+            } catch (Exception e) {
+                return 
Response.serverError().type(MediaType.APPLICATION_JSON).entity(new 
ErrorData(e.getMessage()))
+                        .build();
+            }
+        } else {
+            packageLocationMetaDataBuilder = 
PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
         }
 
         
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
@@ -1157,6 +1234,16 @@ public class FunctionsImpl {
         return functionDetails;
     }
 
+    private FunctionDetails 
validateUpdateRequestParamsWithExistingMetadata(String tenant, String 
namespace, String componentName,
+                                                            
PackageLocationMetaData packageLocationMetaData,
+                                                            String 
componentConfigJson, String componentType) throws Exception {
+        File tmpFile = File.createTempFile("functions", null);
+        tmpFile.deleteOnExit();
+        Utils.downloadFromBookkeeper(worker().getDlogNamespace(), tmpFile, 
packageLocationMetaData.getPackagePath());
+        return validateUpdateRequestParams(tenant, namespace, componentName,
+                null, componentConfigJson, componentType, null, tmpFile);
+    }
+
     private static File dumpToTmpFile(InputStream uploadedInputStream) {
         try {
             File tmpFile = File.createTempFile("functions", null);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index e03d90d..5275cf0 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -126,6 +126,7 @@ public class FunctionApiV2ResourceTest {
     private FunctionsImpl resource;
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
+    private FunctionMetaData mockedFunctionMetadata;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -140,6 +141,7 @@ public class FunctionApiV2ResourceTest {
         this.mockedPulsarAdmin = mock(PulsarAdmin.class);
         this.mockedTenants = mock(Tenants.class);
         this.mockedNamespaces = mock(Namespaces.class);
+        this.mockedFunctionMetadata = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         namespaceList.add(tenant + "/" + namespace);
 
         this.mockedWorkerService = mock(WorkerService.class);
@@ -153,6 +155,7 @@ public class FunctionApiV2ResourceTest {
         when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
         when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
         when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetadata);
 
         // worker config
         WorkerConfig workerConfig = new WorkerConfig()
@@ -434,16 +437,7 @@ public class FunctionApiV2ResourceTest {
     }
 
     private Response registerDefaultFunction() {
-        FunctionConfig functionConfig = new FunctionConfig();
-        functionConfig.setTenant(tenant);
-        functionConfig.setNamespace(namespace);
-        functionConfig.setName(function);
-        functionConfig.setClassName(className);
-        functionConfig.setParallelism(parallelism);
-        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
-        functionConfig.setOutput(outputTopic);
-        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
-        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        FunctionConfig functionConfig = createDefaultFunctionConfig();
         return resource.registerFunction(
             tenant,
             namespace,
@@ -617,6 +611,9 @@ public class FunctionApiV2ResourceTest {
 
     @Test
     public void testUpdateFunctionMissingPackage() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
         testUpdateFunctionMissingArguments(
             tenant,
             namespace,
@@ -628,55 +625,104 @@ public class FunctionApiV2ResourceTest {
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Package is not provided");
+                "Update contains no change");
     }
 
     @Test
     public void testUpdateFunctionMissingInputTopic() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
         testUpdateFunctionMissingArguments(
                 tenant,
                 namespace,
                 function,
-                mockedInputStream,
+                null,
                 null,
                 mockedFormData,
                 outputTopic,
                 outputSerdeClassName,
                 className,
                 parallelism,
-                "No input topic(s) specified for the function");
+                "Update contains no change");
     }
 
     @Test
-    public void testUpdateFunctionMissingPackageDetails() throws IOException {
+    public void testUpdateFunctionMissingClassName() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
         testUpdateFunctionMissingArguments(
             tenant,
             namespace,
             function,
-            mockedInputStream,
-            topicsToSerDeClassName,
             null,
+            topicsToSerDeClassName,
+            mockedFormData,
             outputTopic,
                 outputSerdeClassName,
-            className,
+            null,
             parallelism,
-                "Function Package is not provided");
+                "Update contains no change");
     }
 
     @Test
-    public void testUpdateFunctionMissingClassName() throws IOException {
+    public void testUpdateFunctionChangedParallelism() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
         testUpdateFunctionMissingArguments(
-            tenant,
-            namespace,
-            function,
-            mockedInputStream,
-            topicsToSerDeClassName,
-            mockedFormData,
-            outputTopic,
+                tenant,
+                namespace,
+                function,
+                null,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
                 outputSerdeClassName,
-            null,
-            parallelism,
-                "Function classname cannot be null");
+                null,
+                parallelism + 1,
+                null);
+    }
+
+    @Test
+    public void testUpdateFunctionChangedOutput() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class); // the static call on the next line is stubbed to do nothing
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+        testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                topicsToSerDeClassName,
+                mockedFormData,
+                "DifferentOutput", // differs from the outputTopic set by createDefaultFunctionConfig()
+                outputSerdeClassName,
+                null,
+                parallelism,
+                "Output topics differ"); // non-null reason: the helper asserts BAD_REQUEST with this text
+    }
+
+    @Test
+    public void testUpdateFunctionChangedInputs() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class); // the static call on the next line is stubbed to do nothing
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+        Map<String, String> someOtherInput = new HashMap<>();
+        someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE);
+        testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                someOtherInput, // freshly built input map passed instead of topicsToSerDeClassName
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                null,
+                parallelism,
+                "Input Topics cannot be altered"); // non-null reason: the helper asserts BAD_REQUEST with this text
     }
 
     private void testUpdateFunctionMissingArguments(
@@ -720,6 +766,14 @@ public class FunctionApiV2ResourceTest {
         }
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
 
+        if (expectedError == null) {
+            RequestResult rr = new RequestResult()
+                    .setSuccess(true)
+                    .setMessage("function registered");
+            CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
+            
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        }
+
         Response response = resource.updateFunction(
             tenant,
             namespace,
@@ -732,8 +786,12 @@ public class FunctionApiV2ResourceTest {
             FunctionsImpl.FUNCTION,
                 null);
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new 
ErrorData(expectedError).reason);
+        if (expectedError == null) {
+            assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        } else {
+            assertEquals(Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+            Assert.assertEquals(((ErrorData) response.getEntity()).reason, new 
ErrorData(expectedError).reason);
+        }
     }
 
     private Response updateDefaultFunction() throws IOException {
@@ -1269,4 +1327,23 @@ public class FunctionApiV2ResourceTest {
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
+
+    public static FunctionConfig createDefaultFunctionConfig() {
+        FunctionConfig config = new FunctionConfig(); // filled from this test class's default fixture values
+        config.setTenant(tenant);
+        config.setNamespace(namespace);
+        config.setName(function);
+        config.setClassName(className);
+        config.setParallelism(parallelism);
+        config.setCustomSerdeInputs(topicsToSerDeClassName);
+        config.setOutput(outputTopic);
+        config.setOutputSerdeClassName(outputSerdeClassName);
+        config.setRuntime(FunctionConfig.Runtime.JAVA);
+        return config;
+    }
+
+    public static FunctionDetails createDefaultFunctionDetails() {
+        // Converted from the same default config that registerDefaultFunction() submits.
+        return FunctionConfigUtils.convert(createDefaultFunctionConfig(), null);
+    }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 5e710e0..a40e0ad 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -28,6 +28,8 @@ import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -39,10 +41,12 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.apache.pulsar.io.cassandra.CassandraStringSink;
+import org.apache.pulsar.io.twitter.TwitterFireHose;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -60,12 +64,15 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
+import static 
org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE;
+import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -81,7 +88,7 @@ import static org.testng.Assert.assertEquals;
 /**
  * Unit test of {@link SinkApiV2Resource}.
  */
-@PrepareForTest({Utils.class, SinkConfigUtils.class})
+@PrepareForTest({Utils.class, SinkConfigUtils.class, ConnectorUtils.class, 
org.apache.pulsar.functions.utils.Utils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", 
"org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 @Slf4j
 public class SinkApiV2ResourceTest {
@@ -96,7 +103,7 @@ public class SinkApiV2ResourceTest {
     private static final String sink = "test-sink";
     private static final Map<String, String> topicsToSerDeClassName = new 
HashMap<>();
     static {
-        
topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", 
TopicSchema.DEFAULT_SERDE);
+        
topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", 
DEFAULT_SERDE);
     }
     private static final String subscriptionName = "test-subscription";
     private static final String className = 
CassandraStringSink.class.getName();
@@ -119,6 +126,7 @@ public class SinkApiV2ResourceTest {
     private FunctionsImpl resource;
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
+    private FunctionMetaData mockedFunctionMetaData;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -372,13 +380,7 @@ public class SinkApiV2ResourceTest {
     }
 
     private Response registerDefaultSink() throws IOException {
-        SinkConfig sinkConfig = new SinkConfig();
-        sinkConfig.setTenant(tenant);
-        sinkConfig.setNamespace(namespace);
-        sinkConfig.setName(sink);
-        sinkConfig.setClassName(className);
-        sinkConfig.setParallelism(parallelism);
-        sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+        SinkConfig sinkConfig = createDefaultSinkConfig();
         return resource.registerFunction(
             tenant,
             namespace,
@@ -573,6 +575,10 @@ public class SinkApiV2ResourceTest {
 
     @Test
     public void testUpdateSinkMissingPackage() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
         testUpdateSinkMissingArguments(
             tenant,
             namespace,
@@ -582,21 +588,63 @@ public class SinkApiV2ResourceTest {
             topicsToSerDeClassName,
             className,
             parallelism,
-                "Sink Package is not provided");
+                "Update contains no change");
     }
 
     @Test
-    public void testUpdateSinkMissingPackageDetails() throws IOException {
+    public void testUpdateSinkMissingInputs() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
         testUpdateSinkMissingArguments(
-            tenant,
-            namespace,
+                tenant,
+                namespace,
                 sink,
-            mockedInputStream,
-            null,
-            topicsToSerDeClassName,
-            className,
-            parallelism,
-                "zip file is empty");
+                null,
+                mockedFormData,
+                null,
+                className,
+                parallelism,
+                "Update contains no change");
+    }
+
+    @Test
+    public void testUpdateSinkDifferentInputs() throws IOException {
+        Map<String, String> alteredInputs = new HashMap<>(); // passed in place of topicsToSerDeClassName
+        alteredInputs.put("DifferntTopic", DEFAULT_SERDE);
+
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class); // the static call on the next line is stubbed to do nothing
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+        testUpdateSinkMissingArguments(
+                tenant,
+                namespace,
+                sink,
+                null,
+                mockedFormData,
+                alteredInputs,
+                className,
+                parallelism,
+                "Input Topics cannot be altered"); // non-null reason: the helper asserts BAD_REQUEST with this text
+    }
+
+    @Test
+    public void testUpdateSinkDifferentParallelism() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class); // the static call on the next line is stubbed to do nothing
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
+        testUpdateSinkMissingArguments(
+                tenant,
+                namespace,
+                sink,
+                null,
+                mockedFormData,
+                topicsToSerDeClassName,
+                className,
+                parallelism + 1, // one more than the parallelism set by createDefaultSinkConfig()
+                null); // null expected error: the helper asserts Status.OK instead of BAD_REQUEST
     }
 
     private void testUpdateSinkMissingArguments(
@@ -609,6 +657,24 @@ public class SinkApiV2ResourceTest {
             String className,
             Integer parallelism,
             String expectedError) throws IOException {
+        mockStatic(ConnectorUtils.class);
+        
doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
+        ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
+
+        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
+        
doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
+        org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), 
any(NarClassLoader.class));
+
+        
doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), 
anyString(), any(File.class));
+
+        
doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+
+
+        this.mockedFunctionMetaData = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
+
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
 
         SinkConfig sinkConfig = new SinkConfig();
@@ -631,6 +697,14 @@ public class SinkApiV2ResourceTest {
             sinkConfig.setParallelism(parallelism);
         }
 
+        if (expectedError == null) {
+            RequestResult rr = new RequestResult()
+                    .setSuccess(true)
+                    .setMessage("source registered");
+            CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
+            
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        }
+
         Response response = resource.updateFunction(
             tenant,
             namespace,
@@ -643,8 +717,12 @@ public class SinkApiV2ResourceTest {
                 FunctionsImpl.SINK,
                 null);
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new 
ErrorData(expectedError).reason);
+        if (expectedError == null) {
+            assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        } else {
+            assertEquals(Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+            Assert.assertEquals(((ErrorData) response.getEntity()).reason, new 
ErrorData(expectedError).reason);
+        }
     }
 
     private Response updateDefaultSink() throws IOException {
@@ -656,6 +734,24 @@ public class SinkApiV2ResourceTest {
         sinkConfig.setParallelism(parallelism);
         sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
 
+        mockStatic(ConnectorUtils.class);
+        
doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
+        ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
+
+        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
+        
doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
+        org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), 
any(NarClassLoader.class));
+
+        
doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), 
anyString(), any(File.class));
+
+        
doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+
+
+        this.mockedFunctionMetaData = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
+
         return resource.updateFunction(
             tenant,
             namespace,
@@ -730,6 +826,24 @@ public class SinkApiV2ResourceTest {
         sinkConfig.setParallelism(parallelism);
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
+        mockStatic(ConnectorUtils.class);
+        
doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
+        ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
+
+        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
+        
doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
+        org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), 
any(NarClassLoader.class));
+
+        
doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), 
anyString(), any(File.class));
+
+        
doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+
+
+        this.mockedFunctionMetaData = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
+
         RequestResult rr = new RequestResult()
                 .setSuccess(true)
                 .setMessage("source registered");
@@ -974,7 +1088,7 @@ public class SinkApiV2ResourceTest {
                 .setSubscriptionType(Function.SubscriptionType.SHARED)
                 .setSubscriptionName(subscriptionName)
                 .putInputSpecs("input", Function.ConsumerSpec.newBuilder()
-                .setSerdeClassName(TopicSchema.DEFAULT_SERDE)
+                .setSerdeClassName(DEFAULT_SERDE)
                 .setIsRegexPattern(false)
                 .build()).build();
         Function.SinkSpec sinkSpec = Function.SinkSpec.newBuilder()
@@ -985,7 +1099,7 @@ public class SinkApiV2ResourceTest {
                 .setSink(sinkSpec)
                 .setName(sink)
                 .setNamespace(namespace)
-                
.setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE)
+                .setProcessingGuarantees(ATLEAST_ONCE)
                 .setTenant(tenant)
                 .setParallelism(parallelism)
                 .setRuntime(FunctionDetails.Runtime.JAVA)
@@ -1101,4 +1215,19 @@ public class SinkApiV2ResourceTest {
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData("Tenant does not exist").reason, 
((ErrorData) response.getEntity()).reason);
     }
+
+    private SinkConfig createDefaultSinkConfig() {
+        SinkConfig config = new SinkConfig(); // filled from this test class's default fixture values
+        config.setTenant(tenant);
+        config.setNamespace(namespace);
+        config.setName(sink);
+        config.setClassName(className);
+        config.setParallelism(parallelism);
+        config.setTopicToSerdeClassName(topicsToSerDeClassName);
+        return config;
+    }
+
+    private FunctionDetails createDefaultFunctionDetails() throws IOException {
+        return SinkConfigUtils.convert(createDefaultSinkConfig(), null); // details converted from the default sink config
+    }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index 0c420a3..f60afe4 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -37,6 +38,7 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
@@ -55,6 +57,7 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import java.io.*;
 import java.net.URL;
+import java.nio.file.Path;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -72,7 +75,7 @@ import static org.testng.Assert.assertEquals;
 /**
  * Unit test of {@link SourceApiV2Resource}.
  */
-@PrepareForTest({Utils.class})
+@PrepareForTest({Utils.class, ConnectorUtils.class, 
org.apache.pulsar.functions.utils.Utils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", 
"org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 @Slf4j
 public class SourceApiV2ResourceTest {
@@ -107,6 +110,7 @@ public class SourceApiV2ResourceTest {
     private FunctionsImpl resource;
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
+    private FunctionMetaData mockedFunctionMetaData;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -342,14 +346,7 @@ public class SourceApiV2ResourceTest {
     }
 
     private Response registerDefaultSource() throws IOException {
-        SourceConfig sourceConfig = new SourceConfig();
-        sourceConfig.setTenant(tenant);
-        sourceConfig.setNamespace(namespace);
-        sourceConfig.setName(source);
-        sourceConfig.setClassName(className);
-        sourceConfig.setParallelism(parallelism);
-        sourceConfig.setTopicName(outputTopic);
-        sourceConfig.setSerdeClassName(outputSerdeClassName);
+        SourceConfig sourceConfig = createDefaultSourceConfig();
         return resource.registerFunction(
             tenant,
             namespace,
@@ -548,56 +545,53 @@ public class SourceApiV2ResourceTest {
 
     @Test
     public void testUpdateSourceMissingPackage() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
         testUpdateSourceMissingArguments(
             tenant,
             namespace,
                 source,
-            null,
+                null,
             mockedFormData,
             outputTopic,
                 outputSerdeClassName,
             className,
             parallelism,
-                "Source Package is not provided");
-    }
-
-    @Test
-    public void testUpdateSourceMissingPackageDetails() throws IOException {
-        testUpdateSourceMissingArguments(
-            tenant,
-            namespace,
-                source,
-            mockedInputStream,
-            null,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                "zip file is empty");
+                "Update contains no change");
     }
 
     @Test
     public void testUpdateSourceMissingTopicName() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
         testUpdateSourceMissingArguments(
                 tenant,
                 namespace,
                 source,
-                mockedInputStream,
+                null,
                 mockedFormData,
                 null,
                 outputSerdeClassName,
                 className,
                 parallelism,
-                "Topic name cannot be null");
+                "Update contains no change");
     }
 
     @Test
     public void testUpdateSourceNegativeParallelism() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
         testUpdateSourceMissingArguments(
                 tenant,
                 namespace,
                 source,
-                mockedInputStream,
+                null,
                 mockedFormData,
                 outputTopic,
                 outputSerdeClassName,
@@ -607,7 +601,49 @@ public class SourceApiV2ResourceTest {
     }
 
     @Test
+    public void testUpdateSourceChangedParallelism() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class); // the static call on the next line is stubbed to do nothing
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
+        testUpdateSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                null,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                parallelism + 1, // one more than the parallelism set by createDefaultSourceConfig()
+                null); // null expected error: the helper asserts Status.OK instead of BAD_REQUEST
+    }
+
+    @Test
+    public void testUpdateSourceChangedTopic() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class); // the static call on the next line is stubbed to do nothing
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
+        testUpdateSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                null,
+                mockedFormData,
+                "DifferentTopic", // differs from the outputTopic set by createDefaultSourceConfig()
+                outputSerdeClassName,
+                className,
+                parallelism,
+                "Destination topics differ"); // non-null reason: the helper asserts BAD_REQUEST with this text
+    }
+
+    @Test
     public void testUpdateSourceZeroParallelism() throws IOException {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), 
anyString());
+
         testUpdateSourceMissingArguments(
                 tenant,
                 namespace,
@@ -632,6 +668,21 @@ public class SourceApiV2ResourceTest {
             String className,
             Integer parallelism,
             String expectedError) throws IOException {
+
+        mockStatic(ConnectorUtils.class);
+        doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
+        ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
+
+        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
+        
doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
+        org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), 
any(NarClassLoader.class));
+
+        
doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), 
anyString(), any(File.class));
+
+        this.mockedFunctionMetaData = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
+
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(function))).thenReturn(true);
 
         SourceConfig sourceConfig = new SourceConfig();
@@ -657,6 +708,14 @@ public class SourceApiV2ResourceTest {
             sourceConfig.setParallelism(parallelism);
         }
 
+        if (expectedError == null) {
+            RequestResult rr = new RequestResult()
+                    .setSuccess(true)
+                    .setMessage("source registered");
+            CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
+            
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        }
+
         Response response = resource.updateFunction(
             tenant,
             namespace,
@@ -669,8 +728,12 @@ public class SourceApiV2ResourceTest {
                 FunctionsImpl.SOURCE,
                 null);
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new 
ErrorData(expectedError).reason);
+        if (expectedError == null) {
+            assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        } else {
+            assertEquals(Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+            Assert.assertEquals(((ErrorData) response.getEntity()).reason, new 
ErrorData(expectedError).reason);
+        }
     }
 
     private Response updateDefaultSource() throws IOException {
@@ -683,6 +746,21 @@ public class SourceApiV2ResourceTest {
         sourceConfig.setTopicName(outputTopic);
         sourceConfig.setSerdeClassName(outputSerdeClassName);
 
+        mockStatic(ConnectorUtils.class);
+        doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
+        ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
+
+        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
+        
doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
+        org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), 
any(NarClassLoader.class));
+
+        
doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
+        
org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), 
anyString(), any(File.class));
+
+        this.mockedFunctionMetaData = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
+
+
         return resource.updateFunction(
             tenant,
             namespace,
@@ -757,6 +835,21 @@ public class SourceApiV2ResourceTest {
         sourceConfig.setClassName(className);
         sourceConfig.setParallelism(parallelism);
 
+        mockStatic(ConnectorUtils.class);
+        doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
+        ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
+
+        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
+        doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
+        org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), any(NarClassLoader.class));
+
+        doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
+        org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
+
+        this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
+
+
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
         RequestResult rr = new RequestResult()
                 .setSuccess(true)
@@ -1123,4 +1216,20 @@ public class SourceApiV2ResourceTest {
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData("Tenant does not exist").reason, ((ErrorData) response.getEntity()).reason);
     }
+
+    private SourceConfig createDefaultSourceConfig() { // baseline SourceConfig populated from this test's fixture fields
+        SourceConfig defaults = new SourceConfig();
+        defaults.setTenant(tenant);
+        defaults.setNamespace(namespace);
+        defaults.setName(source);
+        defaults.setClassName(className);
+        defaults.setParallelism(parallelism);
+        defaults.setTopicName(outputTopic);
+        defaults.setSerdeClassName(outputSerdeClassName);
+        return defaults;
+    }
+
+    private FunctionDetails createDefaultFunctionDetails() throws IOException { // FunctionDetails derived from the default source config
+        return SourceConfigUtils.convert(createDefaultSourceConfig(), null); // NOTE(review): second argument is null - presumably the class loader; confirm against SourceConfigUtils.convert
+    }
 }

Reply via email to