STORM-615. Add REST API to upload topology.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1945f709 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1945f709 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1945f709 Branch: refs/heads/nimbus-ha-branch Commit: 1945f709174bcd867c8412198c559bbb15e287d9 Parents: 117256b Author: Sriharsha Chintalapani <[email protected]> Authored: Thu Mar 12 13:58:31 2015 -0700 Committer: Sriharsha Chintalapani <[email protected]> Committed: Tue Mar 31 17:21:33 2015 -0700 ---------------------------------------------------------------------- STORM-UI-REST-API.md | 31 ++++++++++++ storm-core/src/clj/backtype/storm/ui/core.clj | 58 +++++++++++++++++++++- 2 files changed, 87 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/1945f709/STORM-UI-REST-API.md ---------------------------------------------------------------------- diff --git a/STORM-UI-REST-API.md b/STORM-UI-REST-API.md index 72b07d4..cdbf05e 100644 --- a/STORM-UI-REST-API.md +++ b/STORM-UI-REST-API.md @@ -616,6 +616,37 @@ error response: } ``` +### /api/v1/uploadTopology (POST) + +uploads a topology. + +Caution: This api doesn't work in security mode. + +|Parameter |Value |Description | +|----------|--------|-------------| +|topologyConfig |String (required)| topology json config | +|topologyJar |String (required)| topology jar file | + +Sample topologyConfig json: +```json +{"topologyMainClass": "storm.starter.WordCountTopology", "topologyMainClassArgs": ["wordcount1"]} +``` + +Examples: + +```no-highlight +curl -i -b ~/cookiejar.txt -c ~/cookiejar.txt -X POST +-H 'x-csrf-token: ycit8Wi89ZdAOo9KKaka/Pvd0vnx8TZzP8xSDDSw8J8bTfyn4jz38VN4Xcb7CF6xigRzDLaGVHbrSj80' +-F topologyConfig='{"topologyMainClass": "storm.starter.WordCountTopology", "topologyMainClassArgs": ["wordcount1"]}' +-F topologyJar=@examples/storm-starter/storm-starter-topologies-0.10.0-SNAPSHOT.jar +http://localhost:8080/api/v1/uploadTopology +``` + +Sample Response: + +```json +{"status":"success"} +``` ### /api/v1/topology/:id/activate (POST) http://git-wip-us.apache.org/repos/asf/storm/blob/1945f709/storm-core/src/clj/backtype/storm/ui/core.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index c64f35d..0a797a3 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -16,7 +16,9 @@ (ns backtype.storm.ui.core (:use compojure.core) - (:use ring.middleware.reload) + (:use [clojure.java.shell :only [sh]]) + (:use ring.middleware.reload + ring.middleware.multipart-params) (:use [ring.middleware.json :only [wrap-json-params]]) (:use [hiccup core page-helpers]) (:use [backtype.storm config util log]) @@ -503,6 +505,39 @@ (hashmap-to-persistent bolts)) spout-comp-summs bolt-comp-summs window id)))) +(defn validate-tplg-submit-params [params] + (let [tplg-jar-file (params :topologyJar) + tplg-config (if (not-nil? (params :topologyConfig)) (from-json (params :topologyConfig)))] + (cond + (nil? tplg-jar-file) {:valid false :error "missing topology jar file"} + (nil? tplg-config) {:valid false :error "missing topology config"} + (nil? (tplg-config "topologyMainClass")) {:valid false :error "topologyMainClass missing in topologyConfig"} + :else {:valid true}))) + +(defn run-tplg-submit-cmd [tplg-jar-file tplg-config user] + (let [tplg-main-class (if (not-nil? tplg-config) (trim (tplg-config "topologyMainClass"))) + tplg-main-class-args (if (not-nil? tplg-config) (clojure.string/join " " (tplg-config "topologyMainClassArgs"))) + tplg-jvm-opts (if (not-nil? tplg-config) (clojure.string/join " " (tplg-config "topologyJvmOpts"))) + storm-home (System/getProperty "storm.home") + storm-conf-dir (str storm-home file-path-separator "conf") + storm-log-dir (if (not-nil? (*STORM-CONF* "storm.log.dir")) (*STORM-CONF* "storm.log.dir") + (str storm-home file-path-separator "logs")) + storm-libs (str storm-home file-path-separator "lib" file-path-separator "*") + java-cmd (str (System/getProperty "java.home") file-path-separator "bin" file-path-separator "java") + storm-cmd (str storm-home file-path-separator "bin" file-path-separator "storm") + tplg-cmd-response (sh storm-cmd "jar" tplg-jar-file + tplg-main-class + tplg-main-class-args + (if (not= user "unknown") (str "-c storm.doAsUser=" user) ""))] + (log-message "tplg-cmd-response " tplg-cmd-response) + (cond + (= (tplg-cmd-response :exit) 0) {"status" "success"} + (and (not= (tplg-cmd-response :exit) 0) + (not-nil? (re-find #"already exists on cluster" (tplg-cmd-response :err)))) {"status" "failed" "error" "Topology with the same name exists in cluster"} + (not= (tplg-cmd-response :exit) 0) {"status" "failed" "error" (clojure.string/trim-newline (tplg-cmd-response :err))} + :else {"status" "success" "response" "topology deployed"} + ))) + (defn cluster-configuration [] (with-nimbus nimbus (.getNimbusConf ^Nimbus$Client nimbus))) @@ -982,7 +1017,25 @@ (.killTopologyWithOpts nimbus name options) (log-message "Killing topology '" name "' with wait time: " wait-time " secs"))) (json-response (topology-op-response id "kill") (m "callback"))) - + (POST "/api/v1/uploadTopology" [:as {:keys [cookies servlet-request]} id & params] + (assert-authorized-user servlet-request "submitTopology") + (let [valid-tplg (validate-tplg-submit-params params) + valid (valid-tplg :valid) + context (ReqContext/context)] + (if http-creds-handler (.populateContext http-creds-handler context servlet-request)) + (if (= valid true) + (let [tplg-file-data (params :topologyJar) + tplg-temp-file (tplg-file-data :tempfile) + tplg-file-name (tplg-file-data :filename) + tplg-jar-file (clojure.string/join [(.getParent tplg-temp-file) file-path-separator tplg-file-name]) + tplg-config (if (not-nil? (params :topologyConfig)) (from-json (params :topologyConfig))) + principal (if (.isImpersonating context) (.realPrincipal context) (.principal context)) + user (if principal (.getName principal) "unknown")] + (.renameTo tplg-temp-file (File. tplg-jar-file)) + (let [ret (run-tplg-submit-cmd tplg-jar-file tplg-config user)] + (json-response ret (params "callback")))) + (json-response {"status" "failed" "error" (valid-tplg :error)} (params "callback")) + ))) (GET "/" [:as {cookies :cookies}] (resp/redirect "/index.html")) (route/resources "/") @@ -1012,6 +1065,7 @@ (def app (handler/site (-> main-routes (wrap-json-params) + (wrap-multipart-params) (wrap-reload '[backtype.storm.ui.core]) (wrap-anti-forgery {:error-response csrf-error-response}) catch-errors)))
