[
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009536#comment-15009536
]
ASF GitHub Bot commented on STORM-876:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/845#discussion_r45122806
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -1767,6 +1900,169 @@
topology-id
(doto (GetInfoOptions.)
(.set_num_err_choice NumErrorsChoice/ALL))))
+ ;;Blobstore implementation code
+ (^String beginCreateBlob [this
+ ^String blob-key
+ ^SettableBlobMeta blob-meta]
+ (let [session-id (uuid)]
+ (.put (:blob-uploaders nimbus)
+ session-id
+ (->> (ReqContext/context)
+ (.subject)
+ (.createBlob (:blob-store nimbus) blob-key blob-meta)))
+ (log-message "Created blob for " blob-key
+ " with session id " session-id)
+ (str session-id)))
+
+ (^String beginUpdateBlob [this ^String blob-key]
+ (let [^AtomicOutputStream os (->> (ReqContext/context)
+ (.subject)
+ (.updateBlob (:blob-store nimbus)
+ blob-key))]
+ (let [session-id (uuid)]
+ (.put (:blob-uploaders nimbus) session-id os)
+ (log-message "Created upload session for " blob-key
+ " with id " session-id)
+ (str session-id))))
+
+ (^void createStateInZookeeper [this ^String blob-key]
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ blob-store (:blob-store nimbus)
+ nimbus-host-port-info (:nimbus-host-port-info nimbus)
+ conf (:conf nimbus)]
+ (if (instance? LocalFsBlobStore blob-store)
+ (.setup-blobstore! storm-cluster-state blob-key
nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info
conf)))
+ (log-debug "Created state in zookeeper" storm-cluster-state
blob-store nimbus-host-port-info)))
+
+ (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]
+ (let [uploaders (:blob-uploaders nimbus)]
+ (if-let [^AtomicOutputStream os (.get uploaders session)]
+ (let [chunk-array (.array blob-chunk)
+ remaining (.remaining blob-chunk)
+ array-offset (.arrayOffset blob-chunk)
+ position (.position blob-chunk)]
+ (.write os chunk-array (+ array-offset position) remaining)
+ (.put uploaders session os))
+ (throw-runtime "Blob for session "
+ session
+ " does not exist (or timed out)"))))
+
+ (^void finishBlobUpload [this ^String session]
+ (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus)
session)]
+ (do
+ (.close os)
+ (log-message "Finished uploading blob for session "
+ session
+ ". Closing session.")
+ (.remove (:blob-uploaders nimbus) session))
+ (throw-runtime "Blob for session "
+ session
+ " does not exist (or timed out)")))
+
+ (^void cancelBlobUpload [this ^String session]
+ (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus)
session)]
+ (do
+ (.cancel os)
+ (log-message "Canceled uploading blob for session "
+ session
+ ". Closing session.")
+ (.remove (:blob-uploaders nimbus) session))
+ (throw-runtime "Blob for session "
+ session
+ " does not exist (or timed out)")))
+
+ (^ReadableBlobMeta getBlobMeta [this ^String blob-key]
+ (let [^ReadableBlobMeta ret (->> (ReqContext/context)
+ (.subject)
+ (.getBlobMeta (:blob-store nimbus)
+ blob-key))]
+ ret))
+
+ (^void setBlobMeta [this ^String blob-key ^SettableBlobMeta
blob-meta]
+ (->> (ReqContext/context)
+ (.subject)
+ (.setBlobMeta (:blob-store nimbus) blob-key blob-meta)))
+
+ (^BeginDownloadResult beginBlobDownload [this ^String blob-key]
+ (let [^InputStreamWithMeta is (->> (ReqContext/context)
+ (.subject)
+ (.getBlob (:blob-store nimbus)
+ blob-key))]
+ (let [session-id (uuid)
+ ret (BeginDownloadResult. (.getVersion is) (str
session-id))]
+ (.set_data_size ret (.getFileLength is))
+ (.put (:blob-downloaders nimbus) session-id
(BufferInputStream. is ^Integer (Utils/getInt (conf
STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) (int 65536))))
+ (log-message "Created download session for " blob-key
+ " with id " session-id)
+ ret)))
+
+ (^ByteBuffer downloadBlobChunk [this ^String session]
+ (let [downloaders (:blob-downloaders nimbus)
+ ^BufferInputStream is (.get downloaders session)]
+ (when-not is
+ (throw (RuntimeException.
+ "Could not find input stream for session " session)))
+ (let [ret (.read is)]
+ (.put downloaders session is)
+ (when (empty? ret)
+ (.close is)
+ (.remove downloaders session))
+ (log-debug "Sending " (alength ret) " bytes")
+ (ByteBuffer/wrap ret))))
+
+ (^void deleteBlob [this ^String blob-key]
+ (let [subject (->> (ReqContext/context)
+ (.subject))]
+ (.deleteBlob (:blob-store nimbus) blob-key subject)
+ (when (instance? LocalFsBlobStore blob-store)
+ (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key)
+ (.remove-key-version! (:storm-cluster-state nimbus) blob-key))
+ (log-message "Deleted blob for key " blob-key)))
--- End diff --
Check indentation after `let`.
> Dist Cache: Basic Functionality
> -------------------------------
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process
> of getting it ready to push back to open source shortly.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)