Author: bobby
Date: Fri Mar 18 17:50:56 2016
New Revision: 1735652
URL: http://svn.apache.org/viewvc?rev=1735652&view=rev
Log:
Added in missing files
Added:
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Logs.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Pacemaker.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Resource_Aware_Scheduler_overview.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/State-checkpointing.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Windowing.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/cgroups_in_storm.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/distcache-blobstore.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/dynamic-log-level-settings.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/dynamic-worker-profiling.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/batched-stream.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/drpc-workflow.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/dynamic_log_level_settings_1.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/dynamic_log_level_settings_2.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/dynamic_profiling_debugging_1.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/dynamic_profiling_debugging_2.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/dynamic_profiling_debugging_3.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/dynamic_profiling_debugging_4.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/eclipse-project-properties.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/example-of-a-running-topology.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/grouping.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/hdfs_blobstore.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/ld-library-path-eclipse-linux.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/local_blobstore.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/nimbus_ha_blobstore.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/nimbus_ha_leader_election_and_failover.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/nimbus_ha_topology_submission.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/relationships-worker-processes-executors-tasks.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/search-a-topology.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/search-for-a-single-worker-log.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/spout-vs-state.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/storm-cluster.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/storm-sql-internal-example.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/storm-sql-internal-workflow.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/topology-tasks.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/transactional-batches.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/transactional-commit-flow.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/transactional-design-2.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/transactional-spout-structure.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/trident-to-storm1.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/images/trident-to-storm2.png
(with props)
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/nimbus-ha-design.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-metrics-profiling-internal-actions.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-solr.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-sql-internal.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-sql.md
Modified:
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Implementation-docs.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/STORM-UI-REST-API.md
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/index.md
Modified:
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Implementation-docs.md
URL:
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Implementation-docs.md?rev=1735652&r1=1735651&r2=1735652&view=diff
==============================================================================
---
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Implementation-docs.md
(original)
+++
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Implementation-docs.md
Fri Mar 18 17:50:56 2016
@@ -9,11 +9,5 @@ This section of the wiki is dedicated to
- [Lifecycle of a topology](Lifecycle-of-a-topology.html)
- [Message passing implementation](Message-passing-implementation.html)
- [Metrics](Metrics.html)
-- How transactional topologies work
- - subtopology for TransactionalSpout
- - how state is stored in ZK
- - subtleties around what to do when emitting batches out of order
-- Unit testing
- - time simulation
- - complete-topology
- - tracker clusters
+- [Nimbus HA](nimbus-ha-design.html)
+- [Storm SQL](storm-sql-internal.html)
Added: storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Logs.md
URL:
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Logs.md?rev=1735652&view=auto
==============================================================================
--- storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Logs.md (added)
+++ storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Logs.md Fri Mar
18 17:50:56 2016
@@ -0,0 +1,30 @@
+---
+title: Storm Logs
+layout: documentation
+documentation: true
+---
+Logs in Storm are essential for tracking the status, operations, error
messages and debug information for all the
+daemons (e.g., nimbus, supervisor, logviewer, drpc, ui, pacemaker) and
topologies' workers.
+
+### Location of the Logs
+All the daemon logs are placed under the ${storm.log.dir} directory, which an
administrator can set in the System properties or
+in the cluster configuration. By default, ${storm.log.dir} points to
${storm.home}/logs.
+
+All the worker logs are placed under the workers-artifacts directory in a
hierarchical manner, e.g.,
+${workers-artifacts}/${topologyId}/${port}/worker.log. Users can set the
workers-artifacts directory
+by configuring the variable "storm.workers.artifacts.dir". By default, the
workers-artifacts directory
+is located at ${storm.log.dir}/logs/workers-artifacts.
+
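As a minimal sketch of the layout described above, the following Java snippet assembles the path of a worker log from its parts. The class `WorkerLogPath`, its method, and the sample values are hypothetical and not part of Storm; only the directory layout comes from the text.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Storm. It builds
// ${workers-artifacts}/${topologyId}/${port}/worker.log
final class WorkerLogPath {
    static Path of(String workersArtifactsDir, String topologyId, int port) {
        return Paths.get(workersArtifactsDir, topologyId, Integer.toString(port), "worker.log");
    }

    public static void main(String[] args) {
        // Sample directory, topology id, and port are made up for illustration.
        System.out.println(of("/opt/storm/logs/workers-artifacts", "wc6-1-1446571009", 6700));
    }
}
```

The artifacts directory is taken as a parameter because it can be overridden through "storm.workers.artifacts.dir".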
+### Using the Storm UI for Log View/Download and Log Search
+Authorized users can view and download daemon and worker logs through the
Storm UI.
+
+To improve the debugging of Storm, we provide the Log Search feature.
+Log Search supports searching in a certain log file or in all of a topology's
log files:
+
+String search in a log file: In the log page for a worker, a user can search
for a string, e.g., "Exception", in a specific worker log. The search works on
both normal text log files and rolled zip log files. The results show the
offset and the matched lines.
+
+
+
+Search in a topology: a user can also search for a string across a topology
by clicking the magnifying glass icon at the top right corner of the UI page.
The UI then searches all the supervisor nodes in a distributed way to find the
matching string in all logs for this topology. The search can cover either
normal text log files or rolled zip log files, depending on whether the
"Search archived logs:" box is checked. The matched results are shown on the
UI with URL links that direct the user to the relevant logs on each supervisor
node. This feature helps users find problematic supervisor nodes running this
topology.
+
+
Added: storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Pacemaker.md
URL:
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Pacemaker.md?rev=1735652&view=auto
==============================================================================
--- storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Pacemaker.md
(added)
+++ storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Pacemaker.md
Fri Mar 18 17:50:56 2016
@@ -0,0 +1,113 @@
+---
+title: Pacemaker
+layout: documentation
+documentation: true
+---
+
+
+### Introduction
+Pacemaker is a Storm daemon designed to process heartbeats from workers. As
Storm is scaled up, ZooKeeper begins to become a bottleneck due to high volumes
of writes from workers doing heartbeats. Lots of writes to disk and too much
traffic across the network are generated as ZooKeeper tries to maintain
consistency.
+
+Because heartbeats are of an ephemeral nature, they do not need to be
persisted to disk or synced across nodes; an in-memory store will do. This is
the role of Pacemaker. Pacemaker functions as a simple in-memory key/value
store with ZooKeeper-like, directory-style keys and byte array values.
+
+The corresponding Pacemaker client is a plugin for the `ClusterState`
interface, `org.apache.storm.pacemaker.pacemaker_state_factory`. Heartbeat
calls are funneled by the `ClusterState` produced by `pacemaker_state_factory`
into the Pacemaker daemon, while other set/get operations are forwarded to
ZooKeeper.
+
+------
+
+### Configuration
+
+ - `pacemaker.host` : The host that the Pacemaker daemon is running on
+ - `pacemaker.port` : The port that Pacemaker will listen on
+ - `pacemaker.max.threads` : Maximum number of threads Pacemaker daemon will
use to handle requests.
+ - `pacemaker.childopts` : Any JVM parameters that need to go to the
Pacemaker. (used by storm-deploy project)
+ - `pacemaker.auth.method` : The authentication method that is used (more info
below)
+
+#### Example
+
+To get Pacemaker up and running, set the following option in the cluster
config on all nodes:
+```
+storm.cluster.state.store: "org.apache.storm.pacemaker.pacemaker_state_factory"
+```
+
+The Pacemaker host also needs to be set on all nodes:
+```
+pacemaker.host: somehost.mycompany.com
+```
+
+And then start all of your daemons (including Pacemaker):
+```
+$ storm pacemaker
+```
+
+The Storm cluster should now be pushing all worker heartbeats through
Pacemaker.
+
+### Security
+
+Currently digest (password-based) and Kerberos security are supported.
Security is currently only around reads, not writes. Writes may be performed by
anyone, whereas reads may only be performed by authorized and authenticated
users. This is an area for future development, as it leaves the cluster open to
DoS attacks, but it prevents any sensitive information from reaching
unauthorized eyes, which was the main goal.
+
+#### Digest
+To configure digest authentication, set `pacemaker.auth.method: DIGEST` in the
cluster config on the nodes hosting Nimbus and Pacemaker.
+The nodes must also have `java.security.auth.login.config` set to point to a
JAAS config file containing the following structure:
+```
+PacemakerDigest {
+ username="some username"
+ password="some password";
+};
+```
+
+Any node with these settings configured will be able to read from Pacemaker.
+Worker nodes need not have these configs set, and may keep
`pacemaker.auth.method: NONE` set, since they do not need to read from the
Pacemaker daemon.
+
+#### Kerberos
+To configure Kerberos authentication, set `pacemaker.auth.method: KERBEROS` in
the cluster config on the nodes hosting Nimbus and Pacemaker.
+The nodes must also have `java.security.auth.login.config` set to point to a
JAAS config.
+
+The JAAS config on Nimbus must look something like this:
+```
+PacemakerClient {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ keyTab="/etc/keytabs/nimbus.keytab"
+ storeKey=true
+ useTicketCache=false
+ serviceName="pacemaker"
+ principal="[email protected]";
+};
+
+```
+
+The JAAS config on Pacemaker must look something like this:
+```
+PacemakerServer {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ keyTab="/etc/keytabs/pacemaker.keytab"
+ storeKey=true
+ useTicketCache=false
+ principal="[email protected]";
+};
+```
+
+ - The client's user principal in the `PacemakerClient` section on the Nimbus
host must match the `nimbus.daemon.user` storm cluster config value.
+ - The client's `serviceName` value must match the server's user principal in
the `PacemakerServer` section on the Pacemaker host.
+
+
+### Fault Tolerance
+
+Pacemaker runs as a single daemon instance, making it a potential Single Point
of Failure.
+
+If Pacemaker becomes unreachable by Nimbus, through crash or network
partition, the workers will continue to run, and Nimbus will repeatedly attempt
to reconnect. Nimbus functionality will be disrupted, but the topologies
themselves will continue to run.
+In case of partition of the cluster where Nimbus and Pacemaker are on the same
side of the partition, the workers that are on the other side of the partition
will not be able to heartbeat, and Nimbus will reschedule the tasks elsewhere.
This is probably what we want to happen anyway.
+
+
+### ZooKeeper Comparison
+Compared to ZooKeeper, Pacemaker uses less CPU, less memory, and of course no
disk for the same load, thanks to lack of overhead from maintaining consistency
between nodes.
+On Gigabit networking, there is a theoretical limit of about 6000 nodes.
However, the real limit is likely around 2000-3000 nodes. These limits have not
yet been tested.
+On a 270 supervisor cluster, fully scheduled with topologies, Pacemaker
resource utilization was 70% of one core and nearly 1GiB of RAM on a machine
with 4 `Intel(R) Xeon(R) CPU E5530 @ 2.40GHz` and 24GiB of RAM.
+
+
+There is an easy route to HA for Pacemaker. Unlike ZooKeeper, Pacemaker should
be able to scale horizontally without overhead. By contrast, with ZooKeeper,
there are diminishing returns when adding ZK nodes.
+
+In short, a single Pacemaker node should be able to handle many times the load
that a ZooKeeper cluster can, and future HA work allowing horizontal scaling
will increase that even further.
Added:
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Resource_Aware_Scheduler_overview.md
URL:
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Resource_Aware_Scheduler_overview.md?rev=1735652&view=auto
==============================================================================
---
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Resource_Aware_Scheduler_overview.md
(added)
+++
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Resource_Aware_Scheduler_overview.md
Fri Mar 18 17:50:56 2016
@@ -0,0 +1,232 @@
+---
+title: Resource Aware Scheduler
+layout: documentation
+documentation: true
+---
+# Introduction
+
+The purpose of this document is to provide a description of the Resource Aware
Scheduler for the Storm distributed real-time computation system. This
document provides a high-level description of the Resource Aware Scheduler in
Storm.
+
+## Using Resource Aware Scheduler
+
+The user can switch to using the Resource Aware Scheduler by setting the
following in *conf/storm.yaml*
+
+ storm.scheduler:
"backtype.storm.scheduler.resource.ResourceAwareScheduler"
+
+
+## API Overview
+
+For a Storm Topology, the user can now specify the amount of resources a
topology component (i.e. Spout or Bolt) requires to run a single instance of
the component. The user can specify the resource requirement for a topology
component by using the following API calls.
+
+### Setting Memory Requirement
+
+API to set component memory requirement:
+
+ public T setMemoryLoad(Number onHeap, Number offHeap)
+
+Parameters:
+* Number onHeap - The amount of on heap memory an instance of this component
will consume in megabytes
+* Number offHeap - The amount of off heap memory an instance of this
component will consume in megabytes
+
+The user also has the option to just specify the on heap memory requirement if
the component does not have an off heap memory need.
+
+ public T setMemoryLoad(Number onHeap)
+
+Parameters:
+* Number onHeap - The amount of on heap memory an instance of this component
will consume in megabytes
+
+If no value is provided for offHeap, 0.0 will be used. If no value is provided
for onHeap, or if the API is never called for a component, the default value
will be used.
+
+Example of Usage:
+
+ SpoutDeclarer s1 = builder.setSpout("word", new TestWordSpout(), 10);
+ s1.setMemoryLoad(1024.0, 512.0);
+ builder.setBolt("exclaim1", new ExclamationBolt(), 3)
+ .shuffleGrouping("word").setMemoryLoad(512.0);
+
+The entire memory requested for this topology is 16.5 GB. That is from 10
spouts with 1GB on heap memory and 0.5 GB off heap memory each and 3 bolts with
0.5 GB on heap memory each.
+
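The 16.5 GB figure can be checked with a short calculation. The sketch below is illustrative only: `MemoryRequestSketch` and `ComponentRequest` are hypothetical names, not Storm classes, and the values are copied from the example above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Storm: sums per-instance memory requests.
final class MemoryRequestSketch {
    /** One component's request: parallelism plus per-instance on/off heap MB. */
    static final class ComponentRequest {
        final String id;
        final int parallelism;
        final double onHeapMb;
        final double offHeapMb;

        ComponentRequest(String id, int parallelism, double onHeapMb, double offHeapMb) {
            this.id = id;
            this.parallelism = parallelism;
            this.onHeapMb = onHeapMb;
            this.offHeapMb = offHeapMb;
        }
    }

    /** Total memory requested by all instances of all components, in MB. */
    static double totalMb(List<ComponentRequest> components) {
        double total = 0.0;
        for (ComponentRequest c : components) {
            total += c.parallelism * (c.onHeapMb + c.offHeapMb);
        }
        return total;
    }

    public static void main(String[] args) {
        List<ComponentRequest> topology = Arrays.asList(
            new ComponentRequest("word", 10, 1024.0, 512.0),   // 10 spouts
            new ComponentRequest("exclaim1", 3, 512.0, 0.0));  // 3 bolts, no off heap
        double totalMb = totalMb(topology);
        // prints "16896.0 MB = 16.5 GB"
        System.out.println(totalMb + " MB = " + (totalMb / 1024.0) + " GB");
    }
}
```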
+### Setting CPU Requirement
+
+
+API to set component CPU requirement:
+
+ public T setCPULoad(Double amount)
+
+Parameters:
+* Number amount - The amount of CPU an instance of this component will
consume.
+
+Currently, the amount of CPU resources a component requires or that is
available on a node is represented by a point system. CPU usage is a difficult
concept to define. Different CPU architectures perform differently depending on
the task at hand. They are so complex that expressing all of that in a single
precise, portable number is impossible. Instead, we take a
convention-over-configuration approach and are primarily concerned with the
rough level of CPU usage, while still making it possible to specify more
fine-grained amounts.
+
+By convention a CPU core typically will get 100 points. If you feel that your
processors are more or less powerful you can adjust this accordingly. Heavy
tasks that are CPU bound will get 100 points, as they can consume an entire
core. Medium tasks should get 50, light tasks 25, and tiny tasks 10. In some
cases you have a task that spawns other threads to help with processing. These
tasks may need to go above 100 points to express the amount of CPU they are
using. If these conventions are followed, then in the common case of a
single-threaded task, the reported Capacity * 100 should be the number of CPU
points that the task needs.
+
+Example of Usage:
+
+ SpoutDeclarer s1 = builder.setSpout("word", new TestWordSpout(), 10);
+ s1.setCPULoad(15.0);
+ builder.setBolt("exclaim1", new ExclamationBolt(), 3)
+ .shuffleGrouping("word").setCPULoad(10.0);
+ builder.setBolt("exclaim2", new HeavyBolt(), 1)
+ .shuffleGrouping("exclaim1").setCPULoad(450.0);
+
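To make the point system concrete, the sketch below totals the CPU points requested by the example above and converts them to cores under the 100-points-per-core convention. `CpuRequestSketch` is a hypothetical helper, not a Storm class, and the conversion assumes the convention has not been adjusted for the cluster.

```java
// Hypothetical helper, not part of Storm: totals requested CPU points.
final class CpuRequestSketch {
    // Convention from the text: one CPU core is worth roughly 100 points.
    static final double POINTS_PER_CORE = 100.0;

    /** Sum of parallelism[i] * pointsPerInstance[i] over all components. */
    static double totalPoints(int[] parallelism, double[] pointsPerInstance) {
        double total = 0.0;
        for (int i = 0; i < parallelism.length; i++) {
            total += parallelism[i] * pointsPerInstance[i];
        }
        return total;
    }

    /** Equivalent number of cores under the 100-points-per-core convention. */
    static double coresNeeded(double points) {
        return points / POINTS_PER_CORE;
    }

    public static void main(String[] args) {
        // "word": 10 x 15.0, "exclaim1": 3 x 10.0, "exclaim2": 1 x 450.0
        double points = totalPoints(new int[] {10, 3, 1}, new double[] {15.0, 10.0, 450.0});
        System.out.println(points + " points, about " + coresNeeded(points) + " cores");
    }
}
```

The example topology requests 630 points in total, of which 450 belong to the single `exclaim2` instance.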
+### Limiting the Heap Size per Worker (JVM) Process
+
+
+ public void setTopologyWorkerMaxHeapSize(Number size)
+
+Parameters:
+* Number size - The memory limit a worker process will be allocated in
megabytes
+
+The user can limit the amount of memory resources the resource aware scheduler
allocates to a single worker on a per topology basis by using the above API.
This API is in place so that the users can spread executors to multiple
workers. However, spreading executors to multiple workers may increase the
communication latency since executors will not be able to use Disruptor Queue
for intra-process communication.
+
+Example of Usage:
+
+ Config conf = new Config();
+ conf.setTopologyWorkerMaxHeapSize(512.0);
+
+### Setting Available Resources on Node
+
+A storm administrator can specify node resource availability by modifying the
*conf/storm.yaml* file located in the storm home directory of that node.
+
+A storm administrator can specify how much available memory a node has in
megabytes by adding the following to *storm.yaml*
+
+ supervisor.memory.capacity.mb: [amount<Double>]
+
+A storm administrator can also specify how much CPU a node has available by
adding the following to *storm.yaml*
+
+ supervisor.cpu.capacity: [amount<Double>]
+
+
+Note that the amount the user can specify for the available CPU is
represented using the point system discussed earlier.
+
+Example of Usage:
+
+ supervisor.memory.capacity.mb: 20480.0
+ supervisor.cpu.capacity: 100.0
+
+
+### Other Configurations
+
+The user can set some default configurations for the Resource Aware Scheduler
in *conf/storm.yaml*:
+
+ # default value if on heap memory requirement is not specified for a
component
+ topology.component.resources.onheap.memory.mb: 128.0
+
+ # default value if off heap memory requirement is not specified for a
component
+ topology.component.resources.offheap.memory.mb: 0.0
+
+ # default value if CPU requirement is not specified for a component
+ topology.component.cpu.pcore.percent: 10.0
+
+ # default value for the max heap size for a worker
+ topology.worker.max.heap.size.mb: 768.0
+
+# Topology Priorities and Per User Resource
+
+The Resource Aware Scheduler (RAS) also has multitenant capabilities, since
many Storm users typically share a Storm cluster. The Resource Aware Scheduler
can allocate resources on a per user basis. Each user can be guaranteed a
certain amount of resources to run his or her topologies, and the Resource
Aware Scheduler will meet those guarantees when possible. When the Storm
cluster has extra free resources, the Resource Aware Scheduler will be able to
allocate additional resources to users in a fair manner. The importance of
topologies can also vary. Topologies can be used for actual production or just
experimentation, so the Resource Aware Scheduler takes the importance of a
topology into account when determining the order in which to schedule
topologies or when to evict topologies.
+
+## Setup
+
+The resource guarantees of a user can be specified in
*conf/user-resource-pools.yaml*. Specify the resource guarantees of a user in
the following format:
+
+ resource.aware.scheduler.user.pools:
+ [UserId]:
+ cpu: [Amount of Guaranteed CPU Resources]
+ memory: [Amount of Guaranteed Memory Resources]
+
+An example of what *user-resource-pools.yaml* can look like:
+
+ resource.aware.scheduler.user.pools:
+ jerry:
+ cpu: 1000
+ memory: 8192.0
+ derek:
+ cpu: 10000.0
+ memory: 32768
+ bobby:
+ cpu: 5000.0
+ memory: 16384.0
+
+Please note that the specified amount of guaranteed CPU and memory can be
either an integer or a double.
+
+## API Overview
+### Specifying Topology Priority
+Topology priorities can range from 0 to 29. The topology priorities are
partitioned into several priority levels, each of which may contain a range of
priorities.
+For example we can create a priority level mapping:
+
+ PRODUCTION => 0 - 9
+ STAGING => 10 - 19
+ DEV => 20 - 29
+
+Thus, each priority level contains 10 sub priorities. Users can set the
priority level of a topology by using the following API
+
+ conf.setTopologyPriority(int priority)
+
+Parameters:
+* priority - an integer representing the priority of the topology
+
+Please note that the 0-29 range is not a hard limit. Thus, a user can set a
priority number that is higher than 29. However, the rule that a higher
priority number means lower importance still holds.
+
+### Specifying Scheduling Strategy:
+
+A user can specify on a per topology basis what scheduling strategy to use.
Users can implement the IStrategy interface and define new strategies to
schedule specific topologies. This pluggable interface was created since we
realize different topologies might have different scheduling needs. A user can
set the topology strategy within the topology definition by using the API:
+
+ public void setTopologyStrategy(Class<? extends IStrategy> clazz)
+
+Parameters:
+* clazz - The strategy class that implements the IStrategy interface
+
+Example Usage:
+
+
conf.setTopologyStrategy(backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);
+
+A default scheduling strategy is provided. The DefaultResourceAwareStrategy is
implemented based on the scheduling algorithm in the original paper describing
resource aware scheduling in Storm:
+
+http://web.engr.illinois.edu/~bpeng/files/r-storm.pdf
+
+### Specifying Topology Prioritization Strategy
+
+The order of scheduling is a pluggable interface in which a user could define
a strategy that prioritizes topologies. For a user to define his or her own
prioritization strategy, he or she needs to implement the
ISchedulingPriorityStrategy interface. A user can set the scheduling priority
strategy by setting the *Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY* to
point to the class that implements the strategy. For instance:
+
+ resource.aware.scheduler.priority.strategy:
"backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
+
+A default strategy will be provided. The following explains how the default
scheduling priority strategy works.
+
+**DefaultSchedulingPriorityStrategy**
+
+The order of scheduling should be based on the distance between a user's
current resource allocation and his or her guaranteed allocation. We should
prioritize the users who are the furthest away from their resource guarantee.
The difficulty of this problem is that a user may have multiple resource
guarantees, and another user can have another set of resource guarantees, so
how can we compare them in a fair manner? Let's use the average percentage of
resource guarantees satisfied as a method of comparison.
+
+For example:
+
+|User|Resource Guarantee|Resource Allocated|
+|----|------------------|------------------|
+|A|<10 CPU, 50GB>|<2 CPU, 40 GB>|
+|B|< 20 CPU, 25GB>|<15 CPU, 10 GB>|
+
+User A's average percentage satisfied of resource guarantee:
+
+(2/10+40/50)/2 = 0.5
+
+User B's average percentage satisfied of resource guarantee:
+
+(15/20+10/25)/2 = 0.575
+
+Thus, in this example User A has a smaller average percentage of his or her
resource guarantee satisfied than User B. User A should therefore get priority
to be allocated more resources, i.e., a topology submitted by User A should be
scheduled first.
+
+When scheduling, RAS sorts users by the average percentage satisfied of
resource guarantee and schedules topologies from users based on that ordering,
starting from the users with the lowest average percentage satisfied of
resource guarantee. When a user's resource guarantee is completely
satisfied, the user's average percentage satisfied of resource guarantee will
be greater than or equal to 1.
+
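The ordering rule above can be sketched in a few lines of Java. This is an illustration of the described comparison only, not the code of DefaultSchedulingPriorityStrategy: `GuaranteeOrderingSketch` and `User` are hypothetical names, only CPU and memory are considered, and every guarantee is assumed to be greater than zero.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration, not Storm's implementation.
final class GuaranteeOrderingSketch {
    static final class User {
        final String name;
        final double guaranteedCpu, guaranteedMemGb;
        final double allocatedCpu, allocatedMemGb;

        User(String name, double guaranteedCpu, double guaranteedMemGb,
             double allocatedCpu, double allocatedMemGb) {
            this.name = name;
            this.guaranteedCpu = guaranteedCpu;
            this.guaranteedMemGb = guaranteedMemGb;
            this.allocatedCpu = allocatedCpu;
            this.allocatedMemGb = allocatedMemGb;
        }

        /** Average fraction of the guarantee already satisfied (assumes guarantees > 0). */
        double averageSatisfied() {
            return (allocatedCpu / guaranteedCpu + allocatedMemGb / guaranteedMemGb) / 2.0;
        }
    }

    /** Users furthest from their guarantee come first. */
    static List<User> schedulingOrder(List<User> users) {
        List<User> sorted = new ArrayList<>(users);
        sorted.sort(Comparator.comparingDouble(User::averageSatisfied));
        return sorted;
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(
            new User("B", 20, 25, 15, 10),   // (15/20 + 10/25) / 2
            new User("A", 10, 50, 2, 40));   // (2/10 + 40/50) / 2
        for (User u : schedulingOrder(users)) {
            System.out.println(u.name + " " + u.averageSatisfied());
        }
    }
}
```

With the table's values, User A is listed before User B, matching the worked example.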
+### Specifying Eviction Strategy
+The eviction strategy is used when there are not enough free resources in the
cluster to schedule new topologies. If the cluster is full, we need a mechanism
to evict topologies so that user resource guarantees can be met and additional
resource can be shared fairly among users. The strategy for evicting topologies
is also a pluggable interface in which the user can implement his or her own
topology eviction strategy. For a user to implement his or her own eviction
strategy, he or she needs to implement the IEvictionStrategy Interface and set
*Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY* to point to the implemented
strategy class. For instance:
+
+ resource.aware.scheduler.eviction.strategy:
"backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy"
+
+A default eviction strategy is provided. The following explains how the
default topology eviction strategy works.
+
+**DefaultEvictionStrategy**
+
+
+To determine if topology eviction should occur we should take into account the
priority of the topology that we are trying to schedule and whether the
resource guarantees for the owner of the topology have been met.
+
+We should never evict a topology from a user that does not have his or her
resource guarantees satisfied. The following flow chart should describe the
logic for the eviction process.
+
+
Modified:
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/STORM-UI-REST-API.md
URL:
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/STORM-UI-REST-API.md?rev=1735652&r1=1735651&r2=1735652&view=diff
==============================================================================
---
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/STORM-UI-REST-API.md
(original)
+++
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/STORM-UI-REST-API.md
Fri Mar 18 17:50:56 2016
@@ -1,10 +1,9 @@
---
-title: Storm REST API
+title: Storm UI REST API
layout: documentation
documentation: true
---
-# Storm UI REST API
The Storm UI daemon provides a REST API that allows you to interact with a
Storm cluster, which includes retrieving
metrics data and configuration information as well as management operations
such as starting or stopping topologies.
@@ -90,7 +89,6 @@ Response fields:
|Field |Value|Description
|--- |--- |---
|stormVersion|String| Storm version|
-|nimbusUptime|String| Shows how long the cluster is running|
|supervisors|Integer| Number of supervisors running|
|topologies| Integer| Number of topologies running|
|slotsTotal| Integer|Total number of available worker slots|
@@ -104,7 +102,6 @@ Sample response:
```json
{
"stormVersion": "0.9.2-incubating-SNAPSHOT",
- "nimbusUptime": "3m 53s",
"supervisors": 1,
"slotsTotal": 4,
"slotsUsed": 3,
@@ -125,8 +122,13 @@ Response fields:
|id| String | Supervisor's id|
|host| String| Supervisor's host name|
|uptime| String| Shows how long the supervisor is running|
+|uptimeSeconds| Integer| Shows how long the supervisor is running in seconds|
|slotsTotal| Integer| Total number of available worker slots for this
supervisor|
|slotsUsed| Integer| Number of worker slots used on this supervisor|
+|totalMem| Double| Total memory capacity on this supervisor|
+|totalCpu| Double| Total CPU capacity on this supervisor|
+|usedMem| Double| Used memory capacity on this supervisor|
+|usedCpu| Double| Used CPU capacity on this supervisor|
Sample response:
@@ -137,13 +139,74 @@ Sample response:
"id": "0b879808-2a26-442b-8f7d-23101e0c3696",
"host": "10.11.1.7",
"uptime": "5m 58s",
+ "uptimeSeconds": 358,
"slotsTotal": 4,
- "slotsUsed": 3
+ "slotsUsed": 3,
+ "totalMem": 3000,
+ "totalCpu": 400,
+ "usedMem": 1280,
+ "usedCpu": 160
+ }
+ ],
+ "schedulerDisplayResource": true
+}
+```
+
+### /api/v1/nimbus/summary (GET)
+
+Returns summary information for all nimbus hosts.
+
+Response fields:
+
+|Field |Value|Description|
+|--- |--- |---
+|host| String | Nimbus' host name|
+|port| int| Nimbus' port number|
+|status| String| Possible values are Leader, Not a Leader, Dead|
+|nimbusUpTime| String| Shows how long the nimbus has been running|
+|nimbusUpTimeSeconds| String| Shows how long the nimbus has been running
in seconds|
+|nimbusLogLink| String| Logviewer url to view the nimbus.log|
+|version| String| Version of storm this nimbus host is running|
+
+Sample response:
+
+```json
+{
+ "nimbuses":[
+ {
+ "host":"192.168.202.1",
+ "port":6627,
+ "nimbusLogLink":"http:\/\/192.168.202.1:8000\/log?file=nimbus.log",
+ "status":"Leader",
+ "version":"0.10.0-SNAPSHOT",
+ "nimbusUpTime":"3m 33s",
+ "nimbusUpTimeSeconds":"213"
}
]
}
```
+### /api/v1/history/summary (GET)
+
+Returns a list of all running topologies' IDs submitted by the current user.
+
+Response fields:
+
+|Field |Value | Description|
+|--- |--- |---
+|topo-history| List| List of Topologies' IDs|
+
+Sample response:
+
+```json
+{
+ "topo-history":[
+ "wc6-1-1446571009",
+ "wc8-2-1446587178"
+ ]
+}
+```
+
### /api/v1/topology/summary (GET)
Returns summary information for all topologies.
@@ -156,9 +219,19 @@ Response fields:
|name| String| Topology Name|
|status| String| Topology Status|
|uptime| String| Shows how long the topology is running|
+|uptimeSeconds| Integer| Shows how long the topology has been running in seconds|
|tasksTotal| Integer |Total number of tasks for this topology|
|workersTotal| Integer |Number of workers used for this topology|
|executorsTotal| Integer |Number of executors used for this topology|
+|replicationCount| Integer |Number of nimbus hosts on which this topology code
is replicated|
+|requestedMemOnHeap| Double|Requested On-Heap Memory by User (MB)|
+|requestedMemOffHeap| Double|Requested Off-Heap Memory by User (MB)|
+|requestedTotalMem| Double|Requested Total Memory by User (MB)|
+|requestedCpu| Double|Requested CPU by User (%)|
+|assignedMemOnHeap| Double|Assigned On-Heap Memory by Scheduler (MB)|
+|assignedMemOffHeap| Double|Assigned Off-Heap Memory by Scheduler (MB)|
+|assignedTotalMem| Double|Assigned Total Memory by Scheduler (MB)|
+|assignedCpu| Double|Assigned CPU by Scheduler (%)|
Sample response:
@@ -170,11 +243,55 @@ Sample response:
"name": "WordCount3",
"status": "ACTIVE",
"uptime": "6m 5s",
+ "uptimeSeconds": 365,
"tasksTotal": 28,
"workersTotal": 3,
- "executorsTotal": 28
+ "executorsTotal": 28,
+ "replicationCount": 1,
+ "requestedMemOnHeap": 640,
+ "requestedMemOffHeap": 128,
+ "requestedTotalMem": 768,
+ "requestedCpu": 80,
+ "assignedMemOnHeap": 640,
+ "assignedMemOffHeap": 128,
+ "assignedTotalMem": 768,
+ "assignedCpu": 80
}
 ],
+ "schedulerDisplayResource": true
+}
+```
+
+### /api/v1/topology-workers/:id (GET)
+
+Returns the workers' information (host and port) for a topology.
+
+Response fields:
+
+|Field |Value | Description|
+|--- |--- |---
+|hostPortList| List| Workers' information for a topology|
+|logviewerPort| Integer| Logviewer Port|
+
+Sample response:
+
+```json
+{
+ "hostPortList":[
+ {
+ "host":"192.168.202.2",
+ "port":6701
+ },
+ {
+ "host":"192.168.202.2",
+ "port":6702
+ },
+ {
+ "host":"192.168.202.3",
+ "port":6700
+ }
+ ],
+ "logviewerPort":8000
}
```
@@ -198,12 +315,14 @@ Response fields:
|id| String| Topology Id|
|name| String |Topology Name|
|uptime| String |How long the topology has been running|
+|uptimeSeconds| Integer |How long the topology has been running in seconds|
|status| String |Current status of the topology, e.g. "ACTIVE"|
|tasksTotal| Integer |Total number of tasks for this topology|
|workersTotal| Integer |Number of workers used for this topology|
|executorsTotal| Integer |Number of executors used for this topology|
|msgTimeout| Integer | Number of seconds a tuple has before the spout
considers it failed |
|windowHint| String | window param value in "hh mm ss" format. Default value
is "All Time"|
+|schedulerDisplayResource| Boolean | Whether to display scheduler resource
information|
|topologyStats| Array | Array of all the topology related stats per time
window|
|topologyStats.windowPretty| String |Duration passed in HH:MM:SS format|
|topologyStats.window| String |User requested time window for metrics|
@@ -237,6 +356,7 @@ Response fields:
|bolts.errorLapsedSecs| Integer |Number of seconds elapsed since that last
error happened in a bolt|
|bolts.errorWorkerLogLink| String | Link to the worker log that reported the
exception |
|bolts.emitted| Long |Number of tuples emitted|
+|replicationCount| Integer |Number of nimbus hosts on which this topology code
is replicated|
Examples:
@@ -258,8 +378,10 @@ Sample response:
"tasksTotal": 28,
"executorsTotal": 28,
"uptime": "29m 19s",
+ "uptimeSeconds": 1759,
"msgTimeout": 30,
"windowHint": "10m 0s",
+ "schedulerDisplayResource": true,
"topologyStats": [
{
"windowPretty": "10m 0s",
@@ -371,7 +493,9 @@ Sample response:
"nimbus.inbox.jar.expiration.secs": 3600,
"drpc.worker.threads": 64,
"topology.worker.shared.thread.pool.size": 4,
- "nimbus.host": "hw10843.local",
+ "nimbus.seeds": [
+ "hw10843.local"
+ ],
"storm.messaging.netty.min_wait_ms": 100,
"storm.zookeeper.port": 2181,
"transactional.zookeeper.port": null,
@@ -381,7 +505,8 @@ Sample response:
"storm.zookeeper.retry.intervalceiling.millis": 30000,
"supervisor.enable": true,
"storm.messaging.netty.server_worker_threads": 1
- }
+ },
+ "replicationCount": 1
}
```
@@ -407,7 +532,7 @@ Response fields:
|windowHint| String | window param value in "hh mm ss" format. Default value
is "All Time"|
|executors| Integer |Number of executor tasks in the component|
|componentErrors| Array of Errors | List of component errors|
-|componentErrors.time| Long | Timestamp when the exception occurred |
+|componentErrors.errorTime| Long | Timestamp when the exception occurred
(Prior to 0.11.0, this field was named 'time'.)|
|componentErrors.errorHost| String | host name for the error|
|componentErrors.errorPort| String | port for the error|
|componentErrors.error| String |Shows the error happened in a component|
@@ -430,6 +555,10 @@ Response fields:
|boltStats.processLatency| String (double value returned in String format)
|Average time of the bolt to ack a message after it was received|
|boltStats.acked| Long |Number of messages acked|
|boltStats.failed| Long |Number of messages failed|
+|profilingAndDebuggingCapable| Boolean |true if there is support for Profiling
and Debugging Actions|
+|profileActionEnabled| Boolean |true if worker profiling (Java Flight
Recorder) is enabled|
+|profilerActive| Array |Array of currently active Profiler Actions|
+
Examples:
@@ -448,7 +577,7 @@ Sample response:
"componentType": "spout",
"windowHint": "10m 0s",
"executors": 5,
- "componentErrors":[{"time": 1406006074000,
+ "componentErrors":[{"errorTime": 1406006074000,
"errorHost": "10.11.1.70",
"errorPort": 6701,
"errorWorkerLogLink":
"http://10.11.1.7:8000/log?file=worker-6701.log",
@@ -458,6 +587,16 @@ Sample response:
"topologyId": "WordCount3-1-1402960825",
"tasks": 5,
"window": "600",
+ "profilerActive": [
+ {
+ "host": "10.11.1.70",
+ "port": "6701",
+
"dumplink":"http:\/\/10.11.1.70:8000\/dumps\/ex-1-1452718803\/10.11.1.70%3A6701",
+ "timestamp":"576328"
+ }
+ ],
+ "profilingAndDebuggingCapable": true,
+ "profileActionEnabled": true,
"spoutSummary": [
{
"windowPretty": "10m 0s",
@@ -524,6 +663,7 @@ Sample response:
"host": "10.11.1.7",
"acked": 0,
"uptime": "43m 4s",
+ "uptimeSeconds": 2584,
"id": "[24-24]",
"failed": 0
},
@@ -536,6 +676,7 @@ Sample response:
"host": "10.11.1.7",
"acked": 0,
"uptime": "42m 57s",
+ "uptimeSeconds": 2577,
"id": "[25-25]",
"failed": 0
},
@@ -548,6 +689,7 @@ Sample response:
"host": "10.11.1.7",
"acked": 0,
"uptime": "42m 57s",
+ "uptimeSeconds": 2577,
"id": "[26-26]",
"failed": 0
},
@@ -560,6 +702,7 @@ Sample response:
"host": "10.11.1.7",
"acked": 0,
"uptime": "43m 4s",
+ "uptimeSeconds": 2584,
"id": "[27-27]",
"failed": 0
},
@@ -572,6 +715,7 @@ Sample response:
"host": "10.11.1.7",
"acked": 0,
"uptime": "42m 57s",
+ "uptimeSeconds": 2577,
"id": "[28-28]",
"failed": 0
}
@@ -579,6 +723,201 @@ Sample response:
}
```
+## Profiling and Debugging GET Operations
+
+### /api/v1/topology/:id/profiling/start/:host-port/:timeout (GET)
+
+Request to start profiler on worker with timeout. Returns status and link to
profiler artifacts for worker.
+
+|Parameter |Value |Description |
+|----------|--------|-------------|
+|id |String (required)| Topology Id |
+|host-port |String (required)| Worker Id |
+|timeout |String (required)| Time out for profiler to stop in minutes |
+
+Response fields:
+
+|Field |Value |Description|
+|----- |----- |-----------|
+|id | String | Worker id|
+|status | String | Response Status |
+|timeout | String | Requested timeout|
+|dumplink | String | Link to logviewer URL for worker profiler documents.|
+
+Examples:
+
+```no-highlight
+1.
http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/start/10.11.1.7:6701/10
+2.
http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/start/10.11.1.7:6701/5
+3.
http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/start/10.11.1.7:6701/20
+```
+
+Sample response:
+
+```json
+{
+ "status": "ok",
+ "id": "10.11.1.7:6701",
+ "timeout": "10",
+ "dumplink":
"http:\/\/10.11.1.7:8000\/dumps\/wordcount-1-1446614150\/10.11.1.7%3A6701"
+}
+```
+
+### /api/v1/topology/:id/profiling/dumpprofile/:host-port (GET)
+
+Request to dump profiler recording on worker. Returns status and worker id for
the request.
+
+|Parameter |Value |Description |
+|----------|--------|-------------|
+|id |String (required)| Topology Id |
+|host-port |String (required)| Worker Id |
+
+Response fields:
+
+|Field |Value |Description|
+|----- |----- |-----------|
+|id | String | Worker id|
+|status | String | Response Status |
+
+Examples:
+
+```no-highlight
+1.
http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/dumpprofile/10.11.1.7:6701
+```
+
+Sample response:
+
+```json
+{
+ "status": "ok",
+ "id": "10.11.1.7:6701"
+}
+```
+
+### /api/v1/topology/:id/profiling/stop/:host-port (GET)
+
+Request to stop profiler on worker. Returns status and worker id for the
request.
+
+|Parameter |Value |Description |
+|----------|--------|-------------|
+|id |String (required)| Topology Id |
+|host-port |String (required)| Worker Id |
+
+Response fields:
+
+|Field |Value |Description|
+|----- |----- |-----------|
+|id | String | Worker id|
+|status | String | Response Status |
+
+Examples:
+
+```no-highlight
+1.
http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/stop/10.11.1.7:6701
+```
+
+Sample response:
+
+```json
+{
+ "status": "ok",
+ "id": "10.11.1.7:6701"
+}
+```
+
+### /api/v1/topology/:id/profiling/dumpjstack/:host-port (GET)
+
+Request to dump jstack on worker. Returns status and worker id for the request.
+
+|Parameter |Value |Description |
+|----------|--------|-------------|
+|id |String (required)| Topology Id |
+|host-port |String (required)| Worker Id |
+
+Response fields:
+
+|Field |Value |Description|
+|----- |----- |-----------|
+|id | String | Worker id|
+|status | String | Response Status |
+
+Examples:
+
+```no-highlight
+1.
http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/dumpjstack/10.11.1.7:6701
+```
+
+Sample response:
+
+```json
+{
+ "status": "ok",
+ "id": "10.11.1.7:6701"
+}
+```
+
+### /api/v1/topology/:id/profiling/dumpheap/:host-port (GET)
+
+Request to dump heap (jmap) on worker. Returns status and worker id for the
request.
+
+|Parameter |Value |Description |
+|----------|--------|-------------|
+|id |String (required)| Topology Id |
+|host-port |String (required)| Worker Id |
+
+Response fields:
+
+|Field |Value |Description|
+|----- |----- |-----------|
+|id | String | Worker id|
+|status | String | Response Status |
+
+Examples:
+
+```no-highlight
+1.
http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/dumpheap/10.11.1.7:6701
+```
+
+Sample response:
+
+```json
+{
+ "status": "ok",
+ "id": "10.11.1.7:6701"
+}
+```
+
+### /api/v1/topology/:id/profiling/restartworker/:host-port (GET)
+
+Request to restart the worker. Returns status and worker id for the request.
+
+|Parameter |Value |Description |
+|----------|--------|-------------|
+|id |String (required)| Topology Id |
+|host-port |String (required)| Worker Id |
+
+Response fields:
+
+|Field |Value |Description|
+|----- |----- |-----------|
+|id | String | Worker id|
+|status | String | Response Status |
+
+Examples:
+
+```no-highlight
+1.
http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/restartworker/10.11.1.7:6701
+```
+
+Sample response:
+
+```json
+{
+ "status": "ok",
+ "id": "10.11.1.7:6701"
+}
+```
+
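The profiling and debugging endpoints above all share one URL shape, so a client can build them from a few parts. The following is a minimal sketch, not part of Storm: the class `ProfilingUrls` and its method names are hypothetical, and the host, topology id and worker id are simply the values used in the examples above. It only assembles the URL string; the result can be passed to any HTTP client as a GET request.

```java
/** Hypothetical helper (not a Storm API) that assembles profiling request URLs. */
final class ProfilingUrls {
    private ProfilingUrls() {}

    /** Path prefix shared by all profiling endpoints of one topology. */
    static String base(String uiBaseUrl, String topologyId) {
        return uiBaseUrl + "/api/v1/topology/" + topologyId + "/profiling";
    }

    /** URL for .../profiling/start/:host-port/:timeout (timeout in minutes). */
    static String start(String uiBaseUrl, String topologyId, String hostPort, int timeoutMinutes) {
        return base(uiBaseUrl, topologyId) + "/start/" + hostPort + "/" + timeoutMinutes;
    }

    /**
     * URL for the endpoints that take only a worker id, e.g. action =
     * "dumpprofile", "stop", "dumpjstack", "dumpheap" or "restartworker".
     */
    static String action(String uiBaseUrl, String topologyId, String action, String hostPort) {
        return base(uiBaseUrl, topologyId) + "/" + action + "/" + hostPort;
    }

    public static void main(String[] args) {
        String ui = "http://ui-daemon-host-name:8080";
        String topology = "wordcount-1-1446614150";
        String worker = "10.11.1.7:6701";
        System.out.println(start(ui, topology, worker, 10));
        System.out.println(action(ui, topology, "dumpjstack", worker));
    }
}
```

The worker id is left unencoded here because the example URLs above pass `host:port` as-is.
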
## POST Operations
### /api/v1/topology/:id/activate (POST)
Added:
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/State-checkpointing.md
URL:
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/State-checkpointing.md?rev=1735652&view=auto
==============================================================================
---
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/State-checkpointing.md
(added)
+++
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/State-checkpointing.md
Fri Mar 18 17:50:56 2016
@@ -0,0 +1,160 @@
+---
+title: Storm State Management
+layout: documentation
+documentation: true
+---
+# State support in core storm
+Storm core has abstractions for bolts to save and retrieve the state of their operations. There is a default in-memory
+based state implementation and also a Redis backed implementation that provides state persistence.
+
+## State management
+Bolts that require their state to be managed and persisted by the framework should implement the `IStatefulBolt` interface or
+extend `BaseStatefulBolt` and implement the `void initState(T state)` method. The `initState` method is invoked by the framework
+during bolt initialization with the previously saved state of the bolt. It is invoked after `prepare` but before the bolt starts
+processing any tuples.
+
+Currently the only kind of `State` implementation that is supported is
`KeyValueState` which provides key-value mapping.
+
+For example a word count bolt could use the key value state abstraction for
the word counts as follows.
+
+1. Extend the BaseStatefulBolt and type parameterize it with KeyValueState
which would store the mapping of word to count.
+2. The bolt gets initialized with its previously saved state in the init
method. This will contain the word counts
+last committed by the framework during the previous run.
+3. In the execute method, update the word count.
+
+ ```java
+ public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String,
Long>> {
+ private KeyValueState<String, Long> wordCounts;
+ private OutputCollector collector;
+ ...
+ @Override
+ public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
+ this.collector = collector;
+ }
+ @Override
+ public void initState(KeyValueState<String, Long> state) {
+ wordCounts = state;
+ }
+ @Override
+ public void execute(Tuple tuple) {
+ String word = tuple.getString(0);
+ Long count = wordCounts.get(word, 0L);
+ count++;
+ wordCounts.put(word, count);
+ collector.emit(tuple, new Values(word, count));
+ collector.ack(tuple);
+ }
+ ...
+ }
+ ```
+4. The framework periodically checkpoints the state of the bolt (default every
second). The frequency
+can be changed by setting the storm config
`topology.state.checkpoint.interval.ms`
+5. For state persistence, use a state provider that supports persistence by
setting the `topology.state.provider` in the
+storm config. E.g. for using Redis based key-value state implementation set
`topology.state.provider:
org.apache.storm.redis.state.RedisKeyValueStateProvider`
+in storm.yaml. The provider implementation jar should be in the class path,
which in this case means putting the `storm-redis-*.jar`
+in the extlib directory.
+6. The state provider properties can be overridden by setting
`topology.state.provider.config`. For Redis state this is a
+json config with the following properties.
+
+ ```
+ {
+ "keyClass": "Optional fully qualified class name of the Key type.",
+ "valueClass": "Optional fully qualified class name of the Value type.",
+ "keySerializerClass": "Optional Key serializer implementation class.",
+ "valueSerializerClass": "Optional Value Serializer implementation class.",
+ "jedisPoolConfig": {
+ "host": "localhost",
+ "port": 6379,
+ "timeout": 2000,
+ "database": 0,
+ "password": "xyz"
+ }
+ }
+ ```
+
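The settings from steps 4 to 6 can also be collected programmatically. The sketch below is an illustration under stated assumptions, not Storm's own code: it uses a plain `Map<String, Object>` (Storm's `Config` is itself a map of string keys to values) so that it stays self-contained, the class name `StatefulTopologyConf` is hypothetical, and it assumes the Redis provider accepts `topology.state.provider.config` as a JSON string.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that gathers the state-related settings described above. */
final class StatefulTopologyConf {
    private StatefulTopologyConf() {}

    static Map<String, Object> redisStateConf(long checkpointIntervalMs, String redisHost, int redisPort) {
        Map<String, Object> conf = new HashMap<>();
        // How often the framework checkpoints bolt state.
        conf.put("topology.state.checkpoint.interval.ms", checkpointIntervalMs);
        // State provider that persists the state in Redis.
        conf.put("topology.state.provider", "org.apache.storm.redis.state.RedisKeyValueStateProvider");
        // Provider properties; only the Redis connection part is set here (assumed to be a JSON string).
        conf.put("topology.state.provider.config",
                "{\"jedisPoolConfig\": {\"host\": \"" + redisHost + "\", \"port\": " + redisPort + "}}");
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(redisStateConf(5000L, "localhost", 6379));
    }
}
```

In a real topology the same keys would be put into the topology configuration before submitting.
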
+## Checkpoint mechanism
+Checkpoint is triggered by an internal checkpoint spout at the specified
`topology.state.checkpoint.interval.ms`. If there is
+at least one `IStatefulBolt` in the topology, the checkpoint spout is automatically added by the topology builder. For stateful topologies,
+the topology builder wraps the `IStatefulBolt` in a `StatefulBoltExecutor`
which handles the state commits on receiving the checkpoint tuples.
+The non stateful bolts are wrapped in a `CheckpointTupleForwarder` which just
forwards the checkpoint tuples so that the checkpoint tuples
+can flow through the topology DAG. The checkpoint tuples flow through a
separate internal stream namely `$checkpoint`. The topology builder
+wires the checkpoint stream across the whole topology with the checkpoint
spout at the root.
+
+```
+ default default default
+[spout1] ---------------> [statefulbolt1] ----------> [bolt1]
--------------> [statefulbolt2]
+ | ----------> -------------->
+ | ($chpt) ($chpt)
+ |
+[$checkpointspout] _______| ($chpt)
+```
+
+At checkpoint intervals the checkpoint tuples are emitted by the checkpoint
spout. On receiving a checkpoint tuple, the state of the bolt
+is saved and then the checkpoint tuple is forwarded to the next component.
Each bolt waits for the checkpoint to arrive on all its input
+streams before it saves its state so that the state represents a consistent
state across the topology. Once the checkpoint spout receives
+ACK from all the bolts, the state commit is complete and the transaction is
recorded as committed by the checkpoint spout.
+
+The state commit works like a three phase commit protocol with a prepare and
commit phase so that the state across the topology is saved
+in a consistent and atomic manner.
+
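The prepare and commit phases can be pictured with a toy key-value state. This is a simplified sketch for illustration only and is not Storm's implementation: the class `ToyKeyValueState` is hypothetical, it keeps everything in memory, and it assumes that a rollback restores the last committed snapshot.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of prepare/commit/rollback on a key-value state (hypothetical, in-memory only). */
final class ToyKeyValueState<K, V> {
    private Map<K, V> working = new HashMap<>();   // state the bolt reads and writes
    private Map<K, V> prepared = null;             // snapshot waiting to be committed
    private Map<K, V> committed = new HashMap<>(); // last committed snapshot
    private long preparedTxid = -1;

    void put(K key, V value) {
        working.put(key, value);
    }

    V get(K key, V defaultValue) {
        return working.getOrDefault(key, defaultValue);
    }

    /** Prepare phase: snapshot the working state under the given transaction id. */
    void prepareCommit(long txid) {
        prepared = new HashMap<>(working);
        preparedTxid = txid;
    }

    /** Commit phase: promote the prepared snapshot if it belongs to this transaction. */
    void commit(long txid) {
        if (prepared != null && preparedTxid == txid) {
            committed = prepared;
            prepared = null;
        }
    }

    /** Discard any prepared snapshot and fall back to the last committed state. */
    void rollback() {
        prepared = null;
        working = new HashMap<>(committed);
    }
}
```

For example, after committing a count of 1 for a word, updating it to 2, preparing and then rolling back, `get` returns 1 again because the prepared snapshot was never committed.
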
+### Recovery
+The recovery phase is triggered when the topology is started for the first
time. If the previous transaction was not successfully
+prepared, a `rollback` message is sent across the topology so that if a bolt has some prepared transactions they can be discarded.
+If the previous transaction was prepared successfully but not committed, a
`commit` message is sent across the topology so that
+the prepared transactions can be committed. After these steps are complete,
the bolts are initialized with the state.
+
+Recovery is also triggered if one of the bolts fails to acknowledge the checkpoint message, for example because a worker crashed in
+the middle of a checkpoint. When the worker is restarted by the supervisor, the checkpoint mechanism makes sure that the bolt gets
+initialized with its previous state and checkpointing continues from the point where it left off.
+
+### Guarantee
+Storm relies on the acking mechanism to replay tuples in case of failures. It
is possible that the state is committed
+but the worker crashes before acking the tuples. In this case the tuples are
replayed causing duplicate state updates.
+Also currently the StatefulBoltExecutor continues to process the tuples from a
stream after it has received a checkpoint
+tuple on one stream while waiting for checkpoint to arrive on other input
streams for saving the state. This can also cause
+duplicate state updates during recovery.
+
+The state abstraction does not eliminate duplicate evaluations and currently provides only an at-least-once guarantee.
+
+In order to provide the at-least-once guarantee, all bolts in a stateful topology are expected to anchor the tuples while emitting and ack the input tuples once they are processed. For non-stateful bolts, the anchoring/acking can be automatically managed by extending the `BaseBasicBolt`. Stateful bolts are expected to anchor tuples while emitting and ack the tuple after processing, as in the `WordCountBolt` example in the State management section above.
+
+### IStateful bolt hooks
+The `IStatefulBolt` interface provides hook methods in which stateful bolts can implement custom actions.
+```java
+ /**
+ * This is a hook for the component to perform some actions just before the
+ * framework commits its state.
+ */
+ void preCommit(long txid);
+
+ /**
+ * This is a hook for the component to perform some actions just before the
+ * framework prepares its state.
+ */
+ void prePrepare(long txid);
+
+ /**
+ * This is a hook for the component to perform some actions just before the
+ * framework rolls back the prepared state.
+ */
+ void preRollback();
+```
+This is optional and stateful bolts are not expected to provide any
implementation. This is provided so that other
+system level components can be built on top of the stateful abstractions where
we might want to take some actions before the
+stateful bolt's state is prepared, committed or rolled back.
+
+## Providing custom state implementations
+Currently the only kind of `State` implementation supported is `KeyValueState`
which provides key-value mapping.
+
+Custom state implementations should provide implementations for the methods defined in the `org.apache.storm.state.State` interface.
+These are the `void prepareCommit(long txid)`, `void commit(long txid)` and `rollback()` methods. The no-argument `commit()` method is optional
+and is useful if the bolt manages the state on its own. This is currently used only by the internal system bolts,
+e.g. the CheckpointSpout to save its state.
+
+`KeyValueState` implementation should also implement the methods defined in
the `org.apache.storm.state.KeyValueState` interface.
+
+### State provider
+The framework instantiates the state via the corresponding `StateProvider`
implementation. A custom state should also provide
+a `StateProvider` implementation which can load and return the state based on
the namespace. Each state belongs to a unique namespace.
+The namespace is typically unique per task so that each task can have its own
state. The StateProvider and the corresponding
+State implementation should be available in the class path of Storm (by
placing them in the extlib directory).
Added: storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Windowing.md
URL:
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Windowing.md?rev=1735652&view=auto
==============================================================================
--- storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Windowing.md
(added)
+++ storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/Windowing.md
Fri Mar 18 17:50:56 2016
@@ -0,0 +1,239 @@
+---
+title: Windowing Support in Core Storm
+layout: documentation
+documentation: true
+---
+
+Storm core has support for processing a group of tuples that falls within a
window. Windows are specified with the
+following two parameters,
+
+1. Window length - the length or duration of the window
+2. Sliding interval - the interval at which the windowing slides
+
+## Sliding Window
+
+Tuples are grouped in windows and the window slides every sliding interval. A tuple can belong to more than one window.
+
+For example, consider a time duration based sliding window with a length of 10 seconds and a sliding interval of 5 seconds.
+
+```
+| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
+0 5 10 15 -> time
+
+|<------- w1 -------->|
+ |------------ w2 ------->|
+```
+
+The window is evaluated every 5 seconds and some of the tuples in the first window overlap with the second one.
+
+
+## Tumbling Window
+
+Tuples are grouped in a single window based on time or count. Any tuple
belongs to only one of the windows.
+
+For example, consider a time duration based tumbling window with a length of 5 seconds.
+
+```
+| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
+0 5 10 15 -> time
+ w1 w2 w3
+```
+
+The window is evaluated every five seconds and none of the windows overlap.
+
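The difference between the two window types comes down to how many windows a single tuple falls into. The following is a small sketch, not Storm code: the class `WindowAssignment` is hypothetical, timestamps are plain seconds, and windows are assumed to be half-open intervals `[start, start + length)` aligned to multiples of the sliding interval.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists the windows a timestamp belongs to. */
final class WindowAssignment {
    private WindowAssignment() {}

    /**
     * Returns, in ascending order, the start of every window [start, start + length)
     * that contains ts. Windows start at multiples of the sliding interval, so
     * timestamps close to zero can also fall into a window with a negative start.
     */
    static List<Long> windowStarts(long ts, long length, long slide) {
        List<Long> starts = new ArrayList<>();
        long latest = Math.floorDiv(ts, slide) * slide; // latest window start at or before ts
        for (long start = latest; start > ts - length; start -= slide) {
            starts.add(0, start); // prepend to keep ascending order
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(windowStarts(7, 10, 5)); // sliding: length 10, interval 5
        System.out.println(windowStarts(7, 5, 5));  // tumbling: interval equals length
    }
}
```

With a length of 10 and a sliding interval of 5, a tuple at t = 7 belongs to the windows starting at 0 and 5, matching w1 and w2 in the sliding window diagram. With a tumbling window of length 5 it belongs only to the window starting at 5.
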
+Storm supports specifying the window length and sliding intervals as a count
of the number of tuples or as a time duration.
+
+The bolt interface `IWindowedBolt` is implemented by bolts that need windowing support.
+
+```java
+public interface IWindowedBolt extends IComponent {
+ void prepare(Map stormConf, TopologyContext context, OutputCollector
collector);
+ /**
+ * Process tuples falling within the window and optionally emit
+ * new tuples based on the tuples in the input window.
+ */
+ void execute(TupleWindow inputWindow);
+ void cleanup();
+}
+```
+
+Every time the window activates, the `execute` method is invoked. The `TupleWindow` parameter gives access to the current tuples
+in the window, the tuples that expired and the new tuples added since the last window was computed, which is useful
+for efficient windowing computations.
+
+Bolts that need windowing support typically extend `BaseWindowedBolt`, which has the APIs for specifying the
+window length and sliding intervals.
+
+E.g.
+
+```java
+public class SlidingWindowBolt extends BaseWindowedBolt {
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(TupleWindow inputWindow) {
+ for(Tuple tuple: inputWindow.get()) {
+ // do the windowing computation
+ ...
+ }
+ // emit the results
+ collector.emit(new Values(computedValue));
+ }
+}
+
+public static void main(String[] args) {
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("spout", new RandomSentenceSpout(), 1);
+ builder.setBolt("slidingwindowbolt",
+ new SlidingWindowBolt().withWindow(new Count(30), new
Count(10)),
+ 1).shuffleGrouping("spout");
+ Config conf = new Config();
+ conf.setDebug(true);
+ conf.setNumWorkers(1);
+
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
builder.createTopology());
+
+}
+```
+
+The following window configurations are supported.
+
+```java
+withWindow(Count windowLength, Count slidingInterval)
+Tuple count based sliding window that slides after `slidingInterval` number of
tuples.
+
+withWindow(Count windowLength)
+Tuple count based window that slides with every incoming tuple.
+
+withWindow(Count windowLength, Duration slidingInterval)
+Tuple count based sliding window that slides after `slidingInterval` time
duration.
+
+withWindow(Duration windowLength, Duration slidingInterval)
+Time duration based sliding window that slides after `slidingInterval` time
duration.
+
+withWindow(Duration windowLength)
+Time duration based window that slides with every incoming tuple.
+
+withWindow(Duration windowLength, Count slidingInterval)
+Time duration based sliding window configuration that slides after
`slidingInterval` number of tuples.
+
+withTumblingWindow(BaseWindowedBolt.Count count)
+Count based tumbling window that tumbles after the specified count of tuples.
+
+withTumblingWindow(BaseWindowedBolt.Duration duration)
+Time duration based tumbling window that tumbles after the specified time
duration.
+
+```
+
+## Tuple timestamp and out of order tuples
+By default the timestamp tracked in the window is the time when the tuple is
processed by the bolt. The window calculations
+are performed based on the processing timestamp. Storm has support for
tracking windows based on the source generated timestamp.
+
+```java
+/**
+* Specify a field in the tuple that represents the timestamp as a long value.
If this
+* field is not present in the incoming tuple, an {@link
IllegalArgumentException} will be thrown.
+*
+* @param fieldName the name of the field that contains the timestamp
+*/
+public BaseWindowedBolt withTimestampField(String fieldName)
+```
+
+The value for the above `fieldName` will be looked up from the incoming tuple
and considered for windowing calculations.
+If the field is not present in the tuple an exception will be thrown. Along
with the timestamp field name, a time lag parameter
+can also be specified which indicates the max time limit for tuples with out
of order timestamps.
+
+E.g. if the lag is 5 seconds and a tuple `t1` arrives with timestamp `06:00:05`, no tuples may arrive with a timestamp earlier than `06:00:00`. If a tuple
+arrives with timestamp `05:59:59` after `t1` and the window has moved past `t1`, it will be treated as a late tuple and not processed. Currently the late
+tuples are just logged in the worker log files at INFO level.
+
+```java
+/**
+* Specify the maximum time lag of the tuple timestamp in milliseconds. It
means that the tuple timestamps
+* cannot be out of order by more than this amount.
+*
+* @param duration the max lag duration
+*/
+public BaseWindowedBolt withLag(Duration duration)
+```
+
+### Watermarks
+For processing tuples with timestamp field, storm internally computes
watermarks based on the incoming tuple timestamp. Watermark is
+the minimum of the latest tuple timestamps (minus the lag) across all the
input streams. At a higher level this is similar to the watermark concept
+used by Flink and Google's MillWheel for tracking event based timestamps.
+
+Periodically (default every sec), the watermark timestamps are emitted and
this is considered as the clock tick for the window calculation if
+tuple based timestamps are in use. The interval at which watermarks are
emitted can be changed with the below api.
+
+```java
+/**
+* Specify the watermark event generation interval. For tuple based timestamps,
watermark events
+* are used to track the progress of time
+*
+* @param interval the interval at which watermark events are generated
+*/
+public BaseWindowedBolt withWatermarkInterval(Duration interval)
+```
+
+
+When a watermark is received, all windows up to that timestamp will be
evaluated.
+
+For example, consider tuple timestamp based processing with following window
parameters,
+
+`Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s,
max lag = 5s`
+
+```
+|-----|-----|-----|-----|-----|-----|-----|
+0 10 20 30 40 50 60 70
+```
+
+Current ts = `09:00:00`
+
+Tuples `e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26),
e6(6:00:36)` are received between `9:00:00` and `9:00:01`
+
+At time t = `09:00:01`, watermark w1 = `6:00:31` is emitted since no tuples
earlier than `6:00:31` can arrive.
+
+Three windows will be evaluated. The first window end ts (06:00:10) is
computed by taking the earliest event timestamp (06:00:03)
+and computing the ceiling based on the sliding interval (10s).
+
+1. `5:59:50 - 06:00:10` with tuples e1, e2, e3
+2. `6:00:00 - 06:00:20` with tuples e1, e2, e3, e4
+3. `6:00:10 - 06:00:30` with tuples e4, e5
+
+e6 is not evaluated since watermark timestamp `6:00:31` is older than the
tuple ts `6:00:36`.
+
+Tuples `e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)` are received
between `9:00:01` and `9:00:02`
+
+At time t = `09:00:02` another watermark w2 = `08:00:34` is emitted since no
tuples earlier than `8:00:34` can arrive now.
+
+Three windows will be evaluated,
+
+1. `6:00:20 - 06:00:40` with tuples e5, e6 (from earlier batch)
+2. `6:00:30 - 06:00:50` with tuple e6 (from earlier batch)
+3. `8:00:10 - 08:00:30` with tuples e7, e8, e9
+
+e10 is not evaluated since the tuple ts `8:00:39` is beyond the watermark time
`8:00:34`.
+
+The window calculation considers the time gaps and computes the windows based
on the tuple timestamp.
+
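The first watermark in the example above can be reproduced with a few lines of arithmetic. This is a sketch under simplifying assumptions rather than Storm's implementation: the class `WatermarkSketch` is hypothetical, there is a single input stream, timestamps are seconds after `06:00:00`, and a window ending at `end` is assumed to hold tuples with `end - length < ts <= end`. It does not model the skipping of time gaps seen with the second watermark.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the watermark arithmetic for one input stream. */
final class WatermarkSketch {
    private WatermarkSketch() {}

    /** Watermark: latest tuple timestamp minus the maximum lag. */
    static long watermark(List<Long> timestamps, long lag) {
        return Collections.max(timestamps) - lag;
    }

    /**
     * End timestamps of the windows evaluated for a watermark. The first end is the
     * earliest tuple timestamp rounded up to a multiple of the sliding interval.
     */
    static List<Long> windowEnds(long earliestTs, long watermark, long slide) {
        List<Long> ends = new ArrayList<>();
        long first = Math.floorDiv(earliestTs + slide - 1, slide) * slide;
        for (long end = first; end <= watermark; end += slide) {
            ends.add(end);
        }
        return ends;
    }

    /** Tuples that fall into the window ending at 'end' (bounds are an assumption). */
    static List<Long> tuplesIn(List<Long> timestamps, long end, long length) {
        List<Long> result = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts > end - length && ts <= end) {
                result.add(ts);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> events = List.of(3L, 5L, 7L, 18L, 26L, 36L); // e1..e6
        long w1 = watermark(events, 5);
        for (long end : windowEnds(3, w1, 10)) {
            System.out.println("window ending at " + end + ": " + tuplesIn(events, end, 20));
        }
    }
}
```

For e1 to e6 with a lag of 5 seconds the watermark is 31, i.e. `06:00:31`, and the evaluated windows end at 10, 20 and 30 seconds, which corresponds to the three windows listed for w1. The tuple at 36 seconds (e6) is beyond the watermark and is left for a later evaluation.
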
+## Guarantees
+The windowing functionality in storm core currently provides an at-least-once guarantee. The values emitted from the bolt's
+`execute(TupleWindow inputWindow)` method are automatically anchored to all the tuples in the inputWindow. The downstream
+bolts are expected to ack the received tuple (i.e. the tuple emitted from the windowed bolt) to complete the tuple tree.
+If not, the tuples will be replayed and the windowing computation will be re-evaluated.
+
+The tuples in the window are automatically acked when they expire, i.e. when they fall out of the window after
+`windowLength + slidingInterval`. Note that the configuration `topology.message.timeout.secs` should be sufficiently larger
+than `windowLength + slidingInterval` for time based windows; otherwise the tuples will time out and get replayed, which can result
+in duplicate evaluations. For count based windows, the configuration should be adjusted such that `windowLength + slidingInterval`
+tuples can be received within the timeout period.
+
+## Example topology
+An example topology `SlidingWindowTopology` shows how to use the APIs to compute a sliding window sum and a tumbling window
+average.
+
Added:
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/cgroups_in_storm.md
URL:
http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/cgroups_in_storm.md?rev=1735652&view=auto
==============================================================================
---
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/cgroups_in_storm.md
(added)
+++
storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/cgroups_in_storm.md
Fri Mar 18 17:50:56 2016
@@ -0,0 +1,71 @@
+---
+title: CGroup Enforcement
+layout: documentation
+documentation: true
+---
+
+# CGroups in Storm
+
+CGroups are used by Storm to limit the resource usage of workers to guarantee
fairness and QoS.
+
+**Please note: CGroups are currently supported only on Linux platforms (kernel
version 2.6.24 and above)**
+
+## Setup
+
+To use CGroups, make sure that cgroups are installed and configured correctly.
For more information about setup and configuration, please visit:
+
+https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Resource_Management_Guide/ch-Using_Control_Groups.html
+
+A sample/default cgconfig.conf file is supplied in the <stormroot>/conf
directory. The contents are as follows:
+
+```
+mount {
+ cpuset = /cgroup/cpuset;
+ cpu = /cgroup/storm_resources;
+ cpuacct = /cgroup/cpuacct;
+ memory = /cgroup/storm_resources;
+ devices = /cgroup/devices;
+ freezer = /cgroup/freezer;
+ net_cls = /cgroup/net_cls;
+ blkio = /cgroup/blkio;
+}
+
+group storm {
+ perm {
+ task {
+ uid = 500;
+ gid = 500;
+ }
+ admin {
+ uid = 500;
+ gid = 500;
+ }
+ }
+ cpu {
+ }
+}
+```
+
+For a more detailed explanation of the format and configs for the
cgconfig.conf file, please visit:
+
+https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Resource_Management_Guide/ch-Using_Control_Groups.html#The_cgconfig.conf_File
+
+## Settings Related To CGroups in Storm
+
+| Setting | Function |
+|---------|----------|
+| storm.cgroup.enable | Whether cgroups will be used. Set to "true" to enable cgroups and to "false" to disable them. When this config is set to "false", unit tests related to cgroups are skipped. Defaults to "false". |
+| storm.cgroup.hierarchy.dir | The path to the cgroup hierarchy that Storm will use. Defaults to "/cgroup/storm_resources". |
+| storm.cgroup.resources | A list of subsystems that will be regulated by CGroups. Defaults to cpu and memory. Currently only cpu and memory are supported. |
+| storm.supervisor.cgroup.rootdir | The root cgroup used by the supervisor. The path to the cgroup will be \<storm.cgroup.hierarchy.dir>/\<storm.supervisor.cgroup.rootdir>. Defaults to "storm". |
+| storm.cgroup.cgexec.cmd | Absolute path to the cgexec command used to launch workers within a cgroup. Defaults to "/bin/cgexec". |
+| storm.worker.cgroup.memory.mb.limit | The memory limit in MB for each worker. This can be set on a per supervisor node basis. This config is used to set the cgroup config memory.limit_in_bytes. For more details about memory.limit_in_bytes, please visit: https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Resource_Management_Guide/sec-memory.html. Please note: if you are using the Resource Aware Scheduler, do NOT set this config, as it will override the values calculated by the Resource Aware Scheduler. |
+| storm.worker.cgroup.cpu.limit | The cpu share for each worker. This can be set on a per supervisor node basis. This config is used to set the cgroup config cpu.shares. For more details about cpu.shares, please visit: https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Resource_Management_Guide/sec-cpu.html. Please note: if you are using the Resource Aware Scheduler, do NOT set this config, as it will override the values calculated by the Resource Aware Scheduler. |
+
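Two of the settings above involve a small derivation: the supervisor cgroup path is built from two configs, and the per-worker memory limit in MB ends up as a byte value in memory.limit_in_bytes. The sketch below illustrates both. The class and method names are hypothetical, and the conversion assumes 1 MB = 1024 * 1024 bytes, which this document does not state.

```java
// Hypothetical helper, not part of Storm: derives values from the cgroup settings.
final class CgroupSettingsSketch {

    // <storm.cgroup.hierarchy.dir>/<storm.supervisor.cgroup.rootdir>
    static String supervisorCgroupPath(String hierarchyDir, String rootDir) {
        String base = hierarchyDir.endsWith("/")
                ? hierarchyDir.substring(0, hierarchyDir.length() - 1)
                : hierarchyDir;
        return base + "/" + rootDir;
    }

    // Assumption: 1 MB is treated as 1024 * 1024 bytes.
    static long memoryLimitInBytes(long limitMb) {
        return Math.multiplyExact(limitMb, 1024L * 1024L);
    }

    public static void main(String[] args) {
        // Uses the default values listed in the table above.
        System.out.println(supervisorCgroupPath("/cgroup/storm_resources", "storm"));
        System.out.println(memoryLimitInBytes(2048));
    }
}
```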
+Limiting CPU usage via cpu.shares only limits the proportional CPU usage of a
process. To limit the total CPU usage of all the worker processes on a
supervisor node, set the config supervisor.cpu.capacity. Each increment
represents 1% of a core, so a user who sets supervisor.cpu.capacity: 200 is
indicating the use of 2 cores.
+
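The difference between a proportional share and an absolute capacity can be shown with a little arithmetic. In this sketch the class and method names are hypothetical, and the share values in `main` are illustrative: under contention a cgroup receives CPU time in proportion to its cpu.shares relative to the sum of the competing groups' shares, while supervisor.cpu.capacity is simply a count of hundredths of a core.

```java
// Hypothetical helper, not part of Storm: illustrates cpu.shares versus capacity.
final class CpuCapacitySketch {

    // supervisor.cpu.capacity counts 1% of a core per increment.
    static double coresFromCapacity(double capacity) {
        return capacity / 100.0;
    }

    // Fraction of contended CPU time a worker gets, given every competing
    // group's cpu.shares (the worker's own value must be included in allShares).
    static double contendedFraction(long workerShares, long[] allShares) {
        long total = 0;
        for (long s : allShares) {
            total += s;
        }
        return (double) workerShares / total;
    }

    public static void main(String[] args) {
        System.out.println(coresFromCapacity(200));
        // Two workers with illustrative share values of 1024 and 512.
        System.out.println(contendedFraction(1024, new long[] {1024, 512}));
    }
}
```

The fraction only applies while the groups compete for CPU; an otherwise idle machine lets a single worker use more, which is why a separate capacity setting is needed to bound total usage.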
+## Integration with Resource Aware Scheduler
+
+CGroups can be used in conjunction with the Resource Aware Scheduler. CGroups
will then enforce the resource usage of workers as allocated by the Resource
Aware Scheduler. To use cgroups with the Resource Aware Scheduler, simply
enable cgroups and be sure NOT to set the storm.worker.cgroup.memory.mb.limit
and storm.worker.cgroup.cpu.limit configs.
+
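The rule above lends itself to a simple sanity check on a configuration map. The following sketch is only an illustration: the class and method are hypothetical, the config is modelled as a plain `Map`, and whether the Resource Aware Scheduler is in use is passed in as a flag rather than detected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Storm: flags per-worker cgroup limits that
// would override the Resource Aware Scheduler's calculated values.
final class RasCgroupConfigCheck {

    static final String ENABLE = "storm.cgroup.enable";
    static final String MEM_LIMIT = "storm.worker.cgroup.memory.mb.limit";
    static final String CPU_LIMIT = "storm.worker.cgroup.cpu.limit";

    static List<String> conflictingKeys(Map<String, Object> conf, boolean usingRas) {
        List<String> conflicts = new ArrayList<>();
        boolean cgroupsEnabled = Boolean.TRUE.equals(conf.get(ENABLE));
        if (usingRas && cgroupsEnabled) {
            if (conf.containsKey(MEM_LIMIT)) {
                conflicts.add(MEM_LIMIT);
            }
            if (conf.containsKey(CPU_LIMIT)) {
                conflicts.add(CPU_LIMIT);
            }
        }
        return conflicts;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(ENABLE, Boolean.TRUE);
        conf.put(MEM_LIMIT, 2048); // illustrative value
        System.out.println(conflictingKeys(conf, true));
    }
}
```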
+