This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b5f737  Prefer rest path over Function/Source/Sink Config values 
(#2918)
6b5f737 is described below

commit 6b5f737e417fa391d82dda68f01e819e1e93614c
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Sat Nov 3 10:50:48 2018 -0700

    Prefer rest path over Function/Source/Sink Config values (#2918)
    
    ### Motivation
    
    When users create/update a sink/source/function, they call the REST endpoint, 
whose path has tenant/namespace/name components, and pass it a 
Function/Source/SinkConfig. These configs also have tenant/namespace/name 
parameters that might differ from the REST path.
    This PR gives preference to the REST path by overwriting the config fields.
---
 .gitignore                                         |  1 +
 .../functions/worker/rest/api/FunctionsImpl.java   | 12 ++++++
 .../rest/api/v2/FunctionApiV2ResourceTest.java     | 32 ++++++++++++++++
 .../worker/rest/api/v2/SinkApiV2ResourceTest.java  | 42 +++++++++++++++++++++
 .../rest/api/v2/SourceApiV2ResourceTest.java       | 43 ++++++++++++++++++++++
 5 files changed, 130 insertions(+)

diff --git a/.gitignore b/.gitignore
index 38266a8..e43e519 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,7 @@ logs
 pulsar-broker/tmp.*
 pulsar-broker/src/test/resources/log4j2.yaml
 pulsar-functions/worker/test-tenant/
+pulsar-broker/src/test/resources/pulsar-functions-api-examples.jar
 *.log
 *.nar
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 57fc331..587d9bd 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -1103,6 +1103,10 @@ public class FunctionsImpl {
 
         if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
             FunctionConfig functionConfig = new 
Gson().fromJson(componentConfigJson, FunctionConfig.class);
+            // The rest end points take precedence over whatever is there in 
functionconfig
+            functionConfig.setTenant(tenant);
+            functionConfig.setNamespace(namespace);
+            functionConfig.setName(componentName);
             FunctionConfigUtils.inferMissingArguments(functionConfig);
             ClassLoader clsLoader = 
FunctionConfigUtils.validate(functionConfig, functionPkgUrl, 
uploadedInputStreamAsFile);
             return FunctionConfigUtils.convert(functionConfig, clsLoader);
@@ -1110,6 +1114,10 @@ public class FunctionsImpl {
         if (componentType.equals(SOURCE)) {
             Path archivePath = null;
             SourceConfig sourceConfig = new 
Gson().fromJson(componentConfigJson, SourceConfig.class);
+            // The rest end points take precedence over whatever is there in 
sourceconfig
+            sourceConfig.setTenant(tenant);
+            sourceConfig.setNamespace(namespace);
+            sourceConfig.setName(componentName);
             SourceConfigUtils.inferMissingArguments(sourceConfig);
             if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
                 String builtinArchive = sourceConfig.getArchive();
@@ -1128,6 +1136,10 @@ public class FunctionsImpl {
         if (componentType.equals(SINK)) {
             Path archivePath = null;
             SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, 
SinkConfig.class);
+            // The rest end points take precedence over whatever is there in 
sinkConfig
+            sinkConfig.setTenant(tenant);
+            sinkConfig.setNamespace(namespace);
+            sinkConfig.setName(componentName);
             SinkConfigUtils.inferMissingArguments(sinkConfig);
             if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
                 String builtinArchive = sinkConfig.getArchive();
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 1a7b51a..c3995a8 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -1200,4 +1200,36 @@ public class FunctionApiV2ResourceTest {
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
+
+    @Test
+    public void testRegisterFunctionWithConflictingFields() throws IOException 
{
+        Configurator.setRootLevel(Level.DEBUG);
+        String actualTenant = "DIFFERENT_TENANT";
+        String actualNamespace = "DIFFERENT_NAMESPACE";
+        String actualName = "DIFFERENT_NAME";
+
+        String fileLocation = 
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        String filePackageUrl = "file://" + fileLocation;
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(function))).thenReturn(true);
+        when(mockedManager.containsFunction(eq(actualTenant), 
eq(actualNamespace), eq(actualName))).thenReturn(false);
+
+        RequestResult rr = new 
RequestResult().setSuccess(true).setMessage("function registered");
+        CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
+        
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(function);
+        functionConfig.setClassName(className);
+        functionConfig.setParallelism(parallelism);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+        functionConfig.setOutput(outputTopic);
+        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+        Response response = resource.registerFunction(actualTenant, 
actualNamespace, actualName, null, null, filePackageUrl,
+                null, new Gson().toJson(functionConfig), 
FunctionsImpl.FUNCTION, null);
+
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+    }
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 57e3cc5..93a0a74 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -421,6 +421,48 @@ public class SinkApiV2ResourceTest {
     }
 
     @Test
+    public void testRegisterSinkConflictingFields() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
+        String actualTenant = "DIFFERENT_TENANT";
+        String actualNamespace = "DIFFERENT_NAMESPACE";
+        String actualName = "DIFFERENT_NAME";
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
+        when(mockedManager.containsFunction(eq(actualTenant), 
eq(actualNamespace), eq(actualName))).thenReturn(false);
+
+        RequestResult rr = new RequestResult()
+                .setSuccess(true)
+                .setMessage("source registered");
+        CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
+        
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setTenant(tenant);
+        sinkConfig.setNamespace(namespace);
+        sinkConfig.setName(sink);
+        sinkConfig.setClassName(className);
+        sinkConfig.setParallelism(parallelism);
+        sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+        Response response = resource.registerFunction(
+                actualTenant,
+                actualNamespace,
+                actualName,
+                new FileInputStream(JAR_FILE_PATH),
+                mockedFormData,
+                null,
+                null,
+                new Gson().toJson(sinkConfig),
+                FunctionsImpl.SINK,
+                null);
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+    }
+
+    @Test
     public void testRegisterSinkFailure() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index 3a4785f..b7e5d13 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -392,6 +392,49 @@ public class SourceApiV2ResourceTest {
     }
 
     @Test
+    public void testRegisterSourceConflictingFields() throws Exception {
+        mockStatic(Utils.class);
+        doNothing().when(Utils.class);
+        Utils.uploadFileToBookkeeper(
+                anyString(),
+                any(File.class),
+                any(Namespace.class));
+        String actualTenant = "DIFFERENT_TENANT";
+        String actualNamespace = "DIFFERENT_NAMESPACE";
+        String actualName = "DIFFERENT_NAME";
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(true);
+        when(mockedManager.containsFunction(eq(actualTenant), 
eq(actualNamespace), eq(actualName))).thenReturn(false);
+
+        RequestResult rr = new RequestResult()
+                .setSuccess(true)
+                .setMessage("source registered");
+        CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
+        
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+
+        SourceConfig sourceConfig = new SourceConfig();
+        sourceConfig.setTenant(tenant);
+        sourceConfig.setNamespace(namespace);
+        sourceConfig.setName(source);
+        sourceConfig.setClassName(className);
+        sourceConfig.setParallelism(parallelism);
+        sourceConfig.setTopicName(outputTopic);
+        sourceConfig.setSerdeClassName(outputSerdeClassName);
+        Response response = resource.registerFunction(
+                actualTenant,
+                actualNamespace,
+                actualName,
+                new FileInputStream(JAR_FILE_PATH),
+                mockedFormData,
+                null,
+                null,
+                new Gson().toJson(sourceConfig),
+                FunctionsImpl.SOURCE,
+                null);
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+    }
+
+    @Test
     public void testRegisterSourceFailure() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);

Reply via email to