Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/845#discussion_r45123873
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
    @@ -1767,6 +1900,169 @@
                                       topology-id
                                       (doto (GetInfoOptions.) 
(.set_num_err_choice NumErrorsChoice/ALL))))
     
    +      ;;Blobstore implementation code
    +      ;; Starts a blob-creation session for `blob-key`.
    +      ;; Calls createBlob on the blob store with the requesting subject, stores
    +      ;; the returned upload handle in :blob-uploaders under a fresh session id,
    +      ;; and returns that session id as a string.
    +      (^String beginCreateBlob [this
    +                                ^String blob-key
    +                                ^SettableBlobMeta blob-meta]
    +        (let [new-session (uuid)
    +              subject (.subject (ReqContext/context))
    +              upload-handle (.createBlob (:blob-store nimbus)
    +                              blob-key
    +                              blob-meta
    +                              subject)]
    +          (.put (:blob-uploaders nimbus) new-session upload-handle)
    +          (log-message "Created blob for " blob-key
    +            " with session id " new-session)
    +          (str new-session)))
    +
    +      ;; Starts an update session for `blob-key`.
    +      ;; Obtains an AtomicOutputStream from the blob store for the requesting
    +      ;; subject, registers it in :blob-uploaders under a fresh session id, and
    +      ;; returns that session id as a string.
    +      (^String beginUpdateBlob [this ^String blob-key]
    +        (let [subject (.subject (ReqContext/context))
    +              ^AtomicOutputStream out-stream (.updateBlob (:blob-store nimbus)
    +                                               blob-key
    +                                               subject)
    +              ;; The session id is generated only after updateBlob succeeded.
    +              new-session (uuid)]
    +          (.put (:blob-uploaders nimbus) new-session out-stream)
    +          (log-message "Created upload session for " blob-key
    +            " with id " new-session)
    +          (str new-session)))
    +
    +      ;; Records state for `blob-key` in the cluster state, but only when the
    +      ;; configured blob store is a LocalFsBlobStore. The debug line below is
    +      ;; logged in either case.
    +      (^void createStateInZookeeper [this ^String blob-key]
    +        (let [cluster-state (:storm-cluster-state nimbus)
    +              store (:blob-store nimbus)
    +              host-port (:nimbus-host-port-info nimbus)]
    +          (when (instance? LocalFsBlobStore store)
    +            (let [version (get-version-for-key blob-key host-port (:conf nimbus))]
    +              (.setup-blobstore! cluster-state blob-key host-port version)))
    +          (log-debug "Created state in zookeeper" cluster-state store host-port)))
    +
    +      ;; Appends one chunk to the upload stream registered for `session`.
    +      ;; Throws a runtime exception when no stream is registered under that id.
    +      (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]
    +        (let [uploaders (:blob-uploaders nimbus)
    +              ^AtomicOutputStream os (.get uploaders session)]
    +          (when-not os
    +            (throw-runtime "Blob for session "
    +              session
    +              " does not exist (or timed out)"))
    +          ;; Write only the readable window of the buffer: it starts at
    +          ;; arrayOffset + position in the backing array and is `remaining`
    +          ;; bytes long.
    +          (.write os
    +            (.array blob-chunk)
    +            (+ (.arrayOffset blob-chunk) (.position blob-chunk))
    +            (.remaining blob-chunk))
    +          ;; Re-insert the stream under the same key.
    +          ;; NOTE(review): presumably this refreshes the session's expiry in
    +          ;; the uploaders map -- confirm against the map implementation.
    +          (.put uploaders session os)))
    +
    +      ;; Completes the upload for `session`: closes its stream, logs, and
    +      ;; removes the session from :blob-uploaders.
    +      ;; Throws a runtime exception when the session is unknown.
    +      (^void finishBlobUpload [this ^String session]
    +        (let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
    +          (when-not os
    +            (throw-runtime "Blob for session "
    +              session
    +              " does not exist (or timed out)"))
    +          (.close os)
    +          (log-message "Finished uploading blob for session "
    +            session
    +            ". Closing session.")
    +          (.remove (:blob-uploaders nimbus) session)))
    +
    +      ;; Aborts the upload for `session`: cancels its stream, logs, and removes
    +      ;; the session from :blob-uploaders.
    +      ;; Throws a runtime exception when the session is unknown.
    +      (^void cancelBlobUpload [this ^String session]
    +        (let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)]
    +          (when-not os
    +            (throw-runtime "Blob for session "
    +              session
    +              " does not exist (or timed out)"))
    +          (.cancel os)
    +          (log-message "Canceled uploading blob for session "
    +            session
    +            ". Closing session.")
    +          (.remove (:blob-uploaders nimbus) session)))
    +
    +      ;; Returns the blob store's ReadableBlobMeta for `blob-key`, looked up on
    +      ;; behalf of the requesting subject.
    +      (^ReadableBlobMeta getBlobMeta [this ^String blob-key]
    +        (let [subject (.subject (ReqContext/context))]
    +          (.getBlobMeta (:blob-store nimbus) blob-key subject)))
    +
    +      ;; Applies `blob-meta` to the blob stored under `blob-key`, on behalf of
    +      ;; the requesting subject.
    +      (^void setBlobMeta [this ^String blob-key ^SettableBlobMeta blob-meta]
    +        (let [subject (.subject (ReqContext/context))]
    +          (.setBlobMeta (:blob-store nimbus) blob-key blob-meta subject)))
    +
    +      ;; Opens a download session for `blob-key`.
    +      ;; Fetches the blob's input stream for the requesting subject, wraps it in
    +      ;; a BufferInputStream, and registers it in :blob-downloaders under a
    +      ;; fresh session id. The buffer size comes from
    +      ;; STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES, defaulting to 65536.
    +      ;; Returns a BeginDownloadResult carrying the blob version, the session
    +      ;; id and the data size.
    +      (^BeginDownloadResult beginBlobDownload [this ^String blob-key]
    +        (let [subject (.subject (ReqContext/context))
    +              ^InputStreamWithMeta blob-in (.getBlob (:blob-store nimbus)
    +                                             blob-key
    +                                             subject)
    +              new-session (uuid)
    +              result (BeginDownloadResult. (.getVersion blob-in) (str new-session))]
    +          (.set_data_size result (.getFileLength blob-in))
    +          (let [buffered (BufferInputStream.
    +                           blob-in
    +                           ^Integer (Utils/getInt
    +                                      (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES)
    +                                      (int 65536)))]
    +            (.put (:blob-downloaders nimbus) new-session buffered))
    +          (log-message "Created download session for " blob-key
    +            " with id " new-session)
    +          result))
    +
    +      ;; Reads the next chunk from the download stream registered for `session`
    +      ;; and returns it wrapped in a ByteBuffer.
    +      ;; An empty chunk ends the download: the stream is closed and the session
    +      ;; is removed from :blob-downloaders.
    +      ;; Throws a runtime exception when no stream is registered for `session`.
    +      (^ByteBuffer downloadBlobChunk [this ^String session]
    +        (let [downloaders (:blob-downloaders nimbus)
    +              ^BufferInputStream is (.get downloaders session)]
    +          (when-not is
    +            ;; RuntimeException has no (String, String) constructor, so the
    +            ;; message is built with throw-runtime, as the upload methods do.
    +            (throw-runtime "Could not find input stream for session " session))
    +          (let [ret (.read is)]
    +            ;; Re-insert the stream under the same key.
    +            ;; NOTE(review): presumably this refreshes the session's expiry --
    +            ;; confirm against the map implementation.
    +            (.put downloaders session is)
    +            (when (empty? ret)
    +              (.close is)
    +              (.remove downloaders session))
    +            (log-debug "Sending " (alength ret) " bytes")
    +            (ByteBuffer/wrap ret))))
    +
    +      ;; Deletes the blob stored under `blob-key` on behalf of the requesting
    +      ;; subject. When the blob store is a LocalFsBlobStore, the key and its
    +      ;; version entry are also removed from the cluster state.
    +      (^void deleteBlob [this ^String blob-key]
    +        (let [subject (.subject (ReqContext/context))
    +              ;; Bound locally, as createStateInZookeeper does, so the method
    +              ;; does not rely on a `blob-store` binding from an outer scope.
    +              blob-store (:blob-store nimbus)
    +              cluster-state (:storm-cluster-state nimbus)]
    +          (.deleteBlob blob-store blob-key subject)
    +          (when (instance? LocalFsBlobStore blob-store)
    +            (.remove-blobstore-key! cluster-state blob-key)
    +            (.remove-key-version! cluster-state blob-key))
    +          (log-message "Deleted blob for key " blob-key)))
    +
    +      (^ListBlobsResult listBlobs [this ^String session]
    +        (let [listers (:blob-listers nimbus)
    +              ^Iterator keys-it (if (clojure.string/blank? session)
    +                                  (->> (ReqContext/context)
    +                                    (.subject)
    +                                    (.listKeys (:blob-store nimbus)))
    +                                  (.get listers session))
    +              _ (or keys-it (throw-runtime "Blob list for session "
    +                              session
    +                              " does not exist (or timed out)"))
    +
    +              ;; Create a new session id if the user gave an empty session 
string.
    +              ;; This is the use case when the user wishes to list blobs
    +              ;; starting from the beginning.
    +              session (if (clojure.string/blank? session)
    +                        (let [new-session (uuid)]
    +                          (log-message "Creating new session for 
downloading list " new-session)
    +                          new-session)
    +                        session)]
    +          (if-not (.hasNext keys-it)
    +            (do
    +              (.remove listers session)
    +              (log-message "No more blobs to list for session " session)
    +              ;; A blank result communicates that there are no more blobs.
    +              (ListBlobsResult. (ArrayList. 0) (str session)))
    +            (let [^List list-chunk (->> keys-it
    +                                     (iterator-seq)
    +                                     (take 100) ;; Limit to next 100 keys
    +                                     (ArrayList.))
    +                  _ (log-message session " downloading " (.size 
list-chunk) " entries")]
    --- End diff --
    
    This `log-message` call can be moved outside the `let` bindings, to just before the `.put`, instead of being bound to `_`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to