Repository: metron
Updated Branches:
  refs/heads/master 8ef18d31a -> 38eebd59c


METRON-1134: Allow parser command line options to be specified in the zookeeper 
parser config. closes apache/incubator-metron#717


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/38eebd59
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/38eebd59
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/38eebd59

Branch: refs/heads/master
Commit: 38eebd59ce2884ead7f3797ff98f7308409e05dd
Parents: 8ef18d3
Author: cstella <ceste...@gmail.com>
Authored: Thu Aug 31 15:46:29 2017 -0400
Committer: cstella <ceste...@gmail.com>
Committed: Thu Aug 31 15:46:29 2017 -0400

----------------------------------------------------------------------
 ...orParserConfigControllerIntegrationTest.java |  18 +-
 .../configuration/SensorParserConfig.java       | 208 ++++++++-
 .../management/ConfigurationFunctionsTest.java  |  10 +-
 metron-platform/metron-parsers/README.md        |  11 +
 .../parsers/topology/ParserTopologyBuilder.java |  87 ++--
 .../parsers/topology/ParserTopologyCLI.java     | 218 +++++++---
 .../parsers/topology/config/ValueSupplier.java  |  30 ++
 .../components/ParserTopologyComponent.java     |  30 +-
 .../parsers/topology/ParserTopologyCLITest.java | 425 ++++++++++++++++++-
 9 files changed, 934 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
index f5ea23d..66771eb 100644
--- 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.metron.rest.controller;
 
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.commons.io.FileUtils;
+import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.rest.MetronRestConstants;
 import org.apache.metron.rest.service.GrokService;
 import org.apache.metron.rest.service.SensorParserConfigService;
@@ -37,6 +38,7 @@ import org.springframework.web.context.WebApplicationContext;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Method;
 
 import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
 import static org.hamcrest.Matchers.hasSize;
@@ -196,11 +198,17 @@ public class SensorParserConfigControllerIntegrationTest {
     cleanFileSystem();
     this.sensorParserConfigService.delete("broTest");
     this.sensorParserConfigService.delete("squidTest");
-
+    Method[] method = SensorParserConfig.class.getMethods();
+    int numFields = 0;
+    for(Method m : method) {
+      if(m.getName().startsWith("set")) {
+        numFields++;
+      }
+    }
     this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(squidJson))
             .andExpect(status().isCreated())
             
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(jsonPath("$.*", hasSize(10)))
+            .andExpect(jsonPath("$.*", hasSize(numFields)))
             
.andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.GrokParser"))
             .andExpect(jsonPath("$.sensorTopic").value("squidTest"))
             
.andExpect(jsonPath("$.parserConfig.grokPath").value("target/patterns/squidTest"))
@@ -215,7 +223,7 @@ public class SensorParserConfigControllerIntegrationTest {
     this.mockMvc.perform(get(sensorParserConfigUrl + 
"/squidTest").with(httpBasic(user,password)))
             .andExpect(status().isOk())
             
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(jsonPath("$.*", hasSize(10)))
+            .andExpect(jsonPath("$.*", hasSize(numFields)))
             
.andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.GrokParser"))
             .andExpect(jsonPath("$.sensorTopic").value("squidTest"))
             
.andExpect(jsonPath("$.parserConfig.grokPath").value("target/patterns/squidTest"))
@@ -244,7 +252,7 @@ public class SensorParserConfigControllerIntegrationTest {
     this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
             .andExpect(status().isCreated())
             
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(jsonPath("$.*", hasSize(10)))
+            .andExpect(jsonPath("$.*", hasSize(numFields)))
             
.andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.bro.BasicBroParser"))
             .andExpect(jsonPath("$.sensorTopic").value("broTest"))
             .andExpect(jsonPath("$.readMetadata").value("true"))
@@ -254,7 +262,7 @@ public class SensorParserConfigControllerIntegrationTest {
     this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
             .andExpect(status().isOk())
             
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(jsonPath("$.*", hasSize(10)))
+            .andExpect(jsonPath("$.*", hasSize(numFields)))
             
.andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.bro.BasicBroParser"))
             .andExpect(jsonPath("$.sensorTopic").value("broTest"))
             .andExpect(jsonPath("$.readMetadata").value("true"))

http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index f08e9c4..2d0ccd8 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -37,7 +37,155 @@ public class SensorParserConfig implements Serializable {
   private String invalidWriterClassName;
   private Boolean readMetadata = false;
   private Boolean mergeMetadata = false;
+  private Integer numWorkers = null;
+  private Integer numAckers= null;
+  private Integer spoutParallelism = 1;
+  private Integer spoutNumTasks = 1;
+  private Integer parserParallelism = 1;
+  private Integer parserNumTasks = 1;
+  private Integer errorWriterParallelism = 1;
+  private Integer errorWriterNumTasks = 1;
+  private Map<String, Object> spoutConfig = new HashMap<>();
+  private String securityProtocol = null;
+  private Map<String, Object> stormConfig = new HashMap<>();
+
+  /**
+   * Return the number of workers for the topology.  This property will be 
used for the parser unless overridden on the CLI.
+   * @return
+   */
+  public Integer getNumWorkers() {
+    return numWorkers;
+  }
+
+  public void setNumWorkers(Integer numWorkers) {
+    this.numWorkers = numWorkers;
+  }
+
+  /**
+   * Return the number of ackers for the topology.  This property will be used 
for the parser unless overridden on the CLI.
+   * @return
+   */
+  public Integer getNumAckers() {
+    return numAckers;
+  }
+
+  public void setNumAckers(Integer numAckers) {
+    this.numAckers = numAckers;
+  }
+
+  /**
+   * Return the spout parallelism.  This property will be used for the parser 
unless overridden on the CLI.
+   * @return
+   */
+  public Integer getSpoutParallelism() {
+    return spoutParallelism;
+  }
+
+  public void setSpoutParallelism(Integer spoutParallelism) {
+    this.spoutParallelism = spoutParallelism;
+  }
+
+  /**
+   * Return the spout num tasks.  This property will be used for the parser 
unless overridden on the CLI.
+   * @return
+   */
+  public Integer getSpoutNumTasks() {
+    return spoutNumTasks;
+  }
+
+  public void setSpoutNumTasks(Integer spoutNumTasks) {
+    this.spoutNumTasks = spoutNumTasks;
+  }
+
+  /**
+   * Return the parser parallelism.  This property will be used for the parser 
unless overridden on the CLI.
+   * @return
+   */
+  public Integer getParserParallelism() {
+    return parserParallelism;
+  }
+
+  public void setParserParallelism(Integer parserParallelism) {
+    this.parserParallelism = parserParallelism;
+  }
+
+  /**
+   * Return the parser number of tasks.  This property will be used for the 
parser unless overridden on the CLI.
+   * @return
+   */
+  public Integer getParserNumTasks() {
+    return parserNumTasks;
+  }
+
+  public void setParserNumTasks(Integer parserNumTasks) {
+    this.parserNumTasks = parserNumTasks;
+  }
+
+  /**
+   * Return the error writer bolt parallelism.  This property will be used for 
the parser unless overridden on the CLI.
+   * @return
+   */
+  public Integer getErrorWriterParallelism() {
+    return errorWriterParallelism;
+  }
+
+  public void setErrorWriterParallelism(Integer errorWriterParallelism) {
+    this.errorWriterParallelism = errorWriterParallelism;
+  }
+
+  /**
+   * Return the error writer bolt number of tasks.  This property will be used 
for the parser unless overridden on the CLI.
+   * @return
+   */
+  public Integer getErrorWriterNumTasks() {
+    return errorWriterNumTasks;
+  }
+
+  public void setErrorWriterNumTasks(Integer errorWriterNumTasks) {
+    this.errorWriterNumTasks = errorWriterNumTasks;
+  }
+
+  /**
+   * Return the spout config.  This includes kafka properties.  This property 
will be used for the parser unless overridden on the CLI.
+   * @return
+   */
+  public Map<String, Object> getSpoutConfig() {
+    return spoutConfig;
+  }
 
+  public void setSpoutConfig(Map<String, Object> spoutConfig) {
+    this.spoutConfig = spoutConfig;
+  }
+
+  /**
+   * Return security protocol to use.  This property will be used for the 
parser unless overridden on the CLI.
+   * The order of precedence is CLI > spout config > config in the sensor 
parser config.
+   * @return
+   */
+  public String getSecurityProtocol() {
+    return securityProtocol;
+  }
+
+  public void setSecurityProtocol(String securityProtocol) {
+    this.securityProtocol = securityProtocol;
+  }
+
+  /**
+   * Return the Storm topology config.  This property will be used for the parser 
unless overridden on the CLI.
+   * @return
+   */
+  public Map<String, Object> getStormConfig() {
+    return stormConfig;
+  }
+
+  public void setStormConfig(Map<String, Object> stormConfig) {
+    this.stormConfig = stormConfig;
+  }
+
+  /**
+   * Return whether or not to merge metadata sent into the message.  If true, 
then metadata become proper fields.
+   * @return
+   */
   public Boolean getMergeMetadata() {
     return mergeMetadata;
   }
@@ -46,6 +194,10 @@ public class SensorParserConfig implements Serializable {
     this.mergeMetadata = mergeMetadata;
   }
 
+  /**
+   * Return whether or not to read metadata at all.
+   * @return
+   */
   public Boolean getReadMetadata() {
     return readMetadata;
   }
@@ -145,10 +297,21 @@ public class SensorParserConfig implements Serializable {
             ", writerClassName='" + writerClassName + '\'' +
             ", errorWriterClassName='" + errorWriterClassName + '\'' +
             ", invalidWriterClassName='" + invalidWriterClassName + '\'' +
-            ", parserConfig=" + parserConfig +
-            ", fieldTransformations=" + fieldTransformations +
             ", readMetadata=" + readMetadata +
             ", mergeMetadata=" + mergeMetadata +
+            ", numWorkers=" + numWorkers +
+            ", numAckers=" + numAckers +
+            ", spoutParallelism=" + spoutParallelism +
+            ", spoutNumTasks=" + spoutNumTasks +
+            ", parserParallelism=" + parserParallelism +
+            ", parserNumTasks=" + parserNumTasks +
+            ", errorWriterParallelism=" + errorWriterParallelism +
+            ", errorWriterNumTasks=" + errorWriterNumTasks +
+            ", spoutConfig=" + spoutConfig +
+            ", securityProtocol='" + securityProtocol + '\'' +
+            ", stormConfig=" + stormConfig +
+            ", parserConfig=" + parserConfig +
+            ", fieldTransformations=" + fieldTransformations +
             '}';
   }
 
@@ -171,12 +334,34 @@ public class SensorParserConfig implements Serializable {
       return false;
     if (getInvalidWriterClassName() != null ? 
!getInvalidWriterClassName().equals(that.getInvalidWriterClassName()) : 
that.getInvalidWriterClassName() != null)
       return false;
-    if (getParserConfig() != null ? 
!getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != 
null)
-      return false;
     if (getReadMetadata() != null ? 
!getReadMetadata().equals(that.getReadMetadata()) : that.getReadMetadata() != 
null)
       return false;
     if (getMergeMetadata() != null ? 
!getMergeMetadata().equals(that.getMergeMetadata()) : that.getMergeMetadata() 
!= null)
       return false;
+    if (getNumWorkers() != null ? 
!getNumWorkers().equals(that.getNumWorkers()) : that.getNumWorkers() != null)
+      return false;
+    if (getNumAckers() != null ? !getNumAckers().equals(that.getNumAckers()) : 
that.getNumAckers() != null)
+      return false;
+    if (getSpoutParallelism() != null ? 
!getSpoutParallelism().equals(that.getSpoutParallelism()) : 
that.getSpoutParallelism() != null)
+      return false;
+    if (getSpoutNumTasks() != null ? 
!getSpoutNumTasks().equals(that.getSpoutNumTasks()) : that.getSpoutNumTasks() 
!= null)
+      return false;
+    if (getParserParallelism() != null ? 
!getParserParallelism().equals(that.getParserParallelism()) : 
that.getParserParallelism() != null)
+      return false;
+    if (getParserNumTasks() != null ? 
!getParserNumTasks().equals(that.getParserNumTasks()) : 
that.getParserNumTasks() != null)
+      return false;
+    if (getErrorWriterParallelism() != null ? 
!getErrorWriterParallelism().equals(that.getErrorWriterParallelism()) : 
that.getErrorWriterParallelism() != null)
+      return false;
+    if (getErrorWriterNumTasks() != null ? 
!getErrorWriterNumTasks().equals(that.getErrorWriterNumTasks()) : 
that.getErrorWriterNumTasks() != null)
+      return false;
+    if (getSpoutConfig() != null ? 
!getSpoutConfig().equals(that.getSpoutConfig()) : that.getSpoutConfig() != null)
+      return false;
+    if (getSecurityProtocol() != null ? 
!getSecurityProtocol().equals(that.getSecurityProtocol()) : 
that.getSecurityProtocol() != null)
+      return false;
+    if (getStormConfig() != null ? 
!getStormConfig().equals(that.getStormConfig()) : that.getStormConfig() != null)
+      return false;
+    if (getParserConfig() != null ? 
!getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != 
null)
+      return false;
     return getFieldTransformations() != null ? 
getFieldTransformations().equals(that.getFieldTransformations()) : 
that.getFieldTransformations() == null;
 
   }
@@ -189,10 +374,21 @@ public class SensorParserConfig implements Serializable {
     result = 31 * result + (getWriterClassName() != null ? 
getWriterClassName().hashCode() : 0);
     result = 31 * result + (getErrorWriterClassName() != null ? 
getErrorWriterClassName().hashCode() : 0);
     result = 31 * result + (getInvalidWriterClassName() != null ? 
getInvalidWriterClassName().hashCode() : 0);
-    result = 31 * result + (getParserConfig() != null ? 
getParserConfig().hashCode() : 0);
-    result = 31 * result + (getFieldTransformations() != null ? 
getFieldTransformations().hashCode() : 0);
     result = 31 * result + (getReadMetadata() != null ? 
getReadMetadata().hashCode() : 0);
     result = 31 * result + (getMergeMetadata() != null ? 
getMergeMetadata().hashCode() : 0);
+    result = 31 * result + (getNumWorkers() != null ? 
getNumWorkers().hashCode() : 0);
+    result = 31 * result + (getNumAckers() != null ? getNumAckers().hashCode() 
: 0);
+    result = 31 * result + (getSpoutParallelism() != null ? 
getSpoutParallelism().hashCode() : 0);
+    result = 31 * result + (getSpoutNumTasks() != null ? 
getSpoutNumTasks().hashCode() : 0);
+    result = 31 * result + (getParserParallelism() != null ? 
getParserParallelism().hashCode() : 0);
+    result = 31 * result + (getParserNumTasks() != null ? 
getParserNumTasks().hashCode() : 0);
+    result = 31 * result + (getErrorWriterParallelism() != null ? 
getErrorWriterParallelism().hashCode() : 0);
+    result = 31 * result + (getErrorWriterNumTasks() != null ? 
getErrorWriterNumTasks().hashCode() : 0);
+    result = 31 * result + (getSpoutConfig() != null ? 
getSpoutConfig().hashCode() : 0);
+    result = 31 * result + (getSecurityProtocol() != null ? 
getSecurityProtocol().hashCode() : 0);
+    result = 31 * result + (getStormConfig() != null ? 
getStormConfig().hashCode() : 0);
+    result = 31 * result + (getParserConfig() != null ? 
getParserConfig().hashCode() : 0);
+    result = 31 * result + (getFieldTransformations() != null ? 
getFieldTransformations().hashCode() : 0);
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
index 431ece2..31eeafe 100644
--- 
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
+++ 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
@@ -81,7 +81,15 @@ public class ConfigurationFunctionsTest {
       "parserConfig" : { },
       "fieldTransformations" : [ ],
       "readMetadata":false,
-      "mergeMetadata":false
+      "mergeMetadata":false,
+      "parserParallelism" : 1,
+      "errorWriterParallelism" : 1,
+      "spoutNumTasks" : 1,
+      "stormConfig" : {},
+      "errorWriterNumTasks":1,
+      "spoutConfig":{},
+      "parserNumTasks":1,
+      "spoutParallelism":1
     }
    */
   @Multiline

http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md 
b/metron-platform/metron-parsers/README.md
index 9de4341..141e232 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -103,6 +103,17 @@ then it is assumed to be a regex and will match any topic 
matching the pattern (
 * `mergeMetadata` : Boolean indicating whether to merge metadata with the 
message or not (`false` by default).  See below for a discussion about metadata.
 * `parserConfig` : A JSON Map representing the parser implementation specific 
configuration.
 * `fieldTransformations` : An array of complex objects representing the 
transformations to be done on the message generated from the parser before 
writing out to the kafka topic.
+* `spoutParallelism` : The kafka spout parallelism (default to `1`).  This can 
be overridden on the command line.
+* `spoutNumTasks` : The number of tasks for the spout (default to `1`). This 
can be overridden on the command line.
+* `parserParallelism` : The parser bolt parallelism (default to `1`). This can 
be overridden on the command line.
+* `parserNumTasks` : The number of tasks for the parser bolt (default to `1`). 
This can be overridden on the command line.
+* `errorWriterParallelism` : The error writer bolt parallelism (default to 
`1`). This can be overridden on the command line.
+* `errorWriterNumTasks` : The number of tasks for the error writer bolt 
(default to `1`). This can be overridden on the command line.
+* `numWorkers` : The number of workers to use in the topology (default is the 
storm default of `1`).
+* `numAckers` : The number of acker executors to use in the topology (default 
is the storm default of `1`).
+* `spoutConfig` : A map representing a custom spout config (this is a map). 
This can be overridden on the command line.
+* `securityProtocol` : The security protocol to use for reading from kafka 
(this is a string).  This can be overridden on the command line and also 
specified in the spout config via the `security.protocol` key.  If both are 
specified, then they are merged and the CLI will take precedence.
+* `stormConfig` : The storm config to use (this is a map).  This can be 
overridden on the command line.  If both are specified, they are merged with 
CLI properties taking precedence.
 
 The `fieldTransformations` is a complex object which defines a
 transformation which can be done to a message.  This transformation can 

http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index feac80b..c918703 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -18,9 +18,11 @@
 package org.apache.metron.parsers.topology;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
 import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
+import org.apache.storm.Config;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.topology.TopologyBuilder;
@@ -48,39 +50,70 @@ import java.util.*;
  */
 public class ParserTopologyBuilder {
 
+  public static class ParserTopology {
+    private TopologyBuilder builder;
+    private Config topologyConfig;
+
+    private ParserTopology(TopologyBuilder builder, Config topologyConfig) {
+      this.builder = builder;
+      this.topologyConfig = topologyConfig;
+    }
+
+
+    public TopologyBuilder getBuilder() {
+      return builder;
+    }
+
+    public Config getTopologyConfig() {
+      return topologyConfig;
+    }
+  }
+
   /**
    * Builds a Storm topology that parses telemetry data received from an 
external sensor.
    *
    * @param zookeeperUrl             Zookeeper URL
    * @param brokerUrl                Kafka Broker URL
    * @param sensorType               Type of sensor
-   * @param spoutParallelism         Parallelism hint for the spout
-   * @param spoutNumTasks            Number of tasks for the spout
-   * @param parserParallelism        Parallelism hint for the parser bolt
-   * @param parserNumTasks           Number of tasks for the parser bolt
-   * @param errorWriterParallelism   Parallelism hint for the bolt that 
handles errors
-   * @param errorWriterNumTasks      Number of tasks for the bolt that handles 
errors
-   * @param kafkaSpoutConfig         Configuration options for the kafka spout
+   * @param spoutParallelismSupplier         Supplier for the parallelism hint 
for the spout
+   * @param spoutNumTasksSupplier            Supplier for the number of tasks 
for the spout
+   * @param parserParallelismSupplier        Supplier for the parallelism hint 
for the parser bolt
+   * @param parserNumTasksSupplier           Supplier for the number of tasks 
for the parser bolt
+   * @param errorWriterParallelismSupplier   Supplier for the parallelism hint 
for the bolt that handles errors
+   * @param errorWriterNumTasksSupplier      Supplier for the number of tasks 
for the bolt that handles errors
+   * @param kafkaSpoutConfigSupplier         Supplier for the configuration 
options for the kafka spout
+   * @param securityProtocolSupplier         Supplier for the security protocol
+   * @param outputTopic                      The output kafka topic
+   * @param stormConfigSupplier              Supplier for the storm config
    * @return A Storm topology that parses telemetry data received from an 
external sensor
    * @throws Exception
    */
-  public static TopologyBuilder build(String zookeeperUrl,
+  public static ParserTopology build(String zookeeperUrl,
                                       Optional<String> brokerUrl,
                                       String sensorType,
-                                      int spoutParallelism,
-                                      int spoutNumTasks,
-                                      int parserParallelism,
-                                      int parserNumTasks,
-                                      int errorWriterParallelism,
-                                      int errorWriterNumTasks,
-                                      Map<String, Object> kafkaSpoutConfig,
-                                      Optional<String> securityProtocol,
-                                      Optional<String> outputTopic
+                                      ValueSupplier<Integer> 
spoutParallelismSupplier,
+                                      ValueSupplier<Integer> 
spoutNumTasksSupplier,
+                                      ValueSupplier<Integer> 
parserParallelismSupplier,
+                                      ValueSupplier<Integer> 
parserNumTasksSupplier,
+                                      ValueSupplier<Integer> 
errorWriterParallelismSupplier,
+                                      ValueSupplier<Integer> 
errorWriterNumTasksSupplier,
+                                      ValueSupplier<Map> 
kafkaSpoutConfigSupplier,
+                                      ValueSupplier<String> 
securityProtocolSupplier,
+                                      Optional<String> outputTopic,
+                                      ValueSupplier<Config> stormConfigSupplier
   ) throws Exception {
 
     // fetch configuration from zookeeper
     ParserConfigurations configs = new ParserConfigurations();
     SensorParserConfig parserConfig = getSensorParserConfig(zookeeperUrl, 
sensorType, configs);
+    int spoutParallelism = spoutParallelismSupplier.get(parserConfig, 
Integer.class);
+    int spoutNumTasks = spoutNumTasksSupplier.get(parserConfig, Integer.class);
+    int parserParallelism = parserParallelismSupplier.get(parserConfig, 
Integer.class);
+    int parserNumTasks = parserNumTasksSupplier.get(parserConfig, 
Integer.class);
+    int errorWriterParallelism = 
errorWriterParallelismSupplier.get(parserConfig, Integer.class);
+    int errorWriterNumTasks = errorWriterNumTasksSupplier.get(parserConfig, 
Integer.class);
+    Map<String, Object> kafkaSpoutConfig = 
kafkaSpoutConfigSupplier.get(parserConfig, Map.class);
+    Optional<String> securityProtocol = 
Optional.ofNullable(securityProtocolSupplier.get(parserConfig, String.class));
 
     // create the spout
     TopologyBuilder builder = new TopologyBuilder();
@@ -102,7 +135,7 @@ public class ParserTopologyBuilder {
               .shuffleGrouping("parserBolt", Constants.ERROR_STREAM);
     }
 
-    return builder;
+    return new ParserTopology(builder, stormConfigSupplier.get(parserConfig, 
Config.class));
   }
 
   /**
@@ -247,16 +280,16 @@ public class ParserTopologyBuilder {
    * @throws Exception
    */
   private static SensorParserConfig getSensorParserConfig(String zookeeperUrl, 
String sensorType, ParserConfigurations configs) throws Exception {
-    CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
-    client.start();
-    ConfigurationsUtils.updateParserConfigsFromZookeeper(configs, client);
-    SensorParserConfig parserConfig = 
configs.getSensorParserConfig(sensorType);
-    if (parserConfig == null) {
-      throw new IllegalStateException("Cannot find the parser configuration in 
zookeeper for " + sensorType + "." +
-              "  Please check that it exists in zookeeper by using the 
'zk_load_configs.sh -m DUMP' command.");
+    try(CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl)) 
{
+      client.start();
+      ConfigurationsUtils.updateParserConfigsFromZookeeper(configs, client);
+      SensorParserConfig parserConfig = 
configs.getSensorParserConfig(sensorType);
+      if (parserConfig == null) {
+        throw new IllegalStateException("Cannot find the parser configuration 
in zookeeper for " + sensorType + "." +
+                "  Please check that it exists in zookeeper by using the 
'zk_load_configs.sh -m DUMP' command.");
+      }
+      return parserConfig;
     }
-    client.close();
-    return parserConfig;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 8ff4f93..b5ee628 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -18,10 +18,14 @@
 package org.apache.metron.parsers.topology;
 
 import org.apache.metron.common.Constants;
+import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -235,12 +239,19 @@ public class ParserTopologyCLI {
       return has(cli)?cli.getOptionValue(shortCode):def;
     }
 
-    public static Config getConfig(CommandLine cli) {
-      Config config = new Config();
+    public static Optional<Config> getConfig(CommandLine cli) {
+      return getConfig(cli, new Config());
+    }
+
+    public static Optional<Config> getConfig(CommandLine cli, Config config) {
+      if(EXTRA_OPTIONS.has(cli)) {
+        Map<String, Object> extraOptions = readJSONMapFromFile(new 
File(EXTRA_OPTIONS.get(cli)));
+        config.putAll(extraOptions);
+      }
       for(ParserOptions option : ParserOptions.values()) {
         config = option.configHandler.apply(new Arg(config, option.get(cli)));
       }
-      return config;
+      return config.isEmpty()?Optional.empty():Optional.of(config);
     }
 
     public static CommandLine parse(CommandLineParser parser, String[] args) 
throws ParseException {
@@ -273,65 +284,172 @@ public class ParserTopologyCLI {
     }
   }
 
+  private static CommandLine parse(Options options, String[] args) {
+    CommandLineParser parser = new PosixParser();
+    try {
+      return ParserOptions.parse(parser, args);
+    } catch (ParseException pe) {
+      pe.printStackTrace();
+      final HelpFormatter usageFormatter = new HelpFormatter();
+      usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true);
+      System.exit(-1);
+      return null;
+    }
+  }
+
+  public ParserTopologyBuilder.ParserTopology createParserTopology(final 
CommandLine cmd) throws Exception {
+    String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);
+    Optional<String> brokerUrl = 
ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty();
+    String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
+
+    /*
+    It bears mentioning why we're creating this ValueSupplier indirection here.
+    As a separation of responsibilities, the CLI class defines the order of 
precedence
+    for the various topological and structural properties for creating a 
parser.  This is
+    desirable because there are now (i.e. integration tests)
+    and may be in the future (i.e. a REST service to start parsers without 
using the CLI)
+    other mechanisms to construct parser topologies.  It's sensible to split 
those concerns.
+
+    Unfortunately, determining the structural parameters for a parser requires 
interacting with
+    external services (e.g. zookeeper) that are set up well within the 
ParserTopology class.
+    Rather than pulling the infrastructure to interact with those services out 
and moving it into the
+    CLI class and breaking that separation of concerns, we've created a 
supplier
+    indirection where we are providing the logic as to how to create precedence 
in the CLI class
+    without owning the responsibility of constructing the infrastructure where 
the values are
+    necessarily supplied.
+
+     */
+    ValueSupplier<Integer> spoutParallelism = (parserConfig, clazz) -> {
+      if(ParserOptions.SPOUT_PARALLELISM.has(cmd)) {
+        return Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
+      }
+      return Optional.ofNullable(parserConfig.getSpoutParallelism()).orElse(1);
+    };
+    ValueSupplier<Integer> spoutNumTasks = (parserConfig, clazz) -> {
+      if(ParserOptions.SPOUT_NUM_TASKS.has(cmd)) {
+        return Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
+      }
+      return Optional.ofNullable(parserConfig.getSpoutNumTasks()).orElse(1);
+    };
+    ValueSupplier<Integer> parserParallelism = (parserConfig, clazz) -> {
+      if(ParserOptions.PARSER_PARALLELISM.has(cmd)) {
+        return Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, 
"1"));
+      }
+      return 
Optional.ofNullable(parserConfig.getParserParallelism()).orElse(1);
+    };
+
+    ValueSupplier<Integer> parserNumTasks = (parserConfig, clazz) -> {
+      if(ParserOptions.PARSER_NUM_TASKS.has(cmd)) {
+        return Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1"));
+      }
+      return Optional.ofNullable(parserConfig.getParserNumTasks()).orElse(1);
+    };
+
+    ValueSupplier<Integer> errorParallelism = (parserConfig, clazz) -> {
+      if(ParserOptions.ERROR_WRITER_PARALLELISM.has(cmd)) {
+        return 
Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1"));
+      }
+      return 
Optional.ofNullable(parserConfig.getErrorWriterParallelism()).orElse(1);
+    };
+
+    ValueSupplier<Integer> errorNumTasks = (parserConfig, clazz) -> {
+      if(ParserOptions.ERROR_WRITER_NUM_TASKS.has(cmd)) {
+        return Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, 
"1"));
+      }
+      return 
Optional.ofNullable(parserConfig.getErrorWriterNumTasks()).orElse(1);
+    };
+
+    ValueSupplier<Map> spoutConfig = (parserConfig, clazz) -> {
+      if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
+        return readJSONMapFromFile(new 
File(ParserOptions.SPOUT_CONFIG.get(cmd)));
+      }
+      return Optional.ofNullable(parserConfig.getSpoutConfig()).orElse(new 
HashMap<>());
+    };
+
+    ValueSupplier<String> securityProtocol = (parserConfig, clazz) -> {
+      Optional<String> sp = Optional.empty();
+      if (ParserOptions.SECURITY_PROTOCOL.has(cmd)) {
+        sp = Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd));
+      }
+      if (!sp.isPresent()) {
+        sp = getSecurityProtocol(sp, spoutConfig.get(parserConfig, Map.class));
+      }
+      return 
sp.orElse(Optional.ofNullable(parserConfig.getSecurityProtocol()).orElse(null));
+    };
+
+    ValueSupplier<Config> stormConf = (parserConfig, clazz) -> {
+      Map<String, Object> c = parserConfig.getStormConfig();
+      Config finalConfig = new Config();
+      if(c != null && !c.isEmpty()) {
+        finalConfig.putAll(c);
+      }
+      if(parserConfig.getNumAckers() != null) {
+        Config.setNumAckers(finalConfig, parserConfig.getNumAckers());
+      }
+      if(parserConfig.getNumWorkers() != null) {
+        Config.setNumWorkers(finalConfig, parserConfig.getNumWorkers());
+      }
+      return ParserOptions.getConfig(cmd, finalConfig).orElse(finalConfig);
+    };
+
+    Optional<String> outputTopic = 
ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty();
+
+    return getParserTopology(zookeeperUrl, brokerUrl, sensorType, 
spoutParallelism, spoutNumTasks, parserParallelism, parserNumTasks, 
errorParallelism, errorNumTasks, spoutConfig, securityProtocol, stormConf, 
outputTopic);
+  }
+
+  protected ParserTopologyBuilder.ParserTopology getParserTopology( String 
zookeeperUrl
+                                                                  , 
Optional<String> brokerUrl
+                                                                  , String 
sensorType
+                                                                  , 
ValueSupplier<Integer> spoutParallelism
+                                                                  , 
ValueSupplier<Integer> spoutNumTasks
+                                                                  , 
ValueSupplier<Integer> parserParallelism
+                                                                  , 
ValueSupplier<Integer> parserNumTasks
+                                                                  , 
ValueSupplier<Integer> errorParallelism
+                                                                  , 
ValueSupplier<Integer> errorNumTasks
+                                                                  , 
ValueSupplier<Map> spoutConfig
+                                                                  , 
ValueSupplier<String> securityProtocol
+                                                                  , 
ValueSupplier<Config> stormConf
+                                                                  , 
Optional<String> outputTopic
+                                                                  ) throws 
Exception
+  {
+    return ParserTopologyBuilder.build(zookeeperUrl,
+                brokerUrl,
+                sensorType,
+                spoutParallelism,
+                spoutNumTasks,
+                parserParallelism,
+                parserNumTasks,
+                errorParallelism,
+                errorNumTasks,
+                spoutConfig,
+                securityProtocol,
+                outputTopic,
+                stormConf
+        );
+  }
+
+
   public static void main(String[] args) {
-    Options options = new Options();
 
     try {
-      CommandLineParser parser = new PosixParser();
-      CommandLine cmd = null;
-      try {
-        cmd = ParserOptions.parse(parser, args);
-      } catch (ParseException pe) {
-        pe.printStackTrace();
-        final HelpFormatter usageFormatter = new HelpFormatter();
-        usageFormatter.printHelp("ParserTopologyCLI", null, options, null, 
true);
-        System.exit(-1);
-      }
+      Options options = new Options();
+      final CommandLine cmd = parse(options, args);
       if (cmd.hasOption("h")) {
         final HelpFormatter usageFormatter = new HelpFormatter();
         usageFormatter.printHelp("ParserTopologyCLI", null, options, null, 
true);
         System.exit(0);
       }
-      String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);;
-      Optional<String> brokerUrl = 
ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty();
+      ParserTopologyCLI cli = new ParserTopologyCLI();
+      ParserTopologyBuilder.ParserTopology topology = 
cli.createParserTopology(cmd);
       String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
-      int spoutParallelism = 
Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
-      int spoutNumTasks = 
Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
-      int parserParallelism = 
Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1"));
-      int parserNumTasks= 
Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1"));
-      int errorParallelism = 
Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1"));
-      int errorNumTasks= 
Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1"));
-      int invalidParallelism = 
Integer.parseInt(ParserOptions.INVALID_WRITER_PARALLELISM.get(cmd, "1"));
-      int invalidNumTasks= 
Integer.parseInt(ParserOptions.INVALID_WRITER_NUM_TASKS.get(cmd, "1"));
-      Map<String, Object> spoutConfig = new HashMap<>();
-      if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
-        spoutConfig = readSpoutConfig(new 
File(ParserOptions.SPOUT_CONFIG.get(cmd)));
-      }
-      Optional<String> outputTopic = 
ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty();
-      Optional<String> securityProtocol = 
ParserOptions.SECURITY_PROTOCOL.has(cmd)?Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)):Optional.empty();
-      securityProtocol = getSecurityProtocol(securityProtocol, spoutConfig);
-      TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl,
-              brokerUrl,
-              sensorType,
-              spoutParallelism,
-              spoutNumTasks,
-              parserParallelism,
-              parserNumTasks,
-              errorParallelism,
-              errorNumTasks,
-              spoutConfig,
-              securityProtocol,
-              outputTopic
-      );
-      Config stormConf = ParserOptions.getConfig(cmd);
       if (ParserOptions.TEST.has(cmd)) {
-        stormConf.put(Config.TOPOLOGY_DEBUG, true);
+        topology.getTopologyConfig().put(Config.TOPOLOGY_DEBUG, true);
         LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(sensorType, stormConf, 
builder.createTopology());
+        cluster.submitTopology(sensorType, topology.getTopologyConfig(), 
topology.getBuilder().createTopology());
         Utils.sleep(300000);
         cluster.shutdown();
       } else {
-        StormSubmitter.submitTopology(sensorType, stormConf, 
builder.createTopology());
+        StormSubmitter.submitTopology(sensorType, 
topology.getTopologyConfig(), topology.getBuilder().createTopology());
       }
     } catch (Exception e) {
       e.printStackTrace();
@@ -347,13 +465,13 @@ public class ParserTopologyCLI {
     if(!ret.isPresent()) {
       ret = Optional.ofNullable((String) spoutConfig.get("security.protocol"));
     }
-    if(ret.isPresent() && protocol.get().equalsIgnoreCase("PLAINTEXT")) {
+    if(ret.isPresent() && ret.get().equalsIgnoreCase("PLAINTEXT")) {
       ret = Optional.empty();
     }
     return ret;
   }
 
-  private static Map<String, Object> readSpoutConfig(File inputFile) {
+  private static Map<String, Object> readJSONMapFromFile(File inputFile) {
     String json = null;
     if (inputFile.exists()) {
       try {

http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java
new file mode 100644
index 0000000..0ede0f8
--- /dev/null
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.topology.config;
+
+import org.apache.metron.common.configuration.SensorParserConfig;
+
+
+/**
+ * Supplies a value of type {@code T} derived from a sensor's parser config; {@code clazz} is the class of the value requested.
+ * @param <T> The type of the value being supplied
+ */
+public interface ValueSupplier<T> {
+  T get(SensorParserConfig config, Class<T> clazz);
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 3e8e2db..63d9e52 100644
--- 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -87,23 +87,29 @@ public class ParserTopologyComponent implements 
InMemoryComponent {
   @Override
   public void start() throws UnableToStartException {
     try {
-      TopologyBuilder topologyBuilder = 
ParserTopologyBuilder.build(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY)
+      final Map<String, Object> stormConf = new HashMap<>();
+      stormConf.put(Config.TOPOLOGY_DEBUG, true);
+      ParserTopologyBuilder.ParserTopology topologyBuilder = 
ParserTopologyBuilder.build(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY)
                                                                    , 
Optional.ofNullable(brokerUrl)
                                                                    , sensorType
-                                                                   , 1
-                                                                   , 1
-                                                                   , 1
-                                                                   , 1
-                                                                   , 1
-                                                                   , 1
-                                                                   , null
-                                                                   , 
Optional.empty()
+                                                                   , (x,y) -> 1
+                                                                   , (x,y) -> 1
+                                                                   , (x,y) -> 1
+                                                                   , (x,y) -> 1
+                                                                   , (x,y) -> 1
+                                                                   , (x,y) -> 1
+                                                                   , (x,y) -> 
new HashMap<>()
+                                                                   , (x,y) -> 
null
                                                                    , 
Optional.ofNullable(outputTopic)
+                                                                   , (x,y) -> {
+                                                                      Config c 
= new Config();
+                                                                      
c.putAll(stormConf);
+                                                                      return c;
+                                                                      }
                                                                    );
-      Map<String, Object> stormConf = new HashMap<>();
-      stormConf.put(Config.TOPOLOGY_DEBUG, true);
+
       stormCluster = new LocalCluster();
-      stormCluster.submitTopology(sensorType, stormConf, 
topologyBuilder.createTopology());
+      stormCluster.submitTopology(sensorType, stormConf, 
topologyBuilder.getBuilder().createTopology());
     } catch (Exception e) {
       throw new UnableToStartException("Unable to start parser topology for 
sensorType: " + sensorType, e);
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/38eebd59/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
index 5f536a5..97dac5a 100644
--- 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
@@ -18,8 +18,12 @@
 
 package org.apache.metron.parsers.topology;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.commons.cli.Parser;
 import org.apache.log4j.Level;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.storm.Config;
 import com.google.common.collect.ImmutableMap;
@@ -34,7 +38,10 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.ref.Reference;
 import java.util.*;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
 
 public class ParserTopologyCLITest {
 
@@ -142,7 +149,8 @@ public class ParserTopologyCLITest {
                                      
.with(ParserTopologyCLI.ParserOptions.NUM_MAX_TASK_PARALLELISM, "3")
                                      
.with(ParserTopologyCLI.ParserOptions.MESSAGE_TIMEOUT, "4")
                                      .build(longOpt);
-    Config config = ParserTopologyCLI.ParserOptions.getConfig(cli);
+    Optional<Config> configOptional = 
ParserTopologyCLI.ParserOptions.getConfig(cli);
+    Config config = configOptional.get();
     Assert.assertEquals(1, config.get(Config.TOPOLOGY_WORKERS));
     Assert.assertEquals(2, config.get(Config.TOPOLOGY_ACKER_EXECUTORS));
     Assert.assertEquals(3, config.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
@@ -189,7 +197,8 @@ public class ParserTopologyCLITest {
               .with(ParserTopologyCLI.ParserOptions.MESSAGE_TIMEOUT, "4")
               .with(ParserTopologyCLI.ParserOptions.EXTRA_OPTIONS, 
extraFile.getAbsolutePath())
               .build(longOpt);
-      Config config = ParserTopologyCLI.ParserOptions.getConfig(cli);
+      Optional<Config> configOptional = 
ParserTopologyCLI.ParserOptions.getConfig(cli);
+      Config config = configOptional.get();
       Assert.assertEquals(4, config.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
       Assert.assertEquals("foo", config.get("string"));
       Assert.assertEquals(1, config.get("integer"));
@@ -197,4 +206,416 @@ public class ParserTopologyCLITest {
       extraFile.deleteOnExit();
     }
   }
+
+  private static class ParserInput {
+    private Integer spoutParallelism;
+    private Integer spoutNumTasks;
+    private Integer parserParallelism;
+    private Integer parserNumTasks;
+    private Integer errorParallelism;
+    private Integer errorNumTasks;
+    private Map<String, Object> spoutConfig;
+    private String securityProtocol;
+    private Config stormConf;
+
+    public ParserInput( ValueSupplier<Integer> spoutParallelism
+                      , ValueSupplier<Integer> spoutNumTasks
+                      , ValueSupplier<Integer> parserParallelism
+                      , ValueSupplier<Integer> parserNumTasks
+                      , ValueSupplier<Integer> errorParallelism
+                      , ValueSupplier<Integer> errorNumTasks
+                      , ValueSupplier<Map> spoutConfig
+                      , ValueSupplier<String> securityProtocol
+                      , ValueSupplier<Config> stormConf
+                      , SensorParserConfig config
+                      )
+    {
+      this.spoutParallelism = spoutParallelism.get(config, Integer.class);
+      this.spoutNumTasks = spoutNumTasks.get(config, Integer.class);
+      this.parserParallelism = parserParallelism.get(config, Integer.class);
+      this.parserNumTasks = parserNumTasks.get(config, Integer.class);
+      this.errorParallelism = errorParallelism.get(config, Integer.class);
+      this.errorNumTasks = errorNumTasks.get(config, Integer.class);
+      this.spoutConfig = spoutConfig.get(config, Map.class);
+      this.securityProtocol = securityProtocol.get(config, String.class);
+      this.stormConf = stormConf.get(config, Config.class);
+    }
+
+    public Integer getSpoutParallelism() {
+      return spoutParallelism;
+    }
+
+    public Integer getSpoutNumTasks() {
+      return spoutNumTasks;
+    }
+
+    public Integer getParserParallelism() {
+      return parserParallelism;
+    }
+
+    public Integer getParserNumTasks() {
+      return parserNumTasks;
+    }
+
+    public Integer getErrorParallelism() {
+      return errorParallelism;
+    }
+
+    public Integer getErrorNumTasks() {
+      return errorNumTasks;
+    }
+
+    public Map<String, Object> getSpoutConfig() {
+      return spoutConfig;
+    }
+
+    public String getSecurityProtocol() {
+      return securityProtocol;
+    }
+
+    public Config getStormConf() {
+      return stormConf;
+    }
+  }
+  /**
+{
+  "parserClassName": "org.apache.metron.parsers.GrokParser",
+  "sensorTopic": "squid",
+  "parserConfig": {
+    "grokPath": "/patterns/squid",
+    "patternLabel": "SQUID_DELIMITED",
+    "timestampField": "timestamp"
+  },
+  "fieldTransformations" : [
+    {
+      "transformation" : "STELLAR"
+    ,"output" : [ "full_hostname", "domain_without_subdomains" ]
+    ,"config" : {
+      "full_hostname" : "URL_TO_HOST(url)"
+      ,"domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"
+                }
+    }
+                           ]
+}
+   */
+  @Multiline
+  public static String baseConfig;
+  private static SensorParserConfig getBaseConfig() {
+    try {
+      return JSONUtils.INSTANCE.load(baseConfig, SensorParserConfig.class);
+    } catch (IOException e) {
+      throw new IllegalStateException(e.getMessage(), e);
+    }
+  }
+
+  @Test
+  public void testSpoutParallelism() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_PARALLELISM
+                    , "10"
+                    , input -> input.getSpoutParallelism().equals(10)
+                    , () -> {
+                      SensorParserConfig config = getBaseConfig();
+                      config.setSpoutParallelism(20);
+                      return config;
+                    }
+                    , input -> input.getSpoutParallelism().equals(20)
+                    );
+  }
+
+  @Test
+  public void testSpoutNumTasks() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_NUM_TASKS
+                    , "10"
+                    , input -> input.getSpoutNumTasks().equals(10)
+                    , () -> {
+                      SensorParserConfig config = getBaseConfig();
+                      config.setSpoutNumTasks(20);
+                      return config;
+                    }
+                    , input -> input.getSpoutNumTasks().equals(20)
+                    );
+  }
+
+  @Test
+  public void testParserParallelism() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_PARALLELISM
+                    , "10"
+                    , input -> input.getParserParallelism().equals(10)
+                    , () -> {
+                      SensorParserConfig config = getBaseConfig();
+                      config.setParserParallelism(20);
+                      return config;
+                    }
+                    , input -> input.getParserParallelism().equals(20)
+                    );
+  }
+
+  @Test
+  public void testParserNumTasks() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_NUM_TASKS
+                    , "10"
+                    , input -> input.getParserNumTasks().equals(10)
+                    , () -> {
+                      SensorParserConfig config = getBaseConfig();
+                      config.setParserNumTasks(20);
+                      return config;
+                    }
+                    , input -> input.getParserNumTasks().equals(20)
+                    );
+  }
+
+  @Test
+  public void testErrorParallelism() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.ERROR_WRITER_PARALLELISM
+                    , "10"
+                    , input -> input.getErrorParallelism().equals(10)
+                    , () -> {
+                      SensorParserConfig config = getBaseConfig();
+                      config.setErrorWriterParallelism(20);
+                      return config;
+                    }
+                    , input -> input.getErrorParallelism().equals(20)
+                    );
+  }
+
+  @Test
+  public void testErrorNumTasks() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.ERROR_WRITER_NUM_TASKS
+                    , "10"
+                    , input -> input.getErrorNumTasks().equals(10)
+                    , () -> {
+                      SensorParserConfig config = getBaseConfig();
+                      config.setErrorWriterNumTasks(20);
+                      return config;
+                    }
+                    , input -> input.getErrorNumTasks().equals(20)
+                    );
+  }
+
+  @Test
+  public void testSecurityProtocol_fromCLI() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL
+                    , "PLAINTEXT"
+                    , input -> input.getSecurityProtocol().equals("PLAINTEXT")
+                    , () -> {
+                      SensorParserConfig config = getBaseConfig();
+                      config.setSecurityProtocol("KERBEROS");
+                      return config;
+                    }
+                    , input -> input.getSecurityProtocol().equals("KERBEROS")
+                    );
+  }
+
+  @Test
+  public void testSecurityProtocol_fromSpout() throws Exception {
+    //Ultimately the order of precedence is CLI > spout config > parser config
+    File extraConfig = File.createTempFile("spoutConfig", "json");
+      extraConfig.deleteOnExit();
+      writeMap(extraConfig, new HashMap<String, Object>() {{
+        put("security.protocol", "PLAINTEXTSASL");
+      }});
+    {
+      //Ensure that the CLI security protocol takes precedence over the CLI spout config
+
+      testConfigOption(new EnumMap<ParserTopologyCLI.ParserOptions, 
String>(ParserTopologyCLI.ParserOptions.class) {{
+                         put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, 
extraConfig.getAbsolutePath());
+                         
put(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL, "PLAINTEXT");
+                       }}
+              , input -> input.getSecurityProtocol().equals("PLAINTEXT")
+              , () -> {
+                SensorParserConfig config = getBaseConfig();
+                config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK");
+                return config;
+              }
+              , input -> 
input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK")
+      );
+    }
+    {
+      //Ensure that the spout config takes precedence over the parser config
+      testConfigOption(new EnumMap<ParserTopologyCLI.ParserOptions, 
String>(ParserTopologyCLI.ParserOptions.class) {{
+                         put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, 
extraConfig.getAbsolutePath());
+                       }}
+              , input -> input.getSecurityProtocol().equals("PLAINTEXTSASL")
+              , () -> {
+                SensorParserConfig config = getBaseConfig();
+                config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK");
+                return config;
+              }
+              , input -> 
input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK")
+      );
+    }
+  }
+
+  @Test
+  public void testTopologyConfig_fromConfigExplicitly() throws Exception {
+    testConfigOption(new EnumMap<ParserTopologyCLI.ParserOptions, 
String>(ParserTopologyCLI.ParserOptions.class)
+                    {{
+                      put(ParserTopologyCLI.ParserOptions.NUM_WORKERS, "10");
+                      put(ParserTopologyCLI.ParserOptions.NUM_ACKERS, "20");
+                    }}
+                    , input -> {
+                        Config c = input.getStormConf();
+                        return (int)c.get(Config.TOPOLOGY_WORKERS) == 10
+                            && (int)c.get(Config.TOPOLOGY_ACKER_EXECUTORS) == 
20;
+                      }
+                      , () -> {
+                        SensorParserConfig config = getBaseConfig();
+                        config.setNumWorkers(100);
+                        config.setNumAckers(200);
+                        return config;
+                              }
+                      , input -> {
+                          Config c = input.getStormConf();
+                          return (int)c.get(Config.TOPOLOGY_WORKERS) == 100
+                              && (int)c.get(Config.TOPOLOGY_ACKER_EXECUTORS) 
== 200
+                                  ;
+                                 }
+                    );
+  }
+
+  /**
+   * Checks how worker count, acker count and topology debug end up in the Storm config.
+   *
+   * With NUM_WORKERS=10, NUM_ACKERS=20 and an EXTRA_OPTIONS file that enables topology
+   * debug on the command line, the resulting Storm config must report 10 workers,
+   * 20 ackers and debug enabled. With none of those options on the command line, the
+   * values from the parser config's storm config map (100 workers, 200 ackers) must be
+   * reported and the debug key must be absent.
+   */
+  @Test
+  public void testTopologyConfig() throws Exception {
+    // Extra options file handed to the CLI; its only entry turns topology debug on.
+    File extraConfig = File.createTempFile("topologyConfig", "json");
+    extraConfig.deleteOnExit();
+    Map<String, Object> extraOptions = new HashMap<>();
+    extraOptions.put(Config.TOPOLOGY_DEBUG, true);
+    writeMap(extraConfig, extraOptions);
+
+    EnumMap<ParserTopologyCLI.ParserOptions, String> cliOptions =
+        new EnumMap<>(ParserTopologyCLI.ParserOptions.class);
+    cliOptions.put(ParserTopologyCLI.ParserOptions.NUM_WORKERS, "10");
+    cliOptions.put(ParserTopologyCLI.ParserOptions.NUM_ACKERS, "20");
+    cliOptions.put(ParserTopologyCLI.ParserOptions.EXTRA_OPTIONS, extraConfig.getAbsolutePath());
+
+    // Expected when the options above are present on the command line.
+    Predicate<ParserInput> cliOverrideExpected = input -> {
+      Config stormConf = input.getStormConf();
+      if ((int) stormConf.get(Config.TOPOLOGY_WORKERS) != 10) {
+        return false;
+      }
+      if ((int) stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS) != 20) {
+        return false;
+      }
+      return (boolean) stormConf.get(Config.TOPOLOGY_DEBUG);
+    };
+
+    // Parser config whose storm config map carries its own worker/acker counts.
+    Supplier<SensorParserConfig> configSupplier = () -> {
+      Map<String, Object> stormConfig = new HashMap<>();
+      stormConfig.put(Config.TOPOLOGY_WORKERS, 100);
+      stormConfig.put(Config.TOPOLOGY_ACKER_EXECUTORS, 200);
+      SensorParserConfig parserConfig = getBaseConfig();
+      parserConfig.setStormConfig(stormConfig);
+      return parserConfig;
+    };
+
+    // Expected when the command line carries none of the options above.
+    Predicate<ParserInput> configOverrideExpected = input -> {
+      Config stormConf = input.getStormConf();
+      if ((int) stormConf.get(Config.TOPOLOGY_WORKERS) != 100) {
+        return false;
+      }
+      if ((int) stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS) != 200) {
+        return false;
+      }
+      return !stormConf.containsKey(Config.TOPOLOGY_DEBUG);
+    };
+
+    testConfigOption(cliOptions, cliOverrideExpected, configSupplier, configOverrideExpected);
+  }
+
+  /**
+   * Checks which source supplies the spout config.
+   *
+   * A SPOUT_CONFIG file given on the command line sets "extra_config" to "from_file";
+   * the parser config's spout config map sets the same key to "from_zk". The first
+   * value is expected when the command-line option is present, the second when it is not.
+   */
+  @Test
+  public void testSpoutConfig() throws Exception {
+    File extraConfig = File.createTempFile("spoutConfig", "json");
+    extraConfig.deleteOnExit();
+    Map<String, Object> fileSpoutConfig = new HashMap<>();
+    fileSpoutConfig.put("extra_config", "from_file");
+    writeMap(extraConfig, fileSpoutConfig);
+
+    EnumMap<ParserTopologyCLI.ParserOptions, String> cliOptions =
+        new EnumMap<>(ParserTopologyCLI.ParserOptions.class);
+    cliOptions.put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, extraConfig.getAbsolutePath());
+
+    // Parser config carrying its own value for the same spout config key.
+    Supplier<SensorParserConfig> configSupplier = () -> {
+      Map<String, Object> zkSpoutConfig = new HashMap<>();
+      zkSpoutConfig.put("extra_config", "from_zk");
+      SensorParserConfig parserConfig = getBaseConfig();
+      parserConfig.setSpoutConfig(zkSpoutConfig);
+      return parserConfig;
+    };
+
+    testConfigOption(
+        cliOptions,
+        input -> input.getSpoutConfig().get("extra_config").equals("from_file"),
+        configSupplier,
+        input -> input.getSpoutConfig().get("extra_config").equals("from_zk"));
+  }
+
+  /**
+   * Converts the given map to JSON and writes the result to the given file.
+   *
+   * @param outFile file that receives the JSON text
+   * @param config  map to serialize
+   * @throws IOException declared by the signature; may come from serialization or the write
+   */
+  private void writeMap(File outFile, Map<String, Object> config) throws IOException {
+    // NOTE(review): the `true` flag presumably requests pretty-printed output - confirm in JSONUtils.
+    String json = JSONUtils.INSTANCE.toJSON(config, true);
+    FileUtils.write(outFile, json);
+  }
+
+  /**
+   * Convenience overload for a single command-line option: wraps the option and its
+   * value in an EnumMap and delegates to the map-based testConfigOption.
+   *
+   * @param option                  command-line option to set
+   * @param cliOverride             value given to that option
+   * @param cliOverrideCondition    check applied when the option is on the command line
+   * @param configSupplier          supplies the parser config under test
+   * @param configOverrideCondition check applied when the option is not on the command line
+   */
+  private void testConfigOption(
+      ParserTopologyCLI.ParserOptions option,
+      String cliOverride,
+      Predicate<ParserInput> cliOverrideCondition,
+      Supplier<SensorParserConfig> configSupplier,
+      Predicate<ParserInput> configOverrideCondition) throws Exception {
+    EnumMap<ParserTopologyCLI.ParserOptions, String> singleOption =
+        new EnumMap<>(ParserTopologyCLI.ParserOptions.class);
+    singleOption.put(option, cliOverride);
+    testConfigOption(singleOption, cliOverrideCondition, configSupplier, configOverrideCondition);
+  }
+
+  /**
+   * Runs the same parser config through two command lines and asserts on each result.
+   *
+   * Both command lines carry the broker URL, zookeeper quorum and sensor type. The first
+   * additionally carries every entry of {@code options} and must satisfy
+   * {@code cliOverrideCondition}; the second carries nothing more and must satisfy
+   * {@code configOverrideCondition}.
+   *
+   * @param options                 extra command-line options for the first scenario
+   * @param cliOverrideCondition    check for the scenario with the extra options
+   * @param configSupplier          supplies the parser config; called once, result shared
+   * @param configOverrideCondition check for the scenario without the extra options
+   */
+  private void testConfigOption(
+      EnumMap<ParserTopologyCLI.ParserOptions, String> options,
+      Predicate<ParserInput> cliOverrideCondition,
+      Supplier<SensorParserConfig> configSupplier,
+      Predicate<ParserInput> configOverrideCondition) throws Exception {
+    SensorParserConfig config = configSupplier.get();
+
+    // Scenario 1: the extra options are present on the command line.
+    CLIBuilder overrideBuilder = new CLIBuilder()
+        .with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
+        .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
+        .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor");
+    options.forEach((option, value) -> overrideBuilder.with(option, value));
+    ParserInput overrideInput = getInput(overrideBuilder.build(true), config);
+    Assert.assertTrue(cliOverrideCondition.test(overrideInput));
+
+    // Scenario 2: only the three base options are on the command line.
+    CLIBuilder baseBuilder = new CLIBuilder()
+        .with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
+        .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
+        .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor");
+    ParserInput baseInput = getInput(baseBuilder.build(true), config);
+    Assert.assertTrue(configOverrideCondition.test(baseInput));
+  }
+
+  /**
+   * Captures the arguments that ParserTopologyCLI passes to getParserTopology.
+   *
+   * An anonymous ParserTopologyCLI subclass overrides getParserTopology to record its
+   * suppliers, together with the given config, in a ParserInput instead of producing a
+   * topology (the override returns null). createParserTopology is then called with the
+   * command line.
+   *
+   * @param cmd    parsed command line passed to createParserTopology
+   * @param config parser config stored in the captured ParserInput
+   * @return the captured ParserInput, or null if the override was never invoked
+   */
+  private static ParserInput getInput(CommandLine cmd, SensorParserConfig config) throws Exception {
+    // One-element holder so the anonymous class can hand its result back out.
+    final ParserInput[] captured = new ParserInput[1];
+    ParserTopologyCLI capturingCli = new ParserTopologyCLI() {
+      @Override
+      protected ParserTopologyBuilder.ParserTopology getParserTopology(
+          String zookeeperUrl,
+          Optional<String> brokerUrl,
+          String sensorType,
+          ValueSupplier<Integer> spoutParallelism,
+          ValueSupplier<Integer> spoutNumTasks,
+          ValueSupplier<Integer> parserParallelism,
+          ValueSupplier<Integer> parserNumTasks,
+          ValueSupplier<Integer> errorParallelism,
+          ValueSupplier<Integer> errorNumTasks,
+          ValueSupplier<Map> spoutConfig,
+          ValueSupplier<String> securityProtocol,
+          ValueSupplier<Config> stormConf,
+          Optional<String> outputTopic) throws Exception {
+        captured[0] = new ParserInput(
+            spoutParallelism,
+            spoutNumTasks,
+            parserParallelism,
+            parserNumTasks,
+            errorParallelism,
+            errorNumTasks,
+            spoutConfig,
+            securityProtocol,
+            stormConf,
+            config);
+        return null;
+      }
+    };
+    capturingCli.createParserTopology(cmd);
+    return captured[0];
+  }
+
 }

Reply via email to