METRON-1543 Unable to Set Parser Output Topic in Sensor Config (nickwallen) closes apache/metron#1007
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3bb926df Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3bb926df Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3bb926df Branch: refs/heads/feature/METRON-1090-stellar-assignment Commit: 3bb926df5d253a907bbf8dab4b76b78dd32993ea Parents: 2b4f0b8 Author: nickwallen <n...@nickallen.org> Authored: Wed May 2 15:06:03 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Wed May 2 15:06:03 2018 -0400 ---------------------------------------------------------------------- .../org/apache/metron/common/Constants.java | 10 +- .../configuration/SensorParserConfig.java | 410 +++++++++++-------- .../parsers/topology/ParserTopologyBuilder.java | 139 ++++--- .../parsers/topology/ParserTopologyCLI.java | 147 +++++-- .../components/ParserTopologyComponent.java | 80 ++-- .../parsers/topology/ParserTopologyCLITest.java | 122 ++++-- ...pleHbaseEnrichmentWriterIntegrationTest.java | 69 ++-- .../integration/WriterBoltIntegrationTest.java | 109 ++--- .../apache/metron/writer/kafka/KafkaWriter.java | 5 + 9 files changed, 676 insertions(+), 415 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java index b939a92..12b541c 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java @@ -17,9 +17,7 @@ */ package org.apache.metron.common; -import java.util.ArrayList; import java.util.HashMap; -import 
java.util.List; import java.util.Map; public class Constants { @@ -37,9 +35,17 @@ public class Constants { public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel"; public static final String GUID = "guid"; + /** + * The key in the global configuration that defines the global parser error topic. + * + * <p>This value is used only if the error topic is left undefined in a sensor's parser configuration. + */ + public static final String PARSER_ERROR_TOPIC_GLOBALS_KEY = "parser.error.topic"; + public interface Field { String getName(); } + public enum Fields implements Field { SRC_ADDR("ip_src_addr") ,SRC_PORT("ip_src_port") http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java index d347481..1dfb045 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java @@ -18,6 +18,9 @@ package org.apache.metron.common.configuration; import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.metron.common.utils.JSONUtils; import java.io.IOException; @@ -27,35 +30,171 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +/** + * The configuration object that defines a parser for a given sensor. Each + * sensor has its own parser configuration. 
+ */ public class SensorParserConfig implements Serializable { + /** + * The class name of the parser. + */ private String parserClassName; + + /** + * Allows logic to be defined to filter or ignore messages. Messages that have been + * filtered will not be parsed. + * + * This should be a fully qualified name of a class that implements the + * org.apache.metron.parsers.interfaces.MessageFilter interface. + */ private String filterClassName; + + /** + * The input topic containing the sensor telemetry to parse. + */ private String sensorTopic; + + /** + * The output topic where the parsed telemetry will be written. + */ + private String outputTopic; + + /** + * The error topic where errors are written to. + */ + private String errorTopic; + + /** + * The fully qualified name of a class used to write messages + * to the output topic. + * + * <p>A sensible default is provided. + */ private String writerClassName; + + /** + * The fully qualified name of a class used to write messages + * to the error topic. + * + * <p>A sensible default is provided. + */ private String errorWriterClassName; - private String invalidWriterClassName; + + /** + * Determines if parser metadata is made available to the parser's field + * transformations. If true, the parser field transformations can access + * parser metadata values. + * + * <p>By default, this is false and parser metadata is not available + * to the field transformations. + */ private Boolean readMetadata = false; + + /** + * Determines if parser metadata is automatically merged into the message. If + * true, parser metadata values will appear as fields within the message. + * + * <p>By default, this is false and metadata is not merged. + */ private Boolean mergeMetadata = false; + + /** + * The number of workers for the topology. + * + * <p>This property can be overridden on the CLI. + */ private Integer numWorkers = null; + + /** + * The number of ackers for the topology. 
+ * + * <p>This property can be overridden on the CLI. + */ private Integer numAckers= null; + + /** + * The parallelism of the Kafka spout. + * + * <p>This property can be overridden on the CLI. + */ private Integer spoutParallelism = 1; + + /** + * The number of tasks for the Kafka spout. + * + * <p>This property can be overridden on the CLI. + */ private Integer spoutNumTasks = 1; + + /** + * The parallelism of the parser bolt. + * + * <p>This property can be overridden on the CLI. + */ private Integer parserParallelism = 1; + + /** + * The number of tasks for the parser bolt. + * + * <p>This property can be overridden on the CLI. + */ private Integer parserNumTasks = 1; + + /** + * The parallelism of the error writer bolt. + * + * <p>This property can be overridden on the CLI. + */ private Integer errorWriterParallelism = 1; + + /** + * The number of tasks for the error writer bolt. + * + * <p>This property can be overridden on the CLI. + */ private Integer errorWriterNumTasks = 1; - private Map<String, Object> cacheConfig = new HashMap<>(); + + /** + * Configuration properties passed to the Kafka spout. + * + * <p>This property can be overridden on the CLI. + */ private Map<String, Object> spoutConfig = new HashMap<>(); + + /** + * The Kafka security protocol. + * + * <p>This property can be overridden on the CLI. This property can also be overridden by the spout config. + */ private String securityProtocol = null; + + /** + * Configuration properties passed to the storm topology. + * + * <p>This property can be overridden on the CLI. + */ private Map<String, Object> stormConfig = new HashMap<>(); /** - * Cache config for stellar field transformations. - * * stellar.cache.maxSize - The maximum number of elements in the cache. - * * stellar.cache.maxTimeRetain - The maximum amount of time an element is kept in the cache (in minutes). - * @return + * Configuration for the parser. 
+ */ + private Map<String, Object> parserConfig = new HashMap<>(); + + /** + * The field transformations applied to the parsed messages. These allow fields + * of the parsed message to be transformed. */ + private List<FieldTransformer> fieldTransformations = new ArrayList<>(); + + /** + * Configures the cache that backs stellar field transformations. + * + * <li>stellar.cache.maxSize - The maximum number of elements in the cache. + * <li>stellar.cache.maxTimeRetain - The maximum amount of time an element is kept in the cache (in minutes). + */ + private Map<String, Object> cacheConfig = new HashMap<>(); + public Map<String, Object> getCacheConfig() { return cacheConfig; } @@ -64,10 +203,6 @@ public class SensorParserConfig implements Serializable { this.cacheConfig = cacheConfig; } - /** - * Return the number of workers for the topology. This property will be used for the parser unless overridden on the CLI. - * @return - */ public Integer getNumWorkers() { return numWorkers; } @@ -76,10 +211,6 @@ public class SensorParserConfig implements Serializable { this.numWorkers = numWorkers; } - /** - * Return the number of ackers for the topology. This property will be used for the parser unless overridden on the CLI. - * @return - */ public Integer getNumAckers() { return numAckers; } @@ -88,10 +219,6 @@ public class SensorParserConfig implements Serializable { this.numAckers = numAckers; } - /** - * Return the spout parallelism. This property will be used for the parser unless overridden on the CLI. - * @return - */ public Integer getSpoutParallelism() { return spoutParallelism; } @@ -100,10 +227,6 @@ public class SensorParserConfig implements Serializable { this.spoutParallelism = spoutParallelism; } - /** - * Return the spout num tasks. This property will be used for the parser unless overridden on the CLI. 
- * @return - */ public Integer getSpoutNumTasks() { return spoutNumTasks; } @@ -112,10 +235,6 @@ public class SensorParserConfig implements Serializable { this.spoutNumTasks = spoutNumTasks; } - /** - * Return the parser parallelism. This property will be used for the parser unless overridden on the CLI. - * @return - */ public Integer getParserParallelism() { return parserParallelism; } @@ -124,10 +243,6 @@ public class SensorParserConfig implements Serializable { this.parserParallelism = parserParallelism; } - /** - * Return the parser number of tasks. This property will be used for the parser unless overridden on the CLI. - * @return - */ public Integer getParserNumTasks() { return parserNumTasks; } @@ -136,10 +251,6 @@ public class SensorParserConfig implements Serializable { this.parserNumTasks = parserNumTasks; } - /** - * Return the error writer bolt parallelism. This property will be used for the parser unless overridden on the CLI. - * @return - */ public Integer getErrorWriterParallelism() { return errorWriterParallelism; } @@ -148,10 +259,6 @@ public class SensorParserConfig implements Serializable { this.errorWriterParallelism = errorWriterParallelism; } - /** - * Return the error writer bolt number of tasks. This property will be used for the parser unless overridden on the CLI. - * @return - */ public Integer getErrorWriterNumTasks() { return errorWriterNumTasks; } @@ -160,10 +267,6 @@ public class SensorParserConfig implements Serializable { this.errorWriterNumTasks = errorWriterNumTasks; } - /** - * Return the spout config. This includes kafka properties. This property will be used for the parser unless overridden on the CLI. - * @return - */ public Map<String, Object> getSpoutConfig() { return spoutConfig; } @@ -172,11 +275,6 @@ public class SensorParserConfig implements Serializable { this.spoutConfig = spoutConfig; } - /** - * Return security protocol to use. This property will be used for the parser unless overridden on the CLI. 
- * The order of precedence is CLI > spout config > config in the sensor parser config. - * @return - */ public String getSecurityProtocol() { return securityProtocol; } @@ -185,10 +283,6 @@ public class SensorParserConfig implements Serializable { this.securityProtocol = securityProtocol; } - /** - * Return Storm topologyconfig. This property will be used for the parser unless overridden on the CLI. - * @return - */ public Map<String, Object> getStormConfig() { return stormConfig; } @@ -197,10 +291,6 @@ public class SensorParserConfig implements Serializable { this.stormConfig = stormConfig; } - /** - * Return whether or not to merge metadata sent into the message. If true, then metadata become proper fields. - * @return - */ public Boolean getMergeMetadata() { return mergeMetadata; } @@ -209,10 +299,6 @@ public class SensorParserConfig implements Serializable { this.mergeMetadata = mergeMetadata; } - /** - * Return whether or not to read metadata at all. - * @return - */ public Boolean getReadMetadata() { return readMetadata; } @@ -229,22 +315,13 @@ public class SensorParserConfig implements Serializable { this.errorWriterClassName = errorWriterClassName; } - public String getInvalidWriterClassName() { - return invalidWriterClassName; - } - - public void setInvalidWriterClassName(String invalidWriterClassName) { - this.invalidWriterClassName = invalidWriterClassName; - } - public String getWriterClassName() { return writerClassName; } + public void setWriterClassName(String classNames) { this.writerClassName = classNames; } - private Map<String, Object> parserConfig = new HashMap<>(); - private List<FieldTransformer> fieldTransformations = new ArrayList<>(); public List<FieldTransformer> getFieldTransformations() { return fieldTransformations; @@ -278,6 +355,22 @@ public class SensorParserConfig implements Serializable { this.sensorTopic = sensorTopic; } + public String getOutputTopic() { + return outputTopic; + } + + public void setOutputTopic(String 
outputTopic) { + this.outputTopic = outputTopic; + } + + public String getErrorTopic() { + return errorTopic; + } + + public void setErrorTopic(String errorTopic) { + this.errorTopic = errorTopic; + } + public Map<String, Object> getParserConfig() { return parserConfig; } @@ -298,112 +391,103 @@ public class SensorParserConfig implements Serializable { } } - public String toJSON() throws JsonProcessingException { return JSONUtils.INSTANCE.toJSON(this, true); } @Override - public String toString() { - return "SensorParserConfig{" + - "parserClassName='" + parserClassName + '\'' + - ", filterClassName='" + filterClassName + '\'' + - ", sensorTopic='" + sensorTopic + '\'' + - ", writerClassName='" + writerClassName + '\'' + - ", errorWriterClassName='" + errorWriterClassName + '\'' + - ", invalidWriterClassName='" + invalidWriterClassName + '\'' + - ", readMetadata=" + readMetadata + - ", mergeMetadata=" + mergeMetadata + - ", numWorkers=" + numWorkers + - ", numAckers=" + numAckers + - ", spoutParallelism=" + spoutParallelism + - ", spoutNumTasks=" + spoutNumTasks + - ", parserParallelism=" + parserParallelism + - ", parserNumTasks=" + parserNumTasks + - ", errorWriterParallelism=" + errorWriterParallelism + - ", errorWriterNumTasks=" + errorWriterNumTasks + - ", spoutConfig=" + spoutConfig + - ", securityProtocol='" + securityProtocol + '\'' + - ", stormConfig=" + stormConfig + - ", parserConfig=" + parserConfig + - ", fieldTransformations=" + fieldTransformations + - '}'; - } - - @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - SensorParserConfig that = (SensorParserConfig) o; + if (this == o) { + return true; + } - if (getParserClassName() != null ? !getParserClassName().equals(that.getParserClassName()) : that.getParserClassName() != null) - return false; - if (getFilterClassName() != null ? 
!getFilterClassName().equals(that.getFilterClassName()) : that.getFilterClassName() != null) - return false; - if (getSensorTopic() != null ? !getSensorTopic().equals(that.getSensorTopic()) : that.getSensorTopic() != null) - return false; - if (getWriterClassName() != null ? !getWriterClassName().equals(that.getWriterClassName()) : that.getWriterClassName() != null) - return false; - if (getErrorWriterClassName() != null ? !getErrorWriterClassName().equals(that.getErrorWriterClassName()) : that.getErrorWriterClassName() != null) - return false; - if (getInvalidWriterClassName() != null ? !getInvalidWriterClassName().equals(that.getInvalidWriterClassName()) : that.getInvalidWriterClassName() != null) - return false; - if (getReadMetadata() != null ? !getReadMetadata().equals(that.getReadMetadata()) : that.getReadMetadata() != null) - return false; - if (getMergeMetadata() != null ? !getMergeMetadata().equals(that.getMergeMetadata()) : that.getMergeMetadata() != null) + if (o == null || getClass() != o.getClass()) { return false; - if (getNumWorkers() != null ? !getNumWorkers().equals(that.getNumWorkers()) : that.getNumWorkers() != null) - return false; - if (getNumAckers() != null ? !getNumAckers().equals(that.getNumAckers()) : that.getNumAckers() != null) - return false; - if (getSpoutParallelism() != null ? !getSpoutParallelism().equals(that.getSpoutParallelism()) : that.getSpoutParallelism() != null) - return false; - if (getSpoutNumTasks() != null ? !getSpoutNumTasks().equals(that.getSpoutNumTasks()) : that.getSpoutNumTasks() != null) - return false; - if (getParserParallelism() != null ? !getParserParallelism().equals(that.getParserParallelism()) : that.getParserParallelism() != null) - return false; - if (getParserNumTasks() != null ? !getParserNumTasks().equals(that.getParserNumTasks()) : that.getParserNumTasks() != null) - return false; - if (getErrorWriterParallelism() != null ? 
!getErrorWriterParallelism().equals(that.getErrorWriterParallelism()) : that.getErrorWriterParallelism() != null) - return false; - if (getErrorWriterNumTasks() != null ? !getErrorWriterNumTasks().equals(that.getErrorWriterNumTasks()) : that.getErrorWriterNumTasks() != null) - return false; - if (getSpoutConfig() != null ? !getSpoutConfig().equals(that.getSpoutConfig()) : that.getSpoutConfig() != null) - return false; - if (getSecurityProtocol() != null ? !getSecurityProtocol().equals(that.getSecurityProtocol()) : that.getSecurityProtocol() != null) - return false; - if (getStormConfig() != null ? !getStormConfig().equals(that.getStormConfig()) : that.getStormConfig() != null) - return false; - if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null) - return false; - return getFieldTransformations() != null ? getFieldTransformations().equals(that.getFieldTransformations()) : that.getFieldTransformations() == null; + } + SensorParserConfig that = (SensorParserConfig) o; + return new EqualsBuilder() + .append(parserClassName, that.parserClassName) + .append(filterClassName, that.filterClassName) + .append(sensorTopic, that.sensorTopic) + .append(outputTopic, that.outputTopic) + .append(errorTopic, that.errorTopic) + .append(writerClassName, that.writerClassName) + .append(errorWriterClassName, that.errorWriterClassName) + .append(readMetadata, that.readMetadata) + .append(mergeMetadata, that.mergeMetadata) + .append(numWorkers, that.numWorkers) + .append(numAckers, that.numAckers) + .append(spoutParallelism, that.spoutParallelism) + .append(spoutNumTasks, that.spoutNumTasks) + .append(parserParallelism, that.parserParallelism) + .append(parserNumTasks, that.parserNumTasks) + .append(errorWriterParallelism, that.errorWriterParallelism) + .append(errorWriterNumTasks, that.errorWriterNumTasks) + .append(spoutConfig, that.spoutConfig) + .append(securityProtocol, that.securityProtocol) + .append(stormConfig, 
that.stormConfig) + .append(cacheConfig, that.cacheConfig) + .append(parserConfig, that.parserConfig) + .append(fieldTransformations, that.fieldTransformations) + .isEquals(); } @Override public int hashCode() { - int result = getParserClassName() != null ? getParserClassName().hashCode() : 0; - result = 31 * result + (getFilterClassName() != null ? getFilterClassName().hashCode() : 0); - result = 31 * result + (getSensorTopic() != null ? getSensorTopic().hashCode() : 0); - result = 31 * result + (getWriterClassName() != null ? getWriterClassName().hashCode() : 0); - result = 31 * result + (getErrorWriterClassName() != null ? getErrorWriterClassName().hashCode() : 0); - result = 31 * result + (getInvalidWriterClassName() != null ? getInvalidWriterClassName().hashCode() : 0); - result = 31 * result + (getReadMetadata() != null ? getReadMetadata().hashCode() : 0); - result = 31 * result + (getMergeMetadata() != null ? getMergeMetadata().hashCode() : 0); - result = 31 * result + (getNumWorkers() != null ? getNumWorkers().hashCode() : 0); - result = 31 * result + (getNumAckers() != null ? getNumAckers().hashCode() : 0); - result = 31 * result + (getSpoutParallelism() != null ? getSpoutParallelism().hashCode() : 0); - result = 31 * result + (getSpoutNumTasks() != null ? getSpoutNumTasks().hashCode() : 0); - result = 31 * result + (getParserParallelism() != null ? getParserParallelism().hashCode() : 0); - result = 31 * result + (getParserNumTasks() != null ? getParserNumTasks().hashCode() : 0); - result = 31 * result + (getErrorWriterParallelism() != null ? getErrorWriterParallelism().hashCode() : 0); - result = 31 * result + (getErrorWriterNumTasks() != null ? getErrorWriterNumTasks().hashCode() : 0); - result = 31 * result + (getSpoutConfig() != null ? getSpoutConfig().hashCode() : 0); - result = 31 * result + (getSecurityProtocol() != null ? getSecurityProtocol().hashCode() : 0); - result = 31 * result + (getStormConfig() != null ? 
getStormConfig().hashCode() : 0); - result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0); - result = 31 * result + (getFieldTransformations() != null ? getFieldTransformations().hashCode() : 0); - return result; + return new HashCodeBuilder(17, 37) + .append(parserClassName) + .append(filterClassName) + .append(sensorTopic) + .append(outputTopic) + .append(errorTopic) + .append(writerClassName) + .append(errorWriterClassName) + .append(readMetadata) + .append(mergeMetadata) + .append(numWorkers) + .append(numAckers) + .append(spoutParallelism) + .append(spoutNumTasks) + .append(parserParallelism) + .append(parserNumTasks) + .append(errorWriterParallelism) + .append(errorWriterNumTasks) + .append(spoutConfig) + .append(securityProtocol) + .append(stormConfig) + .append(cacheConfig) + .append(parserConfig) + .append(fieldTransformations) + .toHashCode(); + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("parserClassName", parserClassName) + .append("filterClassName", filterClassName) + .append("sensorTopic", sensorTopic) + .append("outputTopic", outputTopic) + .append("errorTopic", errorTopic) + .append("writerClassName", writerClassName) + .append("errorWriterClassName", errorWriterClassName) + .append("readMetadata", readMetadata) + .append("mergeMetadata", mergeMetadata) + .append("numWorkers", numWorkers) + .append("numAckers", numAckers) + .append("spoutParallelism", spoutParallelism) + .append("spoutNumTasks", spoutNumTasks) + .append("parserParallelism", parserParallelism) + .append("parserNumTasks", parserNumTasks) + .append("errorWriterParallelism", errorWriterParallelism) + .append("errorWriterNumTasks", errorWriterNumTasks) + .append("spoutConfig", spoutConfig) + .append("securityProtocol", securityProtocol) + .append("stormConfig", stormConfig) + .append("cacheConfig", cacheConfig) + .append("parserConfig", parserConfig) + .append("fieldTransformations", fieldTransformations) + 
.toString(); } } http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java index 2865dd6..cd4ad50 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java @@ -84,7 +84,7 @@ public class ParserTopologyBuilder { * @param errorWriterNumTasksSupplier Supplier for the number of tasks for the bolt that handles errors * @param kafkaSpoutConfigSupplier Supplier for the configuration options for the kafka spout * @param securityProtocolSupplier Supplier for the security protocol - * @param outputTopic The output kafka topic + * @param outputTopicSupplier Supplier for the output kafka topic * @param stormConfigSupplier Supplier for the storm config * @return A Storm topology that parses telemetry data received from an external sensor * @throws Exception @@ -100,7 +100,8 @@ public class ParserTopologyBuilder { ValueSupplier<Integer> errorWriterNumTasksSupplier, ValueSupplier<Map> kafkaSpoutConfigSupplier, ValueSupplier<String> securityProtocolSupplier, - Optional<String> outputTopic, + ValueSupplier<String> outputTopicSupplier, + ValueSupplier<String> errorTopicSupplier, ValueSupplier<Config> stormConfigSupplier ) throws Exception { @@ -113,24 +114,27 @@ public class ParserTopologyBuilder { int parserNumTasks = parserNumTasksSupplier.get(parserConfig, Integer.class); int errorWriterParallelism = errorWriterParallelismSupplier.get(parserConfig, Integer.class); int errorWriterNumTasks = 
errorWriterNumTasksSupplier.get(parserConfig, Integer.class); + String outputTopic = outputTopicSupplier.get(parserConfig, String.class); + Map<String, Object> kafkaSpoutConfig = kafkaSpoutConfigSupplier.get(parserConfig, Map.class); Optional<String> securityProtocol = Optional.ofNullable(securityProtocolSupplier.get(parserConfig, String.class)); // create the spout TopologyBuilder builder = new TopologyBuilder(); - KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, securityProtocol, Optional.ofNullable(kafkaSpoutConfig) , parserConfig); + KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, securityProtocol, Optional.ofNullable(kafkaSpoutConfig), parserConfig); builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism) .setNumTasks(spoutNumTasks); // create the parser bolt - ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, outputTopic); + ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, Optional.of(outputTopic)); builder.setBolt("parserBolt", parserBolt, parserParallelism) .setNumTasks(parserNumTasks) .localOrShuffleGrouping("kafkaSpout"); // create the error bolt, if needed if (errorWriterNumTasks > 0) { - WriterBolt errorBolt = createErrorBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig); + String errorTopic = errorTopicSupplier.get(parserConfig, String.class); + WriterBolt errorBolt = createErrorBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, errorTopic); builder.setBolt("errorMessageWriter", errorBolt, errorWriterParallelism) .setNumTasks(errorWriterNumTasks) .localOrShuffleGrouping("parserBolt", Constants.ERROR_STREAM); @@ -176,24 +180,35 @@ public class ParserTopologyBuilder { ); } - private static KafkaWriter createKafkaWriter( Optional<String> broker - , String zkQuorum - , Optional<String> securityProtocol - ) - { - 
KafkaWriter ret = null; + /** + * Create a Kafka writer. + * + * @param broker An optional URL to the Kafka brokers. + * @param zkQuorum The URL to Zookeeper. + * @param securityProtocol An optional security protocol in use. + * @return + */ + private static KafkaWriter createKafkaWriter(Optional<String> broker, + String zkQuorum, + Optional<String> securityProtocol) { + KafkaWriter writer = new KafkaWriter(); + + // cluster URL; either broker or zookeeper if(broker.isPresent()) { - ret = new KafkaWriter(broker.get()); - } - else { - ret = new KafkaWriter().withZkQuorum(zkQuorum); + writer.withBrokerUrl(broker.get()); + + } else { + writer.withZkQuorum(zkQuorum); } + + // security protocol if(securityProtocol.isPresent()) { HashMap<String, Object> config = new HashMap<>(); config.put("security.protocol", securityProtocol.get()); - ret.withProducerConfigs(config); + writer.withProducerConfigs(config); } - return ret; + + return writer; } /** @@ -206,27 +221,31 @@ public class ParserTopologyBuilder { * @param parserConfig * @return A Storm bolt that parses input from a sensor */ - private static ParserBolt createParserBolt( String zookeeperUrl - , Optional<String> brokerUrl - , String sensorType - , Optional<String> securityProtocol - , ParserConfigurations configs - , SensorParserConfig parserConfig - , Optional<String> outputTopic - ) - { + private static ParserBolt createParserBolt( String zookeeperUrl, + Optional<String> brokerUrl, + String sensorType, + Optional<String> securityProtocol, + ParserConfigurations configs, + SensorParserConfig parserConfig, + Optional<String> outputTopic) { // create message parser MessageParser<JSONObject> parser = ReflectionUtils.createInstance(parserConfig.getParserClassName()); parser.configure(parserConfig.getParserConfig()); - // create writer - if not configured uses a sensible default - AbstractWriter writer = parserConfig.getWriterClassName() == null ? 
- createKafkaWriter( brokerUrl - , zookeeperUrl - , securityProtocol - ).withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC)) : - ReflectionUtils.createInstance(parserConfig.getWriterClassName()); + // create a writer + AbstractWriter writer; + if(parserConfig.getWriterClassName() == null) { + + // if not configured, use a sensible default + writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol) + .withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC)); + + } else { + writer = ReflectionUtils.createInstance(parserConfig.getWriterClassName()); + } + + // configure it writer.configure(sensorType, new ParserWriterConfiguration(configs)); // create a writer handler @@ -238,37 +257,47 @@ public class ParserTopologyBuilder { /** * Create a bolt that handles error messages. * - * @param zookeeperUrl Kafka zookeeper URL - * @param brokerUrl Kafka Broker URL - * @param sensorType Type of sensor that is being consumed. - * @param securityProtocol Security protocol used (if any) + * @param zookeeperUrl Kafka zookeeper URL + * @param brokerUrl Kafka Broker URL + * @param sensorType Type of sensor that is being consumed. + * @param securityProtocol Security protocol used (if any) * @param configs - * @param parserConfig + * @param parserConfig The sensor's parser configuration. * @return A Storm bolt that handles error messages. 
*/ - private static WriterBolt createErrorBolt( String zookeeperUrl - , Optional<String> brokerUrl - , String sensorType - , Optional<String> securityProtocol - , ParserConfigurations configs - , SensorParserConfig parserConfig - ) - { + private static WriterBolt createErrorBolt( String zookeeperUrl, + Optional<String> brokerUrl, + String sensorType, + Optional<String> securityProtocol, + ParserConfigurations configs, + SensorParserConfig parserConfig, + String errorTopic) { + + // create a writer + AbstractWriter writer; + if (parserConfig.getErrorWriterClassName() == null) { + + if(errorTopic == null) { + errorTopic = (String) configs.getGlobalConfig().get(Constants.PARSER_ERROR_TOPIC_GLOBALS_KEY); + } + + // if not configured, uses a sensible default + writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol) + .withTopic(errorTopic) + .withConfigPrefix("error"); + + } else { + writer = ReflectionUtils.createInstance(parserConfig.getWriterClassName()); + } - // create writer - if not configured uses a sensible default - AbstractWriter writer = parserConfig.getErrorWriterClassName() == null ? 
- createKafkaWriter( brokerUrl - , zookeeperUrl - , securityProtocol - ).withTopic((String) configs.getGlobalConfig().get("parser.error.topic")) - .withConfigPrefix("error") - : ReflectionUtils.createInstance(parserConfig.getWriterClassName()); + // configure it writer.configure(sensorType, new ParserWriterConfiguration(configs)); // create a writer handler WriterHandler writerHandler = createWriterHandler(writer); - return new WriterBolt(writerHandler, configs, sensorType).withErrorType(Constants.ErrorType.PARSER_ERROR); + return new WriterBolt(writerHandler, configs, sensorType) + .withErrorType(Constants.ErrorType.PARSER_ERROR); } /** http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java index 3824212..f60ff44 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java @@ -310,34 +310,40 @@ public class ParserTopologyCLI { String sensorType= ParserOptions.SENSOR_TYPE.get(cmd); /* - It bears mentioning why we're creating this ValueSupplier indirection here. - As a separation of responsibilities, the CLI class defines the order of precedence - for the various topological and structural properties for creating a parser. This is - desirable because there are now (i.e. integration tests) - and may be in the future (i.e. a REST service to start parsers without using the CLI) - other mechanisms to construct parser topologies. It's sensible to split those concerns.. 
- - Unfortunately, determining the structural parameters for a parser requires interacting with - external services (e.g. zookeeper) that are set up well within the ParserTopology class. - Rather than pulling the infrastructure to interact with those services out and moving it into the - CLI class and breaking that separation of concerns, we've created a supplier - indirection where are providing the logic as to how to create precedence in the CLI class - without owning the responsibility of constructing the infrastructure where the values are - necessarily supplied. - + * It bears mentioning why we're creating this ValueSupplier indirection here. + * As a separation of responsibilities, the CLI class defines the order of precedence + * for the various topological and structural properties for creating a parser. This is + * desirable because there are now (i.e. integration tests) + * and may be in the future (i.e. a REST service to start parsers without using the CLI) + * other mechanisms to construct parser topologies. It's sensible to split those concerns. + * + * Unfortunately, determining the structural parameters for a parser requires interacting with + * external services (e.g. zookeeper) that are set up well within the ParserTopology class. + * Rather than pulling the infrastructure to interact with those services out and moving it into the + * CLI class and breaking that separation of concerns, we've created a supplier + * indirection where we are providing the logic as to how to create precedence in the CLI class + * without owning the responsibility of constructing the infrastructure where the values are + * necessarily supplied.
+ * */ + + // kafka spout parallelism ValueSupplier<Integer> spoutParallelism = (parserConfig, clazz) -> { if(ParserOptions.SPOUT_PARALLELISM.has(cmd)) { return Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1")); } return Optional.ofNullable(parserConfig.getSpoutParallelism()).orElse(1); }; + + // kafka spout number of tasks ValueSupplier<Integer> spoutNumTasks = (parserConfig, clazz) -> { if(ParserOptions.SPOUT_NUM_TASKS.has(cmd)) { return Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1")); } return Optional.ofNullable(parserConfig.getSpoutNumTasks()).orElse(1); }; + + // parser bolt parallelism ValueSupplier<Integer> parserParallelism = (parserConfig, clazz) -> { if(ParserOptions.PARSER_PARALLELISM.has(cmd)) { return Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1")); @@ -345,6 +351,7 @@ public class ParserTopologyCLI { return Optional.ofNullable(parserConfig.getParserParallelism()).orElse(1); }; + // parser bolt number of tasks ValueSupplier<Integer> parserNumTasks = (parserConfig, clazz) -> { if(ParserOptions.PARSER_NUM_TASKS.has(cmd)) { return Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1")); @@ -352,6 +359,7 @@ public class ParserTopologyCLI { return Optional.ofNullable(parserConfig.getParserNumTasks()).orElse(1); }; + // error bolt parallelism ValueSupplier<Integer> errorParallelism = (parserConfig, clazz) -> { if(ParserOptions.ERROR_WRITER_PARALLELISM.has(cmd)) { return Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1")); @@ -359,6 +367,7 @@ public class ParserTopologyCLI { return Optional.ofNullable(parserConfig.getErrorWriterParallelism()).orElse(1); }; + // error bolt number of tasks ValueSupplier<Integer> errorNumTasks = (parserConfig, clazz) -> { if(ParserOptions.ERROR_WRITER_NUM_TASKS.has(cmd)) { return Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1")); @@ -366,6 +375,7 @@ public class ParserTopologyCLI { return 
Optional.ofNullable(parserConfig.getErrorWriterNumTasks()).orElse(1); }; + // kafka spout config ValueSupplier<Map> spoutConfig = (parserConfig, clazz) -> { if(ParserOptions.SPOUT_CONFIG.has(cmd)) { return readJSONMapFromFile(new File(ParserOptions.SPOUT_CONFIG.get(cmd))); @@ -373,6 +383,7 @@ public class ParserTopologyCLI { return Optional.ofNullable(parserConfig.getSpoutConfig()).orElse(new HashMap<>()); }; + // security protocol ValueSupplier<String> securityProtocol = (parserConfig, clazz) -> { Optional<String> sp = Optional.empty(); if (ParserOptions.SECURITY_PROTOCOL.has(cmd)) { @@ -384,6 +395,7 @@ public class ParserTopologyCLI { return sp.orElse(Optional.ofNullable(parserConfig.getSecurityProtocol()).orElse(null)); }; + // storm configuration ValueSupplier<Config> stormConf = (parserConfig, clazz) -> { Map<String, Object> c = parserConfig.getStormConfig(); Config finalConfig = new Config(); @@ -399,39 +411,84 @@ public class ParserTopologyCLI { return ParserOptions.getConfig(cmd, finalConfig).orElse(finalConfig); }; - Optional<String> outputTopic = ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty(); + // output topic + ValueSupplier<String> outputTopic = (parserConfig, clazz) -> { + String topic; + + if(ParserOptions.OUTPUT_TOPIC.has(cmd)) { + topic = ParserOptions.OUTPUT_TOPIC.get(cmd); + + } else if(parserConfig.getOutputTopic() != null) { + topic = parserConfig.getOutputTopic(); + + } else { + topic = Constants.ENRICHMENT_TOPIC; + } + + return topic; + }; + + // error topic + ValueSupplier<String> errorTopic = (parserConfig, clazz) -> { + String topic; + + if(parserConfig.getErrorTopic() != null) { + topic = parserConfig.getErrorTopic(); + + } else { + // topic will be set to the 'parser.error.topic' setting in globals when the error bolt is created + topic = null; + } + + return topic; + }; - return getParserTopology(zookeeperUrl, brokerUrl, sensorType, spoutParallelism, spoutNumTasks,
parserParallelism, parserNumTasks, errorParallelism, errorNumTasks, spoutConfig, securityProtocol, stormConf, outputTopic); + return getParserTopology( + zookeeperUrl, + brokerUrl, + sensorType, + spoutParallelism, + spoutNumTasks, + parserParallelism, + parserNumTasks, + errorParallelism, + errorNumTasks, + spoutConfig, + securityProtocol, + stormConf, + outputTopic, + errorTopic); } - protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl - , Optional<String> brokerUrl - , String sensorType - , ValueSupplier<Integer> spoutParallelism - , ValueSupplier<Integer> spoutNumTasks - , ValueSupplier<Integer> parserParallelism - , ValueSupplier<Integer> parserNumTasks - , ValueSupplier<Integer> errorParallelism - , ValueSupplier<Integer> errorNumTasks - , ValueSupplier<Map> spoutConfig - , ValueSupplier<String> securityProtocol - , ValueSupplier<Config> stormConf - , Optional<String> outputTopic - ) throws Exception - { - return ParserTopologyBuilder.build(zookeeperUrl, - brokerUrl, - sensorType, - spoutParallelism, - spoutNumTasks, - parserParallelism, - parserNumTasks, - errorParallelism, - errorNumTasks, - spoutConfig, - securityProtocol, - outputTopic, - stormConf + protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl, + Optional<String> brokerUrl, + String sensorType, + ValueSupplier<Integer> spoutParallelism, + ValueSupplier<Integer> spoutNumTasks, + ValueSupplier<Integer> parserParallelism, + ValueSupplier<Integer> parserNumTasks, + ValueSupplier<Integer> errorParallelism, + ValueSupplier<Integer> errorNumTasks, + ValueSupplier<Map> spoutConfig, + ValueSupplier<String> securityProtocol, + ValueSupplier<Config> stormConf, + ValueSupplier<String> outputTopic, + ValueSupplier<String> errorTopic) throws Exception { + return ParserTopologyBuilder.build( + zookeeperUrl, + brokerUrl, + sensorType, + spoutParallelism, + spoutNumTasks, + parserParallelism, + parserNumTasks, + errorParallelism, + errorNumTasks, 
+ spoutConfig, + securityProtocol, + outputTopic, + errorTopic, + stormConf ); } http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java index 63d9e52..7f40684 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java @@ -17,14 +17,6 @@ */ package org.apache.metron.parsers.integration.components; -import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots; -import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir; - -import java.lang.invoke.MethodHandles; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; import org.apache.metron.integration.InMemoryComponent; import org.apache.metron.integration.UnableToStartException; import org.apache.metron.integration.components.ZKServerComponent; @@ -32,24 +24,37 @@ import org.apache.metron.parsers.topology.ParserTopologyBuilder; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated.KillOptions; -import org.apache.storm.topology.TopologyBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import static 
org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots; +import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir; + public class ParserTopologyComponent implements InMemoryComponent { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private Properties topologyProperties; private String brokerUrl; private String sensorType; private LocalCluster stormCluster; private String outputTopic; + private String errorTopic; public static class Builder { + Properties topologyProperties; String brokerUrl; String sensorType; String outputTopic; + String errorTopic; + public Builder withTopologyProperties(Properties topologyProperties) { this.topologyProperties = topologyProperties; return this; @@ -68,16 +73,31 @@ public class ParserTopologyComponent implements InMemoryComponent { return this; } + public Builder withErrorTopic(String topic) { + this.errorTopic = topic; + return this; + } + public ParserTopologyComponent build() { - return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic); + + if(sensorType == null) { + throw new IllegalArgumentException("The sensor type must be defined."); + } + + if(outputTopic == null) { + throw new IllegalArgumentException("The output topic must be defined."); + } + + return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic, errorTopic); } } - public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic) { + public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic, String errorTopic) { this.topologyProperties = topologyProperties; this.brokerUrl = brokerUrl; this.sensorType = sensorType; this.outputTopic = outputTopic; + this.errorTopic = errorTopic; } public void updateSensorType(String sensorType) { @@ -89,24 +109,26 @@ public class 
ParserTopologyComponent implements InMemoryComponent { try { final Map<String, Object> stormConf = new HashMap<>(); stormConf.put(Config.TOPOLOGY_DEBUG, true); - ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY) - , Optional.ofNullable(brokerUrl) - , sensorType - , (x,y) -> 1 - , (x,y) -> 1 - , (x,y) -> 1 - , (x,y) -> 1 - , (x,y) -> 1 - , (x,y) -> 1 - , (x,y) -> new HashMap<>() - , (x,y) -> null - , Optional.ofNullable(outputTopic) - , (x,y) -> { - Config c = new Config(); - c.putAll(stormConf); - return c; - } - ); + ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build ( + topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY), + Optional.ofNullable(brokerUrl), + sensorType, + (x,y) -> 1, + (x,y) -> 1, + (x,y) -> 1, + (x,y) -> 1, + (x,y) -> 1, + (x,y) -> 1, + (x,y) -> new HashMap<>(), + (x,y) -> null, + (x,y) -> outputTopic, + (x,y) -> errorTopic, + (x,y) -> { + Config c = new Config(); + c.putAll(stormConf); + return c; + } + ); stormCluster = new LocalCluster(); stormCluster.submitTopology(sensorType, stormConf, topologyBuilder.getBuilder().createTopology()); http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java index 97dac5a..fcfc93b 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java @@ -217,17 +217,21 @@ public class 
ParserTopologyCLITest { private Map<String, Object> spoutConfig; private String securityProtocol; private Config stormConf; - - public ParserInput( ValueSupplier<Integer> spoutParallelism - , ValueSupplier<Integer> spoutNumTasks - , ValueSupplier<Integer> parserParallelism - , ValueSupplier<Integer> parserNumTasks - , ValueSupplier<Integer> errorParallelism - , ValueSupplier<Integer> errorNumTasks - , ValueSupplier<Map> spoutConfig - , ValueSupplier<String> securityProtocol - , ValueSupplier<Config> stormConf - , SensorParserConfig config + private String outputTopic; + private String errorTopic; + + public ParserInput(ValueSupplier<Integer> spoutParallelism, + ValueSupplier<Integer> spoutNumTasks, + ValueSupplier<Integer> parserParallelism, + ValueSupplier<Integer> parserNumTasks, + ValueSupplier<Integer> errorParallelism, + ValueSupplier<Integer> errorNumTasks, + ValueSupplier<Map> spoutConfig, + ValueSupplier<String> securityProtocol, + ValueSupplier<Config> stormConf, + ValueSupplier<String> outputTopic, + ValueSupplier<String> errorTopic, + SensorParserConfig config ) { this.spoutParallelism = spoutParallelism.get(config, Integer.class); @@ -239,6 +243,8 @@ public class ParserTopologyCLITest { this.spoutConfig = spoutConfig.get(config, Map.class); this.securityProtocol = securityProtocol.get(config, String.class); this.stormConf = stormConf.get(config, Config.class); + this.outputTopic = outputTopic.get(config, String.class); + this.errorTopic = outputTopic.get(config, String.class); } public Integer getSpoutParallelism() { @@ -276,30 +282,43 @@ public class ParserTopologyCLITest { public Config getStormConf() { return stormConf; } + + public String getOutputTopic() { + return outputTopic; + } + + public String getErrorTopic() { + return errorTopic; + } } + /** -{ - "parserClassName": "org.apache.metron.parsers.GrokParser", - "sensorTopic": "squid", - "parserConfig": { - "grokPath": "/patterns/squid", - "patternLabel": "SQUID_DELIMITED", - "timestampField": 
"timestamp" - }, - "fieldTransformations" : [ - { - "transformation" : "STELLAR" - ,"output" : [ "full_hostname", "domain_without_subdomains" ] - ,"config" : { - "full_hostname" : "URL_TO_HOST(url)" - ,"domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)" - } - } - ] -} + * { + * "parserClassName": "org.apache.metron.parsers.GrokParser", + * "sensorTopic": "squid", + * "parserConfig": { + * "grokPath": "/patterns/squid", + * "patternLabel": "SQUID_DELIMITED", + * "timestampField": "timestamp" + * }, + * "fieldTransformations" : [ + * { + * "transformation" : "STELLAR", + * "output" : [ + * "full_hostname", + * "domain_without_subdomains" + * ], + * "config" : { + * "full_hostname" : "URL_TO_HOST(url)", + * "domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)" + * } + * } + * ] + * } */ @Multiline public static String baseConfig; + private static SensorParserConfig getBaseConfig() { try { return JSONUtils.INSTANCE.load(baseConfig, SensorParserConfig.class); @@ -600,18 +619,37 @@ public class ParserTopologyCLITest { final ParserInput[] parserInput = new ParserInput[]{null}; new ParserTopologyCLI() { @Override - protected ParserTopologyBuilder.ParserTopology getParserTopology(String zookeeperUrl, Optional<String> brokerUrl, String sensorType, ValueSupplier<Integer> spoutParallelism, ValueSupplier<Integer> spoutNumTasks, ValueSupplier<Integer> parserParallelism, ValueSupplier<Integer> parserNumTasks, ValueSupplier<Integer> errorParallelism, ValueSupplier<Integer> errorNumTasks, ValueSupplier<Map> spoutConfig, ValueSupplier<String> securityProtocol, ValueSupplier<Config> stormConf, Optional<String> outputTopic) throws Exception { - parserInput[0] = new ParserInput( spoutParallelism - , spoutNumTasks - , parserParallelism - , parserNumTasks - , errorParallelism - , errorNumTasks - , spoutConfig - , securityProtocol - , stormConf - , config - ); + protected ParserTopologyBuilder.ParserTopology getParserTopology( + String zookeeperUrl, + 
Optional<String> brokerUrl, + String sensorType, + ValueSupplier<Integer> spoutParallelism, + ValueSupplier<Integer> spoutNumTasks, + ValueSupplier<Integer> parserParallelism, + ValueSupplier<Integer> parserNumTasks, + ValueSupplier<Integer> errorParallelism, + ValueSupplier<Integer> errorNumTasks, + ValueSupplier<Map> spoutConfig, + ValueSupplier<String> securityProtocol, + ValueSupplier<Config> stormConf, + ValueSupplier<String> outputTopic, + ValueSupplier<String> errorTopic) throws Exception { + + parserInput[0] = new ParserInput( + spoutParallelism, + spoutNumTasks, + parserParallelism, + parserNumTasks, + errorParallelism, + errorNumTasks, + spoutConfig, + securityProtocol, + stormConf, + outputTopic, + errorTopic, + config + ); + return null; } }.createParserTopology(cmd); http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java index 4f513be..49d7521 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java @@ -30,9 +30,14 @@ import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.converter.EnrichmentValue; import org.apache.metron.enrichment.integration.components.ConfigUploadComponent; import org.apache.metron.enrichment.lookup.LookupKV; -import org.apache.metron.hbase.mock.MockHTable; import 
org.apache.metron.hbase.mock.MockHBaseTableProvider; -import org.apache.metron.integration.*; +import org.apache.metron.hbase.mock.MockHTable; +import org.apache.metron.integration.BaseIntegrationTest; +import org.apache.metron.integration.ComponentRunner; +import org.apache.metron.integration.Processor; +import org.apache.metron.integration.ProcessorResult; +import org.apache.metron.integration.ReadinessState; +import org.apache.metron.integration.UnableToStartException; import org.apache.metron.integration.components.KafkaComponent; import org.apache.metron.integration.components.ZKServerComponent; import org.apache.metron.parsers.integration.components.ParserTopologyComponent; @@ -40,41 +45,52 @@ import org.junit.Assert; import org.junit.Test; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationTest { /** - { - "parserClassName" : "org.apache.metron.parsers.csv.CSVParser" - ,"writerClassName" : "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter" - ,"sensorTopic":"dummy" - ,"parserConfig": - { - "shew.table" : "dummy" - ,"shew.cf" : "cf" - ,"shew.keyColumns" : "col2" - ,"shew.enrichmentType" : "et" - ,"shew.hbaseProvider" : "org.apache.metron.hbase.mock.MockHBaseTableProvider" - ,"columns" : { - "col1" : 0 - ,"col2" : 1 - ,"col3" : 2 - } - } - } + * { + * "parserClassName": "org.apache.metron.parsers.csv.CSVParser", + * "writerClassName": "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter", + * "sensorTopic": "dummy", + * "outputTopic": "output", + * "errorTopic": "error", + * "parserConfig": { + * "shew.table": "dummy", + * "shew.cf": "cf", + * "shew.keyColumns": "col2", + * "shew.enrichmentType": "et", + * "shew.hbaseProvider": 
"org.apache.metron.hbase.mock.MockHBaseTableProvider", + * "columns" : { + * "col1": 0, + * "col2": 1, + * "col3": 2 + * } + * } + * } */ @Multiline - public static String parserConfig; + public static String parserConfigJSON; @Test public void test() throws UnableToStartException, IOException { final String sensorType = "dummy"; + + // the input messages to parse final List<byte[]> inputMessages = new ArrayList<byte[]>() {{ add(Bytes.toBytes("col11,col12,col13")); add(Bytes.toBytes("col21,col22,col23")); add(Bytes.toBytes("col31,col32,col33")); }}; + + // setup external components; kafka, zookeeper MockHBaseTableProvider.addToCache(sensorType, "cf"); final Properties topologyProperties = new Properties(); final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); @@ -83,17 +99,20 @@ public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationT }}); topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList()); + SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class); + ConfigUploadComponent configUploadComponent = new ConfigUploadComponent() .withTopologyProperties(topologyProperties) .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH) - .withParserSensorConfig(sensorType, JSONUtils.INSTANCE.load(parserConfig, SensorParserConfig.class)); + .withParserSensorConfig(sensorType, parserConfig); ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder() .withSensorType(sensorType) .withTopologyProperties(topologyProperties) - .withBrokerUrl(kafkaComponent.getBrokerList()).build(); + .withBrokerUrl(kafkaComponent.getBrokerList()) + .withOutputTopic(parserConfig.getOutputTopic()) + .build(); - //UnitTestHelper.verboseLogging(); ComponentRunner runner = new ComponentRunner.Builder() .withComponent("zk", zkServerComponent) .withComponent("kafka", kafkaComponent) 
http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java index 0cfaae3..cde08bc 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java @@ -45,7 +45,6 @@ import java.io.IOException; import java.util.*; public class WriterBoltIntegrationTest extends BaseIntegrationTest { - private static final String ERROR_TOPIC = "parser_error"; public static class MockValidator implements FieldValidation{ @@ -62,48 +61,53 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { } } /** - { - "fieldValidations" : [ - { - "validation" : "org.apache.metron.writers.integration.WriterBoltIntegrationTest$MockValidator" - } - ], - "parser.error.topic":"parser_error" - } - */ + * { + * "fieldValidations" : [ + * { "validation" : "org.apache.metron.writers.integration.WriterBoltIntegrationTest$MockValidator" } + * ] + * } + */ @Multiline public static String globalConfig; /** - { - "parserClassName" : "org.apache.metron.parsers.csv.CSVParser" - ,"sensorTopic":"dummy" - ,"parserConfig": - { - "columns" : { - "action" : 0 - ,"dummy" : 1 - } - } - } + * { + * "parserClassName" : "org.apache.metron.parsers.csv.CSVParser", + * "sensorTopic": "dummy", + * "outputTopic": "output", + * "errorTopic": "parser_error", + * "parserConfig": { + * "columns" : { + * "action" : 0, + * "dummy" : 1 + * } + * } + * } */ @Multiline - public static String parserConfig; + public static 
String parserConfigJSON; @Test public void test() throws UnableToStartException, IOException, ParseException { + UnitTestHelper.setLog4jLevel(CSVParser.class, org.apache.log4j.Level.FATAL); final String sensorType = "dummy"; + + SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class); + + // the input messages to parse final List<byte[]> inputMessages = new ArrayList<byte[]>() {{ add(Bytes.toBytes("valid,foo")); add(Bytes.toBytes("invalid,foo")); add(Bytes.toBytes("error")); }}; + + // setup external components; zookeeper, kafka final Properties topologyProperties = new Properties(); final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{ add(new KafkaComponent.Topic(sensorType, 1)); - add(new KafkaComponent.Topic(ERROR_TOPIC, 1)); + add(new KafkaComponent.Topic(parserConfig.getErrorTopic(), 1)); add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1)); }}); topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList()); @@ -111,14 +115,16 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { ConfigUploadComponent configUploadComponent = new ConfigUploadComponent() .withTopologyProperties(topologyProperties) .withGlobalConfig(globalConfig) - .withParserSensorConfig(sensorType, JSONUtils.INSTANCE.load(parserConfig, SensorParserConfig.class)); + .withParserSensorConfig(sensorType, parserConfig); ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder() .withSensorType(sensorType) .withTopologyProperties(topologyProperties) - .withBrokerUrl(kafkaComponent.getBrokerList()).build(); + .withBrokerUrl(kafkaComponent.getBrokerList()) + .withErrorTopic(parserConfig.getErrorTopic()) + .withOutputTopic(parserConfig.getOutputTopic()) + .build(); - //UnitTestHelper.verboseLogging(); ComponentRunner runner = new
ComponentRunner.Builder() .withComponent("zk", zkServerComponent) .withComponent("kafka", kafkaComponent) @@ -131,48 +137,42 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { try { runner.start(); kafkaComponent.writeMessages(sensorType, inputMessages); - ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(getProcessor()); + ProcessorResult<Map<String, List<JSONObject>>> result = runner.process( + getProcessor(parserConfig.getOutputTopic(), parserConfig.getErrorTopic())); + + // validate the output messages Map<String,List<JSONObject>> outputMessages = result.getResult(); Assert.assertEquals(2, outputMessages.size()); Assert.assertEquals(1, outputMessages.get(Constants.ENRICHMENT_TOPIC).size()); Assert.assertEquals("valid", outputMessages.get(Constants.ENRICHMENT_TOPIC).get(0).get("action")); - Assert.assertEquals(2, outputMessages.get(ERROR_TOPIC).size()); - JSONObject invalidMessage = outputMessages.get(ERROR_TOPIC).get(0); + Assert.assertEquals(2, outputMessages.get(parserConfig.getErrorTopic()).size()); + + // validate an error message + JSONObject invalidMessage = outputMessages.get(parserConfig.getErrorTopic()).get(0); Assert.assertEquals(Constants.ErrorType.PARSER_INVALID.getType(), invalidMessage.get(Constants.ErrorFields.ERROR_TYPE.getName())); JSONObject rawMessage = JSONUtils.INSTANCE.load((String) invalidMessage.get(Constants.ErrorFields.RAW_MESSAGE.getName()), JSONObject.class); Assert.assertEquals("foo", rawMessage.get("dummy")); Assert.assertEquals("invalid", rawMessage.get("action")); - JSONObject errorMessage = outputMessages.get(ERROR_TOPIC).get(1); + + // validate the next error message + JSONObject errorMessage = outputMessages.get(parserConfig.getErrorTopic()).get(1); Assert.assertEquals(Constants.ErrorType.PARSER_ERROR.getType(), errorMessage.get(Constants.ErrorFields.ERROR_TYPE.getName())); Assert.assertEquals("error", errorMessage.get(Constants.ErrorFields.RAW_MESSAGE.getName())); - // It's unclear if 
we need a rawMessageBytes field so commenting out for now - //Assert.assertTrue(Arrays.equals(listToBytes(errorMessage.get(Constants.ErrorFields.RAW_MESSAGE_BYTES.getName())), "error".getBytes())); - } - finally { + + } finally { if(runner != null) { runner.stop(); } } } - private static byte[] listToBytes(Object o ){ - List<Byte> l = (List<Byte>)o; - byte[] ret = new byte[l.size()]; - int i = 0; - for(Number b : l) { - ret[i++] = b.byteValue(); - } - return ret; - } + private static List<JSONObject> loadMessages(List<byte[]> outputMessages) { List<JSONObject> tmp = new ArrayList<>(); - Iterables.addAll(tmp - , Iterables.transform(outputMessages - , message -> { + Iterables.addAll(tmp, + Iterables.transform(outputMessages, + message -> { try { - return new JSONObject(JSONUtils.INSTANCE.load(new String(message) - ,JSONUtils.MAP_SUPPLIER - ) - ); + return new JSONObject(JSONUtils.INSTANCE.load(new String(message), JSONUtils.MAP_SUPPLIER)); } catch (Exception ex) { throw new IllegalStateException(ex); } @@ -181,13 +181,14 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { ); return tmp; } + @SuppressWarnings("unchecked") - private KafkaProcessor<Map<String,List<JSONObject>>> getProcessor(){ + private KafkaProcessor<Map<String,List<JSONObject>>> getProcessor(String outputTopic, String errorTopic){ return new KafkaProcessor<>() .withKafkaComponentName("kafka") - .withReadTopic(Constants.ENRICHMENT_TOPIC) - .withErrorTopic(ERROR_TOPIC) + .withReadTopic(outputTopic) + .withErrorTopic(errorTopic) .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() { @Nullable @Override @@ -201,7 +202,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { public Map<String,List<JSONObject>> apply(@Nullable KafkaMessageSet messageSet) { return new HashMap<String, List<JSONObject>>() {{ put(Constants.ENRICHMENT_TOPIC, loadMessages(messageSet.getMessages())); - put(ERROR_TOPIC, loadMessages(messageSet.getErrors())); + put(errorTopic, 
loadMessages(messageSet.getErrors())); }}; } }); http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java index f73e0f4..c4e3998 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java @@ -76,6 +76,11 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj this.brokerUrl = brokerUrl; } + public KafkaWriter withBrokerUrl(String brokerUrl) { + this.brokerUrl = brokerUrl; + return this; + } + public KafkaWriter withZkQuorum(String zkQuorum) { this.zkQuorum = zkQuorum; return this;