http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java index 0000000,0000000..9456d1b new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java @@@ -1,0 -1,0 +1,234 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.storm.flux; ++ ++import backtype.storm.Config; ++import backtype.storm.generated.StormTopology; ++import org.apache.storm.flux.model.ExecutionContext; ++import org.apache.storm.flux.model.TopologyDef; ++import org.apache.storm.flux.parser.FluxParser; ++import org.apache.storm.flux.test.TestBolt; ++import org.junit.Test; ++ ++import java.io.File; ++ ++import static org.junit.Assert.*; ++ ++public class TCKTest { ++ @Test ++ public void testTCK() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/tck.yaml", false, true, null, false); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ } ++ ++ @Test ++ public void testShellComponents() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/shell_test.yaml", false, true, null, false); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ } ++ ++ @Test ++ public void testKafkaSpoutConfig() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null, false); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ } ++ ++ @Test ++ public void testLoadFromResource() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null, false); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ } ++ ++ ++ @Test ++ public void testHdfs() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/hdfs_test.yaml", false, true, null, false); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ } ++ ++ @Test ++ public void testHbase() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/simple_hbase.yaml", false, true, null, false); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ } ++ ++ @Test(expected = IllegalArgumentException.class) ++ public void testBadHbase() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/bad_hbase.yaml", false, true, null, false); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ } ++ ++ @Test ++ public void testIncludes() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/include_test.yaml", false, true, null, false); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ assertTrue(topologyDef.getName().equals("include-topology")); ++ assertTrue(topologyDef.getBolts().size() > 0); ++ assertTrue(topologyDef.getSpouts().size() > 0); ++ topology.validate(); ++ } ++ ++ @Test ++ public void testTopologySource() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology.yaml", false, true, null, false); ++ assertTrue(topologyDef.validate()); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ } ++ ++ @Test ++ public void testTopologySourceWithReflection() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null, false); ++ assertTrue(topologyDef.validate()); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ } ++ ++ @Test ++ public void testTopologySourceWithConfigParam() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection-config.yaml", false, true, null, false); ++ assertTrue(topologyDef.validate()); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ } ++ ++ @Test ++ public void testTopologySourceWithMethodName() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-method-override.yaml", false, true, null, false); ++ assertTrue(topologyDef.validate()); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ } ++ ++ ++ @Test ++ public void testTridentTopologySource() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-trident.yaml", false, true, null, false); ++ assertTrue(topologyDef.validate()); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ } ++ ++ @Test(expected = IllegalArgumentException.class) ++ public void testInvalidTopologySource() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/invalid-existing-topology.yaml", false, true, null, false); ++ assertFalse("Topology config is invalid.", topologyDef.validate()); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ } ++ ++ ++ @Test ++ public void testTopologySourceWithGetMethodName() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null, false); ++ assertTrue(topologyDef.validate()); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ } ++ ++ @Test ++ public void testTopologySourceWithConfigMethods() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/config-methods-test.yaml", false, true, null, false); ++ assertTrue(topologyDef.validate()); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ ++ // make sure the property was actually set ++ TestBolt bolt = (TestBolt)context.getBolt("bolt-1"); ++ assertTrue(bolt.getFoo().equals("foo")); ++ assertTrue(bolt.getBar().equals("bar")); ++ assertTrue(bolt.getFooBar().equals("foobar")); ++ } ++ ++ @Test ++ public void testVariableSubstitution() throws Exception { ++ TopologyDef topologyDef = FluxParser.parseResource("/configs/substitution-test.yaml", false, true, "src/test/resources/configs/test.properties", true); ++ assertTrue(topologyDef.validate()); ++ Config conf = FluxBuilder.buildConfig(topologyDef); ++ ExecutionContext context = new ExecutionContext(topologyDef, conf); ++ StormTopology topology = FluxBuilder.buildTopology(context); ++ assertNotNull(topology); ++ topology.validate(); ++ ++ // test basic substitution ++ assertEquals("Property not replaced.", ++ "substitution-topology", ++ context.getTopologyDef().getName()); ++ ++ // test environment variable substitution ++ // $PATH should be defined on most systems ++ String envPath = System.getenv().get("PATH"); ++ assertEquals("ENV variable not replaced.", ++ envPath, ++ context.getTopologyDef().getConfig().get("test.env.value")); ++ ++ } ++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvirontmentTest.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvirontmentTest.java index 0000000,0000000..dcded17 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvirontmentTest.java @@@ -1,0 -1,0 +1,89 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.storm.flux.multilang; ++ ++ ++import org.junit.Test; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++ ++import java.io.ByteArrayOutputStream; ++import java.io.InputStream; ++import java.io.OutputStream; ++ ++import static org.junit.Assert.assertEquals; ++ ++/** ++ * Sanity checks to make sure we can at least invoke the shells used. ++ */ ++public class MultilangEnvirontmentTest { ++ private static final Logger LOG = LoggerFactory.getLogger(MultilangEnvirontmentTest.class); ++ ++ @Test ++ public void testInvokePython() throws Exception { ++ String[] command = new String[]{"python", "--version"}; ++ int exitVal = invokeCommand(command); ++ assertEquals("Exit value for python is 0.", 0, exitVal); ++ } ++ ++ @Test ++ public void testInvokeNode() throws Exception { ++ String[] command = new String[]{"node", "--version"}; ++ int exitVal = invokeCommand(command); ++ assertEquals("Exit value for node is 0.", 0, exitVal); ++ } ++ ++ private static class StreamRedirect implements Runnable { ++ private InputStream in; ++ private OutputStream out; ++ ++ public StreamRedirect(InputStream in, OutputStream out) { ++ this.in = in; ++ this.out = out; ++ } ++ ++ @Override ++ public void run() { ++ try { ++ int i = -1; ++ while ((i = this.in.read()) != -1) { ++ out.write(i); ++ } ++ this.in.close(); ++ this.out.close(); ++ } catch (Exception e) { ++ e.printStackTrace(); ++ } ++ } ++ } ++ ++ private int invokeCommand(String[] args) throws Exception { ++ LOG.debug("Invoking command: {}", args); ++ ++ ProcessBuilder pb = new ProcessBuilder(args); ++ pb.redirectErrorStream(true); ++ final Process proc = pb.start(); ++ ++ ByteArrayOutputStream out = new ByteArrayOutputStream(); ++ Thread t = new Thread(new StreamRedirect(proc.getInputStream(), out)); ++ t.start(); ++ int exitVal = proc.waitFor(); ++ LOG.debug("Command result: {}", out.toString()); ++ return exitVal; ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java index 0000000,0000000..0d37997 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java @@@ -1,0 -1,0 +1,42 @@@ ++package org.apache.storm.flux.test; ++ ++import backtype.storm.generated.StormTopology; ++import backtype.storm.topology.TopologyBuilder; ++import org.apache.storm.flux.api.TopologySource; ++import org.apache.storm.flux.wrappers.bolts.LogInfoBolt; ++import org.apache.storm.flux.wrappers.spouts.FluxShellSpout; ++ ++import java.util.Map; ++ ++/** ++ * Test topology source that does not implement TopologySource, but has the same ++ * `getTopology()` method. ++ */ ++public class SimpleTopology{ ++ ++ ++ public SimpleTopology(){} ++ ++ public SimpleTopology(String foo, String bar){} ++ ++ public StormTopology getTopologyWithDifferentMethodName(Map<String, Object> config){ ++ return getTopology(config); ++ } ++ ++ ++ public StormTopology getTopology(Map<String, Object> config) { ++ TopologyBuilder builder = new TopologyBuilder(); ++ ++ // spouts ++ FluxShellSpout spout = new FluxShellSpout( ++ new String[]{"node", "randomsentence.js"}, ++ new String[]{"word"}); ++ builder.setSpout("sentence-spout", spout, 1); ++ ++ // bolts ++ builder.setBolt("log-bolt", new LogInfoBolt(), 1) ++ .shuffleGrouping("sentence-spout"); ++ ++ return builder.createTopology(); ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java index 0000000,0000000..2007082 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java @@@ -1,0 -1,0 +1,35 @@@ ++package org.apache.storm.flux.test; ++ ++import backtype.storm.generated.StormTopology; ++import backtype.storm.topology.TopologyBuilder; ++import org.apache.storm.flux.api.TopologySource; ++import org.apache.storm.flux.wrappers.bolts.LogInfoBolt; ++import org.apache.storm.flux.wrappers.spouts.FluxShellSpout; ++ ++import java.util.Map; ++ ++public class SimpleTopologySource implements TopologySource { ++ ++ ++ public SimpleTopologySource(){} ++ ++ public SimpleTopologySource(String foo, String bar){} ++ ++ ++ @Override ++ public StormTopology getTopology(Map<String, Object> config) { ++ TopologyBuilder builder = new TopologyBuilder(); ++ ++ // spouts ++ FluxShellSpout spout = new FluxShellSpout( ++ new String[]{"node", "randomsentence.js"}, ++ new String[]{"word"}); ++ builder.setSpout("sentence-spout", spout, 1); ++ ++ // bolts ++ builder.setBolt("log-bolt", new LogInfoBolt(), 1) ++ .shuffleGrouping("sentence-spout"); ++ ++ return builder.createTopology(); ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java index 0000000,0000000..f29b543 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java @@@ -1,0 -1,0 +1,38 @@@ ++package org.apache.storm.flux.test; ++ ++import backtype.storm.Config; ++import backtype.storm.generated.StormTopology; ++import backtype.storm.topology.TopologyBuilder; ++import org.apache.storm.flux.wrappers.bolts.LogInfoBolt; ++import org.apache.storm.flux.wrappers.spouts.FluxShellSpout; ++ ++import java.util.Map; ++ ++/** ++ * Test topology source that does not implement TopologySource, but has the same ++ * `getTopology()` method. ++ */ ++public class SimpleTopologyWithConfigParam { ++ ++ ++ public SimpleTopologyWithConfigParam(){} ++ ++ public SimpleTopologyWithConfigParam(String foo, String bar){} ++ ++ ++ public StormTopology getTopology(Config config) { ++ TopologyBuilder builder = new TopologyBuilder(); ++ ++ // spouts ++ FluxShellSpout spout = new FluxShellSpout( ++ new String[]{"node", "randomsentence.js"}, ++ new String[]{"word"}); ++ builder.setSpout("sentence-spout", spout, 1); ++ ++ // bolts ++ builder.setBolt("log-bolt", new LogInfoBolt(), 1) ++ .shuffleGrouping("sentence-spout"); ++ ++ return builder.createTopology(); ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java index 0000000,0000000..e88f2cf new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java @@@ -1,0 -1,0 +1,63 @@@ ++package org.apache.storm.flux.test; ++ ++import backtype.storm.topology.BasicOutputCollector; ++import backtype.storm.topology.OutputFieldsDeclarer; ++import backtype.storm.topology.base.BaseBasicBolt; ++import backtype.storm.tuple.Tuple; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++ ++ ++public class TestBolt extends BaseBasicBolt { ++ private static final Logger LOG = LoggerFactory.getLogger(TestBolt.class); ++ ++ private String foo; ++ private String bar; ++ private String fooBar; ++ ++ public static enum TestEnum { ++ FOO, ++ BAR ++ } ++ ++ public TestBolt(TestEnum te){ ++ ++ } ++ ++ public TestBolt(TestEnum te, float f){ ++ ++ } ++ ++ @Override ++ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { ++ LOG.info("{}", tuple); ++ } ++ ++ @Override ++ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { ++ ++ } ++ ++ // config methods ++ public void withFoo(String foo){ ++ this.foo = foo; ++ } ++ public void withBar(String bar){ ++ this.bar = bar; ++ } ++ ++ public void withFooBar(String foo, String bar){ ++ this.fooBar = foo + bar; ++ } ++ ++ public String getFoo(){ ++ return this.foo; ++ } ++ public String getBar(){ ++ return this.bar; ++ } ++ ++ public String getFooBar(){ ++ return this.fooBar; ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java index 0000000,0000000..3cb6634 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java @@@ -1,0 -1,0 +1,54 @@@ ++package org.apache.storm.flux.test; ++ ++import backtype.storm.Config; ++import backtype.storm.generated.StormTopology; ++import backtype.storm.tuple.Fields; ++import backtype.storm.tuple.Values; ++import storm.kafka.StringScheme; ++import storm.trident.TridentTopology; ++import storm.trident.operation.BaseFunction; ++import storm.trident.operation.TridentCollector; ++import storm.trident.operation.builtin.Count; ++import storm.trident.testing.FixedBatchSpout; ++import storm.trident.testing.MemoryMapState; ++import storm.trident.tuple.TridentTuple; ++ ++/** ++ * Basic Trident example that will return a `StormTopology` from a `getTopology()` method. ++ */ ++public class TridentTopologySource { ++ ++ private FixedBatchSpout spout; ++ ++ public StormTopology getTopology(Config config) { ++ ++ this.spout = new FixedBatchSpout(new Fields("sentence"), 20, ++ new Values("one two"), ++ new Values("two three"), ++ new Values("three four"), ++ new Values("four five"), ++ new Values("five six") ++ ); ++ ++ ++ TridentTopology trident = new TridentTopology(); ++ ++ trident.newStream("wordcount", spout).name("sentence").parallelismHint(1).shuffle() ++ .each(new Fields("sentence"), new Split(), new Fields("word")) ++ .parallelismHint(1) ++ .groupBy(new Fields("word")) ++ .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) ++ .parallelismHint(1); ++ return trident.build(); ++ } ++ ++ public static class Split extends BaseFunction { ++ @Override ++ public void execute(TridentTuple tuple, TridentCollector collector) { ++ String sentence = tuple.getString(0); ++ for (String word : sentence.split(" ")) { ++ collector.emit(new Values(word)); ++ } ++ } ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/bad_hbase.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/bad_hbase.yaml index 0000000,0000000..5d91400 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/bad_hbase.yaml @@@ -1,0 -1,0 +1,98 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++ ++# Test ability to wire together shell spouts/bolts ++--- ++ ++# topology definition ++# name to be used when submitting ++name: "hbase-wordcount" ++ ++# Components ++# Components are analagous to Spring beans. They are meant to be used as constructor, ++# property(setter), and builder arguments. ++# ++# for the time being, components must be declared in the order they are referenced ++ ++components: ++ - id: "columnFields" ++ className: "backtype.storm.tuple.Fields" ++ constructorArgs: ++ - ["word"] ++ ++ - id: "counterFields" ++ className: "backtype.storm.tuple.Fields" ++ constructorArgs: ++ # !!! the following won't work, and should thow an IllegalArgumentException... ++ - "count" ++ ++ - id: "mapper" ++ className: "org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper" ++ configMethods: ++ - name: "withRowKeyField" ++ args: ["word"] ++ - name: "withColumnFields" ++ args: [ref: "columnFields"] ++ - name: "withCounterFields" ++ args: [ref: "counterFields"] ++ - name: "withColumnFamily" ++ args: ["cf"] ++ ++# topology configuration ++# this will be passed to the submitter as a map of config options ++# ++config: ++ topology.workers: 1 ++ hbase.conf: ++ hbase.rootdir: "hdfs://hadoop:54310/hbase" ++ hbase.zookeeper.quorum: "hadoop" ++ ++# spout definitions ++spouts: ++ - id: "word-spout" ++ className: "backtype.storm.testing.TestWordSpout" ++ parallelism: 1 ++ ++# bolt definitions ++ ++bolts: ++ - id: "count-bolt" ++ className: "backtype.storm.testing.TestWordCounter" ++ ++ - id: "hbase-bolt" ++ className: "org.apache.storm.hbase.bolt.HBaseBolt" ++ constructorArgs: ++ - "WordCount" # HBase table name ++ - ref: "mapper" ++ configMethods: ++ - name: "withConfigKey" ++ args: ["hbase.conf"] ++ parallelism: 1 ++ ++ ++streams: ++ - name: "" # name isn't used (placeholder for logging, UI, etc.) ++ from: "word-spout" ++ to: "count-bolt" ++ grouping: ++ type: SHUFFLE ++ ++ - name: "" # name isn't used (placeholder for logging, UI, etc.) ++ from: "count-bolt" ++ to: "hbase-bolt" ++ grouping: ++ type: FIELDS ++ args: ["word"] http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/config-methods-test.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/config-methods-test.yaml index 0000000,0000000..65211ff new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/config-methods-test.yaml @@@ -1,0 -1,0 +1,70 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++--- ++name: "yaml-topology" ++ ++# ++config: ++ topology.workers: 1 ++ # ... ++ ++# spout definitions ++spouts: ++ - id: "spout-1" ++ className: "backtype.storm.testing.TestWordSpout" ++ parallelism: 1 ++ # ... ++ ++# bolt definitions ++bolts: ++ - id: "bolt-1" ++ className: "org.apache.storm.flux.test.TestBolt" ++ parallelism: 1 ++ constructorArgs: ++ - FOO # enum class ++ - 1.0 ++ configMethods: ++ - name: "withFoo" ++ args: ++ - "foo" ++ - name: "withBar" ++ args: ++ - "bar" ++ - name: "withFooBar" ++ args: ++ - "foo" ++ - "bar" ++ ++ ++ ++#stream definitions ++# stream definitions define connections between spouts and bolts. ++# note that such connections can be cyclical ++streams: ++ - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.) ++# id: "connection-1" ++ from: "spout-1" ++ to: "bolt-1" ++ grouping: ++ type: SHUFFLE ++ ++ ++ ++ ++ ++ ++ ++ http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/existing-topology-method-override.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/existing-topology-method-override.yaml index 0000000,0000000..6f3c88a new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/existing-topology-method-override.yaml @@@ -1,0 -1,0 +1,10 @@@ ++--- ++ ++# configuration that uses an existing topology that does not implement TopologySource ++name: "existing-topology" ++topologySource: ++ className: "org.apache.storm.flux.test.SimpleTopology" ++ methodName: "getTopologyWithDifferentMethodName" ++ constructorArgs: ++ - "foo" ++ - "bar" http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/existing-topology-reflection-config.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/existing-topology-reflection-config.yaml index 0000000,0000000..8af8a84 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/existing-topology-reflection-config.yaml @@@ -1,0 -1,0 +1,9 @@@ ++--- ++ ++# configuration that uses an existing topology that does not implement TopologySource ++name: "existing-topology" ++topologySource: ++ className: "org.apache.storm.flux.test.SimpleTopologyWithConfigParam" ++ constructorArgs: ++ - "foo" ++ - "bar" http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/existing-topology-reflection.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/existing-topology-reflection.yaml index 0000000,0000000..dd3e1e8 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/existing-topology-reflection.yaml @@@ -1,0 -1,0 +1,9 @@@ ++--- ++ ++# configuration that uses an existing topology that does not implement TopologySource ++name: "existing-topology" ++topologySource: ++ className: "org.apache.storm.flux.test.SimpleTopology" ++ constructorArgs: ++ - "foo" ++ - "bar" http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/existing-topology-trident.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/existing-topology-trident.yaml index 0000000,0000000..5ac682c new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/existing-topology-trident.yaml @@@ -1,0 -1,0 +1,9 @@@ ++--- ++ ++name: "existing-topology" ++ ++config: ++ topology.workers: 1 ++ ++topologySource: ++ className: "org.apache.storm.flux.test.TridentTopologySource" http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/existing-topology.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/existing-topology.yaml index 0000000,0000000..fa6a0b3 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/existing-topology.yaml @@@ -1,0 -1,0 +1,8 @@@ ++--- ++ ++name: "existing-topology" ++topologySource: ++ className: "org.apache.storm.flux.test.SimpleTopologySource" ++ constructorArgs: ++ - "foo" ++ - "bar" http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/hdfs_test.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/hdfs_test.yaml index 0000000,0000000..8fe0a9a new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/hdfs_test.yaml @@@ -1,0 -1,0 +1,97 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++ ++# Test ability to wire together shell spouts/bolts ++--- ++ ++# topology definition ++# name to be used when submitting ++name: "hdfs-topology" ++ ++# Components ++# Components are analagous to Spring beans. They are meant to be used as constructor, ++# property(setter), and builder arguments. ++# ++# for the time being, components must be declared in the order they are referenced ++components: ++ - id: "syncPolicy" ++ className: "org.apache.storm.hdfs.bolt.sync.CountSyncPolicy" ++ constructorArgs: ++ - 1000 ++ - id: "rotationPolicy" ++ className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy" ++ constructorArgs: ++ - 5.0 ++ - MB ++ ++ - id: "fileNameFormat" ++ className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat" ++ configMethods: ++ - name: "withPath" ++ args: ["/tmp/foo/"] ++ - name: "withExtension" ++ args: [".txt"] ++ ++ - id: "recordFormat" ++ className: "org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat" ++ configMethods: ++ - name: "withFieldDelimiter" ++ args: ["|"] ++ ++ - id: "rotationAction" ++ className: "org.apache.storm.hdfs.common.rotation.MoveFileAction" ++ configMethods: ++ - name: "toDestination" ++ args: ["/tmp/dest2"] ++ ++# spout definitions ++spouts: ++ - id: "spout-1" ++ className: "backtype.storm.testing.TestWordSpout" ++ parallelism: 1 ++ # ... ++ ++# bolt definitions ++ ++# HdfsBolt bolt = new HdfsBolt() ++# .withConfigKey("hdfs.config") ++# .withFsUrl(args[0]) ++# .withFileNameFormat(fileNameFormat) ++# .withRecordFormat(format) ++# .withRotationPolicy(rotationPolicy) ++# .withSyncPolicy(syncPolicy) ++# .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/")); ++bolts: ++ - id: "bolt-1" ++ className: "org.apache.storm.hdfs.bolt.HdfsBolt" ++ configMethods: ++ - name: "withConfigKey" ++ args: ["hdfs.config"] ++ - name: "withFsUrl" ++ args: ["hdfs://hadoop:54310"] ++ - name: "withFileNameFormat" ++ args: [ref: "fileNameFormat"] ++ - name: "withRecordFormat" ++ args: [ref: "recordFormat"] ++ - name: "withRotationPolicy" ++ args: [ref: "rotationPolicy"] ++ - name: "withSyncPolicy" ++ args: [ref: "syncPolicy"] ++ - name: "addRotationAction" ++ args: [ref: "rotationAction"] ++ parallelism: 1 ++ # ... ++ http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/include_test.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/include_test.yaml index 0000000,0000000..702f590 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/include_test.yaml @@@ -1,0 -1,0 +1,25 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++ ++# Test includes by defining nothing, and simply override the topology name ++--- ++ ++name: "include-topology" ++ ++includes: ++ - resource: true ++ file: "/configs/shell_test.yaml" ++ override: false #otherwise subsequent includes that define 'name' would override http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/invalid-existing-topology.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/invalid-existing-topology.yaml index 0000000,0000000..72128df new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/invalid-existing-topology.yaml @@@ -1,0 -1,0 +1,17 @@@ ++# This is an invalid config. It defines both a topologySource and a list of spouts. ++--- ++ ++name: "existing-topology" ++topologySource: ++ className: "org.apache.storm.flux.test.SimpleTopologySource" ++ ++spouts: ++ - id: "sentence-spout" ++ className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout" ++ # shell spout constructor takes 2 arguments: String[], String[] ++ constructorArgs: ++ # command line ++ - ["node", "randomsentence.js"] ++ # output fields ++ - ["word"] ++ parallelism: 1 http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/kafka_test.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/kafka_test.yaml index 0000000,0000000..17cd8e2 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/kafka_test.yaml @@@ -1,0 -1,0 +1,126 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++--- ++ ++# topology definition ++# name to be used when submitting ++name: "kafka-topology" ++ ++# Components ++# Components are analagous to Spring beans. They are meant to be used as constructor, ++# property(setter), and builder arguments. ++# ++# for the time being, components must be declared in the order they are referenced ++components: ++ - id: "stringScheme" ++ className: "storm.kafka.StringScheme" ++ ++ - id: "stringMultiScheme" ++ className: "backtype.storm.spout.SchemeAsMultiScheme" ++ constructorArgs: ++ - ref: "stringScheme" ++ ++ - id: "zkHosts" ++ className: "storm.kafka.ZkHosts" ++ constructorArgs: ++ - "localhost:2181" ++ ++# Alternative kafka config ++# - id: "kafkaConfig" ++# className: "storm.kafka.KafkaConfig" ++# constructorArgs: ++# # brokerHosts ++# - ref: "zkHosts" ++# # topic ++# - "myKafkaTopic" ++# # clientId (optional) ++# - "myKafkaClientId" ++ ++ - id: "spoutConfig" ++ className: "storm.kafka.SpoutConfig" ++ constructorArgs: ++ # brokerHosts ++ - ref: "zkHosts" ++ # topic ++ - "myKafkaTopic" ++ # zkRoot ++ - "/kafkaSpout" ++ # id ++ - "myId" ++ properties: ++ - name: "forceFromStart" ++ value: true ++ - name: "scheme" ++ ref: "stringMultiScheme" ++ ++# topology configuration ++# this will be passed to the submitter as a map of config options ++# ++config: ++ topology.workers: 1 ++ # ... ++ ++# spout definitions ++spouts: ++ - id: "kafka-spout" ++ className: "storm.kafka.KafkaSpout" ++ constructorArgs: ++ - ref: "spoutConfig" ++ ++# bolt definitions ++bolts: ++ - id: "splitsentence" ++ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt" ++ constructorArgs: ++ # command line ++ - ["python", "splitsentence.py"] ++ # output fields ++ - ["word"] ++ parallelism: 1 ++ ++ - id: "log" ++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" ++ parallelism: 1 ++ # ... ++ ++ - id: "count" ++ className: "backtype.storm.testing.TestWordCounter" ++ parallelism: 1 ++ ++#stream definitions ++# stream definitions define connections between spouts and bolts. ++# note that such connections can be cyclical ++# custom stream groupings are also supported ++ ++streams: ++ - name: "kafka --> split" # name isn't used (placeholder for logging, UI, etc.) ++ from: "kafka-spout" ++ to: "splitsentence" ++ grouping: ++ type: SHUFFLE ++ ++ - name: "split --> count" ++ from: "splitsentence" ++ to: "count" ++ grouping: ++ type: FIELDS ++ args: ["word"] ++ ++ - name: "count --> log" ++ from: "count" ++ to: "log" ++ grouping: ++ type: SHUFFLE http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/shell_test.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/shell_test.yaml index 0000000,0000000..b473fa7 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/shell_test.yaml @@@ -1,0 -1,0 +1,104 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++ ++# Test ability to wire together shell spouts/bolts ++--- ++ ++# topology definition ++# name to be used when submitting ++name: "shell-topology" ++ ++# Components ++# Components are analagous to Spring beans. They are meant to be used as constructor, ++# property(setter), and builder arguments. ++#components: ++# - id: "myComponent" ++# className: "com.foo.bar.MyComponent" ++# constructorArgs: ++# - ... ++# properties: ++# foo: "bar" ++# bar: "foo" ++ ++# NOTE: We may want to consider some level of spring integration. For example, allowing component references ++# to a spring `ApplicationContext`. ++ ++# topology configuration ++# this will be passed to the submitter as a map of config options ++# ++config: ++ topology.workers: 1 ++ # ... ++ ++# spout definitions ++spouts: ++ - id: "sentence-spout" ++ className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout" ++ # shell spout constructor takes 2 arguments: String[], String[] ++ constructorArgs: ++ # command line ++ - ["node", "randomsentence.js"] ++ # output fields ++ - ["word"] ++ parallelism: 1 ++ # ... ++ ++# bolt definitions ++bolts: ++ - id: "splitsentence" ++ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt" ++ constructorArgs: ++ # command line ++ - ["python", "splitsentence.py"] ++ # output fields ++ - ["word"] ++ parallelism: 1 ++ # ... ++ ++ - id: "log" ++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" ++ parallelism: 1 ++ # ... ++ ++ - id: "count" ++ className: "backtype.storm.testing.TestWordCounter" ++ parallelism: 1 ++ # ... ++ ++#stream definitions ++# stream definitions define connections between spouts and bolts. ++# note that such connections can be cyclical ++# custom stream groupings are also supported ++ ++streams: ++ - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.) ++ from: "sentence-spout" ++ to: "splitsentence" ++ grouping: ++ type: SHUFFLE ++ ++ - name: "split --> count" ++ from: "splitsentence" ++ to: "count" ++ grouping: ++ type: FIELDS ++ args: ["word"] ++ ++ - name: "count --> log" ++ from: "count" ++ to: "log" ++ grouping: ++ type: SHUFFLE http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/simple_hbase.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/simple_hbase.yaml index 0000000,0000000..e407bd9 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/simple_hbase.yaml @@@ -1,0 -1,0 +1,120 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++ ++# Test ability to wire together shell spouts/bolts ++--- ++ ++# topology definition ++# name to be used when submitting ++name: "hbase-wordcount" ++ ++# Components ++# Components are analagous to Spring beans. They are meant to be used as constructor, ++# property(setter), and builder arguments. ++# ++# for the time being, components must be declared in the order they are referenced ++ ++# WordSpout spout = new WordSpout(); ++# WordCounter bolt = new WordCounter(); ++# ++# SimpleHBaseMapper mapper = new SimpleHBaseMapper() ++# .withRowKeyField("word") ++# .withColumnFields(new Fields("word")) ++# .withCounterFields(new Fields("count")) ++# .withColumnFamily("cf"); ++# ++# HBaseBolt hbase = new HBaseBolt("WordCount", mapper) ++# .withConfigKey("hbase.conf"); ++# ++# ++# // wordSpout ==> countBolt ==> HBaseBolt ++# TopologyBuilder builder = new TopologyBuilder(); ++# ++# builder.setSpout(WORD_SPOUT, spout, 1); ++# builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT); ++# builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word")); ++ ++ ++ ++ ++components: ++ - id: "columnFields" ++ className: "backtype.storm.tuple.Fields" ++ constructorArgs: ++ - ["word"] ++ ++ - id: "counterFields" ++ className: "backtype.storm.tuple.Fields" ++ constructorArgs: ++ - ["count"] ++ ++ - id: "mapper" ++ className: "org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper" ++ configMethods: ++ - name: "withRowKeyField" ++ args: ["word"] ++ - name: "withColumnFields" ++ args: [ref: "columnFields"] ++ - name: "withCounterFields" ++ args: [ref: "counterFields"] ++ - name: "withColumnFamily" ++ args: ["cf"] ++ ++# topology configuration ++# this will be passed to the submitter as a map of config options ++# ++config: ++ topology.workers: 1 ++ hbase.conf: ++ hbase.rootdir: "hdfs://hadoop:54310/hbase" ++ hbase.zookeeper.quorum: "hadoop" ++ ++# spout definitions ++spouts: ++ - id: "word-spout" ++ className: "backtype.storm.testing.TestWordSpout" ++ parallelism: 1 ++ ++# bolt definitions ++ ++bolts: ++ - id: "count-bolt" ++ className: "backtype.storm.testing.TestWordCounter" ++ ++ - id: "hbase-bolt" ++ className: "org.apache.storm.hbase.bolt.HBaseBolt" ++ constructorArgs: ++ - "WordCount" # HBase table name ++ - ref: "mapper" ++ configMethods: ++ - name: "withConfigKey" ++ args: ["hbase.conf"] ++ parallelism: 1 ++ ++ ++streams: ++ - name: "" # name isn't used (placeholder for logging, UI, etc.) ++ from: "word-spout" ++ to: "count-bolt" ++ grouping: ++ type: SHUFFLE ++ ++ - name: "" # name isn't used (placeholder for logging, UI, etc.) ++ from: "count-bolt" ++ to: "hbase-bolt" ++ grouping: ++ type: FIELDS ++ args: ["word"] http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/substitution-test.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/substitution-test.yaml index 0000000,0000000..13f1960 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/substitution-test.yaml @@@ -1,0 -1,0 +1,106 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++ ++# Test ability to wire together shell spouts/bolts ++--- ++ ++# topology definition ++# name to be used when submitting ++name: "${topology.name}" ++ ++# Components ++# Components are analagous to Spring beans. They are meant to be used as constructor, ++# property(setter), and builder arguments. ++#components: ++# - id: "myComponent" ++# className: "com.foo.bar.MyComponent" ++# constructorArgs: ++# - ... ++# properties: ++# foo: "bar" ++# bar: "foo" ++ ++# NOTE: We may want to consider some level of spring integration. For example, allowing component references ++# to a spring `ApplicationContext`. ++ ++# topology configuration ++# this will be passed to the submitter as a map of config options ++# ++config: ++ topology.workers: 1 ++ # test environent variable substitution ++ test.env.value: ${ENV-PATH} ++ # ... ++ ++# spout definitions ++spouts: ++ - id: "sentence-spout" ++ className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout" ++ # shell spout constructor takes 2 arguments: String[], String[] ++ constructorArgs: ++ # command line ++ - ["node", "randomsentence.js"] ++ # output fields ++ - ["word"] ++ parallelism: 1 ++ # ... ++ ++# bolt definitions ++bolts: ++ - id: "splitsentence" ++ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt" ++ constructorArgs: ++ # command line ++ - ["python", "splitsentence.py"] ++ # output fields ++ - ["word"] ++ parallelism: 1 ++ # ... ++ ++ - id: "log" ++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" ++ parallelism: 1 ++ # ... ++ ++ - id: "count" ++ className: "backtype.storm.testing.TestWordCounter" ++ parallelism: 1 ++ # ... ++ ++#stream definitions ++# stream definitions define connections between spouts and bolts. ++# note that such connections can be cyclical ++# custom stream groupings are also supported ++ ++streams: ++ - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.) ++ from: "sentence-spout" ++ to: "splitsentence" ++ grouping: ++ type: SHUFFLE ++ ++ - name: "split --> count" ++ from: "splitsentence" ++ to: "count" ++ grouping: ++ type: FIELDS ++ args: ["word"] ++ ++ - name: "count --> log" ++ from: "count" ++ to: "log" ++ grouping: ++ type: SHUFFLE http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/tck.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/tck.yaml index 0000000,0000000..7e9b614 new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/tck.yaml @@@ -1,0 -1,0 +1,95 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++ ++ ++# YAML configuration to serve as a basic smoke test for what is supported. ++# ++# We should support comments, so if we've failed so far, things aren't good. ++ ++# we shouldn't choke if we see a document separator... ++--- ++ ++# topology definition ++# name to be used when submitting ++name: "yaml-topology" ++ ++# Components ++# Components are analagous to Spring beans. They are meant to be used as constructor, ++# property(setter), and builder arguments. ++#components: ++# - id: "myComponent" ++# className: "com.foo.bar.MyComponent" ++# properties: ++# foo: "bar" ++# bar: "foo" ++ ++# NOTE: We may want to consider some level of spring integration. For example, allowing component references ++# to a spring `ApplicationContext`. ++ ++# topology configuration ++# this will be passed to the submitter as a map of config options ++# ++config: ++ topology.workers: 1 ++ # ... ++ ++# spout definitions ++spouts: ++ - id: "spout-1" ++ className: "backtype.storm.testing.TestWordSpout" ++ parallelism: 1 ++ # ... ++ ++# bolt definitions ++bolts: ++ - id: "bolt-1" ++ className: "backtype.storm.testing.TestWordCounter" ++ parallelism: 1 ++ # ... ++ ++ - id: "bolt-2" ++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" ++ parallelism: 1 ++ # ... ++ ++#stream definitions ++# stream definitions define connections between spouts and bolts. ++# note that such connections can be cyclical ++streams: ++ - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.) ++# id: "connection-1" ++ from: "spout-1" ++ to: "bolt-1" ++ grouping: ++ type: FIELDS ++ args: ["word"] ++ ++ - name: "bolt-1 --> bolt2" ++ from: "bolt-1" ++ to: "bolt-2" ++ grouping: ++ type: CUSTOM ++ customClass: ++ className: "backtype.storm.testing.NGrouping" ++ constructorArgs: ++ - 1 ++ ++ ++ ++ ++ ++ ++ http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/test.properties ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/configs/test.properties index 0000000,0000000..0730d5f new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/configs/test.properties @@@ -1,0 -1,0 +1,2 @@@ ++topology.name: substitution-topology ++some.other.property: foo bar http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/logback.xml ---------------------------------------------------------------------- diff --cc external/flux/flux-core/src/test/resources/logback.xml index 0000000,0000000..1853b8a new file mode 100644 --- /dev/null +++ b/external/flux/flux-core/src/test/resources/logback.xml @@@ -1,0 -1,0 +1,30 @@@ ++<?xml version="1.0"?> ++<!-- ++ Licensed to the Apache Software Foundation (ASF) under one or more ++ contributor license agreements. See the NOTICE file distributed with ++ this work for additional information regarding copyright ownership. ++ The ASF licenses this file to You under the Apache License, Version 2.0 ++ (the "License"); you may not use this file except in compliance with ++ the License. You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++ Unless required by applicable law or agreed to in writing, software ++ distributed under the License is distributed on an "AS IS" BASIS, ++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ See the License for the specific language governing permissions and ++ limitations under the License. ++--> ++<configuration scan="true" scanPeriod="30 seconds"> ++ <appender name="A1" class="ch.qos.logback.core.ConsoleAppender"> ++ <encoder> ++ <pattern>%-4r [%t] %-5p %c - %m%n</pattern> ++ </encoder> ++ </appender> ++ <logger name="org.apache.storm.zookeeper" level="WARN"/> ++ <logger name="org.apache.storm.curator" level="WARN"/> ++ <logger name="org.apache.storm.flux" level="DEBUG"/> ++ <root level="DEBUG"> ++ <appender-ref ref="A1"/> ++ </root> ++</configuration> http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/README.md ---------------------------------------------------------------------- diff --cc external/flux/flux-examples/README.md index 0000000,0000000..b3798a6 new file mode 100644 --- /dev/null +++ b/external/flux/flux-examples/README.md @@@ -1,0 -1,0 +1,68 @@@ ++# Flux Examples ++A collection of examples illustrating various capabilities. ++ ++## Building From Source and Running ++ ++Checkout the projects source and perform a top level Maven build (i.e. from the `flux` directory): ++ ++```bash ++git clone https://github.com/ptgoetz/flux.git ++cd flux ++mvn install ++``` ++ ++This will create a shaded (i.e. "fat" or "uber") jar in the `flux-examples/target` directory that can run/deployed with ++the `storm` command: ++ ++```bash ++cd flux-examples ++storm jar ./target/flux-examples-0.2.3-SNAPSHOT.jar org.apache.storm.flux.Flux --local ./src/main/resources/simple_wordcount.yaml ++``` ++ ++The example YAML files are also packaged in the examples jar, so they can also be referenced with Flux's `--resource` ++command line switch: ++ ++```bash ++storm jar ./target/flux-examples-0.2.3-SNAPSHOT.jar org.apache.storm.flux.Flux --local --resource /simple_wordcount.yaml ++``` ++ ++## Available Examples ++ ++### [simple_wordcount.yaml](src/main/resources/simple_wordcount.yaml) ++ ++This is a very basic wordcount example using Java spouts and bolts. It simply logs the running count of each word ++received. ++ ++### [multilang.yaml](src/main/resources/multilang.yaml) ++ ++Another wordcount example that uses a spout written in JavaScript (node.js), a bolt written in Python, and two bolts ++written in java. ++ ++### [kafka_spout.yaml](src/main/resources/kafka_spout.yaml) ++This example illustrates how to configure Storm's `storm-kafka` spout using Flux YAML DSL `components`, `references`, ++and `constructor arguments` constructs. ++ ++### [simple_hdfs.yaml](src/main/resources/simple_hdfs.yaml) ++ ++This example demonstrates using Flux to setup a storm-hdfs bolt to write to an HDFS cluster. It also demonstrates Flux's ++variable substitution/filtering feature. ++ ++To run the `simple_hdfs.yaml` example, copy the `hdfs_bolt.properties` file to a convenient location and change, at ++least, the property `hdfs.url` to point to a HDFS cluster. Then you can run the example something like: ++ ++```bash ++storm jar ./target/flux-examples-0.2.3-SNAPSHOT.jar org.apache.storm.flux.Flux --local ./src/main/resources/simple_hdfs.yaml --filter my_hdfs_bolt.properties ++``` ++ ++### [simple_hbase.yaml](src/main/resources/simple_hbase.yaml) ++ ++This example illustrates how to use Flux to setup a storm-hbase bolt to write to HBase. ++ ++In order to use this example, you will need to edit the `src/main resrouces/hbase-site.xml` file to reflect your HBase ++environment, and then rebuild the topology jar. ++ ++You can do so by running the following Maven command in the `flux-examples` directory: ++ ++```bash ++mvn clean install ++``` http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/pom.xml ---------------------------------------------------------------------- diff --cc external/flux/flux-examples/pom.xml index 0000000,0000000..0b9796e new file mode 100644 --- /dev/null +++ b/external/flux/flux-examples/pom.xml @@@ -1,0 -1,0 +1,87 @@@ ++<?xml version="1.0" encoding="UTF-8"?> ++<!-- ++ Licensed to the Apache Software Foundation (ASF) under one or more ++ contributor license agreements. See the NOTICE file distributed with ++ this work for additional information regarding copyright ownership. ++ The ASF licenses this file to You under the Apache License, Version 2.0 ++ (the "License"); you may not use this file except in compliance with ++ the License. You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++ Unless required by applicable law or agreed to in writing, software ++ distributed under the License is distributed on an "AS IS" BASIS, ++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ See the License for the specific language governing permissions and ++ limitations under the License. ++--> ++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> ++ <modelVersion>4.0.0</modelVersion> ++ ++ <parent> ++ <groupId>com.github.ptgoetz</groupId> ++ <artifactId>flux</artifactId> ++ <version>0.3.1-SNAPSHOT</version> ++ <relativePath>../pom.xml</relativePath> ++ </parent> ++ ++ <groupId>com.github.ptgoetz</groupId> ++ <artifactId>flux-examples</artifactId> ++ <packaging>jar</packaging> ++ ++ <name>flux-examples</name> ++ <url>https://github.com/ptgoetz/flux</url> ++ ++ <dependencies> ++ <dependency> ++ <groupId>com.github.ptgoetz</groupId> ++ <artifactId>flux-core</artifactId> ++ <version>${project.version}</version> ++ </dependency> ++ <dependency> ++ <groupId>com.github.ptgoetz</groupId> ++ <artifactId>flux-wrappers</artifactId> ++ <version>${project.version}</version> ++ </dependency> ++ ++ <dependency> ++ <groupId>org.apache.storm</groupId> ++ <artifactId>storm-hdfs</artifactId> ++ <version>${storm.version}</version> ++ </dependency> ++ <dependency> ++ <groupId>org.apache.storm</groupId> ++ <artifactId>storm-hbase</artifactId> ++ <version>${storm.version}</version> ++ </dependency> ++ </dependencies> ++ ++ <build> ++ <plugins> ++ <plugin> ++ <groupId>org.apache.maven.plugins</groupId> ++ <artifactId>maven-shade-plugin</artifactId> ++ <version>1.4</version> ++ <configuration> ++ <createDependencyReducedPom>true</createDependencyReducedPom> ++ </configuration> ++ <executions> ++ <execution> ++ <phase>package</phase> ++ <goals> ++ <goal>shade</goal> ++ </goals> ++ <configuration> ++ <transformers> ++ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> ++ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> ++ <mainClass>org.apache.storm.flux.Flux</mainClass> ++ </transformer> ++ </transformers> ++ </configuration> ++ </execution> ++ </executions> ++ </plugin> ++ </plugins> ++ </build> ++</project> http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java ---------------------------------------------------------------------- diff --cc external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java index 0000000,0000000..eb4fb7a new file mode 100644 --- /dev/null +++ b/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java @@@ -1,0 -1,0 +1,74 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.storm.flux.examples; ++ ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.hbase.HBaseConfiguration; ++import org.apache.hadoop.hbase.client.Get; ++import org.apache.hadoop.hbase.client.HTable; ++import org.apache.hadoop.hbase.client.Result; ++import org.apache.hadoop.hbase.util.Bytes; ++ ++import java.io.FileInputStream; ++import java.util.Properties; ++ ++/** ++ * Connects to the 'WordCount' HBase table and prints counts for each word. ++ * ++ * Assumes you have run (or are running) the YAML topology definition in ++ * <code>simple_hbase.yaml</code> ++ * ++ * You will also need to modify `src/main/resources/hbase-site.xml` ++ * to point to your HBase instance, and then repackage with `mvn package`. ++ * This is a known issue. ++ * ++ */ ++public class WordCountClient { ++ ++ public static void main(String[] args) throws Exception { ++ Configuration config = HBaseConfiguration.create(); ++ if(args.length == 1){ ++ Properties props = new Properties(); ++ props.load(new FileInputStream(args[0])); ++ System.out.println("HBase configuration:"); ++ for(Object key : props.keySet()) { ++ System.out.println(key + "=" + props.get(key)); ++ config.set((String)key, props.getProperty((String)key)); ++ } ++ } else { ++ System.out.println("Usage: WordCountClient <hbase_config.properties>"); ++ System.exit(1); ++ } ++ ++ HTable table = new HTable(config, "WordCount"); ++ String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; ++ ++ for (String word : words) { ++ Get get = new Get(Bytes.toBytes(word)); ++ Result result = table.get(get); ++ ++ byte[] countBytes = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("count")); ++ byte[] wordBytes = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("word")); ++ ++ String wordStr = Bytes.toString(wordBytes); ++ long count = Bytes.toLong(countBytes); ++ System.out.println("Word: '" + wordStr + "', Count: " + count); ++ } ++ ++ } ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java ---------------------------------------------------------------------- diff --cc external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java index 0000000,0000000..f7c80c7 new file mode 100644 --- /dev/null +++ b/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java @@@ -1,0 -1,0 +1,71 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.storm.flux.examples; ++ ++import backtype.storm.task.TopologyContext; ++import backtype.storm.topology.BasicOutputCollector; ++import backtype.storm.topology.IBasicBolt; ++import backtype.storm.topology.OutputFieldsDeclarer; ++import backtype.storm.topology.base.BaseBasicBolt; ++import backtype.storm.tuple.Fields; ++import backtype.storm.tuple.Tuple; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++ ++import java.util.Map; ++ ++import static backtype.storm.utils.Utils.tuple; ++ ++/** ++ * This bolt is used by the HBase example. It simply emits the first field ++ * found in the incoming tuple as "word", with a "count" of `1`. ++ * ++ * In this case, the downstream HBase bolt handles the counting, so a value ++ * of `1` will just increment the HBase counter by one. ++ */ ++public class WordCounter extends BaseBasicBolt { ++ private static final Logger LOG = LoggerFactory.getLogger(WordCounter.class); ++ ++ ++ ++ @SuppressWarnings("rawtypes") ++ public void prepare(Map stormConf, TopologyContext context) { ++ } ++ ++ /* ++ * Just output the word value with a count of 1. ++ * The HBaseBolt will handle incrementing the counter. ++ */ ++ public void execute(Tuple input, BasicOutputCollector collector) { ++ collector.emit(tuple(input.getValues().get(0), 1)); ++ } ++ ++ public void cleanup() { ++ ++ } ++ ++ public void declareOutputFields(OutputFieldsDeclarer declarer) { ++ declarer.declare(new Fields("word", "count")); ++ } ++ ++ @Override ++ public Map<String, Object> getComponentConfiguration() { ++ return null; ++ } ++ ++} http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/hbase_bolt.properties ---------------------------------------------------------------------- diff --cc external/flux/flux-examples/src/main/resources/hbase_bolt.properties index 0000000,0000000..f8ed50c new file mode 100644 --- /dev/null +++ b/external/flux/flux-examples/src/main/resources/hbase_bolt.properties @@@ -1,0 -1,0 +1,18 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++ ++hbase.rootdir=hdfs://hadoop:54310/hbase ++hbase.zookeeper.quorum=hadoop http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/hdfs_bolt.properties ---------------------------------------------------------------------- diff --cc external/flux/flux-examples/src/main/resources/hdfs_bolt.properties index 0000000,0000000..7bcbe7a new file mode 100644 --- /dev/null +++ b/external/flux/flux-examples/src/main/resources/hdfs_bolt.properties @@@ -1,0 -1,0 +1,26 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++ ++ ++# The HDFS url ++hdfs.url=hdfs://hadoop:54310 ++ ++# The HDFS directory where the bolt will write incoming data ++hdfs.write.dir=/incoming ++ ++# The HDFS directory where files will be moved once the bolt has ++# finished writing to it. ++hdfs.dest.dir=/complete http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/kafka_spout.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-examples/src/main/resources/kafka_spout.yaml index 0000000,0000000..8ffddc5 new file mode 100644 --- /dev/null +++ b/external/flux/flux-examples/src/main/resources/kafka_spout.yaml @@@ -1,0 -1,0 +1,136 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++ ++ ++# Test ability to wire together shell spouts/bolts ++--- ++ ++# topology definition ++# name to be used when submitting ++name: "kafka-topology" ++ ++# Components ++# Components are analagous to Spring beans. They are meant to be used as constructor, ++# property(setter), and builder arguments. ++# ++# for the time being, components must be declared in the order they are referenced ++components: ++ - id: "stringScheme" ++ className: "storm.kafka.StringScheme" ++ ++ - id: "stringMultiScheme" ++ className: "backtype.storm.spout.SchemeAsMultiScheme" ++ constructorArgs: ++ - ref: "stringScheme" ++ ++ - id: "zkHosts" ++ className: "storm.kafka.ZkHosts" ++ constructorArgs: ++ - "localhost:2181" ++ ++# Alternative kafka config ++# - id: "kafkaConfig" ++# className: "storm.kafka.KafkaConfig" ++# constructorArgs: ++# # brokerHosts ++# - ref: "zkHosts" ++# # topic ++# - "myKafkaTopic" ++# # clientId (optional) ++# - "myKafkaClientId" ++ ++ - id: "spoutConfig" ++ className: "storm.kafka.SpoutConfig" ++ constructorArgs: ++ # brokerHosts ++ - ref: "zkHosts" ++ # topic ++ - "myKafkaTopic" ++ # zkRoot ++ - "/kafkaSpout" ++ # id ++ - "myId" ++ properties: ++ - name: "forceFromStart" ++ value: true ++ - name: "scheme" ++ ref: "stringMultiScheme" ++ ++ ++ ++# NOTE: We may want to consider some level of spring integration. For example, allowing component references ++# to a spring `ApplicationContext`. ++ ++# topology configuration ++# this will be passed to the submitter as a map of config options ++# ++config: ++ topology.workers: 1 ++ # ... ++ ++# spout definitions ++spouts: ++ - id: "kafka-spout" ++ className: "storm.kafka.KafkaSpout" ++ constructorArgs: ++ - ref: "spoutConfig" ++ ++# bolt definitions ++bolts: ++ - id: "splitsentence" ++ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt" ++ constructorArgs: ++ # command line ++ - ["python", "splitsentence.py"] ++ # output fields ++ - ["word"] ++ parallelism: 1 ++ # ... ++ ++ - id: "log" ++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" ++ parallelism: 1 ++ # ... ++ ++ - id: "count" ++ className: "backtype.storm.testing.TestWordCounter" ++ parallelism: 1 ++ # ... ++ ++#stream definitions ++# stream definitions define connections between spouts and bolts. ++# note that such connections can be cyclical ++# custom stream groupings are also supported ++ ++streams: ++ - name: "kafka --> split" # name isn't used (placeholder for logging, UI, etc.) ++ from: "kafka-spout" ++ to: "splitsentence" ++ grouping: ++ type: SHUFFLE ++ ++ - name: "split --> count" ++ from: "splitsentence" ++ to: "count" ++ grouping: ++ type: FIELDS ++ args: ["word"] ++ ++ - name: "count --> log" ++ from: "count" ++ to: "log" ++ grouping: ++ type: SHUFFLE http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/multilang.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-examples/src/main/resources/multilang.yaml index 0000000,0000000..4f80667 new file mode 100644 --- /dev/null +++ b/external/flux/flux-examples/src/main/resources/multilang.yaml @@@ -1,0 -1,0 +1,89 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++ ++# Test ability to wire together shell spouts/bolts ++--- ++ ++# topology definition ++# name to be used when submitting ++name: "shell-topology" ++ ++# topology configuration ++# this will be passed to the submitter as a map of config options ++# ++config: ++ topology.workers: 1 ++ # ... ++ ++# spout definitions ++spouts: ++ - id: "sentence-spout" ++ className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout" ++ # shell spout constructor takes 2 arguments: String[], String[] ++ constructorArgs: ++ # command line ++ - ["node", "randomsentence.js"] ++ # output fields ++ - ["word"] ++ parallelism: 1 ++ # ... ++ ++# bolt definitions ++bolts: ++ - id: "splitsentence" ++ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt" ++ constructorArgs: ++ # command line ++ - ["python", "splitsentence.py"] ++ # output fields ++ - ["word"] ++ parallelism: 1 ++ # ... ++ ++ - id: "log" ++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" ++ parallelism: 1 ++ # ... ++ ++ - id: "count" ++ className: "backtype.storm.testing.TestWordCounter" ++ parallelism: 1 ++ # ... ++ ++#stream definitions ++# stream definitions define connections between spouts and bolts. ++# note that such connections can be cyclical ++# custom stream groupings are also supported ++ ++streams: ++ - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.) ++ from: "sentence-spout" ++ to: "splitsentence" ++ grouping: ++ type: SHUFFLE ++ ++ - name: "split --> count" ++ from: "splitsentence" ++ to: "count" ++ grouping: ++ type: FIELDS ++ args: ["word"] ++ ++ - name: "count --> log" ++ from: "count" ++ to: "log" ++ grouping: ++ type: SHUFFLE http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/simple_hbase.yaml ---------------------------------------------------------------------- diff --cc external/flux/flux-examples/src/main/resources/simple_hbase.yaml index 0000000,0000000..62686d0 new file mode 100644 --- /dev/null +++ b/external/flux/flux-examples/src/main/resources/simple_hbase.yaml @@@ -1,0 -1,0 +1,92 @@@ ++# Licensed to the Apache Software Foundation (ASF) under one ++# or more contributor license agreements. See the NOTICE file ++# distributed with this work for additional information ++# regarding copyright ownership. The ASF licenses this file ++# to you under the Apache License, Version 2.0 (the ++# "License"); you may not use this file except in compliance ++# with the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++--- ++# NOTE: To use this example, you will need to modify `src/main/resources/hbase-site.xml` ++# to point to your HBase instance, and then repackage with `mvn package`. ++# This is a known issue. ++ ++# topology definition ++# name to be used when submitting ++name: "hbase-persistent-wordcount" ++ ++# Components ++components: ++ - id: "columnFields" ++ className: "backtype.storm.tuple.Fields" ++ constructorArgs: ++ - ["word"] ++ ++ - id: "counterFields" ++ className: "backtype.storm.tuple.Fields" ++ constructorArgs: ++ - ["count"] ++ ++ - id: "mapper" ++ className: "org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper" ++ configMethods: ++ - name: "withRowKeyField" ++ args: ["word"] ++ - name: "withColumnFields" ++ args: [ref: "columnFields"] ++ - name: "withCounterFields" ++ args: [ref: "counterFields"] ++ - name: "withColumnFamily" ++ args: ["cf"] ++ ++# topology configuration ++# this will be passed to the submitter as a map of config options ++config: ++ topology.workers: 1 ++ hbase.conf: ++ hbase.rootdir: "${hbase.rootdir}" ++ hbase.zookeeper.quorum: "${hbase.zookeeper.quorum}" ++ ++# spout definitions ++spouts: ++ - id: "word-spout" ++ className: "backtype.storm.testing.TestWordSpout" ++ parallelism: 1 ++ ++# bolt definitions ++ ++bolts: ++ - id: "count-bolt" ++ className: "org.apache.storm.flux.examples.WordCounter" ++ parallelism: 1 ++ ++ - id: "hbase-bolt" ++ className: "org.apache.storm.hbase.bolt.HBaseBolt" ++ constructorArgs: ++ - "WordCount" # HBase table name ++ - ref: "mapper" ++ configMethods: ++ - name: "withConfigKey" ++ args: ["hbase.conf"] ++ parallelism: 1 ++ ++streams: ++ - name: "" # name isn't used (placeholder for logging, UI, etc.) ++ from: "word-spout" ++ to: "count-bolt" ++ grouping: ++ type: SHUFFLE ++ ++ - name: "" # name isn't used (placeholder for logging, UI, etc.) ++ from: "count-bolt" ++ to: "hbase-bolt" ++ grouping: ++ type: FIELDS ++ args: ["word"]