[ https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15007559#comment-15007559 ]
ASF GitHub Bot commented on STORM-876:
--------------------------------------
Github user knusbaum commented on a diff in the pull request:
https://github.com/apache/storm/pull/845#discussion_r44997133
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -526,6 +605,41 @@
  {Config/SUPERVISOR_MEMORY_CAPACITY_MB (double (conf SUPERVISOR-MEMORY-CAPACITY-MB))
   Config/SUPERVISOR_CPU_CAPACITY (double (conf SUPERVISOR-CPU-CAPACITY))})
+(defn update-blobs-for-topology!
+  "Update each blob listed in the topology configuration if the latest version of the blob
+  has not been downloaded."
+  [conf storm-id localizer]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        user (storm-conf TOPOLOGY-SUBMITTER-USER)
+        topo-name (storm-conf TOPOLOGY-NAME)
+        user-dir (.getLocalUserFileCacheDir localizer user)
+        localresources (blobstore-map-to-localresources blobstore-map)]
+    (try
+      (.updateBlobs localizer localresources user)
+      (catch AuthorizationException authExp
+        (log-error authExp))
+      (catch KeyNotFoundException knf
+        (log-error knf)))))
+
+(defn update-blobs-for-all-topologies-fn
+  "Returns a function that downloads all blobs listed in the topology configuration for all topologies assigned
+  to this supervisor, and creates version files with a suffix. The returned function is intended to be run periodically
+  by a timer, created elsewhere."
+  [supervisor]
+  (fn this []
+    (try
+      (let [conf (:conf supervisor)
+            downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+            new-assignment @(:curr-assignment supervisor)
+            assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
+        (doseq [topology-id downloaded-storm-ids]
+          (let [storm-root (supervisor-stormdist-root conf topology-id)]
+            (when (assigned-storm-ids topology-id)
+              (log-debug "Checking Blob updates for storm topology id " topology-id " With target_dir: " storm-root)
+              (update-blobs-for-topology! conf topology-id (:localizer supervisor))))))
+      (catch Exception e (log-error e "Error updating blobs, will retry again later")))))
--- End diff --
Put the body of the catch Exception clause (the log-error call) on its own line.
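The error handling in the diff follows a clear pattern. Topologies that are downloaded but no longer assigned are skipped. Authorization and missing-key failures are logged per topology, and the pass moves on. Anything else is caught at the top so the timer can retry on its next tick. A minimal Java sketch of that control flow follows; Java is used because the diff calls into Java objects such as the localizer. Every name here (BlobUpdatePass, TopologyUpdater, AuthorizationFailure, KeyNotFound, runOnce) is a hypothetical stand-in, not a Storm class, and the timer itself is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical names throughout; these are not Storm classes.
class BlobUpdatePass {
    // Stand-ins for AuthorizationException / KeyNotFoundException in the diff.
    static class AuthorizationFailure extends Exception {
        AuthorizationFailure(String msg) { super(msg); }
    }

    static class KeyNotFound extends Exception {
        KeyNotFound(String msg) { super(msg); }
    }

    // Updates the blobs of one topology (the role of update-blobs-for-topology!).
    interface TopologyUpdater {
        void update(String topologyId) throws AuthorizationFailure, KeyNotFound;
    }

    // One timer tick: update every downloaded topology that is still assigned.
    // Returns the ids an update was attempted for, in iteration order.
    static List<String> runOnce(Set<String> downloaded, Set<String> assigned,
                                TopologyUpdater updater, Consumer<String> log) {
        List<String> attempted = new ArrayList<>();
        try {
            for (String id : downloaded) {
                if (!assigned.contains(id)) {
                    continue; // downloaded but no longer assigned: leave it alone
                }
                attempted.add(id);
                try {
                    updater.update(id);
                } catch (AuthorizationFailure | KeyNotFound e) {
                    // Expected per-topology failure: log it and keep going.
                    log.accept("update failed for " + id + ": " + e.getMessage());
                }
            }
        } catch (RuntimeException e) {
            // Anything else ends this pass; the next timer tick retries.
            log.accept("Error updating blobs, will retry again later: " + e);
        }
        return attempted;
    }
}
```

As in the Clojure code, an unexpected exception ends the rest of the current pass, not just the failing topology. Only the two expected exception types are contained per topology.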
> Dist Cache: Basic Functionality
> -------------------------------
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this, a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and needs nothing extra, but an HDFS backend would also be
> great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo!, and we are in the
> process of getting it ready to push back to open source shortly.
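The upload/download API with a default backend and an optional HDFS backend can be pictured as one interface with interchangeable implementations. The Java sketch below is minimal and does not show Storm's actual blob store API. BlobStoreSketch, InMemoryBlobStore and isNewerThan are hypothetical names. The in-memory class only stands in for a nimbus-local default, and an HDFS-backed class would implement the same interface. The version counter mirrors the diff's idea of fetching a blob only when its latest version has not been downloaded.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical blob store API: one interface, pluggable backends.
interface BlobStoreSketch {
    // Stores a new version of the blob under key and returns its version number.
    long upload(String key, byte[] data);

    // Returns the latest version number for key.
    long latestVersion(String key);

    // Returns the contents of the latest version of key.
    byte[] download(String key);

    // Supervisor-side check: is there a version newer than the local copy?
    default boolean isNewerThan(String key, long localVersion) {
        return latestVersion(key) > localVersion;
    }
}

// In-memory stand-in for a local default backend.
class InMemoryBlobStore implements BlobStoreSketch {
    private final Map<String, byte[]> data = new HashMap<>();
    private final Map<String, Long> versions = new HashMap<>();

    @Override
    public synchronized long upload(String key, byte[] bytes) {
        long next = versions.getOrDefault(key, 0L) + 1;
        data.put(key, Arrays.copyOf(bytes, bytes.length)); // defensive copy
        versions.put(key, next);
        return next;
    }

    @Override
    public synchronized long latestVersion(String key) {
        Long v = versions.get(key);
        if (v == null) {
            throw new NoSuchElementException("no blob for key " + key);
        }
        return v;
    }

    @Override
    public synchronized byte[] download(String key) {
        byte[] bytes = data.get(key);
        if (bytes == null) {
            throw new NoSuchElementException("no blob for key " + key);
        }
        return Arrays.copyOf(bytes, bytes.length);
    }
}
```

Callers depend only on the interface, so swapping the default backend for a distributed one would not change supervisor code.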
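A common way to switch a symlink atomically, as the description requires, has two steps. First create the new link under a temporary name in the same directory. Then rename it over the old link, because rename(2) replaces the destination in one step on POSIX file systems. The Java sketch below is minimal and assumes a POSIX file system. SymlinkSwap and repoint are hypothetical names, not the supervisor's actual implementation.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class SymlinkSwap {
    // Points link at newTarget so readers see either the old or the new target,
    // never a missing link. Assumes POSIX, where rename(2) replaces the old entry.
    static void repoint(Path link, Path newTarget) throws IOException {
        // Build the new link in the same directory so the rename stays on one file system.
        Path tmp = link.resolveSibling(link.getFileName() + ".tmp");
        Files.deleteIfExists(tmp); // removes a leftover link, not its target
        Files.createSymbolicLink(tmp, newTarget);
        // Atomic rename over the existing link (or creates it on first use).
        Files.move(tmp, link, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Deleting the old link and then creating a new one would leave a window in which a worker could find no link at all. The rename avoids that window.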
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)