Added: 
storm/branches/bobby-versioned-site/releases/0.10.0/Spout-implementations.md
URL: 
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/0.10.0/Spout-implementations.md?rev=1735297&view=auto
==============================================================================
--- 
storm/branches/bobby-versioned-site/releases/0.10.0/Spout-implementations.md 
(added)
+++ 
storm/branches/bobby-versioned-site/releases/0.10.0/Spout-implementations.md 
Wed Mar 16 21:01:12 2016
@@ -0,0 +1,10 @@
+---
+title: Spout Implementations
+layout: documentation
+documentation: true
+---
+* [storm-kestrel](https://github.com/nathanmarz/storm-kestrel): Adapter to use 
Kestrel as a spout
+* [storm-amqp-spout](https://github.com/rapportive-oss/storm-amqp-spout): 
Adapter to use AMQP source as a spout
+* [storm-jms](https://github.com/ptgoetz/storm-jms): Adapter to use a JMS 
source as a spout
+* [storm-redis-pubsub](https://github.com/sorenmacbeth/storm-redis-pubsub): A 
spout that subscribes to a Redis pubsub stream
+* 
[storm-beanstalkd-spout](https://github.com/haitaoyao/storm-beanstalkd-spout): 
A spout that subscribes to a beanstalkd queue
\ No newline at end of file

Added: 
storm/branches/bobby-versioned-site/releases/0.10.0/Storm-multi-language-protocol-(versions-0.7.0-and-below).md
URL: 
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/0.10.0/Storm-multi-language-protocol-%28versions-0.7.0-and-below%29.md?rev=1735297&view=auto
==============================================================================
--- 
storm/branches/bobby-versioned-site/releases/0.10.0/Storm-multi-language-protocol-(versions-0.7.0-and-below).md
 (added)
+++ 
storm/branches/bobby-versioned-site/releases/0.10.0/Storm-multi-language-protocol-(versions-0.7.0-and-below).md
 Wed Mar 16 21:01:12 2016
@@ -0,0 +1,124 @@
+---
+title: Storm Multi-Lang Protocol (Versions 0.7.0 and below)
+layout: documentation
+documentation: true
+---
+This page explains the multilang protocol for versions 0.7.0 and below. The 
protocol changed in version 0.7.1.
+
+# Storm Multi-Language Protocol
+
+## The ShellBolt
+
+Support for multiple languages is implemented via the ShellBolt class.  This
+class implements the IBolt interface and the protocol for
+executing a script or program via the shell using Java's ProcessBuilder class.
+
+## Output fields
+
+Output fields are part of the Thrift definition of the topology. This means 
that when you use multilang from Java, you need to create a bolt that extends 
ShellBolt, implements IRichBolt, and declares the fields in 
`declareOutputFields`. 
+You can learn more about this on the [Concepts](Concepts.html) page.
+
+## Protocol Preamble
+
+A simple protocol is implemented via the STDIN and STDOUT of the executed
+script or program. A mix of simple strings and JSON-encoded data is exchanged
+with the process, which makes support possible for pretty much any language.
+
+# Packaging Your Stuff
+
+To run a ShellBolt on a cluster, the scripts that are shelled out to must be
+in the `resources/` directory within the jar submitted to the master.
+
+However, during development or testing on a local machine, the resources
+directory just needs to be on the classpath.
+
+## The Protocol
+
+Notes:
+* Both ends of this protocol use a line-reading mechanism, so be sure to
+trim off newlines from the input and to append them to your output.
+* All JSON inputs and outputs are terminated by a single line containing "end".
+* The bullet points below are written from the perspective of the script 
writer's
+STDIN and STDOUT.
+
+
+* Your script will be executed by the Bolt.
+* STDIN: A string representing a path. This is a PID directory.
+Your script should create an empty file named with its PID in this directory. 
For example, if
+the PID is 1234, an empty file named 1234 is created in the directory. This
+file lets the supervisor know the PID so it can shut down the process later on.
+* STDOUT: Your PID. This is not JSON encoded, just a string. ShellBolt will 
log the PID to its log.
+* STDIN: (JSON) The Storm configuration.  Various settings and properties.
+* STDIN: (JSON) The Topology context
+* The rest happens in a while(true) loop
+* STDIN: A tuple! This is a JSON encoded structure like this:
+
+```
+{
+    // The tuple's id
+       "id": -6955786537413359385,
+       // The id of the component that created this tuple
+       "comp": 1,
+       // The id of the stream this tuple was emitted to
+       "stream": 1,
+       // The id of the task that created this tuple
+       "task": 9,
+       // All the values in this tuple
+       "tuple": ["snow white and the seven dwarfs", "field2", 3]
+}
+```
+
+* STDOUT: The results of your bolt, JSON encoded. This can be a sequence of 
acks, fails, emits, and/or logs. Emits look like:
+
+```
+{
+       "command": "emit",
+       // The ids of the tuples this output tuple should be anchored to
+       "anchors": [1231231, -234234234],
+       // The id of the stream this tuple was emitted to. Leave this empty to 
emit to default stream.
+       "stream": 1,
+       // If doing an emit direct, indicate the task to send the tuple to
+       "task": 9,
+       // All the values in this tuple
+       "tuple": ["field1", 2, 3]
+}
+```
+
+An ack looks like:
+
+```
+{
+       "command": "ack",
+       // the id of the tuple to ack
+       "id": 123123
+}
+```
+
+A fail looks like:
+
+```
+{
+       "command": "fail",
+       // the id of the tuple to fail
+       "id": 123123
+}
+```
+
+A "log" will log a message in the worker log. It looks like:
+
+```
+{
+       "command": "log",
+       // the message to log
+       "msg": "hello world!"
+
+}
+```
+
+* STDOUT: emit "sync" as a single line by itself when the bolt has finished 
emitting/acking/failing and is ready for the next input
+
+### sync
+
+Note: This command is not JSON encoded, it is sent as a simple string.
+
+This lets the parent bolt know that the script has finished processing and is 
ready for another tuple.
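
As a rough illustration of the framing rules described above, the following Python sketch reads one "end"-terminated JSON message from a line-oriented stream and writes commands back. The helper names (`read_message`, `send_message`, `send_sync`, `process_one`) are hypothetical and not part of Storm, the word-splitting logic is only an example, and the initial handshake (PID directory, configuration, and topology context) is omitted.

```python
import json
import sys


def read_message(stream=sys.stdin):
    """Read lines until a line containing only "end", then parse them as JSON."""
    lines = []
    while True:
        line = stream.readline()
        if not line:
            raise EOFError("stream closed before the 'end' marker")
        line = line.rstrip("\n")  # both ends are line-based, so trim the newline
        if line == "end":
            break
        lines.append(line)
    return json.loads("\n".join(lines))


def send_message(msg, stream=sys.stdout):
    """Write a JSON command followed by the "end" terminator line."""
    stream.write(json.dumps(msg) + "\n")
    stream.write("end\n")
    stream.flush()


def send_sync(stream=sys.stdout):
    """Signal readiness for the next tuple; "sync" is a plain string, not JSON."""
    stream.write("sync\n")
    stream.flush()


def process_one(in_stream, out_stream):
    """Handle a single tuple: emit each word of its first value, then ack it."""
    tup = read_message(in_stream)
    for word in str(tup["tuple"][0]).split():
        # Omitting "stream" emits to the default stream.
        send_message(
            {"command": "emit", "anchors": [tup["id"]], "tuple": [word]},
            out_stream,
        )
    send_message({"command": "ack", "id": tup["id"]}, out_stream)
    send_sync(out_stream)
```

A real script would perform the handshake first and then call something like `process_one(sys.stdin, sys.stdout)` inside a `while True` loop.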
\ No newline at end of file

Added: 
storm/branches/bobby-versioned-site/releases/0.10.0/Structure-of-the-codebase.md
URL: 
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/0.10.0/Structure-of-the-codebase.md?rev=1735297&view=auto
==============================================================================
--- 
storm/branches/bobby-versioned-site/releases/0.10.0/Structure-of-the-codebase.md
 (added)
+++ 
storm/branches/bobby-versioned-site/releases/0.10.0/Structure-of-the-codebase.md
 Wed Mar 16 21:01:12 2016
@@ -0,0 +1,142 @@
+---
+title: Structure of the Codebase
+layout: documentation
+documentation: true
+---
+There are three distinct layers to Storm's codebase.
+
+First, Storm was designed from the very beginning to be compatible with 
multiple languages. Nimbus is a Thrift service and topologies are defined as 
Thrift structures. The usage of Thrift allows Storm to be used from any 
language.
+
+Second, all of Storm's interfaces are specified as Java interfaces. So even 
though there's a lot of Clojure in Storm's implementation, all usage must go 
through the Java API. This means that every feature of Storm is always 
available via Java.
+
+Third, Storm's implementation is largely in Clojure. Line-wise, Storm is about 
half Java code, half Clojure code. But Clojure is much more expressive, so in 
reality the great majority of the implementation logic is in Clojure. 
+
+The following sections explain each of these layers in more detail.
+
+### storm.thrift
+
+The first place to look to understand the structure of Storm's codebase is the 
[storm.thrift](https://github.com/apache/storm/blob/master/storm-core/src/storm.thrift)
 file.
+
+Storm uses [this fork](https://github.com/nathanmarz/thrift/tree/storm) of 
Thrift (branch 'storm') to produce the generated code. This "fork" is actually 
Thrift 7 with all the Java packages renamed to be `org.apache.thrift7`. 
Otherwise, it's identical to Thrift 7. This fork was done because of the lack 
of backwards compatibility in Thrift and the need for many people to use other 
versions of Thrift in their Storm topologies.
+
+Every spout or bolt in a topology is given a user-specified identifier called 
the "component id". The component id is used to specify subscriptions from a 
bolt to the output streams of other spouts or bolts. A 
[StormTopology](https://github.com/apache/storm/blob/master/storm-core/src/storm.thrift#L91)
 structure contains a map from component id to component for each type of 
component (spouts and bolts).
+
+Spouts and bolts have the same Thrift definition, so let's just take a look at 
the [Thrift definition for 
bolts](https://github.com/apache/storm/blob/master/storm-core/src/storm.thrift#L79).
 It contains a `ComponentObject` struct and a `ComponentCommon` struct.
+
+The `ComponentObject` defines the implementation for the bolt. It can be one 
of three types:
+
+1. A serialized java object (that implements 
[IBolt](https://github.com/apache/storm/blob/master/storm-core/src/jvm/backtype/storm/task/IBolt.java))
+2. A `ShellComponent` object that indicates the implementation is in another 
language. Specifying a bolt this way will cause Storm to instantiate a 
[ShellBolt](https://github.com/apache/storm/blob/master/storm-core/src/jvm/backtype/storm/task/ShellBolt.java)
 object to handle the communication between the JVM-based worker process and 
the non-JVM-based implementation of the component.
+3. A `JavaObject` structure which tells Storm the classname and constructor 
arguments to use to instantiate that bolt. This is useful if you want to define 
a topology in a non-JVM language. This way, you can make use of JVM-based 
spouts and bolts without having to create and serialize a Java object yourself.
+
+`ComponentCommon` defines everything else for this component. This includes:
+
+1. What streams this component emits and the metadata for each stream (whether 
it's a direct stream, the fields declaration)
+2. What streams this component consumes (specified as a map from 
component_id:stream_id to the stream grouping to use)
+3. The parallelism for this component
+4. The component-specific 
[configuration](https://github.com/apache/storm/wiki/Configuration) for this 
component
+
+Note that the structure for spouts also has a `ComponentCommon` field, and so 
spouts can also have declarations to consume other input streams. Yet the Storm 
Java API does not provide a way for spouts to consume other streams, and if you 
put any input declarations there for a spout you would get an error when you 
tried to submit the topology. The reason that spouts have an input declarations 
field is not for users to use, but for Storm itself to use. Storm adds implicit 
streams and bolts to the topology to set up the [acking 
framework](https://github.com/apache/storm/wiki/Guaranteeing-message-processing),
 and two of these implicit streams are from the acker bolt to each spout in the 
topology. The acker sends "ack" or "fail" messages along these streams whenever 
a tuple tree is detected to be completed or failed. The code that transforms 
the user's topology into the runtime topology is located 
[here](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/common.clj#L279).
+
+### Java interfaces
+
+The interfaces for Storm are generally specified as Java interfaces. The main 
interfaces are:
+
+1. [IRichBolt](/javadoc/apidocs/backtype/storm/topology/IRichBolt.html)
+2. [IRichSpout](/javadoc/apidocs/backtype/storm/topology/IRichSpout.html)
+3. 
[TopologyBuilder](/javadoc/apidocs/backtype/storm/topology/TopologyBuilder.html)
+
+The strategy for the majority of the interfaces is to:
+
+1. Specify the interface using a Java interface
+2. Provide a base class that provides default implementations when appropriate
+
+You can see this strategy at work with the 
[BaseRichSpout](/javadoc/apidocs/backtype/storm/topology/base/BaseRichSpout.html)
 class.
+
+Spouts and bolts are serialized into the Thrift definition of the topology as 
described above. 
+
+One subtle aspect of the interfaces is the difference between `IBolt` and 
`ISpout` vs. `IRichBolt` and `IRichSpout`. The main difference between them is 
the addition of the `declareOutputFields` method in the "Rich" versions of the 
interfaces. The reason for the split is that the output fields declaration for 
each output stream needs to be part of the Thrift struct (so it can be 
specified from any language), but as a user you want to be able to declare the 
streams as part of your class. What `TopologyBuilder` does when constructing 
the Thrift representation is call `declareOutputFields` to get the declaration 
and convert it into the Thrift structure. The conversion happens [at this 
portion](https://github.com/apache/storm/blob/master/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java#L205)
 of the `TopologyBuilder` code.
+
+
+### Implementation
+
+Specifying all the functionality via Java interfaces ensures that every 
feature of Storm is available via Java. Moreover, the focus on Java interfaces 
ensures that the user experience from Java-land is pleasant as well.
+
+The implementation of Storm, on the other hand, is primarily in Clojure. While 
the codebase is about 50% Java and 50% Clojure in terms of LOC, most of the 
implementation logic is in Clojure. The two notable exceptions to this 
are the [DRPC](https://github.com/apache/storm/wiki/Distributed-RPC) 
and [transactional 
topologies](https://github.com/apache/storm/wiki/Transactional-topologies) 
implementations. These are implemented purely in Java. This was done to serve 
as an illustration for how to implement a higher level abstraction on Storm. 
The DRPC and transactional topologies implementations are in the 
[backtype.storm.coordination](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/coordination),
 
[backtype.storm.drpc](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/drpc),
 and 
[backtype.storm.transactional](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/transactional)
 packages.
+
+Here's a summary of the purpose of the main Java packages and Clojure 
namespaces:
+
+#### Java packages
+
+[backtype.storm.coordination](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/coordination):
 Implements the pieces required to coordinate batch-processing on top of Storm, 
which both DRPC and transactional topologies use. `CoordinatedBolt` is the most 
important class here.
+
+[backtype.storm.drpc](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/drpc):
 Implementation of the DRPC higher level abstraction
+
+[backtype.storm.generated](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/generated):
 The generated Thrift code for Storm (generated using [this 
fork](https://github.com/nathanmarz/thrift) of Thrift, which simply renames the 
packages to org.apache.thrift7 to avoid conflicts with other Thrift versions)
+
+[backtype.storm.grouping](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/grouping):
 Contains interface for making custom stream groupings
+
+[backtype.storm.hooks](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/hooks):
 Interfaces for hooking into various events in Storm, such as when tasks emit 
tuples, when tuples are acked, etc. User guide for hooks is 
[here](https://github.com/apache/storm/wiki/Hooks).
+
+[backtype.storm.serialization](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/serialization):
 Implementation of how Storm serializes/deserializes tuples. Built on top of 
[Kryo](http://code.google.com/p/kryo/).
+
+[backtype.storm.spout](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/spout):
 Definition of spout and associated interfaces (like the 
`SpoutOutputCollector`). Also contains `ShellSpout` which implements the 
protocol for defining spouts in non-JVM languages.
+
+[backtype.storm.task](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/task):
 Definition of bolt and associated interfaces (like `OutputCollector`). Also 
contains `ShellBolt` which implements the protocol for defining bolts in 
non-JVM languages. Finally, `TopologyContext` is defined here as well, which is 
provided to spouts and bolts so they can get data about the topology and its 
execution at runtime.
+
+[backtype.storm.testing](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/testing):
 Contains a variety of test bolts and utilities used in Storm's unit tests.
+
+[backtype.storm.topology](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/topology):
 Java layer over the underlying Thrift structure to provide a clean, pure-Java 
API to Storm (users don't have to know about Thrift). `TopologyBuilder` is here 
as well as the helpful base classes for the different spouts and bolts. The 
slightly-higher level `IBasicBolt` interface is here, which is a simpler way to 
write certain kinds of bolts.
+
+[backtype.storm.transactional](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/transactional):
 Implementation of transactional topologies.
+
+[backtype.storm.tuple](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/tuple):
 Implementation of Storm's tuple data model.
+
+[backtype.storm.utils](https://github.com/apache/storm/tree/master/storm-core/src/jvm/backtype/storm/utils):
 Data structures and miscellaneous utilities used throughout the codebase.
+
+
+#### Clojure namespaces
+
+[backtype.storm.bootstrap](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/bootstrap.clj):
 Contains a helpful macro to import all the classes and namespaces that are 
used throughout the codebase.
+
+[backtype.storm.clojure](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/clojure.clj):
 Implementation of the Clojure DSL for Storm.
+
+[backtype.storm.cluster](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/cluster.clj):
 All Zookeeper logic used in Storm daemons is encapsulated in this file. This 
code manages how cluster state (like what tasks are running where, what 
spout/bolt each task runs as) is mapped to the Zookeeper "filesystem" API.
+
+[backtype.storm.command.*](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/command):
 These namespaces implement various commands for the `storm` command line 
client. These implementations are very short.
+
+[backtype.storm.config](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/config.clj):
 Implementation of config reading/parsing code for Clojure. Also has utility 
functions for determining what local path nimbus/supervisor/daemons should be 
using for various things. e.g. the `master-inbox` function will return the 
local path that Nimbus should use when jars are uploaded to it.
+
+[backtype.storm.daemon.acker](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/acker.clj):
 Implementation of the "acker" bolt, which is a key part of how Storm 
guarantees data processing.
+
+[backtype.storm.daemon.common](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/common.clj):
 Implementation of common functions used in Storm daemons, like getting the id 
for a topology based on the name, mapping a user's topology into the one that 
actually executes (with implicit acking streams and acker bolt added - see 
`system-topology!` function), and definitions for the various heartbeat and 
other structures persisted by Storm.
+
+[backtype.storm.daemon.drpc](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/drpc.clj):
 Implementation of the DRPC server for use with DRPC topologies.
+
+[backtype.storm.daemon.nimbus](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/nimbus.clj):
 Implementation of Nimbus.
+
+[backtype.storm.daemon.supervisor](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/supervisor.clj):
 Implementation of Supervisor.
+
+[backtype.storm.daemon.task](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/task.clj):
 Implementation of an individual task for a spout or bolt. Handles message 
routing, serialization, stats collection for the UI, as well as the 
spout-specific and bolt-specific execution implementations.
+
+[backtype.storm.daemon.worker](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/worker.clj):
 Implementation of a worker process (which will contain many tasks within). 
Implements message transferring and task launching.
+
+[backtype.storm.event](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/event.clj):
 Implements a simple asynchronous function executor. Used in various places in 
Nimbus and Supervisor to make functions execute in serial to avoid any race 
conditions.
+
+[backtype.storm.log](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/log.clj):
 Defines the functions used to log messages to log4j.
+
+[backtype.storm.messaging.*](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/messaging):
 Defines a higher level interface to implementing point to point messaging. In 
local mode Storm uses in-memory Java queues to do this; on a cluster, it uses 
ZeroMQ. The generic interface is defined in protocol.clj.
+
+[backtype.storm.stats](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/stats.clj):
 Implementation of stats rollup routines used when sending stats to ZK for use 
by the UI. Does things like windowed and rolling aggregations at multiple 
granularities.
+
+[backtype.storm.testing](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/testing.clj):
 Implementation of facilities used to test Storm topologies. Includes time 
simulation, `complete-topology` for running a fixed set of tuples through a 
topology and capturing the output, tracker topologies for having fine grained 
control over detecting when a cluster is "idle", and other utilities.
+
+[backtype.storm.thrift](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/thrift.clj):
 Clojure wrappers around the generated Thrift API to make working with Thrift 
structures more pleasant.
+
+[backtype.storm.timer](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/timer.clj):
 Implementation of a background timer to execute functions in the future or on 
a recurring interval. Storm couldn't use the 
[Timer](http://docs.oracle.com/javase/1.4.2/docs/api/java/util/Timer.html) 
class because it needed integration with time simulation in order to be able to 
unit test Nimbus and the Supervisor.
+
+[backtype.storm.ui.*](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/ui):
 Implementation of Storm UI. Completely independent from rest of code base and 
uses the Nimbus Thrift API to get data.
+
+[backtype.storm.util](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/util.clj):
 Contains generic utility functions used throughout the code base.
+ 
+[backtype.storm.zookeeper](https://github.com/apache/storm/blob/master/storm-core/src/clj/backtype/storm/zookeeper.clj):
 Clojure wrapper around the Zookeeper API that also implements some "high-level" 
stuff like "mkdirs" and "delete-recursive".
\ No newline at end of file

Added: 
storm/branches/bobby-versioned-site/releases/0.10.0/Support-for-non-java-languages.md
URL: 
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/0.10.0/Support-for-non-java-languages.md?rev=1735297&view=auto
==============================================================================
--- 
storm/branches/bobby-versioned-site/releases/0.10.0/Support-for-non-java-languages.md
 (added)
+++ 
storm/branches/bobby-versioned-site/releases/0.10.0/Support-for-non-java-languages.md
 Wed Mar 16 21:01:12 2016
@@ -0,0 +1,9 @@
+---
+title: Support for Non-Java Languages
+layout: documentation
+documentation: true
+---
+* [Scala DSL](https://github.com/velvia/ScalaStorm)
+* [JRuby DSL](https://github.com/colinsurprenant/storm-jruby)
+* [Clojure DSL](Clojure-DSL.html)
+* [io-storm](https://github.com/gphat/io-storm): Perl multilang adapter

Added: 
storm/branches/bobby-versioned-site/releases/0.10.0/Transactional-topologies.md
URL: 
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/0.10.0/Transactional-topologies.md?rev=1735297&view=auto
==============================================================================
--- 
storm/branches/bobby-versioned-site/releases/0.10.0/Transactional-topologies.md 
(added)
+++ 
storm/branches/bobby-versioned-site/releases/0.10.0/Transactional-topologies.md 
Wed Mar 16 21:01:12 2016
@@ -0,0 +1,361 @@
+---
+title: Transactional Topologies
+layout: documentation
+documentation: true
+---
+**NOTE**: Transactional topologies have been deprecated -- use the 
[Trident](Trident-tutorial.html) framework instead.
+
+__________________________________________________________________________
+
+Storm [guarantees data processing](Guaranteeing-message-processing.html) by 
providing an at-least-once processing guarantee. The most common question asked 
about Storm is "Given that tuples can be replayed, how do you do things like 
counting on top of Storm? Won't you overcount?"
+
+Storm 0.7.0 introduces transactional topologies, which enable you to get 
exactly-once messaging semantics for pretty much any computation. So you can do 
things like counting in a fully-accurate, scalable, and fault-tolerant way.
+
+Like [Distributed RPC](Distributed-RPC.html), transactional topologies aren't 
so much a feature of Storm as they are a higher level abstraction built on top 
of Storm's primitives of streams, spouts, bolts, and topologies.
+
+This page explains the transactional topology abstraction, how to use the API, 
and provides details as to its implementation.
+
+## Concepts
+
+Let's build up to Storm's abstraction for transactional topologies one step at 
a time. Let's start by looking at the simplest possible approach, and then 
we'll iterate on the design until we reach Storm's design.
+
+### Design 1
+
+The core idea behind transactional topologies is to provide a _strong 
ordering_ on the processing of data. The simplest manifestation of this, and 
the first design we'll look at, is processing the tuples one at a time and not 
moving on to the next tuple until the current tuple has been successfully 
processed by the topology.
+
+Each tuple is associated with a transaction id. If the tuple fails and needs 
to be replayed, then it is emitted with the exact same transaction id. A 
transaction id is an integer that increments for every tuple, so the first 
tuple will have transaction id `1`, the second id `2`, and so on.
+
+The strong ordering of tuples gives you the capability to achieve exactly-once 
semantics even in the case of tuple replay. Let's look at an example of how you 
would do this.
+
+Suppose you want to do a global count of the tuples in the stream. Instead of 
storing just the count in the database, you instead store the count and the 
latest transaction id together as one value in the database. When your code 
updates the count in the db, it should update the count *only if the 
transaction id in the database differs from the transaction id for the tuple 
currently being processed*. Consider the two cases:
+
+1. *The transaction id in the database is different than the current 
transaction id:* Because of the strong ordering of transactions, we know for 
sure that the current tuple isn't represented in that count. So we can safely 
increment the count and update the transaction id.
+2. *The transaction id is the same as the current transaction id:* Then we 
know that this tuple is already incorporated into the count and can skip the 
update. The tuple must have failed after updating the database but before 
reporting success back to Storm.
+
+This logic and the strong ordering of transactions ensures that the count in 
the database will be accurate even if tuples are replayed.  Credit for this 
trick of storing a transaction id in the database along with the value goes to 
the Kafka devs, particularly [this design 
document](http://incubator.apache.org/kafka/07/design.html).
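
The update rule above can be sketched in a few lines of Python. This is only an illustration: `CountStore` is a hypothetical in-memory stand-in for the database, not a Storm API, and it assumes the count and the transaction id are written together atomically.

```python
class CountStore:
    """In-memory stand-in for a database row holding (count, last transaction id)."""

    def __init__(self):
        self.count = 0
        self.txid = None  # no transaction has been applied yet

    def apply(self, txid, increment=1):
        """Apply the increment unless this transaction id was already recorded.

        Returns True if the count was updated and False if the update was skipped.
        """
        if self.txid == txid:
            # Replay of a tuple that was counted before the failure was reported.
            return False
        # In a real database both fields must be stored atomically as one value.
        self.count += increment
        self.txid = txid
        return True


store = CountStore()
store.apply(1)  # first tuple
store.apply(2)  # second tuple
store.apply(2)  # replay of the second tuple: skipped
print(store.count)  # prints 2
```

The check only works because transactions are strongly ordered: a stored id equal to the current one can only mean that this exact tuple was already applied.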
+
+Furthermore, notice that the topology can safely update many sources of state 
in the same transaction and achieve exactly-once semantics. If there's a 
failure, any updates that already succeeded will skip on the retry, and any 
updates that failed will properly retry. For example, if you were processing a 
stream of tweeted urls, you could update a database that stores a tweet count 
for each url as well as a database that stores a tweet count for each domain.
+
+There is a significant problem, though, with this design of processing one tuple 
at a time. Having to wait for each tuple to be _completely processed_ before 
moving on to the next one is horribly inefficient. It entails a huge amount of 
database calls (at least one per tuple), and this design makes very little use 
of the parallelization capabilities of Storm. So it isn't very scalable.
+
+### Design 2
+
+Instead of processing one tuple at a time, a better approach is to process a 
batch of tuples for each transaction. So if you're doing a global count, you 
would increment the count by the number of tuples in the entire batch. If a 
batch fails, you replay the exact batch that failed. Instead of assigning a 
transaction id to each tuple, you assign a transaction id to each batch, and 
the processing of the batches is strongly ordered. Here's a diagram of this 
design:
+
+![Storm cluster](images/transactional-batches.png)
+
+So if you're processing 1000 tuples per batch, your application will do 1000x 
fewer database operations than design 1. Additionally, it takes advantage of 
Storm's parallelization capabilities as the computation for each batch can be 
parallelized.
+
+While this design is significantly better than design 1, it's still not as 
resource-efficient as possible. The workers in the topology spend a lot of time 
being idle waiting for the other portions of the computation to finish. For 
example, in a topology like this:
+
+![Storm cluster](images/transactional-design-2.png)
+
+After bolt 1 finishes its portion of the processing, it will be idle until the 
rest of the bolts finish and the next batch can be emitted from the spout.
+
+### Design 3 (Storm's design)
+
+A key realization is that not all the work for processing batches of tuples 
needs to be strongly ordered. For example, when computing a global count, 
there are two parts to the computation:
+
+1. Computing the partial count for the batch
+2. Updating the global count in the database with the partial count
+
+The computation of #2 needs to be strongly ordered across the batches, but 
there's no reason you shouldn't be able to _pipeline_ the computation of the 
batches by computing #1 for many batches in parallel. So while batch 1 is 
working on updating the database, batches 2 through 10 can compute their 
partial counts.
+
+Storm accomplishes this distinction by breaking the computation of a batch 
into two phases:
+
+1. The processing phase: this is the phase that can be done in parallel for 
many batches
+2. The commit phase: The commit phases for batches are strongly ordered. So 
the commit for batch 2 is not done until the commit for batch 1 has been 
successful.
+
+The two phases together are called a "transaction". Many batches can be in the 
processing phase at a given moment, but only one batch can be in the commit 
phase. If there's any failure in the processing or commit phase for a batch, 
the entire transaction is replayed (both phases).
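+
+As a rough illustration of the two phases, here is a plain-Java sketch that does not use any Storm classes. The class `PipelinedGlobalCount` and its `run` method are hypothetical names, and a thread pool stands in for the cluster. Partial counts for all batches are computed concurrently, while the updates to the global count are applied strictly in batch order:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only: plain Java, no Storm classes involved.
final class PipelinedGlobalCount {

    // Processing phase: count each batch on a thread pool, in parallel.
    // Commit phase: fold the partial counts into the total in batch order.
    static long run(List<List<String>> batches, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> partials = new ArrayList<>();
            for (List<String> batch : batches) {
                Callable<Integer> countBatch = batch::size;
                partials.add(pool.submit(countBatch));
            }
            long globalCount = 0; // stands in for the value kept in the database
            for (Future<Integer> partial : partials) {
                // get() blocks until this batch is processed, so commits stay ordered
                globalCount += partial.get();
            }
            return globalCount;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("batch processing failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b", "c"), Arrays.asList("d"), Arrays.asList("e", "f"));
        System.out.println(run(batches, 4)); // prints 6
    }
}
```

+The sketch leaves out failure handling and replay, which Storm layers on top of this basic split.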
+
+## Design details
+
+When using transactional topologies, Storm does the following for you:
+
+1. *Manages state:* Storm stores in Zookeeper all the state necessary to do 
transactional topologies. This includes the current transaction id as well as 
the metadata defining the parameters for each batch.
+2. *Coordinates the transactions:* Storm will manage everything necessary to 
determine which transactions should be processing or committing at any point.
+3. *Fault detection:* Storm leverages the acking framework to efficiently 
determine when a batch has successfully processed, successfully committed, or 
failed. Storm will then replay batches appropriately. You don't have to do any 
acking or anchoring -- Storm manages all of this for you.
+4. *First class batch processing API*: Storm layers an API on top of regular 
bolts to allow for batch processing of tuples. Storm manages all the 
coordination for determining when a task has received all the tuples for that 
particular transaction. Storm will also take care of cleaning up any 
accumulated state for each transaction (like the partial counts).
+
+Finally, another thing to note is that transactional topologies require a 
source queue that can replay an exact batch of messages. Technologies like 
[Kestrel](https://github.com/robey/kestrel) can't do this. [Apache 
Kafka](http://incubator.apache.org/kafka/index.html) is a perfect fit for this 
kind of spout, and 
[storm-kafka](https://github.com/apache/storm/tree/master/external/storm-kafka) 
contains a transactional spout implementation for Kafka.
+
+## The basics through example
+
+You build transactional topologies by using 
[TransactionalTopologyBuilder](/javadoc/apidocs/backtype/storm/transactional/TransactionalTopologyBuilder.html).
 Here's the transactional topology definition for a topology that computes the 
global count of tuples from the input stream. This code comes from 
[TransactionalGlobalCount](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java)
 in storm-starter.
+
+```java
+MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new 
Fields("word"), PARTITION_TAKE_PER_BATCH);
+TransactionalTopologyBuilder builder = new 
TransactionalTopologyBuilder("global-count", "spout", spout, 3);
+builder.setBolt("partial-count", new BatchCount(), 5)
+        .shuffleGrouping("spout");
+builder.setBolt("sum", new UpdateGlobalCount())
+        .globalGrouping("partial-count");
+```
+
+`TransactionalTopologyBuilder` takes as input in the constructor an id for the 
transactional topology, an id for the spout within the topology, a 
transactional spout, and optionally the parallelism for the transactional 
spout. The id for the transactional topology is used to store state about the 
progress of the topology in Zookeeper, so that if you restart the topology it will 
continue where it left off.
+
+A transactional topology has a single `TransactionalSpout` that is defined in 
the constructor of `TransactionalTopologyBuilder`. In this example, 
`MemoryTransactionalSpout` is used which reads in data from an in-memory 
partitioned source of data (the `DATA` variable). The second argument defines 
the fields for the data, and the third argument specifies the maximum number of 
tuples to emit from each partition per batch of tuples. The interface for 
defining your own transactional spouts is discussed later on in this tutorial.
+
+Now on to the bolts. This topology parallelizes the computation of the global 
count. The first bolt, `BatchCount`, randomly partitions the input stream using 
a shuffle grouping and emits the count for each partition. The second bolt, 
`UpdateGlobalCount`, does a global grouping and sums together the partial 
counts to get the count for the batch. It then updates the global count in the 
database if necessary.
+
+Here's the definition of `BatchCount`:
+
+```java
+public static class BatchCount extends BaseBatchBolt {
+    Object _id;
+    BatchOutputCollector _collector;
+
+    int _count = 0;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, 
BatchOutputCollector collector, Object id) {
+        _collector = collector;
+        _id = id;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        _count++;
+    }
+
+    @Override
+    public void finishBatch() {
+        _collector.emit(new Values(_id, _count));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("id", "count"));
+    }
+}
+```
+
+A new instance of this object is created for every batch that's being 
processed. The actual bolt this runs within is called 
[BatchBoltExecutor](https://github.com/apache/storm/blob/0.7.0/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java)
 and manages the creation and cleanup for these objects.
+
+The `prepare` method parameterizes this batch bolt with the Storm config, the 
topology context, an output collector, and the id for this batch of tuples. In 
the case of transactional topologies, the id will be a 
[TransactionAttempt](/javadoc/apidocs/backtype/storm/transactional/TransactionAttempt.html)
 object. The batch bolt abstraction can be used in Distributed RPC as well 
which uses a different type of id for the batches. `BatchBolt` can actually be 
parameterized with the type of the id, so if you only intend to use the batch 
bolt for transactional topologies, you can extend `BaseTransactionalBolt` which 
has this definition:
+
+```java
+public abstract class BaseTransactionalBolt extends 
BaseBatchBolt<TransactionAttempt> {
+}
+```
+
+All tuples emitted within a transactional topology must have the 
`TransactionAttempt` as the first field of the tuple. This lets Storm identify 
which tuples belong to which batches. So when you emit tuples you need to make 
sure to meet this requirement.
+
+The `TransactionAttempt` contains two values: the "transaction id" and the 
"attempt id". The "transaction id" is the unique id chosen for this batch and 
is the same no matter how many times the batch is replayed. The "attempt id" is 
a unique id for this particular batch of tuples and lets Storm distinguish 
tuples from different emissions of the same batch. Without the attempt id, 
Storm could confuse a replay of a batch with tuples from a prior time that 
batch was emitted. This would be disastrous.
+
+The transaction id increases by 1 for every batch emitted. So the first batch 
has id "1", the second has id "2", and so on.
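+
+To make the role of the two ids concrete, the following sketch uses a simplified stand-in for `TransactionAttempt` (the classes `Attempt` and `AttemptTracker` are hypothetical and not part of Storm). It shows one way a receiver could ignore tuples from a stale emission of a batch once a replay with the same transaction id has started:

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for TransactionAttempt; not the Storm class itself.
final class Attempt {
    final BigInteger txid;  // same for every replay of the batch
    final long attemptId;   // differs for each emission of the batch

    Attempt(BigInteger txid, long attemptId) {
        this.txid = txid;
        this.attemptId = attemptId;
    }
}

// Hypothetical receiver that only counts tuples from the latest attempt of each transaction.
final class AttemptTracker {
    private final Map<BigInteger, Long> currentAttempt = new HashMap<>();
    private final Map<BigInteger, Integer> counts = new HashMap<>();

    // Called when a new emission (first try or replay) of a batch begins.
    void startAttempt(Attempt a) {
        currentAttempt.put(a.txid, a.attemptId);
        counts.put(a.txid, 0); // discard state accumulated by earlier attempts
    }

    // Returns true if the tuple was accepted, false if it came from a stale attempt.
    boolean onTuple(Attempt a) {
        Long current = currentAttempt.get(a.txid);
        if (current == null || current != a.attemptId) {
            return false;
        }
        counts.merge(a.txid, 1, Integer::sum);
        return true;
    }

    int count(BigInteger txid) {
        return counts.getOrDefault(txid, 0);
    }
}
```

+Without the attempt id, the late tuple from the first emission would be indistinguishable from a tuple of the replay and would be counted twice.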
+
+The `execute` method is called for every tuple in the batch. You should 
accumulate state for the batch in a local instance variable every time this 
method is called. The `BatchCount` bolt increments a local counter variable for 
every tuple.
+
+Finally, `finishBatch` is called when the task has received all tuples 
intended for it for this particular batch. `BatchCount` emits the partial count 
to the output stream when this method is called.
+
+Here's the definition of `UpdateGlobalCount`:
+
+```java
+public static class UpdateGlobalCount extends BaseTransactionalBolt implements 
ICommitter {
+    TransactionAttempt _attempt;
+    BatchOutputCollector _collector;
+
+    int _sum = 0;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, 
BatchOutputCollector collector, TransactionAttempt attempt) {
+        _collector = collector;
+        _attempt = attempt;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        _sum+=tuple.getInteger(1);
+    }
+
+    @Override
+    public void finishBatch() {
+        Value val = DATABASE.get(GLOBAL_COUNT_KEY);
+        Value newval;
+        if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
+            newval = new Value();
+            newval.txid = _attempt.getTransactionId();
+            if(val==null) {
+                newval.count = _sum;
+            } else {
+                newval.count = _sum + val.count;
+            }
+            DATABASE.put(GLOBAL_COUNT_KEY, newval);
+        } else {
+            newval = val;
+        }
+        _collector.emit(new Values(_attempt, newval.count));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("id", "sum"));
+    }
+}
+```
+
+`UpdateGlobalCount` is specific to transactional topologies so it extends 
`BaseTransactionalBolt`. In the `execute` method, `UpdateGlobalCount` 
accumulates the count for this batch by summing together the partial batches. 
The interesting stuff happens in `finishBatch`.
+
+First, notice that this bolt implements the `ICommitter` interface. This tells 
Storm that the `finishBatch` method of this bolt should be part of the commit 
phase of the transaction. So calls to `finishBatch` for this bolt will be 
strongly ordered by transaction id (calls to `execute` on the other hand can 
happen during either the processing or commit phases). An alternative way to 
mark a bolt as a committer is to use the `setCommitterBolt` method in 
`TransactionalTopologyBuilder` instead of `setBolt`.
+
+The code for `finishBatch` in `UpdateGlobalCount` gets the current value from 
the database and compares its transaction id to the transaction id for this 
batch. If they are the same, it does nothing. Otherwise, it increments the 
value in the database by the partial count for this batch.
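+
+The same check can be tried outside of Storm. In this sketch a `HashMap` stands in for the database, and the class `IdempotentGlobalCount` and its `commit` method are hypothetical names introduced for illustration:

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the txid check; a HashMap stands in for the database.
final class IdempotentGlobalCount {
    static final class Value {
        int count;
        BigInteger txid;
    }

    private final Map<String, Value> database = new HashMap<>();

    // Applies the partial count for a batch unless that txid was already committed.
    int commit(String key, BigInteger txid, int partialCount) {
        Value val = database.get(key);
        if (val == null || !val.txid.equals(txid)) {
            Value newval = new Value();
            newval.txid = txid;
            newval.count = (val == null ? 0 : val.count) + partialCount;
            database.put(key, newval);
            return newval.count;
        }
        return val.count; // replay of a committed batch: leave the stored value alone
    }
}
```

+Calling `commit` twice with the same transaction id leaves the count unchanged the second time. This only works because commits are strongly ordered, so the stored txid is always that of the most recent batch.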
+
+A more involved transactional topology example that updates multiple databases 
idempotently can be found in storm-starter in the 
[TransactionalWords](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java)
 class.
+
+## Transactional Topology API
+
+This section outlines the different pieces of the transactional topology API.
+
+### Bolts
+
+There are three kinds of bolts possible in a transactional topology:
+
+1. 
[BasicBolt](/javadoc/apidocs/backtype/storm/topology/base/BaseBasicBolt.html): 
This bolt doesn't deal with batches of tuples and just emits tuples based on a 
single tuple of input.
+2. 
[BatchBolt](/javadoc/apidocs/backtype/storm/topology/base/BaseBatchBolt.html): 
This bolt processes batches of tuples. `execute` is called for each tuple, and 
`finishBatch` is called when the batch is complete.
+3. BatchBolts that are marked as committers: The only difference between this 
bolt and a regular batch bolt is when `finishBatch` is called. A committer bolt 
has `finishBatch` called during the commit phase. The commit phase is 
guaranteed to occur only after all prior batches have successfully committed, 
and it will be retried until all bolts in the topology succeed the commit for 
the batch. There are two ways to make a `BatchBolt` a committer: by having the 
`BatchBolt` implement the 
[ICommitter](/javadoc/apidocs/backtype/storm/transactional/ICommitter.html) 
marker interface, or by using the `setCommitterBolt` method in 
`TransactionalTopologyBuilder`.
+
+#### Processing phase vs. commit phase in bolts
+
+To nail down the difference between the processing phase and commit phase of a 
transaction, let's look at an example topology:
+
+![Storm cluster](images/transactional-commit-flow.png)
+
+In this topology, only the bolts with a red outline are committers.
+
+During the processing phase, bolt A will process the complete batch from the 
spout, call `finishBatch` and send its tuples to bolts B and C. Bolt B is a 
committer so it will process all the tuples but `finishBatch` won't be called. 
Bolt C also will not have `finishBatch` called because it doesn't know if it 
has received all the tuples from Bolt B yet (because Bolt B is waiting for the 
transaction to commit). Finally, Bolt D will receive any tuples Bolt C emitted 
during invocations of its `execute` method.
+
+When the batch commits, `finishBatch` is called on Bolt B. Once it finishes, 
Bolt C can now detect that it has received all the tuples and will call 
`finishBatch`. Finally, Bolt D will receive its complete batch and call 
`finishBatch`.
+
+Notice that even though Bolt D is a committer, it doesn't have to wait for a 
second commit message when it receives the whole batch. Since it receives the 
whole batch during the commit phase, it goes ahead and completes the 
transaction.
+
+Committer bolts act just like batch bolts during the commit phase. The only 
difference between committer bolts and batch bolts is that committer bolts will 
not call `finishBatch` during the processing phase of a transaction.
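+
+The rule for when `finishBatch` fires can be written as a small state machine. This is a hypothetical model for illustration (the class `BatchCompletionGate` is not Storm's `CoordinatedBolt`), assuming a bolt learns about "all tuples received" and "commit started" through two separate signals:

```java
// Hypothetical model of when finishBatch may fire; this is not Storm's CoordinatedBolt.
final class BatchCompletionGate {
    private final boolean committer;
    private boolean receivedAllTuples;
    private boolean commitReceived;
    private boolean finished;

    BatchCompletionGate(boolean committer) {
        this.committer = committer;
    }

    // Signals that every upstream task has delivered its tuples for the batch.
    boolean onAllTuplesReceived() {
        receivedAllTuples = true;
        return tryFinish();
    }

    // Signals that the coordinator has started the commit phase for the batch.
    boolean onCommit() {
        commitReceived = true;
        return tryFinish();
    }

    // Returns true exactly once, at the moment finishBatch should be called.
    private boolean tryFinish() {
        boolean ready = receivedAllTuples && (!committer || commitReceived);
        if (ready && !finished) {
            finished = true;
            return true;
        }
        return false;
    }
}
```

+A regular batch bolt finishes as soon as it has all its tuples. A committer such as Bolt B waits for the commit signal, and a committer such as Bolt D, whose tuples only arrive during the commit phase, finishes as soon as its batch is complete.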
+
+#### Acking
+
+Notice that you don't have to do any acking or anchoring when working with 
transactional topologies. Storm manages all of that under the hood. The 
acking strategy is heavily optimized.
+
+#### Failing a transaction
+
+When using regular bolts, you can call the `fail` method on `OutputCollector` 
to fail the tuple trees of which that tuple is a member. Since transactional 
topologies hide the acking framework from you, they provide a different 
mechanism to fail a batch (and cause the batch to be replayed). Just throw a 
[FailedException](/javadoc/apidocs/backtype/storm/topology/FailedException.html).
 Unlike regular exceptions, this will only cause that particular batch to 
replay and will not crash the process.
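+
+The effect can be imitated with a toy driver. Everything here is a stand-in for illustration: `BatchFailedException` plays the role of `FailedException`, `ReplayingDriver` is a hypothetical name, and the `maxAttempts` limit exists only so the sketch terminates:

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Storm's FailedException, so the sketch compiles on its own.
final class BatchFailedException extends RuntimeException {
    BatchFailedException(String message) {
        super(message);
    }
}

// Toy driver: replays a batch whenever processing signals failure by throwing.
final class ReplayingDriver {
    // Returns the number of attempts needed; gives up after maxAttempts.
    static <T> int process(List<T> batch, Consumer<List<T>> bolt, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                bolt.accept(batch);
                return attempt;
            } catch (BatchFailedException e) {
                // Only this batch is retried; the process keeps running.
            }
        }
        throw new IllegalStateException("batch still failing after " + maxAttempts + " attempts");
    }
}
```

+Any other exception type is not caught by the driver and propagates to the caller, which mirrors the distinction between failing a batch and crashing the process.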
+
+### Transactional spout
+
+The `TransactionalSpout` interface is completely different from a regular 
`Spout` interface. A `TransactionalSpout` implementation emits batches of 
tuples and must ensure that the same batch of tuples is always emitted for the 
same transaction id.
+
+A transactional spout looks like this while a topology is executing:
+
+![Storm cluster](images/transactional-spout-structure.png)
+
+The coordinator on the left is a regular Storm spout that emits a tuple 
whenever a batch should be emitted for a transaction. The emitters execute as a 
regular Storm bolt and are responsible for emitting the actual tuples for the 
batch. The emitters subscribe to the "batch emit" stream of the coordinator 
using an all grouping.
+
+The need to be idempotent with respect to the tuples it emits requires a 
`TransactionalSpout` to store a small amount of state. The state is stored in 
Zookeeper.
+
+The details of implementing a `TransactionalSpout` are in [the 
Javadoc](/javadoc/apidocs/backtype/storm/transactional/ITransactionalSpout.html).
+
+#### Partitioned Transactional Spout
+
+A common kind of transactional spout is one that reads the batches from a set 
of partitions across many queue brokers. For example, this is how 
[TransactionalKafkaSpout](https://github.com/apache/storm/tree/master/external/storm-kafka/src/jvm/storm/kafka/TransactionalKafkaSpout.java)
 works. An `IPartitionedTransactionalSpout` automates the bookkeeping work of 
managing the state for each partition to ensure idempotent replayability. See 
[the 
Javadoc](/javadoc/apidocs/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.html)
 for more details.
+
+### Configuration
+
+There are two important bits of configuration for transactional topologies:
+
+1. *Zookeeper:* By default, transactional topologies will store state in the 
same Zookeeper instance as used to manage the Storm cluster. You can override 
this with the "transactional.zookeeper.servers" and 
"transactional.zookeeper.port" configs.
+2. *Number of active batches permissible at once:* You must set a limit to the 
number of batches that can be processed at once. You configure this using the 
"topology.max.spout.pending" config. If you don't set this config, it will 
default to 1.
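+
+As a sketch, the three keys named above can be collected in a plain map. The host names, the port and the limit of 3 are placeholder values, and `TransactionalConfigExample` is a hypothetical class. Storm's own `Config` class is itself a map of such keys to values, so the same entries should be usable there:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Builds the settings as a plain map; host names, port and limit are placeholder values.
final class TransactionalConfigExample {
    static Map<String, Object> build() {
        Map<String, Object> conf = new HashMap<>();
        // Separate Zookeeper ensemble for transactional state (placeholder hosts)
        conf.put("transactional.zookeeper.servers",
                Arrays.asList("zk1.example.com", "zk2.example.com"));
        conf.put("transactional.zookeeper.port", 2181);
        // Allow up to three batches to be in flight at once
        conf.put("topology.max.spout.pending", 3);
        return conf;
    }
}
```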
+
+## What if you can't emit the same batch of tuples for a given transaction id?
+
+So far the discussion around transactional topologies has assumed that you can 
always emit the exact same batch of tuples for the same transaction id. So what 
do you do if this is not possible?
+
+Consider an example of when this is not possible. Suppose you are reading 
tuples from a partitioned message broker (stream is partitioned across many 
machines), and a single transaction will include tuples from all the individual 
machines. Now suppose one of the nodes goes down at the same time that a 
transaction fails. Without that node, it is impossible to replay the same batch 
of tuples you just played for that transaction id. The processing in your 
topology will halt as it's unable to replay the identical batch. The only 
possible solution is to emit a different batch for that transaction id than you 
emitted before. Is it possible to still achieve exactly-once messaging 
semantics even if the batches change?
+
+It turns out that you can still achieve exactly-once messaging semantics in 
your processing with a non-idempotent transactional spout, although this 
requires a bit more work on your part in developing the topology.
+
+If a batch can change for a given transaction id, then the logic we've been 
using so far of "skip the update if the transaction id in the database is the 
same as the id for the current transaction" is no longer valid. This is because 
the current batch is different than the batch for the last time the transaction 
was committed, so the result will not necessarily be the same. You can fix this 
problem by storing a little bit more state in the database. Let's again use the 
example of storing a global count in the database and suppose the partial count 
for the batch is stored in the `partialCount` variable.
+
+Instead of storing a value in the database that looks like this:
+
+```java
+class Value {
+  Object count;
+  BigInteger txid;
+}
+```
+
+For non-idempotent transactional spouts you should instead store a value that 
looks like this:
+
+```java
+class Value {
+  Object count;
+  BigInteger txid;
+  Object prevCount;
+}
+```
+
+The logic for the update is as follows:
+
+1. If the transaction id for the current batch is the same as the transaction 
id in the database, set `val.count = val.prevCount + partialCount`.
+2. Otherwise, set `val.prevCount = val.count`, `val.count = val.count + 
partialCount` and `val.txid = batchTxid`.
+
+This logic works because once you commit a particular transaction id for the 
first time, all prior transaction ids will never be committed again.
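+
+The two rules translate directly into code. This sketch assumes the count is a `long` rather than the generic `Object` shown above, and the class `OpaqueValue` with its `update` method is a hypothetical name:

```java
import java.math.BigInteger;

// Plain-Java sketch of the update rule for opaque transactional spouts.
final class OpaqueValue {
    long count;
    long prevCount;
    BigInteger txid; // null until the first commit

    // Applies the partial count of the batch emitted for batchTxid.
    void update(BigInteger batchTxid, long partialCount) {
        if (batchTxid.equals(txid)) {
            // Same txid as the stored one: redo the update from the previous count.
            count = prevCount + partialCount;
        } else {
            // New txid: remember the old count before adding the new batch.
            prevCount = count;
            count = count + partialCount;
            txid = batchTxid;
        }
    }
}
```

+If transaction 2 first commits a batch of 50 tuples and is then replayed with a batch of only 40, the second call replaces the earlier contribution instead of adding to it.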
+
+There are a few more subtle aspects of transactional topologies that make opaque 
transactional spouts possible.
+
+When a transaction fails, all subsequent transactions in the processing phase 
are considered failed as well. Each of those transactions will be re-emitted 
and reprocessed. Without this behavior, the following situation could happen:
+
+1. Transaction A emits tuples 1-50
+2. Transaction B emits tuples 51-100
+3. Transaction A fails
+4. Transaction A emits tuples 1-40
+5. Transaction A commits
+6. Transaction B commits
+7. Transaction C emits tuples 101-150
+
+In this scenario, tuples 41-50 are skipped. By failing all subsequent 
transactions, this would happen instead:
+
+1. Transaction A emits tuples 1-50
+2. Transaction B emits tuples 51-100
+3. Transaction A fails (and causes Transaction B to fail)
+4. Transaction A emits tuples 1-40
+5. Transaction B emits tuples 41-90
+6. Transaction A commits
+7. Transaction B commits
+8. Transaction C emits tuples 91-140
+
+By failing all subsequent transactions on failure, no tuples are skipped. This 
also shows that a requirement of transactional spouts is that they always emit 
where the last transaction left off.
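+
+The second scenario can be replayed with a toy emitter. This is a hypothetical model (`ResumingEmitter` is not a Storm class) in which tuples are numbered from 1, batches are `[from, to]` ranges, and commits arrive in emission order:

```java
// Toy model: batches are [from, to] ranges of tuple numbers, and after a failure
// every pending batch is dropped so emission resumes after the last committed tuple.
final class ResumingEmitter {
    private long committedUpTo = 0; // last tuple covered by a committed batch
    private long emittedUpTo = 0;   // last tuple handed to an in-flight batch

    // Emits the next `size` tuples, continuing where the previous batch left off.
    long[] emit(int size) {
        long from = emittedUpTo + 1;
        emittedUpTo += size;
        return new long[] {from, emittedUpTo};
    }

    // Marks a batch as committed; batches are assumed to commit in emission order.
    void commit(long[] batch) {
        committedUpTo = batch[1];
    }

    // A failure invalidates every batch that has not committed yet.
    void failAllPending() {
        emittedUpTo = committedUpTo;
    }
}
```

+Emitting 50 and 50 tuples, failing, and then emitting 40 and 50 yields the ranges 1-40 and 41-90, so no tuple falls into a gap between transactions.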
+
+A non-idempotent transactional spout is more concisely referred to as an 
"OpaqueTransactionalSpout" (opaque is the opposite of idempotent). 
[IOpaquePartitionedTransactionalSpout](/javadoc/apidocs/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.html)
 is an interface for implementing opaque partitioned transactional spouts, of 
which 
[OpaqueTransactionalKafkaSpout](https://github.com/apache/storm/tree/master/external/storm-kafka/src/jvm/storm/kafka/OpaqueTransactionalKafkaSpout.java)
 is an example. `OpaqueTransactionalKafkaSpout` can withstand losing individual 
Kafka nodes without sacrificing accuracy as long as you use the update strategy 
as explained in this section.
+
+## Implementation
+
+The implementation for transactional topologies is very elegant. Managing the 
commit protocol, detecting failures, and pipelining batches seem complex, but 
everything turns out to be a straightforward mapping to Storm's primitives.
+
+Here's how the data flow of a transactional spout works:
+
+1. Transactional spout is a subtopology consisting of a coordinator spout and 
an emitter bolt
+2. The coordinator is a regular spout with a parallelism of 1
+3. The emitter is a bolt with a parallelism of P, connected to the 
coordinator's "batch" stream using an all grouping
+4. When the coordinator determines it's time to enter the processing phase for 
a transaction, it emits a tuple containing the TransactionAttempt and the 
metadata for that transaction to the "batch" stream
+5. Because of the all grouping, every single emitter task receives the 
notification that it's time to emit its portion of the tuples for that 
transaction attempt
+6. Storm automatically manages the anchoring/acking necessary throughout the 
whole topology to determine when a transaction has completed the processing 
phase. The key here is that *the root tuple was created by the coordinator*, so 
the coordinator will receive an "ack" if the processing phase succeeds, and a 
"fail" if it doesn't succeed for any reason (failure or timeout).
+7. If the processing phase succeeds, and all prior transactions have 
successfully committed, the coordinator emits a tuple containing the 
TransactionAttempt to the "commit" stream.
+8. All committing bolts subscribe to the commit stream using an all grouping, 
so that they will all receive a notification when the commit happens.
+9. Like the processing phase, the coordinator uses the acking framework to 
determine whether the commit phase succeeded or not. If it receives an "ack", 
it marks that transaction as complete in Zookeeper.
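+
+Steps 7 to 9 amount to a small piece of bookkeeping in the coordinator. The following is a hypothetical model of that bookkeeping, not Storm's actual coordinator code. The class `CommitSequencer` and its methods are names chosen for illustration, and failure handling is left out:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the coordinator's bookkeeping; not Storm's actual coordinator code.
final class CommitSequencer {
    private final Set<Long> processed = new HashSet<>();
    private long nextToCommit = 1; // transaction ids start at 1
    private boolean commitInFlight;

    // Records an "ack" for the processing phase of txid.
    void onProcessingAck(long txid) {
        processed.add(txid);
    }

    // Returns the txid whose commit tuple should be emitted now, or -1 if none is ready.
    long pollCommit() {
        if (!commitInFlight && processed.contains(nextToCommit)) {
            commitInFlight = true;
            return nextToCommit;
        }
        return -1;
    }

    // Records an "ack" for the commit phase; the next transaction may then commit.
    void onCommitAck(long txid) {
        if (commitInFlight && txid == nextToCommit) {
            processed.remove(txid);
            commitInFlight = false;
            nextToCommit++;
        }
    }
}
```

+Transaction 2 may finish processing before transaction 1, but `pollCommit` does not release it until transaction 1 has been both processed and committed.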
+
+More notes:
+
+- Transactional spouts are a sub-topology consisting of a spout and a bolt
+  - the spout is the coordinator and contains a single task
+  - the bolt is the emitter
+  - the bolt subscribes to the coordinator with an all grouping
+  - serialization of metadata is handled by Kryo. Kryo is initialized ONLY 
with the registrations defined in the component configuration for the 
transactional spout
+- the coordinator uses the acking framework to determine when a batch has been 
successfully processed, and then to determine when a batch has been 
successfully committed.
+- state is stored in Zookeeper using RotatingTransactionalState
+- committing bolts subscribe to the coordinator's commit stream using an all 
grouping
+- CoordinatedBolt is used to detect when a bolt has received all the tuples 
for a particular batch.
+  - this is the same abstraction that is used in DRPC
+  - for committing bolts, it waits to receive a tuple from the coordinator's 
commit stream before calling `finishBatch`
+  - so it can't call `finishBatch` until it's received all tuples from all 
subscribed components AND it's received the commit stream tuple (for 
committers). this ensures that it can't prematurely call `finishBatch`

Added: 
storm/branches/bobby-versioned-site/releases/0.10.0/Trident-API-Overview.md
URL: 
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/0.10.0/Trident-API-Overview.md?rev=1735297&view=auto
==============================================================================
--- storm/branches/bobby-versioned-site/releases/0.10.0/Trident-API-Overview.md 
(added)
+++ storm/branches/bobby-versioned-site/releases/0.10.0/Trident-API-Overview.md 
Wed Mar 16 21:01:12 2016
@@ -0,0 +1,312 @@
+---
+title: Trident API Overview
+layout: documentation
+documentation: true
+---
+
+The core data model in Trident is the "Stream", processed as a series of 
batches. A stream is partitioned among the nodes in the cluster, and operations 
applied to a stream are applied in parallel across each partition.
+
+There are five kinds of operations in Trident:
+
+1. Operations that apply locally to each partition and cause no network 
transfer
+2. Repartitioning operations that repartition a stream but otherwise don't 
change the contents (involves network transfer)
+3. Aggregation operations that do network transfer as part of the operation
+4. Operations on grouped streams
+5. Merges and joins
+
+## Partition-local operations
+
+Partition-local operations involve no network transfer and are applied to each 
batch partition independently.
+
+### Functions
+
+A function takes in a set of input fields and emits zero or more tuples as 
output. The fields of the output tuple are appended to the original input tuple 
in the stream. If a function emits no tuples, the original input tuple is 
filtered out. Otherwise, the input tuple is duplicated for each output tuple. 
Suppose you have this function:
+
+```java
+public class MyFunction extends BaseFunction {
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+        for(int i=0; i < tuple.getInteger(0); i++) {
+            collector.emit(new Values(i));
+        }
+    }
+}
+```
+
+Now suppose you have a stream in the variable "mystream" with the fields ["a", 
"b", "c"] with the following tuples:
+
+```
+[1, 2, 3]
+[4, 1, 6]
+[3, 0, 8]
+```
+
+If you run this code:
+
+```java
+mystream.each(new Fields("b"), new MyFunction(), new Fields("d"))
+```
+
+The resulting tuples would have fields ["a", "b", "c", "d"] and look like this:
+
+```
+[1, 2, 3, 0]
+[1, 2, 3, 1]
+[4, 1, 6, 0]
+```
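+
+The append-and-duplicate behaviour can be reproduced without Trident. This sketch models tuples as lists of integers. `EachSimulation`, `each` and `upTo` are hypothetical names, and `upTo` mirrors what `MyFunction` emits:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of how `each` combines input tuples with a function's output.
final class EachSimulation {
    // For every input tuple, appends each emitted value to a copy of that tuple.
    static List<List<Integer>> each(List<List<Integer>> tuples, int inputIndex,
                                    Function<Integer, List<Integer>> function) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> tuple : tuples) {
            for (Integer emitted : function.apply(tuple.get(inputIndex))) {
                List<Integer> extended = new ArrayList<>(tuple);
                extended.add(emitted);
                out.add(extended);
            }
        }
        return out;
    }

    // Mirrors MyFunction: emits 0, 1, ..., n-1 for an input of n.
    static List<Integer> upTo(int n) {
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            values.add(i);
        }
        return values;
    }

    public static void main(String[] args) {
        List<List<Integer>> input = Arrays.asList(
                Arrays.asList(1, 2, 3), Arrays.asList(4, 1, 6), Arrays.asList(3, 0, 8));
        // Field "b" is at index 1; prints [[1, 2, 3, 0], [1, 2, 3, 1], [4, 1, 6, 0]]
        System.out.println(each(input, 1, EachSimulation::upTo));
    }
}
```

+The tuple `[3, 0, 8]` disappears from the output because the function emits nothing for an input of 0.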
+
+### Filters
+
+Filters take in a tuple as input and decide whether or not to keep that tuple. 
Suppose you had this filter:
+
+```java
+public class MyFilter extends BaseFilter {
+    public boolean isKeep(TridentTuple tuple) {
+        return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
+    }
+}
+```
+
+Now suppose you had these tuples with fields ["a", "b", "c"]:
+
+```
+[1, 2, 3]
+[2, 1, 1]
+[2, 3, 4]
+```
+
+If you ran this code:
+
+```java
+mystream.each(new Fields("b", "a"), new MyFilter())
+```
+
+The resulting tuples would be:
+
+```
+[2, 1, 1]
+```
+
+### partitionAggregate
+
+partitionAggregate runs a function on each partition of a batch of tuples. 
Unlike functions, the tuples emitted by partitionAggregate replace the input 
tuples given to it. Consider this example:
+
+```java
+mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
+```
+
+Suppose the input stream contained fields ["a", "b"] and the following 
partitions of tuples:
+
+```
+Partition 0:
+["a", 1]
+["b", 2]
+
+Partition 1:
+["a", 3]
+["c", 8]
+
+Partition 2:
+["e", 1]
+["d", 9]
+["d", 10]
+```
+
+Then the output stream of that code would contain these tuples with one field 
called "sum":
+
+```
+Partition 0:
+[3]
+
+Partition 1:
+[11]
+
+Partition 2:
+[20]
+```
+
+There are three different interfaces for defining aggregators: 
CombinerAggregator, ReducerAggregator, and Aggregator.
+
+Here's the interface for CombinerAggregator:
+
+```java
+public interface CombinerAggregator<T> extends Serializable {
+    T init(TridentTuple tuple);
+    T combine(T val1, T val2);
+    T zero();
+}
+```
+
+A CombinerAggregator returns a single tuple with a single field as output. 
CombinerAggregators run the init function on each input tuple and use the 
combine function to combine values until there's only one value left. If 
there are no tuples in the partition, the CombinerAggregator emits the output of 
the zero function. For example, here's the implementation of Count:
+
+```java
+public class Count implements CombinerAggregator<Long> {
+    public Long init(TridentTuple tuple) {
+        return 1L;
+    }
+
+    public Long combine(Long val1, Long val2) {
+        return val1 + val2;
+    }
+
+    public Long zero() {
+        return 0L;
+    }
+}
+```
+
+The benefits of CombinerAggregators are seen when you use them with the 
aggregate method instead of partitionAggregate. In that case, Trident 
automatically optimizes the computation by doing partial aggregations before 
transferring tuples over the network.
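+
+To see why partial aggregation is possible, consider this plain-Java model. The nested `Combiner` interface mirrors the shape of `CombinerAggregator` but takes a `Long` instead of a `TridentTuple`. The class and method names are hypothetical, and the sketch assumes `zero()` is an identity for `combine`:

```java
import java.util.List;

// Plain-Java model of combiner-style aggregation; not Trident code.
final class CombinerSimulation {
    // Same shape as CombinerAggregator, with a Long in place of a TridentTuple.
    interface Combiner<T> {
        T init(Long tupleValue);
        T combine(T val1, T val2);
        T zero();
    }

    static final Combiner<Long> SUM = new Combiner<Long>() {
        public Long init(Long tupleValue) { return tupleValue; }
        public Long combine(Long val1, Long val2) { return val1 + val2; }
        public Long zero() { return 0L; }
    };

    // Aggregates one partition locally, as partitionAggregate would.
    static <T> T aggregatePartition(List<Long> partition, Combiner<T> agg) {
        T result = agg.zero();
        for (Long value : partition) {
            result = agg.combine(result, agg.init(value));
        }
        return result;
    }

    // Combines partition-local results; only these partials would cross the network.
    static <T> T aggregateGlobally(List<List<Long>> partitions, Combiner<T> agg) {
        T result = agg.zero();
        for (List<Long> partition : partitions) {
            result = agg.combine(result, aggregatePartition(partition, agg));
        }
        return result;
    }
}
```

+With the three partitions from the `partitionAggregate` example, the partial sums are 3, 11 and 20, and only those three values need to be combined to obtain the global sum of 34.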
+
+A ReducerAggregator has the following interface:
+
+```java
+public interface ReducerAggregator<T> extends Serializable {
+    T init();
+    T reduce(T curr, TridentTuple tuple);
+}
+```
+
+A ReducerAggregator produces an initial value with init, and then it iterates 
on that value for each input tuple to produce a single tuple with a single 
value as output. For example, here's how you would define Count as a 
ReducerAggregator:
+
+```java
+public class Count implements ReducerAggregator<Long> {
+    public Long init() {
+        return 0L;
+    }
+    
+    public Long reduce(Long curr, TridentTuple tuple) {
+        return curr + 1;
+    }
+}
+```
+
+ReducerAggregator can also be used with persistentAggregate, as you'll see 
later.
+
+The most general interface for performing aggregations is Aggregator, which 
looks like this:
+
+```java
+public interface Aggregator<T> extends Operation {
+    T init(Object batchId, TridentCollector collector);
+    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
+    void complete(T state, TridentCollector collector);
+}
+```
+
+Aggregators can emit any number of tuples with any number of fields. They can 
emit tuples at any point during execution. Aggregators execute in the following 
way:
+
+1. The init method is called before processing the batch. The return value of 
init is an Object that will represent the state of the aggregation and will be 
passed into the aggregate and complete methods.
+2. The aggregate method is called for each input tuple in the batch partition. 
This method can update the state and optionally emit tuples.
+3. The complete method is called when all tuples for the batch partition have 
been processed by aggregate. 
+
+Here's how you would implement Count as an Aggregator:
+
+```java
+public class CountAgg extends BaseAggregator<CountState> {
+    static class CountState {
+        long count = 0;
+    }
+
+    public CountState init(Object batchId, TridentCollector collector) {
+        return new CountState();
+    }
+
+    public void aggregate(CountState state, TridentTuple tuple, 
TridentCollector collector) {
+        state.count+=1;
+    }
+
+    public void complete(CountState state, TridentCollector collector) {
+        collector.emit(new Values(state.count));
+    }
+}
+```
+
+Sometimes you want to execute multiple aggregators at the same time. This is 
called chaining and can be accomplished like this:
+
+```java
+mystream.chainedAgg()
+        .partitionAggregate(new Count(), new Fields("count"))
+        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
+        .chainEnd()
+```
+
+This code will run the Count and Sum aggregators on each partition. The output 
will contain a single tuple with the fields ["count", "sum"].
+
+### stateQuery and partitionPersist
+
+stateQuery and partitionPersist query and update sources of state, 
respectively. You can read about how to use them in the [Trident state 
doc](Trident-state.html).
+
+### projection
+
+The project method on Stream keeps only the fields specified in the 
operation. If you had a Stream with fields ["a", "b", "c", "d"] and you ran 
this code:
+
+```java
+mystream.project(new Fields("b", "d"))
+```
+
+The output stream would contain only the fields ["b", "d"].
+
+
+## Repartitioning operations
+
+Repartitioning operations run a function to change how the tuples are 
partitioned across tasks. The number of partitions can also change as a result 
of repartitioning (for example, if the parallelism hint is greater after 
repartioning). Repartitioning requires network transfer. Here are the 
repartitioning functions:
+
+1. shuffle: Uses a random round-robin algorithm to evenly redistribute tuples 
across all target partitions.
+2. broadcast: Every tuple is replicated to all target partitions. This can be 
useful during DRPC, for example if you need to do a stateQuery on every 
partition of data.
+3. partitionBy: partitionBy takes in a set of fields and does semantic 
partitioning based on that set of fields. The fields are hashed and modded by 
the number of target partitions to select the target partition. partitionBy 
guarantees that the same set of fields always goes to the same target partition.
+4. global: All tuples are sent to the same partition. The same partition is 
chosen for all batches in the stream.
+5. batchGlobal: All tuples in the batch are sent to the same partition. 
Different batches in the stream may go to different partitions. 
+6. partition: This method takes in a custom partitioning function that 
implements backtype.storm.grouping.CustomStreamGrouping
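
The hash-and-mod rule described for partitionBy can be sketched in plain Java. This is only an illustration of the idea: the class name, the method name, and the use of `List.hashCode()` are assumptions made for the example, not Storm's actual hashing code.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: not Storm's implementation of partitionBy.
final class PartitionBySketch {
    // Hash the selected field values, then reduce modulo the partition count.
    // Math.floorMod keeps the result non-negative for negative hash codes.
    static int targetPartition(List<Object> fieldValues, int numPartitions) {
        return Math.floorMod(fieldValues.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4; // hypothetical number of target partitions
        List<Object> first = Arrays.<Object>asList("user-1", "US");
        List<Object> second = Arrays.<Object>asList("user-1", "US");
        // Equal field values always select the same target partition.
        System.out.println(targetPartition(first, numPartitions)
                == targetPartition(second, numPartitions));
    }
}
```

Because the partition is a pure function of the field values, two tuples with equal values in the partitioning fields land on the same partition, which is the guarantee partitionBy gives.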
+
+## Aggregation operations
+
+Trident has aggregate and persistentAggregate methods for doing aggregations 
on Streams. aggregate is run on each batch of the stream in isolation, while 
persistentAggregate aggregates all tuples across all batches in the 
stream and store the result in a source of state.
+
+Running aggregate on a Stream does a global aggregation. When you use a 
ReducerAggregator or an Aggregator, the stream is first repartitioned into a 
single partition, and then the aggregation function is run on that partition. 
When you use a CombinerAggregator, on the other hand, first Trident will 
compute partial aggregations of each partition, then repartition to a single 
partition, and then finish the aggregation after the network transfer. 
CombinerAggregators are far more efficient, since only partial results cross the network, and should be used when possible.
+
+Here's an example of using aggregate to get a global count for a batch:
+
+```java
+mystream.aggregate(new Count(), new Fields("count"))
+```
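
The partial-aggregation strategy described above for a CombinerAggregator can be simulated in plain Java. This is a hedged sketch: the class and method names are made up for the example, and partitions are modelled as in-memory lists rather than Trident partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: simulates combiner-style counting without Trident.
final class CombinerCountSketch {
    // Step 1: count each partition locally, before any network transfer.
    static long partialCount(List<String> partition) {
        return partition.size();
    }

    // Step 2: after repartitioning to one partition, merge the partial counts.
    static long combine(List<Long> partials) {
        long total = 0;
        for (long partial : partials) {
            total += partial;
        }
        return total;
    }

    static long globalCount(List<List<String>> partitions) {
        List<Long> partials = new ArrayList<>();
        for (List<String> partition : partitions) {
            partials.add(partialCount(partition));
        }
        return combine(partials);
    }

    public static void main(String[] args) {
        List<List<String>> batch = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"),
                Collections.<String>emptyList());
        System.out.println(globalCount(batch));
    }
}
```

Only one number per partition has to be transferred in step 2, whereas a ReducerAggregator or Aggregator would first move every tuple to the single target partition.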
+
+Like partitionAggregate, aggregators for aggregate can be chained. However, if 
you chain a CombinerAggregator with a non-CombinerAggregator, Trident is unable 
to do the partial aggregation optimization.
+
+You can read more about how to use persistentAggregate in the [Trident state 
doc](https://github.com/apache/storm/wiki/Trident-state).
+
+## Operations on grouped streams
+
+The groupBy operation repartitions the stream by doing a partitionBy on the 
specified fields, and then within each partition groups tuples together whose 
group fields are equal. For example, here's an illustration of a groupBy 
operation:
+
+![Grouping](images/grouping.png)
+
+If you run aggregators on a grouped stream, the aggregation will be run within 
each group instead of against the whole batch. persistentAggregate can also be 
run on a GroupedStream, in which case the results will be stored in a 
[MapState](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/state/map/MapState.java)
 with the key being the grouping fields. You can read more about 
persistentAggregate in the [Trident state doc](Trident-state.html).
+
+Like regular streams, aggregators on grouped streams can be chained.
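
To make "aggregation within each group" concrete, here is a minimal plain-Java sketch of a per-group count over one batch. The class and method names are hypothetical, and each tuple is reduced to just its grouping key for brevity.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a per-group count over the grouping keys of one batch.
final class GroupedCountSketch {
    static Map<String, Long> countPerGroup(List<String> groupKeys) {
        Map<String, Long> counts = new TreeMap<>();
        for (String key : groupKeys) {
            // One running count per distinct key, instead of one for the batch.
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countPerGroup(Arrays.asList("a", "b", "a")));
    }
}
```

The resulting map has the same shape as what persistentAggregate stores on a grouped stream: one value per combination of grouping fields.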
+
+## Merges and joins
+
+The last part of the API is combining different streams together. The simplest 
way to combine streams is to merge them into one stream. You can do that with 
the TridentTopology#merge method, like so:
+
+```java
+topology.merge(stream1, stream2, stream3);
+```
+
+Trident will name the output fields of the new, merged stream as the output 
fields of the first stream.
+
+Another way to combine streams is with a join. A standard join, like the 
kind from SQL, requires finite input, so it doesn't make sense on infinite 
streams. Joins in Trident only apply within each small batch that comes off of 
the spout. 
+
+Here's an example join between a stream containing fields ["key", "val1", 
"val2"] and another stream containing ["x", "val1"]:
+
+```java
+topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new 
Fields("key", "a", "b", "c"));
+```
+
+This joins stream1 and stream2 together using "key" and "x" as the join fields 
for each respective stream. Then, Trident requires that all the output fields 
of the new stream be named, since the input streams could have overlapping 
field names. The tuples emitted from the join will contain:
+
+1. First, the list of join fields. In this case, "key" corresponds to "key" 
from stream1 and "x" from stream2.
+2. Next, a list of all non-join fields from all streams, in order of how the 
streams were passed to the join method. In this case, "a" and "b" correspond to 
"val1" and "val2" from stream1, and "c" corresponds to "val1" from stream2.
+
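The field ordering described in the list above can be sketched in plain Java. This is an assumption-laden illustration, not Trident's join code: tuples are modelled as lists whose first element is the join field, an inner join is assumed, and the nested loop is chosen only for readability.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: shows the layout of joined tuples within one batch.
final class JoinOutputSketch {
    // Joins on element 0 of each tuple. Each output tuple is laid out as:
    // join value, non-join fields of the left tuple, non-join fields of the right.
    static List<List<Object>> join(List<List<Object>> left, List<List<Object>> right) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> l : left) {
            for (List<Object> r : right) {
                if (l.get(0).equals(r.get(0))) {
                    List<Object> row = new ArrayList<>();
                    row.add(l.get(0));                  // "key"
                    row.addAll(l.subList(1, l.size())); // "a", "b"
                    row.addAll(r.subList(1, r.size())); // "c"
                    out.add(row);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> stream1 = Arrays.asList(
                Arrays.<Object>asList("k1", 1, 2),
                Arrays.<Object>asList("k2", 3, 4));
        List<List<Object>> stream2 = Arrays.asList(
                Arrays.<Object>asList("k1", 10),
                Arrays.<Object>asList("k3", 30));
        System.out.println(join(stream1, stream2));
    }
}
```

With these sample batches only the tuples sharing "k1" match, so the single output tuple carries the values for "key", "a", "b" and "c" in that order.
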
+When a join happens between streams originating from different spouts, those 
spouts will be synchronized with how they emit batches. That is, a batch of 
processing will include tuples from each spout.
+
+You might be wondering how to do something like a "windowed join", 
where tuples from one side of the join are joined against the last hour of 
tuples from the other side of the join.
+
+To do this, you would make use of partitionPersist and stateQuery. The last 
hour of tuples from one side of the join would be stored and rotated in a 
source of state, keyed by the join field. Then the stateQuery would do lookups 
by the join field to perform the "join".
\ No newline at end of file

Added: storm/branches/bobby-versioned-site/releases/0.10.0/Trident-spouts.md
URL: 
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/0.10.0/Trident-spouts.md?rev=1735297&view=auto
==============================================================================
--- storm/branches/bobby-versioned-site/releases/0.10.0/Trident-spouts.md 
(added)
+++ storm/branches/bobby-versioned-site/releases/0.10.0/Trident-spouts.md Wed 
Mar 16 21:01:12 2016
@@ -0,0 +1,44 @@
+---
+title: Trident Spouts
+layout: documentation
+documentation: true
+---
+# Trident spouts
+
+As in the vanilla Storm API, spouts are the source of streams in a Trident 
topology. On top of the vanilla Storm spouts, Trident exposes additional APIs 
for more sophisticated spouts.
+
+There is an inextricable link between how you source your data streams and how 
you update state (e.g. databases) based on those data streams. See [Trident 
state doc](Trident-state.html) for an explanation of this; understanding 
this link is imperative for understanding the spout options available.
+
+Regular Storm spouts will be non-transactional spouts in a Trident topology. 
To use a regular Storm IRichSpout, create the stream like this in a 
TridentTopology:
+
+```java
+TridentTopology topology = new TridentTopology();
+topology.newStream("myspoutid", new MyRichSpout());
+```
+
+All spouts in a Trident topology are required to be given a unique identifier 
for the stream. This identifier must be unique across all topologies run on 
the cluster. Trident will use this identifier to store metadata about what the 
spout has consumed in Zookeeper, including the txid and any metadata associated 
with the spout.
+
+You can configure the Zookeeper storage of spout metadata via the following 
configuration options:
+
+1. `transactional.zookeeper.servers`: A list of Zookeeper hostnames 
+2. `transactional.zookeeper.port`: The port of the Zookeeper cluster
+3. `transactional.zookeeper.root`: The root dir in Zookeeper where metadata is 
stored. Metadata will be stored at the path <root path>/<spout id>
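
As a rough sketch, the three options can be collected in an ordinary Java map and used to derive the metadata path. The host names, port, and root path below are example values chosen for illustration, and the class and helper names are hypothetical; only the three configuration keys and the `<root path>/<spout id>` layout come from the text above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: example values, not required settings.
final class SpoutMetadataConfigSketch {
    static Map<String, Object> exampleConfig() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("transactional.zookeeper.servers",
                Arrays.asList("zk1.example.com", "zk2.example.com")); // hypothetical hosts
        conf.put("transactional.zookeeper.port", 2181);               // example port
        conf.put("transactional.zookeeper.root", "/transactional");   // example root dir
        return conf;
    }

    // Builds the path <root path>/<spout id> under which metadata is stored.
    static String metadataPath(Map<String, Object> conf, String spoutId) {
        return conf.get("transactional.zookeeper.root") + "/" + spoutId;
    }

    public static void main(String[] args) {
        System.out.println(metadataPath(exampleConfig(), "myspoutid"));
    }
}
```

Since the spout id becomes part of the Zookeeper path, two topologies that reuse the same id would share one metadata location, which is why the id must be unique across the cluster.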
+
+## Pipelining
+
+By default, Trident processes a single batch at a time, waiting for the batch 
to succeed or fail before trying another batch. You can get significantly 
higher throughput, and lower latency of processing of each batch, by 
pipelining the batches. You configure the maximum number of batches to be 
processed simultaneously with the "topology.max.spout.pending" property. 
+
+Even while processing multiple batches simultaneously, Trident will order any 
state updates taking place in the topology among batches. For example, suppose 
you're doing a global count aggregation into a database. The idea is that while 
you're updating the count in the database for batch 1, you can still be 
computing the partial counts for batches 2 through 10. Trident won't move on to 
the state updates for batch 2 until the state updates for batch 1 have 
succeeded. This is essential for achieving exactly-once processing semantics, 
as outlined in the [Trident state doc](Trident-state.html).
+
+## Trident spout types
+
+The following spout APIs are available:
+
+1. 
[ITridentSpout](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java):
 The most general API that can support transactional or opaque transactional 
semantics. Generally you'll use one of the partitioned flavors of this API 
rather than this one directly.
+2. 
[IBatchSpout](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/spout/IBatchSpout.java):
 A non-transactional spout that emits batches of tuples at a time
+3. 
[IPartitionedTridentSpout](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java):
 A transactional spout that reads from a partitioned data source (like a 
cluster of Kafka servers)
+4. 
[IOpaquePartitionedTridentSpout](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java):
 An opaque transactional spout that reads from a partitioned data source
+
+And, as mentioned at the beginning of this tutorial, you can use regular 
IRichSpouts as well.
+ 
+

