METRON-954: Create the ability to change the output topic of parsers from the CLI. Closes apache/incubator-metron#588
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/c1c21211 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/c1c21211 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/c1c21211 Branch: refs/heads/Metron_0.4.0 Commit: c1c212117d6123f2897a606fedbb2583fb3bb9c3 Parents: 6c836d1 Author: cstella <ceste...@gmail.com> Authored: Tue May 16 15:24:17 2017 -0400 Committer: cstella <ceste...@gmail.com> Committed: Tue May 16 15:24:17 2017 -0400 ---------------------------------------------------------------------- .../parsers/topology/ParserTopologyBuilder.java | 8 +++++--- .../metron/parsers/topology/ParserTopologyCLI.java | 17 ++++++++++++++++- .../parsers/integration/ParserIntegrationTest.java | 1 + .../components/ParserTopologyComponent.java | 13 +++++++++++-- .../parsers/topology/ParserTopologyCLITest.java | 17 +++++++++++++++++ 5 files changed, 50 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java index 196c19d..0c88573 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java @@ -74,7 +74,8 @@ public class ParserTopologyBuilder { int errorWriterParallelism, int errorWriterNumTasks, Map<String, Object> kafkaSpoutConfig, - Optional<String> securityProtocol + Optional<String> securityProtocol, + Optional<String> 
outputTopic ) throws Exception { // fetch configuration from zookeeper @@ -88,7 +89,7 @@ public class ParserTopologyBuilder { .setNumTasks(spoutNumTasks); // create the parser bolt - ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig); + ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, outputTopic); builder.setBolt("parserBolt", parserBolt, parserParallelism) .setNumTasks(parserNumTasks) .shuffleGrouping("kafkaSpout"); @@ -170,6 +171,7 @@ public class ParserTopologyBuilder { , Optional<String> securityProtocol , ParserConfigurations configs , SensorParserConfig parserConfig + , Optional<String> outputTopic ) { @@ -182,7 +184,7 @@ public class ParserTopologyBuilder { createKafkaWriter( brokerUrl , zookeeperUrl , securityProtocol - ).withTopic(Constants.ENRICHMENT_TOPIC) : + ).withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC)) : ReflectionUtils.createInstance(parserConfig.getWriterClassName()); writer.configure(sensorType, new ParserWriterConfiguration(configs)); http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java index 7523333..c68e101 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java @@ -17,6 +17,7 @@ */ package org.apache.metron.parsers.topology; +import org.apache.metron.common.Constants; import org.apache.metron.storm.kafka.flux.SpoutConfiguration; 
import org.apache.storm.Config; import org.apache.storm.LocalCluster; @@ -188,6 +189,18 @@ public class ParserTopologyCLI { return o; } ) + ,OUTPUT_TOPIC("ot", code -> { + Option o = new Option(code + , "output_topic" + , true + , "The output kafka topic for the parser. If unset, the default is " + Constants.ENRICHMENT_TOPIC + ); + o.setArgName("KAFKA_TOPIC"); + o.setRequired(false); + o.setType(String.class); + return o; + } + ) ,TEST("t", code -> { Option o = new Option("t", "test", true, "Run in Test Mode"); @@ -296,6 +309,7 @@ public class ParserTopologyCLI { if(ParserOptions.SPOUT_CONFIG.has(cmd)) { spoutConfig = readSpoutConfig(new File(ParserOptions.SPOUT_CONFIG.get(cmd))); } + Optional<String> outputTopic = ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty(); Optional<String> securityProtocol = ParserOptions.SECURITY_PROTOCOL.has(cmd)?Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)):Optional.empty(); securityProtocol = getSecurityProtocol(securityProtocol, spoutConfig); TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl, @@ -308,7 +322,8 @@ public class ParserTopologyCLI { errorParallelism, errorNumTasks, spoutConfig, - securityProtocol + securityProtocol, + outputTopic ); Config stormConf = ParserOptions.getConfig(cmd); if (ParserOptions.TEST.has(cmd)) { http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java index defd815..fe6475d 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java +++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java @@ -62,6 +62,7 @@ public abstract class ParserIntegrationTest extends BaseIntegrationTest { ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder() .withSensorType(sensorType) .withTopologyProperties(topologyProperties) + .withOutputTopic(Constants.ENRICHMENT_TOPIC) .withBrokerUrl(kafkaComponent.getBrokerList()).build(); //UnitTestHelper.verboseLogging(); http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java index b6a76d0..6ad7427 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java @@ -45,11 +45,13 @@ public class ParserTopologyComponent implements InMemoryComponent { private String brokerUrl; private String sensorType; private LocalCluster stormCluster; + private String outputTopic; public static class Builder { Properties topologyProperties; String brokerUrl; String sensorType; + String outputTopic; public Builder withTopologyProperties(Properties topologyProperties) { this.topologyProperties = topologyProperties; return this; @@ -63,15 +65,21 @@ public class ParserTopologyComponent implements InMemoryComponent { return this; } + public Builder withOutputTopic(String topic) { + this.outputTopic = topic; + return this; + } + public ParserTopologyComponent 
build() { - return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType); + return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic); } } - public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType) { + public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic) { this.topologyProperties = topologyProperties; this.brokerUrl = brokerUrl; this.sensorType = sensorType; + this.outputTopic = outputTopic; } @@ -89,6 +97,7 @@ public class ParserTopologyComponent implements InMemoryComponent { , 1 , null , Optional.empty() + , Optional.ofNullable(outputTopic) ); Map<String, Object> stormConf = new HashMap<>(); stormConf.put(Config.TOPOLOGY_DEBUG, true); http://git-wip-us.apache.org/repos/asf/metron/blob/c1c21211/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java index ac73a2b..5f536a5 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java @@ -116,6 +116,7 @@ public class ParserTopologyCLITest { .build(true); UnitTestHelper.setLog4jLevel(Parser.class, Level.ERROR); } + public void happyPath(boolean longOpt) throws ParseException { CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker") .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk") @@ -147,6 +148,22 @@ public class ParserTopologyCLITest { Assert.assertEquals(3, 
config.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM)); Assert.assertEquals(4, config.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)); } + + @Test + public void testOutputTopic() throws Exception { + testOutputTopic(true); + testOutputTopic(false); + } + + public void testOutputTopic(boolean longOpt) throws ParseException { + CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker") + .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk") + .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor") + .with(ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC, "my_topic") + .build(longOpt); + Assert.assertEquals("my_topic", ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC.get(cli)); + } + /** { "string" : "foo"