STORM-615. Add REST API to upload topology.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1945f709
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1945f709
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1945f709

Branch: refs/heads/nimbus-ha-branch
Commit: 1945f709174bcd867c8412198c559bbb15e287d9
Parents: 117256b
Author: Sriharsha Chintalapani <[email protected]>
Authored: Thu Mar 12 13:58:31 2015 -0700
Committer: Sriharsha Chintalapani <[email protected]>
Committed: Tue Mar 31 17:21:33 2015 -0700

----------------------------------------------------------------------
 STORM-UI-REST-API.md                          | 31 ++++++++++++
 storm-core/src/clj/backtype/storm/ui/core.clj | 58 +++++++++++++++++++++-
 2 files changed, 87 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1945f709/STORM-UI-REST-API.md
----------------------------------------------------------------------
diff --git a/STORM-UI-REST-API.md b/STORM-UI-REST-API.md
index 72b07d4..cdbf05e 100644
--- a/STORM-UI-REST-API.md
+++ b/STORM-UI-REST-API.md
@@ -616,6 +616,37 @@ error response:
 }
 ```
 
+### /api/v1/uploadTopology (POST)
+
+Uploads a topology.
+
+Caution: This API does not work in secure mode.
+
+|Parameter |Value   |Description  |
+|----------|--------|-------------|
+|topologyConfig |String (required)| Topology configuration as a JSON string |
+|topologyJar |File (required)| Topology jar file, sent as a multipart/form-data file upload |
+
+Sample topologyConfig json:
+```json
+{"topologyMainClass": "storm.starter.WordCountTopology", "topologyMainClassArgs": ["wordcount1"]}
+```
+
+Examples:
+
+```no-highlight
+curl -i -b ~/cookiejar.txt -c ~/cookiejar.txt -X POST \
+  -H 'x-csrf-token: ycit8Wi89ZdAOo9KKaka/Pvd0vnx8TZzP8xSDDSw8J8bTfyn4jz38VN4Xcb7CF6xigRzDLaGVHbrSj80' \
+  -F topologyConfig='{"topologyMainClass": "storm.starter.WordCountTopology", "topologyMainClassArgs": ["wordcount1"]}' \
+  -F topologyJar=@examples/storm-starter/storm-starter-topologies-0.10.0-SNAPSHOT.jar \
+  http://localhost:8080/api/v1/uploadTopology
+```
+
+Sample Response:
+
+```json
+{"status":"success"}
+```
 
 ### /api/v1/topology/:id/activate (POST)
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1945f709/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj 
b/storm-core/src/clj/backtype/storm/ui/core.clj
index c64f35d..0a797a3 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -16,7 +16,9 @@
 
 (ns backtype.storm.ui.core
   (:use compojure.core)
-  (:use ring.middleware.reload)
+  (:use [clojure.java.shell :only [sh]])
+  (:use ring.middleware.reload
+        ring.middleware.multipart-params)
   (:use [ring.middleware.json :only [wrap-json-params]])
   (:use [hiccup core page-helpers])
   (:use [backtype.storm config util log])
@@ -503,6 +505,39 @@
               (hashmap-to-persistent bolts))
        spout-comp-summs bolt-comp-summs window id))))
 
+(defn validate-tplg-submit-params
+  "Checks the request parameters of a topology upload.
+
+  params is the request parameter map. It must contain :topologyJar and a
+  :topologyConfig value that from-json parses into something holding a
+  \"topologyMainClass\" entry.
+
+  Returns {:valid true} when all three are present, otherwise
+  {:valid false :error <message>} describing the first thing found missing."
+  [params]
+  (let [tplg-jar-file (params :topologyJar)
+        raw-config (params :topologyConfig)
+        tplg-config (when-not (nil? raw-config) (from-json raw-config))]
+    (cond
+     (nil? tplg-jar-file) {:valid false :error "missing topology jar file"}
+     (nil? tplg-config) {:valid false :error "missing topology config"}
+     ;; Use `get` rather than invoking the parsed value as a function: a config
+     ;; that is valid JSON but not an object (a string, a number, ...) then
+     ;; yields a validation error instead of throwing.
+     (nil? (get tplg-config "topologyMainClass")) {:valid false :error "topologyMainClass missing in topologyConfig"}
+     :else {:valid true})))
+
+(defn run-tplg-submit-cmd
+  "Submits an uploaded topology jar by running `<storm.home>/bin/storm jar`.
+
+  tplg-jar-file - path of the jar on local disk
+  tplg-config   - parsed topology config; \"topologyMainClass\" and
+                  \"topologyMainClassArgs\" are read from it
+  user          - name of the submitting user, or \"unknown\"; any other value
+                  is forwarded as the storm.doAsUser config override
+
+  Returns a map for the JSON response: {\"status\" \"success\"} when the
+  command exits with 0, otherwise {\"status\" \"failed\" \"error\" <message>}."
+  [tplg-jar-file tplg-config user]
+  (let [tplg-main-class (if (not-nil? tplg-config) (trim (tplg-config "topologyMainClass")))
+        raw-args (if (not-nil? tplg-config) (tplg-config "topologyMainClassArgs"))
+        ;; Every topology argument must be its own argv element; `sh` does no
+        ;; shell word-splitting, so a space-joined string would reach the
+        ;; topology's main method as one single argument.
+        tplg-main-class-args (cond
+                              (nil? raw-args) []
+                              (or (sequential? raw-args)
+                                  (instance? java.util.Collection raw-args)) (map str raw-args)
+                              :else [(str raw-args)])
+        storm-home (System/getProperty "storm.home")
+        storm-cmd (str storm-home file-path-separator "bin" file-path-separator "storm")
+        ;; No argument at all for an unknown user (previously an empty string
+        ;; was appended to argv). "-c" and its value are separate tokens
+        ;; because `sh` would not split a combined string.
+        ;; NOTE(review): assumes bin/storm expects "-c" as its own token - confirm.
+        do-as-user-args (if (not= user "unknown") ["-c" (str "storm.doAsUser=" user)] [])
+        tplg-cmd-response (apply sh (concat [storm-cmd "jar" tplg-jar-file tplg-main-class]
+                                            tplg-main-class-args
+                                            do-as-user-args))
+        exit-code (tplg-cmd-response :exit)
+        err-output (tplg-cmd-response :err)]
+    (log-message "tplg-cmd-response " tplg-cmd-response)
+    (cond
+     (= exit-code 0) {"status" "success"}
+     ;; Give the common duplicate-name failure a short, stable message.
+     (not-nil? (re-find #"already exists on cluster" err-output))
+     {"status" "failed" "error" "Topology with the same name exists in cluster"}
+     :else {"status" "failed" "error" (clojure.string/trim-newline err-output)})))
+
 (defn cluster-configuration []
   (with-nimbus nimbus
     (.getNimbusConf ^Nimbus$Client nimbus)))
@@ -982,7 +1017,25 @@
         (.killTopologyWithOpts nimbus name options)
         (log-message "Killing topology '" name "' with wait time: " wait-time 
" secs")))
     (json-response (topology-op-response id "kill") (m "callback")))
-
+  ;; Accepts a multipart upload (topologyJar + topologyConfig), stores the jar
+  ;; next to its temp file under the uploaded file name and submits it through
+  ;; run-tplg-submit-cmd. Always answers with a JSON status map.
+  (POST "/api/v1/uploadTopology" [:as {:keys [cookies servlet-request]} id & params]
+        (assert-authorized-user servlet-request "submitTopology")
+        (let [valid-tplg (validate-tplg-submit-params params)
+              valid (valid-tplg :valid)
+              context (ReqContext/context)]
+          (if http-creds-handler (.populateContext http-creds-handler context servlet-request))
+          (if (= valid true)
+            (let [tplg-file-data (params :topologyJar)
+                  tplg-temp-file (tplg-file-data :tempfile)
+                  ;; The file name is chosen by the client: keep only its last
+                  ;; path component so it cannot point outside the temp directory.
+                  tplg-file-name (.getName (File. (str (tplg-file-data :filename))))
+                  tplg-jar (File. ^String (.getParent ^File tplg-temp-file) ^String tplg-file-name)
+                  tplg-jar-file (.getPath tplg-jar)
+                  tplg-config (if (not-nil? (params :topologyConfig)) (from-json (params :topologyConfig)))
+                  principal (if (.isImpersonating context) (.realPrincipal context) (.principal context))
+                  user (if principal (.getName principal) "unknown")]
+              ;; renameTo reports failure through its return value; do not try
+              ;; to submit a jar that was never put in place.
+              (if (.renameTo ^File tplg-temp-file tplg-jar)
+                (let [ret (run-tplg-submit-cmd tplg-jar-file tplg-config user)]
+                  (json-response ret (params "callback")))
+                (json-response {"status" "failed" "error" "unable to store uploaded topology jar"}
+                               (params "callback"))))
+            (json-response {"status" "failed" "error" (valid-tplg :error)} (params "callback")))))
   (GET "/" [:as {cookies :cookies}]
        (resp/redirect "/index.html"))
   (route/resources "/")
@@ -1012,6 +1065,7 @@
 (def app
   (handler/site (-> main-routes
                     (wrap-json-params)
+                    (wrap-multipart-params) ; multipart/form-data parsing, used by the /api/v1/uploadTopology jar upload
                     (wrap-reload '[backtype.storm.ui.core])
                     (wrap-anti-forgery {:error-response csrf-error-response})
                     catch-errors)))

Reply via email to