Blobstore API STORM-876

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7029aee5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7029aee5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7029aee5

Branch: refs/heads/master
Commit: 7029aee576cff6d7159e9e1a991bb02aaf0d4cd8
Parents: 0acc1ce
Author: Sanket <schintap@untilservice-lm>
Authored: Mon Nov 30 14:56:15 2015 -0600
Committer: Sanket <schintap@untilservice-lm>
Committed: Mon Nov 30 14:56:15 2015 -0600

----------------------------------------------------------------------
 bin/storm.py                                    |    28 +-
 conf/defaults.yaml                              |    16 +-
 external/storm-hdfs/pom.xml                     |    70 +
 .../storm/hdfs/blobstore/HdfsBlobStore.java     |   370 +
 .../storm/hdfs/blobstore/HdfsBlobStoreFile.java |   196 +
 .../storm/hdfs/blobstore/HdfsBlobStoreImpl.java |   312 +
 .../hdfs/blobstore/HdfsClientBlobStore.java     |   115 +
 .../ha/codedistributor/HDFSCodeDistributor.java |   118 -
 .../storm/hdfs/blobstore/BlobStoreTest.java     |   530 +
 .../hdfs/blobstore/HdfsBlobStoreImplTest.java   |   231 +
 pom.xml                                         |    16 +-
 storm-core/pom.xml                              |    36 +-
 storm-core/src/clj/backtype/storm/blobstore.clj |    28 +
 storm-core/src/clj/backtype/storm/cluster.clj   |    92 +-
 .../cluster_state/zookeeper_state_factory.clj   |     4 +
 .../clj/backtype/storm/command/blobstore.clj    |   162 +
 storm-core/src/clj/backtype/storm/config.clj    |    24 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   678 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |   480 +-
 storm-core/src/clj/backtype/storm/testing.clj   |     5 +-
 storm-core/src/clj/backtype/storm/util.clj      |    16 +-
 storm-core/src/clj/backtype/storm/zookeeper.clj |    31 +-
 storm-core/src/jvm/backtype/storm/Config.java   |   123 +-
 .../storm/blobstore/AtomicOutputStream.java     |    32 +
 .../storm/blobstore/BlobKeySequenceInfo.java    |    40 +
 .../jvm/backtype/storm/blobstore/BlobStore.java |   445 +
 .../storm/blobstore/BlobStoreAclHandler.java    |   399 +
 .../backtype/storm/blobstore/BlobStoreFile.java |    50 +
 .../storm/blobstore/BlobStoreUtils.java         |   257 +
 .../storm/blobstore/BlobSynchronizer.java       |   124 +
 .../storm/blobstore/ClientBlobStore.java        |    62 +
 .../storm/blobstore/FileBlobStoreImpl.java      |   248 +
 .../storm/blobstore/InputStreamWithMeta.java    |    26 +
 .../jvm/backtype/storm/blobstore/KeyFilter.java |    22 +
 .../storm/blobstore/KeySequenceNumber.java      |   229 +
 .../storm/blobstore/LocalFsBlobStore.java       |   308 +
 .../storm/blobstore/LocalFsBlobStoreFile.java   |   159 +
 .../storm/blobstore/NimbusBlobStore.java        |   412 +
 .../backtype/storm/cluster/ClusterState.java    |     9 +
 .../storm/codedistributor/ICodeDistributor.java |    73 -
 .../LocalFileSystemCodeDistributor.java         |   123 -
 .../backtype/storm/generated/AccessControl.java |   627 +
 .../storm/generated/AccessControlType.java      |    62 +
 .../backtype/storm/generated/Assignment.java    |   244 +-
 .../storm/generated/BeginDownloadResult.java    |   608 +
 .../storm/generated/ClusterWorkerHeartbeat.java |    52 +-
 .../jvm/backtype/storm/generated/HBNodes.java   |    32 +-
 .../jvm/backtype/storm/generated/HBRecords.java |    36 +-
 .../generated/KeyAlreadyExistsException.java    |   406 +
 .../storm/generated/KeyNotFoundException.java   |   406 +
 .../storm/generated/LSApprovedWorkers.java      |    44 +-
 .../generated/LSSupervisorAssignments.java      |    48 +-
 .../backtype/storm/generated/LSTopoHistory.java |    64 +-
 .../storm/generated/LSTopoHistoryList.java      |    36 +-
 .../storm/generated/LSWorkerHeartbeat.java      |    36 +-
 .../storm/generated/ListBlobsResult.java        |   556 +
 .../storm/generated/LocalAssignment.java        |    36 +-
 .../storm/generated/LocalStateData.java         |    48 +-
 .../jvm/backtype/storm/generated/LogConfig.java |    48 +-
 .../jvm/backtype/storm/generated/Nimbus.java    | 26917 +++++++++++++----
 .../jvm/backtype/storm/generated/NodeInfo.java  |    32 +-
 .../storm/generated/ReadableBlobMeta.java       |   510 +
 .../storm/generated/SettableBlobMeta.java       |   567 +
 .../jvm/backtype/storm/generated/StormBase.java |    92 +-
 .../storm/generated/SupervisorInfo.java         |   152 +-
 .../storm/generated/TopologyHistoryInfo.java    |    32 +-
 .../backtype/storm/localizer/LocalResource.java |    44 +
 .../storm/localizer/LocalizedResource.java      |   130 +
 .../LocalizedResourceRetentionSet.java          |   140 +
 .../storm/localizer/LocalizedResourceSet.java   |   101 +
 .../jvm/backtype/storm/localizer/Localizer.java |   695 +
 .../storm/security/auth/NimbusPrincipal.java    |    29 +
 .../backtype/storm/utils/BufferInputStream.java |    53 +
 .../jvm/backtype/storm/utils/ShellUtils.java    |     7 +
 .../src/jvm/backtype/storm/utils/Utils.java     |   545 +-
 .../storm/validation/ConfigValidation.java      |    16 +-
 .../validation/ConfigValidationAnnotations.java |    11 +-
 storm-core/src/py/storm/Nimbus-remote           |    98 +
 storm-core/src/py/storm/Nimbus.py               |  5991 +++-
 storm-core/src/py/storm/ttypes.py               |   996 +-
 storm-core/src/storm.thrift                     |    59 +
 .../test/clj/backtype/storm/cluster_test.clj    |    20 +-
 .../test/clj/backtype/storm/nimbus_test.clj     |    43 +-
 .../storm/security/auth/ReqContext_test.clj     |     1 +
 .../test/clj/backtype/storm/supervisor_test.clj |    18 +-
 .../backtype/storm/blobstore/BlobStoreTest.java |   461 +
 .../storm/blobstore/BlobSynchronizerTest.java   |   137 +
 .../storm/blobstore/ClientBlobStoreTest.java    |   179 +
 .../LocalizedResourceRetentionSetTest.java      |    85 +
 .../localizer/LocalizedResourceSetTest.java     |    74 +
 .../backtype/storm/localizer/LocalizerTest.java |   671 +
 .../jvm/backtype/storm/localizer/localtest.zip  |   Bin 0 -> 6378 bytes
 .../storm/localizer/localtestwithsymlink.jar    |   Bin 0 -> 6591 bytes
 .../storm/localizer/localtestwithsymlink.tar    |   Bin 0 -> 24576 bytes
 .../storm/localizer/localtestwithsymlink.tar.gz |   Bin 0 -> 6106 bytes
 .../storm/localizer/localtestwithsymlink.tgz    |   Bin 0 -> 6106 bytes
 96 files changed, 39479 insertions(+), 9515 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index 44d54d5..f943778 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -263,6 +263,32 @@ def upload_credentials(*args):
         jvmtype="-client",
         extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
 
+def blobstore(*args):
+    """Syntax: [storm blobstore cmd]
+
+    list [KEY...] - lists blobs currently in the blob store
+    cat [-f FILE] KEY - read a blob and then either write it to a file, or 
STDOUT (requires read access).
+    create [-f FILE] [-a ACL ...] [--replication-factor NUMBER] KEY - create a 
new blob. Contents comes from a FILE
+         or STDIN. ACL is in the form [uo]:[username]:[r-][w-][a-] can be 
comma separated list.
+    update [-f FILE] KEY - update the contents of a blob.  Contents comes from
+         a FILE or STDIN (requires write access).
+    delete KEY - delete an entry from the blob store (requires write access).
+    set-acl [-s ACL] KEY - ACL is in the form [uo]:[username]:[r-][w-][a-] can 
be comma
+         separated list (requires admin access).
+    replication --read KEY - Used to read the replication factor of the blob.
+    replication --update --replication-factor NUMBER KEY where NUMBER > 0. It 
is used to update the
+        replication factor of a blob.
+    For example, the following would create a mytopo:data.tgz key using the 
data
+    stored in data.tgz.  User alice would have full access, bob would have
+    read/write access and everyone else would have read access.
+    storm blobstore create mytopo:data.tgz -f data.tgz -a 
u:alice:rwa,u:bob:rw,o::r
+    """
+    exec_storm_class(
+        "backtype.storm.command.blobstore",
+        args=args,
+        jvmtype="-client",
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
 def heartbeats(*args):
     """Syntax: [storm heartbeats [cmd]]
 
@@ -658,7 +684,7 @@ COMMANDS = {"jar": jar, "kill": kill, "shell": shell, 
"nimbus": nimbus, "ui": ui
             "remoteconfvalue": print_remoteconfvalue, "repl": repl, 
"classpath": print_classpath,
             "activate": activate, "deactivate": deactivate, "rebalance": 
rebalance, "help": print_usage,
             "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": 
version, "monitor": monitor,
-            "upload-credentials": upload_credentials, "pacemaker": pacemaker, 
"heartbeats": heartbeats,
+            "upload-credentials": upload_credentials, "pacemaker": pacemaker, 
"heartbeats": heartbeats, "blobstore": blobstore,
             "get-errors": get_errors, "set_log_level": set_log_level, 
"kill_workers": kill_workers,
             "node-health-check": healthcheck}
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 295ac7c..aeda11c 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -66,7 +66,7 @@ nimbus.supervisor.timeout.secs: 60
 nimbus.monitor.freq.secs: 10
 nimbus.cleanup.inbox.freq.secs: 600
 nimbus.inbox.jar.expiration.secs: 3600
-nimbus.code.sync.freq.secs: 300
+nimbus.code.sync.freq.secs: 120
 nimbus.task.launch.secs: 120
 nimbus.file.copy.expiration.secs: 600
 nimbus.topology.validator: "backtype.storm.nimbus.DefaultTopologyValidator"
@@ -117,6 +117,20 @@ transactional.zookeeper.root: "/transactional"
 transactional.zookeeper.servers: null
 transactional.zookeeper.port: null
 
+## blobstore configs
+supervisor.blobstore.class: "backtype.storm.blobstore.NimbusBlobStore"
+supervisor.blobstore.download.thread.count: 5
+supervisor.blobstore.download.max_retries: 3
+supervisor.localizer.cache.target.size.mb: 10240
+supervisor.localizer.cleanup.interval.ms: 600000
+
+nimbus.blobstore.class: "backtype.storm.blobstore.LocalFsBlobStore"
+nimbus.blobstore.expiration.secs: 600
+
+storm.blobstore.inputstream.buffer.size.bytes: 65536
+client.blobstore.class: "backtype.storm.blobstore.NimbusBlobStore"
+storm.blobstore.replication.factor: 3
+
 ### supervisor.* configs are for node supervisors
 # Define the amount of workers that can be run on this machine. Each worker is 
assigned a port to use for communication
 supervisor.slots.ports:

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 1765be9..7fad1a3 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -93,6 +93,17 @@
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-minicluster</artifactId>
             <version>${hadoop.version}</version>
             <exclusions>
@@ -103,6 +114,65 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+            <version>2.4.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mockito</groupId>
+                    <artifactId>mockito-all</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty-util</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-minikdc</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.directory.server</groupId>
+                    <artifactId>apacheds-kerberos-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
new file mode 100644
index 0000000..144ad71
--- /dev/null
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.BlobStore;
+import backtype.storm.blobstore.BlobStoreAclHandler;
+import backtype.storm.blobstore.BlobStoreFile;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Iterator;
+import java.util.Map;
+
+import static backtype.storm.blobstore.BlobStoreAclHandler.ADMIN;
+import static backtype.storm.blobstore.BlobStoreAclHandler.READ;
+import static backtype.storm.blobstore.BlobStoreAclHandler.WRITE;
+
+/**
+ * Provides a HDFS file system backed blob store implementation.
+ * Note that this provides an api for having HDFS be the backing store for the 
blobstore,
+ * it is not a service/daemon.
+ */
+public class HdfsBlobStore extends BlobStore {
+    public static final Logger LOG = 
LoggerFactory.getLogger(HdfsBlobStore.class);
+    private static final String DATA_PREFIX = "data_";
+    private static final String META_PREFIX = "meta_";
+    private BlobStoreAclHandler _aclHandler;
+    private HdfsBlobStoreImpl _hbs;
+    private Subject _localSubject;
+    private Map conf;
+
+    /**
+     * Get the subject from Hadoop so we can use it to validate the acls. 
There is no direct
+     * interface from UserGroupInformation to get the subject, so do a doAs 
and get the context.
+     * We could probably run everything in the doAs but for now just grab the 
subject.
+     */
+    private Subject getHadoopUser() {
+        Subject subj;
+        try {
+            subj = UserGroupInformation.getCurrentUser().doAs(
+                    new PrivilegedAction<Subject>() {
+                        @Override
+                        public Subject run() {
+                            return 
Subject.getSubject(AccessController.getContext());
+                        }
+                    });
+        } catch (IOException e) {
+            throw new RuntimeException("Error creating subject and logging 
user in!", e);
+        }
+        return subj;
+    }
+
+    /**
+     * If who is null then we want to use the user hadoop says we are.
+     * Required for the supervisor to call these routines as its not
+     * logged in as anyone.
+     */
+    private Subject checkAndGetSubject(Subject who) {
+        if (who == null) {
+            return _localSubject;
+        }
+        return who;
+    }
+
+    @Override
+    public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) {
+        this.conf = conf;
+        prepareInternal(conf, overrideBase, null);
+    }
+
+    /**
+     * Allow a Hadoop Configuration to be passed for testing. If it's null 
then the hadoop configs
+     * must be in your classpath.
+     */
+    protected void prepareInternal(Map conf, String overrideBase, 
Configuration hadoopConf) {
+        this.conf = conf;
+        if (overrideBase == null) {
+            overrideBase = (String)conf.get(Config.BLOBSTORE_DIR);
+        }
+        if (overrideBase == null) {
+            throw new RuntimeException("You must specify a blobstore directory 
for HDFS to use!");
+        }
+        LOG.debug("directory is: {}", overrideBase);
+        try {
+            // if a HDFS keytab/principal have been supplied login, otherwise 
assume they are
+            // logged in already or running insecure HDFS.
+            String principal = (String) 
conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL);
+            String keyTab = (String) conf.get(Config.BLOBSTORE_HDFS_KEYTAB);
+
+            if (principal != null && keyTab != null) {
+                UserGroupInformation.loginUserFromKeytab(principal, keyTab);
+            } else {
+                if (principal == null && keyTab != null) {
+                    throw new RuntimeException("You must specify an HDFS 
principal to go with the keytab!");
+
+                } else {
+                    if (principal != null && keyTab == null) {
+                        throw new RuntimeException("You must specify HDFS 
keytab go with the principal!");
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error logging in from keytab!", e);
+        }
+        Path baseDir = new Path(overrideBase, BASE_BLOBS_DIR_NAME);
+        try {
+            if (hadoopConf != null) {
+                _hbs = new HdfsBlobStoreImpl(baseDir, conf, hadoopConf);
+            } else {
+                _hbs = new HdfsBlobStoreImpl(baseDir, conf);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        _localSubject = getHadoopUser();
+        _aclHandler = new BlobStoreAclHandler(conf);
+    }
+
+    @Override
+    public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, 
Subject who)
+            throws AuthorizationException, KeyAlreadyExistsException {
+        if (meta.get_replication_factor() <= 0) {
+            
meta.set_replication_factor((int)conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR));
+        }
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        _aclHandler.normalizeSettableBlobMeta(key, meta, who, READ | WRITE | 
ADMIN);
+        BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+        _aclHandler.hasPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, 
key);
+        if (_hbs.exists(DATA_PREFIX+key)) {
+            throw new KeyAlreadyExistsException(key);
+        }
+        BlobStoreFileOutputStream mOut = null;
+        try {
+            BlobStoreFile metaFile = _hbs.write(META_PREFIX + key, true);
+            metaFile.setMetadata(meta);
+            mOut = new BlobStoreFileOutputStream(metaFile);
+            mOut.write(Utils.thriftSerialize(meta));
+            mOut.close();
+            mOut = null;
+            BlobStoreFile dataFile = _hbs.write(DATA_PREFIX + key, true);
+            dataFile.setMetadata(meta);
+            return new BlobStoreFileOutputStream(dataFile);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (mOut != null) {
+                try {
+                    mOut.cancel();
+                } catch (IOException e) {
+                    //Ignored
+                }
+            }
+        }
+    }
+
+    @Override
+    public AtomicOutputStream updateBlob(String key, Subject who)
+            throws AuthorizationException, KeyNotFoundException {
+        who = checkAndGetSubject(who);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        validateKey(key);
+        _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
+        try {
+            BlobStoreFile dataFile = _hbs.write(DATA_PREFIX + key, false);
+            dataFile.setMetadata(meta);
+            return new BlobStoreFileOutputStream(dataFile);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private SettableBlobMeta getStoredBlobMeta(String key) throws 
KeyNotFoundException {
+        InputStream in = null;
+        try {
+            BlobStoreFile pf = _hbs.read(META_PREFIX + key);
+            try {
+                in = pf.getInputStream();
+            } catch (FileNotFoundException fnf) {
+                throw new KeyNotFoundException(key);
+            }
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            byte[] buffer = new byte[2048];
+            int len;
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+            in.close();
+            in = null;
+            return Utils.thriftDeserialize(SettableBlobMeta.class, 
out.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (in != null) {
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    //Ignored
+                }
+            }
+        }
+    }
+
+    @Override
+    public ReadableBlobMeta getBlobMeta(String key, Subject who)
+            throws AuthorizationException, KeyNotFoundException {
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        _aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key);
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        rbm.set_settable(meta);
+        try {
+            BlobStoreFile pf = _hbs.read(DATA_PREFIX + key);
+            rbm.set_version(pf.getModTime());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return rbm;
+    }
+
+    @Override
+    public void setBlobMeta(String key, SettableBlobMeta meta, Subject who)
+            throws AuthorizationException, KeyNotFoundException {
+        if (meta.get_replication_factor() <= 0) {
+            
meta.set_replication_factor((int)conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR));
+        }
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        _aclHandler.normalizeSettableBlobMeta(key,  meta, who, ADMIN);
+        BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+        SettableBlobMeta orig = getStoredBlobMeta(key);
+        _aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key);
+        BlobStoreFileOutputStream mOut = null;
+        writeMetadata(key, meta);
+    }
+
+    @Override
+    public void deleteBlob(String key, Subject who)
+            throws AuthorizationException, KeyNotFoundException {
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
+        try {
+            _hbs.deleteKey(DATA_PREFIX + key);
+            _hbs.deleteKey(META_PREFIX + key);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public InputStreamWithMeta getBlob(String key, Subject who)
+            throws AuthorizationException, KeyNotFoundException {
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        _aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
+        try {
+            return new BlobStoreFileInputStream(_hbs.read(DATA_PREFIX + key));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Iterator<String> listKeys() {
+        try {
+            return new KeyTranslationIterator(_hbs.listKeys(), DATA_PREFIX);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        //Empty
+    }
+
+    @Override
+    public int getBlobReplication(String key, Subject who) throws 
AuthorizationException, KeyNotFoundException {
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        _aclHandler.hasAnyPermissions(meta.get_acl(), READ | WRITE | ADMIN, 
who, key);
+        try {
+            return _hbs.getBlobReplication(DATA_PREFIX + key);
+        } catch (IOException exp) {
+            throw new RuntimeException(exp);
+        }
+    }
+
+    @Override
+    public int updateBlobReplication(String key, int replication, Subject who) 
throws AuthorizationException, KeyNotFoundException {
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        meta.set_replication_factor(replication);
+        _aclHandler.hasAnyPermissions(meta.get_acl(), WRITE | ADMIN, who, key);
+        try {
+            writeMetadata(key, meta);
+            return _hbs.updateBlobReplication(DATA_PREFIX + key, replication);
+        } catch (IOException exp) {
+            throw new RuntimeException(exp);
+        }
+    }
+
+    public void writeMetadata(String key, SettableBlobMeta meta)
+            throws AuthorizationException, KeyNotFoundException {
+        BlobStoreFileOutputStream mOut = null;
+        try {
+            BlobStoreFile hdfsFile = _hbs.write(META_PREFIX + key, false);
+            hdfsFile.setMetadata(meta);
+            mOut = new BlobStoreFileOutputStream(hdfsFile);
+            mOut.write(Utils.thriftSerialize(meta));
+            mOut.close();
+            mOut = null;
+        } catch (IOException exp) {
+            throw new RuntimeException(exp);
+        } finally {
+            if (mOut != null) {
+                try {
+                    mOut.cancel();
+                } catch (IOException e) {
+                    //Ignored
+                }
+            }
+        }
+    }
+
+    public void fullCleanup(long age) throws IOException {
+        _hbs.fullCleanup(age);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
new file mode 100644
index 0000000..93b56c1
--- /dev/null
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.blobstore.BlobStoreFile;
+import backtype.storm.generated.SettableBlobMeta;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.regex.Matcher;
+
+public class HdfsBlobStoreFile extends BlobStoreFile {
+    public static final Logger LOG = 
LoggerFactory.getLogger(HdfsBlobStoreFile.class);
+
+    private final String _key;
+    private final boolean _isTmp;
+    private final Path _path;
+    private Long _modTime = null;
+    private final boolean _mustBeNew;
+    private final Configuration _hadoopConf;
+    private final FileSystem _fs;
+    private SettableBlobMeta meta;
+
+    // files are world-wide readable and owner writable
+    final public static FsPermission BLOBSTORE_FILE_PERMISSION =
+            FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+    public HdfsBlobStoreFile(Path base, String name, Configuration hconf) {
+        if (BLOBSTORE_DATA_FILE.equals(name)) {
+            _isTmp = false;
+        } else {
+            Matcher m = TMP_NAME_PATTERN.matcher(name);
+            if (!m.matches()) {
+                throw new IllegalArgumentException("File name does not match 
'"+name+"' !~ "+TMP_NAME_PATTERN);
+            }
+            _isTmp = true;
+        }
+        _hadoopConf = hconf;
+        _key = base.getName();
+        _path = new Path(base, name);
+        _mustBeNew = false;
+        try {
+            _fs = _path.getFileSystem(_hadoopConf);
+        } catch (IOException e) {
+            throw new RuntimeException("Error getting filesystem for path: " + 
_path, e);
+        }
+    }
+
+    public HdfsBlobStoreFile(Path base, boolean isTmp, boolean mustBeNew, 
Configuration hconf) {
+        _key = base.getName();
+        _hadoopConf = hconf;
+        _isTmp = isTmp;
+        _mustBeNew = mustBeNew;
+        if (_isTmp) {
+            _path = new Path(base, System.currentTimeMillis()+TMP_EXT);
+        } else {
+            _path = new Path(base, BLOBSTORE_DATA_FILE);
+        }
+        try {
+            _fs = _path.getFileSystem(_hadoopConf);
+        } catch (IOException e) {
+            throw new RuntimeException("Error getting filesystem for path: " + 
_path, e);
+        }
+    }
+
+    @Override
+    public void delete() throws IOException {
+        _fs.delete(_path, true);
+    }
+
+    @Override
+    public boolean isTmp() {
+        return _isTmp;
+    }
+
+    @Override
+    public String getKey() {
+        return _key;
+    }
+
+    @Override
+    public long getModTime() throws IOException {
+        if (_modTime == null) {
+            FileSystem fs = _path.getFileSystem(_hadoopConf);
+            _modTime = fs.getFileStatus(_path).getModificationTime();
+        }
+        return _modTime;
+    }
+
+    private void checkIsNotTmp() {
+        if (!isTmp()) {
+            throw new IllegalStateException("Can only operate on a temporary 
blobstore file.");
+        }
+    }
+
+    private void checkIsTmp() {
+        if (isTmp()) {
+            throw new IllegalStateException("Cannot operate on a temporary 
blobstore file.");
+        }
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+        checkIsTmp();
+        return _fs.open(_path);
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        checkIsNotTmp();
+        OutputStream out = null;
+        FsPermission fileperms = new FsPermission(BLOBSTORE_FILE_PERMISSION);
+        try {
+            out = _fs.create(_path, 
(short)this.getMetadata().get_replication_factor());
+            _fs.setPermission(_path, fileperms);
+            _fs.setReplication(_path, 
(short)this.getMetadata().get_replication_factor());
+        } catch (IOException e) {
+            //Try to create the parent directory, may not work
+            FsPermission dirperms = new 
FsPermission(HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION);
+            if (!_fs.mkdirs(_path.getParent(), dirperms)) {
+                LOG.warn("error creating parent dir: " + _path.getParent());
+            }
+            out = _fs.create(_path, 
(short)this.getMetadata().get_replication_factor());
+            _fs.setPermission(_path, dirperms);
+            _fs.setReplication(_path, 
(short)this.getMetadata().get_replication_factor());
+        }
+        if (out == null) {
+            throw new IOException("Error in creating: " + _path);
+        }
+        return out;
+    }
+
+    @Override
+    public void commit() throws IOException {
+        checkIsNotTmp();
+        // FileContext supports atomic rename, whereas FileSystem doesn't
+        FileContext fc = FileContext.getFileContext(_hadoopConf);
+        Path dest = new Path(_path.getParent(), BLOBSTORE_DATA_FILE);
+        if (_mustBeNew) {
+            fc.rename(_path, dest);
+        } else {
+            fc.rename(_path, dest, Options.Rename.OVERWRITE);
+        }
+        // Note, we could add support for setting the replication factor
+    }
+
+    @Override
+    public void cancel() throws IOException {
+        checkIsNotTmp();
+        delete();
+    }
+
+    @Override
+    public String toString() {
+        return _path+":"+(_isTmp ? "tmp": 
BlobStoreFile.BLOBSTORE_DATA_FILE)+":"+_key;
+    }
+
+    @Override
+    public long getFileLength() throws IOException {
+        return _fs.getFileStatus(_path).getLen();
+    }
+
+    @Override
+    public SettableBlobMeta getMetadata() {
+        return meta;
+    }
+
+    @Override
+    public void setMetadata(SettableBlobMeta meta) {
+        this.meta = meta;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
new file mode 100644
index 0000000..e434752
--- /dev/null
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.blobstore.BlobStoreFile;
+import backtype.storm.utils.Utils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * HDFS blob store impl.
+ */
+public class HdfsBlobStoreImpl {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HdfsBlobStoreImpl.class);
+
+    private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000l;
+    private static final int BUCKETS = 1024;
+    private static final Timer timer = new Timer("HdfsBlobStore cleanup 
thread", true);
+    private static final String BLOBSTORE_DATA = "data";
+
+    public class KeyInHashDirIterator implements Iterator<String> {
+        private int currentBucket = 0;
+        private Iterator<String> it = null;
+        private String next = null;
+
+        public KeyInHashDirIterator() throws IOException {
+            primeNext();
+        }
+
+        private void primeNext() throws IOException {
+            while (it == null && currentBucket < BUCKETS) {
+                String name = String.valueOf(currentBucket);
+                Path dir = new Path(_fullPath, name);
+                try {
+                    it = listKeys(dir);
+                } catch (FileNotFoundException e) {
+                    it = null;
+                }
+                if (it == null || !it.hasNext()) {
+                    it = null;
+                    currentBucket++;
+                } else {
+                    next = it.next();
+                }
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return next != null;
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            String current = next;
+            next = null;
+            if (it != null) {
+                if (!it.hasNext()) {
+                    it = null;
+                    currentBucket++;
+                    try {
+                        primeNext();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                } else {
+                    next = it.next();
+                }
+            }
+            return current;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Delete Not Supported");
+        }
+    }
+
+
+    private Path _fullPath;
+    private FileSystem _fs;
+    private TimerTask _cleanup = null;
+    private Configuration _hadoopConf;
+
+    // blobstore directory is private!
+    final public static FsPermission BLOBSTORE_DIR_PERMISSION =
+            FsPermission.createImmutable((short) 0700); // rwx--------
+
+    public HdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws 
IOException {
+        this(path, conf, new Configuration());
+    }
+
+    public HdfsBlobStoreImpl(Path path, Map<String, Object> conf,
+                             Configuration hconf) throws IOException {
+        LOG.info("Blob store based in {}", path);
+        _fullPath = path;
+        _hadoopConf = hconf;
+        _fs = path.getFileSystem(_hadoopConf);
+
+        if (!_fs.exists(_fullPath)) {
+            FsPermission perms = new FsPermission(BLOBSTORE_DIR_PERMISSION);
+            boolean success = _fs.mkdirs(_fullPath, perms);
+            if (!success) {
+                throw new IOException("Error creating blobstore directory: " + 
_fullPath);
+            }
+        }
+
+        Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE);
+        if (Utils.getBoolean(shouldCleanup, false)) {
+            LOG.debug("Starting hdfs blobstore cleaner");
+            _cleanup = new TimerTask() {
+                @Override
+                public void run() {
+                    try {
+                        fullCleanup(FULL_CLEANUP_FREQ);
+                    } catch (IOException e) {
+                        LOG.error("Error trying to cleanup", e);
+                    }
+                }
+            };
+            timer.scheduleAtFixedRate(_cleanup, 0, FULL_CLEANUP_FREQ);
+        }
+    }
+
+    /**
+     * @return all keys that are available for reading.
+     * @throws IOException on any error.
+     */
+    public Iterator<String> listKeys() throws IOException {
+        return new KeyInHashDirIterator();
+    }
+
+    /**
+     * Get an input stream for reading a part.
+     *
+     * @param key the key of the part to read.
+     * @return the where to read the data from.
+     * @throws IOException on any error
+     */
+    public BlobStoreFile read(String key) throws IOException {
+        return new HdfsBlobStoreFile(getKeyDir(key), BLOBSTORE_DATA, 
_hadoopConf);
+    }
+
+    /**
+     * Get an object tied to writing the data.
+     *
+     * @param key the key of the part to write to.
+     * @param create whether the file needs to be new or not.
+     * @return an object that can be used to both write to, but also 
commit/cancel the operation.
+     * @throws IOException on any error
+     */
+    public BlobStoreFile write(String key, boolean create) throws IOException {
+        return new HdfsBlobStoreFile(getKeyDir(key), true, create, 
_hadoopConf);
+    }
+
+    /**
+     * Check if the key exists in the blob store.
+     *
+     * @param key the key to check for
+     * @return true if it exists else false.
+     */
+    public boolean exists(String key) {
+        Path dir = getKeyDir(key);
+        boolean res = false;
+        try {
+            _fs = dir.getFileSystem(_hadoopConf);
+            res = _fs.exists(dir);
+        } catch (IOException e) {
+            LOG.warn("Exception checking for exists on: " + key);
+        }
+        return res;
+    }
+
+    /**
+     * Delete a key from the blob store
+     *
+     * @param key the key to delete
+     * @throws IOException on any error
+     */
+    public void deleteKey(String key) throws IOException {
+        Path keyDir = getKeyDir(key);
+        HdfsBlobStoreFile pf = new HdfsBlobStoreFile(keyDir, BLOBSTORE_DATA,
+                _hadoopConf);
+        pf.delete();
+        delete(keyDir);
+    }
+
+    protected Path getKeyDir(String key) {
+        String hash = String.valueOf(Math.abs((long) key.hashCode()) % 
BUCKETS);
+        Path hashDir = new Path(_fullPath, hash);
+
+        Path ret = new Path(hashDir, key);
+        LOG.debug("{} Looking for {} in {}", new Object[]{_fullPath, key, 
hash});
+        return ret;
+    }
+
+    public void fullCleanup(long age) throws IOException {
+        long cleanUpIfBefore = System.currentTimeMillis() - age;
+        Iterator<String> keys = new KeyInHashDirIterator();
+        while (keys.hasNext()) {
+            String key = keys.next();
+            Path keyDir = getKeyDir(key);
+            Iterator<BlobStoreFile> i = listBlobStoreFiles(keyDir);
+            if (!i.hasNext()) {
+                //The dir is empty, so try to delete it, may fail, but that is 
OK
+                try {
+                    _fs.delete(keyDir, true);
+                } catch (Exception e) {
+                    LOG.warn("Could not delete " + keyDir + " will try again 
later");
+                }
+            }
+            while (i.hasNext()) {
+                BlobStoreFile f = i.next();
+                if (f.isTmp()) {
+                    if (f.getModTime() <= cleanUpIfBefore) {
+                        f.delete();
+                    }
+                }
+            }
+        }
+    }
+
+    protected Iterator<BlobStoreFile> listBlobStoreFiles(Path path) throws 
IOException {
+        ArrayList<BlobStoreFile> ret = new ArrayList<BlobStoreFile>();
+        FileStatus[] files = _fs.listStatus(new Path[]{path});
+        if (files != null) {
+            for (FileStatus sub : files) {
+                try {
+                    ret.add(new HdfsBlobStoreFile(sub.getPath().getParent(), 
sub.getPath().getName(),
+                            _hadoopConf));
+                } catch (IllegalArgumentException e) {
+                    //Ignored the file did not match
+                    LOG.warn("Found an unexpected file in {} {}", path, 
sub.getPath().getName());
+                }
+            }
+        }
+        return ret.iterator();
+    }
+
+    protected Iterator<String> listKeys(Path path) throws IOException {
+        ArrayList<String> ret = new ArrayList<String>();
+        FileStatus[] files = _fs.listStatus(new Path[]{path});
+        if (files != null) {
+            for (FileStatus sub : files) {
+                try {
+                    ret.add(sub.getPath().getName().toString());
+                } catch (IllegalArgumentException e) {
+                    //Ignored the file did not match
+                    LOG.debug("Found an unexpected file in {} {}", path, 
sub.getPath().getName());
+                }
+            }
+        }
+        return ret.iterator();
+    }
+
+    protected int getBlobReplication(String key) throws IOException {
+        Path path = getKeyDir(key);
+        Path dest = new Path(path, BLOBSTORE_DATA);
+        return _fs.getFileStatus(dest).getReplication();
+    }
+
+    protected int updateBlobReplication(String key, int replication) throws 
IOException {
+        Path path = getKeyDir(key);
+        Path dest = new Path(path, BLOBSTORE_DATA);
+        _fs.setReplication(dest, (short) replication);
+        return _fs.getFileStatus(dest).getReplication();
+    }
+
+    protected void delete(Path path) throws IOException {
+        _fs.delete(path, true);
+    }
+
+    public void shutdown() {
+        if (_cleanup != null) {
+            _cleanup.cancel();
+            _cleanup = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
new file mode 100644
index 0000000..ec17dae
--- /dev/null
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.ClientBlobStore;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.utils.NimbusClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ *  Client to access the HDFS blobStore. At this point, this is meant to only 
be used by the
+ *  supervisor.  Don't trust who the client says they are so pass null for all 
Subjects.
+ */
+public class HdfsClientBlobStore extends ClientBlobStore {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HdfsClientBlobStore.class);
+    private HdfsBlobStore _blobStore;
+    private Map _conf;
+
+    @Override
+    public void prepare(Map conf) {
+        this._conf = conf;
+        _blobStore = new HdfsBlobStore();
+        _blobStore.prepare(conf, null, null);
+    }
+
+    @Override
+    public AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta 
meta)
+            throws AuthorizationException, KeyAlreadyExistsException {
+        return _blobStore.createBlob(key, meta, null);
+    }
+
+    @Override
+    public AtomicOutputStream updateBlob(String key)
+            throws AuthorizationException, KeyNotFoundException {
+        return _blobStore.updateBlob(key, null);
+    }
+
+    @Override
+    public ReadableBlobMeta getBlobMeta(String key)
+            throws AuthorizationException, KeyNotFoundException {
+        return _blobStore.getBlobMeta(key, null);
+    }
+
+    @Override
+    public void setBlobMetaToExtend(String key, SettableBlobMeta meta)
+            throws AuthorizationException, KeyNotFoundException {
+        _blobStore.setBlobMeta(key, meta, null);
+    }
+
+    @Override
+    public void deleteBlob(String key) throws AuthorizationException, 
KeyNotFoundException {
+        _blobStore.deleteBlob(key, null);
+    }
+
+    @Override
+    public InputStreamWithMeta getBlob(String key)
+            throws AuthorizationException, KeyNotFoundException {
+        return _blobStore.getBlob(key, null);
+    }
+
+    @Override
+    public Iterator<String> listKeys() {
+        return _blobStore.listKeys();
+    }
+
+    @Override
+    public int getBlobReplication(String key) throws AuthorizationException, 
KeyNotFoundException {
+        return _blobStore.getBlobReplication(key, null);
+    }
+
+    @Override
+    public int updateBlobReplication(String key, int replication) throws 
AuthorizationException, KeyNotFoundException {
+        return _blobStore.updateBlobReplication(key, replication, null);
+    }
+
+    @Override
+    public boolean setClient(Map conf, NimbusClient client) {
+        return true;
+    }
+
+    @Override
+    public void createStateInZookeeper(String key) {
+        // Do nothing
+    }
+
+    @Override
+    public void shutdown() {
+        // do nothing
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java
deleted file mode 100644
index 1e38051..0000000
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.ha.codedistributor;
-
-import backtype.storm.codedistributor.ICodeDistributor;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.Validate;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.List;
-import java.util.Map;
-
-public class HDFSCodeDistributor implements ICodeDistributor {
-    private static final Logger LOG = 
LoggerFactory.getLogger(HDFSCodeDistributor.class);
-
-    private final static String HDFS_STORM_DIR = "hdfs.storm.dir";
-
-    private FileSystem fs;
-    private Path stormDir;
-
-    @Override
-    public void prepare(Map conf) throws Exception {
-        Validate.notNull(conf.get(HDFS_STORM_DIR), "you must specify " + 
HDFS_STORM_DIR);
-
-        Configuration configuration = new Configuration();
-        HdfsSecurityUtil.login(conf, configuration);
-        this.fs = FileSystem.get(configuration);
-        this.stormDir = new Path(String.valueOf(conf.get(HDFS_STORM_DIR)));
-        if(!this.fs.exists(stormDir)) {
-            this.fs.mkdirs(this.stormDir);
-        }
-    }
-
-    @Override
-    public File upload(String dirPath, String topologyId) throws Exception {
-        File localStormDir = new File(dirPath);
-        LOG.info("Copying the storm code from directory: {} to {}{}{}", 
localStormDir.getAbsolutePath(),
-                stormDir.toString(), Path.SEPARATOR , topologyId);
-
-        File[] files = localStormDir.listFiles();
-
-        Path hdfsDestPath = new Path(stormDir, new Path(topologyId));
-        fs.mkdirs(hdfsDestPath);
-
-        for(File file : files) {
-            fs.copyFromLocalFile(new Path(file.getAbsolutePath()), 
hdfsDestPath);
-        }
-
-        File file = new File(dirPath, "storm-code-distributor.meta");
-
-        RemoteIterator<LocatedFileStatus> hdfsFileIterator = 
fs.listFiles(hdfsDestPath, false);
-
-        BufferedWriter writer = new BufferedWriter(new FileWriter(file));
-        while(hdfsFileIterator.hasNext()) {
-            writer.write(hdfsFileIterator.next().getPath().toString());
-            writer.newLine();
-        }
-        writer.close();
-
-        return file;
-    }
-
-    @Override
-    public List<File> download(String topologyId, File metaFile) throws 
Exception {
-        File destDir = metaFile.getParentFile();
-
-        List<String> hdfsPaths = IOUtils.readLines(new 
FileInputStream(metaFile));
-        for(String hdfsFilePath : hdfsPaths) {
-            fs.copyToLocalFile(new Path(hdfsFilePath), new 
Path(destDir.getAbsolutePath()));
-        }
-
-        return Lists.newArrayList(destDir.listFiles());
-    }
-
-    @Override
-    public short getReplicationCount(String topologyId) throws IOException {
-        Path hdfsDestPath = new Path(stormDir, new Path(topologyId));
-        if(fs.exists(hdfsDestPath)) {
-            FileStatus fileStatus = fs.getFileStatus(hdfsDestPath);
-            return fileStatus.getReplication();
-        } else {
-            LOG.warn("getReplicationCount called for {} but no such directory 
exists, returning 0", topologyId);
-            return 0;
-        }
-    }
-
-    @Override
-    public void cleanup(String topologyId) throws IOException {
-        Path hdfsDestPath = new Path(stormDir, new Path(topologyId));
-        fs.delete(hdfsDestPath, true);
-    }
-
-    @Override
-    public void close(Map conf) {
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
new file mode 100644
index 0000000..a8d6172
--- /dev/null
+++ 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
@@ -0,0 +1,530 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.BlobStore;
+import backtype.storm.blobstore.BlobStoreAclHandler;
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.generated.AccessControlType;
+
+import backtype.storm.security.auth.NimbusPrincipal;
+import backtype.storm.security.auth.SingleUserPrincipal;
+import backtype.storm.utils.Utils;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+
+public class BlobStoreTest {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreTest.class);
+  protected static MiniDFSCluster dfscluster = null;
+  protected static Configuration hadoopConf = null;
+  URI base;
+  File baseFile;
+  private static Map conf = new HashMap();
+  public static final int READ = 0x01;
+  public static final int WRITE = 0x02;
+  public static final int ADMIN = 0x04;
+
+  @Before
+  public void init() {
+    initializeConfigs();
+    baseFile = new File("/tmp/blob-store-test-"+UUID.randomUUID());
+    base = baseFile.toURI();
+  }
+
+  @After
+  public void cleanup()
+          throws IOException {
+    FileUtils.deleteDirectory(baseFile);
+  }
+
+  @AfterClass
+  public static void cleanupAfterClass() throws IOException {
+    if (dfscluster != null) {
+      dfscluster.shutdown();
+    }
+  }
+
+  // Method which initializes nimbus admin
+  public static void initializeConfigs() {
+    conf.put(Config.NIMBUS_ADMINS,"admin");
+    conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor");
+  }
+
+  //Gets Nimbus Subject with NimbusPrincipal set on it
+  public static Subject getNimbusSubject() {
+    Subject nimbus = new Subject();
+    nimbus.getPrincipals().add(new NimbusPrincipal());
+    return nimbus;
+  }
+
+  // Overloading the assertStoreHasExactly method to accommodate Subject in order 
to check for authorization
+  public static void assertStoreHasExactly(BlobStore store, Subject who, 
String ... keys)
+          throws IOException, KeyNotFoundException, AuthorizationException {
+    Set<String> expected = new HashSet<String>(Arrays.asList(keys));
+    Set<String> found = new HashSet<String>();
+    Iterator<String> c = store.listKeys();
+    while (c.hasNext()) {
+      String keyName = c.next();
+      found.add(keyName);
+    }
+    Set<String> extra = new HashSet<String>(found);
+    extra.removeAll(expected);
+    assertTrue("Found extra keys in the blob store "+extra, extra.isEmpty());
+    Set<String> missing = new HashSet<String>(expected);
+    missing.removeAll(found);
+    assertTrue("Found keys missing from the blob store "+missing, 
missing.isEmpty());
+  }
+
+  public static void assertStoreHasExactly(BlobStore store, String ... keys)
+          throws IOException, KeyNotFoundException, AuthorizationException {
+    assertStoreHasExactly(store, null, keys);
+  }
+
+  // Overloading the readInt method to accommodate Subject in order to check for 
authorization (security turned on)
+  public static int readInt(BlobStore store, Subject who, String key) throws 
IOException, KeyNotFoundException, AuthorizationException {
+    InputStream in = store.getBlob(key, who);
+    try {
+      return in.read();
+    } finally {
+      in.close();
+    }
+  }
+
+  public static int readInt(BlobStore store, String key)
+          throws IOException, KeyNotFoundException, AuthorizationException {
+    return readInt(store, null, key);
+  }
+
+  public static void readAssertEquals(BlobStore store, String key, int value)
+          throws IOException, KeyNotFoundException, AuthorizationException {
+    assertEquals(value, readInt(store, key));
+  }
+
+  // Checks for assertion when we turn on security
+  public void readAssertEqualsWithAuth(BlobStore store, Subject who, String 
key, int value)
+          throws IOException, KeyNotFoundException, AuthorizationException {
+    assertEquals(value, readInt(store, who, key));
+  }
+
+  private HdfsBlobStore initHdfs(String dirName)
+          throws Exception {
+    if (hadoopConf == null) {
+      hadoopConf = new Configuration();
+    }
+    try {
+      if (dfscluster == null) {
+        dfscluster = new 
MiniDFSCluster.Builder(hadoopConf).numDataNodes(3).build();
+        dfscluster.waitActive();
+      }
+    } catch (IOException e) {
+      LOG.error("error creating MiniDFSCluster");
+    }
+    Map conf = new HashMap();
+    conf.put(Config.BLOBSTORE_DIR, dirName);
+    
conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"backtype.storm.security.auth.DefaultPrincipalToLocal");
+    conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3);
+    HdfsBlobStore store = new HdfsBlobStore();
+    store.prepareInternal(conf, null, dfscluster.getConfiguration(0));
+    return store;
+  }
+
+  @Test
+  public void testHdfsReplication()
+          throws Exception {
+    BlobStore store = initHdfs("/storm/blobstoreReplication");
+    testReplication("/storm/blobstoreReplication/test", store);
+  }
+
+  @Test
+  public void testBasicHdfs()
+          throws Exception {
+    testBasic(initHdfs("/storm/blobstore1"));
+  }
+
+  @Test
+  public void testMultipleHdfs()
+          throws Exception {
+    // use different blobstore dir so it doesn't conflict with other test
+    testMultiple(initHdfs("/storm/blobstore2"));
+  }
+
+  @Test
+  public void testHdfsWithAuth()
+          throws Exception {
+    // use different blobstore dir so it doesn't conflict with other tests
+    testWithAuthentication(initHdfs("/storm/blobstore3"));
+  }
+
+  // Test for replication.
+  public void testReplication(String path, BlobStore store)
+          throws Exception {
+    SettableBlobMeta metadata = new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+    metadata.set_replication_factor(4);
+    AtomicOutputStream out = store.createBlob("test", metadata, null);
+    out.write(1);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", null), 4);
+    store.deleteBlob("test", null);
+
+    //Test for replication with NIMBUS as user
+    Subject admin = getSubject("admin");
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+    metadata.set_replication_factor(4);
+    out = store.createBlob("test", metadata, admin);
+    out.write(1);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", admin), 4);
+    store.updateBlobReplication("test", 5, admin);
+    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", admin), 5);
+    store.deleteBlob("test", admin);
+
+    //Test for replication using SUPERVISOR access
+    Subject supervisor = getSubject("supervisor");
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+    metadata.set_replication_factor(4);
+    out = store.createBlob("test", metadata, supervisor);
+    out.write(1);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", supervisor), 4);
+    store.updateBlobReplication("test", 5, supervisor);
+    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", supervisor), 5);
+    store.deleteBlob("test", supervisor);
+
+    //Test for a user having read or write or admin access to read replication 
for a blob
+    String createSubject = "createSubject";
+    String writeSubject = "writeSubject";
+    String adminSubject = "adminSubject";
+    Subject who = getSubject(createSubject);
+    AccessControl writeAccess = new AccessControl(AccessControlType.USER, 
READ);
+    AccessControl adminAccess = new AccessControl(AccessControlType.USER, 
ADMIN);
+    writeAccess.set_name(writeSubject);
+    adminAccess.set_name(adminSubject);
+    List<AccessControl> acl = Arrays.asList(writeAccess, adminAccess);
+    metadata = new SettableBlobMeta(acl);
+    metadata.set_replication_factor(4);
+    out = store.createBlob("test", metadata, who);
+    out.write(1);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    who = getSubject(writeSubject);
+    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", who), 4);
+
+    //Test for a user having WRITE or ADMIN privileges to change replication 
of a blob
+    who = getSubject(adminSubject);
+    store.updateBlobReplication("test", 5, who);
+    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", who), 5);
+    store.deleteBlob("test", getSubject(createSubject));
+  }
+
+  public Subject getSubject(String name) {
+    Subject subject = new Subject();
+    SingleUserPrincipal user = new SingleUserPrincipal(name);
+    subject.getPrincipals().add(user);
+    return subject;
+  }
+
+  // Check for Blobstore with authentication
+  public void testWithAuthentication(BlobStore store)
+          throws Exception {
+    //Test for Nimbus Admin
+    Subject admin = getSubject("admin");
+    assertStoreHasExactly(store);
+    SettableBlobMeta metadata = new 
SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+    AtomicOutputStream out = store.createBlob("test", metadata, admin);
+    assertStoreHasExactly(store, "test");
+    out.write(1);
+    out.close();
+    store.deleteBlob("test", admin);
+
+    //Test for Supervisor Admin
+    Subject supervisor = getSubject("supervisor");
+    assertStoreHasExactly(store);
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+    out = store.createBlob("test", metadata, supervisor);
+    assertStoreHasExactly(store, "test");
+    out.write(1);
+    out.close();
+    store.deleteBlob("test", supervisor);
+
+    //Test for Nimbus itself as a user
+    Subject nimbus = getNimbusSubject();
+    assertStoreHasExactly(store);
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+    out = store.createBlob("test", metadata, nimbus);
+    assertStoreHasExactly(store, "test");
+    out.write(1);
+    out.close();
+    store.deleteBlob("test", nimbus);
+
+    // Test with a dummy test_subject for cases where subject !=null (security 
turned on)
+    Subject who = getSubject("test_subject");
+    assertStoreHasExactly(store);
+
+    // Tests for case when subject != null (security turned on) and
+    // acls for the blob are set to WORLD_EVERYTHING
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+    out = store.createBlob("test", metadata, who);
+    out.write(1);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    // Testing whether acls are set to WORLD_EVERYTHING
+    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+    readAssertEqualsWithAuth(store, who, "test", 1);
+
+    LOG.info("Deleting test");
+    store.deleteBlob("test", who);
+    assertStoreHasExactly(store);
+
+    // Tests for case when subject != null (security turned on) and
+    // acls are not set for the blob (DEFAULT)
+    LOG.info("Creating test again");
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+    out = store.createBlob("test", metadata, who);
+    out.write(2);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should 
not contain WORLD_EVERYTHING because
+    // the subject is neither null nor empty. The ACL should however contain 
USER_EVERYTHING as user needs to have
+    // complete access to the blob
+    assertTrue("ACL does not contain WORLD_EVERYTHING", 
!metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+    readAssertEqualsWithAuth(store, who, "test", 2);
+
+    LOG.info("Updating test");
+    out = store.updateBlob("test", who);
+    out.write(3);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    readAssertEqualsWithAuth(store, who, "test", 3);
+
+    LOG.info("Updating test again");
+    out = store.updateBlob("test", who);
+    out.write(4);
+    out.flush();
+    LOG.info("SLEEPING");
+    Thread.sleep(2);
+    assertStoreHasExactly(store, "test");
+    readAssertEqualsWithAuth(store, who, "test", 3);
+
+    //Test for subject with no principals and acls set to WORLD_EVERYTHING
+    who = new Subject();
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+    LOG.info("Creating test");
+    out = store.createBlob("test-empty-subject-WE", metadata, who);
+    out.write(2);
+    out.close();
+    assertStoreHasExactly(store, "test-empty-subject-WE", "test");
+    // Testing whether acls are set to WORLD_EVERYTHING
+    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+    readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
+
+    //Test for subject with no principals and acls set to DEFAULT
+    who = new Subject();
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+    LOG.info("Creating other");
+    out = store.createBlob("test-empty-subject-DEF", metadata, who);
+    out.write(2);
+    out.close();
+    assertStoreHasExactly(store, "test-empty-subject-DEF", "test", 
"test-empty-subject-WE");
+    // Testing whether acls are set to WORLD_EVERYTHING
+    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+    readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
+
+    if (store instanceof HdfsBlobStore) {
+      ((HdfsBlobStore) store).fullCleanup(1);
+    } else {
+      fail("Error the blobstore is of unknowntype");
+    }
+    try {
+      out.close();
+    } catch (IOException e) {
+      //This is likely to happen when we try to commit something that
+      // was cleaned up.  This is expected and acceptable.
+    }
+  }
+
+  public void testBasic(BlobStore store)
+          throws Exception {
+    assertStoreHasExactly(store);
+    LOG.info("Creating test");
+    // Tests for case when subject == null (security turned off) and
+    // acls for the blob are set to WORLD_EVERYTHING
+    SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler
+            .WORLD_EVERYTHING);
+    AtomicOutputStream out = store.createBlob("test", metadata, null);
+    out.write(1);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    // Testing whether acls are set to WORLD_EVERYTHING
+    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+    readAssertEquals(store, "test", 1);
+
+    LOG.info("Deleting test");
+    store.deleteBlob("test", null);
+    assertStoreHasExactly(store);
+
+    // The following tests are run for both hdfs and local store to test the
+    // update blob interface
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+    LOG.info("Creating test again");
+    out = store.createBlob("test", metadata, null);
+    out.write(2);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    readAssertEquals(store, "test", 2);
+    LOG.info("Updating test");
+    out = store.updateBlob("test", null);
+    out.write(3);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    readAssertEquals(store, "test", 3);
+
+    LOG.info("Updating test again");
+    out = store.updateBlob("test", null);
+    out.write(4);
+    out.flush();
+    LOG.info("SLEEPING");
+    Thread.sleep(2);
+
+    if (store instanceof HdfsBlobStore) {
+      ((HdfsBlobStore) store).fullCleanup(1);
+    } else {
+      fail("Error the blobstore is of unknowntype");
+    }
+    try {
+      out.close();
+    } catch (IOException e) {
+      //This is likely to happen when we try to commit something that
+      // was cleaned up.  This is expected and acceptable.
+    }
+  }
+
+
+  public void testMultiple(BlobStore store)
+          throws Exception {
+    assertStoreHasExactly(store);
+    LOG.info("Creating test");
+    AtomicOutputStream out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler
+            .WORLD_EVERYTHING), null);
+    out.write(1);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    readAssertEquals(store, "test", 1);
+
+    LOG.info("Creating other");
+    out = store.createBlob("other", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
+            null);
+    out.write(2);
+    out.close();
+    assertStoreHasExactly(store, "test", "other");
+    readAssertEquals(store, "test", 1);
+    readAssertEquals(store, "other", 2);
+
+    LOG.info("Updating other");
+    out = store.updateBlob("other", null);
+    out.write(5);
+    out.close();
+    assertStoreHasExactly(store, "test", "other");
+    readAssertEquals(store, "test", 1);
+    readAssertEquals(store, "other", 5);
+
+    LOG.info("Deleting test");
+    store.deleteBlob("test", null);
+    assertStoreHasExactly(store, "other");
+    readAssertEquals(store, "other", 5);
+
+    LOG.info("Creating test again");
+    out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
+            null);
+    out.write(2);
+    out.close();
+    assertStoreHasExactly(store, "test", "other");
+    readAssertEquals(store, "test", 2);
+    readAssertEquals(store, "other", 5);
+
+    LOG.info("Updating test");
+    out = store.updateBlob("test", null);
+    out.write(3);
+    out.close();
+    assertStoreHasExactly(store, "test", "other");
+    readAssertEquals(store, "test", 3);
+    readAssertEquals(store, "other", 5);
+
+    LOG.info("Deleting other");
+    store.deleteBlob("other", null);
+    assertStoreHasExactly(store, "test");
+    readAssertEquals(store, "test", 3);
+
+    LOG.info("Updating test again");
+    out = store.updateBlob("test", null);
+    out.write(4);
+    out.flush();
+    LOG.info("SLEEPING");
+    Thread.sleep(2);
+
+    if (store instanceof HdfsBlobStore) {
+      ((HdfsBlobStore) store).fullCleanup(1);
+    } else {
+      fail("Error the blobstore is of unknowntype");
+    }    assertStoreHasExactly(store, "test");
+    readAssertEquals(store, "test", 3);
+    try {
+      out.close();
+    } catch (IOException e) {
+      //This is likely to happen when we try to commit something that
+      // was cleaned up.  This is expected and acceptable.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
new file mode 100644
index 0000000..cf2f7c1
--- /dev/null
+++ 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.blobstore.BlobStoreFile;
+import backtype.storm.generated.SettableBlobMeta;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class HdfsBlobStoreImplTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HdfsBlobStoreImplTest.class);
+
+    protected static Configuration hadoopConf;
+    protected static MiniDFSCluster dfscluster;
+    // key dir needs to be number 0 to number of buckets, choose one so we 
know where to look
+    private static String KEYDIR = "0";
+    private Path blobDir = new Path("/storm/blobstore1");
+    private Path fullKeyDir = new Path(blobDir, KEYDIR);
+    private String BLOBSTORE_DATA = "data";
+
+    public class TestHdfsBlobStoreImpl extends HdfsBlobStoreImpl {
+
+        public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf) 
throws IOException {
+            super(path, conf);
+        }
+
+        public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf,
+                                     Configuration hconf) throws IOException {
+            super(path, conf, hconf);
+        }
+
+        protected Path getKeyDir(String key) {
+            return new Path(new Path(blobDir, KEYDIR), key);
+        }
+    }
+
+    @BeforeClass
+    public static void init() {
+        if (hadoopConf == null) {
+            hadoopConf = new Configuration();
+        }
+        try {
+            if (dfscluster == null) {
+                dfscluster = new MiniDFSCluster.Builder(hadoopConf).build();
+                dfscluster.waitActive();
+            }
+        } catch (IOException e) {
+            LOG.error("error creating MiniDFSCluster");
+        }
+    }
+
+    @AfterClass
+    public static void cleanup() throws IOException {
+        if (dfscluster != null) {
+            dfscluster.shutdown();
+        }
+    }
+
+    // Be careful about adding additional tests as the dfscluster will be 
shared
+
+    @Test
+    public void testMultiple() throws Exception {
+        String testString = "testingblob";
+        String validKey = "validkeyBasic";
+
+        FileSystem fs = dfscluster.getFileSystem();
+        Map conf = new HashMap();
+
+        TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, 
hadoopConf);
+        // should have created blobDir
+        assertTrue("BlobStore dir wasn't created", fs.exists(blobDir));
+        assertEquals("BlobStore dir was created with wrong permissions",
+                HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION, 
fs.getFileStatus(blobDir).getPermission());
+
+        // test exist with non-existent key
+        assertFalse("file exists but shouldn't", hbs.exists("bogus"));
+
+        // test write
+        BlobStoreFile pfile = hbs.write(validKey, false);
+        // Adding metadata to avoid null pointer exception
+        SettableBlobMeta meta = new SettableBlobMeta();
+        meta.set_replication_factor(1);
+        pfile.setMetadata(meta);
+        OutputStream ios = pfile.getOutputStream();
+        ios.write(testString.getBytes(Charset.forName("UTF-8")));
+        ios.close();
+
+        // test commit creates properly
+        assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir));
+        pfile.commit();
+        Path dataFile = new Path(new Path(fullKeyDir, validKey), 
BLOBSTORE_DATA);
+        assertTrue("blob data not committed", fs.exists(dataFile));
+        assertEquals("BlobStore dir was created with wrong permissions",
+                HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, 
fs.getFileStatus(dataFile).getPermission());
+        assertTrue("key doesn't exist but should", hbs.exists(validKey));
+
+        // test read
+        BlobStoreFile readpFile = hbs.read(validKey);
+        String readString = IOUtils.toString(readpFile.getInputStream(), 
"UTF-8");
+        assertEquals("string read from blob doesn't match", testString, 
readString);
+
+        // test listkeys
+        Iterator<String> keys = hbs.listKeys();
+        assertTrue("blob has one key", keys.hasNext());
+        assertEquals("one key in blobstore", validKey, keys.next());
+
+        // delete
+        hbs.deleteKey(validKey);
+        assertFalse("key not deleted", fs.exists(dataFile));
+        assertFalse("key not deleted", hbs.exists(validKey));
+
+        // Now do multiple
+        String testString2 = "testingblob2";
+        String validKey2= "validkey2";
+
+        // test write
+        pfile = hbs.write(validKey, false);
+        pfile.setMetadata(meta);
+        ios = pfile.getOutputStream();
+        ios.write(testString.getBytes(Charset.forName("UTF-8")));
+        ios.close();
+
+        // test commit creates properly
+        assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir));
+        pfile.commit();
+        assertTrue("blob data not committed", fs.exists(dataFile));
+        assertEquals("BlobStore dir was created with wrong permissions",
+                HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, 
fs.getFileStatus(dataFile).getPermission());
+        assertTrue("key doesn't exist but should", hbs.exists(validKey));
+
+        // test write again
+        pfile = hbs.write(validKey2, false);
+        pfile.setMetadata(meta);
+        OutputStream ios2 = pfile.getOutputStream();
+        ios2.write(testString2.getBytes(Charset.forName("UTF-8")));
+        ios2.close();
+
+        // test commit second creates properly
+        pfile.commit();
+        Path dataFile2 = new Path(new Path(fullKeyDir, validKey2), 
BLOBSTORE_DATA);
+        assertTrue("blob data not committed", fs.exists(dataFile2));
+        assertEquals("BlobStore dir was created with wrong permissions",
+                HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, 
fs.getFileStatus(dataFile2).getPermission());
+        assertTrue("key doesn't exist but should", hbs.exists(validKey2));
+
+        // test listkeys
+        keys = hbs.listKeys();
+        int total = 0;
+        boolean key1Found = false;
+        boolean key2Found = false;
+        while(keys.hasNext()) {
+            total++;
+            String key = keys.next();
+            if (key.equals(validKey)) {
+                key1Found = true;
+            } else if (key.equals(validKey2)) {
+                key2Found = true;
+            } else {
+                fail("Found key that wasn't expected: " + key);
+            }
+        }
+        assertEquals("number of keys is wrong", 2, total);
+        assertTrue("blobstore missing key1", key1Found);
+        assertTrue("blobstore missing key2", key2Found);
+
+        // test read
+        readpFile = hbs.read(validKey);
+        readString = IOUtils.toString(readpFile.getInputStream(), "UTF-8");
+        assertEquals("string read from blob doesn't match", testString, 
readString);
+
+        // test read
+        readpFile = hbs.read(validKey2);
+        readString = IOUtils.toString(readpFile.getInputStream(), "UTF-8");
+        assertEquals("string read from blob doesn't match", testString2, 
readString);
+
+        hbs.deleteKey(validKey);
+        assertFalse("key not deleted", hbs.exists(validKey));
+        hbs.deleteKey(validKey2);
+        assertFalse("key not deleted", hbs.exists(validKey2));
+    }
+
+    @Test
+    public void testGetFileLength() throws IOException {
+        FileSystem fs = dfscluster.getFileSystem();
+        Map conf = new HashMap();
+        String validKey = "validkeyBasic";
+        String testString = "testingblob";
+        TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, 
hadoopConf);
+        BlobStoreFile pfile = hbs.write(validKey, false);
+        // Adding metadata to avoid null pointer exception
+        SettableBlobMeta meta = new SettableBlobMeta();
+        meta.set_replication_factor(1);
+        pfile.setMetadata(meta);
+        OutputStream ios = pfile.getOutputStream();
+        ios.write(testString.getBytes(Charset.forName("UTF-8")));
+        ios.close();
+        assertEquals(testString.getBytes(Charset.forName("UTF-8")).length, 
pfile.getFileLength());
+    }
+}
\ No newline at end of file

Reply via email to