[ https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14989950#comment-14989950 ]
ASF GitHub Bot commented on STORM-876: -------------------------------------- Github user d2r commented on a diff in the pull request: https://github.com/apache/storm/pull/845#discussion_r43909516 --- Diff: storm-core/src/clj/backtype/storm/command/blobstore.clj --- @@ -0,0 +1,163 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns backtype.storm.command.blobstore + (:import [java.io InputStream OutputStream]) + (:use [backtype.storm config]) + (:import [backtype.storm.generated SettableBlobMeta AccessControl AuthorizationException + KeyNotFoundException]) + (:import [backtype.storm.blobstore BlobStoreAclHandler]) + (:use [clojure.string :only [split]]) + (:use [clojure.tools.cli :only [cli]]) + (:use [clojure.java.io :only [copy input-stream output-stream]]) + (:use [backtype.storm blobstore log util]) + (:gen-class)) + +(defn update-blob-from-stream + "Update a blob in the blob store from an InputStream" + [key ^InputStream in] + (with-configured-blob-client blobstore + (let [out (.updateBlob blobstore key)] + (try + (copy in out) + (.close out) + (catch Exception e + (log-message e) + (.cancel out) + (throw e)))))) + +(defn create-blob-from-stream + "Create a blob in the blob store from an InputStream" + [key ^InputStream in ^SettableBlobMeta meta] + (with-configured-blob-client blobstore + (let [out (.createBlob blobstore key meta)] + (try + (copy in out) + (.close out) + (catch Exception e + (.cancel out) + (throw e)))))) + +(defn read-blob + "Read a blob in the blob store and write to an OutputStream" + [key ^OutputStream out] + (with-configured-blob-client blobstore + (with-open [in (.getBlob blobstore key)] + (copy in out)))) + +(defn as-access-control + "Convert a parameter to an AccessControl object" + [param] + (BlobStoreAclHandler/parseAccessControl (str param))) + +(defn as-acl + [param] + (map as-access-control (split param #","))) + +(defn access-control-str + [^AccessControl acl] + (BlobStoreAclHandler/accessControlToString acl)) + +(defn read-cli [args] + (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])] + (if file + (with-open [f (output-stream file)] + (read-blob key f)) + (read-blob key System/out)))) + +(defn update-cli [args] + (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])] + (if file + (with-open [f (input-stream file)] + (update-blob-from-stream key f)) + (update-blob-from-stream key System/in)) + (log-message "Successfully updated " key))) + +(defn create-cli [args] + (let [[{file :file acl :acl replication-factor :replication-fctr} [key] _] (cli args ["-f" "--file" :default nil] + ["-a" "--acl" :default [] :parse-fn as-acl] + ["-r" "--replication-factor" :default -1 :parse-fn parse-int]) --- End diff -- That's fine. Thank you. > Dist Cache: Basic Functionality > ------------------------------- > > Key: STORM-876 > URL: https://issues.apache.org/jira/browse/STORM-876 > Project: Apache Storm > Issue Type: Improvement > Components: storm-core > Reporter: Robert Joseph Evans > Assignee: Robert Joseph Evans > Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf > > > Basic functionality for the Dist Cache feature. > As part of this a new API should be added to support uploading and > downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar > should be written into the blob store instead of residing locally. We need a > default implementation of the blob store that does essentially what nimbus > currently does and does not need anything extra. But having an HDFS backend > too would be great for scalability and HA. > The supervisor should provide a way to download and manage these blobs and > provide a working directory for the worker process with symlinks to the > blobs. It should also allow the blobs to be updated and switch the symlink > atomically to point to the new blob once it is downloaded. > All of this is already done by code internal to Yahoo! we are in the process > of getting it ready to push back to open source shortly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)