This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 85effc4  [Issue-10109] [admin client] Add --batch-source-config switch 
to the Pulsar Admin Source API (#10593)
85effc4 is described below

commit 85effc4048199e30bd70edac17bf2d86b49f003b
Author: David Kjerrumgaard <35466513+david-stream...@users.noreply.github.com>
AuthorDate: Tue May 25 11:39:10 2021 -0700

    [Issue-10109] [admin client] Add --batch-source-config switch to the Pulsar 
Admin Source API (#10593)
    
    Co-authored-by: David Kjerrumgaard <dkjerrumga...@splunk.com>
---
 pulsar-client-tools/pom.xml                        |  6 ++
 .../org/apache/pulsar/admin/cli/CmdSources.java    | 42 +++++++++++++-
 .../apache/pulsar/admin/cli/TestCmdSources.java    | 65 ++++++++++++++++++++++
 .../common/io/BatchSourceConfigParseTest.java      | 53 ++++++++++++++++++
 .../website/versioned_docs/version-2.7.2/io-cli.md |  2 +
 5 files changed, 167 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index a639109..a094ba7 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -85,6 +85,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+       <groupId>org.apache.pulsar</groupId>
+       <artifactId>pulsar-io-batch-discovery-triggerers</artifactId>
+       <version>${project.version}</version>
+       <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 90a7d16..9f318c2 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.functions.ProducerConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.io.BatchSourceConfig;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.io.SourceConfig;
@@ -315,6 +316,8 @@ public class CmdSources extends CmdBase {
         protected String DEPRECATED_sourceConfigString;
         @Parameter(names = "--source-config", description = "Source config 
key/values")
         protected String sourceConfigString;
+        @Parameter(names = "--batch-source-config", description = "Batch 
source config key/values")
+        protected String batchSourceConfigString;
         @Parameter(names = "--custom-runtime-options", description = "A string 
that encodes options to customize the runtime, see docs for configured runtime 
for details")
         protected String customRuntimeOptions;
 
@@ -417,6 +420,10 @@ public class CmdSources extends CmdBase {
             if (null != sourceConfigString) {
                 sourceConfig.setConfigs(parseConfigs(sourceConfigString));
             }
+            
+            if (null != batchSourceConfigString) {
+               
sourceConfig.setBatchSourceConfig(parseBatchSourceConfigs(batchSourceConfigString));
+            }
 
             if (customRuntimeOptions != null) {
                 sourceConfig.setCustomRuntimeOptions(customRuntimeOptions);
@@ -427,7 +434,11 @@ public class CmdSources extends CmdBase {
 
         protected Map<String, Object> parseConfigs(String str) {
             Type type = new TypeToken<Map<String, Object>>(){}.getType();
-            return new Gson().fromJson(str, type);
+            return new Gson().fromJson(str, type); 
+        }
+        
+        protected BatchSourceConfig parseBatchSourceConfigs(String str) {
+               return new Gson().fromJson(str, BatchSourceConfig.class);
         }
 
         protected void validateSourceConfigs(SourceConfig sourceConfig) {
@@ -444,6 +455,35 @@ public class CmdSources extends CmdBase {
             if (isBlank(sourceConfig.getName())) {
                 throw new IllegalArgumentException("Source name not 
specified");
             }
+            
+            if (sourceConfig.getBatchSourceConfig() != null) {
+               validateBatchSourceConfigs(sourceConfig.getBatchSourceConfig());
+            }
+        }
+        
+        protected void validateBatchSourceConfigs(BatchSourceConfig 
batchSourceConfig) {
+           if (isBlank(batchSourceConfig.getDiscoveryTriggererClassName())) {
+             throw new IllegalArgumentException("Discovery Triggerer not 
specified");
+           } 
+           
+           boolean isBatchSourceTriggerer = false;
+           
+           try {
+             Class<?>[] interfaces = 
Class.forName(batchSourceConfig.getDiscoveryTriggererClassName()).getInterfaces();
+             int idx = 0;
+             
+             while (idx < interfaces.length && !isBatchSourceTriggerer) {
+                isBatchSourceTriggerer = 
interfaces[idx++].getName().equals("org.apache.pulsar.io.core.BatchSourceTriggerer");
+             }
+             
+             if (!isBatchSourceTriggerer) {
+                throw new IllegalArgumentException("Invalid Discovery 
Triggerer specified"); 
+             }
+             
+           } catch (ClassNotFoundException e) {
+             throw new IllegalArgumentException("Invalid Discovery Triggerer 
specified"); 
+           }
+           
         }
 
         protected String validateSourceType(String sourceType) throws 
IOException {
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index d9ef39d..b2137bd 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -33,12 +33,16 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.Sources;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.io.BatchSourceConfig;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.powermock.api.mockito.PowerMockito;
@@ -74,6 +78,8 @@ public class TestCmdSources {
     private static final Long RAM = 1024L * 1024L;
     private static final Long DISK = 1024L * 1024L * 1024L;
     private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon 
Jul 02 00:33:15 +0000 2018\"}";
+    private static final String BATCH_SOURCE_CONFIG_STRING = "{ 
\"discoveryTriggererClassName\" : 
\"org.apache.pulsar.io.batchdiscovery.CronTriggerer\","
+                       + "\"discoveryTriggererConfig\": {\"cron\": \"5 0 0 0 0 
*\"} }";
 
     private PulsarAdmin pulsarAdmin;
     private Sources source;
@@ -133,6 +139,10 @@ public class TestCmdSources {
         sourceConfig.setConfigs(createSource.parseConfigs(SINK_CONFIG_STRING));
         return sourceConfig;
     }
+    
+    public BatchSourceConfig getBatchSourceConfig() {
+       return createSource.parseBatchSourceConfigs(BATCH_SOURCE_CONFIG_STRING);
+    }
 
     @Test
     public void testCliCorrect() throws Exception {
@@ -390,6 +400,61 @@ public class TestCmdSources {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
+    @Test
+    public void testBatchSourceConfigCorrect() throws Exception {
+       SourceConfig testSourceConfig = getSourceConfig();
+       testSourceConfig.setBatchSourceConfig(getBatchSourceConfig());
+       
+       SourceConfig expectedSourceConfig = getSourceConfig();
+        expectedSourceConfig.setBatchSourceConfig(getBatchSourceConfig());
+        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
+    }
+    
+    /*
+     * Test where the DiscoveryTriggererClassName is null
+     */
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Discovery Triggerer not specified")
+    public void testBatchSourceConfigMissingDiscoveryTriggererClassName() 
throws Exception {
+       SourceConfig testSourceConfig = getSourceConfig();
+       BatchSourceConfig batchSourceConfig = getBatchSourceConfig();
+       batchSourceConfig.setDiscoveryTriggererClassName(null);
+       testSourceConfig.setBatchSourceConfig(batchSourceConfig);
+       
+       SourceConfig expectedSourceConfig = getSourceConfig();
+        expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
+        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
+    }
+    
+    /*
+     * Test where the class name does not implement the BatchSourceTriggerer 
interface
+     */
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Invalid Discovery Triggerer specified")
+    public void testBatchSourceConfigInvalidDiscoveryTriggererClassName() 
throws Exception {
+       SourceConfig testSourceConfig = getSourceConfig();
+       BatchSourceConfig batchSourceConfig = getBatchSourceConfig();
+       batchSourceConfig.setDiscoveryTriggererClassName("java.lang.String");
+       testSourceConfig.setBatchSourceConfig(batchSourceConfig);
+       
+       SourceConfig expectedSourceConfig = getSourceConfig();
+        expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
+        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
+    }
+    
+    /*
+     * Test where the class name provided doesn't exist
+     */
+    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Invalid Discovery Triggerer specified")
+    public void testBatchSourceConfigDiscoveryTriggererClassNotFound() throws 
Exception {
+       SourceConfig testSourceConfig = getSourceConfig();
+       BatchSourceConfig batchSourceConfig = getBatchSourceConfig();
+       batchSourceConfig.setDiscoveryTriggererClassName("com.foo.Bar");
+       testSourceConfig.setBatchSourceConfig(batchSourceConfig);
+       
+       SourceConfig expectedSourceConfig = getSourceConfig();
+        expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
+        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
+    }
+    
     public void testCmdSourceConfigFile(SourceConfig testSourceConfig, 
SourceConfig expectedSourceConfig) throws Exception {
 
         File file = Files.createTempFile("", "").toFile();
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/io/BatchSourceConfigParseTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/io/BatchSourceConfigParseTest.java
new file mode 100644
index 0000000..c761404
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/io/BatchSourceConfigParseTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.io;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class BatchSourceConfigParseTest {
+       
+       private ObjectMapper objectMapper = new ObjectMapper();
+
+       @Test
+       public final void ImmediateTriggererTest() throws JsonMappingException, 
JsonProcessingException {
+               String json = "{ \"discoveryTriggererClassName\" : 
\"org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer\" }";
+               BatchSourceConfig config = objectMapper.readValue(json, 
BatchSourceConfig.class);
+               assertNotNull(config);
+               assertEquals(config.getDiscoveryTriggererClassName(), 
"org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer");
+       }
+       
+       @Test
+       public final void CronTriggererTest() throws JsonMappingException, 
JsonProcessingException {
+               String json = "{ \"discoveryTriggererClassName\" : 
\"org.apache.pulsar.io.batchdiscovery.CronTriggerer\","
+                               + "\"discoveryTriggererConfig\": {\"cron\": \"5 
0 0 0 0 *\"} }";
+               BatchSourceConfig config = objectMapper.readValue(json, 
BatchSourceConfig.class);
+               assertNotNull(config);
+               assertEquals(config.getDiscoveryTriggererClassName(), 
"org.apache.pulsar.io.batchdiscovery.CronTriggerer");
+               assertNotNull(config.getDiscoveryTriggererConfig());
+               assertEquals(config.getDiscoveryTriggererConfig().size(), 1);
+               assertEquals(config.getDiscoveryTriggererConfig().get("cron"), 
"5 0 0 0 0 *");
+       }
+}
diff --git a/site2/website/versioned_docs/version-2.7.2/io-cli.md 
b/site2/website/versioned_docs/version-2.7.2/io-cli.md
index 00f2e30..2198a85 100644
--- a/site2/website/versioned_docs/version-2.7.2/io-cli.md
+++ b/site2/website/versioned_docs/version-2.7.2/io-cli.md
@@ -57,6 +57,7 @@ $ pulsar-admin sources create options
 |Flag|Description|
 |----|---|
 | `-a`, `--archive` | The path to the NAR archive for the source. <br> It also 
supports url-path (http/https/file [file protocol assumes that file already 
exists on worker host]) from which worker can download the package.
+| `--batch-source-config` | BatchSource configuration key/value pairs 
provided as a JSON string, e.g., { "discoveryTriggererClassName" : 
"org.apache.pulsar.io.batchdiscovery.CronTriggerer", 
"discoveryTriggererConfig": {"cron": "*/5 * * * *"} }
 | `--classname` | The source's class name if `archive` is file-url-path 
(file://).
 | `--cpu` | The CPU (in cores) that needs to be allocated per source instance 
(applicable only to Docker runtime).
 | `--deserialization-classname` | The SerDe classname for the source.
@@ -89,6 +90,7 @@ $ pulsar-admin sources update options
 |Flag|Description|
 |----|---|
 | `-a`, `--archive` | The path to the NAR archive for the source. <br> It also 
supports url-path (http/https/file [file protocol assumes that file already 
exists on worker host]) from which worker can download the package.
+| `--batch-source-config` | BatchSource configuration key/value pairs 
provided as a JSON string, e.g., { "discoveryTriggererClassName" : 
"org.apache.pulsar.io.batchdiscovery.CronTriggerer", 
"discoveryTriggererConfig": {"cron": "*/5 * * * *"} }
 | `--classname` | The source's class name if `archive` is file-url-path 
(file://).
 | `--cpu` | The CPU (in cores) that needs to be allocated per source instance 
(applicable only to Docker runtime).
 | `--deserialization-classname` | The SerDe classname for the source.

Reply via email to