Merge remote-tracking branch 'upstream/master' into STORM-166
Conflicts:
conf/defaults.yaml
pom.xml
storm-core/src/clj/backtype/storm/cluster.clj
storm-core/src/clj/backtype/storm/daemon/nimbus.clj
storm-core/src/clj/backtype/storm/thrift.clj
storm-core/src/clj/backtype/storm/ui/core.clj
storm-core/src/jvm/backtype/storm/Config.java
storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
storm-core/test/clj/backtype/storm/nimbus_test.clj
storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
storm-core/test/clj/backtype/storm/utils_test.clj
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6b0da168
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6b0da168
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6b0da168
Branch: refs/heads/0.11.x-branch
Commit: 6b0da1689525507d2f40823abb50de3a4a1f9607
Parents: 88e70a8 23e630b
Author: Parth Brahmbhatt <[email protected]>
Authored: Tue Dec 16 18:14:32 2014 -0800
Committer: Parth Brahmbhatt <[email protected]>
Committed: Tue Dec 16 18:14:32 2014 -0800
----------------------------------------------------------------------
.gitignore | 1 +
CHANGELOG.md | 24 +-
DEVELOPER.md | 4 +
LICENSE | 14 +-
README.markdown | 4 +
SECURITY.md | 356 +-
STORM-UI-REST-API.md | 17 +-
bin/storm | 15 +-
bin/storm.cmd | 11 +-
conf/defaults.yaml | 45 +-
conf/jaas_digest.conf | 8 +-
conf/jaas_kerberos.conf | 15 +
doap_Storm.rdf | 57 +
docs/README.md | 38 +
docs/_config.yml | 11 +
docs/_includes/footer.html | 16 +
docs/_includes/head.html | 31 +
docs/_includes/header.html | 25 +
docs/_layouts/about.html | 40 +
docs/_layouts/default.html | 19 +
docs/_layouts/documentation.html | 16 +
docs/_layouts/page.html | 14 +
docs/_layouts/post.html | 15 +
docs/_posts/2012-08-02-storm080-released.md | 120 +
docs/_posts/2012-09-06-storm081-released.md | 47 +
docs/_posts/2013-01-11-storm082-released.md | 82 +
docs/_posts/2013-12-08-storm090-released.md | 127 +
docs/_posts/2014-04-10-storm-logo-contest.md | 66 +
docs/_posts/2014-04-17-logo-pforrest.md | 10 +
docs/_posts/2014-04-17-logo-squinones.md | 9 +
docs/_posts/2014-04-19-logo-ssuleman.md | 8 +
docs/_posts/2014-04-21-logo-rmarshall.md | 12 +
docs/_posts/2014-04-22-logo-zsayari.md | 9 +
docs/_posts/2014-04-23-logo-abartos.md | 15 +
docs/_posts/2014-04-27-logo-cboustead.md | 12 +
docs/_posts/2014-04-27-logo-sasili.md | 10 +
docs/_posts/2014-04-29-logo-jlee1.md | 10 +
docs/_posts/2014-04-29-logo-jlee2.md | 10 +
docs/_posts/2014-04-29-logo-jlee3.md | 10 +
docs/_posts/2014-05-27-round1-results.md | 38 +
docs/_posts/2014-06-17-contest-results.md | 24 +
docs/_posts/2014-06-25-storm092-released.md | 137 +
.../2014-10-20-storm093-release-candidate.md | 11 +
docs/_posts/2014-11-25-storm093-released.md | 164 +
docs/_sass/_syntax-highlighting.scss | 70 +
docs/about.md | 7 +
docs/about/deployment.md | 9 +
docs/about/fault-tolerant.md | 9 +
docs/about/free-and-open-source.md | 15 +
docs/about/guarantees-data-processing.md | 10 +
docs/about/integrates.md | 13 +
docs/about/multi-language.md | 9 +
docs/about/scalable.md | 10 +
docs/about/simple-api.md | 15 +
docs/assets/css/bootstrap-theme.css | 470 ++
docs/assets/css/bootstrap-theme.css.map | 1 +
docs/assets/css/bootstrap-theme.min.css | 5 +
docs/assets/css/bootstrap.css | 6332 ++++++++++++++++++
docs/assets/css/bootstrap.css.map | 1 +
docs/assets/css/bootstrap.min.css | 5 +
docs/assets/css/theme.css | 18 +
docs/assets/favicon.ico | Bin 0 -> 1150 bytes
.../fonts/glyphicons-halflings-regular.eot | Bin 0 -> 20335 bytes
.../fonts/glyphicons-halflings-regular.svg | 229 +
.../fonts/glyphicons-halflings-regular.ttf | Bin 0 -> 41280 bytes
.../fonts/glyphicons-halflings-regular.woff | Bin 0 -> 23320 bytes
docs/assets/js/bootstrap.js | 2320 +++++++
docs/assets/js/bootstrap.min.js | 7 +
docs/assets/js/npm.js | 13 +
docs/css/main.scss | 47 +
docs/doc-index.html | 11 +
.../Acking-framework-implementation.md | 38 +
docs/documentation/Clojure-DSL.md | 266 +
docs/documentation/Command-line-client.md | 102 +
docs/documentation/Common-patterns.md | 88 +
docs/documentation/Concepts.md | 117 +
docs/documentation/Configuration.md | 31 +
docs/documentation/Contributing-to-Storm.md | 33 +
.../Creating-a-new-Storm-project.md | 27 +
.../DSLs-and-multilang-adapters.md | 11 +
...Defining-a-non-jvm-language-dsl-for-storm.md | 38 +
docs/documentation/Distributed-RPC.md | 199 +
docs/documentation/Documentation.md | 52 +
docs/documentation/FAQ.md | 123 +
docs/documentation/Fault-tolerance.md | 30 +
.../Guaranteeing-message-processing.md | 181 +
docs/documentation/Home.md | 69 +
docs/documentation/Hooks.md | 9 +
docs/documentation/Implementation-docs.md | 20 +
.../Installing-native-dependencies.md | 38 +
docs/documentation/Kestrel-and-Storm.md | 200 +
docs/documentation/Lifecycle-of-a-topology.md | 82 +
docs/documentation/Local-mode.md | 29 +
docs/documentation/Maven.md | 56 +
.../Message-passing-implementation.md | 30 +
docs/documentation/Metrics.md | 36 +
docs/documentation/Multilang-protocol.md | 223 +
docs/documentation/Powered-By.md | 925 +++
docs/documentation/Project-ideas.md | 6 +
docs/documentation/Rationale.md | 33 +
...unning-topologies-on-a-production-cluster.md | 77 +
.../Serialization-(prior-to-0.6.0).md | 52 +
docs/documentation/Serialization.md | 62 +
docs/documentation/Serializers.md | 4 +
.../documentation/Setting-up-a-Storm-cluster.md | 85 +
.../Setting-up-a-Storm-project-in-Eclipse.md | 1 +
.../Setting-up-development-environment.md | 41 +
docs/documentation/Spout-implementations.md | 10 +
...guage-protocol-(versions-0.7.0-and-below).md | 124 +
docs/documentation/Structure-of-the-codebase.md | 142 +
.../Support-for-non-java-languages.md | 9 +
docs/documentation/Transactional-topologies.md | 361 +
docs/documentation/Trident-API-Overview.md | 312 +
docs/documentation/Trident-spouts.md | 44 +
docs/documentation/Trident-state.md | 331 +
docs/documentation/Trident-tutorial.md | 254 +
docs/documentation/Troubleshooting.md | 145 +
docs/documentation/Tutorial.md | 312 +
...nding-the-parallelism-of-a-Storm-topology.md | 123 +
.../Using-non-JVM-languages-with-Storm.md | 52 +
docs/documentation/images/ack_tree.png | Bin 0 -> 31463 bytes
docs/documentation/images/batched-stream.png | Bin 0 -> 66336 bytes
docs/documentation/images/drpc-workflow.png | Bin 0 -> 66199 bytes
.../images/eclipse-project-properties.png | Bin 0 -> 80810 bytes
.../images/example-of-a-running-topology.png | Bin 0 -> 81430 bytes
docs/documentation/images/grouping.png | Bin 0 -> 39701 bytes
.../images/ld-library-path-eclipse-linux.png | Bin 0 -> 114597 bytes
...onships-worker-processes-executors-tasks.png | Bin 0 -> 54804 bytes
docs/documentation/images/spout-vs-state.png | Bin 0 -> 24804 bytes
docs/documentation/images/storm-cluster.png | Bin 0 -> 34604 bytes
docs/documentation/images/topology-tasks.png | Bin 0 -> 45960 bytes
docs/documentation/images/topology.png | Bin 0 -> 23147 bytes
.../images/transactional-batches.png | Bin 0 -> 23293 bytes
.../images/transactional-commit-flow.png | Bin 0 -> 17725 bytes
.../images/transactional-design-2.png | Bin 0 -> 13537 bytes
.../images/transactional-spout-structure.png | Bin 0 -> 25067 bytes
docs/documentation/images/trident-to-storm1.png | Bin 0 -> 67173 bytes
docs/documentation/images/trident-to-storm2.png | Bin 0 -> 68943 bytes
docs/documentation/images/tuple-dag.png | Bin 0 -> 18849 bytes
docs/documentation/images/tuple_tree.png | Bin 0 -> 58186 bytes
docs/downloads.html | 155 +
docs/feed.xml | 30 +
docs/images/bullet.gif | Bin 0 -> 82 bytes
docs/images/download.png | Bin 0 -> 16272 bytes
docs/images/incubator-logo.png | Bin 0 -> 11651 bytes
.../logocontest/abartos/stationery_mockup.jpg | Bin 0 -> 146498 bytes
docs/images/logocontest/abartos/storm_logo.png | Bin 0 -> 153974 bytes
docs/images/logocontest/abartos/storm_logo2.png | Bin 0 -> 115425 bytes
docs/images/logocontest/abartos/storm_logo3.png | Bin 0 -> 94950 bytes
.../images/logocontest/cboustead/storm_logo.png | Bin 0 -> 67149 bytes
.../logocontest/cboustead/storm_logo1.png | Bin 0 -> 16327 bytes
docs/images/logocontest/jlee1/storm_logo.jpg | Bin 0 -> 189382 bytes
docs/images/logocontest/jlee2/storm_logo.jpg | Bin 0 -> 155666 bytes
docs/images/logocontest/jlee3/storm_logo.jpg | Bin 0 -> 158134 bytes
docs/images/logocontest/pforrest/storm1.png | Bin 0 -> 84569 bytes
.../pforrest/storm_logo_composite.png | Bin 0 -> 139223 bytes
.../rmarshall/StormLogo_Horizontal.png | Bin 0 -> 16481 bytes
.../rmarshall/StormLogo_Horizontal_NoColour.png | Bin 0 -> 14358 bytes
.../logocontest/rmarshall/StormLogo_Square.png | Bin 0 -> 14392 bytes
docs/images/logocontest/sasili/storm_logo.png | Bin 0 -> 92196 bytes
.../images/logocontest/squinones/storm_logo.png | Bin 0 -> 203263 bytes
.../logocontest/squinones/storm_logo1.png | Bin 0 -> 53325 bytes
docs/images/logocontest/ssuleman/storm_logo.png | Bin 0 -> 95509 bytes
docs/images/logocontest/storm_logo_winner.png | Bin 0 -> 34490 bytes
docs/images/logocontest/zsayari/storm_logo.png | Bin 0 -> 120794 bytes
docs/images/logos/8digits.png | Bin 0 -> 19557 bytes
docs/images/logos/Yahoo_Japan_logo.png | Bin 0 -> 3707 bytes
docs/images/logos/aeris.png | Bin 0 -> 6268 bytes
docs/images/logos/alibaba.jpg | Bin 0 -> 43703 bytes
docs/images/logos/baidu.jpeg | Bin 0 -> 3413 bytes
docs/images/logos/cerner.gif | Bin 0 -> 2591 bytes
docs/images/logos/flipboard.jpeg | Bin 0 -> 2909 bytes
docs/images/logos/fullcontact.png | Bin 0 -> 24567 bytes
docs/images/logos/groupon.jpg | Bin 0 -> 41413 bytes
docs/images/logos/holidaycheck.png | Bin 0 -> 3129 bytes
docs/images/logos/idexx.gif | Bin 0 -> 38689 bytes
docs/images/logos/mercadolibre.png | Bin 0 -> 73388 bytes
docs/images/logos/navisite.jpg | Bin 0 -> 9358 bytes
docs/images/logos/ooyala.gif | Bin 0 -> 7830 bytes
docs/images/logos/parc.png | Bin 0 -> 7256 bytes
docs/images/logos/quicklizard.png | Bin 0 -> 5667 bytes
docs/images/logos/rocketfuel.png | Bin 0 -> 9719 bytes
docs/images/logos/rubicon.png | Bin 0 -> 10663 bytes
docs/images/logos/spiderio.png | Bin 0 -> 46790 bytes
docs/images/logos/spotify.jpeg | Bin 0 -> 3282 bytes
docs/images/logos/taobao.gif | Bin 0 -> 3262 bytes
docs/images/logos/twitter.png | Bin 0 -> 4392 bytes
docs/images/logos/weatherchannel.gif | Bin 0 -> 3425 bytes
docs/images/logos/webmd.jpg | Bin 0 -> 6193 bytes
docs/images/logos/yelp.png | Bin 0 -> 98431 bytes
docs/images/mailinglist.png | Bin 0 -> 4245 bytes
docs/images/storm_header.png | Bin 0 -> 17291 bytes
docs/images/storm_logo_tagline_color copy.png | Bin 0 -> 67928 bytes
docs/images/storm_logo_tagline_color.png | Bin 0 -> 33568 bytes
docs/images/top_bg.gif | Bin 0 -> 113 bytes
docs/images/topology.png | Bin 0 -> 59837 bytes
docs/images/ui_topology_viz.png | Bin 0 -> 112831 bytes
docs/index.html | 104 +
docs/news.html | 12 +
.../multilang/resources/asyncSplitsentence.js | 18 +
.../multilang/resources/randomsentence.js | 18 +
.../multilang/resources/splitsentence.js | 18 +
.../storm-starter/multilang/resources/storm.js | 38 +-
.../storm-starter/multilang/resources/storm.py | 87 +-
.../storm-starter/multilang/resources/storm.rb | 90 +-
examples/storm-starter/pom.xml | 2 +-
.../src/jvm/storm/starter/util/StormRunner.java | 3 +-
external/storm-hbase/pom.xml | 5 +-
.../trident/mapper/TridentHBaseMapper.java | 18 +
.../hbase/trident/state/HBaseMapState.java | 17 +
external/storm-hdfs/pom.xml | 4 +-
.../hdfs/bolt/format/DefaultSequenceFormat.java | 17 +
.../storm/hdfs/bolt/format/SequenceFormat.java | 17 +
.../hdfs/bolt/rotation/TimedRotationPolicy.java | 17 +
.../hdfs/common/rotation/MoveFileAction.java | 17 +
.../hdfs/common/rotation/RotationAction.java | 17 +
.../apache/storm/hdfs/trident/HdfsState.java | 17 +
.../storm/hdfs/trident/HdfsStateFactory.java | 17 +
.../apache/storm/hdfs/trident/HdfsUpdater.java | 17 +
.../trident/format/DefaultSequenceFormat.java | 17 +
.../hdfs/trident/format/SequenceFormat.java | 17 +
.../trident/rotation/TimedRotationPolicy.java | 17 +
.../storm/hdfs/trident/FixedBatchSpout.java | 17 +
.../storm/hdfs/trident/TridentFileTopology.java | 17 +
.../hdfs/trident/TridentSequenceTopology.java | 17 +
external/storm-kafka/README.md | 13 +-
external/storm-kafka/pom.xml | 2 +-
.../src/jvm/storm/kafka/KafkaUtils.java | 63 +-
.../src/jvm/storm/kafka/PartitionManager.java | 11 +-
.../jvm/storm/kafka/UpdateOffsetException.java | 22 +
.../src/test/storm/kafka/KafkaUtilsTest.java | 7 +-
logback/cluster.xml | 4 +-
logback/worker.xml | 41 +
pom.xml | 51 +-
.../maven-shade-clojure-transformer/pom.xml | 4 +-
storm-core/pom.xml | 307 +-
.../src/clj/backtype/storm/LocalCluster.clj | 4 +
storm-core/src/clj/backtype/storm/LocalDRPC.clj | 4 +-
.../src/clj/backtype/storm/MockAutoCred.clj | 58 +
storm-core/src/clj/backtype/storm/bootstrap.clj | 5 +-
storm-core/src/clj/backtype/storm/cluster.clj | 171 +-
.../storm/command/upload_credentials.clj | 35 +
storm-core/src/clj/backtype/storm/config.clj | 40 +-
.../backtype/storm/daemon/builtin_metrics.clj | 21 +-
.../src/clj/backtype/storm/daemon/common.clj | 18 +-
.../src/clj/backtype/storm/daemon/drpc.clj | 174 +-
.../src/clj/backtype/storm/daemon/executor.clj | 137 +-
.../src/clj/backtype/storm/daemon/logviewer.clj | 271 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 249 +-
.../clj/backtype/storm/daemon/supervisor.clj | 183 +-
.../src/clj/backtype/storm/daemon/task.clj | 7 +-
.../src/clj/backtype/storm/daemon/worker.clj | 109 +-
storm-core/src/clj/backtype/storm/disruptor.clj | 11 +-
.../src/clj/backtype/storm/messaging/loader.clj | 16 +-
storm-core/src/clj/backtype/storm/testing.clj | 56 +-
storm-core/src/clj/backtype/storm/testing4j.clj | 28 +-
storm-core/src/clj/backtype/storm/thrift.clj | 24 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 174 +-
.../src/clj/backtype/storm/ui/helpers.clj | 69 +-
storm-core/src/clj/backtype/storm/util.clj | 95 +-
storm-core/src/clj/backtype/storm/zookeeper.clj | 21 +-
storm-core/src/clj/storm/trident/testing.clj | 1 +
.../src/dev/drpc-simple-acl-test-scenario.yaml | 11 +
storm-core/src/dev/resources/storm.js | 24 +
storm-core/src/dev/resources/storm.py | 25 +-
storm-core/src/dev/resources/storm.rb | 55 +-
storm-core/src/dev/resources/tester_bolt.js | 17 +
storm-core/src/dev/resources/tester_spout.js | 17 +
storm-core/src/jvm/backtype/storm/Config.java | 547 +-
.../jvm/backtype/storm/ConfigValidation.java | 156 +-
.../src/jvm/backtype/storm/Constants.java | 3 +-
.../backtype/storm/ICredentialsListener.java | 32 +
.../src/jvm/backtype/storm/ILocalCluster.java | 2 +
.../src/jvm/backtype/storm/StormSubmitter.java | 164 +-
.../storm/drpc/DRPCInvocationsClient.java | 91 +-
.../src/jvm/backtype/storm/drpc/DRPCSpout.java | 100 +-
.../jvm/backtype/storm/drpc/ReturnResults.java | 35 +-
.../storm/generated/AuthorizationException.java | 345 +
.../backtype/storm/generated/Credentials.java | 390 ++
.../storm/generated/DistributedRPC.java | 110 +-
.../generated/DistributedRPCInvocations.java | 352 +-
.../jvm/backtype/storm/generated/Nimbus.java | 3006 ++++++++-
.../backtype/storm/generated/SubmitOptions.java | 98 +-
.../backtype/storm/generated/TopologyInfo.java | 192 +-
.../storm/generated/TopologySummary.java | 192 +-
.../backtype/storm/messaging/netty/Client.java | 73 +-
.../storm/messaging/netty/ControlMessage.java | 4 +-
.../storm/messaging/netty/MessageDecoder.java | 32 +-
.../storm/messaging/netty/MessageEncoder.java | 4 +
.../storm/messaging/netty/SaslMessageToken.java | 99 +
.../storm/messaging/netty/SaslNettyClient.java | 166 +
.../messaging/netty/SaslNettyClientState.java | 31 +
.../storm/messaging/netty/SaslNettyServer.java | 165 +
.../messaging/netty/SaslNettyServerState.java | 31 +
.../messaging/netty/SaslStormClientHandler.java | 158 +
.../netty/SaslStormServerAuthorizeHandler.java | 83 +
.../messaging/netty/SaslStormServerHandler.java | 155 +
.../storm/messaging/netty/SaslUtils.java | 74 +
.../backtype/storm/messaging/netty/Server.java | 104 +-
.../netty/StormClientPipelineFactory.java | 12 +-
.../messaging/netty/StormServerHandler.java | 2 +-
.../netty/StormServerPipelineFactory.java | 20 +-
.../jvm/backtype/storm/scheduler/Cluster.java | 13 +
.../scheduler/multitenant/DefaultPool.java | 219 +
.../storm/scheduler/multitenant/FreePool.java | 125 +
.../scheduler/multitenant/IsolatedPool.java | 346 +
.../multitenant/MultitenantScheduler.java | 98 +
.../storm/scheduler/multitenant/Node.java | 343 +
.../storm/scheduler/multitenant/NodePool.java | 296 +
.../storm/security/INimbusCredentialPlugin.java | 47 +
.../backtype/storm/security/auth/AuthUtils.java | 228 +-
.../auth/DefaultHttpCredentialsPlugin.java | 87 +
.../security/auth/DefaultPrincipalToLocal.java | 43 +
.../storm/security/auth/IAutoCredentials.java | 55 +
.../security/auth/ICredentialsRenewer.java | 41 +
.../auth/IGroupMappingServiceProvider.java | 42 +
.../security/auth/IHttpCredentialsPlugin.java | 50 +
.../storm/security/auth/IPrincipalToLocal.java | 41 +
.../storm/security/auth/ITransportPlugin.java | 14 +-
.../security/auth/KerberosPrincipalToLocal.java | 45 +
.../storm/security/auth/ReqContext.java | 10 +-
.../security/auth/SaslTransportPlugin.java | 44 +-
.../security/auth/ShellBasedGroupsMapping.java | 94 +
.../security/auth/SimpleTransportPlugin.java | 61 +-
.../security/auth/SingleUserPrincipal.java | 56 +
.../storm/security/auth/TBackoffConnect.java | 77 +
.../storm/security/auth/ThriftClient.java | 85 +-
.../security/auth/ThriftConnectionType.java | 77 +
.../storm/security/auth/ThriftServer.java | 19 +-
.../auth/authorizer/DRPCAuthorizerBase.java | 46 +
.../authorizer/DRPCSimpleACLAuthorizer.java | 157 +
.../auth/authorizer/DenyAuthorizer.java | 4 +-
.../auth/authorizer/NoopAuthorizer.java | 6 +-
.../auth/authorizer/SimpleACLAuthorizer.java | 131 +
.../authorizer/SimpleWhitelistAuthorizer.java | 70 +
.../auth/digest/DigestSaslTransportPlugin.java | 1 +
.../storm/security/auth/hadoop/AutoHDFS.java | 262 +
.../storm/security/auth/kerberos/AutoTGT.java | 281 +
.../auth/kerberos/AutoTGTKrb5LoginModule.java | 108 +
.../kerberos/AutoTGTKrb5LoginModuleTest.java | 44 +
.../auth/kerberos/ClientCallbackHandler.java | 104 +
.../kerberos/KerberosSaslTransportPlugin.java | 206 +
.../auth/kerberos/ServerCallbackHandler.java | 86 +
.../auth/kerberos/jaas_kerberos_cluster.conf | 31 +
.../auth/kerberos/jaas_kerberos_launcher.conf | 12 +
.../jvm/backtype/storm/spout/ShellSpout.java | 65 +-
.../src/jvm/backtype/storm/task/ShellBolt.java | 211 +-
.../storm/testing/CompleteTopologyParam.java | 22 +-
.../testing/ForwardingMetricsConsumer.java | 95 +
.../storm/testing/PythonShellMetricsBolt.java | 17 +
.../storm/testing/PythonShellMetricsSpout.java | 17 +
.../testing/SingleUserSimpleTransport.java | 37 +
.../state/TestTransactionalState.java | 47 +
.../transactional/state/TransactionalState.java | 56 +-
.../storm/ui/InvalidRequestException.java | 17 +
.../jvm/backtype/storm/utils/DRPCClient.java | 63 +-
.../backtype/storm/utils/DisruptorQueue.java | 56 +-
.../jvm/backtype/storm/utils/LocalState.java | 44 +-
.../jvm/backtype/storm/utils/NimbusClient.java | 10 +-
.../jvm/backtype/storm/utils/ShellUtils.java | 498 ++
.../src/jvm/backtype/storm/utils/TestUtils.java | 34 +
.../src/jvm/backtype/storm/utils/Utils.java | 130 +-
.../backtype/storm/utils/ZookeeperAuthInfo.java | 9 +-
.../storm/utils/ZookeeperServerCnxnFactory.java | 84 +
.../trident/drpc/ReturnResultsReducer.java | 13 +-
.../topology/state/TestTransactionalState.java | 47 +
.../topology/state/TransactionalState.java | 58 +-
storm-core/src/multilang/js/storm.js | 19 +-
storm-core/src/multilang/py/storm.py | 25 +-
storm-core/src/multilang/rb/storm.rb | 55 +-
.../src/native/worker-launcher/.autom4te.cfg | 42 +
.../worker-launcher/.deps/worker-launcher.Po | 1 +
.../src/native/worker-launcher/Makefile.am | 32 +
.../src/native/worker-launcher/configure.ac | 50 +
.../native/worker-launcher/impl/configuration.c | 340 +
.../native/worker-launcher/impl/configuration.h | 45 +
.../src/native/worker-launcher/impl/main.c | 210 +
.../worker-launcher/impl/worker-launcher.c | 779 +++
.../worker-launcher/impl/worker-launcher.h | 129 +
.../worker-launcher/test/test-worker-launcher.c | 340 +
storm-core/src/py/__init__.py | 16 +
storm-core/src/py/storm/DistributedRPC-remote | 18 +
storm-core/src/py/storm/DistributedRPC.py | 37 +-
.../py/storm/DistributedRPCInvocations-remote | 18 +
.../src/py/storm/DistributedRPCInvocations.py | 96 +-
storm-core/src/py/storm/Nimbus-remote | 25 +
storm-core/src/py/storm/Nimbus.py | 652 +-
storm-core/src/py/storm/__init__.py | 16 +
storm-core/src/py/storm/constants.py | 16 +
storm-core/src/py/storm/ttypes.py | 1243 +++-
storm-core/src/storm.thrift | 58 +-
storm-core/src/ui/public/component.html | 21 +-
storm-core/src/ui/public/css/style.css | 9 +
storm-core/src/ui/public/favicon.ico | Bin 0 -> 18280 bytes
storm-core/src/ui/public/index.html | 12 +-
storm-core/src/ui/public/js/script.js | 3 +-
.../public/templates/anti-forgery-template.html | 19 +
.../public/templates/index-page-template.html | 12 +
.../templates/topology-page-template.html | 23 +-
.../src/ui/public/templates/user-template.html | 25 +
storm-core/src/ui/public/topology.html | 21 +-
.../test/clj/backtype/storm/cluster_test.clj | 93 +-
.../test/clj/backtype/storm/config_test.clj | 11 +
.../test/clj/backtype/storm/drpc_test.clj | 14 +-
.../clj/backtype/storm/local_state_test.clj | 14 +-
.../test/clj/backtype/storm/logviewer_test.clj | 187 +
.../storm/messaging/netty_integration_test.clj | 4 +-
.../storm/messaging/netty_unit_test.clj | 32 +-
.../test/clj/backtype/storm/nimbus_test.clj | 375 +-
.../scheduler/multitenant_scheduler_test.clj | 831 +++
.../storm/security/auth/AuthUtils_test.clj | 16 +-
.../auth/DefaultHttpCredentialsPlugin_test.clj | 40 +
.../storm/security/auth/ThriftClient_test.clj | 28 +-
.../storm/security/auth/ThriftServer_test.clj | 8 +-
.../backtype/storm/security/auth/auth_test.clj | 373 +-
.../authorizer/DRPCSimpleACLAuthorizer_test.clj | 226 +
.../security/auth/auto_login_module_test.clj | 91 +
.../storm/security/auth/drpc-auth-alice.jaas | 5 +
.../storm/security/auth/drpc-auth-bob.jaas | 5 +
.../storm/security/auth/drpc-auth-charlie.jaas | 5 +
.../storm/security/auth/drpc-auth-server.jaas | 6 +
.../storm/security/auth/drpc_auth_test.clj | 315 +
.../storm/security/auth/nimbus_auth_test.clj | 181 +
.../test/clj/backtype/storm/submitter_test.clj | 75 +
.../test/clj/backtype/storm/supervisor_test.clj | 180 +-
.../test/clj/backtype/storm/testing4j_test.clj | 32 +
.../clj/backtype/storm/transactional_test.clj | 27 +-
.../utils/ZookeeperServerCnxnFactory_test.clj | 35 +
.../test/clj/backtype/storm/utils_test.clj | 56 +-
.../test/clj/storm/trident/state_test.clj | 25 +-
.../storm/utils/DisruptorQueueTest.java | 25 +-
storm-dist/binary/LICENSE | 15 +-
storm-dist/binary/pom.xml | 2 +-
storm-dist/binary/src/main/assembly/binary.xml | 4 +
storm-dist/source/pom.xml | 2 +-
435 files changed, 39442 insertions(+), 1664 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/conf/defaults.yaml
----------------------------------------------------------------------
diff --cc conf/defaults.yaml
index b875323,d03c2c9..7757b24
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@@ -35,17 -37,22 +37,27 @@@ storm.zookeeper.auth.password: nul
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
+ storm.principal.tolocal:
"backtype.storm.security.auth.DefaultPrincipalToLocal"
+ storm.group.mapping.service:
"backtype.storm.security.auth.ShellBasedGroupsMapping"
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
+ storm.nimbus.retry.times: 5
+ storm.nimbus.retry.interval.millis: 2000
+ storm.nimbus.retry.intervalceiling.millis: 60000
+ storm.auth.simple-white-list.users: []
+ storm.auth.simple-acl.users: []
+ storm.auth.simple-acl.users.commands: []
+ storm.auth.simple-acl.admins: []
storm.meta.serialization.delegate:
"backtype.storm.serialization.DefaultSerializationDelegate"
+storm.codedistributor.class:
"backtype.storm.torrent.BitTorrentCodeDistributor"
+
+### bittorrent configuration
+bittorrent.port: 6969
+bittorrent.max.upload.rate: 0.0
+bittorrent.max.download.rate: 0.0
### nimbus.* configs are for the master
-nimbus.host: "localhost"
nimbus.thrift.port: 6627
+ nimbus.thrift.threads: 64
nimbus.thrift.max_buffer_size: 1048576
nimbus.childopts: "-Xmx1024m"
nimbus.task.timeout.secs: 30
@@@ -58,8 -64,7 +70,9 @@@ nimbus.task.launch.secs: 12
nimbus.reassign: true
nimbus.file.copy.expiration.secs: 600
nimbus.topology.validator: "backtype.storm.nimbus.DefaultTopologyValidator"
+min.replication.count: 0
+max.replication.wait.time.sec: 0
+ nimbus.credential.renewers.freq.secs: 600
### ui.* configs are for the master
ui.port: 8080
@@@ -98,13 -121,16 +129,16 @@@ supervisor.monitor.frequency.secs:
#how frequently the supervisor heartbeats to the cluster state (for nimbus)
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true
+supervisor.bittorrent.seed.duration: 0
+ supervisor.supervisors: []
+ supervisor.supervisors.commands: []
-
### worker.* configs are for task workers
worker.childopts: "-Xmx768m"
+ worker.gc.childopts: ""
worker.heartbeat.frequency.secs: 1
- # control how many worker receiver threads we need per worker
+ # control how many worker receiver threads we need per worker
topology.worker.receiver.thread.count: 1
task.heartbeat.frequency.secs: 3
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 2fcac56,5645b9a..e075ac0
--- a/pom.xml
+++ b/pom.xml
@@@ -208,8 -209,10 +209,10 @@@
<reply.version>0.3.0</reply.version>
<conjure.version>2.1.3</conjure.version>
<zookeeper.version>3.4.6</zookeeper.version>
+ <bittorrent.version>1.4</bittorrent.version>
-
+ <conjure.version>2.1.3</conjure.version>
+ <clojure-data-codec.version>0.1.0</clojure-data-codec.version>
+ <clojure-contrib.version>1.2.0</clojure-contrib.version>
-
</properties>
<profiles>
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/cluster.clj
index 3db84ff,8ead710..840575b
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@@ -160,14 -172,14 +175,16 @@@
(def SUPERVISORS-ROOT "supervisors")
(def WORKERBEATS-ROOT "workerbeats")
(def ERRORS-ROOT "errors")
+(def CODE-DISTRIBUTOR-ROOT "code-distributor")
+ (def CREDENTIALS-ROOT "credentials")
(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
(def STORMS-SUBTREE (str "/" STORMS-ROOT))
(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
(def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
+(def CODE-DISTRIBUTOR-SUBTREE (str "/" CODE-DISTRIBUTOR-ROOT))
+ (def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT))
(defn supervisor-path
[id]
@@@ -251,22 -263,22 +272,24 @@@
supervisors-callback (atom nil)
assignments-callback (atom nil)
storm-base-callback (atom {})
+ code-distributor-callback (atom nil)
+ credentials-callback (atom {})
state-id (register
- cluster-state
- (fn [type path]
- (let [[subtree & args] (tokenize-path path)]
- (condp = subtree
+ cluster-state
+ (fn [type path]
+ (let [[subtree & args] (tokenize-path path)]
+ (condp = subtree
ASSIGNMENTS-ROOT (if (empty? args)
- (issue-callback!
assignments-callback)
- (issue-map-callback!
assignment-info-callback (first args)))
+ (issue-callback!
assignments-callback)
+ (issue-map-callback!
assignment-info-callback (first args)))
SUPERVISORS-ROOT (issue-callback!
supervisors-callback)
+ CODE-DISTRIBUTOR-ROOT (issue-callback!
code-distributor-callback)
STORMS-ROOT (issue-map-callback! storm-base-callback
(first args))
+ CREDENTIALS-ROOT (issue-map-callback!
credentials-callback (first args))
;; this should never happen
(exit-process! 30 "Unknown callback for subtree "
subtree args)))))]
- (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE
WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
+ (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE
WORKERBEATS-SUBTREE ERRORS-SUBTREE CODE-DISTRIBUTOR-SUBTREE]]
- (mkdirs cluster-state p))
+ (mkdirs cluster-state p acls))
(reify
StormClusterState
@@@ -404,49 -407,56 +428,62 @@@
(set-assignment!
[this storm-id info]
- (set-data cluster-state (assignment-path storm-id) (Utils/serialize
info)))
+ (set-data cluster-state (assignment-path storm-id) (Utils/serialize
info) acls))
+ (setup-code-distributor!
+ [this storm-id info]
- (mkdirs cluster-state (code-distributor-path storm-id))
- (mkdirs cluster-state (str (code-distributor-path storm-id) "/"
info)))
++ (mkdirs cluster-state (code-distributor-path storm-id) acl)
++ (mkdirs cluster-state (str (code-distributor-path storm-id) "/" info)
acl))
+
(remove-storm!
[this storm-id]
(delete-node cluster-state (assignment-path storm-id))
+ (delete-node cluster-state (code-distributor-path storm-id))
+ (delete-node cluster-state (credentials-path storm-id))
(remove-storm-base! this storm-id))
+ (set-credentials!
+ [this storm-id creds topo-conf]
+ (let [topo-acls (mk-topo-only-acls topo-conf)
+ path (credentials-path storm-id)]
+ (set-data cluster-state path (Utils/serialize creds) topo-acls)))
+
+ (credentials
+ [this storm-id callback]
+ (when callback
+ (swap! credentials-callback assoc storm-id callback))
+ (maybe-deserialize (get-data cluster-state (credentials-path
storm-id) (not-nil? callback))))
+
(report-error
- [this storm-id component-id node port error]
- (let [path (error-path storm-id component-id)
- data {:time-secs (current-time-secs) :error (stringify-error
error) :host node :port port}
- _ (mkdirs cluster-state path)
- _ (create-sequential cluster-state (str path "/e")
(Utils/serialize data))
- to-kill (->> (get-children cluster-state path false)
- (sort-by parse-error-path)
- reverse
- (drop 10))]
- (doseq [k to-kill]
- (delete-node cluster-state (str path "/" k)))))
+ [this storm-id component-id node port error]
+ (let [path (error-path storm-id component-id)
+ data {:time-secs (current-time-secs) :error (stringify-error
error) :host node :port port}
+ _ (mkdirs cluster-state path acls)
+ _ (create-sequential cluster-state (str path "/e")
(Utils/serialize data) acls)
+ to-kill (->> (get-children cluster-state path false)
+ (sort-by parse-error-path)
+ reverse
+ (drop 10))]
+ (doseq [k to-kill]
+ (delete-node cluster-state (str path "/" k)))))
(errors
- [this storm-id component-id]
- (let [path (error-path storm-id component-id)
- _ (mkdirs cluster-state path)
- children (get-children cluster-state path false)
- errors (dofor [c children]
- (let [data (-> (get-data cluster-state (str path
"/" c) false)
- maybe-deserialize)]
- (when data
- (struct TaskError (:error data) (:time-secs
data) (:host data) (:port data))
- )))
- ]
- (->> (filter not-nil? errors)
- (sort-by (comp - :time-secs)))))
-
+ [this storm-id component-id]
+ (let [path (error-path storm-id component-id)
+ errors (if (exists-node? cluster-state path false)
+ (dofor [c (get-children cluster-state path false)]
+ (let [data (-> (get-data cluster-state (str path
"/" c) false)
+ maybe-deserialize)]
+ (when data
+ (struct TaskError (:error data) (:time-secs
data) (:host data) (:port data))
+ )))
+ ())
+ ]
+ (->> (filter not-nil? errors)
+ (sort-by (comp - :time-secs)))))
+
(disconnect
- [this]
+ [this]
(unregister cluster-state state-id)
(when solo?
(close cluster-state))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 2afcbec,1fbf7f0..9f5b1a6
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@@ -14,20 -14,19 +14,21 @@@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.daemon.nimbus
- (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
- (:import [org.apache.thrift.protocol TBinaryProtocol
TBinaryProtocol$Factory])
- (:import [org.apache.thrift.exception])
- (:import [org.apache.thrift.transport TNonblockingServerTransport
TNonblockingServerSocket])
- (:import [java.nio ByteBuffer])
+ (:import [java.nio ByteBuffer]
+ [java.util Collections])
(:import [java.io FileNotFoundException])
+ (:import [java.net InetAddress])
(:import [java.nio.channels Channels WritableByteChannel])
+ (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType
ReqContext AuthUtils])
(:use [backtype.storm.scheduler.DefaultScheduler])
(:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot
TopologyDetails
Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl
DefaultScheduler ExecutorDetails])
+ (:use [backtype.storm bootstrap util zookeeper])
+ (:import [backtype.storm.generated AuthorizationException])
+ (:use [backtype.storm bootstrap util])
(:use [backtype.storm.config :only [validate-configs-with-schemas]])
(:use [backtype.storm.daemon common])
+ (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
(:gen-class
:methods [^{:static true} [launch [backtype.storm.scheduler.INimbus]
void]]))
@@@ -60,31 -59,22 +61,40 @@@
scheduler
))
+(defmulti mk-bt-tracker cluster-mode)
+(defmulti sync-code cluster-mode)
+
+;;TODO we should try genclass for zkLeaderElector and just set
NIMBUS-LEADER-ELECTOR-CLASS in defaults.yaml
+(defn mk-leader-elector [conf]
+ (if (conf NIMBUS-LEADER-ELECTOR-CLASS)
+ (do (log-message "Using custom Leade elector: " (conf
NIMBUS-LEADER-ELECTOR-CLASS))
+ (-> (conf NIMBUS-LEADER-ELECTOR-CLASS) new-instance))
+ (zk-leader-elector conf)))
+
+(defnk is-leader [nimbus :throw-exception true]
+ (let [leader-elector (:leader-elector nimbus)]
+ (if (.isLeader leader-elector) true
+ (if throw-exception
+ (let [leader-address (.getLeader leader-elector)]
+ (throw (RuntimeException. (str "not a leader, current leader is "
leader-address))))))))
+
+ (def NIMBUS-ZK-ACLS
+ [(first ZooDefs$Ids/CREATOR_ALL_ACL)
+ (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE)
ZooDefs$Ids/ANYONE_ID_UNSAFE)])
+
(defn nimbus-data [conf inimbus]
(let [forced-scheduler (.getForcedScheduler inimbus)]
{:conf conf
+ :host-port-info (str (.getCanonicalHostName (InetAddress/getLocalHost))
":" (conf NIMBUS-THRIFT-PORT))
:inimbus inimbus
+ :authorization-handler (mk-authorization-handler (conf
NIMBUS-AUTHORIZER) conf)
:submitted-count (atom 0)
- :storm-cluster-state (cluster/mk-storm-cluster-state conf)
+ :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when
+
(Utils/isZkAuthenticationConfiguredStormServer
+ conf)
+
NIMBUS-ZK-ACLS))
:submit-lock (Object.)
+ :cred-update-lock (Object.)
:heartbeats-cache (atom {})
:downloaders (file-cache-map conf)
:uploaders (file-cache-map conf)
@@@ -95,8 -85,9 +105,11 @@@
(exit-process! 20 "Error when processing an
event")
))
:scheduler (mk-scheduler conf inimbus)
+ :leader-elector (mk-leader-elector conf)
+ :bt-tracker (mk-bt-tracker conf)
+ :id->sched-status (atom {})
+ :cred-renewers (AuthUtils/GetCredentialRenewers conf)
+ :nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf)
}))
(defn inbox [nimbus]
@@@ -321,8 -312,9 +334,9 @@@
[(.getNodeId slot) (.getPort slot)]
)))
-(defn- setup-storm-code [conf storm-id tmp-jar-location storm-conf topology]
+(defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf
topology]
(let [stormroot (master-stormdist-root conf storm-id)]
+ (log-message "nimbus file location:" stormroot)
(FileUtils/forceMkdir (File. stormroot))
(FileUtils/cleanDirectory (File. stormroot))
(setup-jar conf tmp-jar-location stormroot)
@@@ -944,15 -979,12 +1030,16 @@@
(defserverfn service-handler [conf inimbus]
(.prepare inimbus conf (master-inimbus-dir conf))
(log-message "Starting Nimbus with conf " conf)
- (let [nimbus (nimbus-data conf inimbus)]
+ (let [nimbus (nimbus-data conf inimbus)
+ principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)]
(.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
conf)
+ (.addToLeaderLockQueue (:leader-elector nimbus))
(cleanup-corrupt-topologies! nimbus)
- (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
- (transition! nimbus storm-id :startup))
+ ;register call back for code-distributor
+ (.code-distributor (:storm-cluster-state nimbus) (fn [] (sync-code conf
nimbus)))
+ (when (is-leader nimbus :throw-exception false)
+ (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
+ (transition! nimbus storm-id :startup)))
(schedule-recurring (:timer nimbus)
0
(conf NIMBUS-MONITOR-FREQ-SECS)
@@@ -968,23 -1000,21 +1055,29 @@@
(conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
(fn []
(clean-inbox (inbox nimbus) (conf
NIMBUS-INBOX-JAR-EXPIRATION-SECS))
- ))
+ ))
-
++ ;;schedule nimbus code sync thread to sync code from other nimbuses.
+ (schedule-recurring (:timer nimbus)
+ 0
+ (conf NIMBUS-CODE-SYNC-FREQ-SECS)
+ (fn []
+ (sync-code conf nimbus)
+ ))
+
+ (schedule-recurring (:timer nimbus)
+ 0
+ (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
+ (fn []
+ (renew-credentials nimbus)))
-
(reify Nimbus$Iface
(^void submitTopologyWithOpts
[this ^String storm-name ^String uploadedJarLocation ^String
serializedConf ^StormTopology topology
^SubmitOptions submitOptions]
(try
+ (is-leader nimbus)
(assert (not-nil? submitOptions))
(validate-topology-name! storm-name)
+ (check-authorization! nimbus storm-name nil "submitTopology")
(check-storm-active! nimbus storm-name false)
(let [topo-conf (from-json serializedConf)]
(try
@@@ -997,25 -1027,48 +1090,50 @@@
topology))
(swap! (:submitted-count nimbus) inc)
(let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-"
(current-time-secs))
- storm-conf (normalize-conf
+ credentials (.get_creds submitOptions)
+ credentials (when credentials (.get_creds credentials))
+ topo-conf (from-json serializedConf)
+ storm-conf-submitted (normalize-conf
conf
- (-> serializedConf
- from-json
- (assoc STORM-ID storm-id)
+ (-> topo-conf
+ (assoc STORM-ID storm-id)
(assoc TOPOLOGY-NAME storm-name))
topology)
+ req (ReqContext/context)
+ principal (.principal req)
+ submitter-principal (if principal (.toString principal))
+ submitter-user (.toLocal principal-to-local principal)
+ topo-acl (distinct (remove nil? (conj (.get
storm-conf-submitted TOPOLOGY-USERS) submitter-principal, submitter-user)))
+ storm-conf (-> storm-conf-submitted
+ (assoc TOPOLOGY-SUBMITTER-PRINCIPAL (if
submitter-principal submitter-principal ""))
+ (assoc TOPOLOGY-SUBMITTER-USER (if
submitter-user submitter-user "")) ;Don't let the user set who we launch as
+ (assoc TOPOLOGY-USERS topo-acl)
+ (assoc STORM-ZOOKEEPER-SUPERACL (.get conf
STORM-ZOOKEEPER-SUPERACL)))
+ storm-conf (if (Utils/isZkAuthenticationConfiguredStormServer
conf)
+ storm-conf
+ (dissoc storm-conf
STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
total-storm-conf (merge conf storm-conf)
topology (normalize-topology total-storm-conf topology)
-
- storm-cluster-state (:storm-cluster-state nimbus)]
+ storm-cluster-state (:storm-cluster-state nimbus)
+ host-port-info (:host-port-info nimbus) ]
+ (when credentials (doseq [nimbus-autocred-plugin
(:nimbus-autocred-plugins nimbus)]
+ (.populateCredentials nimbus-autocred-plugin credentials
(Collections/unmodifiableMap storm-conf))))
+ (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil?
submitter-user) (.isEmpty (.trim submitter-user))))
+ (throw (AuthorizationException. "Could not determine the user
to run this topology as.")))
(system-topology! total-storm-conf topology) ;; this validates
the structure of the topology
+ (validate-topology-size topo-conf conf topology)
+ (when (and (Utils/isZkAuthenticationConfiguredStormServer conf)
+ (not (Utils/isZkAuthenticationConfiguredTopology
storm-conf)))
+ (throw (IllegalArgumentException. "The cluster is configured
for zookeeper authentication, but no payload was provided.")))
(log-message "Received topology submission for " storm-name "
with conf " storm-conf)
;; lock protects against multiple topologies being submitted at
once and
;; cleanup thread killing topology in b/w assignment and starting
the topology
(locking (:submit-lock nimbus)
+ ;;cred-update-lock is not needed here because creds are being
added for the first time.
+ (.set-credentials! storm-cluster-state storm-id credentials
storm-conf)
- (setup-storm-code conf storm-id uploadedJarLocation storm-conf
topology)
+ (setup-storm-code nimbus conf storm-id uploadedJarLocation
storm-conf topology)
+ (.setup-code-distributor! storm-cluster-state storm-id
host-port-info)
+ (wait-for-desired-code-replication nimbus conf storm-id)
(.setup-heartbeats! storm-cluster-state storm-id)
(let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE
:inactive
TopologyInitialStatus/ACTIVE
:active}]
@@@ -1207,9 -1303,6 +1368,8 @@@
(.disconnect (:storm-cluster-state nimbus))
(.cleanup (:downloaders nimbus))
(.cleanup (:uploaders nimbus))
+ (.close (:leader-elector nimbus))
- (log-message (:bt-tracker nimbus))
+ (if (:bt-tracker nimbus) (.close (:bt-tracker nimbus) (:conf nimbus)))
(log-message "Shut down master")
)
DaemonCommon
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 5de19c4,d82fd12..c96dd88
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@@ -14,10 -14,10 +14,11 @@@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.daemon.supervisor
+ (:import [java.io OutputStreamWriter BufferedWriter IOException])
(:import [backtype.storm.scheduler ISupervisor]
[java.net JarURLConnection]
- [java.net URI])
+ [java.net URI]
+ [java.io File])
(:use [backtype.storm bootstrap])
(:use [backtype.storm.daemon common])
(:require [backtype.storm.daemon [worker :as worker]])
@@@ -264,12 -335,10 +338,13 @@@
". State: " state
", Heartbeat: " (pr-str heartbeat))
(shutdown-worker supervisor id)
+ (if (:bt-tracker supervisor)
+ (.cleanup (:bt-tracker supervisor) id))
))
+
(doseq [id (vals new-worker-ids)]
- (local-mkdirs (worker-pids-root conf id)))
+ (local-mkdirs (worker-pids-root conf id))
+ (local-mkdirs (worker-heartbeats-root conf id)))
(.put local-state LS-APPROVED-WORKERS
(merge
(select-keys (.get local-state LS-APPROVED-WORKERS)
@@@ -457,29 -526,49 +532,56 @@@
(.shutdown supervisor)
)
- ;; distributed implementation
+ (defn setup-storm-code-dir [conf storm-conf dir]
+ (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+ (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER)
["code-dir" dir] :log-prefix (str "setup conf for " dir))))
+ ;; distributed implementation
(defmethod download-storm-code
- :distributed [conf storm-id master-code-dir]
+ :distributed [conf storm-id master-code-dir supervisor]
;; Downloading to permanent location is atomic
(let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
- stormroot (supervisor-stormdist-root conf storm-id)]
+ stormroot (supervisor-stormdist-root conf storm-id)
+ master-meta-file-path (master-storm-metafile-path master-code-dir)
+ supervisor-meta-file-path (supervisor-storm-metafile-path tmproot)]
(FileUtils/forceMkdir (File. tmproot))
-
- (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir)
(supervisor-stormjar-path tmproot))
- (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir)
(supervisor-stormcode-path tmproot))
- (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir)
(supervisor-stormconf-path tmproot))
+ (Utils/downloadFromMaster conf master-meta-file-path
supervisor-meta-file-path)
+ (if (:bt-tracker supervisor)
+ (.download (:bt-tracker supervisor) storm-id (File.
supervisor-meta-file-path)))
(extract-dir-from-jar (supervisor-stormjar-path tmproot)
RESOURCES-SUBDIR tmproot)
+ (if (.exists (File. stormroot)) (FileUtils/forceDelete (File.
stormroot)))
(FileUtils/moveDirectory (File. tmproot) (File. stormroot))
- ))
+ (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id)
stormroot)
+ ))
+
+ (defn write-log-metadata-to-yaml-file! [storm-id port data conf]
+ (let [file (get-log-metadata-file storm-id port)]
+ ;;run worker as user needs the directory to have special permissions
+ ;; or it is insecure
+ (when (and (not (conf SUPERVISOR-RUN-WORKER-AS-USER))
+ (not (.exists (.getParentFile file))))
+ (.mkdirs (.getParentFile file)))
+ (let [writer (java.io.FileWriter. file)
+ yaml (Yaml.)]
+ (try
+ (.dump yaml data writer)
+ (finally
+ (.close writer))))))
+
+ (defn write-log-metadata! [storm-conf user worker-id storm-id port conf]
+ (let [data {TOPOLOGY-SUBMITTER-USER user
+ "worker-id" worker-id
+ LOGS-USERS (sort (distinct (remove nil?
+ (concat
+ (storm-conf LOGS-USERS)
+ (storm-conf TOPOLOGY-USERS)))))}]
+ (write-log-metadata-to-yaml-file! storm-id port data conf)))
+(defmethod mk-bt-tracker :distributed [conf]
+ (let [code-distributor (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))]
+ (.prepare code-distributor conf)
+ code-distributor))
+
(defn jlp [stormroot conf]
(let [resource-root (str stormroot File/separator RESOURCES-SUBDIR)
os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_")
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/clj/backtype/storm/thrift.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/thrift.clj
index 640401f,ce0a5ff..2b860b3
--- a/storm-core/src/clj/backtype/storm/thrift.clj
+++ b/storm-core/src/clj/backtype/storm/thrift.clj
@@@ -26,9 -26,8 +26,8 @@@
(:import [backtype.storm.grouping CustomStreamGrouping])
(:import [backtype.storm.topology TopologyBuilder])
(:import [backtype.storm.clojure RichShellBolt RichShellSpout])
- (:import [org.apache.thrift.protocol TBinaryProtocol TProtocol])
- (:import [org.apache.thrift.transport TTransport TFramedTransport TSocket])
+ (:import [org.apache.thrift.transport TTransport])
- (:use [backtype.storm util config log]))
+ (:use [backtype.storm util config log zookeeper]))
(defn instantiate-java-object
[^JavaObject obj]
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index 75bf746,3dc7396..12a396e
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -18,12 -18,12 +18,12 @@@
(:use compojure.core)
(:use ring.middleware.reload)
(:use [hiccup core page-helpers])
- (:use [backtype.storm config util log])
+ (:use [backtype.storm config util log zookeeper])
(:use [backtype.storm.ui helpers])
- (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID
ACKER-INIT-STREAM-ID
- ACKER-ACK-STREAM-ID
ACKER-FAIL-STREAM-ID system-id?]]])
- (:use [ring.adapter.jetty :only [run-jetty]])
- (:use [clojure.string :only [trim]])
+ (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID
ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
+ ACKER-FAIL-STREAM-ID system-id?
mk-authorization-handler]]])
+ (:use [ring.middleware.anti-forgery])
+ (:use [clojure.string :only [blank? lower-case trim]])
(:import [backtype.storm.utils Utils])
(:import [backtype.storm.generated ExecutorSpecificStats
ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats
@@@ -43,15 -48,24 +49,29 @@@
(defmacro with-nimbus
[nimbus-sym & body]
- `(thrift/with-nimbus-connection
- [~nimbus-sym (*STORM-CONF* NIMBUS-HOST) (*STORM-CONF*
NIMBUS-THRIFT-PORT)]
- ~@body))
+ `(let [leader-elector# (zk-leader-elector *STORM-CONF*)
+ leader-nimbus# (.getLeader leader-elector#)
+ host# (.getHost leader-nimbus#)
+ port# (.getPort leader-nimbus#)
+ no-op# (.close leader-elector#)]
+ (thrift/with-nimbus-connection
+ [~nimbus-sym host# port#]
+ ~@body)))
+ (defn assert-authorized-user
+ ([servlet-request op]
+ (assert-authorized-user servlet-request op nil))
+ ([servlet-request op topology-conf]
+ (if http-creds-handler (.populateContext http-creds-handler
(ReqContext/context) servlet-request))
+ (if *UI-ACL-HANDLER*
+ (let [context (ReqContext/context)]
+ (if-not (.permit *UI-ACL-HANDLER* context op topology-conf)
+ (let [principal (.principal context)
+ user (if principal (.getName principal) "unknown")]
+ (throw (AuthorizationException.
+ (str "UI request '" op "' for '"
+ user "' user is not authorized")))))))))
+
(defn get-filled-stats
[summs]
(->> summs
@@@ -268,13 -282,11 +288,14 @@@
(bolt-comp-summs id))]
(sort-by #(-> ^ExecutorSummary % .get_executor_info .get_task_start)
ret)))
- (defn worker-log-link [host port]
- (url-format "http://%s:%s/log?file=worker-%s.log"
- host (*STORM-CONF* LOGVIEWER-PORT) port))
+ (defn worker-log-link [host port topology-id]
+ (let [fname (logs-filename topology-id port)]
+ (url-format (str "http://%s:%s/log?file=%s")
+ host (*STORM-CONF* LOGVIEWER-PORT) fname)))
+(defn nimbus-log-link [host port]
+ (url-format "http://%s:%s/log?file=nimbus.log" host (*STORM-CONF*
LOGVIEWER-PORT) port))
+
(defn compute-executor-capacity
[^ExecutorSummary e]
(let [stats (.get_stats e)
@@@ -503,23 -517,8 +526,23 @@@
"slotsUsed" used-slots
"slotsFree" free-slots
"executorsTotal" total-executors
- "tasksTotal" total-tasks})))
+ "tasksTotal" total-tasks })))
+(defn nimbus-summary
+ ([]
+ (let [leader-elector (zk-leader-elector *STORM-CONF*)
+ nimbus-hosts (.getAllNimbuses leader-elector)
+ no-op (.close leader-elector)]
+ (nimbus-summary nimbus-hosts)))
+ ([nimbuses]
+ {"nimbuses"
+ (for [^NimbusInfo n nimbuses]
+ {
+ "host" (.getHost n)
+ "port" (.getPort n)
+ "nimbusLogLink" (nimbus-log-link (.getHost n) (.getPort n))
+ "isLeader" (.isLeader n)})}))
+
(defn supervisor-summary
([]
(with-nimbus nimbus
@@@ -867,21 -880,28 +904,31 @@@
(GET "/api/v1/cluster/configuration" [& m]
(json-response (cluster-configuration)
(:callback m) :serialize-fn identity))
- (GET "/api/v1/cluster/summary" [& m]
- (json-response (cluster-summary) (:callback m)))
+ (GET "/api/v1/cluster/summary" [:as {:keys [cookies servlet-request]} & m]
+ (let [user (.getUserName http-creds-handler servlet-request)]
+ (assert-authorized-user servlet-request "getClusterInfo")
+ (json-response (cluster-summary user) (:callback m))))
+ (GET "/api/v1/nimbus/summary" [& m]
++ (assert-authorized-user servlet-request "getClusterInfo")
+ (json-response (nimbus-summary) (:callback m)))
- (GET "/api/v1/supervisor/summary" [& m]
+ (GET "/api/v1/supervisor/summary" [:as {:keys [cookies servlet-request]} &
m]
+ (assert-authorized-user servlet-request "getClusterInfo")
(json-response (supervisor-summary) (:callback m)))
- (GET "/api/v1/topology/summary" [& m]
+ (GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]} & m]
+ (assert-authorized-user servlet-request "getClusterInfo")
(json-response (all-topologies-summary) (:callback m)))
- (GET "/api/v1/topology/:id" [id & m]
- (json-response (topology-page id (:window m) (check-include-sys?
(:sys m))) (:callback m)))
+ (GET "/api/v1/topology/:id" [:as {:keys [cookies servlet-request]} id & m]
+ (let [user (.getUserName http-creds-handler servlet-request)]
+ (assert-authorized-user servlet-request "getTopology"
(topology-config id))
+ (json-response (topology-page id (:window m) (check-include-sys?
(:sys m)) user) (:callback m))))
(GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies
servlet-request]} id & m]
- (json-response (mk-visualization-data id (:window m)
(check-include-sys? (:sys m))) (:callback m)))
- (GET "/api/v1/topology/:id/component/:component" [id component & m]
- (json-response (component-page id component (:window m)
(check-include-sys? (:sys m))) (:callback m)))
- (POST "/api/v1/topology/:id/activate" [id]
+ (assert-authorized-user servlet-request "getTopology"
(topology-config id))
+ (json-response (mk-visualization-data id (:window m)
(check-include-sys? (:sys m))) (:callback m)))
+ (GET "/api/v1/topology/:id/component/:component" [:as {:keys [cookies
servlet-request]} id component & m]
+ (let [user (.getUserName http-creds-handler servlet-request)]
+ (assert-authorized-user servlet-request "getTopology"
(topology-config id))
+ (json-response (component-page id component (:window m)
(check-include-sys? (:sys m)) user) (:callback m))))
+ (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies
servlet-request]} id]
(with-nimbus nimbus
(let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
name (.get_name tplg)]
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/zookeeper.clj
index a675990,eea71be..a7d43ac
--- a/storm-core/src/clj/backtype/storm/zookeeper.clj
+++ b/storm-core/src/clj/backtype/storm/zookeeper.clj
@@@ -214,91 -211,3 +215,91 @@@
(defn shutdown-inprocess-zookeeper
[handle]
(.shutdown handle))
+
+(defn- to-NimbusInfo [^Participant participant]
+ (let
+ [id (if (clojure.string/blank? (.getId participant))
+ (throw (RuntimeException. "No nimbus leader participant host found,
have you started your nimbus hosts?"))
+ (.getId participant))
+ server (first (.split id ":"))
+ port (Integer/parseInt (last (.split id ":")))
+ is-leader (.isLeader participant)]
+ (NimbusInfo. server port is-leader)))
+
+(defn leader-latch-listener-impl
+ "Leader latch listener that will be invoked when we either gain or lose
leadership"
+ [conf zk leader-latch]
+ (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost))
+ STORMS-ROOT (str (conf STORM-ZOOKEEPER-ROOT) "/storms")]
+ (reify LeaderLatchListener
+ (^void isLeader[this]
+ (log-message (str hostname "gained leadership, checking if it has all
the topology code locally."))
+ (let [active-topology-ids (set (get-children zk STORMS-ROOT false))
+ local-topology-ids (set (.list (File. (master-stormdist-root
conf))))
+ diff-topology (first (set-delta active-topology-ids
local-topology-ids))]
+ (log-message "active-topology-ids [" (clojure.string/join ","
active-topology-ids)
+ "] local-topology-ids [" (clojure.string/join ","
local-topology-ids)
+ "] diff-topology [" (clojure.string/join ","
diff-topology) "]")
+ (if (empty? diff-topology)
+ (log-message " Accepting leadership, all active topology found
localy.")
+ (do
+ (log-message " code for all active topologies not available
locally, giving up leadership.")
+ (.close leader-latch)))))
+ (^void notLeader[this]
+ (log-message (str hostname " lost leadership."))))))
+
+(defn zk-leader-elector
+ "Zookeeper Implementation of ILeaderElector."
+ [conf]
+ (let [servers (conf STORM-ZOOKEEPER-SERVERS)
+ zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf
STORM-ZOOKEEPER-PORT) :auth-conf conf)
+ leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock")
+ id (str (.getCanonicalHostName (InetAddress/getLocalHost)) ":" (conf
NIMBUS-THRIFT-PORT))
+ leader-latch (atom (LeaderLatch. zk leader-lock-path id))
+ leader-latch-listener (atom (leader-latch-listener-impl conf zk
@leader-latch))
+ ]
+ (reify ILeaderElector
+ (prepare [this conf]
+ (log-message "no-op for zookeeper implementation"))
+
+ (^void addToLeaderLockQueue [this]
+ (let [state (.getState @leader-latch)]
+ ;if this latch is already closed, we need to create new instance.
+ (if (.equals LeaderLatch$State/CLOSED state)
+ (do
+ (reset! leader-latch (LeaderLatch. zk leader-lock-path id))
+ (reset! leader-latch-listener (leader-latch-listener-impl conf zk
@leader-latch))
+ (log-message "LeaderLatch was in closed state. Resetted the
leaderLatch and listeners.")
+ ))
+
+ ;Only if the latch is not already started we invoke start.
+ (if (.equals LeaderLatch$State/LATENT state)
+ (do
+ (.addListener @leader-latch @leader-latch-listener)
+ (.start @leader-latch)
+ (log-message "Queued up for leader lock."))
+ (log-message "Node already in queue for leader lock."))))
+
+ (^void removeFromLeaderLockQueue [this]
+ ;Only started latches can be closed.
+ (if (.equals LeaderLatch$State/STARTED (.getState @leader-latch))
+ (do
+ (.close @leader-latch)
+ (log-message "Removed from leader lock queue."))
+ (log-message "leader latch is not started so no
removeFromLeaderLockQueue needed.")))
+
+ (^boolean isLeader [this]
+ (.hasLeadership @leader-latch))
+
+ (^NimbusInfo getLeader [this]
+ (to-NimbusInfo (.getLeader @leader-latch)))
+
+ (^List getAllNimbuses [this]
+ (let [participants (.getParticipants @leader-latch)]
+ (map (fn [^Participant participant]
+ (to-NimbusInfo participant))
+ participants)))
+
+ (^void close[this]
+ (log-message "closing zookeeper connection of leader elector.")
- (.close zk)))))
++ (.close zk)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/Config.java
index 7a140b6,c680354..16216ea
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@@ -227,6 -283,36 +283,30 @@@ public class Config extends HashMap<Str
public static final Object STORM_ID_SCHEMA = String.class;
/**
+ * The number of times to retry a Nimbus operation.
+ */
+ public static final String
STORM_NIMBUS_RETRY_TIMES="storm.nimbus.retry.times";
+ public static final Object STORM_NIMBUS_RETRY_TIMES_SCHEMA = Number.class;
+
+ /**
+ * The starting interval between exponential backoff retries of a Nimbus
operation.
+ */
+ public static final String
STORM_NIMBUS_RETRY_INTERVAL="storm.nimbus.retry.interval.millis";
+ public static final Object STORM_NIMBUS_RETRY_INTERVAL_SCHEMA =
Number.class;
+
+ /**
+ * The ceiling of the interval between retries of a client connect to
Nimbus operation.
+ */
+ public static final String
STORM_NIMBUS_RETRY_INTERVAL_CEILING="storm.nimbus.retry.intervalceiling.millis";
+ public static final Object STORM_NIMBUS_RETRY_INTERVAL_CEILING_SCHEMA =
Number.class;
+
+ /**
- * The host that the master server is running on.
- */
- public static final String NIMBUS_HOST = "nimbus.host";
- public static final Object NIMBUS_HOST_SCHEMA = String.class;
-
- /**
+ * The Nimbus transport plug-in for Thrift client/server communication
+ */
+ public static final String NIMBUS_THRIFT_TRANSPORT_PLUGIN =
"nimbus.thrift.transport";
+ public static final Object NIMBUS_THRIFT_TRANSPORT_PLUGIN_SCHEMA =
String.class;
+
+ /**
* Which port the Thrift interface of Nimbus should run on. Clients should
* connect to this port to upload jars and submit topologies.
*/
@@@ -863,67 -1224,38 +1224,97 @@@
* to backtype.storm.scheduler.IsolationScheduler to make use of the
isolation scheduler.
*/
public static final String ISOLATION_SCHEDULER_MACHINES =
"isolation.scheduler.machines";
- public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA =
Map.class;
+ public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA =
ConfigValidation.MapOfStringToNumberValidator;
+
+ /**
+ * A map from the user name to the number of machines that should that
user is allowed to use. Set storm.scheduler
+ * to backtype.storm.scheduler.multitenant.MultitenantScheduler
+ */
+ public static final String MULTITENANT_SCHEDULER_USER_POOLS =
"multitenant.scheduler.user.pools";
+ public static final Object MULTITENANT_SCHEDULER_USER_POOLS_SCHEMA =
ConfigValidation.MapOfStringToNumberValidator;
+
+ /**
+ * The number of machines that should be used by this topology to isolate
it from all others. Set storm.scheduler
+ * to backtype.storm.scheduler.multitenant.MultitenantScheduler
+ */
+ public static final String TOPOLOGY_ISOLATED_MACHINES =
"topology.isolate.machines";
+ public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA =
Number.class;
+
+ /**
+ * HDFS information, used to get the delegation token on behalf of the
topology
+ * submitter user and renew the tokens. see {@link
backtype.storm.security.auth.hadoop.AutoHDFS}
+ * kerberos principal name with realm should be provided.
+ */
+ public static final Object TOPOLOGY_HDFS_PRINCIPAL = "topology.hdfs.user";
+ public static final Object TOPOLOGY_HDFS_PRINCIPAL_SCHEMA = String.class;
+
+ /**
+ * The HDFS URI to be used by AutoHDFS.java to grab the delegation token
on topology
+ * submitter user's behalf by the nimbus. If this is not provided the
default URI provided
+ * in the hdfs configuration files will be used.
+ */
+ public static final Object TOPOLOGY_HDFS_URI = "topology.hdfs.uri";
+ public static final Object TOPOLOGY_HDFS_URI_SCHEMA = String.class;
+ /**
+ * Which implementation of {@link backtype.storm.nimbus.ICodeDistributor}
should be used by storm for code
+ * distribution.
+ */
+ public static final String STORM_CODE_DISTRIBUTOR_CLASS =
"storm.codedistributor.class";
+ public static final Object STORM_CODE_DISTRIBUTOR_CLASS_SCHEMA =
String.class;
+
+ /**
+ * Which port the BitTorrent tracker should bind to.
+ */
+ public static final String BITTORRENT_PORT = "bittorrent.port";
+ public static final Object BITTORRENT_PORT_SCHEMA = Number.class;
+
+ /**
+ * Max upload rate for topology torrents in kB/sec. 0.0 == unlimited.
+ */
+ public static final String BITTORRENT_MAX_UPLOAD_RATE =
"bittorrent.max.upload.rate";
+ public static final Object BITTORRENT_MAX_UPLOAD_RATE_SCHEMA =
Number.class;
+
+ /**
+ * Max download rate for topology torrents in kB/sec. 0.0 == unlimited.
+ */
+ public static final String BITTORRENT_MAX_DOWNLOAD_RATE =
"bittorrent.max.download.rate";
+ public static final Object BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA =
Number.class;
+
+ /**
+ * Time in seconds that a supervisor should seed after completing a
topology torrent download.
+ * A value of 0 will disable seeding (download only). A value of -1
indicates that the supervisor
+ * should seed indefinitely (until the topology is killed).
+ */
+ public static final String SUPERVISOR_BITTORRENT_SEED_DURATION =
"supervisor.bittorrent.seed.duration";
+ public static final Object SUPERVISOR_BITTORRENT_SEED_DURATION_SCHEMA =
Number.class;
+
+ /**
+ * Minimum number of nimbus hosts where the code must be replicated
before leader nimbus
+ * is allowed to perform topology activation tasks like setting up
heartbeats/assignments
+ * and marking the topology as active. default is 0.
+ */
+ public static final String MIN_REPLICATION_COUNT =
"min.replication.count";
+ public static final Object MIN_REPLICATION_COUNT_SCHEMA = Number.class;
+
+ /**
+ * Maximum wait time for the nimbus host replication to achieve the
min.replication.count.
+ * Once this time is elapsed nimbus will go ahead and perform topology
activation tasks even
+ * if required min.replication.count is not achieved. The default is 0
seconds, a value of
+ * -1 indicates to wait for ever.
+ */
+ public static final String MAX_REPLICATION_WAIT_TIME_SEC =
"max.replication.wait.time.sec";
+ public static final Object MAX_REPLICATION_WAIT_TIME_SEC_SCHEMA =
Number.class;
+
+
+ /**
+ * How often nimbus should wake the cleanup thread to clean the inbox.
+ * @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
+ */
+ public static final String NIMBUS_CODE_SYNC_FREQ_SECS =
"nimbus.code.sync.freq.secs";
+ public static final Object NIMBUS_CODE_SYNC_FREQ_SECS_SCHEMA =
ConfigValidation.IntegerValidator;
+
+
public static void setClasspath(Map conf, String cp) {
conf.put(Config.TOPOLOGY_CLASSPATH, cp);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
index b96e5b5,273e232..da10a4f
--- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
@@@ -17,19 -17,15 +17,20 @@@
*/
package backtype.storm.utils;
-import backtype.storm.Config;
++
+import backtype.storm.generated.Nimbus;
+import backtype.storm.nimbus.ILeaderElector;
+import backtype.storm.nimbus.NimbusInfo;
import backtype.storm.security.auth.ThriftClient;
+ import backtype.storm.security.auth.ThriftConnectionType;
-import backtype.storm.generated.Nimbus;
-import java.util.Map;
+import clojure.lang.IFn;
+import clojure.lang.PersistentArrayMap;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- import java.net.InetSocketAddress;
+import java.util.Map;
+
public class NimbusClient extends ThriftClient {
private Nimbus.Client _client;
private static final Logger LOG =
LoggerFactory.getLogger(NimbusClient.class);
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/Utils.java
index f483dc7,6e8458a..d392397
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@@ -249,18 -255,8 +255,18 @@@ public class Utils
return ret;
}
- public static void downloadFromMaster(Map conf, String file, String
localFile) throws IOException, TException {
+ public static void downloadFromMaster(Map conf, String file, String
localFile) throws AuthorizationException, IOException, TException {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
+ download(client, file, localFile);
+ }
+
+ public static void downloadFromHost(Map conf, String file, String
localFile, String host, int port) throws IOException, TException {
+ //TODO : instead of null as last arg we probably need some real
timeout, check what is the default and if its ok to reuse.
+ NimbusClient client = new NimbusClient (conf, host, port, null);
+ download(client, file, localFile);
+ }
+
+ private static void download(NimbusClient client, String file, String
localFile) throws IOException, TException {
String id = client.getClient().beginFileDownload(file);
WritableByteChannel out = Channels.newChannel(new
FileOutputStream(localFile));
while(true) {
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/ui/public/index.html
----------------------------------------------------------------------
diff --cc storm-core/src/ui/public/index.html
index 4e74494,6fac19a..a0bb3d8
--- a/storm-core/src/ui/public/index.html
+++ b/storm-core/src/ui/public/index.html
@@@ -59,9 -58,8 +61,9 @@@ $(document).ready(function()
});
}
});
- var template = $.get("/templates/index-page-template.html");
+ var uiUser = $("#ui-user");
var clusterSummary = $("#cluster-summary");
+ var nimbusSummary = $("#nimbus-summary");
var topologySummary = $("#topology-summary");
var supervisorSummary = $("#supervisor-summary");
var config = $("#nimbus-configuration");
http://git-wip-us.apache.org/repos/asf/storm/blob/6b0da168/storm-core/src/ui/public/templates/index-page-template.html
----------------------------------------------------------------------