This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6b5f737 Prefer rest path over Function/Source/Sink Config values (#2918) 6b5f737 is described below commit 6b5f737e417fa391d82dda68f01e819e1e93614c Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Sat Nov 3 10:50:48 2018 -0700 Prefer rest path over Function/Source/Sink Config values (#2918) ### Motivation When users create/update sink/source/function, they call the rest endpoint that has a tenant/namespace/name components and pass it a Function/Source/SinkConfig. These configs also have tenant/namespace/name parameter that might be different from the rest path. This pr gives preference to the rest path by overwriting the config fields. --- .gitignore | 1 + .../functions/worker/rest/api/FunctionsImpl.java | 12 ++++++ .../rest/api/v2/FunctionApiV2ResourceTest.java | 32 ++++++++++++++++ .../worker/rest/api/v2/SinkApiV2ResourceTest.java | 42 +++++++++++++++++++++ .../rest/api/v2/SourceApiV2ResourceTest.java | 43 ++++++++++++++++++++++ 5 files changed, 130 insertions(+) diff --git a/.gitignore b/.gitignore index 38266a8..e43e519 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ logs pulsar-broker/tmp.* pulsar-broker/src/test/resources/log4j2.yaml pulsar-functions/worker/test-tenant/ +pulsar-broker/src/test/resources/pulsar-functions-api-examples.jar *.log *.nar diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 57fc331..587d9bd 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -1103,6 +1103,10 @@ public class FunctionsImpl { if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) { FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class); + // The rest end points take precendence over whatever is there in functionconfig + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(componentName); FunctionConfigUtils.inferMissingArguments(functionConfig); ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, functionPkgUrl, uploadedInputStreamAsFile); return FunctionConfigUtils.convert(functionConfig, clsLoader); @@ -1110,6 +1114,10 @@ public class FunctionsImpl { if (componentType.equals(SOURCE)) { Path archivePath = null; SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class); + // The rest end points take precendence over whatever is there in sourceconfig + sourceConfig.setTenant(tenant); + sourceConfig.setNamespace(namespace); + sourceConfig.setName(componentName); SourceConfigUtils.inferMissingArguments(sourceConfig); if (!StringUtils.isEmpty(sourceConfig.getArchive())) { String builtinArchive = sourceConfig.getArchive(); @@ -1128,6 +1136,10 @@ public class FunctionsImpl { if (componentType.equals(SINK)) { Path archivePath = null; SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class); + // The rest end points take precendence over whatever is there in sinkConfig + sinkConfig.setTenant(tenant); + sinkConfig.setNamespace(namespace); + sinkConfig.setName(componentName); SinkConfigUtils.inferMissingArguments(sinkConfig); if (!StringUtils.isEmpty(sinkConfig.getArchive())) { String builtinArchive = sinkConfig.getArchive(); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 1a7b51a..c3995a8 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -1200,4 +1200,36 @@ public class FunctionApiV2ResourceTest { assertEquals(Status.OK.getStatusCode(), response.getStatus()); } + + @Test + public void testRegisterFunctionWithConflictingFields() throws IOException { + Configurator.setRootLevel(Level.DEBUG); + String actualTenant = "DIFFERENT_TENANT"; + String actualNamespace = "DIFFERENT_NAMESPACE"; + String actualName = "DIFFERENT_NAME"; + + String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + String filePackageUrl = "file://" + fileLocation; + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); + when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false); + + RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered"); + CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(function); + functionConfig.setClassName(className); + functionConfig.setParallelism(parallelism); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + functionConfig.setCustomSerdeInputs(topicsToSerDeClassName); + functionConfig.setOutput(outputTopic); + functionConfig.setOutputSerdeClassName(outputSerdeClassName); + Response response = resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl, + null, new Gson().toJson(functionConfig), FunctionsImpl.FUNCTION, null); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java index 57e3cc5..93a0a74 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java @@ -421,6 +421,48 @@ public class SinkApiV2ResourceTest { } @Test + public void testRegisterSinkConflictingFields() throws Exception { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); + String actualTenant = "DIFFERENT_TENANT"; + String actualNamespace = "DIFFERENT_NAMESPACE"; + String actualName = "DIFFERENT_NAME"; + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); + when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false); + + RequestResult rr = new RequestResult() + .setSuccess(true) + .setMessage("source registered"); + CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + + SinkConfig sinkConfig = new SinkConfig(); + sinkConfig.setTenant(tenant); + sinkConfig.setNamespace(namespace); + sinkConfig.setName(sink); + sinkConfig.setClassName(className); + sinkConfig.setParallelism(parallelism); + sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName); + Response response = resource.registerFunction( + actualTenant, + actualNamespace, + actualName, + new FileInputStream(JAR_FILE_PATH), + mockedFormData, + null, + null, + new Gson().toJson(sinkConfig), + FunctionsImpl.SINK, + null); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } + + @Test public void testRegisterSinkFailure() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java index 3a4785f..b7e5d13 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java @@ -392,6 +392,49 @@ public class SourceApiV2ResourceTest { } @Test + public void testRegisterSourceConflictingFields() throws Exception { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); + String actualTenant = "DIFFERENT_TENANT"; + String actualNamespace = "DIFFERENT_NAMESPACE"; + String actualName = "DIFFERENT_NAME"; + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); + when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false); + + RequestResult rr = new RequestResult() + .setSuccess(true) + .setMessage("source registered"); + CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + + SourceConfig sourceConfig = new SourceConfig(); + sourceConfig.setTenant(tenant); + sourceConfig.setNamespace(namespace); + sourceConfig.setName(source); + sourceConfig.setClassName(className); + sourceConfig.setParallelism(parallelism); + sourceConfig.setTopicName(outputTopic); + sourceConfig.setSerdeClassName(outputSerdeClassName); + Response response = resource.registerFunction( + actualTenant, + actualNamespace, + actualName, + new FileInputStream(JAR_FILE_PATH), + mockedFormData, + null, + null, + new Gson().toJson(sourceConfig), + FunctionsImpl.SOURCE, + null); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } + + @Test public void testRegisterSourceFailure() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class);