http://git-wip-us.apache.org/repos/asf/storm/blob/69dfb532/docs/nimbus-ha-design.md
----------------------------------------------------------------------
diff --git a/docs/nimbus-ha-design.md b/docs/nimbus-ha-design.md
new file mode 100644
index 0000000..ae1a936
--- /dev/null
+++ b/docs/nimbus-ha-design.md
@@ -0,0 +1,222 @@
+---
+title: Highly Available Nimbus Design
+layout: documentation
+documentation: true
+---
+
+##Problem Statement:
+Currently the storm master aka nimbus, is a process that runs on a single 
machine under supervision. In most cases the 
+nimbus failure is transient and it is restarted by the supervisor. However 
sometimes when disks fail and networks 
+partitions occur, nimbus goes down. Under these circumstances the topologies 
run normally but no new topologies can be 
+submitted, no existing topologies can be killed/deactivated/activated and if a 
supervisor node fails then the 
+reassignments are not performed resulting in performance degradation or 
topology failures. With this project we intend 
+to resolve this problem by running nimbus in a primary backup mode to 
guarantee that even if a nimbus server fails one 
+of the backups will take over.
+##Requirements:
+* Increase overall availability of nimbus.
+* Allow nimbus hosts to leave and join the cluster at will any time. A newly 
joined host should auto catch up and join 
+the list of potential leaders automatically. 
+* No topology resubmissions required in case of nimbus fail overs.
+* No active topology should ever be lost. 
+
+##Leader Election:
+The nimbus server will use the following interface:
+
+```java
+public interface ILeaderElector {
+    /**
+     * queue up for leadership lock. The call returns immediately and the 
caller                     
+     * must check isLeader() to perform any leadership action.
+     */
+    void addToLeaderLockQueue();
+
+    /**
+     * Removes the caller from the leader lock queue. If the caller is leader
+     * also releases the lock.
+     */
+    void removeFromLeaderLockQueue();
+
+    /**
+     *
+     * @return true if the caller currently has the leader lock.
+     */
+    boolean isLeader();
+
+    /**
+     *
+     * @return the current leader's address , throws exception if noone has 
has    lock.
+     */
+    InetSocketAddress getLeaderAddress();
+
+    /**
+     * 
+     * @return list of current nimbus addresses, includes leader.
+     */
+    List<InetSocketAddress> getAllNimbusAddresses();
+}
+```
+On startup nimbus will check if it has code for all active topologies 
available locally. Once it gets to this state it 
+will call addToLeaderLockQueue() function. When a nimbus is notified to become 
a leader it will check if it has all the
+code locally before assuming the leadership role. If any active topology code 
is missing, the node will not accept the 
+leadership role instead it will release the lock and wait till it has all the 
code before requeueing for leader lock. 
+
+The first implementation will be Zookeeper based. If the zookeeper connection 
is lost/resetted resulting in loss of lock
+or the spot in queue the implementation will take care of updating the state 
such that isLeader() will reflect the 
+current status.The leader like actions must finish in less than 
minimumOf(connectionTimeout, SessionTimeout) to ensure
+the lock was held by nimbus for the entire duration of the action (Not sure if 
we want to just state this expectation 
+and ensure that zk configurations are set high enough which will result in 
higher failover time or we actually want to 
+create some sort of rollback mechanism for all actions, the second option 
needs a lot of code). If a nimbus that is not 
+leader receives a request that only a leader can perform it will throw a 
RunTimeException.
+
+Following steps describes a nimbus failover scenario:
+* Let’s say we have 4 topologies running with 3 nimbus nodes and 
code-replication-factor = 2. We assume that the 
+invariant “The leader nimbus has code for all topologies locally” holds 
true at the beginning. nonleader-1 has code for 
+the first 2 topologies and nonLeader-2 has code for the other 2 topologies.
+* Leader nimbus dies, hard disk failure so no recovery possible.
+* nonLeader-1 gets a zookeeper notification to indicate it is now the new 
leader. before accepting the leadership it 
+checks if it has code available for all 4 topologies(these are topologies 
under /storm/storms/). It realizes it only has
+code for 2 topologies so it relinquishes the lock and looks under  
/storm/code-distributor/topologyId to find out from 
+where can it download the code/metafile for the missing topologies. it finds 
entries for the leader nimbus and 
+nonleader-2. It will try downloading from both as part of its retry mechanism.
+* nonLeader-2’s code sync thread also realizes that it is missing code for 2 
topologies and follows the same process 
+described in step-3 to download code for missing topologies. 
+* eventually at least one of the nimbuses will have all the code locally and 
will accept leadership.
+This sequence diagram describes how leader election and failover would work 
with multiple components.
+
+![Nimbus Fail Over](images/nimbus_ha_leader_election_and_failover.png)
+
+##Nimbus state store:
+
+Currently the nimbus stores 2 kind of data
+* Meta information like supervisor info, assignment info which is stored in 
zookeeper
+* Actual topology configs and jars that is stored on nimbus host’s local 
disk.
+
+To achieve fail over from primary to backup servers nimbus state/data needs to 
be replicated across all nimbus hosts or 
+needs to be stored in a distributed storage. Replicating the data correctly 
involves state management, consistency checks
+and it is hard to test for correctness.However many storm users do not want to 
take extra dependency on another replicated
+storage system like HDFS and still need high availability.Eventually, we want 
to move to the bittorrent protocol for code 
+distribution given the size of the jars and to achieve better scaling when the 
total number of supervisors is very high. 
+The current file system based model for code distribution works fine with 
systems that have file system like structure
+but it fails to support a non file system based approach like bit torrent. To 
support bit torrent and all the file
+system based replicated storage systems we propose the following interface:
+
+```java
+/**
+ * Interface responsible to distribute code in the cluster.
+ */
+public interface ICodeDistributor {
+    /**
+     * Prepare this code distributor.
+     * @param conf
+     */
+    void prepare(Map conf);
+
+    /**
+     * This API will perform the actual upload of the code to the distributed 
implementation.
+     * The API should return a Meta file which should have enough information 
for downloader 
+     * so it can download the code e.g. for bittorrent it will be a torrent 
file, in case of something         
+     * like HDFS or s3  it might have the actual directory or paths for files 
to be downloaded.
+     * @param dirPath local directory where all the code to be distributed 
exists.
+     * @param topologyId the topologyId for which the meta file needs to be 
created.
+     * @return metaFile
+     */
+    File upload(Path dirPath, String topologyId);
+
+    /**
+     * Given the topologyId and metafile, download the actual code and return 
the downloaded file's list.
+     * @param topologyid
+     * @param metafile 
+     * @param destDirPath the folder where all the files will be downloaded.
+     * @return
+     */
+    List<File> download(Path destDirPath, String topologyid, File metafile);
+
+    /**
+      * Given the topologyId, returns number of hosts where the code has been 
replicated.
+      */
+    int getReplicationCount(String topologyId);
+    
+   /**
+     * Performs the cleanup.
+     * @param topologyid
+     */
+    void cleanup(String topologyid);
+
+    /**
+     * Close this distributor.
+     * @param conf
+     */
+    void close(Map conf);
+}
+```
+To support replication we will allow the user to define a code replication 
factor which would reflect number of nimbus 
+hosts to which the code must be replicated before starting the topology. With 
replication comes the issue of consistency. 
+We will treat zookeeper’s list of active topologies as our authority for 
topologies for which the code must exist on a 
+nimbus host. Any nimbus host that does not have all the code for all the 
topologies which are marked as active in zookeeper 
+will relinquish it’s lock so some other nimbus host could become leader. A 
background thread on all nimbus host will 
+continuously try to sync code from other hosts where the code was successfully 
replicated so eventually at least one nimbus 
+will accept leadership as long as at least one seed hosts exists for each 
active topology. 
+               
+Following steps describe code replication amongst nimbus hosts for a topology:
+* When client uploads jar, nothing changes.
+* When client submits a topology, leader nimbus calls code distributor’s 
upload function which will create a metafile stored 
+locally on leader nimbus. Leader nimbus will write new entries under 
/storm/code-distributor/topologyId to notify all 
+nonleader nimbuses that they should download this new code.
+* We wait on the leader nimbus to ensure at least N non leader nimbus has the 
code replicated, with a user configurable timeout.
+* When a non leader nimbus receives the notification about new code, it 
downloads the meta file from leader nimbus and then
+downloads the real code by calling code distributor’s download function with 
metafile as input.
+* Once non leader finishes downloading code, it will write an entry under 
/storm/code-distributor/topologyId to indicate 
+it is one of the possible places to download the code/metafile in case the 
leader nimbus dies.
+* leader nimbus goes ahead and does all the usual things it does as part of 
submit topologies.
+
+The following sequence diagram describes the communication between different 
components involved in code distribution.
+
+![Nimbus HA Topology Submission](images/nimbus_ha_topology_submission.png)
+
+##Thrift and Rest API 
+In order to avoid workers/supervisors/ui talking to zookeeper for getting 
master nimbus address we are going to modify the 
+`getClusterInfo` API so it can also return nimbus information. getClusterInfo 
currently returns `ClusterSummary` instance
+which has a list of `supervisorSummary` and a list of 'topologySummary` 
instances. We will add a list of `NimbusSummary` 
+to the `ClusterSummary`. See the structures below:
+
+```thrift
+struct ClusterSummary {
+  1: required list<SupervisorSummary> supervisors;
+  3: required list<TopologySummary> topologies;
+  4: required list<NimbusSummary> nimbuses;
+}
+
+struct NimbusSummary {
+  1: required string host;
+  2: required i32 port;
+  3: required i32 uptime_secs;
+  4: required bool isLeader;
+  5: required string version;
+}
+```
+
+This will be used by StormSubmitter, Nimbus clients,supervisors and ui to 
discover the current leaders and participating 
+nimbus hosts. Any nimbus host will be able to respond to these requests. The 
nimbus hosts can read this information once 
+from zookeeper and cache it and keep updating the cache when the watchers are 
fired to indicate any changes,which should 
+be rare in general case.
+
+## Configuration
+You can use nimbus ha with default configuration , however the default 
configuration assumes a single nimbus host so it
+trades off replication for lower topology submission latency. Depending on 
your use case you can adjust following configurations:
+* storm.codedistributor.class : This is a string representing fully qualified 
class name of a class that implements
+org.apache.storm.codedistributor.ICodeDistributor. The default is set to 
"org.apache.storm.codedistributor.LocalFileSystemCodeDistributor".
+This class leverages local file system to store both meta files and 
code/configs. This class adds extra load on zookeeper as even after
+downloading the code-distrbutor meta file it contacts zookeeper in order to 
figure out hosts from where it can download
+actual code/config and to get the current replication count. An alternative is 
to use 
+"org.apache.storm.hdfs.ha.codedistributor.HDFSCodeDistributor" which relies on 
HDFS but does not add extra load on zookeeper and will 
+make topology submission faster.
+* topology.min.replication.count : Minimum number of nimbus hosts where the 
code must be replicated before leader nimbus
+can mark the topology as active and create assignments. Default is 1.
+* topology.max.replication.wait.time.sec: Maximum wait time for the nimbus 
host replication to achieve the nimbus.min.replication.count.
+Once this time is elapsed nimbus will go ahead and perform topology activation 
tasks even if required nimbus.min.replication.count is not achieved. 
+The default is 60 seconds, a value of -1 indicates to wait for ever.
+*nimbus.code.sync.freq.secs: frequency at which the background thread on 
nimbus which syncs code for locally missing topologies will run. default is 5 
minutes.
+
+Note: Even though all nimbus hosts have watchers on zookeeper to be notified 
immediately as soon as a new topology is available for code
+download, the callback pretty much never results in code download. In practice 
we have observed that the desired replication is only achieved once the 
background-thread runs. 
+So you should expect your topology submission time to be somewhere between 0 
to (2 * nimbus.code.sync.freq.secs) for any nimbus.min.replication.count > 1.

http://git-wip-us.apache.org/repos/asf/storm/blob/69dfb532/docs/storm-kafka.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka.md b/docs/storm-kafka.md
index 46a2b89..38d7cdc 100644
--- a/docs/storm-kafka.md
+++ b/docs/storm-kafka.md
@@ -68,7 +68,7 @@ In addition to these parameters, SpoutConfig contains the 
following fields that
 
     // Exponential back-off retry settings.  These are used when retrying 
messages after a bolt
     // calls OutputCollector.fail().
-    // Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS 
appropriately to prevent
+    // Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS 
appropriately to prevent
     // resubmitting the message while still retrying.
     public long retryInitialDelayMs = 0;
     public double retryDelayMultiplier = 1.0;
@@ -187,9 +187,9 @@ use Kafka 0.8.1.1 built against Scala 2.10, you would use 
the following dependen
 Note that the ZooKeeper and log4j dependencies are excluded to prevent version 
conflicts with Storm's dependencies.
 
 ##Writing to Kafka as part of your topology
-You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a 
component to your topology or if you 
-are using trident you can use storm.kafka.trident.TridentState, 
storm.kafka.trident.TridentStateFactory and
-storm.kafka.trident.TridentKafkaUpdater.
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach 
it as a component to your topology or if you 
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
 You need to provide implementation of following 2 interfaces
 

http://git-wip-us.apache.org/repos/asf/storm/blob/69dfb532/docs/storm-metrics-profiling-internal-actions.md
----------------------------------------------------------------------
diff --git a/docs/storm-metrics-profiling-internal-actions.md 
b/docs/storm-metrics-profiling-internal-actions.md
new file mode 100644
index 0000000..e549c0c
--- /dev/null
+++ b/docs/storm-metrics-profiling-internal-actions.md
@@ -0,0 +1,70 @@
+# Storm Metrics for Profiling Various Storm Internal Actions
+
+With the addition of these metrics, Storm users can collect, view, and analyze 
the performance of various internal actions.  The actions that are profiled 
include thrift rpc calls and http quests within the storm daemons. For 
instance, in the Storm Nimbus daemon, the following thrift calls defined in the 
Nimbus$Iface are profiled:
+
+- submitTopology
+- submitTopologyWithOpts
+- killTopology
+- killTopologyWithOpts
+- activate
+- deactivate
+- rebalance
+- setLogConfig
+- getLogConfig
+
+Various HTTP GET and POST requests are marked for profiling as well such as 
the GET and POST requests for the Storm UI daemon (ui/core.cj)
+To implement these metrics the following packages are used: 
+- io.dropwizard.metrics
+- metrics-clojure
+
+## How it works
+
+By using packages io.dropwizard.metrics and metrics-clojure (clojure wrapper 
for the metrics Java API), we can mark functions to profile by declaring 
(defmeter num-some-func-calls) and then adding the (mark! num-some-func-calls) 
to where the function is invoked. For example:
+
+    (defmeter num-some-func-calls)
+    (defn some-func [args]
+        (mark! num-some-func-calls)
+        (body))
+        
+What essentially the mark! API call does is increment a counter that 
represents how many times a certain action occured.  For instantanous 
measurements user can use gauges.  For example: 
+
+    (defgauge nimbus:num-supervisors
+         (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))
+         
+The above example will get the number of supervisors in the cluster.  This 
metric is not accumlative like one previously discussed.
+
+A metrics reporting server needs to also be activated to collect the metrics. 
You can do this by calling the following function:
+
+    (defn start-metrics-reporters []
+        (jmx/start (jmx/reporter {})))
+
+## How to collect the metrics
+
+Metrics can be reported via JMX or HTTP.  A user can use JConsole or VisualVM 
to connect to the jvm process and view the stats.
+
+To view the metrics in a GUI use VisualVM or JConsole.  Screenshot of using 
VisualVm for metrics: 
+
+![Viewing metrics with VisualVM](images/viewing_metrics_with_VisualVM.png)
+
+For detailed information regarding how to collect the metrics please 
reference: 
+
+https://dropwizard.github.io/metrics/3.1.0/getting-started/
+
+If you want use JMX and view metrics through JConsole or VisualVM, remember 
launch JVM processes your want to profile with the correct JMX configurations.  
For example in Storm you would add the following to conf/storm.yaml
+
+    nimbus.childopts: "-Xmx1024m 
-Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.port=3333  
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.authenticate=false"
+    
+    ui.childopts: "-Xmx768m -Dcom.sun.management.jmxremote.port=3334 
-Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.authenticate=false"
+    
+    logviewer.childopts: "-Xmx128m -Dcom.sun.management.jmxremote.port=3335 
-Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.authenticate=false"
+    
+    drpc.childopts: "-Xmx768m -Dcom.sun.management.jmxremote.port=3336 
-Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.authenticate=false"
+   
+    supervisor.childopts: "-Xmx256m -Dcom.sun.management.jmxremote.port=3337 
-Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.authenticate=false"
+
+### Please Note:
+Since we shade all of the packages we use, additional plugins for collecting 
metrics might not work at this time.  Currently collecting the metrics via JMX 
is supported.
+   
+For more information about io.dropwizard.metrics and metrics-clojure packages 
please reference their original documentation:
+- https://dropwizard.github.io/metrics/3.1.0/
+- http://metrics-clojure.readthedocs.org/en/latest/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/69dfb532/docs/storm-solr.md
----------------------------------------------------------------------
diff --git a/docs/storm-solr.md b/docs/storm-solr.md
new file mode 100644
index 0000000..4f5106d
--- /dev/null
+++ b/docs/storm-solr.md
@@ -0,0 +1,184 @@
+---
+title: Storm Solr Integration
+layout: documentation
+documentation: true
+---
+
+Storm and Trident integration for Apache Solr. This package includes a bolt 
and a trident state that enable a Storm topology
+stream the contents of storm tuples to index Solr collections.
+ 
+# Index Storm tuples into a Solr collection
+The The bolt and trident state provided use one of the supplied mappers to 
build a `SolrRequest` object that is 
+responsible for making the update calls to Solr, thus updating the index of 
the collection specified.
+ 
+# Usage Examples 
+In this section we provide some simple code snippets on how to build Storm and 
Trident topologies to index Solr. In subsequent sections we 
+describe in detail the two key components of the Storm Solr integration, the 
`SolrUpdateBolt`, and the `Mappers`, `SolrFieldsMapper`, and `SolrJsonMapper`.
+
+## Storm Bolt With JSON Mapper and Count Based Commit Strategy
+
+```java
+    new SolrUpdateBolt(solrConfig, solrMapper, solrCommitStgy)
+    
+    // zkHostString for Solr 'gettingstarted' example
+    SolrConfig solrConfig = new SolrConfig("127.0.0.1:9983");
+    
+    // JSON Mapper used to generate 'SolrRequest' requests to update the 
"gettingstarted" Solr collection with JSON content declared the tuple field 
with name "JSON"
+    SolrMapper solrMapper = new SolrJsonMapper.Builder("gettingstarted", 
"JSON").build(); 
+     
+    // Acks every other five tuples. Setting to null acks every tuple
+    SolrCommitStrategy solrCommitStgy = new CountBasedCommit(5);          
+```
+
+## Trident Topology With Fields Mapper
+```java
+    new SolrStateFactory(solrConfig, solrMapper);
+    
+    // zkHostString for Solr 'gettingstarted' example
+    SolrConfig solrConfig = new SolrConfig("127.0.0.1:9983");
+    
+    /* Solr Fields Mapper used to generate 'SolrRequest' requests to update 
the "gettingstarted" Solr collection. The Solr index is updated using the field 
values of the tuple fields that match static or dynamic fields declared in the 
schema object build using schemaBuilder */ 
+    SolrMapper solrMapper = new SolrFieldsMapper.Builder(schemaBuilder, 
"gettingstarted").build();
+
+    // builds the Schema object from the JSON representation of the schema as 
returned by the URL http://localhost:8983/solr/gettingstarted/schema/ 
+    SchemaBuilder schemaBuilder = new RestJsonSchemaBuilder("localhost", 
"8983", "gettingstarted")
+```
+
+## SolrUpdateBolt
+ `SolrUpdateBolt` streams tuples directly into Apache Solr. The Solr index is 
updated using`SolrRequest` requests. 
+ The `SolrUpdateBolt` is configurable using implementations of `SolrConfig`, 
`SolrMapper`, and optionally `SolrCommitStrategy`.
+   
+ The data to stream onto Solr is extracted from the tuples using the strategy 
defined in the `SolrMapper` implementation.
+ 
+ The `SolrRquest` can be sent every tuple, or according to a strategy defined 
by `SolrCommitStrategy` implementations. 
+ If a `SolrCommitStrategy` is in place and one of the tuples in the batch 
fails, the batch is not committed, and all the tuples in that 
+  batch are marked as Fail, and retried. On the other hand, if all tuples 
succeed, the `SolrRequest` is committed and all tuples are successfully acked.
+ 
+ `SolrConfig` is the class containing Solr configuration to be made available 
to Storm Solr bolts. Any configuration needed in the bolts should be put in 
this class.
+ 
+
+## SolrMapper
+`SorlMapper` implementations define the strategy to extract information from 
the tuples. The public method
+`toSolrRequest` receives a tuple or a list of tuples and returns a 
`SolrRequest` object that is used to update the Solr index.
+
+
+### SolrJsonMapper
+The `SolrJsonMapper` creates a Solr update request that is sent to the URL 
endpoint defined by Solr as the resource 
+destination for requests in JSON format. 
+ 
+To create a `SolrJsonMapper` the client must specify the name of the 
collection to update as well as the 
+tuple field that contains the JSON object used to update the Solr index. If 
the tuple does not contain the field specified, 
+a `SolrMapperException` will be thrown when the method `toSolrRequest`is 
called. If the field exists, its value can either 
+be a String with the contents in JSON format, or a Java object that will be 
serialized to JSON
+ 
+Code snippet illustrating how to create a `SolrJsonMapper` object to update 
the `gettingstarted` Solr collection with JSON content 
+declared in the tuple field with name "JSON"
+``` java
+    SolrMapper solrMapper = new SolrJsonMapper.Builder("gettingstarted", 
"JSON").build();
+```
+
+
+### SolrFieldsMapper
+The `SolrFieldsMapper` creates a Solr update request that is sent to the Solr 
URL endpoint that handles the updates of `SolrInputDocument` objects.
+
+To create a `SolrFieldsMapper` the client must specify the name of the 
collection to update as well as the `SolrSchemaBuilder`. 
+The Solr `Schema` is used to extract information about the Solr schema fields 
and corresponding types. This metadata is used
+to get the information from the tuples. Only tuple fields that match a static 
or dynamic Solr fields are added to the document. Tuple fields 
+that do not match the schema are not added to the `SolrInputDocument` being 
prepared for indexing. A debug log message is printed for the 
+ tuple fields that do not match the schema and hence are not indexed.
+ 
+
+The `SolrFieldsMapper` supports multivalue fields. A multivalue tuple field 
must be tokenized. The default token is |. Any 
+arbitrary token can be specified by calling the method 
`org.apache.storm.solr.mapper.SolrFieldsMapper.Builder.setMultiValueFieldToken`
+that is part of the `SolrFieldsMapper.Builder` builder class. 
+
+Code snippet illustrating how to create a `SolrFieldsMapper` object to update 
the `gettingstarted` Solr collection. The multivalue 
+field separates each value with the token % instead of the default | . To use 
the default token you can ommit the call to the method
+`setMultiValueFieldToken`.
+
+``` java
+    new SolrFieldsMapper.Builder(
+            new RestJsonSchemaBuilder("localhost", "8983", "gettingstarted"), 
"gettingstarted")
+                .setMultiValueFieldToken("%").build();
+```
+
+# Build And Run Bundled Examples  
+To be able to run the examples you must first build the java code in the 
package `storm-solr`, 
+and then generate an uber jar with all the dependencies.
+
+
+## Build the Storm Apache Solr Integration Code
+
+`mvn clean install -f REPO_HOME/storm/external/storm-solr/pom.xml`
+ 
+## Use the Maven Shade Plugin to Build the Uber Jar
+
+ Add the following to `REPO_HOME/storm/external/storm-solr/pom.xml`
+ 
+ ```
+ <plugin>
+     <groupId>org.apache.maven.plugins</groupId>
+     <artifactId>maven-shade-plugin</artifactId>
+     <version>2.4.1</version>
+     <executions>
+         <execution>
+             <phase>package</phase>
+             <goals>
+                 <goal>shade</goal>
+             </goals>
+             <configuration>
+                 <transformers>
+                     <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                         
<mainClass>org.apache.storm.solr.topology.SolrJsonTopology</mainClass>
+                     </transformer>
+                 </transformers>
+             </configuration>
+         </execution>
+     </executions>
+</plugin>
+ ```
+
+create the uber jar by running the commmand:
+
+`mvn package -f REPO_HOME/storm/external/storm-solr/pom.xml`
+
+This will create the uber jar file with the name and location matching the 
following pattern:
+ 
+`REPO_HOME/storm/external/storm/target/storm-solr-0.11.0-SNAPSHOT.jar`
+
+## Run Examples
+Copy the file 
`REPO_HOME/storm/external/storm-solr/target/storm-solr-0.11.0-SNAPSHOT.jar` to 
`STORM_HOME/extlib`
+
+**The code examples provided require that you first run the [Solr 
gettingstarted](http://lucene.apache.org/solr/quickstart.html) example** 
+
+### Run Storm Topology
+
+STORM_HOME/bin/storm jar 
REPO_HOME/storm/external/storm-solr/target/storm-solr-0.11.0-SNAPSHOT-tests.jar 
org.apache.storm.solr.topology.SolrFieldsTopology
+ 
+STORM_HOME/bin/storm jar 
REPO_HOME/storm/external/storm-solr/target/storm-solr-0.11.0-SNAPSHOT-tests.jar 
org.apache.storm.solr.topology.SolrJsonTopology
+
+### Run Trident Topology
+
+STORM_HOME/bin/storm jar 
REPO_HOME/storm/external/storm-solr/target/storm-solr-0.11.0-SNAPSHOT-tests.jar 
org.apache.storm.solr.trident.SolrFieldsTridentTopology
+
+STORM_HOME/bin/storm jar 
REPO_HOME/storm/external/storm-solr/target/storm-solr-0.11.0-SNAPSHOT-tests.jar 
org.apache.storm.solr.trident.SolrJsonTridentTopology
+
+
+### Verify Results
+
+The aforementioned Storm and Trident topologies index the Solr 
`gettingstarted` collection with objects that have the following `id` pattern:
+
+\*id_fields_test_val\* for `SolrFieldsTopology` and  
`SolrFieldsTridentTopology`
+
+\*json_test_val\* for `SolrJsonTopology` and `SolrJsonTridentTopology`
+
+Querying  Solr for these patterns, you will see the values that have been 
indexed by the Storm Apache Solr integration: 
+
+curl -X GET -H "Content-type:application/json" -H "Accept:application/json" 
http://localhost:8983/solr/gettingstarted_shard1_replica2/select?q=*id_fields_test_val*&wt=json&indent=true
+
+curl -X GET -H "Content-type: application/json" -H "Accept: application/json" 
http://localhost:8983/solr/gettingstarted_shard1_replica2/select?q=*id_fields_test_val*&wt=json&indent=true
+
+You can also see the results by opening the Apache Solr UI and pasting the 
`id` pattern in the `q` textbox in the queries page
+
+http://localhost:8983/solr/#/gettingstarted_shard1_replica2/query
+

http://git-wip-us.apache.org/repos/asf/storm/blob/69dfb532/docs/storm-sql-internal.md
----------------------------------------------------------------------
diff --git a/docs/storm-sql-internal.md b/docs/storm-sql-internal.md
new file mode 100644
index 0000000..08969c6
--- /dev/null
+++ b/docs/storm-sql-internal.md
@@ -0,0 +1,55 @@
+---
+title: The Internals of Storm SQL
+layout: documentation
+documentation: true
+---
+
+This page describes the design and the implementation of the Storm SQL 
integration.
+
+## Overview
+
+SQL is a well-adopted yet complicated standard. Several projects including 
Drill, Hive, Phoenix and Spark have invested significantly in their SQL layers. 
One of the main design goal of StormSQL is to leverage the existing investments 
for these projects. StormSQL leverages [Apache Calcite](///calcite.apache.org) 
to implement the SQL standard. StormSQL focuses on compiling the SQL statements 
to Storm / Trident topologies so that they can be executed in Storm clusters.
+
+Figure 1 describes the workflow of executing a SQL query in StormSQL. First, 
users provide a sequence of SQL statements. StormSQL parses the SQL statements 
and translates them to a Calcite logical plan. A logical plan consists of a 
sequence of SQL logical operators that describe how the query should be 
executed irrespective to the underlying execution engines. Some examples of 
logical operators include `TableScan`, `Filter`, `Projection` and `GroupBy`.
+
+<div align="center">
+<img title="Workflow of StormSQL" src="images/storm-sql-internal-workflow.png" 
style="max-width: 80rem"/>
+
+<p>Figure 1: Workflow of StormSQL.</p>
+</div>
+
+The next step is to compile the logical execution plan down to a physical 
execution plan. A physical plan consists of physical operators that describes 
how to execute the SQL query in *StormSQL*. Physical operators such as 
`Filter`, `Projection`, and `GroupBy` are directly mapped to operations in 
Trident topologies. StormSQL also compiles expressions in the SQL statements 
into Java byte codes and plugs them into the Trident topologies.
+
+Finally, StormSQL packages both the Java byte codes and the topology into a 
JAR and submits it to the Storm cluster. Storm schedules and executes the JAR 
in the same way of it executes other Storm topologies.
+
+The follow code blocks show an example query that filters and projects results 
from a Kafka stream.
+
+```
+CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY 
INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' ...
+
+CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 
'kafka://localhost:2181/brokers?topic=large_orders' ...
+
+INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS 
WHERE UNIT_PRICE * QUANTITY > 50
+```
+
+The first two SQL statements define the inputs and outputs of external data. 
Figure 2 describes the processes of how StormSQL takes the last `SELECT` query 
and compiles it down to Trident topology.
+
+<div align="center">
+<img title="Compiling the example query to Trident topology" 
src="images/storm-sql-internal-example.png" style="max-width: 80rem"/>
+
+<p>Figure 2: Compiling the example query to Trident topology.</p>
+</div>
+
+
+## Constraints of querying streaming tables
+
+There are several constraints when querying tables that represent a real-time 
data stream:
+
+* The `ORDER BY` clause cannot be applied to a stream.
+* There is at least one monotonic field in the `GROUP BY` clauses to allow 
StormSQL bounds the size of the buffer.
+
+For more information please refer to 
http://calcite.apache.org/docs/stream.html.
+
+## Dependency
+
+StormSQL does not ship the dependency of the external data sources in the 
packaged JAR. The users have to provide the dependency in the `extlib` 
directory of the worker node.

http://git-wip-us.apache.org/repos/asf/storm/blob/69dfb532/docs/storm-sql.md
----------------------------------------------------------------------
diff --git a/docs/storm-sql.md b/docs/storm-sql.md
new file mode 100644
index 0000000..3ad9805
--- /dev/null
+++ b/docs/storm-sql.md
@@ -0,0 +1,97 @@
+---
+title: Storm SQL integration
+layout: documentation
+documentation: true
+---
+
+The Storm SQL integration allows users to run SQL queries over streaming data 
in Storm. Not only the SQL interface allows faster development cycles on 
streaming analytics, but also opens up the opportunities to unify batch data 
processing like [Apache Hive](///hive.apache.org) and real-time streaming data 
analytics.
+
+At a very high level StormSQL compiles the SQL queries to 
[Trident](Trident-API-Overview.html) topologies and executes them in Storm 
clusters. This document provides information of how to use StormSQL as end 
users. For people that are interested in more details in the design and the 
implementation of StormSQL please refer to the [this](storm-sql-internal.html) 
page.
+
+## Usage
+
+Run the ``storm sql`` command to compile SQL statements into Trident topology, 
and submit it to the Storm cluster
+
+```
+$ bin/storm sql <sql-file> <topo-name>
+```
+
+In which `sql-file` contains a list of SQL statements to be executed, and 
`topo-name` is the name of the topology.
+
+
+## Supported Features
+
+The following features are supported in the current repository:
+
+* Streaming from and to external data sources
+* Filtering tuples
+* Projections
+
+## Specifying External Data Sources
+
+In StormSQL data is represented by external tables. Users can specify data 
sources using the `CREATE EXTERNAL TABLE` statement. The syntax of `CREATE 
EXTERNAL TABLE` closely follows the one defined in [Hive Data Definition 
Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL):
+
+```
+CREATE EXTERNAL TABLE table_name field_list
+    [ STORED AS
+      INPUTFORMAT input_format_classname
+      OUTPUTFORMAT output_format_classname
+    ]
+    LOCATION location
+    [ TBLPROPERTIES tbl_properties ]
+    [ AS select_stmt ]
+```
+
+You can find detailed explanations of the properties in [Hive Data Definition 
Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL). 
For example, the following statement specifies a Kafka spouts and sink:
+
+```
+CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 
'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES 
'{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
+```
+
+## Plugging in External Data Sources
+
+Users plug in external data sources through implementing the 
`ISqlTridentDataSource` interface and registers them using the mechanisms of 
Java's service loader. The external data source will be chosen based on the 
scheme of the URI of the tables. Please refer to the implementation of 
`storm-sql-kafka` for more details.
+
+## Example: Filtering Kafka Stream
+
+Let's say there is a Kafka stream that represents the transactions of orders. 
Each message in the stream contains the id of the order, the unit price of the 
product and the quantity of the orders. The goal is to filter orders where the 
transactions are significant and to insert these orders into another Kafka 
stream for further analysis.
+
+The user can specify the following SQL statements in the SQL file:
+
+```
+CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY 
INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES 
'{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
+CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 
'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES 
'{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
+INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS 
WHERE UNIT_PRICE * QUANTITY > 50
+```
+
+The first statement defines the table `ORDER` which represents the input 
stream. The `LOCATION` clause specifies the ZkHost (`localhost:2181`), the path 
of the brokers in ZooKeeper (`/brokers`) and the topic (`orders`). The 
`TBLPROPERTIES` clause specifies the configuration of 
[KafkaProducer](http://kafka.apache.org/documentation.html#producerconfigs).
+Current implementation of `storm-sql-kafka` requires specifying both 
`LOCATION` and `TBLPROPERTIES` clauses even though the table is read-only or 
write-only.
+
+Similarly, the second statement specifies the table `LARGE_ORDERS` which 
represents the output stream. The third statement is a `SELECT` statement which 
defines the topology: it instructs StormSQL to filter all orders in the 
external table `ORDERS`, calculates the total price and inserts matching 
records into the Kafka stream specified by `LARGE_ORDER`.
+
+To run this example, users need to include the data sources (`storm-sql-kafka` 
in this case) and its dependency in the class path. One approach is to put the 
required jars into the `extlib` directory:
+
+```
+$ cp curator-client-2.5.0.jar curator-framework-2.5.0.jar zookeeper-3.4.6.jar
+ extlib/
+$ cp scala-library-2.10.4.jar kafka-clients-0.8.2.1.jar kafka_2.10-0.8.2.1.jar 
metrics-core-2.2.0.jar extlib/
+$ cp json-simple-1.1.1.jar extlib/
+$ cp jackson-annotations-2.6.0.jar extlib/
+$ cp storm-kafka-*.jar storm-sql-kafka-*.jar storm-sql-runtime-*.jar extlib/
+```
+
+The next step is to submit the SQL statements to StormSQL:
+
+```
+$ bin/storm sql order_filtering order_filtering.sql
+```
+
+By now you should be able to see the `order_filtering` topology in the Storm 
UI.
+
+## Current Limitations
+
+Aggregation, windowing and joining tables are yet to be implemented. 
Specifying parallelism hints in the topology is not yet supported. All 
processors have a parallelism hint of 1.
+
+Users also need to provide the dependency of the external data sources in the 
`extlib` directory. Otherwise the topology will fail to run because of 
`ClassNotFoundException`.
+
+The current implementation of the Kafka connector in StormSQL assumes both the 
input and the output are in JSON formats. The connector has not yet recognized 
the `INPUTFORMAT` and `OUTPUTFORMAT` clauses yet.

Reply via email to