Repository: storm Updated Branches: refs/heads/master 87fc2982e -> 285d3288c
Storm-Kafka trident topology example Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/287718c8 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/287718c8 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/287718c8 Branch: refs/heads/master Commit: 287718c8bc0f8b51ca89de4f62fcb6710d525992 Parents: f72beb0 Author: Arun Mahadevan <ai...@hortonworks.com> Authored: Tue Aug 4 14:06:45 2015 +0530 Committer: Arun Mahadevan <ai...@hortonworks.com> Committed: Tue Aug 4 14:06:45 2015 +0530 ---------------------------------------------------------------------- examples/storm-starter/pom.xml | 6 + .../starter/trident/TridentKafkaWordCount.java | 208 +++++++++++++++++++ 2 files changed, 214 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/287718c8/examples/storm-starter/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml index db69d74..36f7b65 100644 --- a/examples/storm-starter/pom.xml +++ b/examples/storm-starter/pom.xml @@ -96,6 +96,12 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-kafka</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/storm/blob/287718c8/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java b/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java new file mode 100644 index 0000000..11a9111 --- /dev/null +++ 
package storm.starter.trident;


import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import org.apache.kafka.clients.producer.ProducerConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.starter.spout.RandomSentenceSpout;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.testing.MemoryMapState;
import storm.trident.testing.Split;

import java.util.Properties;

/**
 * A sample word count trident topology using transactional kafka spout that has the following components.
 * <ol>
 * <li> {@link KafkaBolt}
 * that receives random sentences from {@link RandomSentenceSpout} and
 * publishes the sentences to a kafka "test" topic.
 * </li>
 * <li> {@link TransactionalTridentKafkaSpout}
 * that consumes sentences from the "test" topic, splits it into words, aggregates
 * and stores the word count in a {@link MemoryMapState}.
 * </li>
 * <li> DRPC query
 * that returns the word counts by querying the trident state (MemoryMapState).
 * </li>
 * </ol>
 * <p>
 * For more background read the <a href="https://storm.apache.org/documentation/Trident-tutorial.html">trident tutorial</a>,
 * <a href="https://storm.apache.org/documentation/Trident-state">trident state</a> and
 * <a href="https://github.com/apache/storm/tree/master/external/storm-kafka"> Storm Kafka </a>.
 * </p>
 */
public class TridentKafkaWordCount {

    // The single kafka topic that the producer topology writes to and the
    // consumer topology reads from.
    private static final String TOPIC = "test";

    private final String zkUrl;
    private final String brokerUrl;

    /**
     * @param zkUrl     the zookeeper connect string (e.g. "localhost:2181") used by the kafka spout
     * @param brokerUrl the kafka broker connect string (e.g. "localhost:9092") used by the kafka bolt
     */
    TridentKafkaWordCount(String zkUrl, String brokerUrl) {
        this.zkUrl = zkUrl;
        this.brokerUrl = brokerUrl;
    }

    /**
     * Creates a transactional kafka spout that consumes any new data published to "test" topic.
     * <p/>
     * For more info on transactional spouts
     * see "Transactional spouts" section in
     * <a href="https://storm.apache.org/documentation/Trident-state"> Trident state</a> doc.
     *
     * @return a transactional trident kafka spout.
     */
    private TransactionalTridentKafkaSpout createKafkaSpout() {
        ZkHosts hosts = new ZkHosts(zkUrl);
        TridentKafkaConfig config = new TridentKafkaConfig(hosts, TOPIC);
        config.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Consume only data published after the topology starts (skip any backlog).
        config.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        return new TransactionalTridentKafkaSpout(config);
    }

    /**
     * Adds a DRPC stream named "words" that splits the DRPC arguments into words,
     * looks each word up in the trident state and returns the non-null (word, count) pairs.
     *
     * @param tridentTopology the topology to add the stream to
     * @param state           the trident state holding the word counts
     * @param drpc            the local DRPC server handling the "words" function
     * @return the DRPC query stream
     */
    private Stream addDRPCStream(TridentTopology tridentTopology, TridentState state, LocalDRPC drpc) {
        return tridentTopology.newDRPCStream("words", drpc)
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(state, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .project(new Fields("word", "count"));
    }

    /**
     * Wires the kafka spout into the topology: each sentence tuple ("str") is split
     * into words, grouped, counted and persisted into an in-memory map state.
     *
     * @param tridentTopology the topology to add the stream to
     * @return the trident state holding the aggregated word counts
     */
    private TridentState addTridentState(TridentTopology tridentTopology) {
        return tridentTopology.newStream("spout1", createKafkaSpout()).parallelismHint(1)
                .each(new Fields("str"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                .parallelismHint(1);
    }

    /**
     * Creates a trident topology that consumes sentences from the kafka "test" topic using a
     * {@link TransactionalTridentKafkaSpout} computes the word count and stores it in a {@link MemoryMapState}.
     * A DRPC stream is then created to query the word counts.
     *
     * @param drpc the local DRPC server the query stream is registered with
     * @return the built consumer topology
     */
    public StormTopology buildConsumerTopology(LocalDRPC drpc) {
        TridentTopology tridentTopology = new TridentTopology();
        addDRPCStream(tridentTopology, addTridentState(tridentTopology), drpc);
        return tridentTopology.build();
    }

    /**
     * Return the consumer topology config.
     *
     * @return the topology config
     */
    public Config getConsumerConfig() {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        // conf.setDebug(true);
        return conf;
    }

    /**
     * A topology that produces random sentences using {@link RandomSentenceSpout} and
     * publishes the sentences using a KafkaBolt to kafka "test" topic.
     *
     * @return the storm topology
     */
    public StormTopology buildProducerTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 2);
        // The output field of the RandomSentenceSpout ("word") is provided as the boltMessageField
        // so that this gets written out as the message in the kafka topic.
        KafkaBolt<String, String> bolt = new KafkaBolt<String, String>()
                .withTopicSelector(new DefaultTopicSelector(TOPIC))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>("key", "word"));
        builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout");
        return builder.createTopology();
    }

    /**
     * Returns the storm config for the topology that publishes sentences to kafka "test" topic using a kafka bolt.
     * The KAFKA_BROKER_PROPERTIES is needed for the KafkaBolt.
     *
     * @return the topology config
     */
    public Config getProducerConfig() {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "storm-kafka-producer");
        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
        return conf;
    }

    /**
     * <p>
     * To run this topology ensure you have a kafka broker running.
     * </p>
     * Create a topic test with command line,
     * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
     */
    public static void main(String[] args) throws Exception {

        String zkUrl = "localhost:2181";        // the defaults.
        String brokerUrl = "localhost:9092";

        if (args.length > 2 || (args.length == 1 && (args[0].equals("-h") || args[0].equals("--help")))) {
            System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url]");
            System.out.println("   E.g TridentKafkaWordCount [" + zkUrl + "]" + " [" + brokerUrl + "]");
            System.exit(1);
        } else if (args.length == 1) {
            zkUrl = args[0];
        } else if (args.length == 2) {
            zkUrl = args[0];
            brokerUrl = args[1];
        }

        System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker url: " + brokerUrl);

        TridentKafkaWordCount wordCount = new TridentKafkaWordCount(zkUrl, brokerUrl);

        LocalDRPC drpc = new LocalDRPC();
        LocalCluster cluster = new LocalCluster();

        // submit the consumer topology.
        cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), wordCount.buildConsumerTopology(drpc));

        // submit the producer topology.
        cluster.submitTopology("kafkaBolt", wordCount.getProducerConfig(), wordCount.buildProducerTopology());

        // keep querying the word counts for a minute.
        for (int i = 0; i < 60; i++) {
            System.out.println("DRPC RESULT: " + drpc.execute("words", "the and apple snow jumped"));
            Thread.sleep(1000);
        }

        cluster.killTopology("kafkaBolt");
        cluster.killTopology("wordCounter");
        // Release the local DRPC server as well as the cluster; the original
        // leaked the LocalDRPC instance.
        drpc.shutdown();
        cluster.shutdown();
    }
}