http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index d659d57..e066269 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -14,19 +14,21 @@ ;; See the License for the specific language governing permissions and ;; limitations under the License. (ns backtype.storm.daemon.supervisor - (:import [java.io OutputStreamWriter BufferedWriter IOException]) + (:import [java.io File IOException FileOutputStream]) (:import [backtype.storm.scheduler ISupervisor] [backtype.storm.utils LocalState Time Utils] [backtype.storm.daemon Shutdownable] [backtype.storm Constants] [java.net JarURLConnection] [java.net URI] - [org.apache.commons.io FileUtils] - [java.io File]) + [org.apache.commons.io FileUtils]) (:use [backtype.storm config util log timer local-state]) + (:import [backtype.storm.generated AuthorizationException KeyNotFoundException WorkerResources]) (:import [backtype.storm.utils VersionInfo]) + (:import [java.nio.file Files StandardCopyOption]) (:import [backtype.storm Config]) (:import [backtype.storm.generated WorkerResources ProfileAction]) + (:import [backtype.storm.localizer LocalResource]) (:use [backtype.storm.daemon common]) (:require [backtype.storm.command [healthcheck :as healthcheck]]) (:require [backtype.storm.daemon [worker :as worker]] @@ -44,7 +46,6 @@ (defmulti download-storm-code cluster-mode) (defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor)))) -(defmulti mk-code-distributor cluster-mode) (defprotocol SupervisorDaemon (get-id [this]) @@ -238,20 +239,21 @@ (defn- rmr-as-user "Launches a process owned by the given user that deletes the given path recursively. 
Throws RuntimeException if the directory is not removed." - [conf id user path] - (worker-launcher-and-wait conf - user - ["rmr" path] - :log-prefix (str "rmr " id)) - (if (exists-file? path) - (throw (RuntimeException. (str path " was not deleted"))))) - -(defn try-cleanup-worker [conf id user] + [conf id path] + (let [user (Utils/getFileOwner path)] + (worker-launcher-and-wait conf + user + ["rmr" path] + :log-prefix (str "rmr " id)) + (if (exists-file? path) + (throw (RuntimeException. (str path " was not deleted")))))) + +(defn try-cleanup-worker [conf id] (try (if (.exists (File. (worker-root conf id))) (do (if (conf SUPERVISOR-RUN-WORKER-AS-USER) - (rmr-as-user conf id user (worker-root conf id)) + (rmr-as-user conf id (worker-root conf id)) (do (rmr (worker-heartbeats-root conf id)) ;; this avoids a race condition with worker or subprocess writing pid around same time @@ -290,11 +292,11 @@ (worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -9 " pid)) (force-kill-process pid)) (if as-user - (rmr-as-user conf id user (worker-pid-path conf id pid)) + (rmr-as-user conf id (worker-pid-path conf id pid)) (try (rmpath (worker-pid-path conf id pid)) (catch Exception e)))) ;; on windows, the supervisor may still holds the lock on the worker directory - (try-cleanup-worker conf id user)) + (try-cleanup-worker conf id)) (log-message "Shut down " (:supervisor-id supervisor) ":" id)) (def SUPERVISOR-ZK-ACLS @@ -326,16 +328,62 @@ (log-error t "Error when processing event") (exit-process! 20 "Error when processing an event") )) + :blob-update-timer (mk-timer :kill-fn (defn blob-update-timer + [t] + (log-error t "Error when processing event") + (exit-process! 20 "Error when processing a event")) + :timer-name "blob-update-timer") + :localizer (Utils/createLocalizer conf (supervisor-local-dir conf)) :assignment-versions (atom {}) :sync-retry (atom 0) - :code-distributor (mk-code-distributor conf) :download-lock (Object.) 
:stormid->profiler-actions (atom {}) }) +(defn required-topo-files-exist? + [conf storm-id] + (let [stormroot (supervisor-stormdist-root conf storm-id) + stormjarpath (supervisor-stormjar-path stormroot) + stormcodepath (supervisor-stormcode-path stormroot) + stormconfpath (supervisor-stormconf-path stormroot)] + (and (every? exists-file? [stormroot stormconfpath stormcodepath]) + (or (local-mode? conf) + (exists-file? stormjarpath))))) + +(defn get-worker-assignment-helper-msg + [assignment supervisor port id] + (str (pr-str assignment) " for this supervisor " (:supervisor-id supervisor) " on port " + port " with id " id)) + +(defn get-valid-new-worker-ids + [conf supervisor reassign-executors new-worker-ids] + (into {} + (remove nil? + (dofor [[port assignment] reassign-executors] + (let [id (new-worker-ids port) + storm-id (:storm-id assignment) + ^WorkerResources resources (:resources assignment) + mem-onheap (.get_mem_on_heap resources)] + ;; This condition checks for required files exist before launching the worker + (if (required-topo-files-exist? conf storm-id) + (do + (log-message "Launching worker with assignment " + (get-worker-assignment-helper-msg assignment supervisor port id)) + (local-mkdirs (worker-pids-root conf id)) + (local-mkdirs (worker-heartbeats-root conf id)) + (launch-worker supervisor + (:storm-id assignment) + port + id + mem-onheap) + [id port]) + (do + (log-message "Missing topology storm code, so can't launch worker with assignment " + (get-worker-assignment-helper-msg assignment supervisor port id)) + nil))))))) + (defn sync-processes [supervisor] (let [conf (:conf supervisor) - download-lock (:download-lock supervisor) ^LocalState local-state (:local-state supervisor) storm-cluster-state (:storm-cluster-state supervisor) assigned-executors (defaulted (ls-local-assignments local-state) {}) @@ -349,8 +397,7 @@ new-worker-ids (into {} (for [port (keys reassign-executors)] - [port (uuid)])) - ] + [port (uuid)]))] ;; 1. 
to kill are those in allocated that are dead or disallowed ;; 2. kill the ones that should be dead ;; - read pids, kill -9 and individually remove file @@ -371,67 +418,14 @@ ". Current supervisor time: " now ". State: " state ", Heartbeat: " (pr-str heartbeat)) - (shutdown-worker supervisor id) - (if (:code-distributor supervisor) - (.cleanup (:code-distributor supervisor) id)) - )) - - (doseq [id (vals new-worker-ids)] - (local-mkdirs (worker-pids-root conf id)) - (local-mkdirs (worker-heartbeats-root conf id))) - (ls-approved-workers! local-state - (merge - (select-keys (ls-approved-workers local-state) - (keys keepers)) - (zipmap (vals new-worker-ids) (keys new-worker-ids)) - )) - - ;; check storm topology code dir exists before launching workers - (doseq [[port assignment] reassign-executors] - (let [downloaded-storm-ids (set (read-downloaded-storm-ids conf)) - storm-id (:storm-id assignment) - cached-assignment-info @(:assignment-versions supervisor) - assignment-info (if (and (not-nil? cached-assignment-info) (contains? cached-assignment-info storm-id )) - (get cached-assignment-info storm-id) - (.assignment-info-with-version storm-cluster-state storm-id nil)) - storm-code-map (read-storm-code-locations assignment-info) - master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data)) - stormroot (supervisor-stormdist-root conf storm-id)] - (if-not (or (contains? downloaded-storm-ids storm-id) (.exists (File. stormroot)) (nil? 
master-code-dir)) - (download-storm-code conf storm-id master-code-dir supervisor download-lock)) - )) - - (wait-for-workers-launch - conf - (dofor [[port assignment] reassign-executors] - (let [id (new-worker-ids port) - storm-id (:storm-id assignment) - ^WorkerResources resources (:resources assignment) - mem-onheap (.get_mem_on_heap resources)] - (try - (log-message "Launching worker with assignment " - (pr-str assignment) - " for this supervisor " - (:supervisor-id supervisor) - " on port " - port - " with id " - id - ) - (launch-worker supervisor - (:storm-id assignment) - port - id - mem-onheap) - (mark! supervisor:num-workers-launched) - (catch java.io.FileNotFoundException e - (log-message "Unable to launch worker due to " - (.getMessage e))) - (catch java.io.IOException e - (log-message "Unable to launch worker due to " - (.getMessage e)))) - id))) - )) + (shutdown-worker supervisor id))) + (let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)] + (ls-approved-workers! local-state + (merge + (select-keys (ls-approved-workers local-state) + (keys keepers)) + valid-new-worker-ids)) + (wait-for-workers-launch conf (keys valid-new-worker-ids))))) (defn assigned-storm-ids-from-port-assignments [assignment] (->> assignment @@ -454,10 +448,80 @@ (shutdown-worker supervisor id)) )) +(defn get-blob-localname + "Given the blob information either gets the localname field if it exists, + else routines the default value passed in." + [blob-info defaultValue] + (or (get blob-info "localname") defaultValue)) + +(defn should-uncompress-blob? + "Given the blob information returns the value of the uncompress field, handling it either being + a string or a boolean value, or if it's not specified then returns false" + [blob-info] + (Boolean. (get blob-info "uncompress"))) + +(defn remove-blob-references + "Remove a reference to a blob when its no longer needed." 
+ [localizer storm-id conf] + (let [storm-conf (read-supervisor-storm-conf conf storm-id) + blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) + user (storm-conf TOPOLOGY-SUBMITTER-USER) + topo-name (storm-conf TOPOLOGY-NAME)] + (if blobstore-map + (doseq [[k, v] blobstore-map] + (.removeBlobReference localizer + k + user + topo-name + (should-uncompress-blob? v)))))) + +(defn blobstore-map-to-localresources + "Returns a list of LocalResources based on the blobstore-map passed in." + [blobstore-map] + (if blobstore-map + (for [[k, v] blobstore-map] (LocalResource. k (should-uncompress-blob? v))) + ())) + +(defn add-blob-references + "For each of the downloaded topologies, adds references to the blobs that the topologies are + using. This is used to reconstruct the cache on restart." + [localizer storm-id conf] + (let [storm-conf (read-supervisor-storm-conf conf storm-id) + blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) + user (storm-conf TOPOLOGY-SUBMITTER-USER) + topo-name (storm-conf TOPOLOGY-NAME) + localresources (blobstore-map-to-localresources blobstore-map)] + (if blobstore-map + (.addReferences localizer localresources user topo-name)))) + +(defn rm-topo-files + [conf storm-id localizer rm-blob-refs?] + (let [path (supervisor-stormdist-root conf storm-id)] + (try + (if rm-blob-refs? + (remove-blob-references localizer storm-id conf)) + (if (conf SUPERVISOR-RUN-WORKER-AS-USER) + (rmr-as-user conf storm-id path) + (rmr (supervisor-stormdist-root conf storm-id))) + (catch Exception e + (log-message e (str "Exception removing: " storm-id)))))) + +(defn verify-downloaded-files + "Check for the files exists to avoid supervisor crashing + Also makes sure there is no necessity for locking" + [conf localizer assigned-storm-ids all-downloaded-storm-ids] + (remove nil? + (into #{} + (for [storm-id all-downloaded-storm-ids + :when (contains? assigned-storm-ids storm-id)] + (when-not (required-topo-files-exist? 
conf storm-id) + (log-debug "Files not present in topology directory") + (rm-topo-files conf storm-id localizer false) + storm-id))))) + (defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager] (fn this [] (let [conf (:conf supervisor) - download-lock (:download-lock supervisor) storm-cluster-state (:storm-cluster-state supervisor) ^ISupervisor isupervisor (:isupervisor supervisor) ^LocalState local-state (:local-state supervisor) @@ -468,7 +532,7 @@ versions :versions} (assignments-snapshot storm-cluster-state sync-callback assignment-versions) storm-code-map (read-storm-code-locations assignments-snapshot) - downloaded-storm-ids (set (read-downloaded-storm-ids conf)) + all-downloaded-storm-ids (set (read-downloaded-storm-ids conf)) existing-assignment (ls-local-assignments local-state) all-assignment (read-assignments assignments-snapshot (:assignment-id supervisor) @@ -476,14 +540,20 @@ (:sync-retry supervisor)) new-assignment (->> all-assignment (filter-key #(.confirmAssigned isupervisor %))) - assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)] + assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment) + localizer (:localizer supervisor) + checked-downloaded-storm-ids (set (verify-downloaded-files conf localizer assigned-storm-ids all-downloaded-storm-ids)) + downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)] + (log-debug "Synchronizing supervisor") (log-debug "Storm code map: " storm-code-map) - (log-debug "Downloaded storm ids: " downloaded-storm-ids) (log-debug "All assignment: " all-assignment) (log-debug "New assignment: " new-assignment) - (log-debug "Storm Ids Profiler Actions" storm-id->profiler-actions) - + (log-debug "Assigned Storm Ids " assigned-storm-ids) + (log-debug "All Downloaded Ids " all-downloaded-storm-ids) + (log-debug "Checked Downloaded Ids " checked-downloaded-storm-ids) + (log-debug "Downloaded Ids " 
downloaded-storm-ids) + (log-debug "Storm Ids Profiler Actions " storm-id->profiler-actions) ;; download code first ;; This might take awhile ;; - should this be done separately from usual monitoring? @@ -491,7 +561,9 @@ (doseq [[storm-id master-code-dir] storm-code-map] (when (and (not (downloaded-storm-ids storm-id)) (assigned-storm-ids storm-id)) - (download-storm-code conf storm-id master-code-dir supervisor download-lock))) + (log-message "Downloading code for storm id " storm-id) + (download-storm-code conf storm-id master-code-dir localizer) + (log-message "Finished downloading code for storm id " storm-id))) (log-debug "Writing new assignment " (pr-str new-assignment)) @@ -510,22 +582,52 @@ ;; synchronize-supervisor doesn't try to launch workers for which the ;; resources don't exist (if on-windows? (shutdown-disallowed-workers supervisor)) - (doseq [storm-id downloaded-storm-ids] + (doseq [storm-id all-downloaded-storm-ids] (when-not (storm-code-map storm-id) (log-message "Removing code for storm id " storm-id) - (try - (rmr (supervisor-stormdist-root conf storm-id)) - (catch Exception e (log-message (.getMessage e)))) - )) - (.add processes-event-manager sync-processes) - ))) + (rm-topo-files conf storm-id localizer true))) + (.add processes-event-manager sync-processes)))) (defn mk-supervisor-capacities [conf] {Config/SUPERVISOR_MEMORY_CAPACITY_MB (double (conf SUPERVISOR-MEMORY-CAPACITY-MB)) Config/SUPERVISOR_CPU_CAPACITY (double (conf SUPERVISOR-CPU-CAPACITY))}) +(defn update-blobs-for-topology! + "Update each blob listed in the topology configuration if the latest version of the blob + has not been downloaded." 
+ [conf storm-id localizer] + (let [storm-conf (read-supervisor-storm-conf conf storm-id) + blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) + user (storm-conf TOPOLOGY-SUBMITTER-USER) + localresources (blobstore-map-to-localresources blobstore-map)] + (try + (.updateBlobs localizer localresources user) + (catch AuthorizationException authExp + (log-error authExp)) + (catch KeyNotFoundException knf + (log-error knf))))) + +(defn update-blobs-for-all-topologies-fn + "Returns a function that downloads all blobs listed in the topology configuration for all topologies assigned + to this supervisor, and creates version files with a suffix. The returned function is intended to be run periodically + by a timer, created elsewhere." + [supervisor] + (fn [] + (try + (let [conf (:conf supervisor) + downloaded-storm-ids (set (read-downloaded-storm-ids conf)) + new-assignment @(:curr-assignment supervisor) + assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)] + (doseq [topology-id downloaded-storm-ids] + (let [storm-root (supervisor-stormdist-root conf topology-id)] + (when (assigned-storm-ids topology-id) + (log-debug "Checking Blob updates for storm topology id " topology-id " With target_dir: " storm-root) + (update-blobs-for-topology! conf topology-id (:localizer supervisor)))))) + (catch Exception e + (log-error e "Error updating blobs, will retry again later"))))) + (defn jvm-cmd [cmd] (let [java-home (.get (System/getenv) "JAVA_HOME")] (if (nil? 
java-home) @@ -650,6 +752,8 @@ [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)] sync-processes (partial sync-processes supervisor) synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager) + synchronize-blobs-fn (update-blobs-for-all-topologies-fn supervisor) + downloaded-storm-ids (set (read-downloaded-storm-ids conf)) run-profiler-actions-fn (mk-run-profiler-actions-for-all-topologies supervisor) heartbeat-fn (fn [] (.supervisor-heartbeat! (:storm-cluster-state supervisor) @@ -671,6 +775,12 @@ 0 (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn) + (doseq [storm-id downloaded-storm-ids] + (add-blob-references (:localizer supervisor) storm-id + conf)) + ;; do this after adding the references so we don't try to clean things being used + (.startCleaner (:localizer supervisor)) + (when (conf SUPERVISOR-ENABLE) ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up ;; to date even if callbacks don't all work exactly right @@ -679,6 +789,13 @@ 0 (conf SUPERVISOR-MONITOR-FREQUENCY-SECS) (fn [] (.add processes-event-manager sync-processes))) + + ;; Blob update thread. Starts with 30 seconds delay, every 30 seconds + (schedule-recurring (:blob-update-timer supervisor) + 30 + 30 + (fn [] (.add event-manager synchronize-blobs-fn))) + (schedule-recurring (:event-timer supervisor) (* 60 5) (* 60 5) @@ -689,6 +806,7 @@ (doseq [id ids] (shutdown-worker supervisor id)) (throw (RuntimeException. "Supervisor failed health check. Exiting."))))))) + ;; Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds (schedule-recurring (:event-timer supervisor) 30 @@ -702,8 +820,10 @@ (reset! 
(:active supervisor) false) (cancel-timer (:heartbeat-timer supervisor)) (cancel-timer (:event-timer supervisor)) + (cancel-timer (:blob-update-timer supervisor)) (.shutdown event-manager) (.shutdown processes-event-manager) + (.shutdown (:localizer supervisor)) (.disconnect (:storm-cluster-state supervisor))) SupervisorDaemon (get-conf [this] @@ -728,29 +848,92 @@ (.shutdown supervisor) ) -(defn setup-storm-code-dir [conf storm-conf dir] +(defn setup-storm-code-dir + [conf storm-conf dir] (if (conf SUPERVISOR-RUN-WORKER-AS-USER) (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["code-dir" dir] :log-prefix (str "setup conf for " dir)))) +(defn setup-blob-permission + [conf storm-conf path] + (if (conf SUPERVISOR-RUN-WORKER-AS-USER) + (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["blob" path] :log-prefix (str "setup blob permissions for " path)))) + +(defn download-blobs-for-topology! + "Download all blobs listed in the topology configuration for a given topology." + [conf stormconf-path localizer tmproot] + (let [storm-conf (read-supervisor-storm-conf-given-path conf stormconf-path) + blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) + user (storm-conf TOPOLOGY-SUBMITTER-USER) + topo-name (storm-conf TOPOLOGY-NAME) + user-dir (.getLocalUserFileCacheDir localizer user) + localresources (blobstore-map-to-localresources blobstore-map)] + (when localresources + (when-not (.exists user-dir) + (FileUtils/forceMkdir user-dir) + (setup-blob-permission conf storm-conf (.toString user-dir))) + (try + (let [localized-resources (.getBlobs localizer localresources user topo-name user-dir)] + (setup-blob-permission conf storm-conf (.toString user-dir)) + (doseq [local-rsrc localized-resources] + (let [rsrc-file-path (File. (.getFilePath local-rsrc)) + key-name (.getName rsrc-file-path) + blob-symlink-target-name (.getName (File. 
(.getCurrentSymlinkPath local-rsrc))) + symlink-name (get-blob-localname (get blobstore-map key-name) key-name)] + (create-symlink! tmproot (.getParent rsrc-file-path) symlink-name + blob-symlink-target-name)))) + (catch AuthorizationException authExp + (log-error authExp)) + (catch KeyNotFoundException knf + (log-error knf)))))) + +(defn get-blob-file-names + [blobstore-map] + (if blobstore-map + (for [[k, data] blobstore-map] + (get-blob-localname data k)))) + +(defn download-blobs-for-topology-succeed? + "Assert if all blobs are downloaded for the given topology" + [stormconf-path target-dir] + (let [storm-conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path)))) + blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) + file-names (get-blob-file-names blobstore-map)] + (if-not (empty? file-names) + (every? #(Utils/checkFileExists target-dir %) file-names) + true))) + ;; distributed implementation (defmethod download-storm-code - :distributed [conf storm-id master-code-dir supervisor download-lock] - ;; Downloading to permanent location is atomic - (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid)) - stormroot (supervisor-stormdist-root conf storm-id) - master-meta-file-path (master-storm-metafile-path master-code-dir) - supervisor-meta-file-path (supervisor-storm-metafile-path tmproot)] - (locking download-lock - (log-message "Downloading code for storm id " storm-id " from " master-code-dir) - (FileUtils/forceMkdir (File. tmproot)) - (Utils/downloadFromMaster conf master-meta-file-path supervisor-meta-file-path) - (if (:code-distributor supervisor) - (.download (:code-distributor supervisor) storm-id (File. supervisor-meta-file-path))) - (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot) - (if (.exists (File. stormroot)) (FileUtils/forceDelete (File. stormroot))) - (FileUtils/moveDirectory (File. tmproot) (File. 
stormroot)) - (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot) - (log-message "Finished downloading code for storm id " storm-id " from " master-code-dir)))) + :distributed [conf storm-id master-code-dir localizer] + ;; Downloading to permanent location is atomic + (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid)) + stormroot (supervisor-stormdist-root conf storm-id) + blobstore (Utils/getClientBlobStoreForSupervisor conf)] + (FileUtils/forceMkdir (File. tmproot)) + (if-not on-windows? + (Utils/restrictPermissions tmproot) + (if (conf SUPERVISOR-RUN-WORKER-AS-USER) + (throw-runtime (str "ERROR: Windows doesn't implement setting the correct permissions")))) + (Utils/downloadResourcesAsSupervisor (master-stormjar-key storm-id) + (supervisor-stormjar-path tmproot) blobstore) + (Utils/downloadResourcesAsSupervisor (master-stormcode-key storm-id) + (supervisor-stormcode-path tmproot) blobstore) + (Utils/downloadResourcesAsSupervisor (master-stormconf-key storm-id) + (supervisor-stormconf-path tmproot) blobstore) + (.shutdown blobstore) + (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot) + (download-blobs-for-topology! conf (supervisor-stormconf-path tmproot) localizer + tmproot) + (if (download-blobs-for-topology-succeed? (supervisor-stormconf-path tmproot) tmproot) + (do + (log-message "Successfully downloaded blob resources for storm-id " storm-id) + (FileUtils/forceMkdir (File. stormroot)) + (Files/move (.toPath (File. tmproot)) (.toPath (File. stormroot)) + (doto (make-array StandardCopyOption 1) (aset 0 StandardCopyOption/ATOMIC_MOVE))) + (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)) + (do + (log-message "Failed to download blob resources for storm-id " storm-id) + (rmr tmproot))))) (defn write-log-metadata-to-yaml-file! 
[storm-id port data conf] (let [file (get-log-metadata-file conf storm-id port)] @@ -781,11 +964,6 @@ (storm-conf TOPOLOGY-USERS)))))}] (write-log-metadata-to-yaml-file! storm-id port data conf))) -(defmethod mk-code-distributor :distributed [conf] - (let [code-distributor (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))] - (.prepare code-distributor conf) - code-distributor)) - (defn jlp [stormroot conf] (let [resource-root (str stormroot File/separator RESOURCES-SUBDIR) os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_") @@ -811,6 +989,21 @@ :else (-> value sub-fn (clojure.string/split #"\s+"))))) +(defn create-blobstore-links + "Create symlinks in worker launch directory for all blobs" + [conf storm-id worker-id] + (let [stormroot (supervisor-stormdist-root conf storm-id) + storm-conf (read-supervisor-storm-conf conf storm-id) + workerroot (worker-root conf worker-id) + blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP) + blob-file-names (get-blob-file-names blobstore-map) + resource-file-names (cons RESOURCES-SUBDIR blob-file-names)] + (log-message "Creating symlinks for worker-id: " worker-id " storm-id: " + storm-id " for files(" (count resource-file-names) "): " (pr-str resource-file-names)) + (create-symlink! workerroot stormroot RESOURCES-SUBDIR) + (doseq [file-name blob-file-names] + (create-symlink! workerroot stormroot file-name file-name)))) + (defn create-artifacts-link "Create a symlink from workder directory to its port artifacts directory" [conf storm-id port worker-id] @@ -913,6 +1106,7 @@ (add-dead-worker worker-id)) worker-dir (worker-root conf worker-id)] (remove-dead-worker worker-id) + (create-blobstore-links conf storm-id worker-id) (if run-worker-as-user (worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback :directory (File. 
worker-dir)) (launch-process command :environment topology-worker-environment :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir))) @@ -927,31 +1121,31 @@ first )) (defmethod download-storm-code - :local [conf storm-id master-code-dir supervisor download-lock] - (let [stormroot (supervisor-stormdist-root conf storm-id)] - (locking download-lock - (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot)) - (let [classloader (.getContextClassLoader (Thread/currentThread)) - resources-jar (resources-jar) - url (.getResource classloader RESOURCES-SUBDIR) - target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)] - (cond - resources-jar - (do - (log-message "Extracting resources from jar at " resources-jar " to " target-dir) - (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot)) - url - (do - (log-message "Copying resources at " (URI. (str url)) " to " target-dir) - (if (= (.getProtocol url) "jar" ) - (extract-dir-from-jar (.getFile (.getJarFileURL (.openConnection url))) RESOURCES-SUBDIR stormroot) - (FileUtils/copyDirectory (File. (.getPath (URI. (str url)))) (File. target-dir))) - ) - ) - ) - ))) - -(defmethod mk-code-distributor :local [conf] nil) + :local [conf storm-id master-code-dir localizer] + (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid)) + stormroot (supervisor-stormdist-root conf storm-id) + blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)] + (try + (FileUtils/forceMkdir (File. tmproot)) + (.readBlobTo blob-store (master-stormcode-key storm-id) (FileOutputStream. (supervisor-stormcode-path tmproot)) nil) + (.readBlobTo blob-store (master-stormconf-key storm-id) (FileOutputStream. (supervisor-stormconf-path tmproot)) nil) + (finally + (.shutdown blob-store))) + (FileUtils/moveDirectory (File. tmproot) (File. 
stormroot)) + (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot) + (let [classloader (.getContextClassLoader (Thread/currentThread)) + resources-jar (resources-jar) + url (.getResource classloader RESOURCES-SUBDIR) + target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)] + (cond + resources-jar + (do + (log-message "Extracting resources from jar at " resources-jar " to " target-dir) + (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot)) + url + (do + (log-message "Copying resources at " (str url) " to " target-dir) + (FileUtils/copyDirectory (File. (.getFile url)) (File. target-dir))))))) (defmethod launch-worker :local [supervisor storm-id port worker-id mem-onheap]
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj index 2c98b07..c552519 100644 --- a/storm-core/src/clj/backtype/storm/testing.clj +++ b/storm-core/src/clj/backtype/storm/testing.clj @@ -126,7 +126,8 @@ ZMQ-LINGER-MILLIS 0 TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50 - STORM-CLUSTER-MODE "local"} + STORM-CLUSTER-MODE "local" + BLOBSTORE-SUPERUSER (System/getProperty "user.name")} (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS) {STORM-ZOOKEEPER-PORT zk-port STORM-ZOOKEEPER-SERVERS ["localhost"]}) @@ -628,7 +629,7 @@ track-id (-> tracked-topology :cluster ::track-id) waiting? (fn [] (or (not= target (global-amt track-id "spout-emitted")) - (not= (global-amt track-id "transferred") + (not= (global-amt track-id "transferred") (global-amt track-id "processed"))))] (while-timeout timeout-ms (waiting?) 
;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj
index 9b22e70..cbe5bf9 100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@ -22,6 +22,8 @@
   (:import [backtype.storm Config])
   (:import [backtype.storm.utils Time Container ClojureTimerTask Utils
             MutableObject MutableInt])
+  (:import [backtype.storm.security.auth NimbusPrincipal])
+  (:import [javax.security.auth Subject])
   (:import [java.util UUID Random ArrayList List Collections])
   (:import [java.util.zip ZipFile])
   (:import [java.util.concurrent.locks ReentrantReadWriteLock])
@@ -1099,7 +1101,21 @@
     (assoc coll k (apply str (repeat (count (coll k)) "#")))
     coll))

-(defn log-thrift-access [request-id remoteAddress principal operation]
+(defn log-thrift-access
+  [request-id remoteAddress principal operation]
   (doto (ThriftAccessLogger.)
     (.log (str "Request ID: " request-id " access from: " remoteAddress " principal: " principal " operation: " operation))))
+
+(def DISALLOWED-KEY-NAME-STRS #{"/" "." ":" "\\"})
+
+(defn validate-key-name!
+  "Throws a RuntimeException if name is blank (nil, empty or only whitespace)
+  or contains any of DISALLOWED-KEY-NAME-STRS; returns nil otherwise."
+  [name]
+  (cond
+    (clojure.string/blank? name)
+    (throw (RuntimeException. "Key name cannot be blank"))
+    (some #(.contains name %) DISALLOWED-KEY-NAME-STRS)
+    (throw (RuntimeException.
+             (str "Key name cannot contain any of the following: " (pr-str DISALLOWED-KEY-NAME-STRS))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/zookeeper.clj b/storm-core/src/clj/backtype/storm/zookeeper.clj
index 26def33..c91ffa4 100644
--- a/storm-core/src/clj/backtype/storm/zookeeper.clj
+++ b/storm-core/src/clj/backtype/storm/zookeeper.clj
@@ -114,6 +114,7 @@
         (try-cause (.. zk (delete) (deletingChildrenIfNeeded) (forPath (normalize-path path)))
           (catch KeeperException$NoNodeException e
             ;; do nothing
+            (log-message "exception" e)
           )
           (catch Exception e (throw (wrap-in-runtime e)))))))
@@ -129,7 +130,6 @@
         ))
       )))

-
 (defn sync-path
   [^CuratorFramework zk ^String path]
   (try
@@ -186,6 +186,19 @@
     (.. zk (getChildren) (forPath (normalize-path path))))
     (catch Exception e (throw (wrap-in-runtime e)))))

+(defn delete-node-blobstore
+  "Deletes the state inside the zookeeper for a key, for which the
+  contents of the key starts with nimbus host port information"
+  [^CuratorFramework zk ^String parent-path ^String host-port-info]
+  (let [parent-path (normalize-path parent-path)
+        child-path-list (if (exists-node? zk parent-path false)
+                          (into [] (get-children zk parent-path false))
+                          [])]
+    (doseq [child child-path-list]
+      (when (.startsWith child host-port-info)
+        (log-debug "delete-node " "child" child)
+        (delete-node zk (str parent-path "/" child))))))
+
 (defn set-data
   [^CuratorFramework zk ^String path ^bytes data]
   (try
@@ -232,22 +245,10 @@
 (defn leader-latch-listener-impl
   "Leader latch listener that will be invoked when we either gain or lose leadership"
   [conf zk leader-latch]
-  (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost))
-        STORMS-ROOT (str (conf STORM-ZOOKEEPER-ROOT) "/storms")]
+  (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost))]
     (reify LeaderLatchListener
       (^void isLeader[this]
-        (log-message (str hostname " gained leadership, checking if it has all the topology code locally."))
-        (let [active-topology-ids (set (get-children zk STORMS-ROOT false))
-              local-topology-ids (set (.list (File. (master-stormdist-root conf))))
-              diff-topology (first (set-delta active-topology-ids local-topology-ids))]
-          (log-message "active-topology-ids [" (clojure.string/join "," active-topology-ids)
-                       "] local-topology-ids [" (clojure.string/join "," local-topology-ids)
-                       "] diff-topology [" (clojure.string/join "," diff-topology) "]")
-          (if (empty? diff-topology)
-            (log-message "Accepting leadership, all active topology found localy.")
-            (do
-              (log-message "code for all active topologies not available locally, giving up leadership.")
-              (.close leader-latch)))))
+        (log-message (str hostname " gained leadership")))
       (^void notLeader[this]
         (log-message (str hostname " lost leadership."))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 89422f6..b663dcb 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1056,6 +1056,122 @@ public class Config extends HashMap<String, Object> {
     public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";

     /**
+     * What blobstore implementation the supervisor should use.
+     */
+    @isString
+    public static final String SUPERVISOR_BLOBSTORE = "supervisor.blobstore.class";
+
+    /**
+     * The distributed cache target size in MB. This is a soft limit to the size of the distributed
+     * cache contents.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB = "supervisor.localizer.cache.target.size.mb";
+
+    /**
+     * The distributed cache cleanup interval. Controls how often it scans to attempt to cleanup
+     * anything over the cache target size.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = "supervisor.localizer.cleanup.interval.ms";
+
+    /**
+     * What blobstore implementation the storm client should use.
+     */
+    @isString
+    public static final String CLIENT_BLOBSTORE = "client.blobstore.class";
+
+    /**
+     * What blobstore download parallelism the supervisor should use.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT = "supervisor.blobstore.download.thread.count";
+
+    /**
+     * Maximum number of retries a supervisor is allowed to make for downloading a blob.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES = "supervisor.blobstore.download.max_retries";
+
+    /**
+     * The blobstore super user (the user running the blobstore), who has read/write/admin
+     * permissions on all blobs.
+     */
+    @isString
+    public static final String BLOBSTORE_SUPERUSER = "blobstore.superuser";
+
+    /**
+     * What directory to use for the blobstore. The directory is expected to be an
+     * absolute path when using HDFS blobstore, for LocalFsBlobStore it could be either
+     * absolute or relative.
+     */
+    @isString
+    public static final String BLOBSTORE_DIR = "blobstore.dir";
+
+    /**
+     * What buffer size to use for the blobstore uploads.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES = "storm.blobstore.inputstream.buffer.size.bytes";
+
+    /**
+     * Enable the blobstore cleaner. Certain blobstores may only want to run the cleaner
+     * on one daemon. Currently Nimbus handles setting this.
+     */
+    @isBoolean
+    public static final String BLOBSTORE_CLEANUP_ENABLE = "blobstore.cleanup.enable";
+
+    /**
+     * Principal for nimbus/supervisor to use to access secure HDFS for the blobstore.
+     */
+    @isString
+    public static final String BLOBSTORE_HDFS_PRINCIPAL = "blobstore.hdfs.principal";
+
+    /**
+     * Keytab for nimbus/supervisor to use to access secure HDFS for the blobstore.
+     */
+    @isString
+    public static final String BLOBSTORE_HDFS_KEYTAB = "blobstore.hdfs.keytab";
+
+    /**
+     * Replication factor for a blob in the HDFS blobstore implementation.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String STORM_BLOBSTORE_REPLICATION_FACTOR = "storm.blobstore.replication.factor";
+
+    /**
+     * What blobstore implementation nimbus should use.
+     */
+    @isString
+    public static final String NIMBUS_BLOBSTORE = "nimbus.blobstore.class";
+
+    /**
+     * During operations with the blob store, via master, how long a connection
+     * is idle before nimbus considers it dead and drops the session and any
+     * associated connections.
+     */
+    @isPositiveNumber
+    @isInteger
+    public static final String NIMBUS_BLOBSTORE_EXPIRATION_SECS = "nimbus.blobstore.expiration.secs";
+
+    /**
+     * A map from blobstore keys to the local file name (in the worker's launch directory) and
+     * uncompress flag of each blob the worker will have access to. Both localname and the
+     * uncompress flag are optional; if localname is not specified the key is used. Each topology
+     * has its own map of blobs. Example: topology.blobstore.map: {"blobstorekey" :
+     * {"localname": "myblob", "uncompress": false}, "blobstorearchivekey" :
+     * {"localname": "myarchive", "uncompress": true}}
+     */
+    @CustomValidator(validatorClass = MapOfStringToMapOfStringToObjectValidator.class)
+    public static final String TOPOLOGY_BLOBSTORE_MAP = "topology.blobstore.map";
+
+    /**
      * A number representing the maximum number of workers any single topology can acquire.
      */
     @isInteger
@@ -1847,13 +1963,6 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS="topology.disruptor.batch.timeout.millis";

     /**
-     * Which implementation of {@link backtype.storm.codedistributor.ICodeDistributor} should be used by storm for code
-     * distribution.
-     */
-    @isString
-    public static final String STORM_CODE_DISTRIBUTOR_CLASS = "storm.codedistributor.class";
-
-    /**
      * Minimum number of nimbus hosts where the code must be replicated before leader nimbus
      * is allowed to perform topology activation tasks like setting up heartbeats/assignments
      * and marking the topology as active. default is 0.
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java b/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java
new file mode 100644
index 0000000..f35b7a7
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An output stream whose written data is committed as a whole by {@link #close()},
+ * or discarded as a whole by calling {@link #cancel()} instead.
+ */
+public abstract class AtomicOutputStream extends OutputStream {
+    /**
+     * Cancel all of the writes associated with this stream and close it, so none of them is committed.
+     */
+    public abstract void cancel() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java
new file mode 100644
index 0000000..53cfa15
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+/** Mutable holder pairing a nimbus host/port string with a blob sequence number string. */
+public class BlobKeySequenceInfo {
+    private String nimbusHostPort;
+    private String sequenceNumber;
+
+    public String getNimbusHostPort() {
+        return this.nimbusHostPort;
+    }
+
+    public void setNimbusHostPort(String hostPort) {
+        nimbusHostPort = hostPort;
+    }
+
+    public String getSequenceNumber() {
+        return this.sequenceNumber;
+    }
+
+    public void setSequenceNumber(String seqNum) {
+        sequenceNumber = seqNum;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java
new file mode 100644
index 0000000..a714b76
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.security.auth.Subject;
+
+import backtype.storm.nimbus.NimbusInfo;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.daemon.Shutdownable;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+
+/**
+ * Provides a way to store blobs that can be downloaded.
+ * Blobs must be able to be uploaded and listed from Nimbus,
+ * and downloaded from the Supervisors. It is a key value based
+ * store. Key being a string and value being the blob data.
+ *
+ * ACL checking must take place against the provided subject.
+ * If the blob store does not support Security it must validate
+ * that all ACLs set are always WORLD, everything.
+ *
+ * The users can upload their blobs through the blob store command
+ * line. The command line also allows us to update and delete blobs.
+ *
+ * Modifying the replication factor only works for HdfsBlobStore
+ * as for the LocalFsBlobStore the replication is dependent on
+ * the number of Nimbodes available.
+ */
+public abstract class BlobStore implements Shutdownable {
+    private static final Logger LOG = LoggerFactory.getLogger(BlobStore.class);
+    private static final Pattern KEY_PATTERN = Pattern.compile("^[\\w \\t\\.:_-]+$");
+    protected static final String BASE_BLOBS_DIR_NAME = "blobs";
+
+    /**
+     * Allows us to initialize the blob store
+     * @param conf The storm configuration
+     * @param baseDir The directory path to store the blobs
+     * @param nimbusInfo Contains the nimbus host, port and leadership information.
+     */
+    public abstract void prepare(Map conf, String baseDir, NimbusInfo nimbusInfo);
+
+    /**
+     * Creates the blob.
+     * @param key Key for the blob.
+     * @param meta Metadata which contains the acls information
+     * @param who Is the subject creating the blob.
+     * @return AtomicOutputStream returns a stream into which the data
+     * can be written.
+     * @throws AuthorizationException
+     * @throws KeyAlreadyExistsException
+     */
+    public abstract AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException;
+
+    /**
+     * Updates the blob data.
+     * @param key Key for the blob.
+     * @param who Is the subject having the write privilege for the blob.
+     * @return AtomicOutputStream returns a stream into which the data
+     * can be written.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Gets the current version of metadata for a blob
+     * to be viewed by the user or downloaded by the supervisor.
+     * @param key Key for the blob.
+     * @param who Is the subject having the read privilege for the blob.
+     * @return ReadableBlobMeta describing the blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Sets the metadata with renewed acls for the blob.
+     * @param key Key for the blob.
+     * @param meta Metadata which contains the updated
+     * acls information.
+     * @param who Is the subject having the write privilege for the blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Deletes the blob data and metadata.
+     * @param key Key for the blob.
+     * @param who Is the subject having write privilege for the blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Gets the InputStream to read the blob details
+     * @param key Key for the blob.
+     * @param who Is the subject having the read privilege for the blob.
+     * @return InputStreamWithMeta has the additional
+     * file length and version information.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Returns an iterator with all the list of
+     * keys currently available on the blob store.
+     * @return Iterator<String>
+     */
+    public abstract Iterator<String> listKeys();
+
+    /**
+     * Gets the replication factor of the blob.
+     * @param key Key for the blob.
+     * @param who Is the subject having the read privilege for the blob.
+     * @return the replication factor for the blob.
+     * @throws Exception
+     */
+    public abstract int getBlobReplication(String key, Subject who) throws Exception;
+
+    /**
+     * Modifies the replication factor of the blob.
+     * @param key Key for the blob.
+     * @param replication The replication factor the
+     * blob has to be set.
+     * @param who Is the subject having the update privilege for the blob
+     * @return the updated replication factor for the blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     * @throws IOException
+     */
+    public abstract int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException, IOException;
+
+    /**
+     * Filters keys based on the KeyFilter
+     * passed as the argument.
+     * @param filter KeyFilter
+     * @param <R> Type
+     * @return Set of filtered keys
+     */
+    public <R> Set<R> filterAndListKeys(KeyFilter<R> filter) {
+        Set<R> ret = new HashSet<R>();
+        Iterator<String> keys = listKeys();
+        while (keys.hasNext()) {
+            String key = keys.next();
+            R filtered = filter.filter(key);
+            if (filtered != null) {
+                ret.add(filtered);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Validates key checking for potentially harmful patterns
+     * @param key Key for the blob.
+     */
+    public static final void validateKey(String key) {
+        if (StringUtils.isEmpty(key) || "..".equals(key) || ".".equals(key) || !KEY_PATTERN.matcher(key).matches()) {
+            LOG.error("'{}' does not appear to be valid {}", key, KEY_PATTERN);
+            throw new IllegalArgumentException(key+" does not appear to be a valid blob key");
+        }
+    }
+
+    /**
+     * Wrapper called to create the blob which contains
+     * the byte data
+     * @param key Key for the blob.
+     * @param data Byte data that needs to be uploaded.
+     * @param meta Metadata which contains the acls information
+     * @param who Is the subject creating the blob.
+     * @throws AuthorizationException
+     * @throws KeyAlreadyExistsException
+     * @throws IOException
+     */
+    public void createBlob(String key, byte [] data, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException {
+        AtomicOutputStream out = null;
+        try {
+            out = createBlob(key, meta, who);
+            out.write(data);
+            out.close();
+            out = null;
+        } finally {
+            if (out != null) {
+                out.cancel();
+            }
+        }
+    }
+
+    /**
+     * Wrapper called to create the blob from the data
+     * read out of an InputStream.
+     * @param key Key for the blob.
+     * @param in InputStream from which the blob data is read; it is
+     * always closed before this method returns.
+     * @param meta Metadata which contains the acls information
+     * @param who Is the subject creating the blob.
+     * @throws AuthorizationException
+     * @throws KeyAlreadyExistsException
+     * @throws IOException if reading the input or writing the blob fails; the partial blob is cancelled first
+     */
+    public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException {
+        AtomicOutputStream out = null;
+        try {
+            out = createBlob(key, meta, who);
+            byte[] buffer = new byte[2048];
+            int len = 0;
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+            out.close();
+        } catch (AuthorizationException | IOException | RuntimeException e) {
+            if (out != null) {
+                out.cancel();
+            }
+            throw e; // discard the partial blob, but never hide the failure from the caller
+        } finally {
+            in.close();
+        }
+    }
+
+    /**
+     * Reads the blob from the blob store
+     * and writes it into the output stream.
+     * @param key Key for the blob.
+     * @param out Output stream
+     * @param who Is the subject having read
+     * privilege for the blob.
+     * @throws IOException
+     * @throws KeyNotFoundException
+     * @throws AuthorizationException
+     */
+    public void readBlobTo(String key, OutputStream out, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
+        InputStreamWithMeta in = getBlob(key, who);
+        if (in == null) {
+            throw new IOException("Could not find " + key);
+        }
+        byte[] buffer = new byte[2048];
+        int len = 0;
+        try {
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+        } finally {
+            in.close();
+            out.flush();
+        }
+    }
+
+    /**
+     * Wrapper around readBlobTo which
+     * returns the blob contents as a byte array.
+     * @param key Key for the blob.
+     * @param who Is the subject having
+     * the read privilege for the blob.
+     * @return the blob data
+     * @throws IOException
+     * @throws KeyNotFoundException
+     * @throws AuthorizationException
+     */
+    public byte[] readBlob(String key, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        readBlobTo(key, out, who);
+        return out.toByteArray();
+    }
+
+    /**
+     * Output stream over a BlobStoreFile: close() commits the written data, cancel() discards it.
+     */
+    protected class BlobStoreFileOutputStream extends AtomicOutputStream {
+        private BlobStoreFile part;
+        private OutputStream out;
+
+        public BlobStoreFileOutputStream(BlobStoreFile part) throws IOException {
+            this.part = part;
+            this.out = part.getOutputStream();
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                //close means commit
+                out.close();
+                part.commit();
+            } catch (IOException | RuntimeException e) {
+                cancel();
+                throw e;
+            }
+        }
+
+        @Override
+        public void cancel() throws IOException {
+            try {
+                out.close();
+            } finally {
+                part.cancel();
+            }
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(byte []b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(byte []b, int offset, int len) throws IOException {
+            out.write(b, offset, len);
+        }
+    }
+
+    /**
+     * Input stream over a BlobStoreFile that also exposes its modification time and length.
+     */
+    protected class BlobStoreFileInputStream extends InputStreamWithMeta {
+        private BlobStoreFile part;
+        private InputStream in;
+
+        public BlobStoreFileInputStream(BlobStoreFile part) throws IOException {
+            this.part = part;
+            this.in = part.getInputStream();
+        }
+
+        @Override
+        public long getVersion() throws IOException {
+            return part.getModTime();
+        }
+
+        @Override
+        public int read() throws IOException {
+            return in.read();
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            return in.read(b, off, len);
+        }
+
+        @Override
+        public int read(byte[] b) throws IOException {
+            return in.read(b);
+        }
+
+        @Override
+        public int available() throws IOException {
+            return in.available();
+        }
+
+        @Override
+        public long getFileLength() throws IOException {
+            return part.getFileLength();
+        }
+
+        @Override
+        public void close() throws IOException {
+            in.close(); // InputStream.close() is a no-op, so the wrapped stream must be closed explicitly
+        }
+    }
+
+    /**
+     * Iterator over the keys of a wrapped iterator that start with the given
+     * prefix, returning each such key with the prefix stripped.
+     */
+    public static class KeyTranslationIterator implements Iterator<String> {
+        private Iterator<String> it = null;
+        private String next = null;
+        private String prefix = null;
+
+        public KeyTranslationIterator(Iterator<String> it, String prefix) throws IOException {
+            this.it = it;
+            this.prefix = prefix;
+            primeNext();
+        }
+
+        private void primeNext() {
+            next = null;
+            while (it.hasNext()) {
+                String tmp = it.next();
+                if (tmp.startsWith(prefix)) {
+                    next = tmp.substring(prefix.length());
+                    return;
+                }
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return next != null;
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            String current = next;
+            primeNext();
+            return current;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Delete Not Supported");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java
----------------------------------------------------------------------
diff
--git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java new file mode 100644 index 0000000..c0c4e5c --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.blobstore; + +import backtype.storm.Config; +import backtype.storm.generated.AccessControl; +import backtype.storm.generated.AccessControlType; +import backtype.storm.generated.AuthorizationException; +import backtype.storm.generated.SettableBlobMeta; +import backtype.storm.security.auth.AuthUtils; +import backtype.storm.security.auth.IPrincipalToLocal; +import backtype.storm.security.auth.NimbusPrincipal; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.Subject; +import java.security.Principal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Provides common handling of acls for Blobstores. 
+ * Also contains some static utility functions related to Blobstores. + */ +public class BlobStoreAclHandler { + public static final Logger LOG = LoggerFactory.getLogger(BlobStoreAclHandler.class); + private final IPrincipalToLocal _ptol; + + public static final int READ = 0x01; + public static final int WRITE = 0x02; + public static final int ADMIN = 0x04; + public static final List<AccessControl> WORLD_EVERYTHING = + Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN)); + public static final List<AccessControl> DEFAULT = new ArrayList<AccessControl>(); + private Set<String> _supervisors; + private Set<String> _admins; + + public BlobStoreAclHandler(Map conf) { + _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf); + _supervisors = new HashSet<String>(); + _admins = new HashSet<String>(); + if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) { + _supervisors.addAll((List<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS)); + } + if (conf.containsKey(Config.NIMBUS_ADMINS)) { + _admins.addAll((List<String>)conf.get(Config.NIMBUS_ADMINS)); + } + } + + private static AccessControlType parseACLType(String type) { + if ("other".equalsIgnoreCase(type) || "o".equalsIgnoreCase(type)) { + return AccessControlType.OTHER; + } else if ("user".equalsIgnoreCase(type) || "u".equalsIgnoreCase(type)) { + return AccessControlType.USER; + } + throw new IllegalArgumentException(type+" is not a valid access control type"); + } + + private static int parseAccess(String access) { + int ret = 0; + for (char c: access.toCharArray()) { + if ('r' == c) { + ret = ret | READ; + } else if ('w' == c) { + ret = ret | WRITE; + } else if ('a' == c) { + ret = ret | ADMIN; + } else if ('-' == c) { + //ignored + } else { + throw new IllegalArgumentException(""); + } + } + return ret; + } + + public static AccessControl parseAccessControl(String str) { + String[] parts = str.split(":"); + String type = "other"; + String name = ""; + String access = "-"; + if (parts.length > 3) 
{ + throw new IllegalArgumentException("Don't know how to parse "+str+" into an ACL value"); + } else if (parts.length == 1) { + type = "other"; + name = ""; + access = parts[0]; + } else if (parts.length == 2) { + type = "user"; + name = parts[0]; + access = parts[1]; + } else if (parts.length == 3) { + type = parts[0]; + name = parts[1]; + access = parts[2]; + } + AccessControl ret = new AccessControl(); + ret.set_type(parseACLType(type)); + ret.set_name(name); + ret.set_access(parseAccess(access)); + return ret; + } + + private static String accessToString(int access) { + StringBuilder ret = new StringBuilder(); + ret.append(((access & READ) > 0) ? "r" : "-"); + ret.append(((access & WRITE) > 0) ? "w" : "-"); + ret.append(((access & ADMIN) > 0) ? "a" : "-"); + return ret.toString(); + } + + public static String accessControlToString(AccessControl ac) { + StringBuilder ret = new StringBuilder(); + switch(ac.get_type()) { + case OTHER: + ret.append("o"); + break; + case USER: + ret.append("u"); + break; + default: + throw new IllegalArgumentException("Don't know what a type of "+ac.get_type()+" means "); + } + ret.append(":"); + if (ac.is_set_name()) { + ret.append(ac.get_name()); + } + ret.append(":"); + ret.append(accessToString(ac.get_access())); + return ret.toString(); + } + + public static void validateSettableACLs(String key, List<AccessControl> acls) throws AuthorizationException { + Set<String> aclUsers = new HashSet<>(); + List<String> duplicateUsers = new ArrayList<>(); + for (AccessControl acl : acls) { + String aclUser = acl.get_name(); + if (!StringUtils.isEmpty(aclUser) && !aclUsers.add(aclUser)) { + LOG.error("'{}' user can't appear more than once in the ACLs", aclUser); + duplicateUsers.add(aclUser); + } + } + if (duplicateUsers.size() > 0) { + String errorMessage = "user " + Arrays.toString(duplicateUsers.toArray()) + + " can't appear more than once in the ACLs for key [" + key +"]."; + throw new AuthorizationException(errorMessage); + } + } + + 
+    /**
+     * Maps every principal of {@code who} to a local user name via {@code _ptol.toLocal}.
+     *
+     * @param who the subject to inspect; may be null
+     * @return the local user names; an empty set when {@code who} is null
+     */
+    private Set<String> constructUserFromPrincipals(Subject who) {
+        Set<String> user = new HashSet<String>();
+        if (who != null) {
+            for (Principal p : who.getPrincipals()) {
+                user.add(_ptol.toLocal(p));
+            }
+        }
+        return user;
+    }
+
+    /** Returns true if any local user name of {@code who} is contained in {@code _admins}. */
+    private boolean isAdmin(Subject who) {
+        for (String u : constructUserFromPrincipals(who)) {
+            if (_admins.contains(u)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Returns true only when the operation mask is exactly READ (no other bits set). */
+    private boolean isReadOperation(int operation) {
+        return operation == READ;
+    }
+
+    /**
+     * Returns true if the operation is a pure READ and any local user name of {@code who} is
+     * contained in {@code _supervisors}. Supervisors are never matched for other operations.
+     */
+    private boolean isSupervisor(Subject who, int operation) {
+        if (!isReadOperation(operation)) {
+            return false;
+        }
+        for (String u : constructUserFromPrincipals(who)) {
+            if (_supervisors.contains(u)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Returns true if {@code who} is non-null and carries at least one NimbusPrincipal. */
+    private boolean isNimbus(Subject who) {
+        if (who == null) {
+            return false;
+        }
+        for (Principal principal : who.getPrincipals()) {
+            if (principal instanceof NimbusPrincipal) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns true if {@code who} passes the permission checks below without consulting the
+     * key's ACL: Nimbus, an admin, or a supervisor requesting a pure READ.
+     */
+    public boolean checkForValidUsers(Subject who, int mask) {
+        return isNimbus(who) || isAdmin(who) || isSupervisor(who,mask);
+    }
+
+    /**
+     * The user should be able to see the metadata if and only if they have any of READ, WRITE, or ADMIN
+     *
+     * @throws AuthorizationException if {@code who} holds none of READ, WRITE or ADMIN on the key
+     */
+    public void validateUserCanReadMeta(List<AccessControl> acl, Subject who, String key) throws AuthorizationException {
+        hasAnyPermissions(acl, (READ|WRITE|ADMIN), who, key);
+    }
+
+    /**
+     * Validates if the user has any of the permissions
+     * mentioned in the mask.
+     * @param acl ACL for the key.
+     * @param mask mask holds the cumulative value of
+     * READ = 1, WRITE = 2 or ADMIN = 4 permissions.
+     * mask = 1 implies READ privilege.
+     * mask = 5 implies READ and ADMIN privileges.
+     * @param who Is the user against whom the permissions
+     * are validated for a key using the ACL and the mask.
+     * Users accepted by checkForValidUsers pass without consulting the ACL.
+     * @param key Key used to identify the blob.
+     * @throws AuthorizationException if no ACL entry grants {@code who} any bit of {@code mask}
+     */
+    public void hasAnyPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException {
+        Set<String> user = constructUserFromPrincipals(who);
+        LOG.debug("user {}", user);
+        if (checkForValidUsers(who, mask)) {
+            return;
+        }
+        for (AccessControl ac : acl) {
+            int allowed = getAllowed(ac, user);
+            LOG.debug(" user: {} allowed: {} key: {}", user, allowed, key);
+            if ((allowed & mask) > 0) {
+                return;
+            }
+        }
+        throw new AuthorizationException(
+                user + " does not have access to " + key);
+    }
+
+    /**
+     * Validates if the user has at least the set of permissions
+     * mentioned in the mask.
+     * @param acl ACL for the key.
+     * @param mask mask holds the cumulative value of
+     * READ = 1, WRITE = 2 or ADMIN = 4 permissions.
+     * mask = 1 implies READ privilege.
+     * mask = 5 implies READ and ADMIN privileges.
+     * @param who Is the user against whom the permissions
+     * are validated for a key using the ACL and the mask.
+     * Users accepted by checkForValidUsers pass without consulting the ACL.
+     * @param key Key used to identify the blob.
+     * @throws AuthorizationException if the ACL entries together do not grant {@code who}
+     *         every bit of {@code mask}; the message names the missing permissions
+     */
+    public void hasPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException {
+        Set<String> user = constructUserFromPrincipals(who);
+        LOG.debug("user {}", user);
+        if (checkForValidUsers(who, mask)) {
+            return;
+        }
+        for (AccessControl ac : acl) {
+            int allowed = getAllowed(ac, user);
+            // Clear every bit this entry grants; whatever remains is still missing.
+            mask = ~allowed & mask;
+            LOG.debug(" user: {} allowed: {} disallowed: {} key: {}", user, allowed, mask, key);
+        }
+        if (mask == 0) {
+            return;
+        }
+        throw new AuthorizationException(
+                user + " does not have " + namedPerms(mask) + " access to " + key);
+    }
+
+    /**
+     * Replaces the ACL of {@code meta} with the list produced by normalizeSettableACLs for
+     * {@code who} and {@code opMask}.
+     */
+    public void normalizeSettableBlobMeta(String key, SettableBlobMeta meta, Subject who, int opMask) {
+        meta.set_acl(normalizeSettableACLs(key, meta.get_acl(), who, opMask));
+    }
+
+    /** Formats the set bits of {@code mask} for error messages, e.g. "[READ ADMIN ]". */
+    private String namedPerms(int mask) {
+        StringBuilder b = new StringBuilder();
+        b.append("[");
+        if ((mask & READ) > 0) {
+            b.append("READ ");
+        }
+        if ((mask & WRITE) > 0) {
+            b.append("WRITE ");
+        }
+        if ((mask & ADMIN) > 0) {
+            b.append("ADMIN ");
+        }
+        b.append("]");
+        return b.toString();
+    }
+
+    /**
+     * Returns the access bits an ACL entry grants to {@code users}. An OTHER entry applies to
+     * everyone. A USER entry applies only if its name is in {@code users}. Any other type grants nothing.
+     */
+    private int getAllowed(AccessControl ac, Set<String> users) {
+        switch (ac.get_type()) {
+            case OTHER:
+                return ac.get_access();
+            case USER:
+                if (users.contains(ac.get_name())) {
+                    return ac.get_access();
+                }
+                return 0;
+            default:
+                return 0;
+        }
+    }
+
+    /** Returns a new list without the OTHER entries that grant no access; the input is not modified. */
+    private List<AccessControl> removeBadACLs(List<AccessControl> accessControls) {
+        List<AccessControl> resultAcl = new ArrayList<AccessControl>();
+        for (AccessControl control : accessControls) {
+            // Enum identity comparison is null-safe, unlike get_type().equals(...).
+            if (control.get_type() == AccessControlType.OTHER && control.get_access() == 0) {
+                LOG.debug("Removing invalid blobstore world ACL {}",
+                        BlobStoreAclHandler.accessControlToString(control));
+                continue;
+            }
+            resultAcl.add(control);
+        }
+        return resultAcl;
+    }
+
+    /**
+     * Normalizes an ACL list for {@code who}, in three steps:
+     * <ul>
+     *   <li>drops OTHER entries without access;</li>
+     *   <li>makes sure every local user of {@code who} holds at least {@code opMask};</li>
+     *   <li>if no user name can be derived and {@code acls} has no OTHER entry with full
+     *       access, appends WORLD_EVERYTHING.</li>
+     * </ul>
+     * Existing USER entries taken from {@code acls} may have their access widened in place.
+     */
+    private final List<AccessControl> normalizeSettableACLs(String key, List<AccessControl> acls, Subject who,
+                                                            int opMask) {
+        List<AccessControl> cleanAcls = removeBadACLs(acls);
+        Set<String> userNames = getUserNamesFromSubject(who);
+        for (String user : userNames) {
+            fixACLsForUser(cleanAcls, user, opMask);
+        }
+        if ((who == null || userNames.isEmpty()) && !worldEverything(acls)) {
+            cleanAcls.addAll(BlobStoreAclHandler.WORLD_EVERYTHING);
+            LOG.debug("Access Control for key {} is normalized to world everything {}", key, cleanAcls);
+            if (!acls.isEmpty()) {
+                LOG.warn("Access control for blob with key {} is normalized to WORLD_EVERYTHING", key);
+            }
+        }
+        return cleanAcls;
+    }
+
+    /** Returns true if any OTHER entry in {@code acls} grants exactly READ, WRITE and ADMIN. */
+    private boolean worldEverything(List<AccessControl> acls) {
+        for (AccessControl acl : acls) {
+            if (acl.get_type() == AccessControlType.OTHER && acl.get_access() == (READ|WRITE|ADMIN)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Ensures {@code user} holds at least the bits of {@code mask}. The bits are ORed into the
+     * first matching USER entry; if there is none, a new USER entry with exactly {@code mask}
+     * is appended.
+     */
+    private void fixACLsForUser(List<AccessControl> acls, String user, int mask) {
+        for (AccessControl control : acls) {
+            String controlName = control.get_name();
+            // Guard against USER entries without a name, which used to cause a NullPointerException.
+            if (control.get_type() == AccessControlType.USER && controlName != null && controlName.equals(user)) {
+                int currentAccess = control.get_access();
+                if ((currentAccess & mask) != mask) {
+                    control.set_access(currentAccess | mask);
+                }
+                return;
+            }
+        }
+        AccessControl userACL = new AccessControl();
+        userACL.set_type(AccessControlType.USER);
+        userACL.set_name(user);
+        userACL.set_access(mask);
+        acls.add(userACL);
+    }
+
+    /** Returns the local user names of {@code who}; delegates to constructUserFromPrincipals. */
+    private Set<String> getUserNamesFromSubject(Subject who) {
+        return constructUserFromPrincipals(who);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java
b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java
new file mode 100644
index 0000000..22ccf97
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.generated.SettableBlobMeta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.regex.Pattern;
+
+/**
+ * Provides a base implementation for creating a blobstore based on file-backed storage.
+ */
+public abstract class BlobStoreFile {
+    public static final Logger LOG = LoggerFactory.getLogger(BlobStoreFile.class);
+
+    /** File-name extension ".tmp" (presumably marking temporary files). */
+    protected static final String TMP_EXT = ".tmp";
+    /**
+     * Matches names made of one or more digits followed by {@link #TMP_EXT}, e.g. "42.tmp".
+     * The extension goes through {@link Pattern#quote} so that every character, including '.', is literal.
+     */
+    protected static final Pattern TMP_NAME_PATTERN = Pattern.compile("^\\d+" + Pattern.quote(TMP_EXT) + "$");
+    protected static final String BLOBSTORE_DATA_FILE = "data";
+
+    public abstract void delete() throws IOException;
+    public abstract String getKey();
+    public abstract boolean isTmp();
+    public abstract void setMetadata(SettableBlobMeta meta);
+    public abstract SettableBlobMeta getMetadata();
+    public abstract long getModTime() throws IOException;
+    public abstract InputStream getInputStream() throws IOException;
+    public abstract OutputStream getOutputStream() throws IOException;
+    public abstract void commit() throws IOException;
+    public abstract void cancel() throws IOException;
+    public abstract long getFileLength() throws IOException;
+}
