Repository: storm
Updated Branches:
  refs/heads/master 87fc2982e -> 285d3288c


Storm-Kafka trident topology example


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/287718c8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/287718c8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/287718c8

Branch: refs/heads/master
Commit: 287718c8bc0f8b51ca89de4f62fcb6710d525992
Parents: f72beb0
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Tue Aug 4 14:06:45 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Aug 4 14:06:45 2015 +0530

----------------------------------------------------------------------
 examples/storm-starter/pom.xml                  |   6 +
 .../starter/trident/TridentKafkaWordCount.java  | 208 +++++++++++++++++++
 2 files changed, 214 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/287718c8/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index db69d74..36f7b65 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -96,6 +96,12 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-kafka</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/storm/blob/287718c8/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
 
b/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
new file mode 100644
index 0000000..11a9111
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
@@ -0,0 +1,208 @@
+package storm.starter.trident;
+
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.LocalDRPC;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import storm.kafka.StringScheme;
+import storm.kafka.ZkHosts;
+import storm.kafka.bolt.KafkaBolt;
+import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import storm.kafka.bolt.selector.DefaultTopicSelector;
+import storm.kafka.trident.TransactionalTridentKafkaSpout;
+import storm.kafka.trident.TridentKafkaConfig;
+import storm.starter.spout.RandomSentenceSpout;
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.builtin.Count;
+import storm.trident.operation.builtin.FilterNull;
+import storm.trident.operation.builtin.MapGet;
+import storm.trident.testing.MemoryMapState;
+import storm.trident.testing.Split;
+
+import java.util.Properties;
+
+/**
+ * A sample word count trident topology using transactional kafka spout that 
has the following components.
+ * <ol>
+ * <li> {@link KafkaBolt}
+ * that receives random sentences from {@link RandomSentenceSpout} and
+ * publishes the sentences to a kafka "test" topic.
+ * </li>
+ * <li> {@link TransactionalTridentKafkaSpout}
+ * that consumes sentences from the "test" topic, splits it into words, 
aggregates
+ * and stores the word count in a {@link MemoryMapState}.
+ * </li>
+ * <li> DRPC query
+ * that returns the word counts by querying the trident state (MemoryMapState).
+ * </li>
+ * </ol>
+ * <p>
+ *     For more background read the <a 
href="https://storm.apache.org/documentation/Trident-tutorial.html";>trident 
tutorial</a>,
+ *     <a href="https://storm.apache.org/documentation/Trident-state";>trident 
state</a> and
+ *     <a 
href="https://github.com/apache/storm/tree/master/external/storm-kafka";> Storm 
Kafka </a>.
+ * </p>
+ */
+public class TridentKafkaWordCount {
+
+    private String zkUrl;
+    private String brokerUrl;
+
+    TridentKafkaWordCount(String zkUrl, String brokerUrl) {
+        this.zkUrl = zkUrl;
+        this.brokerUrl = brokerUrl;
+    }
+
+    /**
+     * Creates a transactional kafka spout that consumes any new data 
published to "test" topic.
+     * <p/>
+     * For more info on transactional spouts
+     * see "Transactional spouts" section in
+     * <a href="https://storm.apache.org/documentation/Trident-state";> Trident 
state</a> doc.
+     *
+     * @return a transactional trident kafka spout.
+     */
+    private TransactionalTridentKafkaSpout createKafkaSpout() {
+        ZkHosts hosts = new ZkHosts(zkUrl);
+        TridentKafkaConfig config = new TridentKafkaConfig(hosts, "test");
+        config.scheme = new SchemeAsMultiScheme(new StringScheme());
+
+        // Consume new data from the topic
+        config.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
+        return new TransactionalTridentKafkaSpout(config);
+    }
+
+
+    private Stream addDRPCStream(TridentTopology tridentTopology, TridentState 
state, LocalDRPC drpc) {
+        return tridentTopology.newDRPCStream("words", drpc)
+                .each(new Fields("args"), new Split(), new Fields("word"))
+                .groupBy(new Fields("word"))
+                .stateQuery(state, new Fields("word"), new MapGet(), new 
Fields("count"))
+                .each(new Fields("count"), new FilterNull())
+                .project(new Fields("word", "count"));
+    }
+
+    private TridentState addTridentState(TridentTopology tridentTopology) {
+        return tridentTopology.newStream("spout1", 
createKafkaSpout()).parallelismHint(1)
+                .each(new Fields("str"), new Split(), new Fields("word"))
+                .groupBy(new Fields("word"))
+                .persistentAggregate(new MemoryMapState.Factory(), new 
Count(), new Fields("count"))
+                .parallelismHint(1);
+    }
+
+    /**
+     * Creates a trident topology that consumes sentences from the kafka 
"test" topic using a
+     * {@link TransactionalTridentKafkaSpout} computes the word count and 
stores it in a {@link MemoryMapState}.
+     * A DRPC stream is then created to query the word counts.
+     * @param drpc
+     * @return
+     */
+    public StormTopology buildConsumerTopology(LocalDRPC drpc) {
+        TridentTopology tridentTopology = new TridentTopology();
+        addDRPCStream(tridentTopology, addTridentState(tridentTopology), drpc);
+        return tridentTopology.build();
+    }
+
+    /**
+     * Return the consumer topology config.
+     *
+     * @return the topology config
+     */
+    public Config getConsumerConfig() {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        //  conf.setDebug(true);
+        return conf;
+    }
+
+    /**
+     * A topology that produces random sentences using {@link 
RandomSentenceSpout} and
+     * publishes the sentences using a KafkaBolt to kafka "test" topic.
+     *
+     * @return the storm topology
+     */
+    public StormTopology buildProducerTopology() {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("spout", new RandomSentenceSpout(), 2);
+        /**
+         * The output field of the RandomSentenceSpout ("word") is provided as 
the boltMessageField
+         * so that this gets written out as the message in the kafka topic.
+         */
+        KafkaBolt bolt = new KafkaBolt()
+                .withTopicSelector(new DefaultTopicSelector("test"))
+                .withTupleToKafkaMapper(new 
FieldNameBasedTupleToKafkaMapper("key", "word"));
+        builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout");
+        return builder.createTopology();
+    }
+
+    /**
+     * Returns the storm config for the topology that publishes sentences to 
kafka "test" topic using a kafka bolt.
+     * The KAFKA_BROKER_PROPERTIES is needed for the KafkaBolt.
+     *
+     * @return the topology config
+     */
+    public Config getProducerConfig() {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "storm-kafka-producer");
+        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+        return conf;
+    }
+
+    /**
+     * <p>
+     * To run this topology ensure you have a kafka broker running.
+     * </p>
+     * Create a topic test with command line,
+     * kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partition 1 --topic test
+     */
+    public static void main(String[] args) throws Exception {
+
+        String zkUrl = "localhost:2181";        // the defaults.
+        String brokerUrl = "localhost:9092";
+
+        if (args.length > 2 || (args.length == 1 && 
args[0].matches("^-h|--help$"))) {
+            System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper 
url] [kafka broker url]");
+            System.out.println("   E.g TridentKafkaWordCount [" + zkUrl + "]" 
+ " [" + brokerUrl + "]");
+            System.exit(1);
+        } else if (args.length == 1) {
+            zkUrl = args[0];
+        } else if (args.length == 2) {
+            zkUrl = args[0];
+            brokerUrl = args[1];
+        }
+
+        System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker 
url: " + brokerUrl);
+
+        TridentKafkaWordCount wordCount = new TridentKafkaWordCount(zkUrl, 
brokerUrl);
+
+        LocalDRPC drpc = new LocalDRPC();
+        LocalCluster cluster = new LocalCluster();
+
+        // submit the consumer topology.
+        cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), 
wordCount.buildConsumerTopology(drpc));
+
+        // submit the producer topology.
+        cluster.submitTopology("kafkaBolt", wordCount.getProducerConfig(), 
wordCount.buildProducerTopology());
+
+        // keep querying the word counts for a minute.
+        for (int i = 0; i < 60; i++) {
+            System.out.println("DRPC RESULT: " + drpc.execute("words", "the 
and apple snow jumped"));
+            Thread.sleep(1000);
+        }
+
+        cluster.killTopology("kafkaBolt");
+        cluster.killTopology("wordCounter");
+        cluster.shutdown();
+    }
+}
\ No newline at end of file

Reply via email to