[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009218#comment-15009218
 ] 

ASF GitHub Bot commented on STORM-876:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/845#discussion_r45100258
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
    @@ -390,53 +444,98 @@
           [(.getNodeId slot) (.getPort slot)]
           )))
     
    +(defn- get-version-for-key [key nimbus-host-port-info conf]
    +  (let [version (KeyVersion. key nimbus-host-port-info)]
    +    (.getKeyVersion version conf)))
    +
    +(defn get-key-seq-from-blob-store [blob-store]
    +  (let [key-iter (.listKeys blob-store nimbus-subject)]
    +    (iterator-seq key-iter)))
    +
     (defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf 
topology]
    -  (let [stormroot (master-stormdist-root conf storm-id)]
    -   (log-message "nimbus file location:" stormroot)
    -   (FileUtils/forceMkdir (File. stormroot))
    -   (FileUtils/cleanDirectory (File. stormroot))
    -   (setup-jar conf tmp-jar-location stormroot)
    -   (FileUtils/writeByteArrayToFile (File. (master-stormcode-path 
stormroot)) (Utils/serialize topology))
    -   (FileUtils/writeByteArrayToFile (File. (master-stormconf-path 
stormroot)) (Utils/toCompressedJsonConf storm-conf))
    -   (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) 
stormroot storm-id))
    -   ))
    +  (let [subject (get-subject)
    +        storm-cluster-state (:storm-cluster-state nimbus)
    +        blob-store (:blob-store nimbus)
    +        jar-key (master-stormjar-key storm-id)
    +        code-key (master-stormcode-key storm-id)
    +        conf-key (master-stormconf-key storm-id)
    +        nimbus-host-port-info (:nimbus-host-port-info nimbus)]
    +    (when tmp-jar-location ;;in local mode there is no jar
    +      (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
    +      (if (instance? LocalFsBlobStore blob-store)
    +        (.setup-blobstore! storm-cluster-state jar-key 
nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info 
conf))))
    +    (.createBlob blob-store conf-key (Utils/toCompressedJsonConf 
storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
    +    (if (instance? LocalFsBlobStore blob-store)
    +      (.setup-blobstore! storm-cluster-state conf-key 
nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info 
conf)))
    +    (.createBlob blob-store code-key (Utils/serialize topology) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
    +    (if (instance? LocalFsBlobStore blob-store)
    +      (.setup-blobstore! storm-cluster-state code-key 
nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info 
conf)))))
    +
    +(defn- read-storm-topology [storm-id blob-store]
    +  (Utils/deserialize
    +    (.readBlob blob-store (master-stormcode-key storm-id) (get-subject)) 
StormTopology))
    +
    +(defn- get-blob-replication-count [blob-key nimbus]
    +  (if (:blob-store nimbus)
    +          (-> (:blob-store nimbus)
    +            (.getBlobReplication  blob-key nimbus-subject))))
     
     (defn- wait-for-desired-code-replication [nimbus conf storm-id]
       (let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
             max-replication-wait-time (conf 
TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
    -        total-wait-time (atom 0)
    -        current-replication-count (atom (if (:code-distributor nimbus) 
(.getReplicationCount (:code-distributor nimbus) storm-id) 0))]
    -  (if (:code-distributor nimbus)
    -    (while (and (> min-replication-count @current-replication-count)
    -             (or (= -1 max-replication-wait-time)
    -               (< @total-wait-time max-replication-wait-time)))
    +        current-replication-count-jar (if (not (local-mode? conf))
    +                                        (atom (get-blob-replication-count 
(master-stormjar-key storm-id) nimbus))
    +                                        (atom min-replication-count))
    +        current-replication-count-code (atom (get-blob-replication-count 
(master-stormcode-key storm-id) nimbus))
    +        current-replication-count-conf (atom (get-blob-replication-count 
(master-stormconf-key storm-id) nimbus))
    +        total-wait-time (atom 0)]
    +    (if (:blob-store nimbus)
    +      (while (and
    +               (or (> min-replication-count @current-replication-count-jar)
    +                   (> min-replication-count 
@current-replication-count-code)
    +                   (> min-replication-count 
@current-replication-count-conf))
    +               (or (= -1 max-replication-wait-time)
    +                   (< @total-wait-time max-replication-wait-time)))
             (sleep-secs 1)
             (log-debug "waiting for desired replication to be achieved.
               min-replication-count = " min-replication-count  " 
max-replication-wait-time = " max-replication-wait-time
    -          "current-replication-count = " @current-replication-count " 
total-wait-time " @total-wait-time)
    +          (if (not (local-mode? conf))"current-replication-count for jar 
key = " @current-replication-count-jar)
    +          "current-replication-count for code key = " 
@current-replication-count-code
    +          "current-replication-count for conf key = " 
@current-replication-count-conf
    +          " total-wait-time " @total-wait-time)
             (swap! total-wait-time inc)
    -        (reset! current-replication-count  (.getReplicationCount 
(:code-distributor nimbus) storm-id))))
    -  (if (< min-replication-count @current-replication-count)
    -    (log-message "desired replication count "  min-replication-count " 
achieved,
    -      current-replication-count" @current-replication-count)
    -    (log-message "desired replication count of "  min-replication-count " 
not achieved but we have hit the max wait time "
    -      max-replication-wait-time " so moving on with replication count = " 
@current-replication-count)
    -    )))
    -
    -(defn- read-storm-topology [conf storm-id]
    -  (let [stormroot (master-stormdist-root conf storm-id)]
    -    (Utils/deserialize
    -      (FileUtils/readFileToByteArray
    -        (File. (master-stormcode-path stormroot))
    -        ) StormTopology)))
    +        (if (not (local-mode? conf))
    +          (reset! current-replication-count-conf  
(get-blob-replication-count (master-stormconf-key storm-id))))
    +        (reset! current-replication-count-code  
(get-blob-replication-count (master-stormcode-key storm-id)))
    +        (reset! current-replication-count-jar  (get-blob-replication-count 
(master-stormjar-key storm-id)))))
    --- End diff --
    
    Re-posting since my previous comment was not addressed:
    
    Here we are calling `get-blob-replication-count` three times with one 
argument, while the arity of the function is 2.  I think the missing argument 
here might be `nimbus`.
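
    To illustrate, here is a minimal self-contained sketch, not the actual 
    nimbus code. The map-backed `nimbus`, the `"code-key"` string and the stub 
    body of `get-blob-replication-count` are stand-ins I made up so the snippet 
    runs on its own; only the two-argument shape is taken from the diff.

    ```clojure
    ;; Hypothetical stand-in with the same arity as the function in the diff.
    ;; The real one asks the blob store for the replication count.
    (defn- get-blob-replication-count [blob-key nimbus]
      (when-let [blob-store (:blob-store nimbus)]
        (get blob-store blob-key)))

    ;; Assumption for illustration: a plain map plays the role of nimbus,
    ;; and its :blob-store maps a key to a replication count.
    (def nimbus {:blob-store {"code-key" 2}})

    (def current-replication-count-code (atom 0))

    ;; One-argument call, as written in the diff: this compiles, but throws
    ;; an ArityException the first time the loop body runs.
    (def one-arg-call-failed?
      (try
        (get-blob-replication-count "code-key")
        false
        (catch clojure.lang.ArityException _ true)))

    ;; Two-argument call with nimbus passed through, as suggested above.
    (reset! current-replication-count-code
            (get-blob-replication-count "code-key" nimbus))
    ```

    If that is right, the same change would apply to the jar and conf calls in 
    the loop as well.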


> Dist Cache: Basic Functionality
> -------------------------------
>
>                 Key: STORM-876
>                 URL: https://issues.apache.org/jira/browse/STORM-876
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>         Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo!; we are in the process 
> of getting it ready to push back to open source shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
