[ https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009552#comment-15009552 ]
ASF GitHub Bot commented on STORM-876:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/845#discussion_r45123873

    --- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
    @@ -1767,6 +1900,169 @@
                 topology-id
                 (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL))))

    +      ;;Blobstore implementation code
    +      (^String beginCreateBlob [this
    +                                ^String blob-key
    +                                ^SettableBlobMeta blob-meta]
    +        (let [session-id (uuid)]
    +          (.put (:blob-uploaders nimbus)
    +            session-id
    +            (->> (ReqContext/context)
    +              (.subject)
    +              (.createBlob (:blob-store nimbus) blob-key blob-meta)))
    +          (log-message "Created blob for " blob-key
    +            " with session id " session-id)
    +          (str session-id)))
    +
    +      (^String beginUpdateBlob [this ^String blob-key]
    +        (let [^AtomicOutputStream os (->> (ReqContext/context)
    +                                       (.subject)
    +                                       (.updateBlob (:blob-store nimbus)
    +                                         blob-key))]
    +          (let [session-id (uuid)]
    +            (.put (:blob-uploaders nimbus) session-id os)
    +            (log-message "Created upload session for " blob-key
    +              " with id " session-id)
    +            (str session-id))))
    +
    +      (^void createStateInZookeeper [this ^String blob-key]
    +        (let [storm-cluster-state (:storm-cluster-state nimbus)
    +              blob-store (:blob-store nimbus)
    +              nimbus-host-port-info (:nimbus-host-port-info nimbus)
    +              conf (:conf nimbus)]
    +          (if (instance? LocalFsBlobStore blob-store)
    +            (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
    +          (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info)))
    +
    +      (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]
    +        (let [uploaders (:blob-uploaders nimbus)]
    +          (if-let [^AtomicOutputStream os (.get uploaders session)]
    +            (let [chunk-array (.array blob-chunk)
    +                  remaining (.remaining blob-chunk)
    +                  array-offset (.arrayOffset blob-chunk)
    +                  position (.position blob-chunk)]
    +              (.write os chunk-array (+ array-offset position) remaining)
    +              (.put uploaders session os))
    +            (throw-runtime "Blob for session "
    +              session
    +              " does not exist (or timed out)"))))
    +
    +      (^void finishBlobUpload [this ^String session]
    +        (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
    +          (do
    +            (.close os)
    +            (log-message "Finished uploading blob for session "
    +              session
    +              ". Closing session.")
    +            (.remove (:blob-uploaders nimbus) session))
    +          (throw-runtime "Blob for session "
    +            session
    +            " does not exist (or timed out)")))
    +
    +      (^void cancelBlobUpload [this ^String session]
    +        (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
    +          (do
    +            (.cancel os)
    +            (log-message "Canceled uploading blob for session "
    +              session
    +              ". Closing session.")
    +            (.remove (:blob-uploaders nimbus) session))
    +          (throw-runtime "Blob for session "
    +            session
    +            " does not exist (or timed out)")))
    +
    +      (^ReadableBlobMeta getBlobMeta [this ^String blob-key]
    +        (let [^ReadableBlobMeta ret (->> (ReqContext/context)
    +                                      (.subject)
    +                                      (.getBlobMeta (:blob-store nimbus)
    +                                        blob-key))]
    +          ret))
    +
    +      (^void setBlobMeta [this ^String blob-key ^SettableBlobMeta blob-meta]
    +        (->> (ReqContext/context)
    +          (.subject)
    +          (.setBlobMeta (:blob-store nimbus) blob-key blob-meta)))
    +
    +      (^BeginDownloadResult beginBlobDownload [this ^String blob-key]
    +        (let [^InputStreamWithMeta is (->> (ReqContext/context)
    +                                        (.subject)
    +                                        (.getBlob (:blob-store nimbus)
    +                                          blob-key))]
    +          (let [session-id (uuid)
    +                ret (BeginDownloadResult. (.getVersion is) (str session-id))]
    +            (.set_data_size ret (.getFileLength is))
    +            (.put (:blob-downloaders nimbus) session-id (BufferInputStream. is ^Integer (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) (int 65536))))
    +            (log-message "Created download session for " blob-key
    +              " with id " session-id)
    +            ret)))
    +
    +      (^ByteBuffer downloadBlobChunk [this ^String session]
    +        (let [downloaders (:blob-downloaders nimbus)
    +              ^BufferInputStream is (.get downloaders session)]
    +          (when-not is
    +            (throw (RuntimeException.
    +                     "Could not find input stream for session " session)))
    +          (let [ret (.read is)]
    +            (.put downloaders session is)
    +            (when (empty? ret)
    +              (.close is)
    +              (.remove downloaders session))
    +            (log-debug "Sending " (alength ret) " bytes")
    +            (ByteBuffer/wrap ret))))
    +
    +      (^void deleteBlob [this ^String blob-key]
    +        (let [subject (->> (ReqContext/context)
    +                        (.subject))]
    +          (.deleteBlob (:blob-store nimbus) blob-key subject)
    +          (when (instance? LocalFsBlobStore blob-store)
    +            (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key)
    +            (.remove-key-version! (:storm-cluster-state nimbus) blob-key))
    +          (log-message "Deleted blob for key " blob-key)))
    +
    +      (^ListBlobsResult listBlobs [this ^String session]
    +        (let [listers (:blob-listers nimbus)
    +              ^Iterator keys-it (if (clojure.string/blank? session)
    +                                  (->> (ReqContext/context)
    +                                    (.subject)
    +                                    (.listKeys (:blob-store nimbus)))
    +                                  (.get listers session))
    +              _ (or keys-it (throw-runtime "Blob list for session "
    +                              session
    +                              " does not exist (or timed out)"))
    +
    +              ;; Create a new session id if the user gave an empty session string.
    +              ;; This is the use case when the user wishes to list blobs
    +              ;; starting from the beginning.
    +              session (if (clojure.string/blank? session)
    +                        (let [new-session (uuid)]
    +                          (log-message "Creating new session for downloading list " new-session)
    +                          new-session)
    +                        session)]
    +          (if-not (.hasNext keys-it)
    +            (do
    +              (.remove listers session)
    +              (log-message "No more blobs to list for session " session)
    +              ;; A blank result communicates that there are no more blobs.
    +              (ListBlobsResult. (ArrayList. 0) (str session)))
    +            (let [^List list-chunk (->> keys-it
    +                                     (iterator-seq)
    +                                     (take 100) ;; Limit to next 100 keys
    +                                     (ArrayList.))
    +                  _ (log-message session " downloading " (.size list-chunk) " entries")]
    --- End diff --

    This can be moved outside the `let` binding, before the `.put`.


> Dist Cache: Basic Functionality
> -------------------------------
>
>                 Key: STORM-876
>                 URL: https://issues.apache.org/jira/browse/STORM-876
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>         Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process
> of getting it ready to push back to open source shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)