[
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15011984#comment-15011984
]
ASF GitHub Bot commented on STORM-876:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/845#discussion_r45261397
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -454,10 +448,79 @@
(shutdown-worker supervisor id))
))
+(defn get-blob-localname
+ "Given the blob information either gets the localname field if it exists,
+ else routines the default value passed in."
+ [blob-info defaultValue]
+ (or (get blob-info "localname") defaultValue))
+
+(defn should-uncompress-blob?
+ "Given the blob information returns the value of the uncompress field,
handling it either being
+ a string or a boolean value, or if it's not specified then returns false"
+ [blob-info]
+ (Boolean. (get blob-info "uncompress")))
+
+(defn remove-blob-references
+ "Remove a reference to a blob when its no longer needed."
+ [localizer storm-id conf]
+ (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+ blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+ user (storm-conf TOPOLOGY-SUBMITTER-USER)
+ topo-name (storm-conf TOPOLOGY-NAME)]
+ (if blobstore-map
+ (doseq [[k, v] blobstore-map]
+ (.removeBlobReference localizer
+ k
+ user
+ topo-name
+ (should-uncompress-blob? v))))))
+
+(defn blobstore-map-to-localresources
+ "Returns a list of LocalResources based on the blobstore-map passed in."
+ [blobstore-map]
+ (if blobstore-map
+ (for [[k, v] blobstore-map] (LocalResource. k (should-uncompress-blob?
v)))
+ ()))
+
+(defn add-blob-references
+ "For each of the downloaded topologies, adds references to the blobs
that the topologies are
+ using. This is used to reconstruct the cache on restart."
+ [localizer storm-id conf]
+ (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+ blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+ user (storm-conf TOPOLOGY-SUBMITTER-USER)
+ topo-name (storm-conf TOPOLOGY-NAME)
+ localresources (blobstore-map-to-localresources blobstore-map)]
+ (if blobstore-map
+ (.addReferences localizer localresources user topo-name))))
+
+(defn rm-topo-files
+ [conf storm-id localizer rm-blob-refs?]
+ (let [path (supervisor-stormdist-root conf storm-id)]
+ (try
+ (if rm-blob-refs?
+ (remove-blob-references localizer storm-id conf))
+ (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+ (rmr-as-user conf storm-id path)
+ (rmr (supervisor-stormdist-root conf storm-id)))
+ (catch Exception e
+ (log-message e (str "Exception removing: " storm-id))))))
--- End diff --
Need to indent the `try` inside the `let`.
> Dist Cache: Basic Functionality
> -------------------------------
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process
> of getting it ready to push back to open source shortly.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)