http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java 
b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java
new file mode 100644
index 0000000..5e39743
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.security.auth.NimbusPrincipal;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import backtype.storm.utils.ZookeeperAuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class BlobStoreUtils {
+    private static final String BLOBSTORE_SUBTREE="/blobstore";
+    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+    public static CuratorFramework createZKClient(Map conf) {
+        List<String> zkServers = (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
+        ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
+        CuratorFramework zkClient = Utils.newCurator(conf, zkServers, port, 
(String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo);
+        zkClient.start();
+        return zkClient;
+    }
+
+    public static Subject getNimbusSubject() {
+        Subject subject = new Subject();
+        subject.getPrincipals().add(new NimbusPrincipal());
+        return subject;
+    }
+
+    // Normalize state
+    public static BlobKeySequenceInfo 
normalizeNimbusHostPortSequenceNumberInfo(String nimbusSeqNumberInfo) {
+        BlobKeySequenceInfo keySequenceInfo = new BlobKeySequenceInfo();
+        int lastIndex = nimbusSeqNumberInfo.lastIndexOf("-");
+        keySequenceInfo.setNimbusHostPort(nimbusSeqNumberInfo.substring(0, 
lastIndex));
+        
keySequenceInfo.setSequenceNumber(nimbusSeqNumberInfo.substring(lastIndex + 1));
+        return keySequenceInfo;
+    }
+
+    // Check for latest sequence number of a key inside zookeeper and return 
nimbodes containing the latest sequence number
+    public static Set<NimbusInfo> 
getNimbodesWithLatestSequenceNumberOfBlob(CuratorFramework zkClient, String 
key) throws Exception {
+        List<String> stateInfoList = 
zkClient.getChildren().forPath("/blobstore/" + key);
+        Set<NimbusInfo> nimbusInfoSet = new HashSet<NimbusInfo>();
+        int latestSeqNumber = getLatestSequenceNumber(stateInfoList);
+        LOG.debug("getNimbodesWithLatestSequenceNumberOfBlob stateInfo {} 
version {}", stateInfoList, latestSeqNumber);
+        // Get the nimbodes with the latest version
+        for(String state : stateInfoList) {
+            BlobKeySequenceInfo sequenceInfo = 
normalizeNimbusHostPortSequenceNumberInfo(state);
+            if (latestSeqNumber == 
Integer.parseInt(sequenceInfo.getSequenceNumber())) {
+                
nimbusInfoSet.add(NimbusInfo.parse(sequenceInfo.getNimbusHostPort()));
+            }
+        }
+        LOG.debug("nimbusInfoList {}", nimbusInfoSet);
+        return nimbusInfoSet;
+    }
+
+    // Get sequence number details from latest sequence number of the blob
+    public static int getLatestSequenceNumber(List<String> stateInfoList) {
+        int seqNumber = 0;
+        // Get latest sequence number of the blob present in the zookeeper --> 
possible to refactor this piece of code
+        for (String state : stateInfoList) {
+            BlobKeySequenceInfo sequenceInfo = 
normalizeNimbusHostPortSequenceNumberInfo(state);
+            int currentSeqNumber = 
Integer.parseInt(sequenceInfo.getSequenceNumber());
+            if (seqNumber < currentSeqNumber) {
+                seqNumber = currentSeqNumber;
+                LOG.debug("Sequence Info {}", seqNumber);
+            }
+        }
+        LOG.debug("Latest Sequence Number {}", seqNumber);
+        return seqNumber;
+    }
+
+    // Download missing blobs from potential nimbodes
+    public static boolean downloadMissingBlob(Map conf, BlobStore blobStore, 
String key, Set<NimbusInfo> nimbusInfos)
+            throws TTransportException {
+        NimbusClient client;
+        ReadableBlobMeta rbm;
+        ClientBlobStore remoteBlobStore;
+        InputStreamWithMeta in;
+        boolean isSuccess = false;
+        LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
+        for (NimbusInfo nimbusInfo : nimbusInfos) {
+            if(isSuccess) {
+                break;
+            }
+            try {
+                client = new NimbusClient(conf, nimbusInfo.getHost(), 
nimbusInfo.getPort(), null);
+                rbm = client.getClient().getBlobMeta(key);
+                remoteBlobStore = new NimbusBlobStore();
+                remoteBlobStore.setClient(conf, client);
+                in = remoteBlobStore.getBlob(key);
+                blobStore.createBlob(key, in, rbm.get_settable(), 
getNimbusSubject());
+                // if key already exists while creating the blob else update it
+                Iterator<String> keyIterator = blobStore.listKeys();
+                while (keyIterator.hasNext()) {
+                    if (keyIterator.next().equals(key)) {
+                        LOG.debug("Success creating key, {}", key);
+                        isSuccess = true;
+                        break;
+                    }
+                }
+            } catch (IOException | AuthorizationException exception) {
+                throw new RuntimeException(exception);
+            } catch (KeyAlreadyExistsException kae) {
+                LOG.info("KeyAlreadyExistsException Key: {} {}", key, kae);
+            } catch (KeyNotFoundException knf) {
+                // Catching and logging KeyNotFoundException because, if
+                // there is a subsequent update and delete, the non-leader
+                // nimbodes might throw an exception.
+                LOG.info("KeyNotFoundException Key: {} {}", key, knf);
+            } catch (Exception exp) {
+                // Logging an exception while client is connecting
+                LOG.error("Exception {}", exp);
+            }
+        }
+
+        if (!isSuccess) {
+            LOG.error("Could not download blob with key" + key);
+        }
+        return isSuccess;
+    }
+
+    // Download updated blobs from potential nimbodes
+    public static boolean downloadUpdatedBlob(Map conf, BlobStore blobStore, 
String key, Set<NimbusInfo> nimbusInfos)
+            throws TTransportException {
+        NimbusClient client;
+        ClientBlobStore remoteBlobStore;
+        InputStreamWithMeta in;
+        AtomicOutputStream out;
+        boolean isSuccess = false;
+        LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
+        for (NimbusInfo nimbusInfo : nimbusInfos) {
+            if (isSuccess) {
+                break;
+            }
+            try {
+                client = new NimbusClient(conf, nimbusInfo.getHost(), 
nimbusInfo.getPort(), null);
+                remoteBlobStore = new NimbusBlobStore();
+                remoteBlobStore.setClient(conf, client);
+                in = remoteBlobStore.getBlob(key);
+                out = blobStore.updateBlob(key, getNimbusSubject());
+                byte[] buffer = new byte[2048];
+                int len = 0;
+                while ((len = in.read(buffer)) > 0) {
+                    out.write(buffer, 0, len);
+                }
+                if (out != null) {
+                    out.close();
+                }
+                isSuccess = true;
+            } catch (IOException | AuthorizationException exception) {
+                throw new RuntimeException(exception);
+            } catch (KeyNotFoundException knf) {
+                // Catching and logging KeyNotFoundException because, if
+                // there is a subsequent update and delete, the non-leader
+                // nimbodes might throw an exception.
+                LOG.info("KeyNotFoundException {}", knf);
+            } catch (Exception exp) {
+                // Logging an exception while client is connecting
+                LOG.error("Exception {}", exp);
+            }
+        }
+
+        if (!isSuccess) {
+            LOG.error("Could not update the blob with key" + key);
+        }
+        return isSuccess;
+    }
+
+    // Get the list of keys from blobstore
+    public static List<String> getKeyListFromBlobStore(BlobStore blobStore) 
throws Exception {
+        Iterator<String> keys = blobStore.listKeys();
+        List<String> keyList = new ArrayList<String>();
+        if (keys != null) {
+            while (keys.hasNext()) {
+                keyList.add(keys.next());
+            }
+        }
+        LOG.debug("KeyList from blobstore {}", keyList);
+        return keyList;
+    }
+
+    public static void createStateInZookeeper(Map conf, String key, NimbusInfo 
nimbusInfo) throws TTransportException {
+        ClientBlobStore cb = new NimbusBlobStore();
+        cb.setClient(conf, new NimbusClient(conf, nimbusInfo.getHost(), 
nimbusInfo.getPort(), null));
+        cb.createStateInZookeeper(key);
+    }
+
+    public static void updateKeyForBlobStore (Map conf, BlobStore blobStore, 
CuratorFramework zkClient, String key, NimbusInfo nimbusDetails) {
+        try {
+            // Most of clojure tests currently try to access the blobs using 
getBlob. Since, updateKeyForBlobStore
+            // checks for updating the correct version of the blob as a part 
of nimbus ha before performing any
+            // operation on it, there is a neccessity to stub several test 
cases to ignore this method. It is a valid
+            // trade off to return if nimbusDetails which include the details 
of the current nimbus host port data are
+            // not initialized as a part of the test. Moreover, this applies 
to only local blobstore when used along with
+            // nimbus ha.
+            if (nimbusDetails == null) {
+                return;
+            }
+            boolean isListContainsCurrentNimbusInfo = false;
+            List<String> stateInfo;
+            if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) 
== null) {
+                return;
+            }
+            stateInfo = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" 
+ key);
+            LOG.debug("StateInfo for update {}", stateInfo);
+            Set<NimbusInfo> nimbusInfoList = 
getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
+
+            for (NimbusInfo nimbusInfo:nimbusInfoList) {
+                if (nimbusInfo.getHost().equals(nimbusDetails.getHost())) {
+                    isListContainsCurrentNimbusInfo = true;
+                    break;
+                }
+            }
+
+            if (!isListContainsCurrentNimbusInfo && downloadUpdatedBlob(conf, 
blobStore, key, nimbusInfoList)) {
+                LOG.debug("Updating state inside zookeeper for an update");
+                createStateInZookeeper(conf, key, nimbusDetails);
+            }
+        } catch (Exception exp) {
+            throw new RuntimeException(exp);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java 
b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
new file mode 100644
index 0000000..abd7c86
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;;
+
+/**
+ * Is called periodically and updates the nimbus with blobs based on the state 
stored inside the zookeeper
+ * for a non leader nimbus trying to be in sync with the operations performed 
on the leader nimbus.
+ */
+public class BlobSynchronizer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobSynchronizer.class);
+    private CuratorFramework zkClient;
+    private Map conf;
+    private BlobStore blobStore;
+    private Set<String> blobStoreKeySet = new HashSet<String>();
+    private Set<String> zookeeperKeySet = new HashSet<String>();
+    private NimbusInfo nimbusInfo;
+
+    public BlobSynchronizer(BlobStore blobStore, Map conf) {
+        this.blobStore = blobStore;
+        this.conf = conf;
+    }
+
+    public void setNimbusInfo(NimbusInfo nimbusInfo) {
+        this.nimbusInfo = nimbusInfo;
+    }
+
+    public void setZookeeperKeySet(Set<String> zookeeperKeySet) {
+        this.zookeeperKeySet = zookeeperKeySet;
+    }
+
+    public void setBlobStoreKeySet(Set<String> blobStoreKeySet) {
+        this.blobStoreKeySet = blobStoreKeySet;
+    }
+
+    public Set<String> getBlobStoreKeySet() {
+        Set<String> keySet = new HashSet<String>();
+        keySet.addAll(blobStoreKeySet);
+        return keySet;
+    }
+
+    public Set<String> getZookeeperKeySet() {
+        Set<String> keySet = new HashSet<String>();
+        keySet.addAll(zookeeperKeySet);
+        return keySet;
+    }
+
+    public synchronized void syncBlobs() {
+        try {
+            LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys 
{}",getBlobStoreKeySet(), getZookeeperKeySet());
+            zkClient = BlobStoreUtils.createZKClient(conf);
+            deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), 
getZookeeperKeySet());
+            updateKeySetForBlobStore(getBlobStoreKeySet());
+            Set<String> keySetToDownload = 
getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet());
+            LOG.debug("Key set Blobstore-> Zookeeper-> DownloadSet {}-> {}-> 
{}", getBlobStoreKeySet(), getZookeeperKeySet(), keySetToDownload);
+
+            for (String key : keySetToDownload) {
+                Set<NimbusInfo> nimbusInfoSet = 
BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
+                if(BlobStoreUtils.downloadMissingBlob(conf, blobStore, key, 
nimbusInfoSet)) {
+                    BlobStoreUtils.createStateInZookeeper(conf, key, 
nimbusInfo);
+                }
+            }
+            if (zkClient !=null) {
+                zkClient.close();
+            }
+        } catch(InterruptedException exp) {
+            LOG.error("InterruptedException {}", exp);
+        } catch(Exception exp) {
+            throw new RuntimeException(exp);
+        }
+    }
+
+    public void deleteKeySetFromBlobStoreNotOnZookeeper(Set<String> 
keySetBlobStore, Set<String> keySetZookeeper) throws Exception {
+        if (keySetBlobStore.removeAll(keySetZookeeper)
+                || (keySetZookeeper.isEmpty() && !keySetBlobStore.isEmpty())) {
+            LOG.debug("Key set to delete in blobstore {}", keySetBlobStore);
+            for (String key : keySetBlobStore) {
+                blobStore.deleteBlob(key, BlobStoreUtils.getNimbusSubject());
+            }
+        }
+    }
+
+    // Update current key list inside the blobstore if the version changes
+    public void updateKeySetForBlobStore(Set<String> keySetBlobStore) {
+        try {
+            for (String key : keySetBlobStore) {
+                LOG.debug("updating blob");
+                BlobStoreUtils.updateKeyForBlobStore(conf, blobStore, 
zkClient, key, nimbusInfo);
+            }
+        } catch (Exception exp) {
+            throw new RuntimeException(exp);
+        }
+    }
+
+    // Make a key list to download
+    public Set<String> getKeySetToDownload(Set<String> blobStoreKeySet, 
Set<String> zookeeperKeySet) {
+        zookeeperKeySet.removeAll(blobStoreKeySet);
+        LOG.debug("Key list to download {}", zookeeperKeySet);
+        return zookeeperKeySet;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java 
b/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java
new file mode 100644
index 0000000..cc40aff
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.daemon.Shutdownable;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.utils.NimbusClient;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public abstract class ClientBlobStore implements Shutdownable {
+    protected Map conf;
+
+    public abstract void prepare(Map conf);
+    protected abstract AtomicOutputStream createBlobToExtend(String key, 
SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException;
+    public abstract AtomicOutputStream updateBlob(String key) throws 
AuthorizationException, KeyNotFoundException;
+    public abstract ReadableBlobMeta getBlobMeta(String key) throws 
AuthorizationException, KeyNotFoundException;
+    protected abstract void setBlobMetaToExtend(String key, SettableBlobMeta 
meta) throws AuthorizationException, KeyNotFoundException;
+    public abstract void deleteBlob(String key) throws AuthorizationException, 
KeyNotFoundException;
+    public abstract InputStreamWithMeta getBlob(String key) throws 
AuthorizationException, KeyNotFoundException;
+    public abstract Iterator<String> listKeys();
+    public abstract int getBlobReplication(String Key) throws 
AuthorizationException, KeyNotFoundException;
+    public abstract int updateBlobReplication(String Key, int replication) 
throws AuthorizationException, KeyNotFoundException;
+    public abstract boolean setClient(Map conf, NimbusClient client);
+    public abstract void createStateInZookeeper(String key);
+
+    public final AtomicOutputStream createBlob(String key, SettableBlobMeta 
meta) throws AuthorizationException, KeyAlreadyExistsException {
+        if (meta !=null && meta.is_set_acl()) {
+            BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+        }
+        return createBlobToExtend(key, meta);
+    }
+
+    public final void setBlobMeta(String key, SettableBlobMeta meta) throws 
AuthorizationException, KeyNotFoundException {
+        if (meta !=null && meta.is_set_acl()) {
+            BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+        }
+        setBlobMetaToExtend(key, meta);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java 
b/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java
new file mode 100644
index 0000000..b789335
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Very basic blob store impl with no ACL handling.
+ */
+public class FileBlobStoreImpl {
+    private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000l;
+    private static final int BUCKETS = 1024;
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileBlobStoreImpl.class);
+    private static final Timer timer = new Timer("FileBlobStore cleanup 
thread", true);
+
+    public class KeyInHashDirIterator implements Iterator<String> {
+        private int currentBucket = 0;
+        private Iterator<String> it = null;
+        private String next = null;
+
+        public KeyInHashDirIterator() throws IOException {
+            primeNext();
+        }
+
+        private void primeNext() throws IOException {
+            while (it == null && currentBucket < BUCKETS) {
+                String name = String.valueOf(currentBucket);
+                File dir = new File(fullPath, name);
+                try {
+                    it = listKeys(dir);
+                } catch (FileNotFoundException e) {
+                    it = null;
+                }
+                if (it == null || !it.hasNext()) {
+                    it = null;
+                    currentBucket++;
+                } else {
+                    next = it.next();
+                }
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return next != null;
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            String current = next;
+            next = null;
+            if (it != null) {
+                if (!it.hasNext()) {
+                    it = null;
+                    currentBucket++;
+                    try {
+                        primeNext();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                } else {
+                    next = it.next();
+                }
+            }
+            return current;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Delete Not Supported");
+        }
+    }
+
+    private File fullPath;
+    private TimerTask cleanup = null;
+
+    public FileBlobStoreImpl(File path, Map<String, Object> conf) throws 
IOException {
+        LOG.info("Creating new blob store based in {}", path);
+        fullPath = path;
+        fullPath.mkdirs();
+        Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE);
+        if (Utils.getBoolean(shouldCleanup, false)) {
+            LOG.debug("Starting File blobstore cleaner");
+            cleanup = new TimerTask() {
+                @Override
+                public void run() {
+                    try {
+                        fullCleanup(FULL_CLEANUP_FREQ);
+                    } catch (IOException e) {
+                        LOG.error("Error trying to cleanup", e);
+                    }
+                }
+            };
+            timer.scheduleAtFixedRate(cleanup, 0, FULL_CLEANUP_FREQ);
+        }
+    }
+
+    /**
+     * @return all keys that are available for reading.
+     * @throws IOException on any error.
+     */
+    public Iterator<String> listKeys() throws IOException {
+        return new KeyInHashDirIterator();
+    }
+
+    /**
+     * Get an input stream for reading a part.
+     * @param key the key of the part to read.
+     * @return the where to read the data from.
+     * @throws IOException on any error
+     */
+    public LocalFsBlobStoreFile read(String key) throws IOException {
+        return new LocalFsBlobStoreFile(getKeyDir(key), 
BlobStoreFile.BLOBSTORE_DATA_FILE);
+    }
+
+    /**
+     * Get an object tied to writing the data.
+     * @param key the key of the part to write to.
+     * @return an object that can be used to both write to, but also 
commit/cancel the operation.
+     * @throws IOException on any error
+     */
+    public LocalFsBlobStoreFile write(String key, boolean create) throws 
IOException {
+        return new LocalFsBlobStoreFile(getKeyDir(key), true, create);
+    }
+
+    /**
+     * Check if the key exists in the blob store.
+     * @param key the key to check for
+     * @return true if it exists else false.
+     */
+    public boolean exists(String key) {
+        return getKeyDir(key).exists();
+    }
+
+    /**
+     * Delete a key from the blob store
+     * @param key the key to delete
+     * @throws IOException on any error
+     */
+    public void deleteKey(String key) throws IOException {
+        File keyDir = getKeyDir(key);
+        LocalFsBlobStoreFile pf = new LocalFsBlobStoreFile(keyDir, 
BlobStoreFile.BLOBSTORE_DATA_FILE);
+        pf.delete();
+        delete(keyDir);
+    }
+
+    private File getKeyDir(String key) {
+        String hash = String.valueOf(Math.abs((long)key.hashCode()) % BUCKETS);
+        File ret = new File(new File(fullPath, hash), key);
+        LOG.debug("{} Looking for {} in {}", new Object[]{fullPath, key, 
hash});
+        return ret;
+    }
+
+    public void fullCleanup(long age) throws IOException {
+        long cleanUpIfBefore = System.currentTimeMillis() - age;
+        Iterator<String> keys = new KeyInHashDirIterator();
+        while (keys.hasNext()) {
+            String key = keys.next();
+            File keyDir = getKeyDir(key);
+            Iterator<LocalFsBlobStoreFile> i = listBlobStoreFiles(keyDir);
+            if (!i.hasNext()) {
+                //The dir is empty, so try to delete it, may fail, but that is 
OK
+                try {
+                    keyDir.delete();
+                } catch (Exception e) {
+                    LOG.warn("Could not delete "+keyDir+" will try again 
later");
+                }
+            }
+            while (i.hasNext()) {
+                LocalFsBlobStoreFile f = i.next();
+                if (f.isTmp()) {
+                    if (f.getModTime() <= cleanUpIfBefore) {
+                        f.delete();
+                    }
+                }
+            }
+        }
+    }
+
+    protected Iterator<LocalFsBlobStoreFile> listBlobStoreFiles(File path) 
throws IOException {
+        ArrayList<LocalFsBlobStoreFile> ret = new 
ArrayList<LocalFsBlobStoreFile>();
+        File[] files = path.listFiles();
+        if (files != null) {
+            for (File sub: files) {
+                try {
+                    ret.add(new LocalFsBlobStoreFile(sub.getParentFile(), 
sub.getName()));
+                } catch (IllegalArgumentException e) {
+                    //Ignored the file did not match
+                    LOG.warn("Found an unexpected file in {} {}",path, 
sub.getName());
+                }
+            }
+        }
+        return ret.iterator();
+    }
+
+    protected Iterator<String> listKeys(File path) throws IOException {
+        String[] files = path.list();
+        if (files != null) {
+            return Arrays.asList(files).iterator();
+        }
+        return new LinkedList<String>().iterator();
+    }
+
+    protected void delete(File path) throws IOException {
+        Files.deleteIfExists(path.toPath());
+    }
+
+    public void shutdown() {
+        if (cleanup != null) {
+            cleanup.cancel();
+            cleanup = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java 
b/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java
new file mode 100644
index 0000000..1d29fda
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import java.io.IOException;
+import java.io.InputStream;
+
/**
 * An InputStream that also exposes metadata about the blob being read, so
 * callers can learn the blob's version and total length without a separate
 * metadata lookup.
 */
public abstract class InputStreamWithMeta extends InputStream {
    /**
     * @return a version identifier for the blob backing this stream
     *         (NOTE(review): the local-fs implementation appears to use the
     *         data file's modification time — confirm against implementations)
     */
    public abstract long getVersion() throws IOException;

    /** @return the total length, in bytes, of the blob being read */
    public abstract long getFileLength() throws IOException;
}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java 
b/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java
new file mode 100644
index 0000000..32bb9fd
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
/**
 * Callback applied to each blob store key during a scan: {@code filter} is
 * invoked once per key and returns a value of type {@code R} derived from it.
 * NOTE(review): call sites appear to treat a null return as "exclude this
 * key" — confirm against the iterators that consume this interface.
 */
public interface KeyFilter<R> {
    R filter(String key);
}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java 
b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
new file mode 100644
index 0000000..1cddac0
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the key sequence number which implies the number of 
updates made to a blob.
+ * The information regarding the keys and the sequence number which represents 
the number of updates are
+ * stored within the zookeeper in the following format.
+ * /storm/blobstore/key_name/nimbushostport-sequencenumber
+ * Example:
+ * If there are two nimbodes with nimbus.seeds:leader,non-leader are set,
+ * then the state inside the zookeeper is eventually stored as:
+ * /storm/blobstore/key1/leader:8080-1
+ * /storm/blobstore/key1/non-leader:8080-1
+ * indicates that a new blob with the name key1 has been created on the leader
+ * nimbus and the non-leader nimbus syncs after a call back is triggered by 
attempting
+ * to download the blob and finally updates its state inside the zookeeper.
+ *
+ * A watch is placed on the /storm/blobstore/key1 and the znodes leader:8080-1 
and
+ * non-leader:8080-1 are ephemeral which implies that these nodes exist only 
until the
+ * connection between the corresponding nimbus and the zookeeper persist. If 
in case the
+ * nimbus crashes the node disappears under /storm/blobstore/key1.
+ *
+ * The sequence number for the keys are handed over based on the following 
scenario:
+ * Let's assume there are two nimbodes up and running, one being the leader and
+ * the other being the non-leader.
+ *
+ * 1. Create is straightforward.
+ * Check whether the znode -> /storm/blobstore/key1 has been created or not. 
It implies
+ * the blob has not been created yet. If not created, it creates it and 
updates the zookeeper
+ * states under /storm/blobstore/key1 and 
/storm/blobstoremaxkeysequencenumber/key1.
+ * The znodes it creates on these nodes are 
/storm/blobstore/key1/leader:8080-1,
+ * /storm/blobstore/key1/non-leader:8080-1 and 
/storm/blobstoremaxkeysequencenumber/key1/1.
+ * The latter holds the global sequence number across all nimbodes more like a 
static variable
+ * indicating the true value of number of updates for a blob. This node helps 
to maintain sanity in case
+ * leadership changes due to crashing.
+ *
+ * 2. Delete does not require to hand over the sequence number.
+ *
+ * 3. Finally, the update has few scenarios.
+ *
+ *  The class implements a TreeSet. The basic idea is if all the nimbodes have 
the same
+ *  sequence number for the blob, then the number of elements in the set is 1 
which holds
+ *  the latest value of sequence number. If the number of elements are greater 
than 1 then it
+ *  implies that there is sequence mismatch and there is need for syncing the 
blobs across
+ *  nimbodes.
+ *
+ *  The logic for handing over sequence numbers based on the state are 
described as follows
+ *  Here consider Nimbus-1 alias as N1 and Nimbus-2 alias as N2.
+ *  Scenario 1:
+ *  Example: Normal create/update scenario
+ *  Operation     Nimbus-1:state     Nimbus-2:state     Seq-Num-Nimbus-1  
Seq-Num-Nimbus-2          Max-Seq-Num
+ *  Create-Key1   alive - Leader     alive              1                      
                     1
+ *  Sync          alive - Leader     alive              1                 1 
(callback -> download)  1
+ *  Update-Key1   alive - Leader     alive              2                 1    
                     2
+ *  Sync          alive - Leader     alive              2                 2 
(callback -> download)  2
+ *
+ *  Scenario 2:
+ *  Example: Leader nimbus crash followed by leader election, update and 
ex-leader restored again
+ *  Operation     Nimbus-1:state     Nimbus-2:state     Seq-Num-Nimbus-1  
Seq-Num-Nimbus-2          Max-Seq-Num
+ *  Create        alive - Leader     alive              1                      
                     1
+ *  Sync          alive - Leader     alive              1                 1 
(callback -> download)  1
+ *  Update        alive - Leader     alive              2                 1    
                     2
+ *  Sync          alive - Leader     alive              2                 2 
(callback -> download)  2
+ *  Update        alive - Leader     alive              3                 2    
                     3
+ *  Crash         crash - Leader     alive              3                 2    
                     3
+ *  New - Leader  crash              alive - Leader     3 (Invalid)       2    
                     3
+ *  Update        crash              alive - Leader     3 (Invalid)       4 
(max-seq-num + 1)       4
+ *  N1-Restored   alive              alive - Leader     0                 4    
                     4
+ *  Sync          alive              alive - Leader     4                 4    
                     4
+ *
+ *  Scenario 3:
+ *  Example: Leader nimbus crash followed by leader election, update and 
ex-leader restored again
+ *  Operation     Nimbus-1:state     Nimbus-2:state     Seq-Num-Nimbus-1  
Seq-Num-Nimbus-2          Max-Seq-Num
+ *  Create        alive - Leader     alive              1                      
                     1
+ *  Sync          alive - Leader     alive              1                 1 
(callback -> download)  1
+ *  Update        alive - Leader     alive              2                 1    
                     2
+ *  Sync          alive - Leader     alive              2                 2 
(callback -> download)  2
+ *  Update        alive - Leader     alive              3                 2    
                     3
+ *  Crash         crash - Leader     alive              3                 2    
                     3
+ *  Elect Leader  crash              alive - Leader     3 (Invalid)       2    
                     3
+ *  N1-Restored   alive              alive - Leader     3                 2    
                     3
+ *  Read/Update   alive              alive - Leader     3                 4 
(Downloads from N1)     4
+ *  Sync          alive              alive - Leader     4 (callback)      4    
                     4
+ *  Here the download is triggered whenever an operation corresponding to the 
blob is triggered on the
+ *  nimbus like a read or update operation. Here, in the read/update call it 
is hard to know which call
+ *  is read or update. Hence, by incrementing the sequence number to 
max-seq-num + 1 we ensure that the
+ *  synchronization happens appropriately and all nimbodes have the same blob.
+ */
+public class KeySequenceNumber {
+    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+    private final String BLOBSTORE_SUBTREE="/blobstore";
+    private final String 
BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE="/blobstoremaxkeysequencenumber";
+    private final String key;
+    private final NimbusInfo nimbusInfo;
+    private final int INT_CAPACITY = 4;
+    private final int INITIAL_SEQUENCE_NUMBER = 1;
+
+    public KeySequenceNumber(String key, NimbusInfo nimbusInfo) {
+        this.key = key;
+        this.nimbusInfo = nimbusInfo;
+    }
+
+    public int getKeySequenceNumber(Map conf) {
+        TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
+        CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
+        try {
+            // Key has not been created yet and it is the first time it is 
being created
+            if(zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) 
== null) {
+                
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+                        
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE
 + "/" + key);
+                zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE 
+ "/" + key,
+                        
ByteBuffer.allocate(INT_CAPACITY).putInt(INITIAL_SEQUENCE_NUMBER).array());
+                return INITIAL_SEQUENCE_NUMBER;
+            }
+
+            // When all nimbodes go down and one or few of them come up
+            // Unfortunately there might not be an exact way to know which one 
contains the most updated blob,
+            // if all go down which is unlikely. Hence there might be a need 
to update the blob if all go down.
+            List<String> stateInfoList = 
zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+            LOG.debug("stateInfoList-size {} stateInfoList-data {}", 
stateInfoList.size(), stateInfoList);
+            if(stateInfoList.isEmpty()) {
+                return getMaxSequenceNumber(zkClient);
+            }
+
+            LOG.debug("stateInfoSize {}", stateInfoList.size());
+            // In all other cases check for the latest update sequence of the 
blob on the nimbus
+            // and assign the appropriate number. Check if all are have same 
sequence number,
+            // if not assign the highest sequence number.
+            for (String stateInfo:stateInfoList) {
+                
sequenceNumbers.add(Integer.parseInt(BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo(stateInfo)
+                        .getSequenceNumber()));
+            }
+
+            // Update scenario 2 and 3 explain the code logic written here
+            // especially when nimbus crashes and comes up after and before 
update
+            // respectively.
+            int currentSeqNumber = getMaxSequenceNumber(zkClient);
+            if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, 
nimbusInfo) && !nimbusInfo.isLeader()) {
+                if (sequenceNumbers.last() < currentSeqNumber) {
+                    return currentSeqNumber;
+                } else {
+                    return INITIAL_SEQUENCE_NUMBER - 1;
+                }
+            }
+
+            // It covers scenarios expalined in scenario 3 when nimbus-1 
holding the latest
+            // update goes down before it is downloaded by nimbus-2. Nimbus-2 
gets elected as a leader
+            // after which nimbus-1 comes back up and a read or update is 
performed.
+            if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, 
nimbusInfo) && nimbusInfo.isLeader()) {
+                incrementMaxSequenceNumber(zkClient, currentSeqNumber);
+                return currentSeqNumber + 1;
+            }
+
+            // This code logic covers the update scenarios in 2 when the 
nimbus-1 goes down
+            // before syncing the blob to nimbus-2 and an update happens.
+            // If seq-num for nimbus-2 is 2 and max-seq-number is 3 then next 
sequence number is 4
+            // (max-seq-number + 1).
+            // Other scenario it covers is when max-seq-number and nimbus seq 
number are equal.
+            if (sequenceNumbers.size() == 1) {
+                if (sequenceNumbers.first() < currentSeqNumber) {
+                    incrementMaxSequenceNumber(zkClient, currentSeqNumber);
+                    return currentSeqNumber + 1;
+                } else {
+                    incrementMaxSequenceNumber(zkClient, currentSeqNumber);
+                    return sequenceNumbers.first() + 1;
+                }
+            }
+        } catch(Exception e) {
+            LOG.error("Exception {}", e);
+        } finally {
+            if (zkClient != null) {
+                zkClient.close();
+            }
+        }
+        // Normal create update sync scenario returns the greatest sequence 
number in the set
+        return sequenceNumbers.last();
+    }
+
+    private boolean checkIfStateContainsCurrentNimbusHost(List<String> 
stateInfoList, NimbusInfo nimbusInfo) {
+        boolean containsNimbusHost = false;
+        for(String stateInfo:stateInfoList) {
+            if(stateInfo.contains(nimbusInfo.getHost())) {
+                containsNimbusHost = true;
+                break;
+            }
+        }
+        return containsNimbusHost;
+    }
+
+    private void incrementMaxSequenceNumber(CuratorFramework zkClient, int 
count) throws Exception {
+        zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + 
key,
+                ByteBuffer.allocate(INT_CAPACITY).putInt(count + 1).array());
+    }
+
+    private int getMaxSequenceNumber(CuratorFramework zkClient) throws 
Exception {
+        return ByteBuffer.wrap(zkClient.getData()
+                .forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + 
key)).getInt();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java 
b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
new file mode 100644
index 0000000..0941b9a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.ReadableBlobMeta;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;;
+
+import static backtype.storm.blobstore.BlobStoreAclHandler.ADMIN;
+import static backtype.storm.blobstore.BlobStoreAclHandler.READ;
+import static backtype.storm.blobstore.BlobStoreAclHandler.WRITE;
+
+/**
+ * Provides a local file system backed blob store implementation for Nimbus.
+ */
+public class LocalFsBlobStore extends BlobStore {
+    public static final Logger LOG = 
LoggerFactory.getLogger(LocalFsBlobStore.class);
+    private static final String DATA_PREFIX = "data_";
+    private static final String META_PREFIX = "meta_";
+    protected BlobStoreAclHandler _aclHandler;
+    private final String BLOBSTORE_SUBTREE = "/blobstore/";
+    private NimbusInfo nimbusInfo;
+    private FileBlobStoreImpl fbs;
+    private final int allPermissions = READ | WRITE | ADMIN;
+    private Map conf;
+
+    @Override
+    public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) {
+        this.conf = conf;
+        this.nimbusInfo = nimbusInfo;
+        if (overrideBase == null) {
+            overrideBase = (String)conf.get(Config.BLOBSTORE_DIR);
+            if (overrideBase == null) {
+                overrideBase = (String) conf.get(Config.STORM_LOCAL_DIR);
+            }
+        }
+        File baseDir = new File(overrideBase, BASE_BLOBS_DIR_NAME);
+        try {
+            fbs = new FileBlobStoreImpl(baseDir, conf);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        _aclHandler = new BlobStoreAclHandler(conf);
+    }
+
+    @Override
+    public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, 
Subject who) throws AuthorizationException, KeyAlreadyExistsException {
+        LOG.debug("Creating Blob for key {}", key);
+        validateKey(key);
+        _aclHandler.normalizeSettableBlobMeta(key, meta, who, allPermissions);
+        BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+        _aclHandler.hasPermissions(meta.get_acl(), allPermissions, who, key);
+        if (fbs.exists(DATA_PREFIX+key)) {
+            throw new KeyAlreadyExistsException(key);
+        }
+        BlobStoreFileOutputStream mOut = null;
+        try {
+            mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, 
true));
+            mOut.write(Utils.thriftSerialize(meta));
+            mOut.close();
+            mOut = null;
+            return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, 
true));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (mOut != null) {
+                try {
+                    mOut.cancel();
+                } catch (IOException e) {
+                    //Ignored
+                }
+            }
+        }
+    }
+
+    @Override
+    public AtomicOutputStream updateBlob(String key, Subject who) throws 
AuthorizationException, KeyNotFoundException {
+        validateKey(key);
+        checkForBlobOrDownload(key);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
+        try {
+            return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, 
false));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private SettableBlobMeta getStoredBlobMeta(String key) throws 
KeyNotFoundException {
+        InputStream in = null;
+        try {
+            LocalFsBlobStoreFile pf = fbs.read(META_PREFIX+key);
+            try {
+                in = pf.getInputStream();
+            } catch (FileNotFoundException fnf) {
+                throw new KeyNotFoundException(key);
+            }
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            byte [] buffer = new byte[2048];
+            int len;
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+            in.close();
+            in = null;
+            return Utils.thriftDeserialize(SettableBlobMeta.class, 
out.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (in != null) {
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    //Ignored
+                }
+            }
+        }
+    }
+
+    @Override
+    public ReadableBlobMeta getBlobMeta(String key, Subject who) throws 
AuthorizationException, KeyNotFoundException {
+        validateKey(key);
+        if(!checkForBlobOrDownload(key)) {
+            checkForBlobUpdate(key);
+        }
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        _aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key);
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        rbm.set_settable(meta);
+        try {
+            LocalFsBlobStoreFile pf = fbs.read(DATA_PREFIX+key);
+            rbm.set_version(pf.getModTime());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return rbm;
+    }
+
+    @Override
+    public void setBlobMeta(String key, SettableBlobMeta meta, Subject who) 
throws AuthorizationException, KeyNotFoundException {
+        validateKey(key);
+        checkForBlobOrDownload(key);
+        _aclHandler.normalizeSettableBlobMeta(key, meta, who, ADMIN);
+        BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+        SettableBlobMeta orig = getStoredBlobMeta(key);
+        _aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key);
+        BlobStoreFileOutputStream mOut = null;
+        try {
+            mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, 
false));
+            mOut.write(Utils.thriftSerialize(meta));
+            mOut.close();
+            mOut = null;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (mOut != null) {
+                try {
+                    mOut.cancel();
+                } catch (IOException e) {
+                    //Ignored
+                }
+            }
+        }
+    }
+
+    @Override
+    public void deleteBlob(String key, Subject who) throws 
AuthorizationException, KeyNotFoundException {
+        validateKey(key);
+        checkForBlobOrDownload(key);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
+        try {
+            fbs.deleteKey(DATA_PREFIX+key);
+            fbs.deleteKey(META_PREFIX+key);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public InputStreamWithMeta getBlob(String key, Subject who) throws 
AuthorizationException, KeyNotFoundException {
+        validateKey(key);
+        if(!checkForBlobOrDownload(key)) {
+            checkForBlobUpdate(key);
+        }
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        _aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
+        try {
+            return new BlobStoreFileInputStream(fbs.read(DATA_PREFIX+key));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Iterator<String> listKeys() {
+        try {
+            return new KeyTranslationIterator(fbs.listKeys(), DATA_PREFIX);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void shutdown() {
+    }
+
+    @Override
+    public int getBlobReplication(String key, Subject who) throws Exception {
+        CuratorFramework zkClient = null;
+        int replicationCount = 0;
+        try {
+            validateKey(key);
+            SettableBlobMeta meta = getStoredBlobMeta(key);
+            _aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
+            zkClient = BlobStoreUtils.createZKClient(conf);
+            if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) == 
null) {
+                zkClient.close();
+                return 0;
+            }
+            replicationCount = 
zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size();
+        } finally {
+            if (zkClient != null) {
+                zkClient.close();
+            }
+        }
+        return replicationCount;
+    }
+
+    @Override
+    public int updateBlobReplication(String key, int replication, Subject who) 
throws AuthorizationException, KeyNotFoundException {
+        throw new UnsupportedOperationException("For local file system blob 
store the update blobs function does not work. " +
+                "Please use HDFS blob store to make this feature available.");
+    }
+
+    //This additional check and download is for nimbus high availability in 
case you have more than one nimbus
+    public boolean checkForBlobOrDownload(String key) {
+        boolean checkBlobDownload = false;
+        CuratorFramework zkClient = null;
+        try {
+            List<String> keyList = 
BlobStoreUtils.getKeyListFromBlobStore(this);
+            if (!keyList.contains(key)) {
+                zkClient = BlobStoreUtils.createZKClient(conf);
+                if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) != 
null) {
+                    Set<NimbusInfo> nimbusSet = 
BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
+                    if (BlobStoreUtils.downloadMissingBlob(conf, this, key, 
nimbusSet)) {
+                        LOG.debug("Updating blobs state");
+                        BlobStoreUtils.createStateInZookeeper(conf, key, 
nimbusInfo);
+                        checkBlobDownload = true;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (zkClient != null) {
+                zkClient.close();
+            }
+        }
+        return checkBlobDownload;
+    }
+
+    public void checkForBlobUpdate(String key) {
+        CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
+        BlobStoreUtils.updateKeyForBlobStore(conf, this, zkClient, key, 
nimbusInfo);
+        zkClient.close();
+    }
+
+    public void fullCleanup(long age) throws IOException {
+        fbs.fullCleanup(age);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStoreFile.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStoreFile.java 
b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStoreFile.java
new file mode 100644
index 0000000..fb11fa6
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStoreFile.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.generated.SettableBlobMeta;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.regex.Matcher;
+
+public class LocalFsBlobStoreFile extends BlobStoreFile {
+
+    private final String _key;
+    private final boolean _isTmp;
+    private final File _path;
+    private Long _modTime = null;
+    private final boolean _mustBeNew;
+    private SettableBlobMeta meta;
+
+    public LocalFsBlobStoreFile(File base, String name) {
+        if (BlobStoreFile.BLOBSTORE_DATA_FILE.equals(name)) {
+            _isTmp = false;
+        } else {
+            Matcher m = TMP_NAME_PATTERN.matcher(name);
+            if (!m.matches()) {
+                throw new IllegalArgumentException("File name does not match 
'"+name+"' !~ "+TMP_NAME_PATTERN);
+            }
+            _isTmp = true;
+        }
+        _key = base.getName();
+        _path = new File(base, name);
+        _mustBeNew = false;
+    }
+
+    public LocalFsBlobStoreFile(File base, boolean isTmp, boolean mustBeNew) {
+        _key = base.getName();
+        _isTmp = isTmp;
+        _mustBeNew = mustBeNew;
+        if (_isTmp) {
+            _path = new File(base, System.currentTimeMillis()+TMP_EXT);
+        } else {
+            _path = new File(base, BlobStoreFile.BLOBSTORE_DATA_FILE);
+        }
+    }
+
+    @Override
+    public void delete() throws IOException {
+        _path.delete();
+    }
+
+    @Override
+    public boolean isTmp() {
+        return _isTmp;
+    }
+
+    @Override
+    public String getKey() {
+        return _key;
+    }
+
+    @Override
+    public long getModTime() throws IOException {
+        if (_modTime == null) {
+            _modTime = _path.lastModified();
+        }
+        return _modTime;
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+        if (isTmp()) {
+            throw new IllegalStateException("Cannot read from a temporary part 
file.");
+        }
+        return new FileInputStream(_path);
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        if (!isTmp()) {
+            throw new IllegalStateException("Can only write to a temporary 
part file.");
+        }
+        boolean success = false;
+        try {
+            success = _path.createNewFile();
+        } catch (IOException e) {
+            //Try to create the parent directory, may not work
+            _path.getParentFile().mkdirs();
+            success = _path.createNewFile();
+        }
+        if (!success) {
+            throw new IOException(_path+" already exists");
+        }
+        return new FileOutputStream(_path);
+    }
+
+    @Override
+    public void commit() throws IOException {
+        if (!isTmp()) {
+            throw new IllegalStateException("Can only write to a temporary 
part file.");
+        }
+
+        File dest = new File(_path.getParentFile(), 
BlobStoreFile.BLOBSTORE_DATA_FILE);
+        if (_mustBeNew) {
+            Files.move(_path.toPath(), dest.toPath(), 
StandardCopyOption.ATOMIC_MOVE);
+        } else {
+            Files.move(_path.toPath(), dest.toPath(), 
StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+        }
+    }
+
+    @Override
+    public void cancel() throws IOException {
+        if (!isTmp()) {
+            throw new IllegalStateException("Can only write to a temporary 
part file.");
+        }
+        delete();
+    }
+
+    @Override
+    public SettableBlobMeta getMetadata () {
+        return meta;
+    }
+
+    @Override
+    public void setMetadata (SettableBlobMeta meta) {
+        this.meta = meta;
+    }
+
+    @Override
+    public String toString() {
+        return _path+":"+(_isTmp ? "tmp": 
BlobStoreFile.BLOBSTORE_DATA_FILE)+":"+_key;
+    }
+
+    @Override
+    public long getFileLength() {
+        return _path.length();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/NimbusBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/NimbusBlobStore.java 
b/storm-core/src/jvm/backtype/storm/blobstore/NimbusBlobStore.java
new file mode 100644
index 0000000..bf084bb
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/NimbusBlobStore.java
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.BeginDownloadResult;
+import backtype.storm.generated.ListBlobsResult;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+public class NimbusBlobStore extends ClientBlobStore {
+    private static final Logger LOG = 
LoggerFactory.getLogger(NimbusBlobStore.class);
+
+    public class NimbusKeyIterator implements Iterator<String> {
+        private ListBlobsResult listBlobs = null;
+        private int offset = 0;
+        private boolean eof = false;
+
+        public NimbusKeyIterator(ListBlobsResult listBlobs) {
+            this.listBlobs = listBlobs;
+            this.eof = (listBlobs.get_keys_size() == 0);
+        }
+
+        private boolean isCacheEmpty() {
+            return listBlobs.get_keys_size() <= offset;
+        }
+
+        private void readMore() throws TException {
+            if (!eof) {
+                offset = 0;
+                synchronized(client) {
+                    listBlobs = 
client.getClient().listBlobs(listBlobs.get_session());
+                }
+                if (listBlobs.get_keys_size() == 0) {
+                    eof = true;
+                }
+            }
+        }
+
+        @Override
+        public synchronized boolean hasNext() {
+            try {
+                if (isCacheEmpty()) {
+                    readMore();
+                }
+            } catch (TException e) {
+                throw new RuntimeException(e);
+            }
+            return !eof;
+        }
+
+        @Override
+        public synchronized String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            String ret = listBlobs.get_keys().get(offset);
+            offset++;
+            return ret;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Delete Not Supported");
+        }
+    }
+
+    public class NimbusDownloadInputStream extends InputStreamWithMeta {
+        private BeginDownloadResult beginBlobDownload;
+        private byte[] buffer = null;
+        private int offset = 0;
+        private int end = 0;
+        private boolean eof = false;
+
+        public NimbusDownloadInputStream(BeginDownloadResult 
beginBlobDownload) {
+            this.beginBlobDownload = beginBlobDownload;
+        }
+
+        @Override
+        public long getVersion() throws IOException {
+            return beginBlobDownload.get_version();
+        }
+
+        @Override
+        public synchronized int read() throws IOException {
+            try {
+                if (isEmpty()) {
+                    readMore();
+                    if (eof) {
+                        return -1;
+                    }
+                }
+                int length = Math.min(1, available());
+                if (length == 0) {
+                    return -1;
+                }
+                int ret = buffer[offset];
+                offset += length;
+                return ret;
+            } catch(TException exp) {
+                throw new IOException(exp);
+            }
+        }
+
+        @Override
+        public synchronized int read(byte[] b, int off, int len) throws 
IOException {
+            try {
+                if (isEmpty()) {
+                    readMore();
+                    if (eof) {
+                        return -1;
+                    }
+                }
+                int length = Math.min(len, available());
+                System.arraycopy(buffer, offset, b, off, length);
+                offset += length;
+                return length;
+            } catch(TException exp) {
+                throw new IOException(exp);
+            }
+        }
+
+        private boolean isEmpty() {
+            return buffer == null || offset >= end;
+        }
+
+        private void readMore() throws TException {
+            if (!eof) {
+                ByteBuffer buff;
+                synchronized(client) {
+                    buff = 
client.getClient().downloadBlobChunk(beginBlobDownload.get_session());
+                }
+                buffer = buff.array();
+                offset = buff.arrayOffset() + buff.position();
+                int length = buff.remaining();
+                end = offset + length;
+                if (length == 0) {
+                    eof = true;
+                }
+            }
+        }
+
+        @Override
+        public synchronized int read(byte[] b) throws IOException {
+            return read(b, 0, b.length);
+        }
+
+        @Override
+        public synchronized int available() {
+            return buffer == null ? 0 : (end - offset);
+        }
+
+        @Override
+        public long getFileLength() {
+            return beginBlobDownload.get_data_size();
+        }
+    }
+
+    public class NimbusUploadAtomicOutputStream extends AtomicOutputStream {
+        private String session;
+        private int maxChunkSize = 4096;
+        private String key;
+
+        public NimbusUploadAtomicOutputStream(String session, int bufferSize, 
String key) {
+            this.session = session;
+            this.maxChunkSize = bufferSize;
+            this.key = key;
+        }
+
+        @Override
+        public void cancel() throws IOException {
+            try {
+                synchronized(client) {
+                    client.getClient().cancelBlobUpload(session);
+                }
+            } catch (TException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            try {
+                synchronized(client) {
+                    client.getClient().uploadBlobChunk(session, 
ByteBuffer.wrap(new byte[] {(byte)b}));
+                }
+            } catch (TException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void write(byte []b) throws IOException {
+            write(b, 0, b.length);
+        }
+
+        @Override
+        public void write(byte []b, int offset, int len) throws IOException {
+            try {
+                int end = offset + len;
+                for (int realOffset = offset; realOffset < end; realOffset += 
maxChunkSize) {
+                    int realLen = Math.min(end - realOffset, maxChunkSize);
+                    LOG.debug("Writing {} bytes of {} 
remaining",realLen,(end-realOffset));
+                    synchronized(client) {
+                        client.getClient().uploadBlobChunk(session, 
ByteBuffer.wrap(b, realOffset, realLen));
+                    }
+                }
+            } catch (TException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                synchronized(client) {
+                    client.getClient().finishBlobUpload(session);
+                    client.getClient().createStateInZookeeper(key);
+                }
+            } catch (TException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private NimbusClient client;
+    private int bufferSize = 4096;
+
+    @Override
+    public void prepare(Map conf) {
+        this.client = NimbusClient.getConfiguredClient(conf);
+        if (conf != null) {
+            this.bufferSize = 
Utils.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), 
bufferSize);
+        }
+    }
+
+    @Override
+    protected AtomicOutputStream createBlobToExtend(String key, 
SettableBlobMeta meta)
+            throws AuthorizationException, KeyAlreadyExistsException {
+        try {
+            synchronized(client) {
+                return new 
NimbusUploadAtomicOutputStream(client.getClient().beginCreateBlob(key, meta), 
this.bufferSize, key);
+            }
+        } catch (AuthorizationException | KeyAlreadyExistsException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public AtomicOutputStream updateBlob(String key)
+            throws AuthorizationException, KeyNotFoundException {
+        try {
+            synchronized(client) {
+                return new 
NimbusUploadAtomicOutputStream(client.getClient().beginUpdateBlob(key), 
this.bufferSize, key);
+            }
+        } catch (AuthorizationException | KeyNotFoundException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public ReadableBlobMeta getBlobMeta(String key) throws 
AuthorizationException, KeyNotFoundException {
+        try {
+            synchronized(client) {
+                return client.getClient().getBlobMeta(key);
+            }
+        } catch (AuthorizationException | KeyNotFoundException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void setBlobMetaToExtend(String key, SettableBlobMeta meta)
+            throws AuthorizationException, KeyNotFoundException {
+        try {
+            synchronized(client) {
+                client.getClient().setBlobMeta(key, meta);
+            }
+        } catch (AuthorizationException | KeyNotFoundException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void deleteBlob(String key) throws AuthorizationException, 
KeyNotFoundException {
+        try {
+            synchronized(client) {
+                client.getClient().deleteBlob(key);
+            }
+        } catch (AuthorizationException | KeyNotFoundException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void createStateInZookeeper(String key) {
+        try {
+            synchronized(client) {
+                client.getClient().createStateInZookeeper(key);
+            }
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public InputStreamWithMeta getBlob(String key) throws 
AuthorizationException, KeyNotFoundException {
+        try {
+            synchronized(client) {
+                return new 
NimbusDownloadInputStream(client.getClient().beginBlobDownload(key));
+            }
+        } catch (AuthorizationException | KeyNotFoundException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Iterator<String> listKeys() {
+        try {
+            synchronized(client) {
+                return new NimbusKeyIterator(client.getClient().listBlobs(""));
+            }
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public int getBlobReplication(String key) throws AuthorizationException, 
KeyNotFoundException {
+        try {
+            return client.getClient().getBlobReplication(key);
+        } catch (AuthorizationException | KeyNotFoundException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public int updateBlobReplication(String key, int replication) throws 
AuthorizationException, KeyNotFoundException {
+        try {
+            return client.getClient().updateBlobReplication(key, replication);
+        } catch (AuthorizationException | KeyNotFoundException exp) {
+            throw exp;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean setClient(Map conf, NimbusClient client) {
+        this.client = client;
+        if (conf != null) {
+            this.bufferSize = 
Utils.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), 
bufferSize);
+        }
+        return true;
+    }
+
+    @Override
+    protected void finalize() {
+        shutdown();
+    }
+
+    @Override
+    public void shutdown() {
+        if (client != null) {
+            client.close();
+            client = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java 
b/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java
index 1bcc645..1960371 100644
--- a/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java
+++ b/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java
@@ -205,4 +205,13 @@ public interface ClusterState {
      * @param path The path to synchronize.
      */
     void sync_path(String path);
+
+    /**
+     * Deletes the znodes under /storm/blobstore/key_name whose names start
+     * with the given nimbusHostPortInfo, removing that nimbus node's entry
+     * for the blob.
+     * @param path the blob's path, e.g. /storm/blobstore/key_name
+     * @param nimbusHostPortInfo the host-port identifier of a nimbus node
+     */
+    void delete_node_blobstore(String path, String nimbusHostPortInfo);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java 
b/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java
deleted file mode 100644
index c46688f..0000000
--- a/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.codedistributor;
-
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Interface responsible to distribute code in the cluster.
- */
-public interface ICodeDistributor {
-    /**
-     * Prepare this code distributor.
-     * @param conf
-     */
-    void prepare(Map conf) throws Exception;
-
-    /**
-     * This API will perform the actual upload of the code to the distribution 
implementation.
-     * The API should return a Meta file which should have enough information 
for downloader
-     * so it can download the code e.g. for bittorrent it will be a torrent 
file, in case of something like HDFS or s3
-     * it might have the actual directory where all the code is put.
-     * @param dirPath directory where all the code to be distributed exists.
-     * @param topologyId the topologyId for which the meta file needs to be 
created.
-     * @return metaFile
-     */
-    File upload(String dirPath, String topologyId) throws Exception;
-
-    /**
-     * Given the topologyId and metafile, download the actual code and return 
the downloaded file's list.
-     * @param topologyid
-     * @param metafile
-     * @return
-     */
-    List<File> download(String topologyid, File metafile) throws Exception;
-
-    /**
-     * returns number of nodes to which the code is already replicated for the 
topology.
-     * @param topologyId
-     * @return
-     */
-    short getReplicationCount(String topologyId) throws Exception;
-
-    /**
-     * Performs the cleanup.
-     * @param topologyid
-     */
-    void cleanup(String topologyid) throws IOException;
-
-    /**
-     * Close this distributor.
-     * @param conf
-     */
-    void close(Map conf);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java
 
b/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java
deleted file mode 100644
index 76993e2..0000000
--- 
a/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.codedistributor;
-
-import backtype.storm.nimbus.NimbusInfo;
-import backtype.storm.utils.ZookeeperAuthInfo;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static backtype.storm.Config.*;
-import static backtype.storm.utils.Utils.downloadFromHost;
-import static backtype.storm.utils.Utils.newCurator;
-
-
-public class LocalFileSystemCodeDistributor implements ICodeDistributor {
-    private static final Logger LOG = 
LoggerFactory.getLogger(LocalFileSystemCodeDistributor.class);
-    private CuratorFramework zkClient;
-    private Map conf;
-
-    @Override
-    public void prepare(Map conf) throws Exception {
-        this.conf = conf;
-        List<String> zkServers = (List<String>) 
conf.get(STORM_ZOOKEEPER_SERVERS);
-        int port = (Integer) conf.get(STORM_ZOOKEEPER_PORT);
-        ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
-        zkClient = newCurator(conf, zkServers, port, (String) 
conf.get(STORM_ZOOKEEPER_ROOT), zkAuthInfo);
-        zkClient.start();
-    }
-
-    @Override
-    public File upload(String dirPath, String topologyId) throws Exception {
-        ArrayList<File> files = new ArrayList<File>();
-        File destDir = new File(dirPath);
-        File[] localFiles = destDir.listFiles();
-
-        List<String> filePaths = new ArrayList<String>(3);
-        for (File file : localFiles) {
-            filePaths.add(file.getAbsolutePath());
-        }
-
-        File metaFile = new File(destDir, "storm-code-distributor.meta");
-        boolean isCreated = metaFile.createNewFile();
-        if (isCreated) {
-            FileUtils.writeLines(metaFile, filePaths);
-        } else {
-            LOG.warn("metafile " + metaFile.getAbsolutePath() + " already 
exists.");
-        }
-
-        LOG.info("Created meta file " + metaFile.getAbsolutePath() + " upload 
successful.");
-
-        return metaFile;
-    }
-
-    @Override
-    public List<File> download(String topologyid, File metafile) throws 
Exception {
-        List<String> hostInfos = 
zkClient.getChildren().forPath("/code-distributor/" + topologyid);
-        File destDir = metafile.getParentFile();
-        List<File> downloadedFiles = Lists.newArrayList();
-        for (String absolutePathOnRemote : FileUtils.readLines(metafile)) {
-
-            File localFile = new File(destDir, new 
File(absolutePathOnRemote).getName());
-
-            boolean isSuccess = false;
-            for (String hostAndPort : hostInfos) {
-                NimbusInfo nimbusInfo = NimbusInfo.parse(hostAndPort);
-                try {
-                    LOG.info("Attempting to download meta file {} from remote 
{}", absolutePathOnRemote, nimbusInfo.toHostPortString());
-                    downloadFromHost(conf, absolutePathOnRemote, 
localFile.getAbsolutePath(), nimbusInfo.getHost(), nimbusInfo.getPort());
-                    downloadedFiles.add(localFile);
-                    isSuccess = true;
-                    break;
-                } catch (Exception e) {
-                    LOG.error("download failed from {}:{}, will try another 
endpoint ", nimbusInfo.getHost(), nimbusInfo.getPort(), e);
-                }
-            }
-
-            if(!isSuccess) {
-                throw new RuntimeException("File " + absolutePathOnRemote +" 
could not be downloaded from any endpoint");
-            }
-        }
-
-        return downloadedFiles;
-    }
-
-    @Override
-    public short getReplicationCount(String topologyId) throws Exception {
-        return (short) zkClient.getChildren().forPath("/code-distributor/" + 
topologyId).size();
-    }
-
-    @Override
-    public void cleanup(String topologyid) throws IOException {
-        //no op.
-    }
-
-    @Override
-    public void close(Map conf) {
-       zkClient.close();
-    }
-}

Reply via email to