sijie closed pull request #2918: Prefer rest path over Function/Source/Sink Config values URL: https://github.com/apache/pulsar/pull/2918
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/.gitignore b/.gitignore index 38266a8cf2..e43e519a71 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ logs pulsar-broker/tmp.* pulsar-broker/src/test/resources/log4j2.yaml pulsar-functions/worker/test-tenant/ +pulsar-broker/src/test/resources/pulsar-functions-api-examples.jar *.log *.nar diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 914a691524..ebaa586b8a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -1071,6 +1071,10 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) { FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class); + // The rest end points take precendence over whatever is there in functionconfig + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(componentName); FunctionConfigUtils.inferMissingArguments(functionConfig); ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, functionPkgUrl, uploadedInputStreamAsFile); return FunctionConfigUtils.convert(functionConfig, clsLoader); @@ -1078,6 +1082,10 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp if (componentType.equals(SOURCE)) { Path archivePath = null; SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class); + // The rest end points take precendence over whatever is there in sourceconfig + sourceConfig.setTenant(tenant); + sourceConfig.setNamespace(namespace); + sourceConfig.setName(componentName); SourceConfigUtils.inferMissingArguments(sourceConfig); if (!StringUtils.isEmpty(sourceConfig.getArchive())) { String builtinArchive = sourceConfig.getArchive(); @@ -1096,6 +1104,10 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp if (componentType.equals(SINK)) { Path archivePath = null; SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class); + // The rest end points take precendence over whatever is there in sinkConfig + sinkConfig.setTenant(tenant); + sinkConfig.setNamespace(namespace); + sinkConfig.setName(componentName); SinkConfigUtils.inferMissingArguments(sinkConfig); if (!StringUtils.isEmpty(sinkConfig.getArchive())) { String builtinArchive = sinkConfig.getArchive(); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 1a7b51a977..c3995a83d9 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -1200,4 +1200,36 @@ public void testRegisterFunctionFileUrlWithValidSinkClass() throws IOException { assertEquals(Status.OK.getStatusCode(), response.getStatus()); } + + @Test + public void testRegisterFunctionWithConflictingFields() throws IOException { + Configurator.setRootLevel(Level.DEBUG); + String actualTenant = "DIFFERENT_TENANT"; + String actualNamespace = "DIFFERENT_NAMESPACE"; + String actualName = "DIFFERENT_NAME"; + + String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + String filePackageUrl = "file://" + fileLocation; + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); + when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false); + + RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered"); + CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(function); + functionConfig.setClassName(className); + functionConfig.setParallelism(parallelism); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + functionConfig.setCustomSerdeInputs(topicsToSerDeClassName); + functionConfig.setOutput(outputTopic); + functionConfig.setOutputSerdeClassName(outputSerdeClassName); + Response response = resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl, + null, new Gson().toJson(functionConfig), FunctionsImpl.FUNCTION, null); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java index 57e3cc5e4b..93a0a747c7 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java @@ -420,6 +420,48 @@ public void testRegisterSinkSuccess() throws Exception { assertEquals(Status.OK.getStatusCode(), response.getStatus()); } + @Test + public void testRegisterSinkConflictingFields() throws Exception { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); + String actualTenant = "DIFFERENT_TENANT"; + String actualNamespace = "DIFFERENT_NAMESPACE"; + String actualName = "DIFFERENT_NAME"; + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); + when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false); + + RequestResult rr = new RequestResult() + .setSuccess(true) + .setMessage("source registered"); + CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + + SinkConfig sinkConfig = new SinkConfig(); + sinkConfig.setTenant(tenant); + sinkConfig.setNamespace(namespace); + sinkConfig.setName(sink); + sinkConfig.setClassName(className); + sinkConfig.setParallelism(parallelism); + sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName); + Response response = resource.registerFunction( + actualTenant, + actualNamespace, + actualName, + new FileInputStream(JAR_FILE_PATH), + mockedFormData, + null, + null, + new Gson().toJson(sinkConfig), + FunctionsImpl.SINK, + null); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } + @Test public void testRegisterSinkFailure() throws Exception { mockStatic(Utils.class); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java index 3a4785f15c..b7e5d130a0 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java @@ -391,6 +391,49 @@ public void testRegisterSourceSuccess() throws Exception { assertEquals(Status.OK.getStatusCode(), response.getStatus()); } + @Test + public void testRegisterSourceConflictingFields() throws Exception { + mockStatic(Utils.class); + doNothing().when(Utils.class); + Utils.uploadFileToBookkeeper( + anyString(), + any(File.class), + any(Namespace.class)); + String actualTenant = "DIFFERENT_TENANT"; + String actualNamespace = "DIFFERENT_NAMESPACE"; + String actualName = "DIFFERENT_NAME"; + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); + when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false); + + RequestResult rr = new RequestResult() + .setSuccess(true) + .setMessage("source registered"); + CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr); + when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); + + SourceConfig sourceConfig = new SourceConfig(); + sourceConfig.setTenant(tenant); + sourceConfig.setNamespace(namespace); + sourceConfig.setName(source); + sourceConfig.setClassName(className); + sourceConfig.setParallelism(parallelism); + sourceConfig.setTopicName(outputTopic); + sourceConfig.setSerdeClassName(outputSerdeClassName); + Response response = resource.registerFunction( + actualTenant, + actualNamespace, + actualName, + new FileInputStream(JAR_FILE_PATH), + mockedFormData, + null, + null, + new Gson().toJson(sourceConfig), + FunctionsImpl.SOURCE, + null); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } + @Test public void testRegisterSourceFailure() throws Exception { mockStatic(Utils.class); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services