This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 85effc4 [Issue-10109] [admin client] Add --batch-source-config switch to the Pulsar Admin Source API (#10593) 85effc4 is described below commit 85effc4048199e30bd70edac17bf2d86b49f003b Author: David Kjerrumgaard <35466513+david-stream...@users.noreply.github.com> AuthorDate: Tue May 25 11:39:10 2021 -0700 [Issue-10109] [admin client] Add --batch-source-config switch to the Pulsar Admin Source API (#10593) Co-authored-by: David Kjerrumgaard <dkjerrumga...@splunk.com> --- pulsar-client-tools/pom.xml | 6 ++ .../org/apache/pulsar/admin/cli/CmdSources.java | 42 +++++++++++++- .../apache/pulsar/admin/cli/TestCmdSources.java | 65 ++++++++++++++++++++++ .../common/io/BatchSourceConfigParseTest.java | 53 ++++++++++++++++++ .../website/versioned_docs/version-2.7.2/io-cli.md | 2 + 5 files changed, 167 insertions(+), 1 deletion(-) diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml index a639109..a094ba7 100644 --- a/pulsar-client-tools/pom.xml +++ b/pulsar-client-tools/pom.xml @@ -85,6 +85,12 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-io-batch-discovery-triggerers</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 90a7d16..9f318c2 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -53,6 +53,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.functions.UpdateOptions; +import org.apache.pulsar.common.io.BatchSourceConfig; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.io.SourceConfig; @@ -315,6 +316,8 @@ public class CmdSources extends CmdBase { protected String DEPRECATED_sourceConfigString; @Parameter(names = "--source-config", description = "Source config key/values") protected String sourceConfigString; + @Parameter(names = "--batch-source-config", description = "Batch source config key/values") + protected String batchSourceConfigString; @Parameter(names = "--custom-runtime-options", description = "A string that encodes options to customize the runtime, see docs for configured runtime for details") protected String customRuntimeOptions; @@ -417,6 +420,10 @@ public class CmdSources extends CmdBase { if (null != sourceConfigString) { sourceConfig.setConfigs(parseConfigs(sourceConfigString)); } + + if (null != batchSourceConfigString) { + sourceConfig.setBatchSourceConfig(parseBatchSourceConfigs(batchSourceConfigString)); + } if (customRuntimeOptions != null) { sourceConfig.setCustomRuntimeOptions(customRuntimeOptions); @@ -427,7 +434,11 @@ public class CmdSources extends CmdBase { protected Map<String, Object> parseConfigs(String str) { Type type = new TypeToken<Map<String, Object>>(){}.getType(); - return new Gson().fromJson(str, type); + return new Gson().fromJson(str, type); + } + + protected BatchSourceConfig parseBatchSourceConfigs(String str) { + return new Gson().fromJson(str, BatchSourceConfig.class); } protected void validateSourceConfigs(SourceConfig sourceConfig) { @@ -444,6 +455,35 @@ public class CmdSources extends CmdBase { if (isBlank(sourceConfig.getName())) { throw new IllegalArgumentException("Source name not specified"); } + + if (sourceConfig.getBatchSourceConfig() != null) { + validateBatchSourceConfigs(sourceConfig.getBatchSourceConfig()); + } + } + + protected void validateBatchSourceConfigs(BatchSourceConfig batchSourceConfig) { + if (isBlank(batchSourceConfig.getDiscoveryTriggererClassName())) { + throw new IllegalArgumentException("Discovery Triggerer not specified"); + } + + boolean isBatchSourceTriggerer = false; + + try { + Class<?>[] interfaces = Class.forName(batchSourceConfig.getDiscoveryTriggererClassName()).getInterfaces(); + int idx = 0; + + while (idx < interfaces.length && !isBatchSourceTriggerer) { + isBatchSourceTriggerer = interfaces[idx++].getName().equals("org.apache.pulsar.io.core.BatchSourceTriggerer"); + } + + if (!isBatchSourceTriggerer) { + throw new IllegalArgumentException("Invalid Discovery Triggerer specified"); + } + + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Invalid Discovery Triggerer specified"); + } + } protected String validateSourceType(String sourceType) throws IOException { diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java index d9ef39d..b2137bd 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java @@ -33,12 +33,16 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; import java.nio.file.Files; +import java.util.HashMap; +import java.util.Map; + import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.Sources; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.functions.UpdateOptions; +import org.apache.pulsar.common.io.BatchSourceConfig; import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.common.util.ClassLoaderUtils; import org.powermock.api.mockito.PowerMockito; @@ -74,6 +78,8 @@ public class TestCmdSources { private static final Long RAM = 1024L * 1024L; private static final Long DISK = 1024L * 1024L * 1024L; private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\"}"; + private static final String BATCH_SOURCE_CONFIG_STRING = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.CronTriggerer\"," + + "\"discoveryTriggererConfig\": {\"cron\": \"5 0 0 0 0 *\"} }"; private PulsarAdmin pulsarAdmin; private Sources source; @@ -133,6 +139,10 @@ public class TestCmdSources { sourceConfig.setConfigs(createSource.parseConfigs(SINK_CONFIG_STRING)); return sourceConfig; } + + public BatchSourceConfig getBatchSourceConfig() { + return createSource.parseBatchSourceConfigs(BATCH_SOURCE_CONFIG_STRING); + } @Test public void testCliCorrect() throws Exception { @@ -390,6 +400,61 @@ public class TestCmdSources { testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); } + @Test + public void testBatchSourceConfigCorrect() throws Exception { + SourceConfig testSourceConfig = getSourceConfig(); + testSourceConfig.setBatchSourceConfig(getBatchSourceConfig()); + + SourceConfig expectedSourceConfig = getSourceConfig(); + expectedSourceConfig.setBatchSourceConfig(getBatchSourceConfig()); + testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); + } + + /* + * Test where the DiscoveryTriggererClassName is null + */ + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Discovery Triggerer not specified") + public void testBatchSourceConfigMissingDiscoveryTriggererClassName() throws Exception { + SourceConfig testSourceConfig = getSourceConfig(); + BatchSourceConfig batchSourceConfig = getBatchSourceConfig(); + batchSourceConfig.setDiscoveryTriggererClassName(null); + testSourceConfig.setBatchSourceConfig(batchSourceConfig); + + SourceConfig expectedSourceConfig = getSourceConfig(); + expectedSourceConfig.setBatchSourceConfig(batchSourceConfig); + testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); + } + + /* + * Test where the class name does not implement the BatchSourceTriggerer interface + */ + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Discovery Triggerer specified") + public void testBatchSourceConfigInvalidDiscoveryTriggererClassName() throws Exception { + SourceConfig testSourceConfig = getSourceConfig(); + BatchSourceConfig batchSourceConfig = getBatchSourceConfig(); + batchSourceConfig.setDiscoveryTriggererClassName("java.lang.String"); + testSourceConfig.setBatchSourceConfig(batchSourceConfig); + + SourceConfig expectedSourceConfig = getSourceConfig(); + expectedSourceConfig.setBatchSourceConfig(batchSourceConfig); + testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); + } + + /* + * Test where the class name provided doesn't exist + */ + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Discovery Triggerer specified") + public void testBatchSourceConfigDiscoveryTriggererClassNotFound() throws Exception { + SourceConfig testSourceConfig = getSourceConfig(); + BatchSourceConfig batchSourceConfig = getBatchSourceConfig(); + batchSourceConfig.setDiscoveryTriggererClassName("com.foo.Bar"); + testSourceConfig.setBatchSourceConfig(batchSourceConfig); + + SourceConfig expectedSourceConfig = getSourceConfig(); + expectedSourceConfig.setBatchSourceConfig(batchSourceConfig); + testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); + } + public void testCmdSourceConfigFile(SourceConfig testSourceConfig, SourceConfig expectedSourceConfig) throws Exception { File file = Files.createTempFile("", "").toFile(); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/io/BatchSourceConfigParseTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/io/BatchSourceConfigParseTest.java new file mode 100644 index 0000000..c761404 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/io/BatchSourceConfigParseTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.io; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +import org.testng.annotations.Test; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class BatchSourceConfigParseTest { + + private ObjectMapper objectMapper = new ObjectMapper(); + + @Test + public final void ImmediateTriggererTest() throws JsonMappingException, JsonProcessingException { + String json = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer\" }"; + BatchSourceConfig config = objectMapper.readValue(json, BatchSourceConfig.class); + assertNotNull(config); + assertEquals(config.getDiscoveryTriggererClassName(), "org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer"); + } + + @Test + public final void CronTriggererTest() throws JsonMappingException, JsonProcessingException { + String json = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.CronTriggerer\"," + + "\"discoveryTriggererConfig\": {\"cron\": \"5 0 0 0 0 *\"} }"; + BatchSourceConfig config = objectMapper.readValue(json, BatchSourceConfig.class); + assertNotNull(config); + assertEquals(config.getDiscoveryTriggererClassName(), "org.apache.pulsar.io.batchdiscovery.CronTriggerer"); + assertNotNull(config.getDiscoveryTriggererConfig()); + assertEquals(config.getDiscoveryTriggererConfig().size(), 1); + assertEquals(config.getDiscoveryTriggererConfig().get("cron"), "5 0 0 0 0 *"); + } +} diff --git a/site2/website/versioned_docs/version-2.7.2/io-cli.md b/site2/website/versioned_docs/version-2.7.2/io-cli.md index 00f2e30..2198a85 100644 --- a/site2/website/versioned_docs/version-2.7.2/io-cli.md +++ b/site2/website/versioned_docs/version-2.7.2/io-cli.md @@ -57,6 +57,7 @@ $ pulsar-admin sources create options |Flag|Description| |----|---| | `-a`, `--archive` | The path to the NAR archive for the source. <br> It also supports url-path (http/https/file [file protocol assumes that file already exists on worker host]) from which worker can download the package. +| `--batch-source-config` | BatchSource configuration key/values pairs provided as a JSON string, e.g., { "discoveryTriggererClassName" : "org.apache.pulsar.io.batchdiscovery.CronTriggerer", "discoveryTriggererConfig": {"cron": "*/5 * * * *"} } | `--classname` | The source's class name if `archive` is file-url-path (file://). | `--cpu` | The CPU (in cores) that needs to be allocated per source instance (applicable only to Docker runtime). | `--deserialization-classname` | The SerDe classname for the source. @@ -89,6 +90,7 @@ $ pulsar-admin sources update options |Flag|Description| |----|---| | `-a`, `--archive` | The path to the NAR archive for the source. <br> It also supports url-path (http/https/file [file protocol assumes that file already exists on worker host]) from which worker can download the package. +| `--batch-source-config` | BatchSource configuration key/values pairs provided as a JSON string, e.g., { "discoveryTriggererClassName" : "org.apache.pulsar.io.batchdiscovery.CronTriggerer", "discoveryTriggererConfig": {"cron": "*/5 * * * *"} } | `--classname` | The source's class name if `archive` is file-url-path (file://). | `--cpu` | The CPU (in cores) that needs to be allocated per source instance (applicable only to Docker runtime). | `--deserialization-classname` | The SerDe classname for the source.