http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Trident-API-Overview.md
----------------------------------------------------------------------
diff --git a/docs/Trident-API-Overview.md b/docs/Trident-API-Overview.md
index 3b68645..4dec1e3 100644
--- a/docs/Trident-API-Overview.md
+++ b/docs/Trident-API-Overview.md
@@ -1,7 +1,8 @@
 ---
+title: Trident API Overview
 layout: documentation
+documentation: true
 ---
-# Trident API overview
 
 The core data model in Trident is the "Stream", processed as a series of 
batches. A stream is partitioned among the nodes in the cluster, and operations 
applied to a stream are applied in parallel across each partition.
 
@@ -58,7 +59,7 @@ The resulting tuples would have fields ["a", "b", "c", "d"] 
and look like this:
 Filters take in a tuple as input and decide whether or not to keep that tuple 
or not. Suppose you had this filter:
 
 ```java
-public class MyFilter extends BaseFunction {
+public class MyFilter extends BaseFilter {
     public boolean isKeep(TridentTuple tuple) {
         return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
     }
@@ -76,14 +77,227 @@ Now suppose you had these tuples with fields ["a", "b", 
"c"]:
 If you ran this code:
 
 ```java
-mystream.each(new Fields("b", "a"), new MyFilter())
+mystream.filter(new MyFilter())
 ```
 
 The resulting tuples would be:
 
 ```
-[2, 1, 1]
+[1, 2, 3]
+```
+
+### map and flatMap
+
+`map` returns a stream consisting of the result of applying the given mapping 
function to the tuples of the stream. This
+can be used to apply a one-one transformation to the tuples.
+
+For example, if there is a stream of words and you wanted to convert it to a 
stream of upper case words,
+you could define a mapping function as follows,
+
+```java
+public class UpperCase extends MapFunction {
+ @Override
+ public Values execute(TridentTuple input) {
+   return new Values(input.getString(0).toUpperCase());
+ }
+}
+```
+
+The mapping function can then be applied on the stream to produce a stream of 
uppercase words.
+
+```java
+mystream.map(new UpperCase())
+```
+
+`flatMap` is similar to `map` but has the effect of applying a one-to-many 
transformation to the values of the stream,
+and then flattening the resulting elements into a new stream.
+
+For example, if there is a stream of sentences and you wanted to convert it to 
a stream of words,
+you could define a flatMap function as follows,
+
+```java
+public class Split extends FlatMapFunction {
+  @Override
+  public Iterable<Values> execute(TridentTuple input) {
+    List<Values> valuesList = new ArrayList<>();
+    for (String word : input.getString(0).split(" ")) {
+      valuesList.add(new Values(word));
+    }
+    return valuesList;
+  }
+}
+```
+
+The flatMap function can then be applied on the stream of sentences to produce 
a stream of words,
+
+```java
+mystream.flatMap(new Split())
+```
+
+Of course these operations can be chained, so a stream of uppercase words can 
be obtained from a stream of sentences as follows,
+
+```java
+mystream.flatMap(new Split()).map(new UpperCase())
+```
+### peek
+`peek` can be used to perform an additional action on each trident tuple as 
they flow through the stream.
+ This could be useful for debugging to see the tuples as they flow past a 
certain point in a pipeline.
+
+For example, the below code would print the result of converting the words to 
uppercase before they are passed to `groupBy`
+```java
+ mystream.flatMap(new Split()).map(new UpperCase())
+         .peek(new Consumer() {
+                @Override
+                public void accept(TridentTuple input) {
+                  System.out.println(input.getString(0));
+                }
+         })
+         .groupBy(new Fields("word"))
+         .persistentAggregate(new MemoryMapState.Factory(), new Count(), new 
Fields("count"))
+```
+
+### min and minBy
+`min` and `minBy` operations return minimum value on each partition of a batch 
of tuples in a trident stream.
+
+Suppose, a trident stream contains fields ["device-id", "count"] and the 
following partitions of tuples
+
+```
+Partition 0:
+[123, 2]
+[113, 54]
+[23,  28]
+[237, 37]
+[12,  23]
+[62,  17]
+[98,  42]
+
+Partition 1:
+[64,  18]
+[72,  54]
+[2,   28]
+[742, 71]
+[98,  45]
+[62,  12]
+[19,  174]
+
+
+Partition 2:
+[27,  94]
+[82,  23]
+[9,   86]
+[53,  71]
+[74,  37]
+[51,  49]
+[37,  98]
+
+```
+
+`minBy` operation can be applied on the above stream of tuples like below 
which results in emitting tuples with minimum values of `count` field in each 
partition.
+
+``` java
+  mystream.minBy(new Fields("count"))
 ```
+Result of the above code on mentioned partitions is:
+ 
+```
+Partition 0:
+[123, 2]
+
+
+Partition 1:
+[62,  12]
+
+
+Partition 2:
+[82,  23]
+
+```
+
+You can look at other `min` and `minBy` operations on Stream
+``` java
+      public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) 
+      public Stream min(Comparator<TridentTuple> comparator) 
+```
+Below example shows how these APIs can be used to find minimum using 
respective Comparators on a tuple. 
+
+``` java
+
+        FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, 
Vehicle.generateVehicles(20));
+
+        TridentTopology topology = new TridentTopology();
+        Stream vehiclesStream = topology.newStream("spout1", spout).
+                each(allFields, new Debug("##### vehicles"));
+                
+        Stream slowVehiclesStream =
+                vehiclesStream
+                        .min(new SpeedComparator()) // Comparator w.r.t speed 
on received tuple.
+                        .each(vehicleField, new Debug("#### slowest vehicle"));
+
+        vehiclesStream
+                .minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // 
Comparator w.r.t efficiency on received tuple.
+                .each(vehicleField, new Debug("#### least efficient vehicle"));
+
+```
+Example applications of these APIs can be located at 
[TridentMinMaxOfDevicesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java)
 and 
[TridentMinMaxOfVehiclesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java)
 
+
+### max and maxBy
+`max` and `maxBy` operations return maximum value on each partition of a batch 
of tuples in a trident stream.
+
+Suppose, a trident stream contains fields ["device-id", "count"] as mentioned 
in the above section.
+
+`max` and `maxBy` operations can be applied on the above stream of tuples like 
below which results in emitting tuples with maximum values of `count` field for 
each partition.
+
+``` java
+  mystream.maxBy(new Fields("count"))
+```
+Result of the above code on mentioned partitions is:
+ 
+```
+Partition 0:
+[113, 54]
+
+
+Partition 1:
+[19,  174]
+
+
+Partition 2:
+[37,  98]
+
+```
+
+You can look at other `max` and `maxBy` functions on Stream
+
+``` java
+
+      public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) 
+      public Stream max(Comparator<TridentTuple> comparator) 
+      
+```
+
+Below example shows how these APIs can be used to find maximum using 
respective Comparators on a tuple.
+
+``` java
+
+        FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, 
Vehicle.generateVehicles(20));
+
+        TridentTopology topology = new TridentTopology();
+        Stream vehiclesStream = topology.newStream("spout1", spout).
+                each(allFields, new Debug("##### vehicles"));
+
+        vehiclesStream
+                .max(new SpeedComparator()) // Comparator w.r.t speed on 
received tuple.
+                .each(vehicleField, new Debug("#### fastest vehicle"))
+                .project(driverField)
+                .each(driverField, new Debug("##### fastest driver"));
+        
+        vehiclesStream
+                .maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // 
Comparator w.r.t efficiency on received tuple.
+                .each(vehicleField, new Debug("#### most efficient vehicle"));
+
+```
+
+Example applications of these APIs can be located at 
[TridentMinMaxOfDevicesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java)
 and 
[TridentMinMaxOfVehiclesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java)
 
 
 ### partitionAggregate
 
@@ -153,7 +367,7 @@ public class Count implements CombinerAggregator<Long> {
 }
 ```
 
-The benefits of CombinerAggregators are seen when you use the with the 
aggregate method instead of partitionAggregate. In that case, Trident 
automatically optimizes the computation by doing partial aggregations before 
transferring tuples over the network.
+The benefits of CombinerAggregators are seen when you use them with the 
aggregate method instead of partitionAggregate. In that case, Trident 
automatically optimizes the computation by doing partial aggregations before 
transferring tuples over the network.
 
 A ReducerAggregator has the following interface:
 
@@ -269,7 +483,7 @@ mystream.aggregate(new Count(), new Fields("count"))
 
 Like partitionAggregate, aggregators for aggregate can be chained. However, if 
you chain a CombinerAggregator with a non-CombinerAggregator, Trident is unable 
to do the partial aggregation optimization.
 
-You can read more about how to use persistentAggregate in the [Trident state 
doc](https://github.com/apache/incubator-storm/wiki/Trident-state).
+You can read more about how to use persistentAggregate in the [Trident state 
doc](Trident-state.html).
 
 ## Operations on grouped streams
 
@@ -277,7 +491,7 @@ The groupBy operation repartitions the stream by doing a 
partitionBy on the spec
 
 ![Grouping](images/grouping.png)
 
-If you run aggregators on a grouped stream, the aggregation will be run within 
each group instead of against the whole batch. persistentAggregate can also be 
run on a GroupedStream, in which case the results will be stored in a 
[MapState](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/state/map/MapState.java)
 with the key being the grouping fields. You can read more about 
persistentAggregate in the [Trident state doc](Trident-state.html).
+If you run aggregators on a grouped stream, the aggregation will be run within 
each group instead of against the whole batch. persistentAggregate can also be 
run on a GroupedStream, in which case the results will be stored in a 
[MapState]({{page.git-blob-base}}/storm-core/src/jvm/storm/trident/state/map/MapState.java)
 with the key being the grouping fields. You can read more about 
persistentAggregate in the [Trident state doc](Trident-state.html).
 
 Like regular streams, aggregators on grouped streams can be chained.
 

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Trident-spouts.md
----------------------------------------------------------------------
diff --git a/docs/Trident-spouts.md b/docs/Trident-spouts.md
index 92330a7..ab7de89 100644
--- a/docs/Trident-spouts.md
+++ b/docs/Trident-spouts.md
@@ -1,5 +1,7 @@
 ---
+title: Trident Spouts
 layout: documentation
+documentation: true
 ---
 # Trident spouts
 
@@ -32,10 +34,10 @@ Even while processing multiple batches simultaneously, 
Trident will order any st
 
 Here are the following spout APIs available:
 
-1. 
[ITridentSpout](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java):
 The most general API that can support transactional or opaque transactional 
semantics. Generally you'll use one of the partitioned flavors of this API 
rather than this one directly.
-2. 
[IBatchSpout](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/spout/IBatchSpout.java):
 A non-transactional spout that emits batches of tuples at a time
-3. 
[IPartitionedTridentSpout](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java):
 A transactional spout that reads from a partitioned data source (like a 
cluster of Kafka servers)
-4. 
[IOpaquePartitionedTridentSpout](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java):
 An opaque transactional spout that reads from a partitioned data source
+1. 
[ITridentSpout]({{page.git-blob-base}}/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java):
 The most general API that can support transactional or opaque transactional 
semantics. Generally you'll use one of the partitioned flavors of this API 
rather than this one directly.
+2. 
[IBatchSpout]({{page.git-blob-base}}/storm-core/src/jvm/storm/trident/spout/IBatchSpout.java):
 A non-transactional spout that emits batches of tuples at a time
+3. 
[IPartitionedTridentSpout]({{page.git-blob-base}}/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java):
 A transactional spout that reads from a partitioned data source (like a 
cluster of Kafka servers)
+4. 
[IOpaquePartitionedTridentSpout]({{page.git-blob-base}}/storm-core/src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java):
 An opaque transactional spout that reads from a partitioned data source
 
 And, like mentioned in the beginning of this tutorial, you can use regular 
IRichSpout's as well.
  

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Trident-state.md
----------------------------------------------------------------------
diff --git a/docs/Trident-state.md b/docs/Trident-state.md
index 2ace8c8..af779e4 100644
--- a/docs/Trident-state.md
+++ b/docs/Trident-state.md
@@ -1,7 +1,8 @@
 ---
+title: Trident State
 layout: documentation
 ---
-# State in Trident
+
 
 Trident has first-class abstractions for reading from and writing to stateful 
sources. The state can either be internal to the topology – e.g., kept 
in-memory and backed by HDFS – or externally stored in a database like 
Memcached or Cassandra. There's no difference in the Trident API for either 
case.
 
@@ -27,7 +28,7 @@ Remember, Trident processes tuples as small batches with each 
batch being given
 2. There's no overlap between batches of tuples (tuples are in one batch or 
another, never multiple).
 3. Every tuple is in a batch (no tuples are skipped)
 
-This is a pretty easy type of spout to understand, the stream is divided into 
fixed batches that never change. storm-contrib has [an implementation of a 
transactional 
spout](https://github.com/nathanmarz/storm-contrib/blob/{{page.version}}/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java)
 for Kafka.
+This is a pretty easy type of spout to understand, the stream is divided into 
fixed batches that never change. storm-contrib has [an implementation of a 
transactional 
spout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java)
 for Kafka.
 
 You might be wondering – why wouldn't you just always use a transactional 
spout? They're simple and easy to understand. One reason you might not use one 
is because they're not necessarily very fault-tolerant. For example, the way 
TransactionalTridentKafkaSpout works is the batch for a txid will contain 
tuples from all the Kafka partitions for a topic. Once a batch has been 
emitted, any time that batch is re-emitted in the future the exact same set of 
tuples must be emitted to meet the semantics of transactional spouts. Now 
suppose a batch is emitted from TransactionalTridentKafkaSpout, the batch fails 
to process, and at the same time one of the Kafka nodes goes down. You're now 
incapable of replaying the same batch as you did before (since the node is down 
and some partitions for the topic are not unavailable), and processing will 
halt. 
 
@@ -71,7 +72,7 @@ As described before, an opaque transactional spout cannot 
guarantee that the bat
 
 1. Every tuple is *successfully* processed in exactly one batch. However, it's 
possible for a tuple to fail to process in one batch and then succeed to 
process in a later batch.
 
-[OpaqueTridentKafkaSpout](https://github.com/nathanmarz/storm-contrib/blob/{{page.version}}/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java)
 is a spout that has this property and is fault-tolerant to losing Kafka nodes. 
Whenever it's time for OpaqueTridentKafkaSpout to emit a batch, it emits tuples 
starting from where the last batch finished emitting. This ensures that no 
tuple is ever skipped or successfully processed by multiple batches.
+[OpaqueTridentKafkaSpout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java)
 is a spout that has this property and is fault-tolerant to losing Kafka nodes. 
Whenever it's time for OpaqueTridentKafkaSpout to emit a batch, it emits tuples 
starting from where the last batch finished emitting. This ensures that no 
tuple is ever skipped or successfully processed by multiple batches.
 
 With opaque transactional spouts, it's no longer possible to use the trick of 
skipping state updates if the transaction id in the database is the same as the 
transaction id for the current batch. This is because the batch may have 
changed between state updates.
 
@@ -308,7 +309,7 @@ public interface Snapshottable<T> extends State {
 }
 ```
 
-[MemoryMapState](https://github.com/apache/incubator-storm/blob/{{page.version}}/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java)
 and 
[MemcachedState](https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java)
 each implement both of these interfaces.
+[MemoryMapState]({{page.git-blob-base}}/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java)
 and 
[MemcachedState](https://github.com/nathanmarz/trident-memcached/blob/{{page.version}}/src/jvm/trident/memcached/MemcachedState.java)
 each implement both of these interfaces.
 
 ## Implementing Map States
 
@@ -321,10 +322,10 @@ public interface IBackingMap<T> {
 }
 ```
 
-OpaqueMap's will call multiPut with 
[OpaqueValue](https://github.com/apache/incubator-storm/blob/{{page.version}}/storm-core/src/jvm/storm/trident/state/OpaqueValue.java)'s
 for the vals, TransactionalMap's will give 
[TransactionalValue](https://github.com/apache/incubator-storm/blob/{{page.version}}/storm-core/src/jvm/storm/trident/state/TransactionalValue.java)'s
 for the vals, and NonTransactionalMaps will just pass the objects from the 
topology through.
+OpaqueMap's will call multiPut with 
[OpaqueValue]({{page.git-blob-base}}/storm-core/src/jvm/storm/trident/state/OpaqueValue.java)'s
 for the vals, TransactionalMap's will give 
[TransactionalValue]({{page.git-blob-base}}/storm-core/src/jvm/storm/trident/state/TransactionalValue.java)'s
 for the vals, and NonTransactionalMaps will just pass the objects from the 
topology through.
 
-Trident also provides the 
[CachedMap](https://github.com/apache/incubator-storm/blob/{{page.version}}/storm-core/src/jvm/storm/trident/state/map/CachedMap.java)
 class to do automatic LRU caching of map key/vals.
+Trident also provides the 
[CachedMap]({{page.git-blob-base}}/storm-core/src/jvm/storm/trident/state/map/CachedMap.java)
 class to do automatic LRU caching of map key/vals.
 
-Finally, Trident provides the 
[SnapshottableMap](https://github.com/apache/incubator-storm/blob/{{page.version}}/storm-core/src/jvm/storm/trident/state/map/SnapshottableMap.java)
 class that turns a MapState into a Snapshottable object, by storing global 
aggregations into a fixed key.
+Finally, Trident provides the 
[SnapshottableMap]({{page.git-blob-base}}/storm-core/src/jvm/storm/trident/state/map/SnapshottableMap.java)
 class that turns a MapState into a Snapshottable object, by storing global 
aggregations into a fixed key.
 
 Take a look at the implementation of 
[MemcachedState](https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java)
 to see how all these utilities can be put together to make a high performance 
MapState implementation. MemcachedState allows you to choose between opaque 
transactional, transactional, and non-transactional semantics.

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Trident-tutorial.md
----------------------------------------------------------------------
diff --git a/docs/Trident-tutorial.md b/docs/Trident-tutorial.md
index 862dd8b..6ad9103 100644
--- a/docs/Trident-tutorial.md
+++ b/docs/Trident-tutorial.md
@@ -1,7 +1,8 @@
 ---
+title: Trident Tutorial
 layout: documentation
+documentation: true
 ---
-# Trident tutorial
 
 Trident is a high-level abstraction for doing realtime computing on top of 
Storm. It allows you to seamlessly intermix high throughput (millions of 
messages per second), stateful stream processing with low latency distributed 
querying. If you're familiar with high level batch processing tools like Pig or 
Cascading, the concepts of Trident will be very familiar – Trident has joins, 
aggregations, grouping, functions, and filters. In addition to these, Trident 
adds primitives for doing stateful, incremental processing on top of any 
database or persistence store. Trident has consistent, exactly-once semantics, 
so it is easy to reason about Trident topologies.
 
@@ -234,7 +235,7 @@ Trident solves this problem by doing two things:
 
 With these two primitives, you can achieve exactly-once semantics with your 
state updates. Rather than store just the count in the database, what you can 
do instead is store the transaction id with the count in the database as an 
atomic value. Then, when updating the count, you can just compare the 
transaction id in the database with the transaction id for the current batch. 
If they're the same, you skip the update – because of the strong ordering, 
you know for sure that the value in the database incorporates the current 
batch. If they're different, you increment the count.
 
-Of course, you don't have to do this logic manually in your topologies. This 
logic is wrapped by the State abstraction and done automatically. Nor is your 
State object required to implement the transaction id trick: if you don't want 
to pay the cost of storing the transaction id in the database, you don't have 
to. In that case the State will have at-least-once-processing semantics in the 
case of failures (which may be fine for your application). You can read more 
about how to implement a State and the various fault-tolerance tradeoffs 
possible [in this doc](Trident-state.html).
+Of course, you don't have to do this logic manually in your topologies. This 
logic is wrapped by the State abstraction and done automatically. Nor is your 
State object required to implement the transaction id trick: if you don't want 
to pay the cost of storing the transaction id in the database, you don't have 
to. In that case the State will have at-least-once-processing semantics in the 
case of failures (which may be fine for your application). You can read more 
about how to implement a State and the various fault-tolerance tradeoffs 
possible [in this doc](/documentation/Trident-state.html).
 
 A State is allowed to use whatever strategy it wants to store state. So it 
could store state in an external database or it could keep the state in-memory 
but backed by HDFS (like how HBase works). State's are not required to hold 
onto state forever. For example, you could have an in-memory State 
implementation that only keeps the last X hours of data available and drops 
anything older. Take a look at the implementation of the [Memcached 
integration](https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java)
 for an example State implementation.
 

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Troubleshooting.md
----------------------------------------------------------------------
diff --git a/docs/Troubleshooting.md b/docs/Troubleshooting.md
index c9df298..83a7d6b 100644
--- a/docs/Troubleshooting.md
+++ b/docs/Troubleshooting.md
@@ -1,7 +1,8 @@
 ---
+title: Troubleshooting
 layout: documentation
+documentation: true
 ---
-## Troubleshooting
 
 This page lists issues people have run into when using Storm along with their 
solutions.
 
@@ -139,6 +140,43 @@ Caused by: java.lang.NullPointerException
     ... 6 more
 ```
 
+or 
+
+```
+java.lang.RuntimeException: java.lang.NullPointerException
+        at
+backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
+~[storm-core-0.9.3.jar:0.9.3]
+        at
+backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
+~[storm-core-0.9.3.jar:0.9.3]
+        at
+backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
+~[storm-core-0.9.3.jar:0.9.3]
+        at
+backtype.storm.disruptor$consume_loop_STAR_$fn__759.invoke(disruptor.clj:94)
+~[storm-core-0.9.3.jar:0.9.3]
+        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463)
+~[storm-core-0.9.3.jar:0.9.3]
+        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
+        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
+Caused by: java.lang.NullPointerException: null
+        at clojure.lang.RT.intCast(RT.java:1087) ~[clojure-1.5.1.jar:na]
+        at
+backtype.storm.daemon.worker$mk_transfer_fn$fn__3548.invoke(worker.clj:129)
+~[storm-core-0.9.3.jar:0.9.3]
+        at
+backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3282.invoke(executor.clj:258)
+~[storm-core-0.9.3.jar:0.9.3]
+        at
+backtype.storm.disruptor$clojure_handler$reify__746.onEvent(disruptor.clj:58)
+~[storm-core-0.9.3.jar:0.9.3]
+        at
+backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
+~[storm-core-0.9.3.jar:0.9.3]
+        ... 6 common frames omitted
+```
+
 Solution:
 
  * This is caused by having multiple threads issue methods on the 
`OutputCollector`. All emits, acks, and fails must happen on the same thread. 
One subtle way this can happen is if you make a `IBasicBolt` that emits on a 
separate thread. `IBasicBolt`'s automatically ack after execute is called, so 
this would cause multiple threads to use the `OutputCollector` leading to this 
exception. When using a basic bolt, all emits must happen in the same thread 
that runs `execute`.

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Tutorial.md
----------------------------------------------------------------------
diff --git a/docs/Tutorial.md b/docs/Tutorial.md
index 73bf9a4..cc05504 100644
--- a/docs/Tutorial.md
+++ b/docs/Tutorial.md
@@ -1,11 +1,13 @@
 ---
+title: Tutorial
 layout: documentation
+documentation: true
 ---
 In this tutorial, you'll learn how to create Storm topologies and deploy them 
to a Storm cluster. Java will be the main language used, but a few examples 
will use Python to illustrate Storm's multi-language capabilities.
 
 ## Preliminaries
 
-This tutorial uses examples from the 
[storm-starter](http://github.com/nathanmarz/storm-starter) project. It's 
recommended that you clone the project and follow along with the examples. Read 
[Setting up a development environment](Setting-up-development-environment.html) 
and [Creating a new Storm project](Creating-a-new-Storm-project.html) to get 
your machine set up. 
+This tutorial uses examples from the 
[storm-starter]({{page.git-blob-base}}/examples/storm-starter) project. It's 
recommended that you clone the project and follow along with the examples. Read 
[Setting up a development environment](Setting-up-development-environment.html) 
and [Creating a new Storm project](Creating-a-new-Storm-project.html) to get 
your machine set up.
 
 ## Components of a Storm cluster
 
@@ -101,7 +103,7 @@ This topology contains a spout and two bolts. The spout 
emits words, and each bo
 
 This code defines the nodes using the `setSpout` and `setBolt` methods. These 
methods take as input a user-specified id, an object containing the processing 
logic, and the amount of parallelism you want for the node. In this example, 
the spout is given id "words" and the bolts are given ids "exclaim1" and 
"exclaim2". 
 
-The object containing the processing logic implements the 
[IRichSpout](javadocs/backtype/storm/topology/IRichSpout.html) interface for 
spouts and the [IRichBolt](javadocs/backtype/storm/topology/IRichBolt.html) 
interface for bolts. 
+The object containing the processing logic implements the 
[IRichSpout](javadocs/backtype/storm/topology/IRichSpout.html) interface for 
spouts and the [IRichBolt](javadocs/backtype/storm/topology/IRichBolt.html) 
interface for bolts.
 
 The last parameter, how much parallelism you want for the node, is optional. 
It indicates how many threads should execute that component across the cluster. 
If you omit it, Storm will only allocate one thread for that node.
 
@@ -137,23 +139,28 @@ As you can see, the implementation is very 
straightforward.
 public static class ExclamationBolt implements IRichBolt {
     OutputCollector _collector;
 
+    @Override
     public void prepare(Map conf, TopologyContext context, OutputCollector 
collector) {
         _collector = collector;
     }
 
+    @Override
     public void execute(Tuple tuple) {
         _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
         _collector.ack(tuple);
     }
 
+    @Override
     public void cleanup() {
     }
 
+    @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("word"));
     }
     
-    public Map getComponentConfiguration() {
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
         return null;
     }
 }
@@ -161,9 +168,9 @@ public static class ExclamationBolt implements IRichBolt {
 
 The `prepare` method provides the bolt with an `OutputCollector` that is used 
for emitting tuples from this bolt. Tuples can be emitted at anytime from the 
bolt -- in the `prepare`, `execute`, or `cleanup` methods, or even 
asynchronously in another thread. This `prepare` implementation simply saves 
the `OutputCollector` as an instance variable to be used later on in the 
`execute` method.
 
-The `execute` method receives a tuple from one of the bolt's inputs. The 
`ExclamationBolt` grabs the first field from the tuple and emits a new tuple 
with the string "!!!" appended to it. If you implement a bolt that subscribes 
to multiple input sources, you can find out which component the 
[Tuple](javadocs/backtype/storm/tuple/Tuple.html) came from by using the 
`Tuple#getSourceComponent` method.
+The `execute` method receives a tuple from one of the bolt's inputs. The 
`ExclamationBolt` grabs the first field from the tuple and emits a new tuple 
with the string "!!!" appended to it. If you implement a bolt that subscribes 
to multiple input sources, you can find out which component the 
[Tuple](/javadoc/apidocs/backtype/storm/tuple/Tuple.html) came from by using 
the `Tuple#getSourceComponent` method.
 
-There's a few other things going in in the `execute` method, namely that the 
input tuple is passed as the first argument to `emit` and the input tuple is 
acked on the final line. These are part of Storm's reliability API for 
guaranteeing no data loss and will be explained later in this tutorial. 
+There's a few other things going on in the `execute` method, namely that the 
input tuple is passed as the first argument to `emit` and the input tuple is 
acked on the final line. These are part of Storm's reliability API for 
guaranteeing no data loss and will be explained later in this tutorial. 
 
 The `cleanup` method is called when a Bolt is being shutdown and should 
cleanup any resources that were opened. There's no guarantee that this method 
will be called on the cluster: for example, if the machine the task is running 
on blows up, there's no way to invoke the method. The `cleanup` method is 
intended for when you run topologies in [local mode](Local-mode.html) (where a 
Storm cluster is simulated in process), and you want to be able to run and kill 
many topologies without suffering any resource leaks.
 
@@ -177,15 +184,18 @@ Methods like `cleanup` and `getComponentConfiguration` 
are often not needed in a
 public static class ExclamationBolt extends BaseRichBolt {
     OutputCollector _collector;
 
+    @Override
     public void prepare(Map conf, TopologyContext context, OutputCollector 
collector) {
         _collector = collector;
     }
 
+    @Override
     public void execute(Tuple tuple) {
         _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
         _collector.ack(tuple);
     }
 
+    @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("word"));
     }    
@@ -235,7 +245,7 @@ A stream grouping tells a topology how to send tuples 
between two components. Re
 
 When a task for Bolt A emits a tuple to Bolt B, which task should it send the 
tuple to?
 
-A "stream grouping" answers this question by telling Storm how to send tuples 
between sets of tasks. Before we dig into the different kinds of stream 
groupings, let's take a look at another topology from 
[storm-starter](http://github.com/nathanmarz/storm-starter). This 
[WordCountTopology](https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starter/WordCountTopology.java)
 reads sentences off of a spout and streams out of `WordCountBolt` the total 
number of times it has seen that word before:
+A "stream grouping" answers this question by telling Storm how to send tuples 
between sets of tasks. Before we dig into the different kinds of stream 
groupings, let's take a look at another topology from 
[storm-starter](http://github.com/apache/storm/blob/{{page.version}}/examples/storm-starter).
 This 
[WordCountTopology]({{page.git-blob-base}}/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java)
 reads sentences off of a spout and streams out of `WordCountBolt` the total 
number of times it has seen that word before:
 
 ```java
 TopologyBuilder builder = new TopologyBuilder();

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Understanding-the-parallelism-of-a-Storm-topology.md
----------------------------------------------------------------------
diff --git a/docs/Understanding-the-parallelism-of-a-Storm-topology.md 
b/docs/Understanding-the-parallelism-of-a-Storm-topology.md
index adc4c41..21d74ef 100644
--- a/docs/Understanding-the-parallelism-of-a-Storm-topology.md
+++ b/docs/Understanding-the-parallelism-of-a-Storm-topology.md
@@ -1,7 +1,9 @@
 ---
+title: Understanding the Parallelism of a Storm Topology
 layout: documentation
+documentation: true
 ---
-# What makes a running topology: worker processes, executors and tasks
+## What makes a running topology: worker processes, executors and tasks
 
 Storm distinguishes between the following three main entities that are used to 
actually run a topology in a Storm cluster:
 
@@ -19,29 +21,29 @@ An _executor_ is a thread that is spawned by a worker 
process. It may run one or
 
 A _task_ performs the actual data processing — each spout or bolt that you 
implement in your code executes as many tasks across the cluster. The number of 
tasks for a component is always the same throughout the lifetime of a topology, 
but the number of executors (threads) for a component can change over time. 
This means that the following condition holds true: ``#threads ≤ #tasks``. By 
default, the number of tasks is set to be the same as the number of executors, 
i.e. Storm will run one task per thread.
 
-# Configuring the parallelism of a topology
+## Configuring the parallelism of a topology
 
 Note that in Storm’s terminology "parallelism" is specifically used to 
describe the so-called _parallelism hint_, which means the initial number of 
executor (threads) of a component. In this document though we use the term 
"parallelism" in a more general sense to describe how you can configure not 
only the number of executors but also the number of worker processes and the 
number of tasks of a Storm topology. We will specifically call out when 
"parallelism" is used in the normal, narrow definition of Storm.
 
 The following sections give an overview of the various configuration options 
and how to set them in your code. There is more than one way of setting these 
options though, and the table lists only some of them. Storm currently has the 
following [order of precedence for configuration settings](Configuration.html): 
``defaults.yaml`` < ``storm.yaml`` < topology-specific configuration < internal 
component-specific configuration < external component-specific configuration.
 
-## Number of worker processes
+### Number of worker processes
 
 * Description: How many worker processes to create _for the topology_ across 
machines in the cluster.
 * Configuration option: 
[TOPOLOGY_WORKERS](javadocs/backtype/storm/Config.html#TOPOLOGY_WORKERS)
 * How to set in your code (examples):
     * [Config#setNumWorkers](javadocs/backtype/storm/Config.html)
 
-## Number of executors (threads)
+### Number of executors (threads)
 
 * Description: How many executors to spawn _per component_.
-* Configuration option: ?
+* Configuration option: None (pass ``parallelism_hint`` parameter to 
``setSpout`` or ``setBolt``)
 * How to set in your code (examples):
     * 
[TopologyBuilder#setSpout()](javadocs/backtype/storm/topology/TopologyBuilder.html)
     * 
[TopologyBuilder#setBolt()](javadocs/backtype/storm/topology/TopologyBuilder.html)
     * Note that as of Storm 0.8 the ``parallelism_hint`` parameter now 
specifies the initial number of executors (not tasks!) for that bolt.
 
-## Number of tasks
+### Number of tasks
 
 * Description: How many tasks to create _per component_.
 * Configuration option: 
[TOPOLOGY_TASKS](javadocs/backtype/storm/Config.html#TOPOLOGY_TASKS)
@@ -54,12 +56,12 @@ Here is an example code snippet to show these settings in 
practice:
 ```java
 topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
                .setNumTasks(4)
-               .shuffleGrouping("blue-spout);
+               .shuffleGrouping("blue-spout");
 ```
 
 In the above code we configured Storm to run the bolt ``GreenBolt`` with an 
initial number of two executors and four associated tasks. Storm will run two 
tasks per executor (thread). If you do not explicitly configure the number of 
tasks, Storm will run by default one task per executor.
 
-# Example of a running topology
+## Example of a running topology
 
 The following illustration shows how a simple topology would look like in 
operation. The topology consists of three components: one spout called 
``BlueSpout`` and two bolts called ``GreenBolt`` and ``YellowBolt``. The 
components are linked such that ``BlueSpout`` sends its output to 
``GreenBolt``, which in turns sends its own output to ``YellowBolt``.
 
@@ -89,9 +91,9 @@ StormSubmitter.submitTopology(
 
 And of course Storm comes with additional configuration settings to control 
the parallelism of a topology, including:
 
-* 
[TOPOLOGY_MAX_TASK_PARALLELISM](javadocs/backtype/storm/Config.html#TOPOLOGY_MAX_TASK_PARALLELISM):
 This setting puts a ceiling on the number of executors that can be spawned for 
a single component. It is typically used during testing to limit the number of 
threads spawned when running a topology in local mode. You can set this option 
via e.g. [Config#setMaxTaskParallelism()](javadocs/backtype/storm/Config.html).
+* 
[TOPOLOGY_MAX_TASK_PARALLELISM](javadocs/backtype/storm/Config.html#TOPOLOGY_MAX_TASK_PARALLELISM):
 This setting puts a ceiling on the number of executors that can be spawned for 
a single component. It is typically used during testing to limit the number of 
threads spawned when running a topology in local mode. You can set this option 
via e.g. 
[Config#setMaxTaskParallelism()](javadocs/backtype/storm/Config.html#setMaxTaskParallelism(int)).
 
-# How to change the parallelism of a running topology
+## How to change the parallelism of a running topology
 
 A nifty feature of Storm is that you can increase or decrease the number of 
worker processes and/or executors without being required to restart the cluster 
or the topology. The act of doing so is called rebalancing.
 
@@ -103,14 +105,14 @@ You have two options to rebalance a topology:
 Here is an example of using the CLI tool:
 
 ```
-# Reconfigure the topology "mytopology" to use 5 worker processes,
-# the spout "blue-spout" to use 3 executors and
-# the bolt "yellow-bolt" to use 10 executors.
+## Reconfigure the topology "mytopology" to use 5 worker processes,
+## the spout "blue-spout" to use 3 executors and
+## the bolt "yellow-bolt" to use 10 executors.
 
 $ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
 ```
 
-# References for this article
+## References
 
 * [Concepts](Concepts.html)
 * [Configuration](Configuration.html)

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Using-non-JVM-languages-with-Storm.md
----------------------------------------------------------------------
diff --git a/docs/Using-non-JVM-languages-with-Storm.md 
b/docs/Using-non-JVM-languages-with-Storm.md
index 7b2a2f2..1b3ae45 100644
--- a/docs/Using-non-JVM-languages-with-Storm.md
+++ b/docs/Using-non-JVM-languages-with-Storm.md
@@ -1,4 +1,5 @@
 ---
+title: Using non JVM languages with Storm
 layout: documentation
 ---
 - two pieces: creating topologies and implementing spouts and bolts in other 
languages

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/flux.md
----------------------------------------------------------------------
diff --git a/docs/flux.md b/docs/flux.md
new file mode 100644
index 0000000..8f2b264
--- /dev/null
+++ b/docs/flux.md
@@ -0,0 +1,835 @@
+---
+title: Flux
+layout: documentation
+documentation: true
+---
+
+A framework for creating and deploying Apache Storm streaming computations 
with less friction.
+
+## Definition
+**flux** |fləks| _noun_
+
+1. The action or process of flowing or flowing out
+2. Continuous change
+3. In physics, the rate of flow of a fluid, radiant energy, or particles 
across a given area
+4. A substance mixed with a solid to lower its melting point
+
+## Rationale
+Bad things happen when configuration is hard-coded. No one should have to 
recompile or repackage an application in
+order to change configuration.
+
+## About
+Flux is a framework and set of utilities that make defining and deploying 
Apache Storm topologies less painful and
+deveoper-intensive.
+
+Have you ever found yourself repeating this pattern?:
+
+```java
+
+public static void main(String[] args) throws Exception {
+    // logic to determine if we're running locally or not...
+    // create necessary config options...
+    boolean runLocal = shouldRunLocal();
+    if(runLocal){
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(name, conf, topology);
+    } else {
+        StormSubmitter.submitTopology(name, conf, topology);
+    }
+}
+```
+
+Wouldn't something like this be easier:
+
+```bash
+storm jar mytopology.jar org.apache.storm.flux.Flux --local config.yaml
+```
+
+or:
+
+```bash
+storm jar mytopology.jar org.apache.storm.flux.Flux --remote config.yaml
+```
+
+Another pain point often mentioned is the fact that the wiring for a Topology 
graph is often tied up in Java code,
+and that any changes require recompilation and repackaging of the topology jar 
file. Flux aims to alleviate that
+pain by allowing you to package all your Storm components in a single jar, and 
use an external text file to define
+the layout and configuration of your topologies.
+
+## Features
+
+ * Easily configure and deploy Storm topologies (Both Storm core and 
Microbatch API) without embedding configuration
+   in your topology code
+ * Support for existing topology code (see below)
+ * Define Storm Core API (Spouts/Bolts) using a flexible YAML DSL
+ * YAML DSL support for most Storm components (storm-kafka, storm-hdfs, 
storm-hbase, etc.)
+ * Convenient support for multi-lang components
+ * External property substitution/filtering for easily switching between 
configurations/environments (similar to Maven-style
+   `${variable.name}` substitution)
+
+## Usage
+
+To use Flux, add it as a dependency and package all your Storm components in a 
fat jar, then create a YAML document
+to define your topology (see below for YAML configuration options).
+
+### Building from Source
+The easiest way to use Flux, is to add it as a Maven dependency in you project 
as described below.
+
+If you would like to build Flux from source and run the unit/integration 
tests, you will need the following installed
+on your system:
+
+* Python 2.6.x or later
+* Node.js 0.10.x or later
+
+#### Building with unit tests enabled:
+
+```
+mvn clean install
+```
+
+#### Building with unit tests disabled:
+If you would like to build Flux without installing Python or Node.js you can 
simply skip the unit tests:
+
+```
+mvn clean install -DskipTests=true
+```
+
+Note that if you plan on using Flux to deploy topologies to a remote cluster, 
you will still need to have Python
+installed since it is required by Apache Storm.
+
+
+#### Building with integration tests enabled:
+
+```
+mvn clean install -DskipIntegration=false
+```
+
+
+### Packaging with Maven
+To enable Flux for your Storm components, you need to add it as a dependency 
such that it's included in the Storm
+topology jar. This can be accomplished with the Maven shade plugin (preferred) 
or the Maven assembly plugin (not
+recommended).
+
+#### Flux Maven Dependency
+The current version of Flux is available in Maven Central at the following 
coordinates:
+```xml
+<dependency>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>flux-core</artifactId>
+    <version>${storm.version}</version>
+</dependency>
+```
+
+#### Creating a Flux-Enabled Topology JAR
+The example below illustrates Flux usage with the Maven shade plugin:
+
+ ```xml
+<!-- include Flux and user dependencies in the shaded jar -->
+<dependencies>
+    <!-- Flux include -->
+    <dependency>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>flux-core</artifactId>
+        <version>${storm.version}</version>
+    </dependency>
+
+    <!-- add user dependencies here... -->
+
+</dependencies>
+<!-- create a fat jar that includes all dependencies -->
+<build>
+    <plugins>
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <version>1.4</version>
+            <configuration>
+                <createDependencyReducedPom>true</createDependencyReducedPom>
+            </configuration>
+            <executions>
+                <execution>
+                    <phase>package</phase>
+                    <goals>
+                        <goal>shade</goal>
+                    </goals>
+                    <configuration>
+                        <transformers>
+                            <transformer
+                                    
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                            <transformer
+                                    
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                
<mainClass>org.apache.storm.flux.Flux</mainClass>
+                            </transformer>
+                        </transformers>
+                    </configuration>
+                </execution>
+            </executions>
+        </plugin>
+    </plugins>
+</build>
+ ```
+
+### Deploying and Running a Flux Topology
+Once your topology components are packaged with the Flux dependency, you can 
run different topologies either locally
+or remotely using the `storm jar` command. For example, if your fat jar is 
named `myTopology-0.1.0-SNAPSHOT.jar` you
+could run it locally with the command:
+
+
+```bash
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local 
my_config.yaml
+
+```
+
+### Command line options
+```
+usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux
+             [options] <topology-config.yaml>
+ -d,--dry-run                 Do not run or deploy the topology. Just
+                              build, validate, and print information about
+                              the topology.
+ -e,--env-filter              Perform environment variable substitution.
+                              Replace keys identified with `${ENV-[NAME]}`
+                              will be replaced with the corresponding
+                              `NAME` environment value
+ -f,--filter <file>           Perform property substitution. Use the
+                              specified file as a source of properties,
+                              and replace keys identified with {$[property
+                              name]} with the value defined in the
+                              properties file.
+ -i,--inactive                Deploy the topology, but do not activate it.
+ -l,--local                   Run the topology in local mode.
+ -n,--no-splash               Suppress the printing of the splash screen.
+ -q,--no-detail               Suppress the printing of topology details.
+ -r,--remote                  Deploy the topology to a remote cluster.
+ -R,--resource                Treat the supplied path as a classpath
+                              resource instead of a file.
+ -s,--sleep <ms>              When running locally, the amount of time to
+                              sleep (in ms.) before killing the topology
+                              and shutting down the local cluster.
+ -z,--zookeeper <host:port>   When running in local mode, use the
+                              ZooKeeper at the specified <host>:<port>
+                              instead of the in-process ZooKeeper.
+                              (requires Storm 0.9.3 or later)
+```
+
+**NOTE:** Flux tries to avoid command line switch collision with the `storm` 
command, and allows any other command line
+switches to pass through to the `storm` command.
+
+For example, you can use the `storm` command switch `-c` to override a 
topology configuration property. The following
+example command will run Flux and override the `nimbus.seeds` configuration:
+
+```bash
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote 
my_config.yaml -c 'nimbus.seeds=["localhost"]'
+```
+
+### Sample output
+```
+███████╗██╗     ██╗   ██╗██╗  ██╗
+██╔════╝██║     ██║   
██║╚██╗██╔╝
+█████╗  ██║     ██║   ██║ ╚███╔╝
+██╔══╝  ██║     ██║   ██║ ██╔██╗
+██║     ███████╗╚██████╔╝██╔╝ 
██╗
+╚═╝     ╚══════╝ ╚═════╝ ╚═╝  
╚═╝
++-         Apache Storm        -+
++-  data FLow User eXperience  -+
+Version: 0.3.0
+Parsing file: /Users/hsimpson/Projects/donut_domination/storm/shell_test.yaml
+---------- TOPOLOGY DETAILS ----------
+Name: shell-topology
+--------------- SPOUTS ---------------
+sentence-spout[1](org.apache.storm.flux.spouts.GenericShellSpout)
+---------------- BOLTS ---------------
+splitsentence[1](org.apache.storm.flux.bolts.GenericShellBolt)
+log[1](org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
+count[1](backtype.storm.testing.TestWordCounter)
+--------------- STREAMS ---------------
+sentence-spout --SHUFFLE--> splitsentence
+splitsentence --FIELDS--> count
+count --SHUFFLE--> log
+--------------------------------------
+Submitting topology: 'shell-topology' to remote cluster...
+```
+
+## YAML Configuration
+Flux topologies are defined in a YAML file that describes a topology. A Flux 
topology
+definition consists of the following:
+
+  1. A topology name
+  2. A list of topology "components" (named Java objects that will be made 
available in the environment)
+  3. **EITHER** (A DSL topology definition):
+      * A list of spouts, each identified by a unique ID
+      * A list of bolts, each identified by a unique ID
+      * A list of "stream" objects representing a flow of tuples between 
spouts and bolts
+  4. **OR** (A JVM class that can produce a 
`backtype.storm.generated.StormTopology` instance:
+      * A `topologySource` definition.
+
+
+
+For example, here is a simple definition of a wordcount topology using the 
YAML DSL:
+
+```yaml
+name: "yaml-topology"
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "spout-1"
+    className: "backtype.storm.testing.TestWordSpout"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "bolt-1"
+    className: "backtype.storm.testing.TestWordCounter"
+    parallelism: 1
+  - id: "bolt-2"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+#stream definitions
+streams:
+  - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, 
etc.)
+    from: "spout-1"
+    to: "bolt-1"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+
+  - name: "bolt-1 --> bolt2"
+    from: "bolt-1"
+    to: "bolt-2"
+    grouping:
+      type: SHUFFLE
+
+
+```
+## Property Substitution/Filtering
+It's common for developers to want to easily switch between configurations, 
for example switching deployment between
+a development environment and a production environment. This can be 
accomplished by using separate YAML configuration
+files, but that approach would lead to unnecessary duplication, especially in 
situations where the Storm topology
+does not change, but configuration settings such as host names, ports, and 
parallelism paramters do.
+
+For this case, Flux offers properties filtering to allow you two externalize 
values to a `.properties` file and have
+them substituted before the `.yaml` file is parsed.
+
+To enable property filtering, use the `--filter` command line option and 
specify a `.properties` file. For example,
+if you invoked flux like so:
+
+```bash
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local 
my_config.yaml --filter dev.properties
+```
+With the following `dev.properties` file:
+
+```properties
+kafka.zookeeper.hosts: localhost:2181
+```
+
+You would then be able to reference those properties by key in your `.yaml` 
file using `${}` syntax:
+
+```yaml
+  - id: "zkHosts"
+    className: "storm.kafka.ZkHosts"
+    constructorArgs:
+      - "${kafka.zookeeper.hosts}"
+```
+
+In this case, Flux would replace `${kafka.zookeeper.hosts}` with 
`localhost:2181` before parsing the YAML contents.
+
+### Environment Variable Substitution/Filtering
+Flux also allows environment variable substitution. For example, if an 
environment variable named `ZK_HOSTS` if defined,
+you can reference it in a Flux YAML file with the following syntax:
+
+```
+${ENV-ZK_HOSTS}
+```
+
+## Components
+Components are essentially named object instances that are made available as 
configuration options for spouts and
+bolts. If you are familiar with the Spring framework, components are roughly 
analagous to Spring beans.
+
+Every component is identified, at a minimum, by a unique identifier (String) 
and a class name (String). For example,
+the following will make an instance of the `storm.kafka.StringScheme` class 
available as a reference under the key
+`"stringScheme"` . This assumes the `storm.kafka.StringScheme` has a default 
constructor.
+
+```yaml
+components:
+  - id: "stringScheme"
+    className: "storm.kafka.StringScheme"
+```
+
+### Contructor Arguments, References, Properties and Configuration Methods
+
+####Constructor Arguments
+Arguments to a class constructor can be configured by adding a 
`contructorArgs` element to a components.
+`constructorArgs` is a list of objects that will be passed to the class' 
constructor. The following example creates an
+object by calling the constructor that takes a single string as an argument:
+
+```yaml
+  - id: "zkHosts"
+    className: "storm.kafka.ZkHosts"
+    constructorArgs:
+      - "localhost:2181"
+```
+
+####References
+Each component instance is identified by a unique id that allows it to be 
used/reused by other components. To
+reference an existing component, you specify the id of the component with the 
`ref` tag.
+
+In the following example, a component with the id `"stringScheme"` is created, 
and later referenced, as a an argument
+to another component's constructor:
+
+```yaml
+components:
+  - id: "stringScheme"
+    className: "storm.kafka.StringScheme"
+
+  - id: "stringMultiScheme"
+    className: "backtype.storm.spout.SchemeAsMultiScheme"
+    constructorArgs:
+      - ref: "stringScheme" # component with id "stringScheme" must be 
declared above.
+```
+**N.B.:** References can only be used after (below) the object they point to 
has been declared.
+
+####Properties
+In addition to calling constructors with different arguments, Flux also allows 
you to configure components using
+JavaBean-like setter methods and fields declared as `public`:
+
+```yaml
+  - id: "spoutConfig"
+    className: "storm.kafka.SpoutConfig"
+    constructorArgs:
+      # brokerHosts
+      - ref: "zkHosts"
+      # topic
+      - "myKafkaTopic"
+      # zkRoot
+      - "/kafkaSpout"
+      # id
+      - "myId"
+    properties:
+      - name: "forceFromStart"
+        value: true
+      - name: "scheme"
+        ref: "stringMultiScheme"
+```
+
+In the example above, the `properties` declaration will cause Flux to look for 
a public method in the `SpoutConfig` with
+the signature `setForceFromStart(boolean b)` and attempt to invoke it. If a 
setter method is not found, Flux will then
+look for a public instance variable with the name `forceFromStart` and attempt 
to set its value.
+
+References may also be used as property values.
+
+####Configuration Methods
+Conceptually, configuration methods are similar to Properties and Constructor 
Args -- they allow you to invoke an
+arbitrary method on an object after it is constructed. Configuration methods 
are useful for working with classes that
+don't expose JavaBean methods or have constructors that can fully configure 
the object. Common examples include classes
+that use the builder pattern for configuration/composition.
+
+The following YAML example creates a bolt and configures it by calling several 
methods:
+
+```yaml
+bolts:
+  - id: "bolt-1"
+    className: "org.apache.storm.flux.test.TestBolt"
+    parallelism: 1
+    configMethods:
+      - name: "withFoo"
+        args:
+          - "foo"
+      - name: "withBar"
+        args:
+          - "bar"
+      - name: "withFooBar"
+        args:
+          - "foo"
+          - "bar"
+```
+
+The signatures of the corresponding methods are as follows:
+
+```java
+    public void withFoo(String foo);
+    public void withBar(String bar);
+    public void withFooBar(String foo, String bar);
+```
+
+Arguments passed to configuration methods work much the same way as 
constructor arguments, and support references as
+well.
+
+### Using Java `enum`s in Contructor Arguments, References, Properties and 
Configuration Methods
+You can easily use Java `enum` values as arguments in a Flux YAML file, simply 
by referencing the name of the `enum`.
+
+For example, [Storm's HDFS module]() includes the following `enum` definition 
(simplified for brevity):
+
+```java
+public static enum Units {
+    KB, MB, GB, TB
+}
+```
+
+And the `org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy` class has 
the following constructor:
+
+```java
+public FileSizeRotationPolicy(float count, Units units)
+
+```
+The following Flux `component` definition could be used to call the 
constructor:
+
+```yaml
+  - id: "rotationPolicy"
+    className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy"
+    constructorArgs:
+      - 5.0
+      - MB
+```
+
+The above definition is functionally equivalent to the following Java code:
+
+```java
+// rotate files when they reach 5MB
+FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+```
+
+## Topology Config
+The `config` section is simply a map of Storm topology configuration 
parameters that will be passed to the
+`backtype.storm.StormSubmitter` as an instance of the `backtype.storm.Config` 
class:
+
+```yaml
+config:
+  topology.workers: 4
+  topology.max.spout.pending: 1000
+  topology.message.timeout.secs: 30
+```
+
+# Existing Topologies
+If you have existing Storm topologies, you can still use Flux to 
deploy/run/test them. This feature allows you to
+leverage Flux Constructor Arguments, References, Properties, and Topology 
Config declarations for existing topology
+classes.
+
+The easiest way to use an existing topology class is to define
+a `getTopology()` instance method with one of the following signatures:
+
+```java
+public StormTopology getTopology(Map<String, Object> config)
+```
+or:
+
+```java
+public StormTopology getTopology(Config config)
+```
+
+You could then use the following YAML to configure your topology:
+
+```yaml
+name: "existing-topology"
+topologySource:
+  className: "org.apache.storm.flux.test.SimpleTopology"
+```
+
+If the class you would like to use as a topology source has a different method 
name (i.e. not `getTopology`), you can
+override it:
+
+```yaml
+name: "existing-topology"
+topologySource:
+  className: "org.apache.storm.flux.test.SimpleTopology"
+  methodName: "getTopologyWithDifferentMethodName"
+```
+
+__N.B.:__ The specified method must accept a single argument of type 
`java.util.Map<String, Object>` or
+`backtype.storm.Config`, and return a `backtype.storm.generated.StormTopology` 
object.
+
+# YAML DSL
+## Spouts and Bolts
+Spout and Bolts are configured in their own respective section of the YAML 
configuration. Spout and Bolt definitions
+are extensions to the `component` definition that add a `parallelism` 
parameter that sets the parallelism  for a
+component when the topology is deployed.
+
+Because spout and bolt definitions extend `component` they support constructor 
arguments, references, and properties as
+well.
+
+Shell spout example:
+
+```yaml
+spouts:
+  - id: "sentence-spout"
+    className: "org.apache.storm.flux.spouts.GenericShellSpout"
+    # shell spout constructor takes 2 arguments: String[], String[]
+    constructorArgs:
+      # command line
+      - ["node", "randomsentence.js"]
+      # output fields
+      - ["word"]
+    parallelism: 1
+```
+
+Kafka spout example:
+
+```yaml
+components:
+  - id: "stringScheme"
+    className: "storm.kafka.StringScheme"
+
+  - id: "stringMultiScheme"
+    className: "backtype.storm.spout.SchemeAsMultiScheme"
+    constructorArgs:
+      - ref: "stringScheme"
+
+  - id: "zkHosts"
+    className: "storm.kafka.ZkHosts"
+    constructorArgs:
+      - "localhost:2181"
+
+# Alternative kafka config
+#  - id: "kafkaConfig"
+#    className: "storm.kafka.KafkaConfig"
+#    constructorArgs:
+#      # brokerHosts
+#      - ref: "zkHosts"
+#      # topic
+#      - "myKafkaTopic"
+#      # clientId (optional)
+#      - "myKafkaClientId"
+
+  - id: "spoutConfig"
+    className: "storm.kafka.SpoutConfig"
+    constructorArgs:
+      # brokerHosts
+      - ref: "zkHosts"
+      # topic
+      - "myKafkaTopic"
+      # zkRoot
+      - "/kafkaSpout"
+      # id
+      - "myId"
+    properties:
+      - name: "forceFromStart"
+        value: true
+      - name: "scheme"
+        ref: "stringMultiScheme"
+
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "kafka-spout"
+    className: "storm.kafka.KafkaSpout"
+    constructorArgs:
+      - ref: "spoutConfig"
+
+```
+
+Bolt Examples:
+
+```yaml
+# bolt definitions
+bolts:
+  - id: "splitsentence"
+    className: "org.apache.storm.flux.bolts.GenericShellBolt"
+    constructorArgs:
+      # command line
+      - ["python", "splitsentence.py"]
+      # output fields
+      - ["word"]
+    parallelism: 1
+    # ...
+
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+    # ...
+
+  - id: "count"
+    className: "backtype.storm.testing.TestWordCounter"
+    parallelism: 1
+    # ...
+```
+## Streams and Stream Groupings
+Streams in Flux are represented as a list of connections (Graph edges, data 
flow, etc.) between the Spouts and Bolts in
+a topology, with an associated Grouping definition.
+
+A Stream definition has the following properties:
+
+**`name`:** A name for the connection (optional, currently unused)
+
+**`from`:** The `id` of a Spout or Bolt that is the source (publisher)
+
+**`to`:** The `id` of a Spout or Bolt that is the destination (subscriber)
+
+**`grouping`:** The stream grouping definition for the Stream
+
+A Grouping definition has the following properties:
+
+**`type`:** The type of grouping. One of 
`ALL`,`CUSTOM`,`DIRECT`,`SHUFFLE`,`LOCAL_OR_SHUFFLE`,`FIELDS`,`GLOBAL`, or 
`NONE`.
+
+**`streamId`:** The Storm stream ID (Optional. If unspecified will use the 
default stream)
+
+**`args`:** For the `FIELDS` grouping, a list of field names.
+
+**`customClass`** For the `CUSTOM` grouping, a definition of custom grouping 
class instance
+
+The `streams` definition example below sets up a topology with the following 
wiring:
+
+```
+    kafka-spout --> splitsentence --> count --> log
+```
+
+
+```yaml
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+# custom stream groupings are also supported
+
+streams:
+  - name: "kafka --> split" # name isn't used (placeholder for logging, UI, 
etc.)
+    from: "kafka-spout"
+    to: "splitsentence"
+    grouping:
+      type: SHUFFLE
+
+  - name: "split --> count"
+    from: "splitsentence"
+    to: "count"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+
+  - name: "count --> log"
+    from: "count"
+    to: "log"
+    grouping:
+      type: SHUFFLE
+```
+
+### Custom Stream Groupings
+Custom stream groupings are defined by setting the grouping type to `CUSTOM` 
and defining a `customClass` parameter
+that tells Flux how to instantiate the custom class. The `customClass` 
definition extends `component`, so it supports
+constructor arguments, references, and properties as well.
+
+The example below creates a Stream with an instance of the 
`backtype.storm.testing.NGrouping` custom stream grouping
+class.
+
+```yaml
+  - name: "bolt-1 --> bolt2"
+    from: "bolt-1"
+    to: "bolt-2"
+    grouping:
+      type: CUSTOM
+      customClass:
+        className: "backtype.storm.testing.NGrouping"
+        constructorArgs:
+          - 1
+```
+
+## Includes and Overrides
+Flux allows you to include the contents of other YAML files, and have them 
treated as though they were defined in the
+same file. Includes may be either files, or classpath resources.
+
+Includes are specified as a list of maps:
+
+```yaml
+includes:
+  - resource: false
+    file: "src/test/resources/configs/shell_test.yaml"
+    override: false
+```
+
+If the `resource` property is set to `true`, the include will be loaded as a 
classpath resource from the value of the
+`file` attribute, otherwise it will be treated as a regular file.
+
+The `override` property controls how includes affect the values defined in the 
current file. If `override` is set to
+`true`, values in the included file will replace values in the current file 
being parsed. If `override` is set to
+`false`, values in the current file being parsed will take precedence, and the 
parser will refuse to replace them.
+
+**N.B.:** Includes are not yet recursive. Includes from included files will be 
ignored.
+
+
+## Basic Word Count Example
+
+This example uses a spout implemented in JavaScript, a bolt implemented in 
Python, and a bolt implemented in Java
+
+Topology YAML config:
+
+```yaml
+---
+name: "shell-topology"
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "sentence-spout"
+    className: "org.apache.storm.flux.spouts.GenericShellSpout"
+    # shell spout constructor takes 2 arguments: String[], String[]
+    constructorArgs:
+      # command line
+      - ["node", "randomsentence.js"]
+      # output fields
+      - ["word"]
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "splitsentence"
+    className: "org.apache.storm.flux.bolts.GenericShellBolt"
+    constructorArgs:
+      # command line
+      - ["python", "splitsentence.py"]
+      # output fields
+      - ["word"]
+    parallelism: 1
+
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+  - id: "count"
+    className: "backtype.storm.testing.TestWordCounter"
+    parallelism: 1
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+# custom stream groupings are also supported
+
+streams:
+  - name: "spout --> split" # name isn't used (placeholder for logging, UI, 
etc.)
+    from: "sentence-spout"
+    to: "splitsentence"
+    grouping:
+      type: SHUFFLE
+
+  - name: "split --> count"
+    from: "splitsentence"
+    to: "count"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+
+  - name: "count --> log"
+    from: "count"
+    to: "log"
+    grouping:
+      type: SHUFFLE
+```
+
+
+## Micro-Batching (Trident) API Support
+Currenty, the Flux YAML DSL only supports the Core Storm API, but support for 
Storm's micro-batching API is planned.
+
+To use Flux with a Trident topology, define a topology getter method and 
reference it in your YAML config:
+
+```yaml
+name: "my-trident-topology"
+
+config:
+  topology.workers: 1
+
+topologySource:
+  className: "org.apache.storm.flux.test.TridentTopologySource"
+  # Flux will look for "getTopology", this will override that.
+  methodName: "getTopologyWithDifferentMethodName"
+```

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/images/topology.png
----------------------------------------------------------------------
diff --git a/docs/images/topology.png b/docs/images/topology.png
new file mode 100644
index 0000000..a45c25c
Binary files /dev/null and b/docs/images/topology.png differ

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 55bf828..6a83b7a 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -1,69 +1,65 @@
 ---
+title: Documentation
 layout: documentation
+documentation: true
 ---
-Storm is a distributed realtime computation system. Similar to how Hadoop 
provides a set of general primitives for doing batch processing, Storm provides 
a set of general primitives for doing realtime computation. Storm is simple, 
can be used with any programming language, [is used by many 
companies](/documentation/Powered-By.html), and is a lot of fun to use!
+### Basics of Storm
 
-### Read these first
-
-* [Rationale](Rationale.html)
-* [Tutorial](Tutorial.html)
-* [Setting up development environment](Setting-up-development-environment.html)
-* [Creating a new Storm project](Creating-a-new-Storm-project.html)
-
-### Documentation
-
-* [Manual](Documentation.html)
 * [Javadoc](javadocs/index.html)
-* [FAQ](FAQ.html)
-* [SECURITY](SECURITY.html)
+* [Concepts](Concepts.html)
+* [Configuration](Configuration.html)
+* [Guaranteeing message processing](Guaranteeing-message-processing.html)
+* [Daemon Fault Tolerance](Daemon-Fault-Tolerance.html)
+* [Command line client](Command-line-client.html)
 * [REST API](STORM-UI-REST-API.html)
+* [Understanding the parallelism of a Storm 
topology](Understanding-the-parallelism-of-a-Storm-topology.html)
+* [FAQ](FAQ.html)
 
-### Getting help
-
-__NOTE:__ The google groups account [email protected] is now 
officially deprecated in favor of the Apache-hosted user/dev mailing lists.
-
-#### Storm Users
-Storm users should send messages and subscribe to 
[[email protected]](mailto:[email protected]).
-
-You can subscribe to this list by sending an email to 
[[email protected]](mailto:[email protected]). 
Likewise, you can cancel a subscription by sending an email to 
[[email protected]](mailto:[email protected]).
-
-You can view the archives of the mailing list 
[here](http://mail-archives.apache.org/mod_mbox/storm-user/).
-
-#### Storm Developers
-Storm developers should send messages and subscribe to 
[[email protected]](mailto:[email protected]).
-
-You can subscribe to this list by sending an email to 
[[email protected]](mailto:[email protected]). 
Likewise, you can cancel a subscription by sending an email to 
[[email protected]](mailto:[email protected]).
-
-You can view the archives of the mailing list 
[here](http://mail-archives.apache.org/mod_mbox/storm-dev/).
-
-#### Which list should I send/subscribe to?
-If you are using a pre-built binary distribution of Storm, then chances are 
you should send questions, comments, storm-related announcements, etc. to 
[[email protected]]([email protected]). 
-
-If you are building storm from source, developing new features, or otherwise 
hacking storm source code, then [[email protected]]([email protected]) 
is more appropriate. 
-
-#### What will happen with [email protected]?
-All existing messages will remain archived there, and can be accessed/searched 
[here](https://groups.google.com/forum/#!forum/storm-user).
-
-New messages sent to [email protected] will either be 
rejected/bounced or replied to with a message to direct the email to the 
appropriate Apache-hosted group.
-
-#### IRC
-You can also come to the #storm-user room on [freenode](http://freenode.net/). 
You can usually find a Storm developer there to help you out.
-
-
-
-### Related projects
-
-* [storm-contrib](https://github.com/nathanmarz/storm-contrib)
-* [storm-deploy](http://github.com/nathanmarz/storm-deploy): One click deploys 
for Storm clusters on AWS
-* [Spout implementations](Spout-implementations.html)
-* [DSLs and multilang adapters](DSLs-and-multilang-adapters.html)
-* [Serializers](Serializers.html)
-
-### Contributing to Storm
-
-* [Contributing to Storm](Contributing-to-Storm.html)
-* [Project ideas](Project-ideas.html)
-
-### Powered by Storm
+### Trident
+
+Trident is an alternative interface to Storm. It provides exactly-once 
processing, "transactional" datastore persistence, and a set of common stream 
analytics operations.
+
+* [Trident Tutorial](Trident-tutorial.html)     -- basic concepts and 
walkthrough
+* [Trident API Overview](Trident-API-Overview.html) -- operations for 
transforming and orchestrating data
+* [Trident State](Trident-state.html)        -- exactly-once processing and 
fast, persistent aggregation
+* [Trident spouts](Trident-spouts.html)       -- transactional and 
non-transactional data intake
+
+### Setup and deploying
+
+* [Setting up a Storm cluster](Setting-up-a-Storm-cluster.html)
+* [Local mode](Local-mode.html)
+* [Troubleshooting](Troubleshooting.html)
+* [Running topologies on a production 
cluster](Running-topologies-on-a-production-cluster.html)
+* [Building Storm](Maven.html) with Maven
+* [Setting up a Secure Cluster](SECURITY.html)
+
+### Intermediate
+
+* [Serialization](Serialization.html)
+* [Common patterns](Common-patterns.html)
+* [Clojure DSL](Clojure-DSL.html)
+* [Using non-JVM languages with Storm](Using-non-JVM-languages-with-Storm.html)
+* [Distributed RPC](Distributed-RPC.html)
+* [Transactional topologies](Transactional-topologies.html)
+* [Direct groupings](Direct-groupings.html)
+* [Hooks](Hooks.html)
+* [Metrics](Metrics.html)
+* [Lifecycle of a trident tuple]()
+
+### Integration With External Systems, and Other Libraries
+* [Flux Data Driven Topology Builder](flux.html)
+* [Event Hubs Intergration](storm-eventhubs.html)
+* [Apache HBase Integration](storm-hbase.html)
+* [Apache HDFS Integration](storm-hdfs.html)
+* [Apache Hive Integration](storm-hive.html)
+* [JDBC Integration](storm-jdbc.html)
+* [Apache Kafka Integration](storm-kafka.html)
+* [REDIS Integration](storm-redis.html) 
+* [Kestrel and Storm](Kestrel-and-Storm.html)
+
+### Advanced
+
+* [Defining a non-JVM language DSL for 
Storm](Defining-a-non-jvm-language-dsl-for-storm.html)
+* [Multilang protocol](Multilang-protocol.html) (how to provide support for 
another language)
+* [Implementation docs](Implementation-docs.html)
 
-[Companies and projects powered by Storm](Powered-By.html)

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/storm-eventhubs.md
----------------------------------------------------------------------
diff --git a/docs/storm-eventhubs.md b/docs/storm-eventhubs.md
new file mode 100644
index 0000000..4af8c43
--- /dev/null
+++ b/docs/storm-eventhubs.md
@@ -0,0 +1,40 @@
+---
+title: Azue Event Hubs Integration
+layout: documentation
+documentation: true
+---
+
+Storm spout and bolt implementation for Microsoft Azure Eventhubs
+
+### build ###
+       mvn clean package
+
+### run sample topology ###
+To run the sample topology, you need to modify the config.properties file with
+the eventhubs configurations. Here is an example:
+
+       eventhubspout.username = [username: policy name in EventHubs Portal]
+       eventhubspout.password = [password: shared access key in EventHubs 
Portal]
+       eventhubspout.namespace = [namespace]
+       eventhubspout.entitypath = [entitypath]
+       eventhubspout.partitions.count = [partitioncount]
+
+       # if not provided, will use storm's zookeeper settings
+       # 
zookeeper.connectionstring=zookeeper0:2181,zookeeper1:2181,zookeeper2:2181
+
+       eventhubspout.checkpoint.interval = 10
+       eventhub.receiver.credits = 1024
+
+Then you can use storm.cmd to submit the sample topology:
+       storm jar {jarfile} com.microsoft.eventhubs.samples.EventCount 
{topologyname} {spoutconffile}
+       where the {jarfile} should be: 
eventhubs-storm-spout-{version}-jar-with-dependencies.jar
+
+### Run EventHubSendClient ###
+We have included a simple EventHubs send client for testing purpose. You can 
run the client like this:
+       java -cp 
.\target\eventhubs-storm-spout-{version}-jar-with-dependencies.jar 
com.microsoft.eventhubs.client.EventHubSendClient
+       [username] [password] [entityPath] [partitionId] [messageSize] 
[messageCount]
+If you want to send messages to all partitions, use "-1" as partitionId.
+
+### Windows Azure Eventhubs ###
+       http://azure.microsoft.com/en-us/services/event-hubs/
+

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/storm-hbase.md
----------------------------------------------------------------------
diff --git a/docs/storm-hbase.md b/docs/storm-hbase.md
new file mode 100644
index 0000000..7f4fb62
--- /dev/null
+++ b/docs/storm-hbase.md
@@ -0,0 +1,241 @@
+---
+title: Storm HBase Integration
+layout: documentation
+documentation: true
+---
+
+Storm/Trident integration for [Apache HBase](https://hbase.apache.org)
+
+## Usage
+The main API for interacting with HBase is the 
`org.apache.storm.hbase.bolt.mapper.HBaseMapper`
+interface:
+
+```java
+public interface HBaseMapper extends Serializable {
+    byte[] rowKey(Tuple tuple);
+
+    ColumnList columns(Tuple tuple);
+}
+```
+
+The `rowKey()` method is straightforward: given a Storm tuple, return a byte 
array representing the
+row key.
+
+The `columns()` method defines what will be written to an HBase row. The 
`ColumnList` class allows you
+to add both standard HBase columns as well as HBase counter columns.
+
+To add a standard column, use one of the `addColumn()` methods:
+
+```java
+ColumnList cols = new ColumnList();
+cols.addColumn(this.columnFamily, field.getBytes(), 
toBytes(tuple.getValueByField(field)));
+```
+
+To add a counter column, use one of the `addCounter()` methods:
+
+```java
+ColumnList cols = new ColumnList();
+cols.addCounter(this.columnFamily, field.getBytes(), 
toLong(tuple.getValueByField(field)));
+```
+
+When the remote HBase is security enabled, a kerberos keytab and the 
corresponding principal name need to be
+provided for the storm-hbase connector. Specifically, the Config object passed 
into the topology should contain
+{(“storm.keytab.file”, “$keytab”), ("storm.kerberos.principal", 
“$principal”)}. Example:
+
+```java
+Config config = new Config();
+...
+config.put("storm.keytab.file", "$keytab");
+config.put("storm.kerberos.principal", "$principle");
+StormSubmitter.submitTopology("$topologyName", config, 
builder.createTopology());
+```
+
+##Working with Secure HBASE using delegation tokens.
+If your topology is going to interact with secure HBase, your bolts/states 
needs to be authenticated by HBase. 
+The approach described above requires that all potential worker hosts have 
"storm.keytab.file" on them. If you have 
+multiple topologies on a cluster , each with different hbase user, you will 
have to create multiple keytabs and distribute
+it to all workers. Instead of doing that you could use the following approach:
+
+Your administrator can configure nimbus to automatically get delegation tokens 
on behalf of the topology submitter user.
+The nimbus need to start with following configurations:
+
+nimbus.autocredential.plugins.classes : 
["org.apache.storm.hbase.security.AutoHBase"] 
+nimbus.credential.renewers.classes : 
["org.apache.storm.hbase.security.AutoHBase"] 
+hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hbase 
super user that can impersonate other users.)
+hbase.kerberos.principal: "[email protected]"
+nimbus.credential.renewers.freq.secs : 518400 (6 days, hbase tokens by default 
expire every 7 days and can not be renewed, 
+if you have custom settings for hbase.auth.token.max.lifetime in 
hbase-site.xml than you should ensure this value is 
+atleast 1 hour less then that.)
+
+Your topology configuration should have:
+topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] 
+
+If nimbus did not have the above configuration you need to add it and then 
restart it. Ensure the hbase configuration 
+files(core-site.xml,hdfs-site.xml and hbase-site.xml) and the storm-hbase jar 
with all the dependencies is present in nimbus's classpath. 
+Nimbus will use the keytab and principal specified in the config to 
authenticate with HBase. From then on for every
+topology submission, nimbus will impersonate the topology submitter user and 
acquire delegation tokens on behalf of the
+topology submitter user. If topology was started with 
topology.auto-credentials set to AutoHBase, nimbus will push the
+delegation tokens to all the workers for your topology and the hbase 
bolt/state will authenticate with these tokens.
+
+As nimbus is impersonating topology submitter user, you need to ensure the 
user specified in storm.kerberos.principal 
+has permissions to acquire tokens on behalf of other users. To achieve this 
you need to follow configuration directions 
+listed on this link
+
+http://hbase.apache.org/book/security.html#security.rest.gateway
+
+You can read about setting up secure HBase 
here:http://hbase.apache.org/book/security.html.
+
+### SimpleHBaseMapper
+`storm-hbase` includes a general purpose `HBaseMapper` implementation called 
`SimpleHBaseMapper` that can map Storm
+tuples to both regular HBase columns as well as counter columns.
+
+To use `SimpleHBaseMapper`, you simply tell it which fields to map to which 
types of columns.
+
+The following code create a `SimpleHBaseMapper` instance that:
+
+1. Uses the `word` tuple value as a row key.
+2. Adds a standard HBase column for the tuple field `word`.
+3. Adds an HBase counter column for the tuple field `count`.
+4. Writes values to the `cf` column family.
+
+```java
+SimpleHBaseMapper mapper = new SimpleHBaseMapper() 
+        .withRowKeyField("word")
+        .withColumnFields(new Fields("word"))
+        .withCounterFields(new Fields("count"))
+        .withColumnFamily("cf");
+```
+### HBaseBolt
+To use the `HBaseBolt`, construct it with the name of the table to write to, 
an a `HBaseMapper` implementation:
+
+ ```java
+HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
+ ```
+
+The `HBaseBolt` will delegate to the `mapper` instance to figure out how to 
persist tuple data to HBase.
+
+###HBaseValueMapper
+This class allows you to transform the HBase lookup result into storm Values 
that will be emitted by the `HBaseLookupBolt`.
+
+```java
+public interface HBaseValueMapper extends Serializable {
+    public List<Values> toTuples(Result result) throws Exception;
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}
+```
+
+The `toTuples` method takes in a HBase `Result` instance and expects a List of 
`Values` instant. 
+Each of the value returned by this function will be emitted by the 
`HBaseLookupBolt`.
+
+The `declareOutputFields` should be used to declare the outputFields of the 
`HBaseLookupBolt`.
+
+There is an example implementation in `src/test/java` directory.
+
+###HBaseProjectionCriteria
+This class allows you to specify the projection criteria for your HBase Get 
function. This is optional parameter
+for the lookupBolt and if you do not specify this instance all the columns 
will be returned by `HBaseLookupBolt`.
+
+```java
+public class HBaseProjectionCriteria implements Serializable {
+    public HBaseProjectionCriteria addColumnFamily(String columnFamily);
+    public HBaseProjectionCriteria addColumn(ColumnMetaData column);
+```    
+`addColumnFamily` takes in columnFamily. Setting this parameter means all 
columns for this family will be included
+ in the projection.
+ 
+`addColumn` takes in a columnMetaData instance. Setting this parameter means 
only this column from the column familty 
+ will be part of your projection.
+The following code creates a projectionCriteria which specifies a projection 
criteria that:
+
+1. includes count column from column family cf.
+2. includes all columns from column family cf2.
+
+```java
+HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria()
+    .addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"))
+    .addColumnFamily("cf2");
+```
+
+###HBaseLookupBolt
+To use the `HBaseLookupBolt`, Construct it with the name of the table to write 
to, an implementation of `HBaseMapper` 
+and an implementation of `HBaseRowToStormValueMapper`. You can optionally 
specify a `HBaseProjectionCriteria`. 
+
+The `HBaseLookupBolt` will use the mapper to get rowKey to lookup for. It will 
use the `HBaseProjectionCriteria` to 
+figure out which columns to include in the result and it will leverage the 
`HBaseRowToStormValueMapper` to get the 
+values to be emitted by the bolt.
+
+You can look at an example topology LookupWordCount.java under `src/test/java`.
+## Example: Persistent Word Count
+A runnable example can be found in the `src/test/java` directory.
+
+### Setup
+The following steps assume you are running HBase locally, or there is an 
`hbase-site.xml` on the
+classpath pointing to your HBase cluster.
+
+Use the `hbase shell` command to create the schema:
+
+```
+> create 'WordCount', 'cf'
+```
+
+### Execution
+Run the `org.apache.storm.hbase.topology.PersistenWordCount` class (it will 
run the topology for 10 seconds, then exit).
+
+After (or while) the word count topology is running, run the 
`org.apache.storm.hbase.topology.WordCountClient` class
+to view the counter values stored in HBase. You should see something like to 
following:
+
+```
+Word: 'apple', Count: 6867
+Word: 'orange', Count: 6645
+Word: 'pineapple', Count: 6954
+Word: 'banana', Count: 6787
+Word: 'watermelon', Count: 6806
+```
+
+For reference, the sample topology is listed below:
+
+```java
+public class PersistentWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String COUNT_BOLT = "COUNT_BOLT";
+    private static final String HBASE_BOLT = "HBASE_BOLT";
+
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+
+        WordSpout spout = new WordSpout();
+        WordCounter bolt = new WordCounter();
+
+        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
+                .withRowKeyField("word")
+                .withColumnFields(new Fields("word"))
+                .withCounterFields(new Fields("count"))
+                .withColumnFamily("cf");
+
+        HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
+
+
+        // wordSpout ==> countBolt ==> HBaseBolt
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new 
Fields("word"));
+
+
+        if (args.length == 0) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.createTopology());
+            Thread.sleep(10000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else {
+            config.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[0], config, 
builder.createTopology());
+        }
+    }
+}
+```
+

Reply via email to