Merge remote-tracking branch 'upstream/master' into STORM-166

Conflicts:
        conf/defaults.yaml
        storm-core/src/clj/backtype/storm/cluster.clj
        storm-core/src/clj/backtype/storm/daemon/nimbus.clj
        storm-core/src/clj/backtype/storm/ui/core.clj
        storm-core/test/clj/backtype/storm/cluster_test.clj
        storm-core/test/clj/backtype/storm/security/auth/auth_test.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/57587182
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/57587182
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/57587182

Branch: refs/heads/0.11.x-branch
Commit: 575871822b116608f47a0a7a4cd3b4b17df9a672
Parents: f7205d2 bb8d48d
Author: Parth Brahmbhatt <[email protected]>
Authored: Thu Mar 19 15:04:43 2015 -0700
Committer: Parth Brahmbhatt <[email protected]>
Committed: Thu Mar 19 15:24:50 2015 -0700

----------------------------------------------------------------------
 CHANGELOG.md                                    |   21 +
 DEVELOPER.md                                    |    7 +
 README.markdown                                 |    2 +
 SECURITY.md                                     |   59 +-
 STORM-UI-REST-API.md                            |   12 +
 bin/storm                                       |  584 +--------
 bin/storm.py                                    |  543 ++++++++
 conf/defaults.yaml                              |    3 +
 conf/storm-env.sh                               |   24 +
 dev-tools/test-ns.py                            |   17 +
 docs/documentation/Clojure-DSL.md               |    4 +-
 docs/documentation/Command-line-client.md       |    2 +-
 docs/documentation/Common-patterns.md           |    6 +-
 docs/documentation/Concepts.md                  |   48 +-
 docs/documentation/Configuration.md             |    4 +-
 docs/documentation/Distributed-RPC.md           |    2 +-
 .../Guaranteeing-message-processing.md          |    6 +-
 docs/documentation/Hooks.md                     |    6 +-
 docs/documentation/Local-mode.md                |    4 +-
 docs/documentation/Powered-By.md                |  106 +-
 ...unning-topologies-on-a-production-cluster.md |    6 +-
 .../Serialization-(prior-to-0.6.0).md           |    4 +-
 docs/documentation/Serialization.md             |    2 +-
 docs/documentation/Structure-of-the-codebase.md |    8 +-
 docs/documentation/Transactional-topologies.md  |   18 +-
 docs/documentation/Tutorial.md                  |    8 +-
 ...nding-the-parallelism-of-a-Storm-topology.md |   16 +-
 external/README.md                              |   18 +
 external/storm-jdbc/README.md                   |   84 +-
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  |   36 +-
 .../org/apache/storm/jdbc/common/Column.java    |    3 +-
 .../apache/storm/jdbc/common/JdbcClient.java    |   52 +-
 .../jdbc/mapper/SimpleJdbcLookupMapper.java     |    2 +-
 .../storm/jdbc/trident/state/JdbcState.java     |   18 +-
 .../storm/jdbc/common/JdbcClientTest.java       |   39 +-
 .../jdbc/topology/UserPersistanceTopology.java  |    2 +-
 .../ExponentialBackoffMsgRetryManager.java      |    2 +-
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |    7 +-
 .../test/storm/kafka/bolt/KafkaBoltTest.java    |    2 +-
 .../storm/redis/trident/state/RedisState.java   |    2 +-
 storm-core/pom.xml                              |    3 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |   60 +-
 storm-core/src/clj/backtype/storm/config.clj    |    7 +-
 storm-core/src/clj/backtype/storm/converter.clj |  201 +++
 .../src/clj/backtype/storm/daemon/common.clj    |   10 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |    4 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |    5 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  150 ++-
 .../clj/backtype/storm/daemon/supervisor.clj    |   16 +-
 .../src/clj/backtype/storm/daemon/task.clj      |   11 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |    3 +-
 storm-core/src/clj/backtype/storm/stats.clj     |   78 +-
 storm-core/src/clj/backtype/storm/thrift.clj    |   32 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |   42 +-
 storm-core/src/genthrift.sh                     |   13 +-
 storm-core/src/java_license_header.txt          |   17 +
 storm-core/src/jvm/backtype/storm/Config.java   |   40 +-
 .../jvm/backtype/storm/ConfigValidation.java    |    8 +-
 .../src/jvm/backtype/storm/StormSubmitter.java  |   88 +-
 .../storm/coordination/BatchBoltExecutor.java   |    4 +-
 .../backtype/storm/generated/Assignment.java    |  983 ++++++++++++++
 .../storm/generated/ClusterWorkerHeartbeat.java |  673 ++++++++++
 .../backtype/storm/generated/ExecutorStats.java |  105 +-
 .../jvm/backtype/storm/generated/NodeInfo.java  |  556 ++++++++
 .../jvm/backtype/storm/generated/StormBase.java | 1211 ++++++++++++++++++
 .../storm/generated/SupervisorInfo.java         | 1182 +++++++++++++++++
 .../storm/generated/TopologyActionOptions.java  |  387 ++++++
 .../storm/generated/TopologyStatus.java         |   68 +
 .../backtype/storm/messaging/netty/Client.java  |   10 +-
 .../backtype/storm/messaging/netty/Context.java |   33 +-
 .../auth/DefaultHttpCredentialsPlugin.java      |   19 +-
 .../storm/security/auth/ITransportPlugin.java   |    4 +-
 .../storm/security/auth/ReqContext.java         |   28 +-
 .../security/auth/SaslTransportPlugin.java      |    3 +-
 .../security/auth/SimpleTransportPlugin.java    |    5 +-
 .../storm/security/auth/TBackoffConnect.java    |    4 +-
 .../storm/security/auth/ThriftClient.java       |   12 +-
 .../authorizer/ImpersonationAuthorizer.java     |  154 +++
 .../auth/authorizer/SimpleACLAuthorizer.java    |   55 +-
 .../auth/digest/DigestSaslTransportPlugin.java  |    6 +-
 .../auth/digest/ServerCallbackHandler.java      |   21 +-
 .../kerberos/KerberosSaslTransportPlugin.java   |    9 +-
 .../auth/kerberos/ServerCallbackHandler.java    |   38 +-
 .../DefaultSerializationDelegate.java           |   11 +-
 .../GzipBridgeSerializationDelegate.java        |    7 +-
 .../GzipSerializationDelegate.java              |   10 +-
 .../serialization/SerializationDelegate.java    |    2 +-
 .../ThriftSerializationDelegate.java            |   52 +
 .../backtype/storm/topology/BoltDeclarer.java   |    4 +
 .../backtype/storm/topology/InputDeclarer.java  |  128 ++
 .../storm/topology/TopologyBuilder.java         |    6 +-
 .../jvm/backtype/storm/utils/DRPCClient.java    |    2 +-
 .../jvm/backtype/storm/utils/LocalState.java    |    4 +-
 .../jvm/backtype/storm/utils/NimbusClient.java  |   16 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |   82 +-
 .../backtype/storm/utils/VersionedStore.java    |    9 +-
 storm-core/src/jvm/storm/trident/Stream.java    |    2 +-
 .../src/jvm/storm/trident/TridentTopology.java  |    6 +-
 storm-core/src/py/storm/Nimbus-remote           |    0
 storm-core/src/py/storm/ttypes.py               |  827 +++++++++++-
 storm-core/src/py_license_header.txt            |   18 +
 storm-core/src/storm.thrift                     |   51 +
 .../templates/component-page-template.html      |    2 +-
 .../templates/topology-page-template.html       |    4 +-
 .../test/clj/backtype/storm/cluster_test.clj    |   35 +-
 .../test/clj/backtype/storm/nimbus_test.clj     |    6 +-
 .../auth/DefaultHttpCredentialsPlugin_test.clj  |   16 +-
 .../backtype/storm/security/auth/auth_test.clj  |  146 ++-
 .../GzipBridgeSerializationDelegateTest.java    |    6 +-
 .../ThriftBridgeSerializationDelegateTest.java  |   60 +
 110 files changed, 8564 insertions(+), 1095 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/STORM-UI-REST-API.md
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/conf/defaults.yaml
----------------------------------------------------------------------
diff --cc conf/defaults.yaml
index 305b31c,78e89bb..49584f2
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@@ -47,11 -47,10 +47,12 @@@ storm.auth.simple-white-list.users: [
  storm.auth.simple-acl.users: []
  storm.auth.simple-acl.users.commands: []
  storm.auth.simple-acl.admins: []
 +storm.meta.serialization.delegate: 
"backtype.storm.serialization.DefaultSerializationDelegate"
 +storm.codedistributor.class: 
"backtype.storm.codedistributor.LocalFileSystemCodeDistributor"
+ storm.meta.serialization.delegate: 
"backtype.storm.serialization.ThriftSerializationDelegate"
  
  ### nimbus.* configs are for the master
 -nimbus.host: "localhost"
 +nimbus.seeds : ["localhost:6627"]
  nimbus.thrift.port: 6627
  nimbus.thrift.threads: 64
  nimbus.thrift.max_buffer_size: 1048576

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/cluster.clj
index c48e3c1,7987a30..333feec
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@@ -15,13 -15,14 +15,15 @@@
  ;; limitations under the License.
  
  (ns backtype.storm.cluster
-   (:import [org.apache.zookeeper.data Stat ACL Id])
+   (:import [org.apache.zookeeper.data Stat ACL Id]
 -           [backtype.storm.generated SupervisorInfo Assignment StormBase 
ClusterWorkerHeartbeat ErrorInfo Credentials]
++           [backtype.storm.generated SupervisorInfo Assignment StormBase 
ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary]
+            [java.io Serializable])
    (:import [org.apache.zookeeper KeeperException 
KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
    (:import [backtype.storm.utils Utils])
    (:import [java.security MessageDigest])
    (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
 +  (:import [backtype.storm.nimbus NimbusInfo])
-   (:use [backtype.storm util log config])
+   (:use [backtype.storm util log config converter])
    (:require [backtype.storm [zookeeper :as zk]])
    (:require [backtype.storm.daemon [common :as common]]))
  
@@@ -348,25 -322,6 +350,25 @@@
            (swap! assignment-version-callback assoc storm-id callback))
          (get-version cluster-state (assignment-path storm-id) (not-nil? 
callback)))
  
 +      (code-distributor
 +        [this callback]
 +        (when callback
 +          (reset! code-distributor-callback callback))
 +        (get-children cluster-state CODE-DISTRIBUTOR-SUBTREE (not-nil? 
callback)))
 +
 +      (nimbuses
 +        [this]
-         (map #(maybe-deserialize (get-data cluster-state (nimbus-path %1) 
false))
++        (map #(maybe-deserialize (get-data cluster-state (nimbus-path %1) 
false) NimbusSummary)
 +          (get-children cluster-state NIMBUSES-SUBTREE false)))
 +
 +      (add-nimbus-host!
 +        [this nimbus-id nimbus-summary]
 +        (set-ephemeral-node cluster-state (nimbus-path nimbus-id) 
(Utils/serialize nimbus-summary) acls))
 +
 +      (code-distributor-info
 +        [this storm-id]
 +        (map (fn [nimbus-info] (NimbusInfo/parse nimbus-info)) (get-children 
cluster-state (code-distributor-path storm-id) false)))
 +
        (active-storms
          [this]
          (get-children cluster-state STORMS-SUBTREE false))
@@@ -465,13 -428,9 +475,14 @@@
  
        (set-assignment!
          [this storm-id info]
-         (set-data cluster-state (assignment-path storm-id) (Utils/serialize 
info) acls))
+         (let [thrift-assignment (thriftify-assignment info)]
+           (set-data cluster-state (assignment-path storm-id) (Utils/serialize 
thrift-assignment) acls)))
  
 +      (setup-code-distributor!
 +        [this storm-id nimbusInfo]
 +        (mkdirs cluster-state (code-distributor-path storm-id) acls)
 +        (mkdirs cluster-state (str (code-distributor-path storm-id) "/" 
(.toHostPortString nimbusInfo)) acls))
 +
        (remove-storm!
          [this storm-id]
          (delete-node cluster-state (assignment-path storm-id))

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index eee0417,d1a1a59..8a2c0fb
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@@ -91,9 -77,9 +91,10 @@@
  (defn nimbus-data [conf inimbus]
    (let [forced-scheduler (.getForcedScheduler inimbus)]
      {:conf conf
 +     :nimbus-host-port-info (NimbusInfo/fromConf conf)
       :inimbus inimbus
       :authorization-handler (mk-authorization-handler (conf 
NIMBUS-AUTHORIZER) conf)
+      :impersonation-authorization-handler (mk-authorization-handler (conf 
NIMBUS-IMPERSONATION-AUTHORIZER) conf)
       :submitted-count (atom 0)
       :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when
                                                                         
(Utils/isZkAuthenticationConfiguredStormServer
@@@ -221,11 -195,11 +212,12 @@@
    ([nimbus storm-id event]
       (transition! nimbus storm-id event false))
    ([nimbus storm-id event error-on-no-transition?]
 -     (locking (:submit-lock nimbus)
 +    (is-leader nimbus)
 +    (locking (:submit-lock nimbus)
         (let [system-events #{:startup}
               [event & event-args] (if (keyword? event) [event] event)
-              status (topology-status nimbus storm-id)]
+              storm-base (-> nimbus :storm-cluster-state  (.storm-base 
storm-id nil))
+              status (:status storm-base)]
           ;; handles the case where event was scheduled but topology has been 
removed
           (if-not status
             (log-message "Cannot apply event " event " to " storm-id " because 
topology no longer exists")
@@@ -347,32 -323,9 +341,32 @@@
     (FileUtils/cleanDirectory (File. stormroot))
     (setup-jar conf tmp-jar-location stormroot)
     (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) 
(Utils/serialize topology))
-    (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) 
(Utils/serialize storm-conf))
+    (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) 
(Utils/javaSerialize storm-conf))
 +   (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) 
stormroot storm-id))
     ))
  
 +(defn- wait-for-desired-code-replication [nimbus conf storm-id]
 +  (let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
 +        max-replication-wait-time (conf 
TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
 +        total-wait-time (atom 0)
 +        current-replication-count (atom (if (:code-distributor nimbus) 
(.getReplicationCount (:code-distributor nimbus) storm-id) 0))]
 +  (if (:code-distributor nimbus)
 +    (while (and (> min-replication-count @current-replication-count)
 +             (or (= -1 max-replication-wait-time)
 +               (< @total-wait-time max-replication-wait-time)))
 +        (sleep-secs 1)
 +        (log-debug "waiting for desired replication to be achieved.
 +          min-replication-count = " min-replication-count  " 
max-replication-wait-time = " max-replication-wait-time
 +          "current-replication-count = " @current-replication-count " 
total-wait-time " @total-wait-time)
 +        (swap! total-wait-time inc)
 +        (reset! current-replication-count  (.getReplicationCount 
(:code-distributor nimbus) storm-id))))
 +  (if (< min-replication-count @current-replication-count)
 +    (log-message "desired replication count "  min-replication-count " 
achieved,
 +      current-replication-count" @current-replication-count)
 +    (log-message "desired replication count of "  min-replication-count " not 
achieved but we have hit the max wait time
 +      so moving on with replication count = " @current-replication-count)
 +    )))
 +
  (defn- read-storm-topology [conf storm-id]
    (let [stormroot (master-stormdist-root conf storm-id)]
      (Utils/deserialize
@@@ -708,10 -661,9 +702,10 @@@
  ;; only keep existing slots that satisfy one of those slots. for rest, 
reassign them across remaining slots
  ;; edge case for slots with no executor timeout but with supervisor 
timeout... just treat these as valid slots that can be reassigned to. worst 
comes to worse the executor will timeout and won't assign here next time around
  (defnk mk-assignments [nimbus :scratch-topology-id nil]
 -  (let [conf (:conf nimbus)
 +  (if (is-leader nimbus :throw-exception false)
 +    (let [conf (:conf nimbus)
          storm-cluster-state (:storm-cluster-state nimbus)
-         ^INimbus inimbus (:inimbus nimbus) 
+         ^INimbus inimbus (:inimbus nimbus)
          ;; read all the topologies
          topology-ids (.active-storms storm-cluster-state)
          topologies (into {} (for [tid topology-ids]
@@@ -775,14 -727,14 +769,14 @@@
      (->> new-assignments
            (map (fn [[topology-id assignment]]
              (let [existing-assignment (get existing-assignments topology-id)]
-               [topology-id (map to-worker-slot (newly-added-slots 
existing-assignment assignment))] 
+               [topology-id (map to-worker-slot (newly-added-slots 
existing-assignment assignment))]
                )))
            (into {})
 -          (.assignSlots inimbus topologies))
 -    ))
 +          (.assignSlots inimbus topologies)))
 +    (log-message "not a leader, skipping assignments")))
  
  (defn- start-storm [nimbus storm-name storm-id topology-initial-status]
-   {:pre [(#{:active :inactive} topology-initial-status)]}                
+   {:pre [(#{:active :inactive} topology-initial-status)]}
    (let [storm-cluster-state (:storm-cluster-state nimbus)
          conf (:conf nimbus)
          storm-conf (read-storm-conf conf storm-id)
@@@ -1049,24 -1009,9 +1058,23 @@@
    (let [nimbus (nimbus-data conf inimbus)
         principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)]
      (.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) 
conf)
 +
 +    ;add to nimbuses
-     (.add-nimbus-host! (:storm-cluster-state nimbus)
-       (.toHostPortString (:nimbus-host-port-info nimbus))
-       {
-         :host (.getHost (:nimbus-host-port-info nimbus))
-         :port (.getPort (:nimbus-host-port-info nimbus))
-         :start-time-secs (current-time-secs)
-         :version (str (VersionInfo/getVersion))
-         })
++    (.add-nimbus-host! (:storm-cluster-state nimbus) (.toHostPortString 
(:nimbus-host-port-info nimbus))
++      (NimbusSummary.
++        (.getHost (:nimbus-host-port-info nimbus))
++        (.getPort (:nimbus-host-port-info nimbus))
++        (current-time-secs)
++        false ;is-leader
++        (str (VersionInfo/getVersion))))
 +
 +    (.addToLeaderLockQueue (:leader-elector nimbus))
      (cleanup-corrupt-topologies! nimbus)
 -    (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
 -      (transition! nimbus storm-id :startup))
 +    ;register call back for code-distributor
 +    (.code-distributor (:storm-cluster-state nimbus) (fn [] (sync-code conf 
nimbus)))
 +    (when (is-leader nimbus :throw-exception false)
 +      (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
 +        (transition! nimbus storm-id :startup)))
      (schedule-recurring (:timer nimbus)
                          0
                          (conf NIMBUS-MONITOR-FREQ-SECS)
@@@ -1317,14 -1253,8 +1325,16 @@@
                                                                  (count 
(:used-ports info))
                                                                  id )
                                              ))
 -              nimbus-uptime ((:uptime nimbus))
                bases (topology-bases storm-cluster-state)
 +              nimbuses (.nimbuses storm-cluster-state)
-               nimbuses (map #(NimbusSummary. (:host %1) (:port %1) 
(time-delta (:start-time-secs %1))
-                                (let [leader (.getLeader (:leader-elector 
nimbus))]
-                                  (and (= (.getHost leader) (:host %1)) (= 
(.getPort leader) (:port %1))))
-                                (:version %1))
-                          nimbuses
-                          )
++
++              ;;update the isLeader field for each nimbus summary
++              _ (let [leader (.getLeader (:leader-elector nimbus))
++                      leader-host (.getHost leader)
++                      leader-port (.getPort leader)]
++                  (doseq [nimbus-summary nimbuses]
++                    (.set_isLeader nimbus-summary (and (= leader-host 
(.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
++
                topology-summaries (dofor [[id base] bases :when base]
                                          (let [assignment (.assignment-info 
storm-cluster-state id nil)
                                                  topo-summ (TopologySummary. id

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index d315925,bc8b999..4fc219e
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@@ -376,7 -377,7 +382,7 @@@
              master-code-dir (if (contains? storm-code-map :data) 
(storm-code-map :data))
              stormroot (supervisor-stormdist-root conf storm-id)]
          (if-not (or (contains? downloaded-storm-ids storm-id) (.exists (File. 
stormroot)) (nil? master-code-dir))
--          (download-storm-code conf storm-id master-code-dir download-lock))
++          (download-storm-code conf storm-id master-code-dir supervisor 
download-lock))
          ))
  
      (wait-for-workers-launch
@@@ -725,7 -728,7 +731,7 @@@
         first ))
  
  (defmethod download-storm-code
--    :local [conf storm-id master-code-dir download-lock]
++    :local [conf storm-id master-code-dir supervisor download-lock]
      (let [stormroot (supervisor-stormdist-root conf storm-id)]
        (locking download-lock
              (FileUtils/copyDirectory (File. master-code-dir) (File. 
stormroot))

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/clj/backtype/storm/thrift.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/thrift.clj
index 474ea67,6445f46..8f4c659
--- a/storm-core/src/clj/backtype/storm/thrift.clj
+++ b/storm-core/src/clj/backtype/storm/thrift.clj
@@@ -20,9 -22,9 +22,10 @@@
              StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
              ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
              GlobalStreamId ComponentObject ComponentObject$_Fields
-             ShellComponent])
+             ShellComponent SupervisorInfo])
    (:import [backtype.storm.utils Utils NimbusClient])
    (:import [backtype.storm Constants])
++  (:import [backtype.storm.security.auth ReqContext])
    (:import [backtype.storm.grouping CustomStreamGrouping])
    (:import [backtype.storm.topology TopologyBuilder])
    (:import [backtype.storm.clojure RichShellBolt RichShellSpout])
@@@ -84,13 -88,17 +89,15 @@@
  (defmacro with-configured-nimbus-connection
    [client-sym & body]
    `(let [conf# (read-storm-config)
-          nimbusClient# (NimbusClient/getConfiguredClient conf#)
 -         host# (conf# NIMBUS-HOST)
 -         port# (conf# NIMBUS-THRIFT-PORT)]
 -    (with-nimbus-connection [~client-sym host# port#]
 -      ~@body)))
++         context# (ReqContext/context)
++         user# (if (.principal context#) (.getName (.principal context#)))
++         nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#)
 +         ~client-sym (.getClient nimbusClient#)
 +         conn# (.transport nimbusClient#)
 +         ]
 +     (try
 +       ~@body
 +     (finally (.close conn#)))))
  
  (defn direct-output-fields
    [fields]

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index 0f93f23,c64f35d..553434e
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -931,18 -929,19 +947,19 @@@
    (GET "/api/v1/token" [ & m]
         (json-response (format "{\"antiForgeryToken\": \"%s\"}" 
*anti-forgery-token*) (:callback m) :serialize-fn identity))
    (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies 
servlet-request]} id & m]
 +    (thrift/with-configured-nimbus-connection nimbus
+     (assert-authorized-user servlet-request "activate" (topology-config id))
 -    (with-nimbus nimbus
        (let [tplg (->> (doto
                          (GetInfoOptions.)
                          (.set_num_err_choice NumErrorsChoice/NONE))
                        (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
              name (.get_name tplg)]
-         (assert-authorized-user servlet-request "activate" (topology-config 
id))
          (.activate nimbus name)
          (log-message "Activating topology '" name "'")))
 -    (json-response (topology-op-response id "deactivate") (m "callback")))
 +    (json-response (topology-op-response id "activate") (m "callback")))
    (POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies 
servlet-request]} id & m]
 +    (thrift/with-configured-nimbus-connection nimbus
+     (assert-authorized-user servlet-request "deactivate" (topology-config id))
 -    (with-nimbus nimbus
        (let [tplg (->> (doto
                          (GetInfoOptions.)
                          (.set_num_err_choice NumErrorsChoice/NONE))
@@@ -953,7 -951,8 +969,8 @@@
          (log-message "Deactivating topology '" name "'")))
      (json-response (topology-op-response id "deactivate") (m "callback")))
    (POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies 
servlet-request]} id wait-time & m]
 +    (thrift/with-configured-nimbus-connection nimbus
+     (assert-authorized-user servlet-request "rebalance" (topology-config id))
 -    (with-nimbus nimbus
        (let [tplg (->> (doto
                          (GetInfoOptions.)
                          (.set_num_err_choice NumErrorsChoice/NONE))
@@@ -972,7 -970,8 +988,8 @@@
          (log-message "Rebalancing topology '" name "' with wait time: " 
wait-time " secs")))
      (json-response (topology-op-response id "rebalance") (m "callback")))
    (POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies 
servlet-request]} id wait-time & m]
+     (assert-authorized-user servlet-request "killTopology" (topology-config 
id))
 -    (with-nimbus nimbus
 +    (thrift/with-configured-nimbus-connection nimbus
        (let [tplg (->> (doto
                          (GetInfoOptions.)
                          (.set_num_err_choice NumErrorsChoice/NONE))

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
index 39d3895,b171353..071d2b6
--- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
@@@ -40,31 -30,23 +40,36 @@@ public class NimbusClient extends Thrif
      private Nimbus.Client _client;
      private static final Logger LOG = 
LoggerFactory.getLogger(NimbusClient.class);
  
+ 
      public static NimbusClient getConfiguredClient(Map conf) {
 -        try {
 -            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
 -            return new NimbusClient(conf, nimbusHost);
 -        } catch (TTransportException ex) {
 -            throw new RuntimeException(ex);
 -        }
++        return getConfiguredClientAs(conf, null);
+     }
+ 
+     public static NimbusClient getConfiguredClientAs(Map conf, String asUser) 
{
 -        try {
 -            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
 -            return new NimbusClient(conf, nimbusHost, null, null, asUser);
 -        } catch (TTransportException ex) {
 -            throw new RuntimeException(ex);
 +        List<String> seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS);
 +        for(String seed : seeds) {
 +            String[] split = seed.split(":");
 +            String host = split[0];
 +            int port = Integer.parseInt(split[1]);
 +            try {
 +                NimbusClient client = new NimbusClient(conf,host,port);
 +                ClusterSummary clusterInfo = 
client.getClient().getClusterInfo();
 +                List<NimbusSummary> nimbuses = clusterInfo.get_nimbuses();
 +                if(nimbuses != null) {
 +                    for(NimbusSummary nimbusSummary : nimbuses) {
 +                        if(nimbusSummary.is_isLeader()) {
-                             return new NimbusClient(conf, 
nimbusSummary.get_host(), nimbusSummary.get_port());
++                            return new NimbusClient(conf, 
nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser);
 +                        }
 +                    }
 +                }
 +                throw new RuntimeException("Found nimbuses " + nimbuses + " 
none of which is elected as leader, please try " +
 +                        "again after some time.");
 +            } catch (Exception e) {
 +                LOG.warn("Ignoring exception while trying to get leader 
nimbus info from " + seed, e);
 +            }
          }
 +        throw new RuntimeException("Could not find leader nimbus from seed 
hosts " + seeds +". " +
 +                "Did you specify a valid list of nimbus host:port for config 
" + Config.NIMBUS_SEEDS);
      }
  
      public NimbusClient(Map conf, String host, int port) throws 
TTransportException {

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/storm.thrift
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/src/ui/public/templates/topology-page-template.html
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/test/clj/backtype/storm/cluster_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/cluster_test.clj
index 85aaf3b,98eae68..ffc72af
--- a/storm-core/test/clj/backtype/storm/cluster_test.clj
+++ b/storm-core/test/clj/backtype/storm/cluster_test.clj
@@@ -169,12 -168,10 +169,14 @@@
  (deftest test-storm-cluster-state-basics
    (with-inprocess-zookeeper zk-port
      (let [state (mk-storm-state zk-port)
-           assignment1 (Assignment. "/aaa" {} {1 [2 2002 1]} {})
-           assignment2 (Assignment. "/aaa" {} {1 [2 2002]} {})
+           assignment1 (Assignment. "/aaa" {} {[1] ["1" 1001 1]} {})
+           assignment2 (Assignment. "/aaa" {} {[2] ["2" 2002]} {})
 +          nimbusInfo1 (NimbusInfo. "nimbus1" 6667 false)
 +          nimbusInfo2 (NimbusInfo. "nimbus2" 6667 false)
-           base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "")
-           base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "")]
++          nimbusSummary1 (NimbusSummary. "nimbus1" 6667 (current-time-secs) 
false "v1")
++          nimbusSummary2 (NimbusSummary. "nimbus2" 6667 (current-time-secs) 
false "v2")
+           base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil)
+           base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil)]
        (is (= [] (.assignments state nil)))
        (.set-assignment! state "storm1" assignment1)
        (is (= assignment1 (.assignment-info state "storm1" nil)))
@@@ -204,21 -201,6 +206,21 @@@
        (.set-credentials! state "storm1" {"b" "b"} {})
        (is (= {"b" "b"} (.credentials state "storm1" nil)))
  
 +      (is (= [] (.code-distributor state nil)))
 +      (.setup-code-distributor! state "storm1" nimbusInfo1)
 +      (is (= ["storm1"] (.code-distributor state nil)))
 +      (is (= [nimbusInfo1] (.code-distributor-info state "storm1")))
 +      (.setup-code-distributor! state "storm1" nimbusInfo2)
 +      (is (= #{nimbusInfo1 nimbusInfo2} (set (.code-distributor-info state 
"storm1"))))
 +      (.remove-storm! state "storm1")
 +      (is (= [] (.code-distributor state nil)))
 +
 +      (is (= [] (.nimbuses state)))
-       (.add-nimbus-host! state "host:port" nimbusInfo1)
-       (is (= [nimbusInfo1] (.nimbuses state)))
-       (.add-nimbus-host! state "host1:port" nimbusInfo2)
-       (is (= #{nimbusInfo1 nimbusInfo2} (set (.nimbuses state))))
++      (.add-nimbus-host! state "nimbus1:port" nimbusSummary1)
++      (is (= [nimbusSummary1] (.nimbuses state)))
++      (.add-nimbus-host! state "nimbus2:port" nimbusSummary2)
++      (is (= #{nimbusSummary1 nimbusSummary2} (set (.nimbuses state))))
 +
        ;; TODO add tests for task info and task heartbeat setting and getting
        (.disconnect state)
        )))

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57587182/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
index 416dddf,ddd5e03..cb4ccc8
--- a/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
@@@ -117,11 -119,12 +119,11 @@@
           (^TopologyInfo getTopologyInfo [this ^String storm-id]))))
    ([conf inimbus]
       (dummy-service-handler conf inimbus nil)))
-      
  
- (defn launch-server [server-port login-cfg aznClass transportPluginClass 
serverConf] 
+ 
+ (defn launch-server [server-port login-cfg aznClass transportPluginClass 
serverConf]
    (let [conf1 (merge (read-storm-config)
-                      {NIMBUS-AUTHORIZER aznClass 
+                      {NIMBUS-AUTHORIZER aznClass
 -                      NIMBUS-HOST "localhost"
                        NIMBUS-THRIFT-PORT server-port
                        STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass})
          conf2 (if login-cfg (merge conf1 {"java.security.auth.login.config" 
login-cfg}) conf1)

Reply via email to