Merge remote-tracking branch 'upstream/master' into STORM-166

Conflicts:
        conf/defaults.yaml
        pom.xml
        storm-core/src/clj/backtype/storm/cluster.clj
        storm-core/src/clj/backtype/storm/daemon/nimbus.clj
        storm-core/src/clj/backtype/storm/thrift.clj
        storm-core/src/clj/backtype/storm/ui/core.clj
        storm-core/src/jvm/backtype/storm/Config.java
        storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
        storm-core/test/clj/backtype/storm/nimbus_test.clj
        storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
        storm-core/test/clj/backtype/storm/utils_test.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6b0da168
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6b0da168
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6b0da168

Branch: refs/heads/0.11.x-branch
Commit: 6b0da1689525507d2f40823abb50de3a4a1f9607
Parents: 88e70a8 23e630b
Author: Parth Brahmbhatt <[email protected]>
Authored: Tue Dec 16 18:14:32 2014 -0800
Committer: Parth Brahmbhatt <[email protected]>
Committed: Tue Dec 16 18:14:32 2014 -0800

----------------------------------------------------------------------
 .gitignore                                      |    1 +
 CHANGELOG.md                                    |   24 +-
 DEVELOPER.md                                    |    4 +
 LICENSE                                         |   14 +-
 README.markdown                                 |    4 +
 SECURITY.md                                     |  356 +-
 STORM-UI-REST-API.md                            |   17 +-
 bin/storm                                       |   15 +-
 bin/storm.cmd                                   |   11 +-
 conf/defaults.yaml                              |   45 +-
 conf/jaas_digest.conf                           |    8 +-
 conf/jaas_kerberos.conf                         |   15 +
 doap_Storm.rdf                                  |   57 +
 docs/README.md                                  |   38 +
 docs/_config.yml                                |   11 +
 docs/_includes/footer.html                      |   16 +
 docs/_includes/head.html                        |   31 +
 docs/_includes/header.html                      |   25 +
 docs/_layouts/about.html                        |   40 +
 docs/_layouts/default.html                      |   19 +
 docs/_layouts/documentation.html                |   16 +
 docs/_layouts/page.html                         |   14 +
 docs/_layouts/post.html                         |   15 +
 docs/_posts/2012-08-02-storm080-released.md     |  120 +
 docs/_posts/2012-09-06-storm081-released.md     |   47 +
 docs/_posts/2013-01-11-storm082-released.md     |   82 +
 docs/_posts/2013-12-08-storm090-released.md     |  127 +
 docs/_posts/2014-04-10-storm-logo-contest.md    |   66 +
 docs/_posts/2014-04-17-logo-pforrest.md         |   10 +
 docs/_posts/2014-04-17-logo-squinones.md        |    9 +
 docs/_posts/2014-04-19-logo-ssuleman.md         |    8 +
 docs/_posts/2014-04-21-logo-rmarshall.md        |   12 +
 docs/_posts/2014-04-22-logo-zsayari.md          |    9 +
 docs/_posts/2014-04-23-logo-abartos.md          |   15 +
 docs/_posts/2014-04-27-logo-cboustead.md        |   12 +
 docs/_posts/2014-04-27-logo-sasili.md           |   10 +
 docs/_posts/2014-04-29-logo-jlee1.md            |   10 +
 docs/_posts/2014-04-29-logo-jlee2.md            |   10 +
 docs/_posts/2014-04-29-logo-jlee3.md            |   10 +
 docs/_posts/2014-05-27-round1-results.md        |   38 +
 docs/_posts/2014-06-17-contest-results.md       |   24 +
 docs/_posts/2014-06-25-storm092-released.md     |  137 +
 .../2014-10-20-storm093-release-candidate.md    |   11 +
 docs/_posts/2014-11-25-storm093-released.md     |  164 +
 docs/_sass/_syntax-highlighting.scss            |   70 +
 docs/about.md                                   |    7 +
 docs/about/deployment.md                        |    9 +
 docs/about/fault-tolerant.md                    |    9 +
 docs/about/free-and-open-source.md              |   15 +
 docs/about/guarantees-data-processing.md        |   10 +
 docs/about/integrates.md                        |   13 +
 docs/about/multi-language.md                    |    9 +
 docs/about/scalable.md                          |   10 +
 docs/about/simple-api.md                        |   15 +
 docs/assets/css/bootstrap-theme.css             |  470 ++
 docs/assets/css/bootstrap-theme.css.map         |    1 +
 docs/assets/css/bootstrap-theme.min.css         |    5 +
 docs/assets/css/bootstrap.css                   | 6332 ++++++++++++++++++
 docs/assets/css/bootstrap.css.map               |    1 +
 docs/assets/css/bootstrap.min.css               |    5 +
 docs/assets/css/theme.css                       |   18 +
 docs/assets/favicon.ico                         |  Bin 0 -> 1150 bytes
 .../fonts/glyphicons-halflings-regular.eot      |  Bin 0 -> 20335 bytes
 .../fonts/glyphicons-halflings-regular.svg      |  229 +
 .../fonts/glyphicons-halflings-regular.ttf      |  Bin 0 -> 41280 bytes
 .../fonts/glyphicons-halflings-regular.woff     |  Bin 0 -> 23320 bytes
 docs/assets/js/bootstrap.js                     | 2320 +++++++
 docs/assets/js/bootstrap.min.js                 |    7 +
 docs/assets/js/npm.js                           |   13 +
 docs/css/main.scss                              |   47 +
 docs/doc-index.html                             |   11 +
 .../Acking-framework-implementation.md          |   38 +
 docs/documentation/Clojure-DSL.md               |  266 +
 docs/documentation/Command-line-client.md       |  102 +
 docs/documentation/Common-patterns.md           |   88 +
 docs/documentation/Concepts.md                  |  117 +
 docs/documentation/Configuration.md             |   31 +
 docs/documentation/Contributing-to-Storm.md     |   33 +
 .../Creating-a-new-Storm-project.md             |   27 +
 .../DSLs-and-multilang-adapters.md              |   11 +
 ...Defining-a-non-jvm-language-dsl-for-storm.md |   38 +
 docs/documentation/Distributed-RPC.md           |  199 +
 docs/documentation/Documentation.md             |   52 +
 docs/documentation/FAQ.md                       |  123 +
 docs/documentation/Fault-tolerance.md           |   30 +
 .../Guaranteeing-message-processing.md          |  181 +
 docs/documentation/Home.md                      |   69 +
 docs/documentation/Hooks.md                     |    9 +
 docs/documentation/Implementation-docs.md       |   20 +
 .../Installing-native-dependencies.md           |   38 +
 docs/documentation/Kestrel-and-Storm.md         |  200 +
 docs/documentation/Lifecycle-of-a-topology.md   |   82 +
 docs/documentation/Local-mode.md                |   29 +
 docs/documentation/Maven.md                     |   56 +
 .../Message-passing-implementation.md           |   30 +
 docs/documentation/Metrics.md                   |   36 +
 docs/documentation/Multilang-protocol.md        |  223 +
 docs/documentation/Powered-By.md                |  925 +++
 docs/documentation/Project-ideas.md             |    6 +
 docs/documentation/Rationale.md                 |   33 +
 ...unning-topologies-on-a-production-cluster.md |   77 +
 .../Serialization-(prior-to-0.6.0).md           |   52 +
 docs/documentation/Serialization.md             |   62 +
 docs/documentation/Serializers.md               |    4 +
 .../documentation/Setting-up-a-Storm-cluster.md |   85 +
 .../Setting-up-a-Storm-project-in-Eclipse.md    |    1 +
 .../Setting-up-development-environment.md       |   41 +
 docs/documentation/Spout-implementations.md     |   10 +
 ...guage-protocol-(versions-0.7.0-and-below).md |  124 +
 docs/documentation/Structure-of-the-codebase.md |  142 +
 .../Support-for-non-java-languages.md           |    9 +
 docs/documentation/Transactional-topologies.md  |  361 +
 docs/documentation/Trident-API-Overview.md      |  312 +
 docs/documentation/Trident-spouts.md            |   44 +
 docs/documentation/Trident-state.md             |  331 +
 docs/documentation/Trident-tutorial.md          |  254 +
 docs/documentation/Troubleshooting.md           |  145 +
 docs/documentation/Tutorial.md                  |  312 +
 ...nding-the-parallelism-of-a-Storm-topology.md |  123 +
 .../Using-non-JVM-languages-with-Storm.md       |   52 +
 docs/documentation/images/ack_tree.png          |  Bin 0 -> 31463 bytes
 docs/documentation/images/batched-stream.png    |  Bin 0 -> 66336 bytes
 docs/documentation/images/drpc-workflow.png     |  Bin 0 -> 66199 bytes
 .../images/eclipse-project-properties.png       |  Bin 0 -> 80810 bytes
 .../images/example-of-a-running-topology.png    |  Bin 0 -> 81430 bytes
 docs/documentation/images/grouping.png          |  Bin 0 -> 39701 bytes
 .../images/ld-library-path-eclipse-linux.png    |  Bin 0 -> 114597 bytes
 ...onships-worker-processes-executors-tasks.png |  Bin 0 -> 54804 bytes
 docs/documentation/images/spout-vs-state.png    |  Bin 0 -> 24804 bytes
 docs/documentation/images/storm-cluster.png     |  Bin 0 -> 34604 bytes
 docs/documentation/images/topology-tasks.png    |  Bin 0 -> 45960 bytes
 docs/documentation/images/topology.png          |  Bin 0 -> 23147 bytes
 .../images/transactional-batches.png            |  Bin 0 -> 23293 bytes
 .../images/transactional-commit-flow.png        |  Bin 0 -> 17725 bytes
 .../images/transactional-design-2.png           |  Bin 0 -> 13537 bytes
 .../images/transactional-spout-structure.png    |  Bin 0 -> 25067 bytes
 docs/documentation/images/trident-to-storm1.png |  Bin 0 -> 67173 bytes
 docs/documentation/images/trident-to-storm2.png |  Bin 0 -> 68943 bytes
 docs/documentation/images/tuple-dag.png         |  Bin 0 -> 18849 bytes
 docs/documentation/images/tuple_tree.png        |  Bin 0 -> 58186 bytes
 docs/downloads.html                             |  155 +
 docs/feed.xml                                   |   30 +
 docs/images/bullet.gif                          |  Bin 0 -> 82 bytes
 docs/images/download.png                        |  Bin 0 -> 16272 bytes
 docs/images/incubator-logo.png                  |  Bin 0 -> 11651 bytes
 .../logocontest/abartos/stationery_mockup.jpg   |  Bin 0 -> 146498 bytes
 docs/images/logocontest/abartos/storm_logo.png  |  Bin 0 -> 153974 bytes
 docs/images/logocontest/abartos/storm_logo2.png |  Bin 0 -> 115425 bytes
 docs/images/logocontest/abartos/storm_logo3.png |  Bin 0 -> 94950 bytes
 .../images/logocontest/cboustead/storm_logo.png |  Bin 0 -> 67149 bytes
 .../logocontest/cboustead/storm_logo1.png       |  Bin 0 -> 16327 bytes
 docs/images/logocontest/jlee1/storm_logo.jpg    |  Bin 0 -> 189382 bytes
 docs/images/logocontest/jlee2/storm_logo.jpg    |  Bin 0 -> 155666 bytes
 docs/images/logocontest/jlee3/storm_logo.jpg    |  Bin 0 -> 158134 bytes
 docs/images/logocontest/pforrest/storm1.png     |  Bin 0 -> 84569 bytes
 .../pforrest/storm_logo_composite.png           |  Bin 0 -> 139223 bytes
 .../rmarshall/StormLogo_Horizontal.png          |  Bin 0 -> 16481 bytes
 .../rmarshall/StormLogo_Horizontal_NoColour.png |  Bin 0 -> 14358 bytes
 .../logocontest/rmarshall/StormLogo_Square.png  |  Bin 0 -> 14392 bytes
 docs/images/logocontest/sasili/storm_logo.png   |  Bin 0 -> 92196 bytes
 .../images/logocontest/squinones/storm_logo.png |  Bin 0 -> 203263 bytes
 .../logocontest/squinones/storm_logo1.png       |  Bin 0 -> 53325 bytes
 docs/images/logocontest/ssuleman/storm_logo.png |  Bin 0 -> 95509 bytes
 docs/images/logocontest/storm_logo_winner.png   |  Bin 0 -> 34490 bytes
 docs/images/logocontest/zsayari/storm_logo.png  |  Bin 0 -> 120794 bytes
 docs/images/logos/8digits.png                   |  Bin 0 -> 19557 bytes
 docs/images/logos/Yahoo_Japan_logo.png          |  Bin 0 -> 3707 bytes
 docs/images/logos/aeris.png                     |  Bin 0 -> 6268 bytes
 docs/images/logos/alibaba.jpg                   |  Bin 0 -> 43703 bytes
 docs/images/logos/baidu.jpeg                    |  Bin 0 -> 3413 bytes
 docs/images/logos/cerner.gif                    |  Bin 0 -> 2591 bytes
 docs/images/logos/flipboard.jpeg                |  Bin 0 -> 2909 bytes
 docs/images/logos/fullcontact.png               |  Bin 0 -> 24567 bytes
 docs/images/logos/groupon.jpg                   |  Bin 0 -> 41413 bytes
 docs/images/logos/holidaycheck.png              |  Bin 0 -> 3129 bytes
 docs/images/logos/idexx.gif                     |  Bin 0 -> 38689 bytes
 docs/images/logos/mercadolibre.png              |  Bin 0 -> 73388 bytes
 docs/images/logos/navisite.jpg                  |  Bin 0 -> 9358 bytes
 docs/images/logos/ooyala.gif                    |  Bin 0 -> 7830 bytes
 docs/images/logos/parc.png                      |  Bin 0 -> 7256 bytes
 docs/images/logos/quicklizard.png               |  Bin 0 -> 5667 bytes
 docs/images/logos/rocketfuel.png                |  Bin 0 -> 9719 bytes
 docs/images/logos/rubicon.png                   |  Bin 0 -> 10663 bytes
 docs/images/logos/spiderio.png                  |  Bin 0 -> 46790 bytes
 docs/images/logos/spotify.jpeg                  |  Bin 0 -> 3282 bytes
 docs/images/logos/taobao.gif                    |  Bin 0 -> 3262 bytes
 docs/images/logos/twitter.png                   |  Bin 0 -> 4392 bytes
 docs/images/logos/weatherchannel.gif            |  Bin 0 -> 3425 bytes
 docs/images/logos/webmd.jpg                     |  Bin 0 -> 6193 bytes
 docs/images/logos/yelp.png                      |  Bin 0 -> 98431 bytes
 docs/images/mailinglist.png                     |  Bin 0 -> 4245 bytes
 docs/images/storm_header.png                    |  Bin 0 -> 17291 bytes
 docs/images/storm_logo_tagline_color copy.png   |  Bin 0 -> 67928 bytes
 docs/images/storm_logo_tagline_color.png        |  Bin 0 -> 33568 bytes
 docs/images/top_bg.gif                          |  Bin 0 -> 113 bytes
 docs/images/topology.png                        |  Bin 0 -> 59837 bytes
 docs/images/ui_topology_viz.png                 |  Bin 0 -> 112831 bytes
 docs/index.html                                 |  104 +
 docs/news.html                                  |   12 +
 .../multilang/resources/asyncSplitsentence.js   |   18 +
 .../multilang/resources/randomsentence.js       |   18 +
 .../multilang/resources/splitsentence.js        |   18 +
 .../storm-starter/multilang/resources/storm.js  |   38 +-
 .../storm-starter/multilang/resources/storm.py  |   87 +-
 .../storm-starter/multilang/resources/storm.rb  |   90 +-
 examples/storm-starter/pom.xml                  |    2 +-
 .../src/jvm/storm/starter/util/StormRunner.java |    3 +-
 external/storm-hbase/pom.xml                    |    5 +-
 .../trident/mapper/TridentHBaseMapper.java      |   18 +
 .../hbase/trident/state/HBaseMapState.java      |   17 +
 external/storm-hdfs/pom.xml                     |    4 +-
 .../hdfs/bolt/format/DefaultSequenceFormat.java |   17 +
 .../storm/hdfs/bolt/format/SequenceFormat.java  |   17 +
 .../hdfs/bolt/rotation/TimedRotationPolicy.java |   17 +
 .../hdfs/common/rotation/MoveFileAction.java    |   17 +
 .../hdfs/common/rotation/RotationAction.java    |   17 +
 .../apache/storm/hdfs/trident/HdfsState.java    |   17 +
 .../storm/hdfs/trident/HdfsStateFactory.java    |   17 +
 .../apache/storm/hdfs/trident/HdfsUpdater.java  |   17 +
 .../trident/format/DefaultSequenceFormat.java   |   17 +
 .../hdfs/trident/format/SequenceFormat.java     |   17 +
 .../trident/rotation/TimedRotationPolicy.java   |   17 +
 .../storm/hdfs/trident/FixedBatchSpout.java     |   17 +
 .../storm/hdfs/trident/TridentFileTopology.java |   17 +
 .../hdfs/trident/TridentSequenceTopology.java   |   17 +
 external/storm-kafka/README.md                  |   13 +-
 external/storm-kafka/pom.xml                    |    2 +-
 .../src/jvm/storm/kafka/KafkaUtils.java         |   63 +-
 .../src/jvm/storm/kafka/PartitionManager.java   |   11 +-
 .../jvm/storm/kafka/UpdateOffsetException.java  |   22 +
 .../src/test/storm/kafka/KafkaUtilsTest.java    |    7 +-
 logback/cluster.xml                             |    4 +-
 logback/worker.xml                              |   41 +
 pom.xml                                         |   51 +-
 .../maven-shade-clojure-transformer/pom.xml     |    4 +-
 storm-core/pom.xml                              |  307 +-
 .../src/clj/backtype/storm/LocalCluster.clj     |    4 +
 storm-core/src/clj/backtype/storm/LocalDRPC.clj |    4 +-
 .../src/clj/backtype/storm/MockAutoCred.clj     |   58 +
 storm-core/src/clj/backtype/storm/bootstrap.clj |    5 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |  171 +-
 .../storm/command/upload_credentials.clj        |   35 +
 storm-core/src/clj/backtype/storm/config.clj    |   40 +-
 .../backtype/storm/daemon/builtin_metrics.clj   |   21 +-
 .../src/clj/backtype/storm/daemon/common.clj    |   18 +-
 .../src/clj/backtype/storm/daemon/drpc.clj      |  174 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |  137 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |  271 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  249 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |  183 +-
 .../src/clj/backtype/storm/daemon/task.clj      |    7 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |  109 +-
 storm-core/src/clj/backtype/storm/disruptor.clj |   11 +-
 .../src/clj/backtype/storm/messaging/loader.clj |   16 +-
 storm-core/src/clj/backtype/storm/testing.clj   |   56 +-
 storm-core/src/clj/backtype/storm/testing4j.clj |   28 +-
 storm-core/src/clj/backtype/storm/thrift.clj    |   24 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  174 +-
 .../src/clj/backtype/storm/ui/helpers.clj       |   69 +-
 storm-core/src/clj/backtype/storm/util.clj      |   95 +-
 storm-core/src/clj/backtype/storm/zookeeper.clj |   21 +-
 storm-core/src/clj/storm/trident/testing.clj    |    1 +
 .../src/dev/drpc-simple-acl-test-scenario.yaml  |   11 +
 storm-core/src/dev/resources/storm.js           |   24 +
 storm-core/src/dev/resources/storm.py           |   25 +-
 storm-core/src/dev/resources/storm.rb           |   55 +-
 storm-core/src/dev/resources/tester_bolt.js     |   17 +
 storm-core/src/dev/resources/tester_spout.js    |   17 +
 storm-core/src/jvm/backtype/storm/Config.java   |  547 +-
 .../jvm/backtype/storm/ConfigValidation.java    |  156 +-
 .../src/jvm/backtype/storm/Constants.java       |    3 +-
 .../backtype/storm/ICredentialsListener.java    |   32 +
 .../src/jvm/backtype/storm/ILocalCluster.java   |    2 +
 .../src/jvm/backtype/storm/StormSubmitter.java  |  164 +-
 .../storm/drpc/DRPCInvocationsClient.java       |   91 +-
 .../src/jvm/backtype/storm/drpc/DRPCSpout.java  |  100 +-
 .../jvm/backtype/storm/drpc/ReturnResults.java  |   35 +-
 .../storm/generated/AuthorizationException.java |  345 +
 .../backtype/storm/generated/Credentials.java   |  390 ++
 .../storm/generated/DistributedRPC.java         |  110 +-
 .../generated/DistributedRPCInvocations.java    |  352 +-
 .../jvm/backtype/storm/generated/Nimbus.java    | 3006 ++++++++-
 .../backtype/storm/generated/SubmitOptions.java |   98 +-
 .../backtype/storm/generated/TopologyInfo.java  |  192 +-
 .../storm/generated/TopologySummary.java        |  192 +-
 .../backtype/storm/messaging/netty/Client.java  |   73 +-
 .../storm/messaging/netty/ControlMessage.java   |    4 +-
 .../storm/messaging/netty/MessageDecoder.java   |   32 +-
 .../storm/messaging/netty/MessageEncoder.java   |    4 +
 .../storm/messaging/netty/SaslMessageToken.java |   99 +
 .../storm/messaging/netty/SaslNettyClient.java  |  166 +
 .../messaging/netty/SaslNettyClientState.java   |   31 +
 .../storm/messaging/netty/SaslNettyServer.java  |  165 +
 .../messaging/netty/SaslNettyServerState.java   |   31 +
 .../messaging/netty/SaslStormClientHandler.java |  158 +
 .../netty/SaslStormServerAuthorizeHandler.java  |   83 +
 .../messaging/netty/SaslStormServerHandler.java |  155 +
 .../storm/messaging/netty/SaslUtils.java        |   74 +
 .../backtype/storm/messaging/netty/Server.java  |  104 +-
 .../netty/StormClientPipelineFactory.java       |   12 +-
 .../messaging/netty/StormServerHandler.java     |    2 +-
 .../netty/StormServerPipelineFactory.java       |   20 +-
 .../jvm/backtype/storm/scheduler/Cluster.java   |   13 +
 .../scheduler/multitenant/DefaultPool.java      |  219 +
 .../storm/scheduler/multitenant/FreePool.java   |  125 +
 .../scheduler/multitenant/IsolatedPool.java     |  346 +
 .../multitenant/MultitenantScheduler.java       |   98 +
 .../storm/scheduler/multitenant/Node.java       |  343 +
 .../storm/scheduler/multitenant/NodePool.java   |  296 +
 .../storm/security/INimbusCredentialPlugin.java |   47 +
 .../backtype/storm/security/auth/AuthUtils.java |  228 +-
 .../auth/DefaultHttpCredentialsPlugin.java      |   87 +
 .../security/auth/DefaultPrincipalToLocal.java  |   43 +
 .../storm/security/auth/IAutoCredentials.java   |   55 +
 .../security/auth/ICredentialsRenewer.java      |   41 +
 .../auth/IGroupMappingServiceProvider.java      |   42 +
 .../security/auth/IHttpCredentialsPlugin.java   |   50 +
 .../storm/security/auth/IPrincipalToLocal.java  |   41 +
 .../storm/security/auth/ITransportPlugin.java   |   14 +-
 .../security/auth/KerberosPrincipalToLocal.java |   45 +
 .../storm/security/auth/ReqContext.java         |   10 +-
 .../security/auth/SaslTransportPlugin.java      |   44 +-
 .../security/auth/ShellBasedGroupsMapping.java  |   94 +
 .../security/auth/SimpleTransportPlugin.java    |   61 +-
 .../security/auth/SingleUserPrincipal.java      |   56 +
 .../storm/security/auth/TBackoffConnect.java    |   77 +
 .../storm/security/auth/ThriftClient.java       |   85 +-
 .../security/auth/ThriftConnectionType.java     |   77 +
 .../storm/security/auth/ThriftServer.java       |   19 +-
 .../auth/authorizer/DRPCAuthorizerBase.java     |   46 +
 .../authorizer/DRPCSimpleACLAuthorizer.java     |  157 +
 .../auth/authorizer/DenyAuthorizer.java         |    4 +-
 .../auth/authorizer/NoopAuthorizer.java         |    6 +-
 .../auth/authorizer/SimpleACLAuthorizer.java    |  131 +
 .../authorizer/SimpleWhitelistAuthorizer.java   |   70 +
 .../auth/digest/DigestSaslTransportPlugin.java  |    1 +
 .../storm/security/auth/hadoop/AutoHDFS.java    |  262 +
 .../storm/security/auth/kerberos/AutoTGT.java   |  281 +
 .../auth/kerberos/AutoTGTKrb5LoginModule.java   |  108 +
 .../kerberos/AutoTGTKrb5LoginModuleTest.java    |   44 +
 .../auth/kerberos/ClientCallbackHandler.java    |  104 +
 .../kerberos/KerberosSaslTransportPlugin.java   |  206 +
 .../auth/kerberos/ServerCallbackHandler.java    |   86 +
 .../auth/kerberos/jaas_kerberos_cluster.conf    |   31 +
 .../auth/kerberos/jaas_kerberos_launcher.conf   |   12 +
 .../jvm/backtype/storm/spout/ShellSpout.java    |   65 +-
 .../src/jvm/backtype/storm/task/ShellBolt.java  |  211 +-
 .../storm/testing/CompleteTopologyParam.java    |   22 +-
 .../testing/ForwardingMetricsConsumer.java      |   95 +
 .../storm/testing/PythonShellMetricsBolt.java   |   17 +
 .../storm/testing/PythonShellMetricsSpout.java  |   17 +
 .../testing/SingleUserSimpleTransport.java      |   37 +
 .../state/TestTransactionalState.java           |   47 +
 .../transactional/state/TransactionalState.java |   56 +-
 .../storm/ui/InvalidRequestException.java       |   17 +
 .../jvm/backtype/storm/utils/DRPCClient.java    |   63 +-
 .../backtype/storm/utils/DisruptorQueue.java    |   56 +-
 .../jvm/backtype/storm/utils/LocalState.java    |   44 +-
 .../jvm/backtype/storm/utils/NimbusClient.java  |   10 +-
 .../jvm/backtype/storm/utils/ShellUtils.java    |  498 ++
 .../src/jvm/backtype/storm/utils/TestUtils.java |   34 +
 .../src/jvm/backtype/storm/utils/Utils.java     |  130 +-
 .../backtype/storm/utils/ZookeeperAuthInfo.java |    9 +-
 .../storm/utils/ZookeeperServerCnxnFactory.java |   84 +
 .../trident/drpc/ReturnResultsReducer.java      |   13 +-
 .../topology/state/TestTransactionalState.java  |   47 +
 .../topology/state/TransactionalState.java      |   58 +-
 storm-core/src/multilang/js/storm.js            |   19 +-
 storm-core/src/multilang/py/storm.py            |   25 +-
 storm-core/src/multilang/rb/storm.rb            |   55 +-
 .../src/native/worker-launcher/.autom4te.cfg    |   42 +
 .../worker-launcher/.deps/worker-launcher.Po    |    1 +
 .../src/native/worker-launcher/Makefile.am      |   32 +
 .../src/native/worker-launcher/configure.ac     |   50 +
 .../native/worker-launcher/impl/configuration.c |  340 +
 .../native/worker-launcher/impl/configuration.h |   45 +
 .../src/native/worker-launcher/impl/main.c      |  210 +
 .../worker-launcher/impl/worker-launcher.c      |  779 +++
 .../worker-launcher/impl/worker-launcher.h      |  129 +
 .../worker-launcher/test/test-worker-launcher.c |  340 +
 storm-core/src/py/__init__.py                   |   16 +
 storm-core/src/py/storm/DistributedRPC-remote   |   18 +
 storm-core/src/py/storm/DistributedRPC.py       |   37 +-
 .../py/storm/DistributedRPCInvocations-remote   |   18 +
 .../src/py/storm/DistributedRPCInvocations.py   |   96 +-
 storm-core/src/py/storm/Nimbus-remote           |   25 +
 storm-core/src/py/storm/Nimbus.py               |  652 +-
 storm-core/src/py/storm/__init__.py             |   16 +
 storm-core/src/py/storm/constants.py            |   16 +
 storm-core/src/py/storm/ttypes.py               | 1243 +++-
 storm-core/src/storm.thrift                     |   58 +-
 storm-core/src/ui/public/component.html         |   21 +-
 storm-core/src/ui/public/css/style.css          |    9 +
 storm-core/src/ui/public/favicon.ico            |  Bin 0 -> 18280 bytes
 storm-core/src/ui/public/index.html             |   12 +-
 storm-core/src/ui/public/js/script.js           |    3 +-
 .../public/templates/anti-forgery-template.html |   19 +
 .../public/templates/index-page-template.html   |   12 +
 .../templates/topology-page-template.html       |   23 +-
 .../src/ui/public/templates/user-template.html  |   25 +
 storm-core/src/ui/public/topology.html          |   21 +-
 .../test/clj/backtype/storm/cluster_test.clj    |   93 +-
 .../test/clj/backtype/storm/config_test.clj     |   11 +
 .../test/clj/backtype/storm/drpc_test.clj       |   14 +-
 .../clj/backtype/storm/local_state_test.clj     |   14 +-
 .../test/clj/backtype/storm/logviewer_test.clj  |  187 +
 .../storm/messaging/netty_integration_test.clj  |    4 +-
 .../storm/messaging/netty_unit_test.clj         |   32 +-
 .../test/clj/backtype/storm/nimbus_test.clj     |  375 +-
 .../scheduler/multitenant_scheduler_test.clj    |  831 +++
 .../storm/security/auth/AuthUtils_test.clj      |   16 +-
 .../auth/DefaultHttpCredentialsPlugin_test.clj  |   40 +
 .../storm/security/auth/ThriftClient_test.clj   |   28 +-
 .../storm/security/auth/ThriftServer_test.clj   |    8 +-
 .../backtype/storm/security/auth/auth_test.clj  |  373 +-
 .../authorizer/DRPCSimpleACLAuthorizer_test.clj |  226 +
 .../security/auth/auto_login_module_test.clj    |   91 +
 .../storm/security/auth/drpc-auth-alice.jaas    |    5 +
 .../storm/security/auth/drpc-auth-bob.jaas      |    5 +
 .../storm/security/auth/drpc-auth-charlie.jaas  |    5 +
 .../storm/security/auth/drpc-auth-server.jaas   |    6 +
 .../storm/security/auth/drpc_auth_test.clj      |  315 +
 .../storm/security/auth/nimbus_auth_test.clj    |  181 +
 .../test/clj/backtype/storm/submitter_test.clj  |   75 +
 .../test/clj/backtype/storm/supervisor_test.clj |  180 +-
 .../test/clj/backtype/storm/testing4j_test.clj  |   32 +
 .../clj/backtype/storm/transactional_test.clj   |   27 +-
 .../utils/ZookeeperServerCnxnFactory_test.clj   |   35 +
 .../test/clj/backtype/storm/utils_test.clj      |   56 +-
 .../test/clj/storm/trident/state_test.clj       |   25 +-
 .../storm/utils/DisruptorQueueTest.java         |   25 +-
 storm-dist/binary/LICENSE                       |   15 +-
 storm-dist/binary/pom.xml                       |    2 +-
 storm-dist/binary/src/main/assembly/binary.xml  |    4 +
 storm-dist/source/pom.xml                       |    2 +-
 435 files changed, 39442 insertions(+), 1664 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/conf/defaults.yaml
----------------------------------------------------------------------
diff --cc conf/defaults.yaml
index b875323,d03c2c9..7757b24
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@@ -35,17 -37,22 +37,27 @@@ storm.zookeeper.auth.password: nul
  storm.cluster.mode: "distributed" # can be distributed or local
  storm.local.mode.zmq: false
  storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
+ storm.principal.tolocal: 
"backtype.storm.security.auth.DefaultPrincipalToLocal"
+ storm.group.mapping.service: 
"backtype.storm.security.auth.ShellBasedGroupsMapping"
  storm.messaging.transport: "backtype.storm.messaging.netty.Context"
+ storm.nimbus.retry.times: 5
+ storm.nimbus.retry.interval.millis: 2000
+ storm.nimbus.retry.intervalceiling.millis: 60000
+ storm.auth.simple-white-list.users: []
+ storm.auth.simple-acl.users: []
+ storm.auth.simple-acl.users.commands: []
+ storm.auth.simple-acl.admins: []
  storm.meta.serialization.delegate: 
"backtype.storm.serialization.DefaultSerializationDelegate"
 +storm.codedistributor.class: 
"backtype.storm.torrent.BitTorrentCodeDistributor"
 +
 +### bittorrent configuration
 +bittorrent.port: 6969
 +bittorrent.max.upload.rate: 0.0
 +bittorrent.max.download.rate: 0.0
  
  ### nimbus.* configs are for the master
 -nimbus.host: "localhost"
  nimbus.thrift.port: 6627
+ nimbus.thrift.threads: 64
  nimbus.thrift.max_buffer_size: 1048576
  nimbus.childopts: "-Xmx1024m"
  nimbus.task.timeout.secs: 30
@@@ -58,8 -64,7 +70,9 @@@ nimbus.task.launch.secs: 12
  nimbus.reassign: true
  nimbus.file.copy.expiration.secs: 600
  nimbus.topology.validator: "backtype.storm.nimbus.DefaultTopologyValidator"
 +min.replication.count: 0
 +max.replication.wait.time.sec: 0
+ nimbus.credential.renewers.freq.secs: 600
  
  ### ui.* configs are for the master
  ui.port: 8080
@@@ -98,13 -121,16 +129,16 @@@ supervisor.monitor.frequency.secs: 
  #how frequently the supervisor heartbeats to the cluster state (for nimbus)
  supervisor.heartbeat.frequency.secs: 5
  supervisor.enable: true
 +supervisor.bittorrent.seed.duration: 0
+ supervisor.supervisors: []
+ supervisor.supervisors.commands: []
  
 -
  ### worker.* configs are for task workers
  worker.childopts: "-Xmx768m"
+ worker.gc.childopts: ""
  worker.heartbeat.frequency.secs: 1
  
- # control how many worker receiver threads we need per worker 
+ # control how many worker receiver threads we need per worker
  topology.worker.receiver.thread.count: 1
  
  task.heartbeat.frequency.secs: 3

http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 2fcac56,5645b9a..e075ac0
--- a/pom.xml
+++ b/pom.xml
@@@ -208,8 -209,10 +209,10 @@@
          <reply.version>0.3.0</reply.version>
          <conjure.version>2.1.3</conjure.version>
          <zookeeper.version>3.4.6</zookeeper.version>
 +        <bittorrent.version>1.4</bittorrent.version>
- 
+         <conjure.version>2.1.3</conjure.version>
+         <clojure-data-codec.version>0.1.0</clojure-data-codec.version>
+         <clojure-contrib.version>1.2.0</clojure-contrib.version>
 -
      </properties>
  
      <profiles>

http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/cluster.clj
index 3db84ff,8ead710..840575b
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@@ -160,14 -172,14 +175,16 @@@
  (def SUPERVISORS-ROOT "supervisors")
  (def WORKERBEATS-ROOT "workerbeats")
  (def ERRORS-ROOT "errors")
 +(def CODE-DISTRIBUTOR-ROOT "code-distributor")
+ (def CREDENTIALS-ROOT "credentials")
  
  (def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
  (def STORMS-SUBTREE (str "/" STORMS-ROOT))
  (def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
  (def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
  (def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
 +(def CODE-DISTRIBUTOR-SUBTREE (str "/" CODE-DISTRIBUTOR-ROOT))
+ (def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT))
  
  (defn supervisor-path
    [id]
@@@ -251,22 -263,22 +272,24 @@@
          supervisors-callback (atom nil)
          assignments-callback (atom nil)
          storm-base-callback (atom {})
 +        code-distributor-callback (atom nil)
+         credentials-callback (atom {})
          state-id (register
-                    cluster-state
-                    (fn [type path]
-                      (let [[subtree & args] (tokenize-path path)]
-                        (condp = subtree
+                   cluster-state
+                   (fn [type path]
+                     (let [[subtree & args] (tokenize-path path)]
+                       (condp = subtree
                           ASSIGNMENTS-ROOT (if (empty? args)
-                                             (issue-callback! 
assignments-callback)
-                                             (issue-map-callback! 
assignment-info-callback (first args)))
+                                              (issue-callback! 
assignments-callback)
+                                              (issue-map-callback! 
assignment-info-callback (first args)))
                           SUPERVISORS-ROOT (issue-callback! 
supervisors-callback)
 +                         CODE-DISTRIBUTOR-ROOT (issue-callback! 
code-distributor-callback)
                           STORMS-ROOT (issue-map-callback! storm-base-callback 
(first args))
+                          CREDENTIALS-ROOT (issue-map-callback! 
credentials-callback (first args))
                           ;; this should never happen
                           (exit-process! 30 "Unknown callback for subtree " 
subtree args)))))]
 -    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE 
WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
 +    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE 
WORKERBEATS-SUBTREE ERRORS-SUBTREE CODE-DISTRIBUTOR-SUBTREE]]
-       (mkdirs cluster-state p))
+       (mkdirs cluster-state p acls))
      (reify
        StormClusterState
  
@@@ -404,49 -407,56 +428,62 @@@
  
        (set-assignment!
          [this storm-id info]
-         (set-data cluster-state (assignment-path storm-id) (Utils/serialize 
info)))
+         (set-data cluster-state (assignment-path storm-id) (Utils/serialize 
info) acls))
  
 +      (setup-code-distributor!
 +        [this storm-id info]
-         (mkdirs cluster-state (code-distributor-path storm-id))
-         (mkdirs cluster-state (str (code-distributor-path storm-id) "/" 
info)))
++        (mkdirs cluster-state (code-distributor-path storm-id) acl)
++        (mkdirs cluster-state (str (code-distributor-path storm-id) "/" info) 
acl))
 +
        (remove-storm!
          [this storm-id]
          (delete-node cluster-state (assignment-path storm-id))
 +        (delete-node cluster-state (code-distributor-path storm-id))
+         (delete-node cluster-state (credentials-path storm-id))
          (remove-storm-base! this storm-id))
  
+       (set-credentials!
+          [this storm-id creds topo-conf]
+          (let [topo-acls (mk-topo-only-acls topo-conf)
+                path (credentials-path storm-id)]
+            (set-data cluster-state path (Utils/serialize creds) topo-acls)))
+ 
+       (credentials
+         [this storm-id callback]
+         (when callback
+           (swap! credentials-callback assoc storm-id callback))
+         (maybe-deserialize (get-data cluster-state (credentials-path 
storm-id) (not-nil? callback))))
+ 
        (report-error
-         [this storm-id component-id node port error]
-         (let [path (error-path storm-id component-id)
-               data {:time-secs (current-time-secs) :error (stringify-error 
error) :host node :port port}
-               _ (mkdirs cluster-state path)
-               _ (create-sequential cluster-state (str path "/e") 
(Utils/serialize data))
-               to-kill (->> (get-children cluster-state path false)
-                            (sort-by parse-error-path)
-                            reverse
-                            (drop 10))]
-           (doseq [k to-kill]
-             (delete-node cluster-state (str path "/" k)))))
+          [this storm-id component-id node port error]
+          (let [path (error-path storm-id component-id)
+                data {:time-secs (current-time-secs) :error (stringify-error 
error) :host node :port port}
+                _ (mkdirs cluster-state path acls)
+                _ (create-sequential cluster-state (str path "/e") 
(Utils/serialize data) acls)
+                to-kill (->> (get-children cluster-state path false)
+                             (sort-by parse-error-path)
+                             reverse
+                             (drop 10))]
+            (doseq [k to-kill]
+              (delete-node cluster-state (str path "/" k)))))
  
        (errors
-         [this storm-id component-id]
-         (let [path (error-path storm-id component-id)
-               _ (mkdirs cluster-state path)
-               children (get-children cluster-state path false)
-               errors (dofor [c children]
-                             (let [data (-> (get-data cluster-state (str path 
"/" c) false)
-                                            maybe-deserialize)]
-                               (when data
-                                 (struct TaskError (:error data) (:time-secs 
data) (:host data) (:port data))
-                                 )))
-               ]
-           (->> (filter not-nil? errors)
-                (sort-by (comp - :time-secs)))))
- 
+          [this storm-id component-id]
+          (let [path (error-path storm-id component-id)
+                errors (if (exists-node? cluster-state path false)
+                         (dofor [c (get-children cluster-state path false)]
+                           (let [data (-> (get-data cluster-state (str path 
"/" c) false)
+                                          maybe-deserialize)]
+                             (when data
+                               (struct TaskError (:error data) (:time-secs 
data) (:host data) (:port data))
+                               )))
+                         ())
+                ]
+            (->> (filter not-nil? errors)
+                 (sort-by (comp - :time-secs)))))
+       
        (disconnect
-         [this]
+          [this]
          (unregister cluster-state state-id)
          (when solo?
            (close cluster-state))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 2afcbec,1fbf7f0..9f5b1a6
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@@ -14,20 -14,19 +14,21 @@@
  ;; See the License for the specific language governing permissions and
  ;; limitations under the License.
  (ns backtype.storm.daemon.nimbus
-   (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
-   (:import [org.apache.thrift.protocol TBinaryProtocol 
TBinaryProtocol$Factory])
-   (:import [org.apache.thrift.exception])
-   (:import [org.apache.thrift.transport TNonblockingServerTransport 
TNonblockingServerSocket])
-   (:import [java.nio ByteBuffer])
+   (:import [java.nio ByteBuffer]
+            [java.util Collections])
    (:import [java.io FileNotFoundException])
 +  (:import [java.net InetAddress])
    (:import [java.nio.channels Channels WritableByteChannel])
+   (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType 
ReqContext AuthUtils])
    (:use [backtype.storm.scheduler.DefaultScheduler])
    (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot 
TopologyDetails
              Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl 
DefaultScheduler ExecutorDetails])
 +  (:use [backtype.storm bootstrap util zookeeper])
+   (:import [backtype.storm.generated AuthorizationException])
+   (:use [backtype.storm bootstrap util])
    (:use [backtype.storm.config :only [validate-configs-with-schemas]])
    (:use [backtype.storm.daemon common])
+   (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
    (:gen-class
      :methods [^{:static true} [launch [backtype.storm.scheduler.INimbus] 
void]]))
  
@@@ -60,31 -59,22 +61,40 @@@
      scheduler
      ))
  
 +(defmulti mk-bt-tracker cluster-mode)
 +(defmulti sync-code cluster-mode)
 +
 +;;TODO we should try genclass for zkLeaderElector and just set 
NIMBUS-LEADER-ELECTOR-CLASS in defaults.yaml
 +(defn mk-leader-elector [conf]
 +  (if (conf NIMBUS-LEADER-ELECTOR-CLASS)
 +    (do (log-message "Using custom Leade elector: " (conf 
NIMBUS-LEADER-ELECTOR-CLASS))
 +      (-> (conf NIMBUS-LEADER-ELECTOR-CLASS) new-instance))
 +    (zk-leader-elector conf)))
 +
 +(defnk is-leader [nimbus :throw-exception true]
 +  (let [leader-elector (:leader-elector nimbus)]
 +    (if (.isLeader leader-elector) true
 +      (if throw-exception
 +        (let [leader-address (.getLeader leader-elector)]
 +          (throw (RuntimeException. (str "not a leader, current leader is " 
leader-address))))))))
 +
+ (def NIMBUS-ZK-ACLS
+   [(first ZooDefs$Ids/CREATOR_ALL_ACL) 
+    (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) 
ZooDefs$Ids/ANYONE_ID_UNSAFE)])
+ 
  (defn nimbus-data [conf inimbus]
    (let [forced-scheduler (.getForcedScheduler inimbus)]
      {:conf conf
 +     :host-port-info (str (.getCanonicalHostName (InetAddress/getLocalHost)) 
":" (conf NIMBUS-THRIFT-PORT))
       :inimbus inimbus
+      :authorization-handler (mk-authorization-handler (conf 
NIMBUS-AUTHORIZER) conf)
       :submitted-count (atom 0)
-      :storm-cluster-state (cluster/mk-storm-cluster-state conf)
+      :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when
+                                                                        
(Utils/isZkAuthenticationConfiguredStormServer
+                                                                          conf)
+                                                                        
NIMBUS-ZK-ACLS))
       :submit-lock (Object.)
+      :cred-update-lock (Object.)
       :heartbeats-cache (atom {})
       :downloaders (file-cache-map conf)
       :uploaders (file-cache-map conf)
@@@ -95,8 -85,9 +105,11 @@@
                                   (exit-process! 20 "Error when processing an 
event")
                                   ))
       :scheduler (mk-scheduler conf inimbus)
 +     :leader-elector (mk-leader-elector conf)
 +     :bt-tracker (mk-bt-tracker conf)
+      :id->sched-status (atom {})
+      :cred-renewers (AuthUtils/GetCredentialRenewers conf)
+      :nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf)
       }))
  
  (defn inbox [nimbus]
@@@ -321,8 -312,9 +334,9 @@@
        [(.getNodeId slot) (.getPort slot)]
        )))
  
 -(defn- setup-storm-code [conf storm-id tmp-jar-location storm-conf topology]
 +(defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf 
topology]
    (let [stormroot (master-stormdist-root conf storm-id)]
+    (log-message "nimbus file location:" stormroot)
     (FileUtils/forceMkdir (File. stormroot))
     (FileUtils/cleanDirectory (File. stormroot))
     (setup-jar conf tmp-jar-location stormroot)
@@@ -944,15 -979,12 +1030,16 @@@
  (defserverfn service-handler [conf inimbus]
    (.prepare inimbus conf (master-inimbus-dir conf))
    (log-message "Starting Nimbus with conf " conf)
-   (let [nimbus (nimbus-data conf inimbus)]
+   (let [nimbus (nimbus-data conf inimbus)
+        principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)]
      (.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) 
conf)
 +    (.addToLeaderLockQueue (:leader-elector nimbus))
      (cleanup-corrupt-topologies! nimbus)
 -    (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
 -      (transition! nimbus storm-id :startup))
 +    ;register call back for code-distributor
 +    (.code-distributor (:storm-cluster-state nimbus) (fn [] (sync-code conf 
nimbus)))
 +    (when (is-leader nimbus :throw-exception false)
 +      (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
 +        (transition! nimbus storm-id :startup)))
      (schedule-recurring (:timer nimbus)
                          0
                          (conf NIMBUS-MONITOR-FREQ-SECS)
@@@ -968,23 -1000,21 +1055,29 @@@
                          (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
                          (fn []
                            (clean-inbox (inbox nimbus) (conf 
NIMBUS-INBOX-JAR-EXPIRATION-SECS))
 -                          ))    
 +                          ))
- 
++    ;;schedule nimbus code sync thread to sync code from other nimbuses.
 +    (schedule-recurring (:timer nimbus)
 +      0
 +      (conf NIMBUS-CODE-SYNC-FREQ-SECS)
 +      (fn []
 +        (sync-code conf nimbus)
 +        ))
 +
+     (schedule-recurring (:timer nimbus)
+                         0
+                         (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
+                         (fn []
+                           (renew-credentials nimbus)))
 -
      (reify Nimbus$Iface
        (^void submitTopologyWithOpts
          [this ^String storm-name ^String uploadedJarLocation ^String 
serializedConf ^StormTopology topology
           ^SubmitOptions submitOptions]
          (try
 +          (is-leader nimbus)
            (assert (not-nil? submitOptions))
            (validate-topology-name! storm-name)
+           (check-authorization! nimbus storm-name nil "submitTopology")
            (check-storm-active! nimbus storm-name false)
            (let [topo-conf (from-json serializedConf)]
              (try
@@@ -997,25 -1027,48 +1090,50 @@@
                         topology))
            (swap! (:submitted-count nimbus) inc)
            (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" 
(current-time-secs))
-                 storm-conf (normalize-conf
+                 credentials (.get_creds submitOptions)
+                 credentials (when credentials (.get_creds credentials))
+                 topo-conf (from-json serializedConf)
+                 storm-conf-submitted (normalize-conf
                              conf
-                             (-> serializedConf
-                                 from-json
-                                 (assoc STORM-ID storm-id)
+                             (-> topo-conf
+                               (assoc STORM-ID storm-id)
                                (assoc TOPOLOGY-NAME storm-name))
                              topology)
+                 req (ReqContext/context)
+                 principal (.principal req)
+                 submitter-principal (if principal (.toString principal))
+                 submitter-user (.toLocal principal-to-local principal)
+                 topo-acl (distinct (remove nil? (conj (.get 
storm-conf-submitted TOPOLOGY-USERS) submitter-principal, submitter-user)))
+                 storm-conf (-> storm-conf-submitted
+                                (assoc TOPOLOGY-SUBMITTER-PRINCIPAL (if 
submitter-principal submitter-principal ""))
+                                (assoc TOPOLOGY-SUBMITTER-USER (if 
submitter-user submitter-user "")) ;Don't let the user set who we launch as
+                                (assoc TOPOLOGY-USERS topo-acl)
+                                (assoc STORM-ZOOKEEPER-SUPERACL (.get conf 
STORM-ZOOKEEPER-SUPERACL)))
+                 storm-conf (if (Utils/isZkAuthenticationConfiguredStormServer 
conf)
+                                 storm-conf
+                                 (dissoc storm-conf 
STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
                  total-storm-conf (merge conf storm-conf)
                  topology (normalize-topology total-storm-conf topology)
 -
 -                storm-cluster-state (:storm-cluster-state nimbus)]
 +                storm-cluster-state (:storm-cluster-state nimbus)
 +                host-port-info (:host-port-info nimbus) ]
+             (when credentials (doseq [nimbus-autocred-plugin 
(:nimbus-autocred-plugins nimbus)]
+               (.populateCredentials nimbus-autocred-plugin credentials 
(Collections/unmodifiableMap storm-conf))))
+             (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? 
submitter-user) (.isEmpty (.trim submitter-user)))) 
+               (throw (AuthorizationException. "Could not determine the user 
to run this topology as.")))
              (system-topology! total-storm-conf topology) ;; this validates 
the structure of the topology
+             (validate-topology-size topo-conf conf topology)
+             (when (and (Utils/isZkAuthenticationConfiguredStormServer conf)
+                        (not (Utils/isZkAuthenticationConfiguredTopology 
storm-conf)))
+                 (throw (IllegalArgumentException. "The cluster is configured 
for zookeeper authentication, but no payload was provided.")))
              (log-message "Received topology submission for " storm-name " 
with conf " storm-conf)
              ;; lock protects against multiple topologies being submitted at 
once and
              ;; cleanup thread killing topology in b/w assignment and starting 
the topology
              (locking (:submit-lock nimbus)
+               ;;cred-update-lock is not needed here because creds are being 
added for the first time.
+               (.set-credentials! storm-cluster-state storm-id credentials 
storm-conf)
 -              (setup-storm-code conf storm-id uploadedJarLocation storm-conf 
topology)
 +              (setup-storm-code nimbus conf storm-id uploadedJarLocation 
storm-conf topology)
 +              (.setup-code-distributor! storm-cluster-state storm-id 
host-port-info)
 +              (wait-for-desired-code-replication nimbus conf storm-id)
                (.setup-heartbeats! storm-cluster-state storm-id)
                (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE 
:inactive
                                                TopologyInitialStatus/ACTIVE 
:active}]
@@@ -1207,9 -1303,6 +1368,8 @@@
          (.disconnect (:storm-cluster-state nimbus))
          (.cleanup (:downloaders nimbus))
          (.cleanup (:uploaders nimbus))
 +        (.close (:leader-elector nimbus))
-         (log-message (:bt-tracker nimbus))
 +        (if (:bt-tracker nimbus) (.close (:bt-tracker nimbus) (:conf nimbus)))
          (log-message "Shut down master")
          )
        DaemonCommon

http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 5de19c4,d82fd12..c96dd88
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@@ -14,10 -14,10 +14,11 @@@
  ;; See the License for the specific language governing permissions and
  ;; limitations under the License.
  (ns backtype.storm.daemon.supervisor
+   (:import [java.io OutputStreamWriter BufferedWriter IOException])
    (:import [backtype.storm.scheduler ISupervisor]
             [java.net JarURLConnection]
 -           [java.net URI])
 +           [java.net URI]
 +           [java.io File])
    (:use [backtype.storm bootstrap])
    (:use [backtype.storm.daemon common])
    (:require [backtype.storm.daemon [worker :as worker]])
@@@ -264,12 -335,10 +338,13 @@@
           ". State: " state
           ", Heartbeat: " (pr-str heartbeat))
          (shutdown-worker supervisor id)
 +        (if (:bt-tracker supervisor)
 +          (.cleanup (:bt-tracker supervisor) id))
          ))
 +
      (doseq [id (vals new-worker-ids)]
-       (local-mkdirs (worker-pids-root conf id)))
+       (local-mkdirs (worker-pids-root conf id))
+       (local-mkdirs (worker-heartbeats-root conf id)))
      (.put local-state LS-APPROVED-WORKERS
            (merge
             (select-keys (.get local-state LS-APPROVED-WORKERS)
@@@ -457,29 -526,49 +532,56 @@@
    (.shutdown supervisor)
    )
  
- ;; distributed implementation
+ (defn setup-storm-code-dir [conf storm-conf dir]
+  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+   (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) 
["code-dir" dir] :log-prefix (str "setup conf for " dir))))
  
+ ;; distributed implementation
  (defmethod download-storm-code
 -    :distributed [conf storm-id master-code-dir]
 +    :distributed [conf storm-id master-code-dir supervisor]
      ;; Downloading to permanent location is atomic
      (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
 -          stormroot (supervisor-stormdist-root conf storm-id)]
 +          stormroot (supervisor-stormdist-root conf storm-id)
 +          master-meta-file-path (master-storm-metafile-path master-code-dir)
 +          supervisor-meta-file-path (supervisor-storm-metafile-path tmproot)]
        (FileUtils/forceMkdir (File. tmproot))
 -      
 -      (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) 
(supervisor-stormjar-path tmproot))
 -      (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) 
(supervisor-stormcode-path tmproot))
 -      (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) 
(supervisor-stormconf-path tmproot))
 +      (Utils/downloadFromMaster conf master-meta-file-path 
supervisor-meta-file-path)
 +      (if (:bt-tracker supervisor)
 +        (.download (:bt-tracker supervisor) storm-id (File. 
supervisor-meta-file-path)))
        (extract-dir-from-jar (supervisor-stormjar-path tmproot) 
RESOURCES-SUBDIR tmproot)
 +      (if (.exists (File. stormroot)) (FileUtils/forceDelete (File. 
stormroot)))
        (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
-       ))
+       (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) 
stormroot)      
+     ))
+ 
+ (defn write-log-metadata-to-yaml-file! [storm-id port data conf]
+   (let [file (get-log-metadata-file storm-id port)]
+     ;;run worker as user needs the directory to have special permissions
+     ;; or it is insecure
+     (when (and (not (conf SUPERVISOR-RUN-WORKER-AS-USER))
+                (not (.exists (.getParentFile file))))
+       (.mkdirs (.getParentFile file)))
+     (let [writer (java.io.FileWriter. file)
+         yaml (Yaml.)]
+       (try
+         (.dump yaml data writer)
+         (finally
+           (.close writer))))))
+ 
+ (defn write-log-metadata! [storm-conf user worker-id storm-id port conf]
+   (let [data {TOPOLOGY-SUBMITTER-USER user
+               "worker-id" worker-id
+               LOGS-USERS (sort (distinct (remove nil?
+                                            (concat
+                                              (storm-conf LOGS-USERS)
+                                              (storm-conf TOPOLOGY-USERS)))))}]
+     (write-log-metadata-to-yaml-file! storm-id port data conf)))
  
 +(defmethod mk-bt-tracker :distributed [conf]
 +  (let [code-distributor (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))]
 +    (.prepare code-distributor conf)
 +    code-distributor))
 +
  (defn jlp [stormroot conf]
    (let [resource-root (str stormroot File/separator RESOURCES-SUBDIR)
          os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_")

http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/clj/backtype/storm/thrift.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/thrift.clj
index 640401f,ce0a5ff..2b860b3
--- a/storm-core/src/clj/backtype/storm/thrift.clj
+++ b/storm-core/src/clj/backtype/storm/thrift.clj
@@@ -26,9 -26,8 +26,8 @@@
    (:import [backtype.storm.grouping CustomStreamGrouping])
    (:import [backtype.storm.topology TopologyBuilder])
    (:import [backtype.storm.clojure RichShellBolt RichShellSpout])
-   (:import [org.apache.thrift.protocol TBinaryProtocol TProtocol])
-   (:import [org.apache.thrift.transport TTransport TFramedTransport TSocket])
+   (:import [org.apache.thrift.transport TTransport])
 -  (:use [backtype.storm util config log]))
 +  (:use [backtype.storm util config log zookeeper]))
  
  (defn instantiate-java-object
    [^JavaObject obj]

http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index 75bf746,3dc7396..12a396e
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -18,12 -18,12 +18,12 @@@
    (:use compojure.core)
    (:use ring.middleware.reload)
    (:use [hiccup core page-helpers])
 -  (:use [backtype.storm config util log])
 +  (:use [backtype.storm config util log zookeeper])
    (:use [backtype.storm.ui helpers])
-   (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID 
ACKER-INIT-STREAM-ID
-                                               ACKER-ACK-STREAM-ID 
ACKER-FAIL-STREAM-ID system-id?]]])
-   (:use [ring.adapter.jetty :only [run-jetty]])
-   (:use [clojure.string :only [trim]])
+   (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID 
ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
+                                               ACKER-FAIL-STREAM-ID system-id? 
mk-authorization-handler]]])
+   (:use [ring.middleware.anti-forgery])
+   (:use [clojure.string :only [blank? lower-case trim]])
    (:import [backtype.storm.utils Utils])
    (:import [backtype.storm.generated ExecutorSpecificStats
              ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats
@@@ -43,15 -48,24 +49,29 @@@
  
  (defmacro with-nimbus
    [nimbus-sym & body]
 -  `(thrift/with-nimbus-connection
 -     [~nimbus-sym (*STORM-CONF* NIMBUS-HOST) (*STORM-CONF* 
NIMBUS-THRIFT-PORT)]
 -     ~@body))
 +  `(let [leader-elector# (zk-leader-elector *STORM-CONF*)
 +    leader-nimbus# (.getLeader leader-elector#)
 +    host# (.getHost leader-nimbus#)
 +    port# (.getPort leader-nimbus#)
 +    no-op# (.close leader-elector#)]
 +  (thrift/with-nimbus-connection
 +     [~nimbus-sym host# port#]
 +     ~@body)))
  
+ (defn assert-authorized-user
+   ([servlet-request op]
+     (assert-authorized-user servlet-request op nil))
+   ([servlet-request op topology-conf]
+      (if http-creds-handler (.populateContext http-creds-handler 
(ReqContext/context) servlet-request))
+      (if *UI-ACL-HANDLER*
+        (let [context (ReqContext/context)]
+          (if-not (.permit *UI-ACL-HANDLER* context op topology-conf)
+            (let [principal (.principal context)
+                  user (if principal (.getName principal) "unknown")]
+              (throw (AuthorizationException.
+                      (str "UI request '" op "' for '"
+                           user "' user is not authorized")))))))))
+ 
  (defn get-filled-stats
    [summs]
    (->> summs
@@@ -268,13 -282,11 +288,14 @@@
                (bolt-comp-summs id))]
      (sort-by #(-> ^ExecutorSummary % .get_executor_info .get_task_start) 
ret)))
  
- (defn worker-log-link [host port]
-   (url-format "http://%s:%s/log?file=worker-%s.log";
-               host (*STORM-CONF* LOGVIEWER-PORT) port))
+ (defn worker-log-link [host port topology-id]
+   (let [fname (logs-filename topology-id port)]
+     (url-format (str "http://%s:%s/log?file=%s";)
+           host (*STORM-CONF* LOGVIEWER-PORT) fname)))
  
 +(defn nimbus-log-link [host port]
 +  (url-format "http://%s:%s/log?file=nimbus.log"; host (*STORM-CONF* 
LOGVIEWER-PORT) port))
 +
  (defn compute-executor-capacity
    [^ExecutorSummary e]
    (let [stats (.get_stats e)
@@@ -503,23 -517,8 +526,23 @@@
          "slotsUsed"  used-slots
          "slotsFree" free-slots
          "executorsTotal" total-executors
-         "tasksTotal" total-tasks})))
+         "tasksTotal" total-tasks })))
  
 +(defn nimbus-summary
 +  ([]
 +    (let [leader-elector (zk-leader-elector *STORM-CONF*)
 +          nimbus-hosts (.getAllNimbuses leader-elector)
 +          no-op (.close leader-elector)]
 +      (nimbus-summary nimbus-hosts)))
 +  ([nimbuses]
 +    {"nimbuses"
 +     (for [^NimbusInfo n nimbuses]
 +       {
 +        "host" (.getHost n)
 +        "port" (.getPort n)
 +        "nimbusLogLink" (nimbus-log-link (.getHost n) (.getPort n))
 +        "isLeader" (.isLeader n)})}))
 +
  (defn supervisor-summary
    ([]
     (with-nimbus nimbus
@@@ -867,21 -880,28 +904,31 @@@
    (GET "/api/v1/cluster/configuration" [& m]
         (json-response (cluster-configuration)
                        (:callback m) :serialize-fn identity))
-   (GET "/api/v1/cluster/summary" [& m]
-        (json-response (cluster-summary) (:callback m)))
+   (GET "/api/v1/cluster/summary" [:as {:keys [cookies servlet-request]} & m]
+        (let [user (.getUserName http-creds-handler servlet-request)]
+          (assert-authorized-user servlet-request "getClusterInfo")
+          (json-response (cluster-summary user) (:callback m))))
 +  (GET "/api/v1/nimbus/summary" [& m]
++    (assert-authorized-user servlet-request "getClusterInfo")
 +    (json-response (nimbus-summary) (:callback m)))
-   (GET "/api/v1/supervisor/summary" [& m]
+   (GET "/api/v1/supervisor/summary" [:as {:keys [cookies servlet-request]} & 
m]
+        (assert-authorized-user servlet-request "getClusterInfo")
         (json-response (supervisor-summary) (:callback m)))
-   (GET "/api/v1/topology/summary" [& m]
+   (GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]} & m]
+        (assert-authorized-user servlet-request "getClusterInfo")
         (json-response (all-topologies-summary) (:callback m)))
-   (GET  "/api/v1/topology/:id" [id & m]
-           (json-response (topology-page id (:window m) (check-include-sys? 
(:sys m))) (:callback m)))
+   (GET  "/api/v1/topology/:id" [:as {:keys [cookies servlet-request]} id & m]
+         (let [user (.getUserName http-creds-handler servlet-request)]
+           (assert-authorized-user servlet-request "getTopology" 
(topology-config id))
+           (json-response (topology-page id (:window m) (check-include-sys? 
(:sys m)) user) (:callback m))))
    (GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies 
servlet-request]} id & m]
-        (json-response (mk-visualization-data id (:window m) 
(check-include-sys? (:sys m))) (:callback m)))
-   (GET "/api/v1/topology/:id/component/:component" [id component & m]
-         (json-response (component-page id component (:window m) 
(check-include-sys? (:sys m))) (:callback m)))
-   (POST "/api/v1/topology/:id/activate" [id]
+         (assert-authorized-user servlet-request "getTopology" 
(topology-config id))
+         (json-response (mk-visualization-data id (:window m) 
(check-include-sys? (:sys m))) (:callback m)))
+   (GET "/api/v1/topology/:id/component/:component" [:as {:keys [cookies 
servlet-request]} id component & m]
+        (let [user (.getUserName http-creds-handler servlet-request)]
+          (assert-authorized-user servlet-request "getTopology" 
(topology-config id))
+          (json-response (component-page id component (:window m) 
(check-include-sys? (:sys m)) user) (:callback m))))
+   (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies 
servlet-request]} id]
      (with-nimbus nimbus
        (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
              name (.get_name tplg)]

http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/zookeeper.clj
index a675990,eea71be..a7d43ac
--- a/storm-core/src/clj/backtype/storm/zookeeper.clj
+++ b/storm-core/src/clj/backtype/storm/zookeeper.clj
@@@ -214,91 -211,3 +215,91 @@@
  (defn shutdown-inprocess-zookeeper
    [handle]
    (.shutdown handle))
 +
 +(defn- to-NimbusInfo [^Participant participant]
 +  (let
 +    [id (if (clojure.string/blank? (.getId participant))
 +          (throw (RuntimeException. "No nimbus leader participant host found, 
have you started your nimbus hosts?"))
 +          (.getId participant))
 +     server (first (.split id ":"))
 +     port (Integer/parseInt (last (.split id ":")))
 +     is-leader (.isLeader participant)]
 +    (NimbusInfo. server port is-leader)))
 +
 +(defn leader-latch-listener-impl
 +  "Leader latch listener that will be invoked when we either gain or lose 
leadership"
 +  [conf zk leader-latch]
 +  (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost))
 +        STORMS-ROOT (str (conf STORM-ZOOKEEPER-ROOT) "/storms")]
 +    (reify LeaderLatchListener
 +      (^void isLeader[this]
 +        (log-message (str hostname "gained leadership, checking if it has all 
the topology code locally."))
 +        (let [active-topology-ids (set (get-children zk STORMS-ROOT false))
 +              local-topology-ids (set (.list (File. (master-stormdist-root 
conf))))
 +              diff-topology (first (set-delta active-topology-ids 
local-topology-ids))]
 +        (log-message "active-topology-ids [" (clojure.string/join "," 
active-topology-ids)
 +                          "] local-topology-ids [" (clojure.string/join "," 
local-topology-ids)
 +                          "] diff-topology [" (clojure.string/join "," 
diff-topology) "]")
 +        (if (empty? diff-topology)
 +          (log-message " Accepting leadership, all active topology found 
localy.")
 +          (do
 +            (log-message " code for all active topologies not available 
locally, giving up leadership.")
 +            (.close leader-latch)))))
 +      (^void notLeader[this]
 +        (log-message (str hostname " lost leadership."))))))
 +
 +(defn zk-leader-elector
 +  "Zookeeper Implementation of ILeaderElector."
 +  [conf]
 +  (let [servers (conf STORM-ZOOKEEPER-SERVERS)
 +        zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf 
STORM-ZOOKEEPER-PORT) :auth-conf conf)
 +        leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock")
 +        id (str (.getCanonicalHostName (InetAddress/getLocalHost)) ":" (conf 
NIMBUS-THRIFT-PORT))
 +        leader-latch (atom (LeaderLatch. zk leader-lock-path id))
 +        leader-latch-listener (atom (leader-latch-listener-impl conf zk 
@leader-latch))
 +        ]
 +    (reify ILeaderElector
 +      (prepare [this conf]
 +        (log-message "no-op for zookeeper implementation"))
 +
 +      (^void addToLeaderLockQueue [this]
 +        (let [state (.getState @leader-latch)]
 +        ;if this latch is already closed, we need to create new instance.
 +        (if (.equals LeaderLatch$State/CLOSED state)
 +          (do
 +            (reset! leader-latch (LeaderLatch. zk leader-lock-path id))
 +            (reset! leader-latch-listener (leader-latch-listener-impl conf zk 
@leader-latch))
 +            (log-message "LeaderLatch was in closed state. Resetted the 
leaderLatch and listeners.")
 +            ))
 +
 +        ;Only if the latch is not already started we invoke start.
 +        (if (.equals LeaderLatch$State/LATENT state)
 +          (do
 +            (.addListener @leader-latch @leader-latch-listener)
 +            (.start @leader-latch)
 +            (log-message "Queued up for leader lock."))
 +          (log-message "Node already in queue for leader lock."))))
 +
 +      (^void removeFromLeaderLockQueue [this]
 +        ;Only started latches can be closed.
 +        (if (.equals LeaderLatch$State/STARTED (.getState @leader-latch))
 +          (do
 +            (.close @leader-latch)
 +            (log-message "Removed from leader lock queue."))
 +          (log-message "leader latch is not started so no 
removeFromLeaderLockQueue needed.")))
 +
 +      (^boolean isLeader [this]
 +        (.hasLeadership @leader-latch))
 +
 +      (^NimbusInfo getLeader [this]
 +        (to-NimbusInfo (.getLeader @leader-latch)))
 +
 +      (^List getAllNimbuses [this]
 +        (let [participants (.getParticipants @leader-latch)]
 +          (map (fn [^Participant participant]
 +                 (to-NimbusInfo participant))
 +            participants)))
 +
 +      (^void close[this]
 +        (log-message "closing zookeeper connection of leader elector.")
-         (.close zk)))))
++        (.close zk)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/Config.java
index 7a140b6,c680354..16216ea
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@@ -227,6 -283,36 +283,30 @@@ public class Config extends HashMap<Str
      public static final Object STORM_ID_SCHEMA = String.class;
  
      /**
+      * The number of times to retry a Nimbus operation.
+      */
+     public static final String 
STORM_NIMBUS_RETRY_TIMES="storm.nimbus.retry.times";
+     public static final Object STORM_NIMBUS_RETRY_TIMES_SCHEMA = Number.class;
+ 
+     /**
+      * The starting interval between exponential backoff retries of a Nimbus 
operation.
+      */
+     public static final String 
STORM_NIMBUS_RETRY_INTERVAL="storm.nimbus.retry.interval.millis";
+     public static final Object STORM_NIMBUS_RETRY_INTERVAL_SCHEMA = 
Number.class;
+ 
+     /**
+      * The ceiling of the interval between retries of a client connect to 
Nimbus operation.
+      */
+     public static final String 
STORM_NIMBUS_RETRY_INTERVAL_CEILING="storm.nimbus.retry.intervalceiling.millis";
+     public static final Object STORM_NIMBUS_RETRY_INTERVAL_CEILING_SCHEMA = 
Number.class;
+ 
+     /**
 -     * The host that the master server is running on.
 -     */
 -    public static final String NIMBUS_HOST = "nimbus.host";
 -    public static final Object NIMBUS_HOST_SCHEMA = String.class;
 -
 -    /**
+      * The Nimbus transport plug-in for Thrift client/server communication
+      */
+     public static final String NIMBUS_THRIFT_TRANSPORT_PLUGIN = 
"nimbus.thrift.transport";
+     public static final Object NIMBUS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = 
String.class;
+ 
+     /**
       * Which port the Thrift interface of Nimbus should run on. Clients should
       * connect to this port to upload jars and submit topologies.
       */
@@@ -863,67 -1224,38 +1224,97 @@@
       * to backtype.storm.scheduler.IsolationScheduler to make use of the 
isolation scheduler.
       */
      public static final String ISOLATION_SCHEDULER_MACHINES = 
"isolation.scheduler.machines";
-     public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = 
Map.class;
+     public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = 
ConfigValidation.MapOfStringToNumberValidator;
+ 
+     /**
+      * A map from the user name to the number of machines that should that 
user is allowed to use. Set storm.scheduler
+      * to backtype.storm.scheduler.multitenant.MultitenantScheduler
+      */
+     public static final String MULTITENANT_SCHEDULER_USER_POOLS = 
"multitenant.scheduler.user.pools";
+     public static final Object MULTITENANT_SCHEDULER_USER_POOLS_SCHEMA = 
ConfigValidation.MapOfStringToNumberValidator;
+ 
+     /**
+      * The number of machines that should be used by this topology to isolate 
it from all others. Set storm.scheduler
+      * to backtype.storm.scheduler.multitenant.MultitenantScheduler
+      */
+     public static final String TOPOLOGY_ISOLATED_MACHINES = 
"topology.isolate.machines";
+     public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = 
Number.class;
+ 
+     /**
+      * HDFS information, used to get the delegation token on behalf of the 
topology
+      * submitter user and renew the tokens. see {@link 
backtype.storm.security.auth.hadoop.AutoHDFS}
+      * kerberos principal name with realm should be provided.
+      */
+     public static final Object TOPOLOGY_HDFS_PRINCIPAL = "topology.hdfs.user";
+     public static final Object TOPOLOGY_HDFS_PRINCIPAL_SCHEMA = String.class;
+ 
+     /**
+      * The HDFS URI to be used by AutoHDFS.java to grab the delegation token 
on topology
+      * submitter user's behalf by the nimbus. If this is not provided the 
default URI provided
+      * in the hdfs configuration files will be used.
+      */
+     public static final Object TOPOLOGY_HDFS_URI = "topology.hdfs.uri";
+     public static final Object TOPOLOGY_HDFS_URI_SCHEMA = String.class;
  
 +    /**
 +     * Which implementation of {@link backtype.storm.nimbus.ICodeDistributor} 
should be used by storm for code
 +     * distribution.
 +     */
 +    public static final String STORM_CODE_DISTRIBUTOR_CLASS = 
"storm.codedistributor.class";
 +    public static final Object STORM_CODE_DISTRIBUTOR_CLASS_SCHEMA = 
String.class;
 +
 +    /**
 +     * Which port the BitTorrent tracker should bind to.
 +     */
 +    public static final String BITTORRENT_PORT = "bittorrent.port";
 +    public static final Object BITTORRENT_PORT_SCHEMA = Number.class;
 +
 +    /**
 +     * Max upload rate for topology torrents in kB/sec. 0.0 == unlimited.
 +     */
 +    public static final String BITTORRENT_MAX_UPLOAD_RATE = 
"bittorrent.max.upload.rate";
 +    public static final Object BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = 
Number.class;
 +
 +    /**
 +     * Max download rate for topology torrents in kB/sec. 0.0 == unlimited.
 +     */
 +    public static final String BITTORRENT_MAX_DOWNLOAD_RATE = 
"bittorrent.max.download.rate";
 +    public static final Object BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = 
Number.class;
 +
 +    /**
 +     * Time in seconds that a supervisor should seed after completing a 
topology torrent download.
 +     * A value of 0 will disable seeding (download only). A value of -1 
indicates that the supervisor
 +     * should seed indefinitely (until the topology is killed).
 +     */
 +    public static final String SUPERVISOR_BITTORRENT_SEED_DURATION = 
"supervisor.bittorrent.seed.duration";
 +    public static final Object SUPERVISOR_BITTORRENT_SEED_DURATION_SCHEMA = 
Number.class;
 +
 +    /**
 +     * Minimum number of nimbus hosts where the code must be replicated 
before leader nimbus
 +     * is allowed to perform topology activation tasks like setting up 
heartbeats/assignments
 +     * and marking the topology as active. default is 0.
 +     */
 +    public static final String MIN_REPLICATION_COUNT = 
"min.replication.count";
 +    public static final Object MIN_REPLICATION_COUNT_SCHEMA = Number.class;
 +
 +    /**
 +     * Maximum wait time for the nimbus host replication to achieve the 
min.replication.count.
 +     * Once this time is elapsed nimbus will go ahead and perform topology 
activation tasks even
 +     * if required min.replication.count is not achieved. The default is 0 
seconds, a value of
 +     * -1 indicates to wait for ever.
 +     */
 +    public static final String MAX_REPLICATION_WAIT_TIME_SEC = 
"max.replication.wait.time.sec";
 +    public static final Object MAX_REPLICATION_WAIT_TIME_SEC_SCHEMA = 
Number.class;
 +
 +
 +    /**
 +     * How often nimbus should wake the cleanup thread to clean the inbox.
 +     * @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
 +     */
 +    public static final String NIMBUS_CODE_SYNC_FREQ_SECS = 
"nimbus.code.sync.freq.secs";
 +    public static final Object NIMBUS_CODE_SYNC_FREQ_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 +
 +
      public static void setClasspath(Map conf, String cp) {
          conf.put(Config.TOPOLOGY_CLASSPATH, cp);
      }

http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
index b96e5b5,273e232..da10a4f
--- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
@@@ -17,19 -17,15 +17,20 @@@
   */
  package backtype.storm.utils;
  
 -import backtype.storm.Config;
++
 +import backtype.storm.generated.Nimbus;
 +import backtype.storm.nimbus.ILeaderElector;
 +import backtype.storm.nimbus.NimbusInfo;
  import backtype.storm.security.auth.ThriftClient;
+ import backtype.storm.security.auth.ThriftConnectionType;
 -import backtype.storm.generated.Nimbus;
 -import java.util.Map;
 +import clojure.lang.IFn;
 +import clojure.lang.PersistentArrayMap;
  import org.apache.thrift.transport.TTransportException;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
- import java.net.InetSocketAddress;
 +import java.util.Map;
 +
  public class NimbusClient extends ThriftClient {
      private Nimbus.Client _client;
      private static final Logger LOG = 
LoggerFactory.getLogger(NimbusClient.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/Utils.java
index f483dc7,6e8458a..d392397
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@@ -249,18 -255,8 +255,18 @@@ public class Utils 
          return ret;
      }
  
-     public static void downloadFromMaster(Map conf, String file, String 
localFile) throws IOException, TException {
+     public static void downloadFromMaster(Map conf, String file, String 
localFile) throws AuthorizationException, IOException, TException {
          NimbusClient client = NimbusClient.getConfiguredClient(conf);
 +        download(client, file, localFile);
 +    }
 +
 +    public static void downloadFromHost(Map conf, String file, String 
localFile, String host, int port) throws IOException, TException {
 +        //TODO : instead of null as last arg we probably need some real 
timeout, check what is the default and if its ok to reuse.
 +        NimbusClient client = new NimbusClient (conf, host, port, null);
 +        download(client, file, localFile);
 +    }
 +
 +    private static void download(NimbusClient client, String file, String 
localFile) throws IOException, TException {
          String id = client.getClient().beginFileDownload(file);
          WritableByteChannel out = Channels.newChannel(new 
FileOutputStream(localFile));
          while(true) {

http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/ui/public/index.html
----------------------------------------------------------------------
diff --cc storm-core/src/ui/public/index.html
index 4e74494,6fac19a..a0bb3d8
--- a/storm-core/src/ui/public/index.html
+++ b/storm-core/src/ui/public/index.html
@@@ -59,9 -58,8 +61,9 @@@ $(document).ready(function() 
              });
          }
      });
-     var template = $.get("/templates/index-page-template.html");
+     var uiUser = $("#ui-user");
      var clusterSummary = $("#cluster-summary");
 +    var nimbusSummary = $("#nimbus-summary");
      var topologySummary = $("#topology-summary");
      var supervisorSummary = $("#supervisor-summary");
      var config = $("#nimbus-configuration");

http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/ui/public/templates/index-page-template.html
----------------------------------------------------------------------

Reply via email to