[ https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009552#comment-15009552 ]

ASF GitHub Bot commented on STORM-876:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/845#discussion_r45123873
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
    @@ -1767,6 +1900,169 @@
                                       topology-id
                                       (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL))))
     
    +      ;;Blobstore implementation code
    +      (^String beginCreateBlob [this
    +                                ^String blob-key
    +                                ^SettableBlobMeta blob-meta]
    +        (let [session-id (uuid)]
    +          (.put (:blob-uploaders nimbus)
    +            session-id
    +            (->> (ReqContext/context)
    +              (.subject)
    +              (.createBlob (:blob-store nimbus) blob-key blob-meta)))
    +          (log-message "Created blob for " blob-key
    +            " with session id " session-id)
    +          (str session-id)))
    +
    +      (^String beginUpdateBlob [this ^String blob-key]
    +        (let [^AtomicOutputStream os (->> (ReqContext/context)
    +                                       (.subject)
    +                                       (.updateBlob (:blob-store nimbus)
    +                                         blob-key))]
    +          (let [session-id (uuid)]
    +            (.put (:blob-uploaders nimbus) session-id os)
    +            (log-message "Created upload session for " blob-key
    +              " with id " session-id)
    +            (str session-id))))
    +
    +      (^void createStateInZookeeper [this ^String blob-key]
    +        (let [storm-cluster-state (:storm-cluster-state nimbus)
    +              blob-store (:blob-store nimbus)
    +              nimbus-host-port-info (:nimbus-host-port-info nimbus)
    +              conf (:conf nimbus)]
    +          (if (instance? LocalFsBlobStore blob-store)
    +              (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
    +          (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info)))
    +
    +      (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]
    +        (let [uploaders (:blob-uploaders nimbus)]
    +          (if-let [^AtomicOutputStream os (.get uploaders session)]
    +            (let [chunk-array (.array blob-chunk)
    +                  remaining (.remaining blob-chunk)
    +                  array-offset (.arrayOffset blob-chunk)
    +                  position (.position blob-chunk)]
    +              (.write os chunk-array (+ array-offset position) remaining)
    +              (.put uploaders session os))
    +            (throw-runtime "Blob for session "
    +              session
    +              " does not exist (or timed out)"))))
    +
    +      (^void finishBlobUpload [this ^String session]
    +        (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
    +          (do
    +            (.close os)
    +            (log-message "Finished uploading blob for session "
    +              session
    +              ". Closing session.")
    +            (.remove (:blob-uploaders nimbus) session))
    +          (throw-runtime "Blob for session "
    +            session
    +            " does not exist (or timed out)")))
    +
    +      (^void cancelBlobUpload [this ^String session]
    +        (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
    +          (do
    +            (.cancel os)
    +            (log-message "Canceled uploading blob for session "
    +              session
    +              ". Closing session.")
    +            (.remove (:blob-uploaders nimbus) session))
    +          (throw-runtime "Blob for session "
    +            session
    +            " does not exist (or timed out)")))
    +
    +      (^ReadableBlobMeta getBlobMeta [this ^String blob-key]
    +        (let [^ReadableBlobMeta ret (->> (ReqContext/context)
    +                                      (.subject)
    +                                      (.getBlobMeta (:blob-store nimbus)
    +                                        blob-key))]
    +          ret))
    +
    +      (^void setBlobMeta [this ^String blob-key ^SettableBlobMeta blob-meta]
    +        (->> (ReqContext/context)
    +          (.subject)
    +          (.setBlobMeta (:blob-store nimbus) blob-key blob-meta)))
    +
    +      (^BeginDownloadResult beginBlobDownload [this ^String blob-key]
    +        (let [^InputStreamWithMeta is (->> (ReqContext/context)
    +                                        (.subject)
    +                                        (.getBlob (:blob-store nimbus)
    +                                          blob-key))]
    +          (let [session-id (uuid)
    +                ret (BeginDownloadResult. (.getVersion is) (str session-id))]
    +            (.set_data_size ret (.getFileLength is))
    +            (.put (:blob-downloaders nimbus) session-id (BufferInputStream. is ^Integer (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) (int 65536))))
    +            (log-message "Created download session for " blob-key
    +              " with id " session-id)
    +            ret)))
    +
    +      (^ByteBuffer downloadBlobChunk [this ^String session]
    +        (let [downloaders (:blob-downloaders nimbus)
    +              ^BufferInputStream is (.get downloaders session)]
    +          (when-not is
    +            (throw (RuntimeException.
    +                     "Could not find input stream for session " session)))
    +          (let [ret (.read is)]
    +            (.put downloaders session is)
    +            (when (empty? ret)
    +              (.close is)
    +              (.remove downloaders session))
    +            (log-debug "Sending " (alength ret) " bytes")
    +            (ByteBuffer/wrap ret))))
    +
    +      (^void deleteBlob [this ^String blob-key]
    +        (let [subject (->> (ReqContext/context)
    +                           (.subject))]
    +        (.deleteBlob (:blob-store nimbus) blob-key subject)
    +        (when (instance? LocalFsBlobStore blob-store)
    +          (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key)
    +          (.remove-key-version! (:storm-cluster-state nimbus) blob-key))
    +        (log-message "Deleted blob for key " blob-key)))
    +
    +      (^ListBlobsResult listBlobs [this ^String session]
    +        (let [listers (:blob-listers nimbus)
    +              ^Iterator keys-it (if (clojure.string/blank? session)
    +                                  (->> (ReqContext/context)
    +                                    (.subject)
    +                                    (.listKeys (:blob-store nimbus)))
    +                                  (.get listers session))
    +              _ (or keys-it (throw-runtime "Blob list for session "
    +                              session
    +                              " does not exist (or timed out)"))
    +
    +              ;; Create a new session id if the user gave an empty session string.
    +              ;; This is the use case when the user wishes to list blobs
    +              ;; starting from the beginning.
    +              session (if (clojure.string/blank? session)
    +                        (let [new-session (uuid)]
    +                          (log-message "Creating new session for downloading list " new-session)
    +                          new-session)
    +                        session)]
    +          (if-not (.hasNext keys-it)
    +            (do
    +              (.remove listers session)
    +              (log-message "No more blobs to list for session " session)
    +              ;; A blank result communicates that there are no more blobs.
    +              (ListBlobsResult. (ArrayList. 0) (str session)))
    +            (let [^List list-chunk (->> keys-it
    +                                     (iterator-seq)
    +                                     (take 100) ;; Limit to next 100 keys
    +                                     (ArrayList.))
    +                  _ (log-message session " downloading " (.size list-chunk) " entries")]
    --- End diff --
    
    This `log-message` call can be moved out of the `let` bindings and into the body, before the `.put`.
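
A minimal sketch of what that suggestion might look like, assuming the body of the `let` goes on to `.put` the iterator back into `listers` (that part is not shown in the diff). `log-message` is stubbed with `println` here, and `next-list-chunk` is a hypothetical helper name, so the snippet runs outside Storm:

```clojure
(import '(java.util ArrayList HashMap Iterator List))

;; Stand-in for Storm's log-message (an assumption so the sketch is self-contained).
(defn log-message [& args]
  (println (apply str args)))

;; Hypothetical helper mirroring the tail of listBlobs.
(defn next-list-chunk [^HashMap listers session ^Iterator keys-it]
  (let [^List list-chunk (->> keys-it
                              (iterator-seq)
                              (take 100) ;; limit to the next 100 keys
                              (ArrayList.))]
    ;; Logging is a side effect, so it sits in the body instead of a `_` binding.
    (log-message session " downloading " (.size list-chunk) " entries")
    ;; Assumed follow-up step: keep the iterator for the next call in this session.
    (.put listers session keys-it)
    list-chunk))
```

Keeping the binding vector for values only makes it easier to see which names the body actually uses; the behavior is otherwise the same as the `_` form.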


> Dist Cache: Basic Functionality
> -------------------------------
>
>                 Key: STORM-876
>                 URL: https://issues.apache.org/jira/browse/STORM-876
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>         Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this, a new API should be added to support uploading and 
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and needs nothing extra, but an HDFS backend would also be 
> great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs. It should also allow the blobs to be updated, switching the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo!, and we are in the 
> process of getting it ready to push back to open source shortly.


