add HBase example
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4a1db96f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4a1db96f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4a1db96f Branch: refs/heads/master Commit: 4a1db96fc38fb438c3d4433381e719ca16d63bc8 Parents: a791604 Author: P. Taylor Goetz <[email protected]> Authored: Tue Apr 7 23:23:05 2015 -0400 Committer: P. Taylor Goetz <[email protected]> Committed: Tue Apr 7 23:23:05 2015 -0400 ---------------------------------------------------------------------- flux-core/pom.xml | 6 + .../java/org/apache/storm/flux/TCKTest.java | 10 ++ .../test/resources/configs/simple_hbase.yaml | 120 +++++++++++++++++++ flux-examples/pom.xml | 5 + .../storm/flux/examples/WordCountClient.java | 63 ++++++++++ .../apache/storm/flux/examples/WordCounter.java | 71 +++++++++++ flux-examples/src/main/resources/hbase-site.xml | 36 ++++++ .../src/main/resources/simple_hbase.yaml | 91 ++++++++++++++ 8 files changed, 402 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-core/pom.xml ---------------------------------------------------------------------- diff --git a/flux-core/pom.xml b/flux-core/pom.xml index 0d72ead..fe2e301 100644 --- a/flux-core/pom.xml +++ b/flux-core/pom.xml @@ -50,6 +50,12 @@ <version>${storm.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-hbase</artifactId> + <version>0.11.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> </dependencies> <build> <resources> http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java ---------------------------------------------------------------------- diff --git a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java 
b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java index 6580ef7..27abfbe 100644 --- a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java +++ b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java @@ -82,6 +82,16 @@ public class TCKTest { } @Test + public void testHbase() throws Exception { + TopologyDef topologyDef = FluxParser.parseResource("/configs/simple_hbase.yaml", false, true, null, false); + Config conf = FluxBuilder.buildConfig(topologyDef); + ExecutionContext context = new ExecutionContext(topologyDef, conf); + StormTopology topology = FluxBuilder.buildTopology(context); + assertNotNull(topology); + topology.validate(); + } + + @Test public void testIncludes() throws Exception { TopologyDef topologyDef = FluxParser.parseResource("/configs/include_test.yaml", false, true, null, false); Config conf = FluxBuilder.buildConfig(topologyDef); http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-core/src/test/resources/configs/simple_hbase.yaml ---------------------------------------------------------------------- diff --git a/flux-core/src/test/resources/configs/simple_hbase.yaml b/flux-core/src/test/resources/configs/simple_hbase.yaml new file mode 100644 index 0000000..e407bd9 --- /dev/null +++ b/flux-core/src/test/resources/configs/simple_hbase.yaml @@ -0,0 +1,120 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Test ability to wire together an HBase bolt using component references +--- + +# topology definition +# name to be used when submitting +name: "hbase-wordcount" + +# Components +# Components are analogous to Spring beans. They are meant to be used as constructor, +# property(setter), and builder arguments. +# +# for the time being, components must be declared in the order they are referenced + +# WordSpout spout = new WordSpout(); +# WordCounter bolt = new WordCounter(); +# +# SimpleHBaseMapper mapper = new SimpleHBaseMapper() +# .withRowKeyField("word") +# .withColumnFields(new Fields("word")) +# .withCounterFields(new Fields("count")) +# .withColumnFamily("cf"); +# +# HBaseBolt hbase = new HBaseBolt("WordCount", mapper) +# .withConfigKey("hbase.conf"); +# +# +# // wordSpout ==> countBolt ==> HBaseBolt +# TopologyBuilder builder = new TopologyBuilder(); +# +# builder.setSpout(WORD_SPOUT, spout, 1); +# builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT); +# builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word")); + + + + +components: + - id: "columnFields" + className: "backtype.storm.tuple.Fields" + constructorArgs: + - ["word"] + + - id: "counterFields" + className: "backtype.storm.tuple.Fields" + constructorArgs: + - ["count"] + + - id: "mapper" + className: "org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper" + configMethods: + - name: "withRowKeyField" + args: ["word"] + - name: "withColumnFields" + args: [ref: "columnFields"] + - name: "withCounterFields" + args: [ref: "counterFields"] + - name: "withColumnFamily" + args: 
["cf"] + +# topology configuration +# this will be passed to the submitter as a map of config options +# +config: + topology.workers: 1 + hbase.conf: + hbase.rootdir: "hdfs://hadoop:54310/hbase" + hbase.zookeeper.quorum: "hadoop" + +# spout definitions +spouts: + - id: "word-spout" + className: "backtype.storm.testing.TestWordSpout" + parallelism: 1 + +# bolt definitions + +bolts: + - id: "count-bolt" + className: "backtype.storm.testing.TestWordCounter" + + - id: "hbase-bolt" + className: "org.apache.storm.hbase.bolt.HBaseBolt" + constructorArgs: + - "WordCount" # HBase table name + - ref: "mapper" + configMethods: + - name: "withConfigKey" + args: ["hbase.conf"] + parallelism: 1 + + +streams: + - name: "" # name isn't used (placeholder for logging, UI, etc.) + from: "word-spout" + to: "count-bolt" + grouping: + type: SHUFFLE + + - name: "" # name isn't used (placeholder for logging, UI, etc.) + from: "count-bolt" + to: "hbase-bolt" + grouping: + type: FIELDS + args: ["word"] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flux-examples/pom.xml b/flux-examples/pom.xml index 09db717..2321074 100644 --- a/flux-examples/pom.xml +++ b/flux-examples/pom.xml @@ -50,6 +50,11 @@ <artifactId>storm-hdfs</artifactId> <version>${storm.version}</version> </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-hbase</artifactId> + <version>${storm.version}</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java ---------------------------------------------------------------------- diff --git a/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java b/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java new file mode 100644 
index 0000000..55873d5 --- /dev/null +++ b/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.flux.examples; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Connects to the 'WordCount' HBase table and prints counts for each word. + * + * Assumes you have run (or are running) the YAML topology definition in + * <code>simple_hbase.yaml</code> + * + * You will also need to modify `src/main/resources/hbase-site.xml` + * to point to your HBase instance, and then repackage with `mvn package`. + * This is a known issue. 
+ * + */ +public class WordCountClient { + + public static void main(String[] args) throws Exception { + Configuration config = HBaseConfiguration.create(); + if(args.length > 0){ + config.set("hbase.rootdir", args[0]); + } + + HTable table = new HTable(config, "WordCount"); + String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; + + for (String word : words) { + Get get = new Get(Bytes.toBytes(word)); + Result result = table.get(get); + + byte[] countBytes = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("count")); + byte[] wordBytes = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("word")); + + String wordStr = Bytes.toString(wordBytes); + System.out.println(wordStr); + long count = Bytes.toLong(countBytes); + System.out.println("Word: '" + wordStr + "', Count: " + count); + } + + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java ---------------------------------------------------------------------- diff --git a/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java b/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java new file mode 100644 index 0000000..f7c80c7 --- /dev/null +++ b/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.flux.examples; + +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.BasicOutputCollector; +import backtype.storm.topology.IBasicBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseBasicBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +import static backtype.storm.utils.Utils.tuple; + +/** + * This bolt is used by the HBase example. It simply emits the first field + * found in the incoming tuple as "word", with a "count" of `1`. + * + * In this case, the downstream HBase bolt handles the counting, so a value + * of `1` will just increment the HBase counter by one. + */ +public class WordCounter extends BaseBasicBolt { + private static final Logger LOG = LoggerFactory.getLogger(WordCounter.class); + + + + @SuppressWarnings("rawtypes") + public void prepare(Map stormConf, TopologyContext context) { + } + + /* + * Just output the word value with a count of 1. + * The HBaseBolt will handle incrementing the counter. 
+ */ + public void execute(Tuple input, BasicOutputCollector collector) { + collector.emit(tuple(input.getValues().get(0), 1)); + } + + public void cleanup() { + + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/src/main/resources/hbase-site.xml ---------------------------------------------------------------------- diff --git a/flux-examples/src/main/resources/hbase-site.xml b/flux-examples/src/main/resources/hbase-site.xml new file mode 100644 index 0000000..06c3031 --- /dev/null +++ b/flux-examples/src/main/resources/hbase-site.xml @@ -0,0 +1,36 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +--> +<configuration> + <property> + <name>hbase.cluster.distributed</name> + <value>true</value> + </property> + <property> + <name>hbase.rootdir</name> + <value>hdfs://hadoop:54310/hbase</value> + </property> + <property> + <name>hbase.zookeeper.quorum</name> + <value>hadoop</value> + </property> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/4a1db96f/flux-examples/src/main/resources/simple_hbase.yaml ---------------------------------------------------------------------- diff --git a/flux-examples/src/main/resources/simple_hbase.yaml b/flux-examples/src/main/resources/simple_hbase.yaml new file mode 100644 index 0000000..5eb70ed --- /dev/null +++ b/flux-examples/src/main/resources/simple_hbase.yaml @@ -0,0 +1,91 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +--- +# NOTE: To use this example, you will need to modify `src/main/resources/hbase-site.xml` +# to point to your HBase instance, and then repackage with `mvn package`. +# This is a known issue. 
+ +# topology definition +# name to be used when submitting +name: "hbase-persistent-wordcount" + +# Components +components: + - id: "columnFields" + className: "backtype.storm.tuple.Fields" + constructorArgs: + - ["word"] + + - id: "counterFields" + className: "backtype.storm.tuple.Fields" + constructorArgs: + - ["count"] + + - id: "mapper" + className: "org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper" + configMethods: + - name: "withRowKeyField" + args: ["word"] + - name: "withColumnFields" + args: [ref: "columnFields"] + - name: "withCounterFields" + args: [ref: "counterFields"] + - name: "withColumnFamily" + args: ["cf"] + +# topology configuration +# this will be passed to the submitter as a map of config options +config: + topology.workers: 1 + hbase.conf: + hbase.rootdir: "hdfs://hadoop:54310/hbase" + +# spout definitions +spouts: + - id: "word-spout" + className: "backtype.storm.testing.TestWordSpout" + parallelism: 1 + +# bolt definitions + +bolts: + - id: "count-bolt" + className: "org.apache.storm.flux.examples.WordCounter" + parallelism: 1 + + - id: "hbase-bolt" + className: "org.apache.storm.hbase.bolt.HBaseBolt" + constructorArgs: + - "WordCount" # HBase table name + - ref: "mapper" + configMethods: + - name: "withConfigKey" + args: ["hbase.conf"] + parallelism: 1 + +streams: + - name: "" # name isn't used (placeholder for logging, UI, etc.) + from: "word-spout" + to: "count-bolt" + grouping: + type: SHUFFLE + + - name: "" # name isn't used (placeholder for logging, UI, etc.) + from: "count-bolt" + to: "hbase-bolt" + grouping: + type: FIELDS + args: ["word"]
