sijie closed pull request #2918: Prefer rest path over Function/Source/Sink 
Config values
URL: https://github.com/apache/pulsar/pull/2918
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.gitignore b/.gitignore
index 38266a8cf2..e43e519a71 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,7 @@ logs
 pulsar-broker/tmp.*
 pulsar-broker/src/test/resources/log4j2.yaml
 pulsar-functions/worker/test-tenant/
+pulsar-broker/src/test/resources/pulsar-functions-api-examples.jar
 *.log
 *.nar
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 914a691524..ebaa586b8a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -1071,6 +1071,10 @@ private FunctionDetails 
validateUpdateRequestParams(String tenant, String namesp
 
         if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
             FunctionConfig functionConfig = new 
Gson().fromJson(componentConfigJson, FunctionConfig.class);
+            // The rest end points take precendence over whatever is there in 
functionconfig
+            functionConfig.setTenant(tenant);
+            functionConfig.setNamespace(namespace);
+            functionConfig.setName(componentName);
             FunctionConfigUtils.inferMissingArguments(functionConfig);
             ClassLoader clsLoader = 
FunctionConfigUtils.validate(functionConfig, functionPkgUrl, 
uploadedInputStreamAsFile);
             return FunctionConfigUtils.convert(functionConfig, clsLoader);
@@ -1078,6 +1082,10 @@ private FunctionDetails 
validateUpdateRequestParams(String tenant, String namesp
         if (componentType.equals(SOURCE)) {
             Path archivePath = null;
             SourceConfig sourceConfig = new 
Gson().fromJson(componentConfigJson, SourceConfig.class);
+            // The rest end points take precendence over whatever is there in 
sourceconfig
+            sourceConfig.setTenant(tenant);
+            sourceConfig.setNamespace(namespace);
+            sourceConfig.setName(componentName);
             SourceConfigUtils.inferMissingArguments(sourceConfig);
             if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
                 String builtinArchive = sourceConfig.getArchive();
@@ -1096,6 +1104,10 @@ private FunctionDetails 
validateUpdateRequestParams(String tenant, String namesp
         if (componentType.equals(SINK)) {
             Path archivePath = null;
             SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, 
SinkConfig.class);
+            // The rest end points take precendence over whatever is there in 
sinkConfig
+            sinkConfig.setTenant(tenant);
+            sinkConfig.setNamespace(namespace);
+            sinkConfig.setName(componentName);
             SinkConfigUtils.inferMissingArguments(sinkConfig);
             if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
                 String builtinArchive = sinkConfig.getArchive();
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 1a7b51a977..c3995a83d9 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -1200,4 +1200,36 @@ public void 
testRegisterFunctionFileUrlWithValidSinkClass() throws IOException {
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
+
+    @Test
+    public void testRegisterFunctionWithConflictingFields() throws IOException 
{
+        Configurator.setRootLevel(Level.DEBUG);
+        String actualTenant = "DIFFERENT_TENANT";
+        String actualNamespace = "DIFFERENT_NAMESPACE";
+        String actualName = "DIFFERENT_NAME";
+
+        String fileLocation = 
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        String filePackageUrl = "file://" + fileLocation;
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(function))).thenReturn(true);
+        when(mockedManager.containsFunction(eq(actualTenant), 
eq(actualNamespace), eq(actualName))).thenReturn(false);
+
+        RequestResult rr = new 
RequestResult().setSuccess(true).setMessage("function registered");
+        CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
+        
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(function);
+        functionConfig.setClassName(className);
+        functionConfig.setParallelism(parallelism);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+        functionConfig.setOutput(outputTopic);
+        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+        Response response = resource.registerFunction(actualTenant, 
actualNamespace, actualName, null, null, filePackageUrl,
+                null, new Gson().toJson(functionConfig), 
FunctionsImpl.FUNCTION, null);
+
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+    }
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 57e3cc5e4b..93a0a747c7 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -420,6 +420,48 @@ public void testRegisterSinkSuccess() throws Exception {
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
 
+    @Test
+    public void testRegisterSinkConflictingFields() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
+        String actualTenant = "DIFFERENT_TENANT";
+        String actualNamespace = "DIFFERENT_NAMESPACE";
+        String actualName = "DIFFERENT_NAME";
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
+        when(mockedManager.containsFunction(eq(actualTenant), 
eq(actualNamespace), eq(actualName))).thenReturn(false);
+
+        RequestResult rr = new RequestResult()
+                .setSuccess(true)
+                .setMessage("source registered");
+        CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
+        
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setTenant(tenant);
+        sinkConfig.setNamespace(namespace);
+        sinkConfig.setName(sink);
+        sinkConfig.setClassName(className);
+        sinkConfig.setParallelism(parallelism);
+        sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+        Response response = resource.registerFunction(
+                actualTenant,
+                actualNamespace,
+                actualName,
+                new FileInputStream(JAR_FILE_PATH),
+                mockedFormData,
+                null,
+                null,
+                new Gson().toJson(sinkConfig),
+                FunctionsImpl.SINK,
+                null);
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+    }
+
     @Test
     public void testRegisterSinkFailure() throws Exception {
         mockStatic(Utils.class);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index 3a4785f15c..b7e5d130a0 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -391,6 +391,49 @@ public void testRegisterSourceSuccess() throws Exception {
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
 
+    @Test
+    public void testRegisterSourceConflictingFields() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
+        String actualTenant = "DIFFERENT_TENANT";
+        String actualNamespace = "DIFFERENT_NAMESPACE";
+        String actualName = "DIFFERENT_NAME";
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(true);
+        when(mockedManager.containsFunction(eq(actualTenant), 
eq(actualNamespace), eq(actualName))).thenReturn(false);
+
+        RequestResult rr = new RequestResult()
+                .setSuccess(true)
+                .setMessage("source registered");
+        CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
+        
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        SourceConfig sourceConfig = new SourceConfig();
+        sourceConfig.setTenant(tenant);
+        sourceConfig.setNamespace(namespace);
+        sourceConfig.setName(source);
+        sourceConfig.setClassName(className);
+        sourceConfig.setParallelism(parallelism);
+        sourceConfig.setTopicName(outputTopic);
+        sourceConfig.setSerdeClassName(outputSerdeClassName);
+        Response response = resource.registerFunction(
+                actualTenant,
+                actualNamespace,
+                actualName,
+                new FileInputStream(JAR_FILE_PATH),
+                mockedFormData,
+                null,
+                null,
+                new Gson().toJson(sourceConfig),
+                FunctionsImpl.SOURCE,
+                null);
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+    }
+
     @Test
     public void testRegisterSourceFailure() throws Exception {
         mockStatic(Utils.class);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to