http://git-wip-us.apache.org/repos/asf/storm/blob/a107b93f/docs/Resource_Aware_Scheduler_overview.md
----------------------------------------------------------------------
diff --git a/docs/Resource_Aware_Scheduler_overview.md b/docs/Resource_Aware_Scheduler_overview.md
new file mode 100644
index 0000000..591b37c
--- /dev/null
+++ b/docs/Resource_Aware_Scheduler_overview.md
@@ -0,0 +1,232 @@
+---
+title: Resource Aware Scheduler
+layout: documentation
+documentation: true
+---
+# Introduction
+
+The purpose of this document is to provide a description of the Resource Aware Scheduler for the Storm distributed real-time computation system. This document provides a high-level description of the Resource Aware Scheduler in Storm.
+
+## Using Resource Aware Scheduler
+
+The user can switch to using the Resource Aware Scheduler by setting the following in *conf/storm.yaml*:
+
+    storm.scheduler: "org.apache.storm.scheduler.resource.ResourceAwareScheduler"
+
+
+## API Overview
+
+For a Storm topology, the user can now specify the amount of resources that a topology component (i.e. Spout or Bolt) requires to run a single instance of the component. The user can specify the resource requirement for a topology component by using the following API calls.
+
+### Setting Memory Requirement
+
+API to set component memory requirement:
+
+    public T setMemoryLoad(Number onHeap, Number offHeap)
+
+Parameters:
+* Number onHeap - The amount of on-heap memory an instance of this component will consume in megabytes
+* Number offHeap - The amount of off-heap memory an instance of this component will consume in megabytes
+
+The user also has the option to specify only the on-heap memory requirement if the component does not need off-heap memory.
+
+    public T setMemoryLoad(Number onHeap)
+
+Parameters:
+* Number onHeap - The amount of on-heap memory an instance of this component will consume in megabytes
+
+If no value is provided for offHeap, 0.0 will be used.
+If no value is provided for onHeap, or if the API is never called for a component, the default value will be used.
+
+Example of Usage:
+
+    SpoutDeclarer s1 = builder.setSpout("word", new TestWordSpout(), 10);
+    s1.setMemoryLoad(1024.0, 512.0);
+    builder.setBolt("exclaim1", new ExclamationBolt(), 3)
+                .shuffleGrouping("word").setMemoryLoad(512.0);
+
+The entire memory requested for this topology is 16.5 GB: 10 spouts with 1 GB on-heap and 0.5 GB off-heap memory each (15 GB), plus 3 bolts with 0.5 GB on-heap memory each (1.5 GB).
+
+### Setting CPU Requirement
+
+
+API to set component CPU requirement:
+
+    public T setCPULoad(Double amount)
+
+Parameters:
+* Number amount - The amount of CPU an instance of this component will consume.
+
+Currently, the amount of CPU resources a component requires, or that is available on a node, is represented by a point system. CPU usage is a difficult concept to define. Different CPU architectures perform differently depending on the task at hand, and they are so complex that expressing all of that in a single, precise, portable number is impossible. Instead, we take a convention-over-configuration approach and are primarily concerned with a rough level of CPU usage, while still making it possible to specify more fine-grained amounts.
+
+By convention, a CPU core typically gets 100 points. If you feel that your processors are more or less powerful, you can adjust this accordingly. Heavy tasks that are CPU bound get 100 points, as they can consume an entire core. Medium tasks should get 50, light tasks 25, and tiny tasks 10. In some cases a task spawns other threads to help with processing; such tasks may need to go above 100 points to express the amount of CPU they use. If these conventions are followed, then for the common case of a single-threaded task, the reported Capacity * 100 should be the number of CPU points that the task needs.
+
+Example of Usage:
+
+    SpoutDeclarer s1 = builder.setSpout("word", new TestWordSpout(), 10);
+    s1.setCPULoad(15.0);
+    builder.setBolt("exclaim1", new ExclamationBolt(), 3)
+                .shuffleGrouping("word").setCPULoad(10.0);
+    builder.setBolt("exclaim2", new HeavyBolt(), 1)
+                    .shuffleGrouping("exclaim1").setCPULoad(450.0);
+
+### Limiting the Heap Size per Worker (JVM) Process
+
+
+    public void setTopologyWorkerMaxHeapSize(Number size)
+
+Parameters:
+* Number size - The memory limit a worker process will be allocated in megabytes
+
+The user can limit the amount of memory resources the Resource Aware Scheduler allocates to a single worker on a per-topology basis by using the above API. This API is in place so that users can spread executors across multiple workers. However, spreading executors across multiple workers may increase communication latency, since executors in different workers cannot use the Disruptor Queue for intra-process communication.
+
+Example of Usage:
+
+    Config conf = new Config();
+    conf.setTopologyWorkerMaxHeapSize(512.0);
+
+### Setting Available Resources on Node
+
+A Storm administrator can specify node resource availability by modifying the *conf/storm.yaml* file located in the Storm home directory of that node.
+
+A Storm administrator can specify how much memory a node has available, in megabytes, by adding the following to *storm.yaml*:
+
+    supervisor.memory.capacity.mb: [amount<Double>]
+
+A Storm administrator can also specify how much CPU a node has available by adding the following to *storm.yaml*:
+
+    supervisor.cpu.capacity: [amount<Double>]
+
+
+Note: the amount the user can specify for the available CPU is represented using the point system discussed earlier.
+
+Example of Usage:
+
+    supervisor.memory.capacity.mb: 20480.0
+    supervisor.cpu.capacity: 100.0
+
+
+### Other Configurations
+
+The user can set some default configurations for the Resource Aware Scheduler in *conf/storm.yaml*:
+
+    # default value if on-heap memory requirement is not specified for a component
+    topology.component.resources.onheap.memory.mb: 128.0
+
+    # default value if off-heap memory requirement is not specified for a component
+    topology.component.resources.offheap.memory.mb: 0.0
+
+    # default value if CPU requirement is not specified for a component
+    topology.component.cpu.pcore.percent: 10.0
+
+    # default value for the max heap size for a worker
+    topology.worker.max.heap.size.mb: 768.0
+
+# Topology Priorities and Per User Resource
+
+The Resource Aware Scheduler (RAS) also has multitenant capabilities, since many Storm users typically share a Storm cluster. RAS can allocate resources on a per-user basis. Each user can be guaranteed a certain amount of resources to run his or her topologies, and RAS will meet those guarantees when possible. When the Storm cluster has extra free resources, RAS will be able to allocate additional resources to users in a fair manner. The importance of topologies can also vary: topologies can be used for actual production or just for experimentation. Thus, RAS takes into account the importance of a topology when determining the order in which to schedule topologies or when to evict topologies.
+
+## Setup
+
+The resource guarantees of a user can be specified in *conf/user-resource-pools.yaml*.
+Specify the resource guarantees of a user in the following format:
+
+    resource.aware.scheduler.user.pools:
+        [UserId]
+            cpu: [Amount of Guaranteed CPU Resources]
+            memory: [Amount of Guaranteed Memory Resources]
+
+An example of what *user-resource-pools.yaml* can look like:
+
+    resource.aware.scheduler.user.pools:
+        jerry:
+            cpu: 1000
+            memory: 8192.0
+        derek:
+            cpu: 10000.0
+            memory: 32768
+        bobby:
+            cpu: 5000.0
+            memory: 16384.0
+
+Please note that the specified amount of guaranteed CPU and memory can be either an integer or a double.
+
+## API Overview
+### Specifying Topology Priority
+Topology priorities can range from 0-29. The topology priorities will be partitioned into several priority levels, each of which may contain a range of priorities.
+For example, we can create a priority level mapping:
+
+    PRODUCTION => 0 - 9
+    STAGING => 10 - 19
+    DEV => 20 - 29
+
+Thus, each priority level contains 10 sub-priorities. Users can set the priority level of a topology by using the following API:
+
+    conf.setTopologyPriority(int priority)
+
+Parameters:
+* priority - an integer representing the priority of the topology
+
+Please note that the 0-29 range is not a hard limit. Thus, a user can set a priority number that is higher than 29. However, the property that a higher priority number means lower importance still holds.
+
+### Specifying Scheduling Strategy:
+
+A user can specify on a per-topology basis what scheduling strategy to use. Users can implement the IStrategy interface and define new strategies to schedule specific topologies. This pluggable interface was created since we realize different topologies might have different scheduling needs. A user can set the topology strategy within the topology definition by using the API:
+
+    public void setTopologyStrategy(Class<? extends IStrategy> clazz)
+
+Parameters:
+* clazz - The strategy class that implements the IStrategy interface
+
+Example Usage:
+
+    conf.setTopologyStrategy(org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);
+
+A default scheduling strategy is provided. The DefaultResourceAwareStrategy is implemented based on the scheduling algorithm in the original paper describing resource aware scheduling in Storm:
+
+http://web.engr.illinois.edu/~bpeng/files/r-storm.pdf
+
+### Specifying Topology Prioritization Strategy
+
+The order of scheduling is a pluggable interface in which a user could define a strategy that prioritizes topologies. For a user to define his or her own prioritization strategy, he or she needs to implement the ISchedulingPriorityStrategy interface. A user can set the scheduling priority strategy by setting *Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY* to point to the class that implements the strategy. For instance:
+
+    resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
+
+A default strategy is provided. The following explains how the default scheduling priority strategy works.
+
+**DefaultSchedulingPriorityStrategy**
+
+The order of scheduling should be based on the distance between a user's current resource allocation and his or her guaranteed allocation. We should prioritize the users who are the furthest away from their resource guarantee. The difficulty of this problem is that a user may have multiple resource guarantees, and another user can have another set of resource guarantees, so how can we compare them in a fair manner? Let's use the average percentage of resource guarantees satisfied as a method of comparison.
+
+For example:
+
+|User|Resource Guarantee|Resource Allocated|
+|----|------------------|------------------|
+|A|<10 CPU, 50 GB>|<2 CPU, 40 GB>|
+|B|<20 CPU, 25 GB>|<15 CPU, 10 GB>|
+
+User A's average percentage satisfied of resource guarantee:
+
+(2/10+40/50)/2 = 0.5
+
+User B's average percentage satisfied of resource guarantee:
+
+(15/20+10/25)/2 = 0.575
+
+Thus, in this example User A has a smaller average percentage of his or her resource guarantee satisfied than User B, so User A should get priority to be allocated more resources, i.e., a topology submitted by User A should be scheduled first.
+
+When scheduling, RAS sorts users by the average percentage satisfied of resource guarantee and schedules topologies from users based on that ordering, starting from the users with the lowest average percentage satisfied of resource guarantee. When a user's resource guarantee is completely satisfied, the user's average percentage satisfied of resource guarantee will be greater than or equal to 1.
+
+### Specifying Eviction Strategy
+The eviction strategy is used when there are not enough free resources in the cluster to schedule new topologies. If the cluster is full, we need a mechanism to evict topologies so that user resource guarantees can be met and additional resources can be shared fairly among users. The strategy for evicting topologies is also a pluggable interface in which the user can implement his or her own topology eviction strategy. For a user to implement his or her own eviction strategy, he or she needs to implement the IEvictionStrategy interface and set *Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY* to point to the implemented strategy class. For instance:
+
+    resource.aware.scheduler.eviction.strategy: "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy"
+
+A default eviction strategy is provided. The following explains how the default topology eviction strategy works.
+
+**DefaultEvictionStrategy**
+
+
+To determine if topology eviction should occur, we should take into account the priority of the topology that we are trying to schedule and whether the resource guarantees for the owner of the topology have been met.
+
+We should never evict a topology from a user that does not have his or her resource guarantees satisfied. The following flow chart describes the logic for the eviction process.
+
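The DefaultSchedulingPriorityStrategy comparison described in the new Resource_Aware_Scheduler_overview.md (average fraction of a user's CPU and memory guarantees that is currently satisfied, lowest first) can be sketched in standalone Java. This is a hypothetical illustration under stated assumptions, not Storm's implementation: `GuaranteeOrderSketch`, the `UserResources` record and `schedulingOrder` are invented names, only the two resources from the worked example are modeled, guarantees are assumed to be non-zero, and records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class GuaranteeOrderSketch {

    // Hypothetical per-user snapshot (not a Storm class): guaranteed vs. currently allocated resources.
    record UserResources(String user, double cpuGuarantee, double memGuarantee,
                         double cpuAllocated, double memAllocated) {

        // Average fraction of the guarantees that is satisfied; >= 1.0 means fully satisfied.
        // Assumes both guarantees are non-zero.
        double avgPercentSatisfied() {
            return (cpuAllocated / cpuGuarantee + memAllocated / memGuarantee) / 2.0;
        }
    }

    // Returns user names from least to most satisfied, i.e. the order in which
    // their topologies would be considered for scheduling under this sketch.
    static List<String> schedulingOrder(List<UserResources> users) {
        List<UserResources> sorted = new ArrayList<>(users);
        sorted.sort(Comparator.comparingDouble(UserResources::avgPercentSatisfied));
        List<String> order = new ArrayList<>();
        for (UserResources u : sorted) {
            order.add(u.user());
        }
        return order;
    }

    public static void main(String[] args) {
        // Numbers from the worked example (CPU units, memory in GB).
        List<UserResources> users = List.of(
                new UserResources("A", 10, 50, 2, 40),
                new UserResources("B", 20, 25, 15, 10));
        for (UserResources u : users) {
            System.out.printf("%s %.3f%n", u.user(), u.avgPercentSatisfied());
        }
        System.out.println("Scheduling order: " + schedulingOrder(users));
    }
}
```

Running `main` prints `A 0.500`, `B 0.575` and `Scheduling order: [A, B]`, matching the hand calculation. Averaging the per-resource fractions keeps users with differently sized guarantees comparable, which is the fairness argument the document makes.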
http://git-wip-us.apache.org/repos/asf/storm/blob/a107b93f/docs/Running-topologies-on-a-production-cluster.md
----------------------------------------------------------------------
diff --git a/docs/Running-topologies-on-a-production-cluster.md b/docs/Running-topologies-on-a-production-cluster.md
index 76c43dd..802fd2e 100644
--- a/docs/Running-topologies-on-a-production-cluster.md
+++ b/docs/Running-topologies-on-a-production-cluster.md
@@ -5,9 +5,9 @@ documentation: true
 ---
 Running topologies on a production cluster is similar to running in [Local mode](Local-mode.html). Here are the steps:
 
-1) Define the topology (Use [TopologyBuilder](javadocs/backtype/storm/topology/TopologyBuilder.html) if defining using Java)
+1) Define the topology (Use [TopologyBuilder](javadocs/org/apache/storm/topology/TopologyBuilder.html) if defining using Java)
 
-2) Use [StormSubmitter](javadocs/backtype/storm/StormSubmitter.html) to submit the topology to the cluster. `StormSubmitter` takes as input the name of the topology, a configuration for the topology, and the topology itself. For example:
+2) Use [StormSubmitter](javadocs/org/apache/storm/StormSubmitter.html) to submit the topology to the cluster. `StormSubmitter` takes as input the name of the topology, a configuration for the topology, and the topology itself. For example:
 
 ```java
 Config conf = new Config();
@@ -47,7 +47,7 @@ You can find out how to configure your `storm` client to talk to a Storm cluster
 
 ### Common configurations
 
-There are a variety of configurations you can set per topology. A list of all the configurations you can set can be found [here](javadocs/backtype/storm/Config.html). The ones prefixed with "TOPOLOGY" can be overridden on a topology-specific basis (the other ones are cluster configurations and cannot be overridden). Here are some common ones that are set for a topology:
+There are a variety of configurations you can set per topology. A list of all the configurations you can set can be found [here](javadocs/org/apache/storm/Config.html). The ones prefixed with "TOPOLOGY" can be overridden on a topology-specific basis (the other ones are cluster configurations and cannot be overridden). Here are some common ones that are set for a topology:
 
 1. **Config.TOPOLOGY_WORKERS**: This sets the number of worker processes to use to execute the topology. For example, if you set this to 25, there will be 25 Java processes across the cluster executing all the tasks. If you had a combined 150 parallelism across all components in the topology, each worker process will have 6 tasks running within it as threads.
 2. **Config.TOPOLOGY_ACKER_EXECUTORS**: This sets the number of executors that will track tuple trees and detect when a spout tuple has been fully processed. Ackers are an integral part of Storm's reliability model and you can read more about them on [Guaranteeing message processing](Guaranteeing-message-processing.html). By not setting this variable or setting it as null, Storm will set the number of acker executors to be equal to the number of workers configured for this topology. If this variable is set to 0, then Storm will immediately ack tuples as soon as they come off the spout, effectively disabling reliability.
http://git-wip-us.apache.org/repos/asf/storm/blob/a107b93f/docs/SECURITY.md
----------------------------------------------------------------------
diff --git a/docs/SECURITY.md b/docs/SECURITY.md
index 353cb86..b3ec303 100644
--- a/docs/SECURITY.md
+++ b/docs/SECURITY.md
@@ -172,7 +172,7 @@ Each jaas file may have multiple sections for different interfaces being used.
 
 To enable Kerberos authentication in storm you need to set the following storm.yaml configs
 ```yaml
-storm.thrift.transport: "backtype.storm.security.auth.kerberos.KerberosSaslTransportPlugin"
+storm.thrift.transport: "org.apache.storm.security.auth.kerberos.KerberosSaslTransportPlugin"
 java.security.auth.login.config: "/path/to/jaas.conf"
 ```
 
@@ -275,7 +275,7 @@ Server {
 Nimbus also will translate the principal into a local user name, so that other services can use this name. To configure this for Kerberos authentication set
 
 ```
-storm.principal.tolocal: "backtype.storm.security.auth.KerberosPrincipalToLocal"
+storm.principal.tolocal: "org.apache.storm.security.auth.KerberosPrincipalToLocal"
 ```
 
 This only needs to be done on nimbus, but it will not hurt on any node.
@@ -324,7 +324,7 @@ The end user can override this if they have a headless user that has a keytab.
 The preferred authorization plug-in for nimbus is The *SimpleACLAuthorizer*. To use the *SimpleACLAuthorizer*, set the following:
 
 ```yaml
-nimbus.authorizer: "backtype.storm.security.auth.authorizer.SimpleACLAuthorizer"
+nimbus.authorizer: "org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer"
 ```
 
 DRPC has a separate authorizer configuration for it. Do not use SimpleACLAuthorizer for DRPC.
@@ -348,7 +348,7 @@ To ensure isolation of users in multi-tenancy, there is need to run supervisors
 
 To support multi-tenancy better we have written a new scheduler. To enable this scheduler set.
 ```yaml
-storm.scheduler: "backtype.storm.scheduler.multitenant.MultitenantScheduler"
+storm.scheduler: "org.apache.storm.scheduler.multitenant.MultitenantScheduler"
 ```
 Be aware that many of the features of this scheduler rely on storm authentication. Without them the scheduler will not know what the user is and will not isolate topologies properly.
 
@@ -391,11 +391,11 @@ A storm client may submit requests on behalf of another user. For example, if a
 it can do so by leveraging the impersonation feature.In order to submit topology as some other user , you can use `StormSubmitter.submitTopologyAs` API. Alternatively you can use `NimbusClient.getConfiguredClientAs` to get a nimbus client as some other user and perform any nimbus action(i.e. kill/rebalance/activate/deactivate) using this client.
 
-To ensure only authorized users can perform impersonation you should start nimbus with `nimbus.impersonation.authorizer` set to `backtype.storm.security.auth.authorizer.ImpersonationAuthorizer`.
+To ensure only authorized users can perform impersonation you should start nimbus with `nimbus.impersonation.authorizer` set to `org.apache.storm.security.auth.authorizer.ImpersonationAuthorizer`.
 The `ImpersonationAuthorizer` uses `nimbus.impersonation.acl` as the acl to authorize users. Following is a sample nimbus config for supporting impersonation:
 
 ```yaml
-nimbus.impersonation.authorizer: backtype.storm.security.auth.authorizer.ImpersonationAuthorizer
+nimbus.impersonation.authorizer: org.apache.storm.security.auth.authorizer.ImpersonationAuthorizer
 nimbus.impersonation.acl:
     impersonating_user1:
         hosts:
@@ -423,7 +423,7 @@ nimbus.impersonation.acl:
 Individual topologies have the ability to push credentials (tickets and tokens) to workers so that they can access secure services. Exposing this to all of the users can be a pain for them.
 To hide this from them in the common case plugins can be used to populate the credentials, unpack them on the other side into a java Subject, and also allow Nimbus to renew the credentials if needed.
 These are controlled by the following configs. topology.auto-credentials is a list of java plugins, all of which must implement IAutoCredentials interface, that populate the credentials on gateway
-and unpack them on the worker side. On a kerberos secure cluster they should be set by default to point to backtype.storm.security.auth.kerberos.AutoTGT.
+and unpack them on the worker side. On a kerberos secure cluster they should be set by default to point to org.apache.storm.security.auth.kerberos.AutoTGT.
 nimbus.credential.renewers.classes should also be set to this value so that nimbus can periodically renew the TGT on behalf of the user.
 
 nimbus.credential.renewers.freq.secs controls how often the renewer will poll to see if anything needs to be renewed, but the default should be fine.
http://git-wip-us.apache.org/repos/asf/storm/blob/a107b93f/docs/STORM-UI-REST-API.md
----------------------------------------------------------------------
diff --git a/docs/STORM-UI-REST-API.md b/docs/STORM-UI-REST-API.md
index 26dbf07..bbed956 100644
--- a/docs/STORM-UI-REST-API.md
+++ b/docs/STORM-UI-REST-API.md
@@ -1,10 +1,9 @@
 ---
-title: Storm REST API
+title: Storm UI REST API
 layout: documentation
 documentation: true
 ---
 
-# Storm UI REST API
 
 The Storm UI daemon provides a REST API that allows you to interact with a Storm cluster, which includes retrieving metrics data and configuration information as well as management operations such as starting or stopping topologies.
 
@@ -90,7 +89,6 @@ Response fields:
 |Field  |Value|Description
 |--- |--- |---
 |stormVersion|String| Storm version|
-|nimbusUptime|String| Shows how long the cluster is running|
 |supervisors|Integer| Number of supervisors running|
 |topologies| Integer| Number of topologies running|
 |slotsTotal| Integer|Total number of available worker slots|
@@ -104,7 +102,6 @@ Sample response:
 ```json
 {
     "stormVersion": "0.9.2-incubating-SNAPSHOT",
-    "nimbusUptime": "3m 53s",
     "supervisors": 1,
     "slotsTotal": 4,
     "slotsUsed": 3,
@@ -125,8 +122,13 @@ Response fields:
 |id| String | Supervisor's id|
 |host| String| Supervisor's host name|
 |uptime| String| Shows how long the supervisor is running|
+|uptimeSeconds| Integer| Shows how long the supervisor is running in seconds|
 |slotsTotal| Integer| Total number of available worker slots for this supervisor|
 |slotsUsed| Integer| Number of worker slots used on this supervisor|
+|totalMem| Double| Total memory capacity on this supervisor|
+|totalCpu| Double| Total CPU capacity on this supervisor|
+|usedMem| Double| Used memory capacity on this supervisor|
+|usedCpu| Double| Used CPU capacity on this supervisor|
 
 Sample response:
 
@@ -137,13 +139,74 @@ Sample response:
             "id": "0b879808-2a26-442b-8f7d-23101e0c3696",
             "host": "10.11.1.7",
             "uptime": "5m 58s",
+            "uptimeSeconds": 358,
             "slotsTotal": 4,
-            "slotsUsed": 3
+            "slotsUsed": 3,
+            "totalMem": 3000,
+            "totalCpu": 400,
+            "usedMem": 1280,
+            "usedCpu": 160
+        }
+    ],
+    "schedulerDisplayResource": true
+}
+```
+
+### /api/v1/nimbus/summary (GET)
+
+Returns summary information for all nimbus hosts.
+
+Response fields:
+
+|Field  |Value|Description|
+|--- |--- |---
+|host| String | Nimbus' host name|
+|port| int| Nimbus' port number|
+|status| String| Possible values are Leader, Not a Leader, Dead|
+|nimbusUpTime| String| Shows how long the nimbus has been running|
+|nimbusUpTimeSeconds| String| Shows how long the nimbus has been running in seconds|
+|nimbusLogLink| String| Logviewer url to view the nimbus.log|
+|version| String| Version of storm this nimbus host is running|
+
+Sample response:
+
+```json
+{
+    "nimbuses":[
+        {
+            "host":"192.168.202.1",
+            "port":6627,
+            "nimbusLogLink":"http:\/\/192.168.202.1:8000\/log?file=nimbus.log",
+            "status":"Leader",
+            "version":"0.10.0-SNAPSHOT",
+            "nimbusUpTime":"3m 33s",
+            "nimbusUpTimeSeconds":"213"
         }
     ]
 }
 ```
 
+### /api/v1/history/summary (GET)
+
+Returns a list of all running topologies' IDs submitted by the current user.
+
+Response fields:
+
+|Field  |Value | Description|
+|--- |--- |---
+|topo-history| List| List of Topologies' IDs|
+
+Sample response:
+
+```json
+{
+    "topo-history":[
+        "wc6-1-1446571009",
+        "wc8-2-1446587178"
+    ]
+}
+```
+
 ### /api/v1/topology/summary (GET)
 
 Returns summary information for all topologies.
@@ -156,9 +219,19 @@ Response fields:
 |name| String| Topology Name|
 |status| String| Topology Status|
 |uptime| String| Shows how long the topology is running|
+|uptimeSeconds| Integer| Shows how long the topology is running in seconds|
 |tasksTotal| Integer |Total number of tasks for this topology|
 |workersTotal| Integer |Number of workers used for this topology|
 |executorsTotal| Integer |Number of executors used for this topology|
+|replicationCount| Integer |Number of nimbus hosts on which this topology code is replicated|
+|requestedMemOnHeap| Double|Requested On-Heap Memory by User (MB)|
+|requestedMemOffHeap| Double|Requested Off-Heap Memory by User (MB)|
+|requestedTotalMem| Double|Requested Total Memory by User (MB)|
+|requestedCpu| Double|Requested CPU by User (%)|
+|assignedMemOnHeap| Double|Assigned On-Heap Memory by Scheduler (MB)|
+|assignedMemOffHeap| Double|Assigned Off-Heap Memory by Scheduler (MB)|
+|assignedTotalMem| Double|Assigned Total Memory by Scheduler (MB)|
+|assignedCpu| Double|Assigned CPU by Scheduler (%)|
 
 Sample response:
 
@@ -170,11 +243,55 @@ Sample response:
             "name": "WordCount3",
             "status": "ACTIVE",
             "uptime": "6m 5s",
+            "uptimeSeconds": 365,
             "tasksTotal": 28,
             "workersTotal": 3,
-            "executorsTotal": 28
+            "executorsTotal": 28,
+            "replicationCount": 1,
+            "requestedMemOnHeap": 640,
+            "requestedMemOffHeap": 128,
+            "requestedTotalMem": 768,
+            "requestedCpu": 80,
+            "assignedMemOnHeap": 640,
+            "assignedMemOffHeap": 128,
+            "assignedTotalMem": 768,
+            "assignedCpu": 80
         }
     ]
+    "schedulerDisplayResource": true
+}
+```
+
+### /api/v1/topology-workers/:id (GET)
+
+Returns the workers' information (host and port) for a topology.
+
+Response fields:
+
+|Field  |Value | Description|
+|--- |--- |---
+|hostPortList| List| Workers' information for a topology|
+|logviewerPort| Integer| Logviewer port|
+
+Sample response:
+
+```json
+{
+    "hostPortList":[
+            {
+                "host":"192.168.202.2",
+                "port":6701
+            },
+            {
+                "host":"192.168.202.2",
+                "port":6702
+            },
+            {
+                "host":"192.168.202.3",
+                "port":6700
+            }
+        ],
+    "logviewerPort":8000
 }
 ```
@@ -198,12 +315,14 @@ Response fields:
 |id| String| Topology Id|
 |name| String |Topology Name|
 |uptime| String |How long the topology has been running|
+|uptimeSeconds| Integer |How long the topology has been running in seconds|
 |status| String |Current status of the topology, e.g. "ACTIVE"|
 |tasksTotal| Integer |Total number of tasks for this topology|
 |workersTotal| Integer |Number of workers used for this topology|
 |executorsTotal| Integer |Number of executors used for this topology|
 |msgTimeout| Integer | Number of seconds a tuple has before the spout considers it failed |
 |windowHint| String | window param value in "hh mm ss" format. Default value is "All Time"|
+|schedulerDisplayResource| Boolean | Whether to display scheduler resource information|
 |topologyStats| Array | Array of all the topology related stats per time window|
 |topologyStats.windowPretty| String |Duration passed in HH:MM:SS format|
 |topologyStats.window| String |User requested time window for metrics|
@@ -237,6 +356,7 @@ Response fields:
 |bolts.errorLapsedSecs| Integer |Number of seconds elapsed since that last error happened in a bolt|
 |bolts.errorWorkerLogLink| String | Link to the worker log that reported the exception |
 |bolts.emitted| Long |Number of tuples emitted|
+|replicationCount| Integer |Number of nimbus hosts on which this topology code is replicated|
 
 Examples:
 
@@ -258,8 +378,10 @@ Sample response:
     "tasksTotal": 28,
     "executorsTotal": 28,
     "uptime": "29m 19s",
+    "uptimeSeconds": 1759,
     "msgTimeout": 30,
     "windowHint": "10m 0s",
+    "schedulerDisplayResource": true,
     "topologyStats": [
         {
             "windowPretty": "10m 0s",
@@ -371,7 +493,9 @@ Sample response:
         "nimbus.inbox.jar.expiration.secs": 3600,
         "drpc.worker.threads": 64,
         "topology.worker.shared.thread.pool.size": 4,
-        "nimbus.host": "hw10843.local",
+        "nimbus.seeds": [
+            "hw10843.local"
+        ],
         "storm.messaging.netty.min_wait_ms": 100,
         "storm.zookeeper.port": 2181,
         "transactional.zookeeper.port": null,
@@ -381,7 +505,8 @@ Sample response:
         "storm.zookeeper.retry.intervalceiling.millis": 30000,
         "supervisor.enable": true,
         "storm.messaging.netty.server_worker_threads": 1
-    }
+    },
+    "replicationCount": 1
 }
 ```
@@ -407,7 +532,7 @@ Response fields:
 |windowHint| String | window param value in "hh mm ss" format. Default value is "All Time"|
 |executors| Integer |Number of executor tasks in the component|
 |componentErrors| Array of Errors | List of component errors|
-|componentErrors.time| Long | Timestamp when the exception occurred |
+|componentErrors.errorTime| Long | Timestamp when the exception occurred (Prior to 0.11.0, this field was named 'time'.)|
 |componentErrors.errorHost| String | host name for the error|
 |componentErrors.errorPort| String | port for the error|
 |componentErrors.error| String |Shows the error happened in a component|
@@ -430,6 +555,10 @@ Response fields:
 |boltStats.processLatency| String (double value returned in String format) |Average time of the bolt to ack a message after it was received|
 |boltStats.acked| Long |Number of messages acked|
 |boltStats.failed| Long |Number of messages failed|
+|profilingAndDebuggingCapable| Boolean |true if there is support for Profiling and Debugging Actions|
+|profileActionEnabled| Boolean |true if worker profiling (Java Flight Recorder) is enabled|
+|profilerActive| Array |Array of currently active Profiler Actions|
+
 
 Examples:
 
@@ -448,16 +577,26 @@ Sample response:
     "componentType": "spout",
     "windowHint": "10m 0s",
     "executors": 5,
-    "componentErrors":[{"time": 1406006074000,
+    "componentErrors":[{"errorTime": 1406006074000,
                         "errorHost": "10.11.1.70",
                         "errorPort": 6701,
                         "errorWorkerLogLink": "http://10.11.1.7:8000/log?file=worker-6701.log",
                         "errorLapsedSecs": 16,
-                        "error": "java.lang.RuntimeException: java.lang.StringIndexOutOfBoundsException: Some Error\n\tat backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)\n\tat backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)\n\tat backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)\n\tat backtype...more.."
+                        "error": "java.lang.RuntimeException: java.lang.StringIndexOutOfBoundsException: Some Error\n\tat org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)\n\tat org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)\n\tat org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)\n\tat backtype...more.."
     }],
     "topologyId": "WordCount3-1-1402960825",
     "tasks": 5,
     "window": "600",
+    "profilerActive": [
+        {
+            "host": "10.11.1.70",
+            "port": "6701",
+            "dumplink":"http:\/\/10.11.1.70:8000\/dumps\/ex-1-1452718803\/10.11.1.70%3A6701",
+            "timestamp":"576328"
+        }
+    ],
+    "profilingAndDebuggingCapable": true,
+    "profileActionEnabled": true,
     "spoutSummary": [
         {
             "windowPretty": "10m 0s",
@@ -524,6 +663,7 @@ Sample response:
             "host": "10.11.1.7",
             "acked": 0,
             "uptime": "43m 4s",
+            "uptimeSeconds": 2584,
             "id": "[24-24]",
             "failed": 0
         },
@@ -536,6 +676,7 @@ Sample response:
             "host": "10.11.1.7",
             "acked": 0,
             "uptime": "42m 57s",
+            "uptimeSeconds": 2577,
             "id": "[25-25]",
             "failed": 0
         },
@@ -548,6 +689,7 @@ Sample response:
             "host": "10.11.1.7",
             "acked": 0,
             "uptime": "42m 57s",
+            "uptimeSeconds": 2577,
             "id": "[26-26]",
             "failed": 0
         },
@@ -560,6 +702,7 @@ Sample response:
             "host": "10.11.1.7",
             "acked": 0,
             "uptime": "43m 4s",
+            "uptimeSeconds": 2584,
             "id": "[27-27]",
             "failed": 0
         },
@@ -572,6 +715,7 @@ Sample response:
             "host": "10.11.1.7",
             "acked": 0,
             "uptime": "42m 57s",
+            "uptimeSeconds": 2577,
             "id": "[28-28]",
             "failed": 0
         }
@@ -579,6 +723,201 @@ Sample response:
 }
 ```
 
+## Profiling and Debugging GET Operations
+
+### /api/v1/topology/:id/profiling/start/:host-port/:timeout (GET)
+
+Request to start profiler on worker with timeout. Returns status and link to profiler artifacts for worker.
+ +|Parameter |Value |Description | +|----------|--------|-------------| +|id |String (required)| Topology Id | +|host-port |String (required)| Worker Id | +|timeout |String (required)| Timeout in minutes after which the profiler stops | + +Response fields: + +|Field |Value |Description| +|----- |----- |-----------| +|id | String | Worker id| +|status | String | Response Status | +|timeout | String | Requested timeout | +|dumplink | String | Link to logviewer URL for worker profiler documents.| + +Examples: + +```no-highlight +1. http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/start/10.11.1.7:6701/10 +2. http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/start/10.11.1.7:6701/5 +3. http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/start/10.11.1.7:6701/20 +``` + +Sample response: + +```json +{ + "status": "ok", + "id": "10.11.1.7:6701", + "timeout": "10", + "dumplink": "http:\/\/10.11.1.7:8000\/dumps\/wordcount-1-1446614150\/10.11.1.7%3A6701" +} +``` + +### /api/v1/topology/:id/profiling/dumpprofile/:host-port (GET) + +Request to dump profiler recording on worker. Returns status and worker id for the request. + +|Parameter |Value |Description | +|----------|--------|-------------| +|id |String (required)| Topology Id | +|host-port |String (required)| Worker Id | + +Response fields: + +|Field |Value |Description| +|----- |----- |-----------| +|id | String | Worker id| +|status | String | Response Status | + +Examples: + +```no-highlight +1. http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/dumpprofile/10.11.1.7:6701 +``` + +Sample response: + +```json +{ + "status": "ok", + "id": "10.11.1.7:6701" +} +``` + +### /api/v1/topology/:id/profiling/stop/:host-port (GET) + +Request to stop profiler on worker. Returns status and worker id for the request.
+ +|Parameter |Value |Description | +|----------|--------|-------------| +|id |String (required)| Topology Id | +|host-port |String (required)| Worker Id | + +Response fields: + +|Field |Value |Description| +|----- |----- |-----------| +|id | String | Worker id| +|status | String | Response Status | + +Examples: + +```no-highlight +1. http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/stop/10.11.1.7:6701 +``` + +Sample response: + +```json +{ + "status": "ok", + "id": "10.11.1.7:6701" +} +``` + +### /api/v1/topology/:id/profiling/dumpjstack/:host-port (GET) + +Request to dump jstack on worker. Returns status and worker id for the request. + +|Parameter |Value |Description | +|----------|--------|-------------| +|id |String (required)| Topology Id | +|host-port |String (required)| Worker Id | + +Response fields: + +|Field |Value |Description| +|----- |----- |-----------| +|id | String | Worker id| +|status | String | Response Status | + +Examples: + +```no-highlight +1. http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/dumpjstack/10.11.1.7:6701 +``` + +Sample response: + +```json +{ + "status": "ok", + "id": "10.11.1.7:6701" +} +``` + +### /api/v1/topology/:id/profiling/dumpheap/:host-port (GET) + +Request to dump heap (jmap) on worker. Returns status and worker id for the request. + +|Parameter |Value |Description | +|----------|--------|-------------| +|id |String (required)| Topology Id | +|host-port |String (required)| Worker Id | + +Response fields: + +|Field |Value |Description| +|----- |----- |-----------| +|id | String | Worker id| +|status | String | Response Status | + +Examples: + +```no-highlight +1.
http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/dumpheap/10.11.1.7:6701 +``` + +Sample response: + +```json +{ + "status": "ok", + "id": "10.11.1.7:6701" +} +``` + +### /api/v1/topology/:id/profiling/restartworker/:host-port (GET) + +Request to restart the worker. Returns status and worker id for the request. + +|Parameter |Value |Description | +|----------|--------|-------------| +|id |String (required)| Topology Id | +|host-port |String (required)| Worker Id | + +Response fields: + +|Field |Value |Description| +|----- |----- |-----------| +|id | String | Worker id| +|status | String | Response Status | + +Examples: + +```no-highlight +1. http://ui-daemon-host-name:8080/api/v1/topology/wordcount-1-1446614150/profiling/restartworker/10.11.1.7:6701 +``` + +Sample response: + +```json +{ + "status": "ok", + "id": "10.11.1.7:6701" +} +``` + ## POST Operations ### /api/v1/topology/:id/activate (POST) @@ -673,6 +1012,6 @@ Sample response: ```json { "error": "Internal Server Error", - "errorMessage": "java.lang.NullPointerException\n\tat clojure.core$name.invoke(core.clj:1505)\n\tat backtype.storm.ui.core$component_page.invoke(core.clj:752)\n\tat backtype.storm.ui.core$fn__7766.invoke(core.clj:782)\n\tat compojure.core$make_route$fn__5755.invoke(core.clj:93)\n\tat compojure.core$if_route$fn__5743.invoke(core.clj:39)\n\tat compojure.core$if_method$fn__5736.invoke(core.clj:24)\n\tat compojure.core$routing$fn__5761.invoke(core.clj:106)\n\tat clojure.core$some.invoke(core.clj:2443)\n\tat compojure.core$routing.doInvoke(core.clj:106)\n\tat clojure.lang.RestFn.applyTo(RestFn.java:139)\n\tat clojure.core$apply.invoke(core.clj:619)\n\tat compojure.core$routes$fn__5765.invoke(core.clj:111)\n\tat ring.middleware.reload$wrap_reload$fn__6880.invoke(reload.clj:14)\n\tat backtype.storm.ui.core$catch_errors$fn__7800.invoke(core.clj:836)\n\tat ring.middleware.keyword_params$wrap_keyword_params$fn__6319.invoke(keyword_params.clj:27)\n\tat
ring.middleware.nested_params$wrap_nested_params$fn__6358.invoke(nested_params.clj:65)\n\tat ring.middleware.params$wrap_params$fn__6291.invoke(params.clj:55)\n\tat ring.middleware.multipart_params$wrap_multipart_params$fn__6386.invoke(multipart_params.clj:103)\n\tat ring.middleware.flash$wrap_flash$fn__6675.invoke(flash.clj:14)\n\tat ring.middleware.session$wrap_session$fn__6664.invoke(session.clj:43)\n\tat ring.middleware.cookies$wrap_cookies$fn__6595.invoke(cookies.clj:160)\n\tat ring.adapter.jetty$proxy_handler$fn__6112.invoke(jetty.clj:16)\n\tat ring.adapter.jetty.proxy$org.mortbay.jetty.handler.AbstractHandler$0.handle(Unknown Source)\n\tat org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)\n\tat org.mortbay.jetty.Server.handle(Server.java:326)\n\tat org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)\n\tat org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)\n\tat org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)\n\tat org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)\n\tat org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)\n\tat org.mortbay.jetty.bio.SocketConnector$Connection.run(SocketConnector.java:228)\n\tat org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)\n" + "errorMessage": "java.lang.NullPointerException\n\tat clojure.core$name.invoke(core.clj:1505)\n\tat org.apache.storm.ui.core$component_page.invoke(core.clj:752)\n\tat org.apache.storm.ui.core$fn__7766.invoke(core.clj:782)\n\tat compojure.core$make_route$fn__5755.invoke(core.clj:93)\n\tat compojure.core$if_route$fn__5743.invoke(core.clj:39)\n\tat compojure.core$if_method$fn__5736.invoke(core.clj:24)\n\tat compojure.core$routing$fn__5761.invoke(core.clj:106)\n\tat clojure.core$some.invoke(core.clj:2443)\n\tat compojure.core$routing.doInvoke(core.clj:106)\n\tat clojure.lang.RestFn.applyTo(RestFn.java:139)\n\tat
clojure.core$apply.invoke(core.clj:619)\n\tat compojure.core$routes$fn__5765.invoke(core.clj:111)\n\tat ring.middleware.reload$wrap_reload$fn__6880.invoke(reload.clj:14)\n\tat org.apache.storm.ui.core$catch_errors$fn__7800.invoke(core.clj:836)\n\tat ring.middleware.keyword_params$wrap_keyword_params$fn__6319.invoke(keyword_params.clj:27)\n\tat ring.middleware.nested_params$wrap_nested_params$fn__6358.invoke(nested_params.clj:65)\n\tat ring.middleware.params$wrap_params$fn__6291.invoke(params.clj:55)\n\tat ring.middleware.multipart_params$wrap_multipart_params$fn__6386.invoke(multipart_params.clj:103)\n\tat ring.middleware.flash$wrap_flash$fn__6675.invoke(flash.clj:14)\n\tat ring.middleware.session$wrap_session$fn__6664.invoke(session.clj:43)\n\tat ring.middleware.cookies$wrap_cookies$fn__6595.invoke(cookies.clj:160)\n\tat ring.adapter.jetty$proxy_handler$fn__6112.invoke(jetty.clj:16)\n\tat ring.adapter.jetty.proxy$org.mortbay.jetty.handler.AbstractHandler$0.handle(Unknown Source)\n\tat org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)\n\tat org.mortbay.jetty.Server.handle(Server.java:326)\n\tat org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)\n\tat org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)\n\tat org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)\n\tat org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)\n\tat org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)\n\tat org.mortbay.jetty.bio.SocketConnector$Connection.run(SocketConnector.java:228)\n\tat org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)\n" } ``` http://git-wip-us.apache.org/repos/asf/storm/blob/a107b93f/docs/Serialization.md ---------------------------------------------------------------------- diff --git a/docs/Serialization.md b/docs/Serialization.md index ac8efe1..c2e129b 100644 --- a/docs/Serialization.md +++ b/docs/Serialization.md @@ -41,7
+41,7 @@ topology.kryo.register: `com.mycompany.CustomType1` and `com.mycompany.CustomType3` will use the `FieldsSerializer`, whereas `com.mycompany.CustomType2` will use `com.mycompany.serializer.CustomType2Serializer` for serialization. -Storm provides helpers for registering serializers in a topology config. The [Config](javadocs/backtype/storm/Config.html) class has a method called `registerSerialization` that takes in a registration to add to the config. +Storm provides helpers for registering serializers in a topology config. The [Config](javadocs/org/apache/storm/Config.html) class has a method called `registerSerialization` that takes in a registration to add to the config. There's an advanced config called `Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS`. If you set this to true, Storm will ignore any serializations that are registered but do not have their code available on the classpath. Otherwise, Storm will throw errors when it can't find a serialization. This is useful if you run many topologies on a cluster that each have different serializations, but you want to declare all the serializations across all topologies in the `storm.yaml` files. http://git-wip-us.apache.org/repos/asf/storm/blob/a107b93f/docs/State-checkpointing.md ---------------------------------------------------------------------- diff --git a/docs/State-checkpointing.md b/docs/State-checkpointing.md new file mode 100644 index 0000000..7c59aad --- /dev/null +++ b/docs/State-checkpointing.md @@ -0,0 +1,160 @@ +--- +title: Storm State Management +layout: documentation +documentation: true +--- +# State support in core storm +Storm core has abstractions for bolts to save and retrieve the state of their operations. There is a default in-memory +state implementation and also a Redis-backed implementation that provides state persistence.
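Choosing between the in-memory and Redis-backed implementations is a matter of topology configuration. It uses the keys `topology.state.checkpoint.interval.ms`, `topology.state.provider` and `topology.state.provider.config` described in this document. The following is a minimal sketch under stated assumptions. The helper name `StateConfigSketch` is hypothetical. A plain `Map<String, Object>` stands in for `org.apache.storm.Config`, which extends `HashMap<String, Object>`. The JSON is cut down to host and port, on the assumption that the omitted optional fields fall back to their defaults.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; in a real topology these entries would go into an
// org.apache.storm.Config (a HashMap<String, Object>) before submission.
final class StateConfigSketch {
    static Map<String, Object> redisStateConfig(int checkpointIntervalMs, String redisHost, int redisPort) {
        Map<String, Object> conf = new HashMap<>();
        // How often the checkpoint spout triggers a checkpoint (the documented default is every second).
        conf.put("topology.state.checkpoint.interval.ms", checkpointIntervalMs);
        // Switch from the default in-memory state to the Redis-backed provider.
        conf.put("topology.state.provider", "org.apache.storm.redis.state.RedisKeyValueStateProvider");
        // Provider-specific overrides, passed as a JSON string (only host and port set here).
        conf.put("topology.state.provider.config", String.format(
                "{\"jedisPoolConfig\": {\"host\": \"%s\", \"port\": %d}}", redisHost, redisPort));
        return conf;
    }
}
```

Remember that the `storm-redis` jar must also be on Storm's classpath (the extlib directory) for the provider class to load.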
+ +## State management +Bolts that require their state to be managed and persisted by the framework should implement the `IStatefulBolt` interface or +extend the `BaseStatefulBolt` and implement the `void initState(T state)` method. The `initState` method is invoked by the framework +during the bolt initialization with the previously saved state of the bolt. This is invoked after prepare but before the bolt starts +processing any tuples. + +Currently the only kind of `State` implementation that is supported is `KeyValueState`, which provides key-value mapping. + +For example, a word count bolt could use the key-value state abstraction for the word counts as follows. + +1. Extend the BaseStatefulBolt and type parameterize it with KeyValueState, which would store the mapping of word to count. +2. The bolt gets initialized with its previously saved state in the `initState` method. This will contain the word counts +last committed by the framework during the previous run. +3. In the execute method, update the word count. + + ```java + public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Long>> { + private KeyValueState<String, Long> wordCounts; + private OutputCollector collector; + ... + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + @Override + public void initState(KeyValueState<String, Long> state) { + wordCounts = state; + } + @Override + public void execute(Tuple tuple) { + String word = tuple.getString(0); + // The default (0L) must be a Long to match the state's value type. + Long count = wordCounts.get(word, 0L); + count++; + wordCounts.put(word, count); + collector.emit(tuple, new Values(word, count)); + collector.ack(tuple); + } + ... + } + ``` +4. The framework periodically checkpoints the state of the bolt (default every second). The frequency +can be changed by setting the storm config `topology.state.checkpoint.interval.ms`. +5.
For state persistence, use a state provider that supports persistence by setting the `topology.state.provider` in the +storm config. E.g. to use the Redis-based key-value state implementation, set `topology.state.provider: org.apache.storm.redis.state.RedisKeyValueStateProvider` +in storm.yaml. The provider implementation jar should be in the class path, which in this case means putting the `storm-redis-*.jar` +in the extlib directory. +6. The state provider properties can be overridden by setting `topology.state.provider.config`. For Redis state this is a +JSON config with the following properties. + + ```json + { + "keyClass": "Optional fully qualified class name of the Key type.", + "valueClass": "Optional fully qualified class name of the Value type.", + "keySerializerClass": "Optional Key serializer implementation class.", + "valueSerializerClass": "Optional Value Serializer implementation class.", + "jedisPoolConfig": { + "host": "localhost", + "port": 6379, + "timeout": 2000, + "database": 0, + "password": "xyz" + } + } + ``` + +## Checkpoint mechanism +A checkpoint is triggered by an internal checkpoint spout at the specified `topology.state.checkpoint.interval.ms`. If there is +at least one `IStatefulBolt` in the topology, the checkpoint spout is automatically added by the topology builder. For stateful topologies, +the topology builder wraps the `IStatefulBolt` in a `StatefulBoltExecutor`, which handles the state commits on receiving the checkpoint tuples. +The non-stateful bolts are wrapped in a `CheckpointTupleForwarder`, which just forwards the checkpoint tuples so that the checkpoint tuples +can flow through the topology DAG. The checkpoint tuples flow through a separate internal stream, namely `$checkpoint`. The topology builder +wires the checkpoint stream across the whole topology with the checkpoint spout at the root.
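This section says a bolt saves its state only after the checkpoint tuple for a transaction has arrived on all of its input streams. That alignment rule can be modelled in a few lines. The class below is a hypothetical toy, not Storm's `StatefulBoltExecutor`. The class name and the `onCheckpoint` method are invented for illustration. Acking, forwarding, prepare/commit phases and recovery are all left out.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy model of checkpoint alignment: state is saved only once the
// checkpoint tuple for a txid has arrived on every input stream of the bolt.
final class CheckpointBarrier {
    private final Set<String> inputStreams;
    private final Set<String> arrived = new HashSet<>();
    private long pendingTxid = -1;

    CheckpointBarrier(Set<String> inputStreams) {
        this.inputStreams = new HashSet<>(inputStreams);
    }

    // Returns true when this arrival completes the checkpoint for txid on all inputs.
    boolean onCheckpoint(String stream, long txid) {
        if (!inputStreams.contains(stream)) {
            throw new IllegalArgumentException("unknown input stream: " + stream);
        }
        if (txid != pendingTxid) {
            // A new checkpoint round starts; forget partial arrivals of an older round.
            arrived.clear();
            pendingTxid = txid;
        }
        arrived.add(stream);
        if (arrived.containsAll(inputStreams)) {
            arrived.clear();
            pendingTxid = -1;
            return true; // here a real executor would save state and forward the checkpoint
        }
        return false;
    }
}
```

A duplicate arrival on the same input does not complete the round. Only the last missing input triggers the save.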
+ +``` + default default default +[spout1] ---------------> [statefulbolt1] ----------> [bolt1] --------------> [statefulbolt2] + | ----------> --------------> + | ($chpt) ($chpt) + | +[$checkpointspout] _______| ($chpt) +``` + +At checkpoint intervals, the checkpoint tuples are emitted by the checkpoint spout. On receiving a checkpoint tuple, the state of the bolt +is saved and then the checkpoint tuple is forwarded to the next component. Each bolt waits for the checkpoint to arrive on all its input +streams before it saves its state, so that the state represents a consistent state across the topology. Once the checkpoint spout receives +an ACK from all the bolts, the state commit is complete and the transaction is recorded as committed by the checkpoint spout. + +The state commit works like a three-phase commit protocol with a prepare and commit phase, so that the state across the topology is saved +in a consistent and atomic manner. + +### Recovery +The recovery phase is triggered when the topology is started for the first time. If the previous transaction was not successfully +prepared, a `rollback` message is sent across the topology so that if a bolt has some prepared transactions, they can be discarded. +If the previous transaction was prepared successfully but not committed, a `commit` message is sent across the topology so that +the prepared transactions can be committed. After these steps are complete, the bolts are initialized with the state. + +Recovery is also triggered if one of the bolts fails to acknowledge the checkpoint message, or if a worker crashes in +the middle. Thus, when the worker is restarted by the supervisor, the checkpoint mechanism makes sure that the bolt gets +initialized with its previous state and the checkpointing continues from the point where it left off. + +### Guarantee +Storm relies on the acking mechanism to replay tuples in case of failures.
It is possible that the state is committed +but the worker crashes before acking the tuples. In this case, the tuples are replayed, causing duplicate state updates. +Also, currently the StatefulBoltExecutor continues to process the tuples from a stream after it has received a checkpoint +tuple on that stream while waiting for the checkpoint to arrive on other input streams before saving the state. This can also cause +duplicate state updates during recovery. + +The state abstraction does not eliminate duplicate evaluations and currently provides only an at-least-once guarantee. + +In order to provide the at-least-once guarantee, all bolts in a stateful topology are expected to anchor the tuples while emitting and ack the input tuples once they are processed. For non-stateful bolts, the anchoring/acking can be automatically managed by extending the `BaseBasicBolt`. Stateful bolts are expected to anchor tuples while emitting and ack the tuple after processing like in the `WordCountBolt` example in the State management section above. + +### IStatefulBolt hooks +The `IStatefulBolt` interface provides hook methods in which stateful bolts can implement custom actions. +```java + /** + * This is a hook for the component to perform some actions just before the + * framework commits its state. + */ + void preCommit(long txid); + + /** + * This is a hook for the component to perform some actions just before the + * framework prepares its state. + */ + void prePrepare(long txid); + + /** + * This is a hook for the component to perform some actions just before the + * framework rolls back the prepared state. + */ + void preRollback(); +``` +These hooks are optional, and stateful bolts are not expected to provide any implementation. They are provided so that other +system-level components can be built on top of the stateful abstractions, where we might want to take some actions before the +stateful bolt's state is prepared, committed or rolled back.
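The class below is a hypothetical in-memory key-value store that makes the prepare, commit and rollback lifecycle concrete:

* `prepareCommit(txid)` stages a snapshot.
* `commit(txid)` publishes that snapshot.
* `rollback()` discards anything not yet committed.

It borrows the method names of Storm's `State` interface but does not implement it. It is not Storm's in-memory state implementation, and the class name `ToyKeyValueState` is invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy state illustrating prepare/commit/rollback; not a Storm class.
final class ToyKeyValueState<K, V> {
    private Map<K, V> current = new HashMap<>();   // working copy updated by put()
    private Map<K, V> committed = new HashMap<>(); // last committed snapshot
    private Map<K, V> prepared = null;             // snapshot staged by prepareCommit()
    private long preparedTxid = -1;

    void put(K key, V value) {
        current.put(key, value);
    }

    V get(K key, V defaultValue) {
        return current.getOrDefault(key, defaultValue);
    }

    // Phase 1: stage a copy of the working state under txid.
    void prepareCommit(long txid) {
        prepared = new HashMap<>(current);
        preparedTxid = txid;
    }

    // Phase 2: publish the staged copy; only valid for the txid that was prepared.
    void commit(long txid) {
        if (prepared == null || txid != preparedTxid) {
            throw new IllegalStateException("no prepared state for txid " + txid);
        }
        committed = prepared;
        prepared = null;
        preparedTxid = -1;
    }

    // Discard staged and uncommitted changes and return to the last committed snapshot.
    void rollback() {
        prepared = null;
        preparedTxid = -1;
        current = new HashMap<>(committed);
    }
}
```

Both `prepareCommit` and `rollback` take copies, so later writes to the working map never leak into a staged or committed snapshot.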
+ +## Providing custom state implementations +Currently the only kind of `State` implementation supported is `KeyValueState`, which provides key-value mapping. + +Custom state implementations should provide implementations for the methods defined in the `org.apache.storm.state.State` interface. +These are the `void prepareCommit(long txid)`, `void commit(long txid)` and `rollback()` methods. The `commit()` method is optional +and is useful if the bolt manages the state on its own. This is currently used only by internal system components, +e.g. the CheckpointSpout, to save its state. + +A `KeyValueState` implementation should also implement the methods defined in the `org.apache.storm.state.KeyValueState` interface. + +### State provider +The framework instantiates the state via the corresponding `StateProvider` implementation. A custom state should also provide +a `StateProvider` implementation which can load and return the state based on the namespace. Each state belongs to a unique namespace. +The namespace is typically unique per task so that each task can have its own state. The StateProvider and the corresponding +State implementation should be available in the class path of Storm (by placing them in the extlib directory). http://git-wip-us.apache.org/repos/asf/storm/blob/a107b93f/docs/Structure-of-the-codebase.md ---------------------------------------------------------------------- diff --git a/docs/Structure-of-the-codebase.md b/docs/Structure-of-the-codebase.md index 573a93c..a2c3d63 100644 --- a/docs/Structure-of-the-codebase.md +++ b/docs/Structure-of-the-codebase.md @@ -25,8 +25,8 @@ Spouts and bolts have the same Thrift definition, so let's just take a look at t The `ComponentObject` defines the implementation for the bolt. It can be one of three types: -1. A serialized java object (that implements [IBolt]({{page.git-blob-base}}/storm-core/src/jvm/backtype/storm/task/IBolt.java)) -2.
A `ShellComponent` object that indicates the implementation is in another language. Specifying a bolt this way will cause Storm to instantiate a [ShellBolt]({{page.git-blob-base}}/storm-core/src/jvm/backtype/storm/task/ShellBolt.java) object to handle the communication between the JVM-based worker process and the non-JVM-based implementation of the component. +1. A serialized java object (that implements [IBolt]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/task/IBolt.java)) +2. A `ShellComponent` object that indicates the implementation is in another language. Specifying a bolt this way will cause Storm to instantiate a [ShellBolt]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java) object to handle the communication between the JVM-based worker process and the non-JVM-based implementation of the component. 3. A `JavaObject` structure which tells Storm the classname and constructor arguments to use to instantiate that bolt. This is useful if you want to define a topology in a non-JVM language. This way, you can make use of JVM-based spouts and bolts without having to create and serialize a Java object yourself. `ComponentCommon` defines everything else for this component. This includes: @@ -36,107 +36,107 @@ The `ComponentObject` defines the implementation for the bolt. It can be one of 3. The parallelism for this component 4. The component-specific [configuration](Configuration.html) for this component -Note that the structure spouts also have a `ComponentCommon` field, and so spouts can also have declarations to consume other input streams. Yet the Storm Java API does not provide a way for spouts to consume other streams, and if you put any input declarations there for a spout you would get an error when you tried to submit the topology. The reason that spouts have an input declarations field is not for users to use, but for Storm itself to use. 
Storm adds implicit streams and bolts to the topology to set up the [acking framework](https://github.com/apache/storm/wiki/Guaranteeing-message-processing), and two of these implicit streams are from the acker bolt to each spout in the topology. The acker sends "ack" or "fail" messages along these streams whenever a tuple tree is detected to be completed or failed. The code that transforms the user's topology into the runtime topology is located [here]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/common.clj#L279). +Note that the structure spouts also have a `ComponentCommon` field, and so spouts can also have declarations to consume other input streams. Yet the Storm Java API does not provide a way for spouts to consume other streams, and if you put any input declarations there for a spout you would get an error when you tried to submit the topology. The reason that spouts have an input declarations field is not for users to use, but for Storm itself to use. Storm adds implicit streams and bolts to the topology to set up the [acking framework](https://github.com/apache/storm/wiki/Guaranteeing-message-processing), and two of these implicit streams are from the acker bolt to each spout in the topology. The acker sends "ack" or "fail" messages along these streams whenever a tuple tree is detected to be completed or failed. The code that transforms the user's topology into the runtime topology is located [here]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/daemon/common.clj#L279). ### Java interfaces The interfaces for Storm are generally specified as Java interfaces. The main interfaces are: -1. [IRichBolt](javadocs/backtype/storm/topology/IRichBolt.html) -2. [IRichSpout](javadocs/backtype/storm/topology/IRichSpout.html) -3. [TopologyBuilder](javadocs/backtype/storm/topology/TopologyBuilder.html) +1. [IRichBolt](javadocs/org/apache/storm/topology/IRichBolt.html) +2. [IRichSpout](javadocs/org/apache/storm/topology/IRichSpout.html) +3. 
[TopologyBuilder](javadocs/org/apache/storm/topology/TopologyBuilder.html) The strategy for the majority of the interfaces is to: 1. Specify the interface using a Java interface 2. Provide a base class that provides default implementations when appropriate -You can see this strategy at work with the [BaseRichSpout](javadocs/backtype/storm/topology/base/BaseRichSpout.html) class. +You can see this strategy at work with the [BaseRichSpout](javadocs/org/apache/storm/topology/base/BaseRichSpout.html) class. Spouts and bolts are serialized into the Thrift definition of the topology as described above. -One subtle aspect of the interfaces is the difference between `IBolt` and `ISpout` vs. `IRichBolt` and `IRichSpout`. The main difference between them is the addition of the `declareOutputFields` method in the "Rich" versions of the interfaces. The reason for the split is that the output fields declaration for each output stream needs to be part of the Thrift struct (so it can be specified from any language), but as a user you want to be able to declare the streams as part of your class. What `TopologyBuilder` does when constructing the Thrift representation is call `declareOutputFields` to get the declaration and convert it into the Thrift structure. The conversion happens [at this portion]({{page.git-blob-base}}/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java#L205) of the `TopologyBuilder` code. +One subtle aspect of the interfaces is the difference between `IBolt` and `ISpout` vs. `IRichBolt` and `IRichSpout`. The main difference between them is the addition of the `declareOutputFields` method in the "Rich" versions of the interfaces. The reason for the split is that the output fields declaration for each output stream needs to be part of the Thrift struct (so it can be specified from any language), but as a user you want to be able to declare the streams as part of your class. 
What `TopologyBuilder` does when constructing the Thrift representation is call `declareOutputFields` to get the declaration and convert it into the Thrift structure. The conversion happens [at this portion]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java#L205) of the `TopologyBuilder` code. ### Implementation Specifying all the functionality via Java interfaces ensures that every feature of Storm is available via Java. Moreso, the focus on Java interfaces ensures that the user experience from Java-land is pleasant as well. -The implementation of Storm, on the other hand, is primarily in Clojure. While the codebase is about 50% Java and 50% Clojure in terms of LOC, most of the implementation logic is in Clojure. There are two notable exceptions to this, and that is the [DRPC](https://github.com/apache/storm/wiki/Distributed-RPC) and [transactional topologies](https://github.com/apache/storm/wiki/Transactional-topologies) implementations. These are implemented purely in Java. This was done to serve as an illustration for how to implement a higher level abstraction on Storm. The DRPC and transactional topologies implementations are in the [backtype.storm.coordination]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/coordination), [backtype.storm.drpc]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/drpc), and [backtype.storm.transactional]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/transactional) packages. +The implementation of Storm, on the other hand, is primarily in Clojure. While the codebase is about 50% Java and 50% Clojure in terms of LOC, most of the implementation logic is in Clojure. There are two notable exceptions to this, and that is the [DRPC](https://github.com/apache/storm/wiki/Distributed-RPC) and [transactional topologies](https://github.com/apache/storm/wiki/Transactional-topologies) implementations. These are implemented purely in Java. 
This was done to serve as an illustration for how to implement a higher level abstraction on Storm. The DRPC and transactional topologies implementations are in the [org.apache.storm.coordination]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/coordination), [org.apache.storm.drpc]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/drpc), and [org.apache.storm.transactional]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/transactional) packages. Here's a summary of the purpose of the main Java packages and Clojure namespace: #### Java packages -[backtype.storm.coordination]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/coordination): Implements the pieces required to coordinate batch-processing on top of Storm, which both DRPC and transactional topologies use. `CoordinatedBolt` is the most important class here. +[org.apache.storm.coordination]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/coordination): Implements the pieces required to coordinate batch-processing on top of Storm, which both DRPC and transactional topologies use. `CoordinatedBolt` is the most important class here. 
-[backtype.storm.drpc]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/drpc): Implementation of the DRPC higher level abstraction +[org.apache.storm.drpc]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/drpc): Implementation of the DRPC higher level abstraction -[backtype.storm.generated]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/generated): The generated Thrift code for Storm (generated using [this fork](https://github.com/nathanmarz/thrift) of Thrift, which simply renames the packages to org.apache.thrift7 to avoid conflicts with other Thrift versions) +[org.apache.storm.generated]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/generated): The generated Thrift code for Storm (generated using [this fork](https://github.com/nathanmarz/thrift) of Thrift, which simply renames the packages to org.apache.thrift7 to avoid conflicts with other Thrift versions) -[backtype.storm.grouping]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/grouping): Contains interface for making custom stream groupings +[org.apache.storm.grouping]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/grouping): Contains interface for making custom stream groupings -[backtype.storm.hooks]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/hooks): Interfaces for hooking into various events in Storm, such as when tasks emit tuples, when tuples are acked, etc. User guide for hooks is [here](https://github.com/apache/storm/wiki/Hooks). +[org.apache.storm.hooks]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/hooks): Interfaces for hooking into various events in Storm, such as when tasks emit tuples, when tuples are acked, etc. User guide for hooks is [here](https://github.com/apache/storm/wiki/Hooks). -[backtype.storm.serialization]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/serialization): Implementation of how Storm serializes/deserializes tuples. Built on top of [Kryo](http://code.google.com/p/kryo/). 
+[org.apache.storm.serialization]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/serialization): Implementation of how Storm serializes/deserializes tuples. Built on top of [Kryo](http://code.google.com/p/kryo/). -[backtype.storm.spout]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/spout): Definition of spout and associated interfaces (like the `SpoutOutputCollector`). Also contains `ShellSpout` which implements the protocol for defining spouts in non-JVM languages. +[org.apache.storm.spout]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/spout): Definition of spout and associated interfaces (like the `SpoutOutputCollector`). Also contains `ShellSpout` which implements the protocol for defining spouts in non-JVM languages. -[backtype.storm.task]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/task): Definition of bolt and associated interfaces (like `OutputCollector`). Also contains `ShellBolt` which implements the protocol for defining bolts in non-JVM languages. Finally, `TopologyContext` is defined here as well, which is provided to spouts and bolts so they can get data about the topology and its execution at runtime. +[org.apache.storm.task]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/task): Definition of bolt and associated interfaces (like `OutputCollector`). Also contains `ShellBolt` which implements the protocol for defining bolts in non-JVM languages. Finally, `TopologyContext` is defined here as well, which is provided to spouts and bolts so they can get data about the topology and its execution at runtime. -[backtype.storm.testing]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/testing): Contains a variety of test bolts and utilities used in Storm's unit tests. +[org.apache.storm.testing]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/testing): Contains a variety of test bolts and utilities used in Storm's unit tests. 
-[backtype.storm.topology]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/topology): Java layer over the underlying Thrift structure to provide a clean, pure-Java API to Storm (users don't have to know about Thrift). `TopologyBuilder` is here as well as the helpful base classes for the different spouts and bolts. The slightly-higher level `IBasicBolt` interface is here, which is a simpler way to write certain kinds of bolts.
+[org.apache.storm.topology]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/topology): Java layer over the underlying Thrift structure to provide a clean, pure-Java API to Storm (users don't have to know about Thrift). `TopologyBuilder` is here as well as the helpful base classes for the different spouts and bolts. The slightly-higher level `IBasicBolt` interface is here, which is a simpler way to write certain kinds of bolts.
-[backtype.storm.transactional]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/transactional): Implementation of transactional topologies.
+[org.apache.storm.transactional]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/transactional): Implementation of transactional topologies.
-[backtype.storm.tuple]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/tuple): Implementation of Storm's tuple data model.
+[org.apache.storm.tuple]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/tuple): Implementation of Storm's tuple data model.
-[backtype.storm.utils]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/tuple): Data structures and miscellaneous utilities used throughout the codebase.
+[org.apache.storm.utils]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/tuple): Data structures and miscellaneous utilities used throughout the codebase.
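
Every Java entry in this hunk follows the same mechanical mapping: the package prefix `backtype.storm` becomes `org.apache.storm`, and the link path under `storm-core/src/jvm/` mirrors the package name with dots replaced by slashes. The sketch below illustrates that mapping on plain strings only. `PackageRename`, `renamePackage`, and `jvmSourcePath` are hypothetical helpers written for this illustration; they are not part of Storm.

```java
// Hypothetical helper illustrating the backtype.storm -> org.apache.storm rename.
// It works on plain strings and does not depend on Storm itself.
public final class PackageRename {
    private static final String OLD_PREFIX = "backtype.storm";
    private static final String NEW_PREFIX = "org.apache.storm";

    private PackageRename() {}

    /** Rewrites a fully qualified name that starts with the old prefix; other names are returned unchanged. */
    public static String renamePackage(String name) {
        // Require an exact match or a '.' boundary so names like "backtype.stormx" are left alone.
        if (name.equals(OLD_PREFIX) || name.startsWith(OLD_PREFIX + ".")) {
            return NEW_PREFIX + name.substring(OLD_PREFIX.length());
        }
        return name;
    }

    /** Maps a Java package to its directory under storm-core/src/jvm (dots become slashes). */
    public static String jvmSourcePath(String pkg) {
        return "storm-core/src/jvm/" + pkg.replace('.', '/');
    }

    public static void main(String[] args) {
        String[] oldPackages = {"backtype.storm.drpc", "backtype.storm.topology", "backtype.storm.utils"};
        for (String oldPkg : oldPackages) {
            String newPkg = renamePackage(oldPkg);
            System.out.println(oldPkg + " -> " + newPkg + " (" + jvmSourcePath(newPkg) + ")");
        }
    }
}
```

Running `main` prints lines such as `backtype.storm.drpc -> org.apache.storm.drpc (storm-core/src/jvm/org/apache/storm/drpc)`. Applied to `org.apache.storm.utils`, the mapping yields `.../org/apache/storm/utils`, whereas the `utils` link on both sides of this diff points to `.../tuple`, which appears to be a slip carried over from the original line.
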
 #### Clojure namespaces
-[backtype.storm.bootstrap]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/bootstrap.clj): Contains a helpful macro to import all the classes and namespaces that are used throughout the codebase.
+[org.apache.storm.bootstrap]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/bootstrap.clj): Contains a helpful macro to import all the classes and namespaces that are used throughout the codebase.
-[backtype.storm.clojure]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/clojure.clj): Implementation of the Clojure DSL for Storm.
+[org.apache.storm.clojure]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/clojure.clj): Implementation of the Clojure DSL for Storm.
-[backtype.storm.cluster]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/cluster.clj): All Zookeeper logic used in Storm daemons is encapsulated in this file. This code manages how cluster state (like what tasks are running where, what spout/bolt each task runs as) is mapped to the Zookeeper "filesystem" API.
+[org.apache.storm.cluster]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/cluster.clj): All Zookeeper logic used in Storm daemons is encapsulated in this file. This code manages how cluster state (like what tasks are running where, what spout/bolt each task runs as) is mapped to the Zookeeper "filesystem" API.
-[backtype.storm.command.*]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/command): These namespaces implement various commands for the `storm` command line client. These implementations are very short.
+[org.apache.storm.command.*]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/command): These namespaces implement various commands for the `storm` command line client. These implementations are very short.
-[backtype.storm.config]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/config.clj): Implementation of config reading/parsing code for Clojure. Also has utility functions for determining what local path nimbus/supervisor/daemons should be using for various things. e.g. the `master-inbox` function will return the local path that Nimbus should use when jars are uploaded to it.
+[org.apache.storm.config]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/config.clj): Implementation of config reading/parsing code for Clojure. Also has utility functions for determining what local path nimbus/supervisor/daemons should be using for various things. e.g. the `master-inbox` function will return the local path that Nimbus should use when jars are uploaded to it.
-[backtype.storm.daemon.acker]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/acker.clj): Implementation of the "acker" bolt, which is a key part of how Storm guarantees data processing.
+[org.apache.storm.daemon.acker]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/daemon/acker.clj): Implementation of the "acker" bolt, which is a key part of how Storm guarantees data processing.
-[backtype.storm.daemon.common]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/common.clj): Implementation of common functions used in Storm daemons, like getting the id for a topology based on the name, mapping a user's topology into the one that actually executes (with implicit acking streams and acker bolt added - see `system-topology!` function), and definitions for the various heartbeat and other structures persisted by Storm.
+[org.apache.storm.daemon.common]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/daemon/common.clj): Implementation of common functions used in Storm daemons, like getting the id for a topology based on the name, mapping a user's topology into the one that actually executes (with implicit acking streams and acker bolt added - see `system-topology!` function), and definitions for the various heartbeat and other structures persisted by Storm.
-[backtype.storm.daemon.drpc]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/drpc.clj): Implementation of the DRPC server for use with DRPC topologies.
+[org.apache.storm.daemon.drpc]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/daemon/drpc.clj): Implementation of the DRPC server for use with DRPC topologies.
-[backtype.storm.daemon.nimbus]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/nimbus.clj): Implementation of Nimbus.
+[org.apache.storm.daemon.nimbus]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj): Implementation of Nimbus.
-[backtype.storm.daemon.supervisor]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/supervisor.clj): Implementation of Supervisor.
+[org.apache.storm.daemon.supervisor]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj): Implementation of Supervisor.
-[backtype.storm.daemon.task]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/task.clj): Implementation of an individual task for a spout or bolt. Handles message routing, serialization, stats collection for the UI, as well as the spout-specific and bolt-specific execution implementations.
+[org.apache.storm.daemon.task]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/daemon/task.clj): Implementation of an individual task for a spout or bolt. Handles message routing, serialization, stats collection for the UI, as well as the spout-specific and bolt-specific execution implementations.
-[backtype.storm.daemon.worker]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/worker.clj): Implementation of a worker process (which will contain many tasks within). Implements message transferring and task launching.
+[org.apache.storm.daemon.worker]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/daemon/worker.clj): Implementation of a worker process (which will contain many tasks within). Implements message transferring and task launching.
-[backtype.storm.event]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/event.clj): Implements a simple asynchronous function executor. Used in various places in Nimbus and Supervisor to make functions execute in serial to avoid any race conditions.
+[org.apache.storm.event]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/event.clj): Implements a simple asynchronous function executor. Used in various places in Nimbus and Supervisor to make functions execute in serial to avoid any race conditions.
-[backtype.storm.log]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/log.clj): Defines the functions used to log messages to log4j.
+[org.apache.storm.log]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/log.clj): Defines the functions used to log messages to log4j.
-[backtype.storm.messaging.*]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/messaging): Defines a higher level interface to implementing point to point messaging. In local mode Storm uses in-memory Java queues to do this; on a cluster, it uses ZeroMQ. The generic interface is defined in protocol.clj.
+[org.apache.storm.messaging.*]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/messaging): Defines a higher level interface to implementing point to point messaging. In local mode Storm uses in-memory Java queues to do this; on a cluster, it uses ZeroMQ. The generic interface is defined in protocol.clj.
-[backtype.storm.stats]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/stats.clj): Implementation of stats rollup routines used when sending stats to ZK for use by the UI. Does things like windowed and rolling aggregations at multiple granularities.
+[org.apache.storm.stats]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/stats.clj): Implementation of stats rollup routines used when sending stats to ZK for use by the UI. Does things like windowed and rolling aggregations at multiple granularities.
-[backtype.storm.testing]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/testing.clj): Implementation of facilities used to test Storm topologies. Includes time simulation, `complete-topology` for running a fixed set of tuples through a topology and capturing the output, tracker topologies for having fine grained control over detecting when a cluster is "idle", and other utilities.
+[org.apache.storm.testing]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/testing.clj): Implementation of facilities used to test Storm topologies. Includes time simulation, `complete-topology` for running a fixed set of tuples through a topology and capturing the output, tracker topologies for having fine grained control over detecting when a cluster is "idle", and other utilities.
-[backtype.storm.thrift]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/thrift.clj): Clojure wrappers around the generated Thrift API to make working with Thrift structures more pleasant.
+[org.apache.storm.thrift]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/thrift.clj): Clojure wrappers around the generated Thrift API to make working with Thrift structures more pleasant.
-[backtype.storm.timer]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/timer.clj): Implementation of a background timer to execute functions in the future or on a recurring interval. Storm couldn't use the [Timer](http://docs.oracle.com/javase/1.4.2/docs/api/java/util/Timer.html) class because it needed integration with time simulation in order to be able to unit test Nimbus and the Supervisor.
+[org.apache.storm.timer]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/timer.clj): Implementation of a background timer to execute functions in the future or on a recurring interval. Storm couldn't use the [Timer](http://docs.oracle.com/javase/1.4.2/docs/api/java/util/Timer.html) class because it needed integration with time simulation in order to be able to unit test Nimbus and the Supervisor.
-[backtype.storm.ui.*]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/ui): Implementation of Storm UI. Completely independent from rest of code base and uses the Nimbus Thrift API to get data.
+[org.apache.storm.ui.*]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/ui): Implementation of Storm UI. Completely independent from rest of code base and uses the Nimbus Thrift API to get data.
-[backtype.storm.util]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/util.clj): Contains generic utility functions used throughout the code base.
+[org.apache.storm.util]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/util.clj): Contains generic utility functions used throughout the code base.
-[backtype.storm.zookeeper]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/zookeeper.clj): Clojure wrapper around the Zookeeper API and implements some "high-level" stuff like "mkdirs" and "delete-recursive".
+[org.apache.storm.zookeeper]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/zookeeper.clj): Clojure wrapper around the Zookeeper API and implements some "high-level" stuff like "mkdirs" and "delete-recursive".
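
The Clojure links follow a related convention: a namespace such as `org.apache.storm.daemon.nimbus` lives in `storm-core/src/clj/org/apache/storm/daemon/nimbus.clj`, while entries written with a trailing `.*` (for example `org.apache.storm.ui.*`) link to a directory of namespaces rather than a single file. The sketch below reproduces that mapping as a plain string transformation, under the assumption that the links are derived this way. `NamespacePath` and `cljSourcePath` are hypothetical names for this illustration, not Storm code. It also applies Clojure's standard rule that a `-` in a namespace segment becomes `_` in the file name, although none of the namespaces listed here contain hyphens.

```java
// Hypothetical helper: maps a Clojure namespace (as written in the list above)
// to the source path used in its link under storm-core/src/clj.
public final class NamespacePath {
    private static final String CLJ_ROOT = "storm-core/src/clj/";

    private NamespacePath() {}

    /**
     * A trailing ".*" (e.g. "org.apache.storm.ui.*") denotes a directory of namespaces;
     * otherwise the last segment names a single .clj file.
     */
    public static String cljSourcePath(String ns) {
        boolean isDirectory = ns.endsWith(".*");
        String base = isDirectory ? ns.substring(0, ns.length() - 2) : ns;
        // Clojure convention: '.' separates directories, '-' becomes '_' in file names.
        String relative = base.replace('.', '/').replace('-', '_');
        return CLJ_ROOT + relative + (isDirectory ? "" : ".clj");
    }

    public static void main(String[] args) {
        String[] namespaces = {
            "org.apache.storm.daemon.nimbus",
            "org.apache.storm.messaging.*",
            "org.apache.storm.zookeeper"
        };
        for (String ns : namespaces) {
            System.out.println(ns + " -> " + cljSourcePath(ns));
        }
    }
}
```

For example, `cljSourcePath("org.apache.storm.messaging.*")` returns `storm-core/src/clj/org/apache/storm/messaging`, which matches the directory link in the list.
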
