[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009793#comment-15009793
 ] 

ASF GitHub Bot commented on STORM-876:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/845#discussion_r45136234
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
    @@ -454,10 +455,82 @@
           (shutdown-worker supervisor id))
         ))
     
    +(defn get-blob-localname
    +  "Given the blob information either gets the localname field if it exists,
    +  else routines the default value passed in."
    +  [blob-info defaultValue]
    +  (if-let [val (if blob-info (get blob-info "localname") nil)] val 
defaultValue))
    +
    +(defn should-uncompress-blob?
    +  "Given the blob information returns the value of the uncompress field, 
handling it either being
    +  a string or a boolean value, or ifs its not specified then returns false"
    +  [blob-info]
    +  (boolean (and blob-info
    +             (if-let [val (get blob-info "uncompress")]
    +               (.booleanValue (Boolean. val))))))
    +
    +(defn remove-blob-references
    +  "Remove a reference to a blob when its no longer needed."
    +  [localizer storm-id conf]
    +  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
    +        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
    +        user (storm-conf TOPOLOGY-SUBMITTER-USER)
    +        topo-name (storm-conf TOPOLOGY-NAME)]
    +    (if blobstore-map
    +      (doseq [[k, v] blobstore-map]
    +        (.removeBlobReference localizer
    +          k
    +          user
    +          topo-name
    +          (should-uncompress-blob? v))))))
    +
    +(defn blobstore-map-to-localresources
    +  "Returns a list of LocalResources based on the blobstore-map passed in."
    +  [blobstore-map]
    +  (if blobstore-map
    +    (for [[k, v] blobstore-map] (LocalResource. k (should-uncompress-blob? 
v)))
    +    ()))
    +
    +(defn add-blob-references
    +  "For each of the downloaded topologies, adds references to the blobs 
that the topologies are
    +  using. This is used to reconstruct the cache on restart."
    +  [localizer storm-id conf]
    +  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
    +        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
    +        user (storm-conf TOPOLOGY-SUBMITTER-USER)
    +        topo-name (storm-conf TOPOLOGY-NAME)
    +        localresources (blobstore-map-to-localresources blobstore-map)]
    +    (if blobstore-map
    +      (.addReferences localizer localresources user topo-name))))
    +
    +(defn rm-topo-files
    +  [conf storm-id localizer rm-blob-refs?]
    +  (try
    +    (if (= true rm-blob-refs?)
    +      (remove-blob-references localizer storm-id conf))
    +    (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
    +      (rmr-as-user conf storm-id (supervisor-stormdist-root conf storm-id))
    +      (rmr (supervisor-stormdist-root conf storm-id)))
    +    (catch Exception e
    +      (log-message e (str "Exception removing: " storm-id)))))
    +
    +(defn verify-downloaded-files [conf localizer assigned-storm-ids 
all-downloaded-storm-ids]
    +  "Method written to check for the files exists to avoid supervisor 
crashing
    +   Also makes sure there is no necessity for locking"
    +  (remove nil?
    +    (into #{}
    +      (for [storm-id all-downloaded-storm-ids
    +            :let [rm-blob-refs? false]
    +            :when (contains? assigned-storm-ids storm-id)]
    +        (if (not (required-topo-files-exist? conf storm-id))
    +          (do
    +            (log-debug "Files not present in topology directory")
    +            (rm-topo-files conf storm-id localizer rm-blob-refs?)
    +            storm-id))))))
    --- End diff --
    
    `when-not` instead of `(if (not ... (do`


> Dist Cache: Basic Functionality
> -------------------------------
>
>                 Key: STORM-876
>                 URL: https://issues.apache.org/jira/browse/STORM-876
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>         Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process 
> of getting it ready to push back to open source shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to