[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009739#comment-15009739
 ] 

ASF GitHub Bot commented on STORM-876:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/845#discussion_r45133203
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
    @@ -372,66 +389,50 @@
              ". State: " state
              ", Heartbeat: " (pr-str heartbeat))
             (shutdown-worker supervisor id)
    -        (if (:code-distributor supervisor)
    -          (.cleanup (:code-distributor supervisor) id))
    -        ))
    -
    -    (doseq [id (vals new-worker-ids)]
    -      (local-mkdirs (worker-pids-root conf id))
    -      (local-mkdirs (worker-heartbeats-root conf id)))
    -    (ls-approved-workers! local-state
    -          (merge
    -           (select-keys (ls-approved-workers local-state)
    -                        (keys keepers))
    -           (zipmap (vals new-worker-ids) (keys new-worker-ids))
    -           ))
    -
    -    ;; check storm topology code dir exists before launching workers
    -    (doseq [[port assignment] reassign-executors]
    -      (let [downloaded-storm-ids (set (read-downloaded-storm-ids conf))
    -            storm-id (:storm-id assignment)
    -            cached-assignment-info @(:assignment-versions supervisor)
    -            assignment-info (if (and (not-nil? cached-assignment-info) (contains? cached-assignment-info storm-id ))
    -                              (get cached-assignment-info storm-id)
    -                              (.assignment-info-with-version storm-cluster-state storm-id nil))
    -       storm-code-map (read-storm-code-locations assignment-info)
    -            master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
    -            stormroot (supervisor-stormdist-root conf storm-id)]
    -        (if-not (or (contains? downloaded-storm-ids storm-id) (.exists (File. stormroot)) (nil? master-code-dir))
    -          (download-storm-code conf storm-id master-code-dir supervisor download-lock))
             ))
    -
    -    (wait-for-workers-launch
    -     conf
    -     (dofor [[port assignment] reassign-executors]
    -            (let [id (new-worker-ids port)
    -                  storm-id (:storm-id assignment)
    -                  ^WorkerResources resources (:resources assignment)
    -                  mem-onheap (.get_mem_on_heap resources)]
    -              (try
    -                (log-message "Launching worker with assignment "
    -                             (pr-str assignment)
    -                             " for this supervisor "
    -                             (:supervisor-id supervisor)
    -                             " on port "
    -                             port
    -                             " with id "
    -                             id
    -                             )
    -                (launch-worker supervisor
    -                               (:storm-id assignment)
    -                               port
    -                               id
    -                               mem-onheap)
    -                (mark! supervisor:num-workers-launched)
    -                (catch java.io.FileNotFoundException e
    -                  (log-message "Unable to launch worker due to "
    -                               (.getMessage e)))
    -                (catch java.io.IOException e
    -                  (log-message "Unable to launch worker due to "
    -                               (.getMessage e))))
    -         id)))
    -    ))
    +    (let [valid-new-worker-ids
    +          (into {}
    +            (remove nil?
    +              (dofor [[port assignment] reassign-executors]
    +                (let [id (new-worker-ids port)
    +                      storm-id (:storm-id assignment)
    +                      ^WorkerResources resources (:resources assignment)
    +                      mem-onheap (.get_mem_on_heap resources)]
    +                  ;; This condition checks for required files exist before launching the worker
    +                  (if (required-topo-files-exist? conf storm-id)
    +                    (do
    +                      (log-message "Launching worker with assignment "
    +                        (pr-str assignment)
    +                        " for this supervisor "
    +                        (:supervisor-id supervisor)
    +                        " on port "
    +                        port
    +                        " with id "
    +                        id)
    +                      (local-mkdirs (worker-pids-root conf id))
    +                      (local-mkdirs (worker-heartbeats-root conf id))
    +                      (launch-worker supervisor
    +                        (:storm-id assignment)
    +                        port
    +                        id
    +                        mem-onheap)
    +                      [port id])
    --- End diff --
    
    Also, the resulting name of the bound var could be something like `id->port` or `valid-new-id->port`.
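
    A minimal sketch of the naming idea, not the supervisor code itself: the
    `key->value` arrow in a name tells the reader which way the map points.
    The function name `valid-id->port`, the `launchable?` predicate (a stand-in
    for `required-topo-files-exist?`), and the sample data are all hypothetical,
    and plain `for` is used in place of `dofor`. The sketch assumes the pairs
    are emitted as `[id port]`; if they are emitted as `[port id]`, the matching
    name would be `port->id` instead.

    ```clojure
    ;; Hypothetical helper: build an id->port map for the workers that pass
    ;; the launch check, dropping the ones that do not.
    (defn valid-id->port
      [new-worker-ids launchable?]
      (into {}
            (remove nil?
                    (for [[port id] new-worker-ids]
                      ;; `when` yields nil for skipped ports; `remove nil?` drops them.
                      (when (launchable? port)
                        [id port])))))

    ;; Made-up data: only port 6700 is treated as launchable.
    (let [valid-new-id->port (valid-id->port {6700 "worker-a" 6701 "worker-b"}
                                             #(= % 6700))]
      (println valid-new-id->port))
    ```

    With the arrow in the name, a later `(valid-new-id->port id)` lookup reads
    correctly without checking how the map was built.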


> Dist Cache: Basic Functionality
> -------------------------------
>
>                 Key: STORM-876
>                 URL: https://issues.apache.org/jira/browse/STORM-876
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>         Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo!, and we are in the 
> process of getting it ready to push back to open source shortly.
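
The atomic symlink switch described above can be sketched as follows. This is
an illustration under stated assumptions, not the Storm or Yahoo!
implementation: the namespace, `switch-symlink!`, and the `.tmp` suffix are
hypothetical names. It relies on `java.nio.file.Files/move` with
`ATOMIC_MOVE`, which on POSIX file systems is typically a `rename` that
replaces the existing link; the Java documentation leaves replacement of an
existing target implementation specific, so other platforms may throw instead.

```clojure
(ns blob-symlink-sketch
  (:import (java.nio.file CopyOption Files Path StandardCopyOption)
           (java.nio.file.attribute FileAttribute)))

(defn switch-symlink!
  "Point `link` at `new-target` by creating a temporary symlink beside it
  and renaming that temporary link over `link`."
  [^Path link ^Path new-target]
  (let [tmp (.resolveSibling link (str (.getFileName link) ".tmp"))]
    ;; Clear any leftover temporary link from an earlier failed attempt.
    (Files/deleteIfExists tmp)
    (Files/createSymbolicLink tmp new-target (make-array FileAttribute 0))
    ;; Moving a symlink moves the link itself, not the file it points to.
    (Files/move tmp link
                (into-array CopyOption [StandardCopyOption/ATOMIC_MOVE]))))
```

A reader that opens the link sees either the old blob or the new one, never a
missing link, as long as the rename is atomic on the underlying file system.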



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
