[ https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009532#comment-15009532 ]
ASF GitHub Bot commented on STORM-876:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/845#discussion_r45122625

    --- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
    @@ -1767,6 +1900,169 @@
                                              topology-id
                                              (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL))))
    +      ;;Blobstore implementation code
    +      (^String beginCreateBlob [this
    +                                ^String blob-key
    +                                ^SettableBlobMeta blob-meta]
    +        (let [session-id (uuid)]
    +          (.put (:blob-uploaders nimbus)
    +            session-id
    +            (->> (ReqContext/context)
    +              (.subject)
    +              (.createBlob (:blob-store nimbus) blob-key blob-meta)))
    +          (log-message "Created blob for " blob-key
    +            " with session id " session-id)
    +          (str session-id)))
    +
    +      (^String beginUpdateBlob [this ^String blob-key]
    +        (let [^AtomicOutputStream os (->> (ReqContext/context)
    +                                       (.subject)
    +                                       (.updateBlob (:blob-store nimbus)
    +                                         blob-key))]
    +          (let [session-id (uuid)]
    +            (.put (:blob-uploaders nimbus) session-id os)
    +            (log-message "Created upload session for " blob-key
    +              " with id " session-id)
    +            (str session-id))))
    +
    +      (^void createStateInZookeeper [this ^String blob-key]
    +        (let [storm-cluster-state (:storm-cluster-state nimbus)
    +              blob-store (:blob-store nimbus)
    +              nimbus-host-port-info (:nimbus-host-port-info nimbus)
    +              conf (:conf nimbus)]
    +          (if (instance? LocalFsBlobStore blob-store)
    +              (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
    +          (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info)))
    +
    +      (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]
    +        (let [uploaders (:blob-uploaders nimbus)]
    +          (if-let [^AtomicOutputStream os (.get uploaders session)]
    +            (let [chunk-array (.array blob-chunk)
    +                  remaining (.remaining blob-chunk)
    +                  array-offset (.arrayOffset blob-chunk)
    +                  position (.position blob-chunk)]
    +              (.write os chunk-array (+ array-offset position) remaining)
    +              (.put uploaders session os))
    +            (throw-runtime "Blob for session "
    +              session
    +              " does not exist (or timed out)"))))
    +
    +      (^void finishBlobUpload [this ^String session]
    +        (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
    +          (do
    +            (.close os)
    +            (log-message "Finished uploading blob for session "
    +              session
    +              ". Closing session.")
    +            (.remove (:blob-uploaders nimbus) session))
    +          (throw-runtime "Blob for session "
    +            session
    +            " does not exist (or timed out)")))
    +
    +      (^void cancelBlobUpload [this ^String session]
    +        (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
    +          (do
    +            (.cancel os)
    +            (log-message "Canceled uploading blob for session "
    +              session
    +              ". Closing session.")
    +            (.remove (:blob-uploaders nimbus) session))
    +          (throw-runtime "Blob for session "
    +            session
    +            " does not exist (or timed out)")))
    +
    +      (^ReadableBlobMeta getBlobMeta [this ^String blob-key]
    +        (let [^ReadableBlobMeta ret (->> (ReqContext/context)
    +                                      (.subject)
    +                                      (.getBlobMeta (:blob-store nimbus)
    +                                        blob-key))]
    +          ret))
    +
    +      (^void setBlobMeta [this ^String blob-key ^SettableBlobMeta blob-meta]
    +        (->> (ReqContext/context)
    +          (.subject)
    +          (.setBlobMeta (:blob-store nimbus) blob-key blob-meta)))
    +
    +      (^BeginDownloadResult beginBlobDownload [this ^String blob-key]
    +        (let [^InputStreamWithMeta is (->> (ReqContext/context)
    +                                        (.subject)
    +                                        (.getBlob (:blob-store nimbus)
    +                                          blob-key))]
    +          (let [session-id (uuid)
    +                ret (BeginDownloadResult. (.getVersion is) (str session-id))]
    +            (.set_data_size ret (.getFileLength is))
    +            (.put (:blob-downloaders nimbus) session-id (BufferInputStream. is ^Integer (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) (int 65536))))
    --- End diff --

    `Utils/getInt` returns `Integer`, so we do not need to type hint here.


> Dist Cache: Basic Functionality
> -------------------------------
>
>                 Key: STORM-876
>                 URL: https://issues.apache.org/jira/browse/STORM-876
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>         Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! We are in the process
> of getting it ready to push back to open source shortly.
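
On the type-hint remark in the review comment: the Clojure compiler can read the declared return type of a Java static method, so a hint placed on such a call should add nothing. Below is a minimal sketch of that behavior, not code from the pull request. It assumes `Integer/valueOf` as a stand-in for Storm's `Utils/getInt` (which the comment says returns `Integer`); the helper `reflection-warnings` and the vars `unhinted` and `static-call` are hypothetical names used only for illustration.

```clojure
;; Illustrative sketch only. Integer/valueOf stands in for Utils/getInt;
;; all names defined here are hypothetical.

(defn reflection-warnings
  "Compiles `form` with reflection warnings enabled and returns whatever
  the compiler wrote to *err* while doing so."
  [form]
  (let [err (java.io.StringWriter.)]
    (binding [*err* err
              *warn-on-reflection* true]
      (eval form))
    (str err)))

;; Unhinted argument: the compiler cannot know the class of `x`,
;; so the .intValue call has to be resolved reflectively at run time.
(def unhinted
  '(fn [x] (.intValue x)))

;; Result of a static call: Integer.valueOf(int) is declared to return
;; Integer, so .intValue resolves at compile time without any ^Integer hint.
(def static-call
  '(fn [] (.intValue (Integer/valueOf (int 65536)))))

(println "unhinted    ->" (pr-str (reflection-warnings unhinted)))
(println "static call ->" (pr-str (reflection-warnings static-call)))
```

Run at a REPL, the first form should report a reflection warning and the second should report none, which is the situation the reviewer describes for the `^Integer` hint in front of `(Utils/getInt ...)`.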