http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java new file mode 100644 index 0000000..5e39743 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreUtils.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package backtype.storm.blobstore; + +import backtype.storm.Config; +import backtype.storm.generated.AuthorizationException; +import backtype.storm.generated.KeyAlreadyExistsException; +import backtype.storm.generated.KeyNotFoundException; +import backtype.storm.generated.ReadableBlobMeta; +import backtype.storm.nimbus.NimbusInfo; +import backtype.storm.security.auth.NimbusPrincipal; +import backtype.storm.utils.NimbusClient; +import backtype.storm.utils.Utils; +import backtype.storm.utils.ZookeeperAuthInfo; +import org.apache.curator.framework.CuratorFramework; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.Subject; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class BlobStoreUtils { + private static final String BLOBSTORE_SUBTREE="/blobstore"; + private static final Logger LOG = LoggerFactory.getLogger(Utils.class); + + public static CuratorFramework createZKClient(Map conf) { + List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); + Object port = conf.get(Config.STORM_ZOOKEEPER_PORT); + ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf); + CuratorFramework zkClient = Utils.newCurator(conf, zkServers, port, (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo); + zkClient.start(); + return zkClient; + } + + public static Subject getNimbusSubject() { + Subject subject = new Subject(); + subject.getPrincipals().add(new NimbusPrincipal()); + return subject; + } + + // Normalize state + public static BlobKeySequenceInfo normalizeNimbusHostPortSequenceNumberInfo(String nimbusSeqNumberInfo) { + BlobKeySequenceInfo keySequenceInfo = new BlobKeySequenceInfo(); + int lastIndex = nimbusSeqNumberInfo.lastIndexOf("-"); + keySequenceInfo.setNimbusHostPort(nimbusSeqNumberInfo.substring(0, 
lastIndex)); + keySequenceInfo.setSequenceNumber(nimbusSeqNumberInfo.substring(lastIndex + 1)); + return keySequenceInfo; + } + + // Check for latest sequence number of a key inside zookeeper and return nimbodes containing the latest sequence number + public static Set<NimbusInfo> getNimbodesWithLatestSequenceNumberOfBlob(CuratorFramework zkClient, String key) throws Exception { + List<String> stateInfoList = zkClient.getChildren().forPath("/blobstore/" + key); + Set<NimbusInfo> nimbusInfoSet = new HashSet<NimbusInfo>(); + int latestSeqNumber = getLatestSequenceNumber(stateInfoList); + LOG.debug("getNimbodesWithLatestSequenceNumberOfBlob stateInfo {} version {}", stateInfoList, latestSeqNumber); + // Get the nimbodes with the latest version + for(String state : stateInfoList) { + BlobKeySequenceInfo sequenceInfo = normalizeNimbusHostPortSequenceNumberInfo(state); + if (latestSeqNumber == Integer.parseInt(sequenceInfo.getSequenceNumber())) { + nimbusInfoSet.add(NimbusInfo.parse(sequenceInfo.getNimbusHostPort())); + } + } + LOG.debug("nimbusInfoList {}", nimbusInfoSet); + return nimbusInfoSet; + } + + // Get sequence number details from latest sequence number of the blob + public static int getLatestSequenceNumber(List<String> stateInfoList) { + int seqNumber = 0; + // Get latest sequence number of the blob present in the zookeeper --> possible to refactor this piece of code + for (String state : stateInfoList) { + BlobKeySequenceInfo sequenceInfo = normalizeNimbusHostPortSequenceNumberInfo(state); + int currentSeqNumber = Integer.parseInt(sequenceInfo.getSequenceNumber()); + if (seqNumber < currentSeqNumber) { + seqNumber = currentSeqNumber; + LOG.debug("Sequence Info {}", seqNumber); + } + } + LOG.debug("Latest Sequence Number {}", seqNumber); + return seqNumber; + } + + // Download missing blobs from potential nimbodes + public static boolean downloadMissingBlob(Map conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos) + throws TTransportException { 
+ NimbusClient client; + ReadableBlobMeta rbm; + ClientBlobStore remoteBlobStore; + InputStreamWithMeta in; + boolean isSuccess = false; + LOG.debug("Download blob NimbusInfos {}", nimbusInfos); + for (NimbusInfo nimbusInfo : nimbusInfos) { + if(isSuccess) { + break; + } + try { + client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null); + rbm = client.getClient().getBlobMeta(key); + remoteBlobStore = new NimbusBlobStore(); + remoteBlobStore.setClient(conf, client); + in = remoteBlobStore.getBlob(key); + blobStore.createBlob(key, in, rbm.get_settable(), getNimbusSubject()); + // if key already exists while creating the blob else update it + Iterator<String> keyIterator = blobStore.listKeys(); + while (keyIterator.hasNext()) { + if (keyIterator.next().equals(key)) { + LOG.debug("Success creating key, {}", key); + isSuccess = true; + break; + } + } + } catch (IOException | AuthorizationException exception) { + throw new RuntimeException(exception); + } catch (KeyAlreadyExistsException kae) { + LOG.info("KeyAlreadyExistsException Key: {} {}", key, kae); + } catch (KeyNotFoundException knf) { + // Catching and logging KeyNotFoundException because, if + // there is a subsequent update and delete, the non-leader + // nimbodes might throw an exception. 
+ LOG.info("KeyNotFoundException Key: {} {}", key, knf); + } catch (Exception exp) { + // Logging an exception while client is connecting + LOG.error("Exception {}", exp); + } + } + + if (!isSuccess) { + LOG.error("Could not download blob with key" + key); + } + return isSuccess; + } + + // Download updated blobs from potential nimbodes + public static boolean downloadUpdatedBlob(Map conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos) + throws TTransportException { + NimbusClient client; + ClientBlobStore remoteBlobStore; + InputStreamWithMeta in; + AtomicOutputStream out; + boolean isSuccess = false; + LOG.debug("Download blob NimbusInfos {}", nimbusInfos); + for (NimbusInfo nimbusInfo : nimbusInfos) { + if (isSuccess) { + break; + } + try { + client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null); + remoteBlobStore = new NimbusBlobStore(); + remoteBlobStore.setClient(conf, client); + in = remoteBlobStore.getBlob(key); + out = blobStore.updateBlob(key, getNimbusSubject()); + byte[] buffer = new byte[2048]; + int len = 0; + while ((len = in.read(buffer)) > 0) { + out.write(buffer, 0, len); + } + if (out != null) { + out.close(); + } + isSuccess = true; + } catch (IOException | AuthorizationException exception) { + throw new RuntimeException(exception); + } catch (KeyNotFoundException knf) { + // Catching and logging KeyNotFoundException because, if + // there is a subsequent update and delete, the non-leader + // nimbodes might throw an exception. 
+ LOG.info("KeyNotFoundException {}", knf); + } catch (Exception exp) { + // Logging an exception while client is connecting + LOG.error("Exception {}", exp); + } + } + + if (!isSuccess) { + LOG.error("Could not update the blob with key" + key); + } + return isSuccess; + } + + // Get the list of keys from blobstore + public static List<String> getKeyListFromBlobStore(BlobStore blobStore) throws Exception { + Iterator<String> keys = blobStore.listKeys(); + List<String> keyList = new ArrayList<String>(); + if (keys != null) { + while (keys.hasNext()) { + keyList.add(keys.next()); + } + } + LOG.debug("KeyList from blobstore {}", keyList); + return keyList; + } + + public static void createStateInZookeeper(Map conf, String key, NimbusInfo nimbusInfo) throws TTransportException { + ClientBlobStore cb = new NimbusBlobStore(); + cb.setClient(conf, new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null)); + cb.createStateInZookeeper(key); + } + + public static void updateKeyForBlobStore (Map conf, BlobStore blobStore, CuratorFramework zkClient, String key, NimbusInfo nimbusDetails) { + try { + // Most of clojure tests currently try to access the blobs using getBlob. Since, updateKeyForBlobStore + // checks for updating the correct version of the blob as a part of nimbus ha before performing any + // operation on it, there is a neccessity to stub several test cases to ignore this method. It is a valid + // trade off to return if nimbusDetails which include the details of the current nimbus host port data are + // not initialized as a part of the test. Moreover, this applies to only local blobstore when used along with + // nimbus ha. 
+ if (nimbusDetails == null) { + return; + } + boolean isListContainsCurrentNimbusInfo = false; + List<String> stateInfo; + if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) { + return; + } + stateInfo = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key); + LOG.debug("StateInfo for update {}", stateInfo); + Set<NimbusInfo> nimbusInfoList = getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key); + + for (NimbusInfo nimbusInfo:nimbusInfoList) { + if (nimbusInfo.getHost().equals(nimbusDetails.getHost())) { + isListContainsCurrentNimbusInfo = true; + break; + } + } + + if (!isListContainsCurrentNimbusInfo && downloadUpdatedBlob(conf, blobStore, key, nimbusInfoList)) { + LOG.debug("Updating state inside zookeeper for an update"); + createStateInZookeeper(conf, key, nimbusDetails); + } + } catch (Exception exp) { + throw new RuntimeException(exp); + } + } + +}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java new file mode 100644 index 0000000..abd7c86 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.blobstore; + +import backtype.storm.nimbus.NimbusInfo; +import org.apache.curator.framework.CuratorFramework; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set;; + +/** + * Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper + * for a non leader nimbus trying to be in sync with the operations performed on the leader nimbus. 
+ */ +public class BlobSynchronizer { + private static final Logger LOG = LoggerFactory.getLogger(BlobSynchronizer.class); + private CuratorFramework zkClient; + private Map conf; + private BlobStore blobStore; + private Set<String> blobStoreKeySet = new HashSet<String>(); + private Set<String> zookeeperKeySet = new HashSet<String>(); + private NimbusInfo nimbusInfo; + + public BlobSynchronizer(BlobStore blobStore, Map conf) { + this.blobStore = blobStore; + this.conf = conf; + } + + public void setNimbusInfo(NimbusInfo nimbusInfo) { + this.nimbusInfo = nimbusInfo; + } + + public void setZookeeperKeySet(Set<String> zookeeperKeySet) { + this.zookeeperKeySet = zookeeperKeySet; + } + + public void setBlobStoreKeySet(Set<String> blobStoreKeySet) { + this.blobStoreKeySet = blobStoreKeySet; + } + + public Set<String> getBlobStoreKeySet() { + Set<String> keySet = new HashSet<String>(); + keySet.addAll(blobStoreKeySet); + return keySet; + } + + public Set<String> getZookeeperKeySet() { + Set<String> keySet = new HashSet<String>(); + keySet.addAll(zookeeperKeySet); + return keySet; + } + + public synchronized void syncBlobs() { + try { + LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys {}",getBlobStoreKeySet(), getZookeeperKeySet()); + zkClient = BlobStoreUtils.createZKClient(conf); + deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), getZookeeperKeySet()); + updateKeySetForBlobStore(getBlobStoreKeySet()); + Set<String> keySetToDownload = getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet()); + LOG.debug("Key set Blobstore-> Zookeeper-> DownloadSet {}-> {}-> {}", getBlobStoreKeySet(), getZookeeperKeySet(), keySetToDownload); + + for (String key : keySetToDownload) { + Set<NimbusInfo> nimbusInfoSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key); + if(BlobStoreUtils.downloadMissingBlob(conf, blobStore, key, nimbusInfoSet)) { + BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo); + } + } + if (zkClient !=null) { 
+ zkClient.close(); + } + } catch(InterruptedException exp) { + LOG.error("InterruptedException {}", exp); + } catch(Exception exp) { + throw new RuntimeException(exp); + } + } + + public void deleteKeySetFromBlobStoreNotOnZookeeper(Set<String> keySetBlobStore, Set<String> keySetZookeeper) throws Exception { + if (keySetBlobStore.removeAll(keySetZookeeper) + || (keySetZookeeper.isEmpty() && !keySetBlobStore.isEmpty())) { + LOG.debug("Key set to delete in blobstore {}", keySetBlobStore); + for (String key : keySetBlobStore) { + blobStore.deleteBlob(key, BlobStoreUtils.getNimbusSubject()); + } + } + } + + // Update current key list inside the blobstore if the version changes + public void updateKeySetForBlobStore(Set<String> keySetBlobStore) { + try { + for (String key : keySetBlobStore) { + LOG.debug("updating blob"); + BlobStoreUtils.updateKeyForBlobStore(conf, blobStore, zkClient, key, nimbusInfo); + } + } catch (Exception exp) { + throw new RuntimeException(exp); + } + } + + // Make a key list to download + public Set<String> getKeySetToDownload(Set<String> blobStoreKeySet, Set<String> zookeeperKeySet) { + zookeeperKeySet.removeAll(blobStoreKeySet); + LOG.debug("Key list to download {}", zookeeperKeySet); + return zookeeperKeySet; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java new file mode 100644 index 0000000..cc40aff --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.blobstore; + +import backtype.storm.daemon.Shutdownable; +import backtype.storm.generated.AuthorizationException; +import backtype.storm.generated.ReadableBlobMeta; +import backtype.storm.generated.SettableBlobMeta; +import backtype.storm.generated.KeyAlreadyExistsException; +import backtype.storm.generated.KeyNotFoundException; +import backtype.storm.utils.NimbusClient; + +import java.util.Iterator; +import java.util.Map; + +public abstract class ClientBlobStore implements Shutdownable { + protected Map conf; + + public abstract void prepare(Map conf); + protected abstract AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException; + public abstract AtomicOutputStream updateBlob(String key) throws AuthorizationException, KeyNotFoundException; + public abstract ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException; + protected abstract void setBlobMetaToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException; + public abstract void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException; + public abstract InputStreamWithMeta getBlob(String key) throws AuthorizationException, KeyNotFoundException; + public abstract Iterator<String> listKeys(); + public abstract int getBlobReplication(String Key) 
throws AuthorizationException, KeyNotFoundException; + public abstract int updateBlobReplication(String Key, int replication) throws AuthorizationException, KeyNotFoundException; + public abstract boolean setClient(Map conf, NimbusClient client); + public abstract void createStateInZookeeper(String key); + + public final AtomicOutputStream createBlob(String key, SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException { + if (meta !=null && meta.is_set_acl()) { + BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); + } + return createBlobToExtend(key, meta); + } + + public final void setBlobMeta(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException { + if (meta !=null && meta.is_set_acl()) { + BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); + } + setBlobMetaToExtend(key, meta); + } + + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java b/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java new file mode 100644 index 0000000..b789335 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.blobstore; + +import backtype.storm.Config; +import backtype.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Timer; +import java.util.TimerTask; + +/** + * Very basic blob store impl with no ACL handling. 
+ */ +public class FileBlobStoreImpl { + private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000l; + private static final int BUCKETS = 1024; + private static final Logger LOG = LoggerFactory.getLogger(FileBlobStoreImpl.class); + private static final Timer timer = new Timer("FileBlobStore cleanup thread", true); + + public class KeyInHashDirIterator implements Iterator<String> { + private int currentBucket = 0; + private Iterator<String> it = null; + private String next = null; + + public KeyInHashDirIterator() throws IOException { + primeNext(); + } + + private void primeNext() throws IOException { + while (it == null && currentBucket < BUCKETS) { + String name = String.valueOf(currentBucket); + File dir = new File(fullPath, name); + try { + it = listKeys(dir); + } catch (FileNotFoundException e) { + it = null; + } + if (it == null || !it.hasNext()) { + it = null; + currentBucket++; + } else { + next = it.next(); + } + } + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public String next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + String current = next; + next = null; + if (it != null) { + if (!it.hasNext()) { + it = null; + currentBucket++; + try { + primeNext(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + next = it.next(); + } + } + return current; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Delete Not Supported"); + } + } + + private File fullPath; + private TimerTask cleanup = null; + + public FileBlobStoreImpl(File path, Map<String, Object> conf) throws IOException { + LOG.info("Creating new blob store based in {}", path); + fullPath = path; + fullPath.mkdirs(); + Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE); + if (Utils.getBoolean(shouldCleanup, false)) { + LOG.debug("Starting File blobstore cleaner"); + cleanup = new TimerTask() { + @Override + public void run() { + try { + 
fullCleanup(FULL_CLEANUP_FREQ); + } catch (IOException e) { + LOG.error("Error trying to cleanup", e); + } + } + }; + timer.scheduleAtFixedRate(cleanup, 0, FULL_CLEANUP_FREQ); + } + } + + /** + * @return all keys that are available for reading. + * @throws IOException on any error. + */ + public Iterator<String> listKeys() throws IOException { + return new KeyInHashDirIterator(); + } + + /** + * Get an input stream for reading a part. + * @param key the key of the part to read. + * @return the where to read the data from. + * @throws IOException on any error + */ + public LocalFsBlobStoreFile read(String key) throws IOException { + return new LocalFsBlobStoreFile(getKeyDir(key), BlobStoreFile.BLOBSTORE_DATA_FILE); + } + + /** + * Get an object tied to writing the data. + * @param key the key of the part to write to. + * @return an object that can be used to both write to, but also commit/cancel the operation. + * @throws IOException on any error + */ + public LocalFsBlobStoreFile write(String key, boolean create) throws IOException { + return new LocalFsBlobStoreFile(getKeyDir(key), true, create); + } + + /** + * Check if the key exists in the blob store. + * @param key the key to check for + * @return true if it exists else false. 
+ */ + public boolean exists(String key) { + return getKeyDir(key).exists(); + } + + /** + * Delete a key from the blob store + * @param key the key to delete + * @throws IOException on any error + */ + public void deleteKey(String key) throws IOException { + File keyDir = getKeyDir(key); + LocalFsBlobStoreFile pf = new LocalFsBlobStoreFile(keyDir, BlobStoreFile.BLOBSTORE_DATA_FILE); + pf.delete(); + delete(keyDir); + } + + private File getKeyDir(String key) { + String hash = String.valueOf(Math.abs((long)key.hashCode()) % BUCKETS); + File ret = new File(new File(fullPath, hash), key); + LOG.debug("{} Looking for {} in {}", new Object[]{fullPath, key, hash}); + return ret; + } + + public void fullCleanup(long age) throws IOException { + long cleanUpIfBefore = System.currentTimeMillis() - age; + Iterator<String> keys = new KeyInHashDirIterator(); + while (keys.hasNext()) { + String key = keys.next(); + File keyDir = getKeyDir(key); + Iterator<LocalFsBlobStoreFile> i = listBlobStoreFiles(keyDir); + if (!i.hasNext()) { + //The dir is empty, so try to delete it, may fail, but that is OK + try { + keyDir.delete(); + } catch (Exception e) { + LOG.warn("Could not delete "+keyDir+" will try again later"); + } + } + while (i.hasNext()) { + LocalFsBlobStoreFile f = i.next(); + if (f.isTmp()) { + if (f.getModTime() <= cleanUpIfBefore) { + f.delete(); + } + } + } + } + } + + protected Iterator<LocalFsBlobStoreFile> listBlobStoreFiles(File path) throws IOException { + ArrayList<LocalFsBlobStoreFile> ret = new ArrayList<LocalFsBlobStoreFile>(); + File[] files = path.listFiles(); + if (files != null) { + for (File sub: files) { + try { + ret.add(new LocalFsBlobStoreFile(sub.getParentFile(), sub.getName())); + } catch (IllegalArgumentException e) { + //Ignored the file did not match + LOG.warn("Found an unexpected file in {} {}",path, sub.getName()); + } + } + } + return ret.iterator(); + } + + protected Iterator<String> listKeys(File path) throws IOException { + String[] files = 
path.list(); + if (files != null) { + return Arrays.asList(files).iterator(); + } + return new LinkedList<String>().iterator(); + } + + protected void delete(File path) throws IOException { + Files.deleteIfExists(path.toPath()); + } + + public void shutdown() { + if (cleanup != null) { + cleanup.cancel(); + cleanup = null; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java b/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java new file mode 100644 index 0000000..1d29fda --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package backtype.storm.blobstore;

import java.io.IOException;
import java.io.InputStream;

/**
 * An InputStream that additionally exposes metadata about the blob being read:
 * its version and its total length in bytes.
 */
public abstract class InputStreamWithMeta extends InputStream {
    /** @return the version of the blob this stream reads. */
    public abstract long getVersion() throws IOException;
    /** @return the total length in bytes of the blob this stream reads. */
    public abstract long getFileLength() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java b/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java
new file mode 100644
index 0000000..32bb9fd
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java
@@ -0,0 +1,22 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package backtype.storm.blobstore;

/**
 * Maps a blob key to a result of type R while iterating over keys.
 *
 * @param <R> result type produced per key; a filter may return null to
 *            indicate that a key should be skipped — presumably, based on the
 *            name; verify against callers. NOTE(review): no caller is visible
 *            in this file.
 */
public interface KeyFilter<R> {
    R filter(String key);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
new file mode 100644
index 0000000..1cddac0
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
@@ -0,0 +1,229 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package backtype.storm.blobstore;

import backtype.storm.nimbus.NimbusInfo;
import backtype.storm.utils.Utils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.TreeSet;
import java.util.Map;
import java.util.List;

/**
 * Class hands over the key sequence number which implies the number of updates made to a blob.
+ * The information regarding the keys and the sequence number which represents the number of updates are + * stored within the zookeeper in the following format. + * /storm/blobstore/key_name/nimbushostport-sequencenumber + * Example: + * If there are two nimbodes with nimbus.seeds:leader,non-leader are set, + * then the state inside the zookeeper is eventually stored as: + * /storm/blobstore/key1/leader:8080-1 + * /storm/blobstore/key1/non-leader:8080-1 + * indicates that a new blob with the name key1 has been created on the leader + * nimbus and the non-leader nimbus syncs after a call back is triggered by attempting + * to download the blob and finally updates its state inside the zookeeper. + * + * A watch is placed on the /storm/blobstore/key1 and the znodes leader:8080-1 and + * non-leader:8080-1 are ephemeral which implies that these nodes exist only until the + * connection between the corresponding nimbus and the zookeeper persist. If in case the + * nimbus crashes the node disappears under /storm/blobstore/key1. + * + * The sequence number for the keys are handed over based on the following scenario: + * Lets assume there are three nimbodes up and running, one being the leader and the other + * being the non-leader. + * + * 1. Create is straight forward. + * Check whether the znode -> /storm/blobstore/key1 has been created or not. It implies + * the blob has not been created yet. If not created, it creates it and updates the zookeeper + * states under /storm/blobstore/key1 and /storm/blobstoremaxkeysequencenumber/key1. + * The znodes it creates on these nodes are /storm/blobstore/key1/leader:8080-1, + * /storm/blobstore/key1/non-leader:8080-1 and /storm/blobstoremaxkeysequencenumber/key1/1. + * The latter holds the global sequence number across all nimbodes more like a static variable + * indicating the true value of number of updates for a blob. This node helps to maintain sanity in case + * leadership changes due to crashing. + * + * 2. 
Delete does not require to hand over the sequence number. + * + * 3. Finally, the update has few scenarios. + * + * The class implements a TreeSet. The basic idea is if all the nimbodes have the same + * sequence number for the blob, then the number of elements in the set is 1 which holds + * the latest value of sequence number. If the number of elements are greater than 1 then it + * implies that there is sequence mismatch and there is need for syncing the blobs across + * nimbodes. + * + * The logic for handing over sequence numbers based on the state are described as follows + * Here consider Nimbus-1 alias as N1 and Nimbus-2 alias as N2. + * Scenario 1: + * Example: Normal create/update scenario + * Operation Nimbus-1:state Nimbus-2:state Seq-Num-Nimbus-1 Seq-Num-Nimbus-2 Max-Seq-Num + * Create-Key1 alive - Leader alive 1 1 + * Sync alive - Leader alive 1 1 (callback -> download) 1 + * Update-Key1 alive - Leader alive 2 1 2 + * Sync alive - Leader alive 2 2 (callback -> download) 2 + * + * Scenario 2: + * Example: Leader nimbus crash followed by leader election, update and ex-leader restored again + * Operation Nimbus-1:state Nimbus-2:state Seq-Num-Nimbus-1 Seq-Num-Nimbus-2 Max-Seq-Num + * Create alive - Leader alive 1 1 + * Sync alive - Leader alive 1 1 (callback -> download) 1 + * Update alive - Leader alive 2 1 2 + * Sync alive - Leader alive 2 2 (callback -> download) 2 + * Update alive - Leader alive 3 2 3 + * Crash crash - Leader alive 3 2 3 + * New - Leader crash alive - Leader 3 (Invalid) 2 3 + * Update crash alive - Leader 3 (Invalid) 4 (max-seq-num + 1) 4 + * N1-Restored alive alive - Leader 0 4 4 + * Sync alive alive - Leader 4 4 4 + * + * Scenario 3: + * Example: Leader nimbus crash followed by leader election, update and ex-leader restored again + * Operation Nimbus-1:state Nimbus-2:state Seq-Num-Nimbus-1 Seq-Num-Nimbus-2 Max-Seq-Num + * Create alive - Leader alive 1 1 + * Sync alive - Leader alive 1 1 (callback -> download) 1 + * Update alive - 
Leader alive 2 1 2 + * Sync alive - Leader alive 2 2 (callback -> download) 2 + * Update alive - Leader alive 3 2 3 + * Crash crash - Leader alive 3 2 3 + * Elect Leader crash alive - Leader 3 (Invalid) 2 3 + * N1-Restored alive alive - Leader 3 2 3 + * Read/Update alive alive - Leader 3 4 (Downloads from N1) 4 + * Sync alive alive - Leader 4 (callback) 4 4 + * Here the download is triggered whenever an operation corresponding to the blob is triggered on the + * nimbus like a read or update operation. Here, in the read/update call it is hard to know which call + * is read or update. Hence, by incrementing the sequence number to max-seq-num + 1 we ensure that the + * synchronization happens appropriately and all nimbodes have the same blob. + */ +public class KeySequenceNumber { + private static final Logger LOG = LoggerFactory.getLogger(Utils.class); + private final String BLOBSTORE_SUBTREE="/blobstore"; + private final String BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE="/blobstoremaxkeysequencenumber"; + private final String key; + private final NimbusInfo nimbusInfo; + private final int INT_CAPACITY = 4; + private final int INITIAL_SEQUENCE_NUMBER = 1; + + public KeySequenceNumber(String key, NimbusInfo nimbusInfo) { + this.key = key; + this.nimbusInfo = nimbusInfo; + } + + public int getKeySequenceNumber(Map conf) { + TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>(); + CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf); + try { + // Key has not been created yet and it is the first time it is being created + if(zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) { + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key); + zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key, + ByteBuffer.allocate(INT_CAPACITY).putInt(INITIAL_SEQUENCE_NUMBER).array()); + return INITIAL_SEQUENCE_NUMBER; + } + + // 
When all nimbodes go down and one or few of them come up + // Unfortunately there might not be an exact way to know which one contains the most updated blob, + // if all go down which is unlikely. Hence there might be a need to update the blob if all go down. + List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key); + LOG.debug("stateInfoList-size {} stateInfoList-data {}", stateInfoList.size(), stateInfoList); + if(stateInfoList.isEmpty()) { + return getMaxSequenceNumber(zkClient); + } + + LOG.debug("stateInfoSize {}", stateInfoList.size()); + // In all other cases check for the latest update sequence of the blob on the nimbus + // and assign the appropriate number. Check if all are have same sequence number, + // if not assign the highest sequence number. + for (String stateInfo:stateInfoList) { + sequenceNumbers.add(Integer.parseInt(BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo(stateInfo) + .getSequenceNumber())); + } + + // Update scenario 2 and 3 explain the code logic written here + // especially when nimbus crashes and comes up after and before update + // respectively. + int currentSeqNumber = getMaxSequenceNumber(zkClient); + if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) { + if (sequenceNumbers.last() < currentSeqNumber) { + return currentSeqNumber; + } else { + return INITIAL_SEQUENCE_NUMBER - 1; + } + } + + // It covers scenarios expalined in scenario 3 when nimbus-1 holding the latest + // update goes down before it is downloaded by nimbus-2. Nimbus-2 gets elected as a leader + // after which nimbus-1 comes back up and a read or update is performed. 
+ if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && nimbusInfo.isLeader()) { + incrementMaxSequenceNumber(zkClient, currentSeqNumber); + return currentSeqNumber + 1; + } + + // This code logic covers the update scenarios in 2 when the nimbus-1 goes down + // before syncing the blob to nimbus-2 and an update happens. + // If seq-num for nimbus-2 is 2 and max-seq-number is 3 then next sequence number is 4 + // (max-seq-number + 1). + // Other scenario it covers is when max-seq-number and nimbus seq number are equal. + if (sequenceNumbers.size() == 1) { + if (sequenceNumbers.first() < currentSeqNumber) { + incrementMaxSequenceNumber(zkClient, currentSeqNumber); + return currentSeqNumber + 1; + } else { + incrementMaxSequenceNumber(zkClient, currentSeqNumber); + return sequenceNumbers.first() + 1; + } + } + } catch(Exception e) { + LOG.error("Exception {}", e); + } finally { + if (zkClient != null) { + zkClient.close(); + } + } + // Normal create update sync scenario returns the greatest sequence number in the set + return sequenceNumbers.last(); + } + + private boolean checkIfStateContainsCurrentNimbusHost(List<String> stateInfoList, NimbusInfo nimbusInfo) { + boolean containsNimbusHost = false; + for(String stateInfo:stateInfoList) { + if(stateInfo.contains(nimbusInfo.getHost())) { + containsNimbusHost = true; + break; + } + } + return containsNimbusHost; + } + + private void incrementMaxSequenceNumber(CuratorFramework zkClient, int count) throws Exception { + zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key, + ByteBuffer.allocate(INT_CAPACITY).putInt(count + 1).array()); + } + + private int getMaxSequenceNumber(CuratorFramework zkClient) throws Exception { + return ByteBuffer.wrap(zkClient.getData() + .forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key)).getInt(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java 
---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java new file mode 100644 index 0000000..0941b9a --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package backtype.storm.blobstore;

import backtype.storm.Config;
import backtype.storm.generated.SettableBlobMeta;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.KeyAlreadyExistsException;
import backtype.storm.generated.KeyNotFoundException;
import backtype.storm.generated.ReadableBlobMeta;

import backtype.storm.nimbus.NimbusInfo;
import backtype.storm.utils.Utils;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.Subject;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.io.InputStream;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
// BUG FIX: removed stray double semicolon after this import.
import java.util.Set;

import static backtype.storm.blobstore.BlobStoreAclHandler.ADMIN;
import static backtype.storm.blobstore.BlobStoreAclHandler.READ;
import static backtype.storm.blobstore.BlobStoreAclHandler.WRITE;

/**
 * Provides a local file system backed blob store implementation for Nimbus.
 *
 * Blob data and metadata are stored as paired files ({@code data_<key>} /
 * {@code meta_<key>}) under the configured blob store directory. For nimbus
 * high availability, read/update paths consult ZooKeeper state under
 * {@code /blobstore/<key>} and download missing or stale blobs from a peer
 * nimbus before serving the request.
 */
public class LocalFsBlobStore extends BlobStore {
    public static final Logger LOG = LoggerFactory.getLogger(LocalFsBlobStore.class);
    private static final String DATA_PREFIX = "data_";
    private static final String META_PREFIX = "meta_";
    protected BlobStoreAclHandler _aclHandler;
    private final String BLOBSTORE_SUBTREE = "/blobstore/";
    private NimbusInfo nimbusInfo;
    private FileBlobStoreImpl fbs;
    private final int allPermissions = READ | WRITE | ADMIN;
    private Map conf;

    /**
     * Initializes the store. The base directory is resolved from, in order:
     * the explicit override, {@code blobstore.dir}, then {@code storm.local.dir}.
     */
    @Override
    public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) {
        this.conf = conf;
        this.nimbusInfo = nimbusInfo;
        if (overrideBase == null) {
            overrideBase = (String) conf.get(Config.BLOBSTORE_DIR);
            if (overrideBase == null) {
                overrideBase = (String) conf.get(Config.STORM_LOCAL_DIR);
            }
        }
        File baseDir = new File(overrideBase, BASE_BLOBS_DIR_NAME);
        try {
            fbs = new FileBlobStoreImpl(baseDir, conf);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        _aclHandler = new BlobStoreAclHandler(conf);
    }

    /**
     * Creates a new blob: writes the metadata file first, then returns an open
     * stream for the data file. The metadata write is cancelled if anything fails
     * before the data stream is handed back.
     *
     * @throws KeyAlreadyExistsException if a data file for {@code key} already exists
     */
    @Override
    public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException {
        LOG.debug("Creating Blob for key {}", key);
        validateKey(key);
        _aclHandler.normalizeSettableBlobMeta(key, meta, who, allPermissions);
        BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
        _aclHandler.hasPermissions(meta.get_acl(), allPermissions, who, key);
        if (fbs.exists(DATA_PREFIX + key)) {
            throw new KeyAlreadyExistsException(key);
        }
        BlobStoreFileOutputStream mOut = null;
        try {
            mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX + key, true));
            mOut.write(Utils.thriftSerialize(meta));
            mOut.close();
            mOut = null; // prevents the finally block from cancelling a committed write
            return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX + key, true));
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (mOut != null) {
                try {
                    mOut.cancel();
                } catch (IOException e) {
                    // Ignored: best-effort cleanup of a partial metadata write.
                }
            }
        }
    }

    /**
     * Returns a stream that atomically replaces the blob's data on close.
     * Downloads the blob from a peer nimbus first if it is missing locally.
     */
    @Override
    public AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
        validateKey(key);
        checkForBlobOrDownload(key);
        SettableBlobMeta meta = getStoredBlobMeta(key);
        _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
        try {
            return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX + key, false));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Reads and deserializes the locally stored metadata file for key.
    private SettableBlobMeta getStoredBlobMeta(String key) throws KeyNotFoundException {
        InputStream in = null;
        try {
            LocalFsBlobStoreFile pf = fbs.read(META_PREFIX + key);
            try {
                in = pf.getInputStream();
            } catch (FileNotFoundException fnf) {
                throw new KeyNotFoundException(key);
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[2048];
            int len;
            while ((len = in.read(buffer)) > 0) {
                out.write(buffer, 0, len);
            }
            in.close();
            in = null; // prevents double-close in the finally block
            return Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    // Ignored: stream close failure on the error path.
                }
            }
        }
    }

    /**
     * Returns readable metadata for the blob; the version is the data file's
     * modification time. Triggers an HA download/update check first.
     */
    @Override
    public ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
        validateKey(key);
        if (!checkForBlobOrDownload(key)) {
            checkForBlobUpdate(key);
        }
        SettableBlobMeta meta = getStoredBlobMeta(key);
        _aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key);
        ReadableBlobMeta rbm = new ReadableBlobMeta();
        rbm.set_settable(meta);
        try {
            LocalFsBlobStoreFile pf = fbs.read(DATA_PREFIX + key);
            rbm.set_version(pf.getModTime());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return rbm;
    }

    /**
     * Replaces the blob's metadata. Requires ADMIN permission on the existing ACLs.
     */
    @Override
    public void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException {
        validateKey(key);
        checkForBlobOrDownload(key);
        _aclHandler.normalizeSettableBlobMeta(key, meta, who, ADMIN);
        BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
        SettableBlobMeta orig = getStoredBlobMeta(key);
        _aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key);
        BlobStoreFileOutputStream mOut = null;
        try {
            mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX + key, false));
            mOut.write(Utils.thriftSerialize(meta));
            mOut.close();
            mOut = null; // prevents the finally block from cancelling a committed write
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (mOut != null) {
                try {
                    mOut.cancel();
                } catch (IOException e) {
                    // Ignored: best-effort cleanup of a partial metadata write.
                }
            }
        }
    }

    /**
     * Deletes both the data and metadata files for the blob.
     * Requires WRITE permission.
     */
    @Override
    public void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
        validateKey(key);
        checkForBlobOrDownload(key);
        SettableBlobMeta meta = getStoredBlobMeta(key);
        _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
        try {
            fbs.deleteKey(DATA_PREFIX + key);
            fbs.deleteKey(META_PREFIX + key);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Opens the blob's data for reading. Requires READ permission; triggers an
     * HA download/update check first.
     */
    @Override
    public InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
        validateKey(key);
        if (!checkForBlobOrDownload(key)) {
            checkForBlobUpdate(key);
        }
        SettableBlobMeta meta = getStoredBlobMeta(key);
        _aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
        try {
            return new BlobStoreFileInputStream(fbs.read(DATA_PREFIX + key));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Iterates over all stored blob keys (data-file prefix stripped). */
    @Override
    public Iterator<String> listKeys() {
        try {
            return new KeyTranslationIterator(fbs.listKeys(), DATA_PREFIX);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void shutdown() {
        // Nothing to release: the underlying file store holds no open handles here.
    }

    /**
     * Returns the number of nimbodes currently holding the blob, derived from the
     * child count of {@code /blobstore/<key>} in ZooKeeper (0 if the znode is absent).
     */
    @Override
    public int getBlobReplication(String key, Subject who) throws Exception {
        CuratorFramework zkClient = null;
        int replicationCount = 0;
        try {
            validateKey(key);
            SettableBlobMeta meta = getStoredBlobMeta(key);
            _aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
            zkClient = BlobStoreUtils.createZKClient(conf);
            if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) == null) {
                // BUG FIX: the explicit zkClient.close() before this return was redundant —
                // the finally block already closes the client, so it was closed twice.
                return 0;
            }
            replicationCount = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
        return replicationCount;
    }

    /**
     * Not supported for the local file system store; replication is governed by
     * the set of live nimbodes, not a settable factor.
     */
    @Override
    public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException {
        throw new UnsupportedOperationException("For local file system blob store the update blobs function does not work. "
                + "Please use HDFS blob store to make this feature available.");
    }

    /**
     * Nimbus-HA helper: if the blob is missing locally but known to ZooKeeper,
     * download it from a peer nimbus holding the latest sequence number and
     * register this nimbus's state in ZooKeeper.
     *
     * @return true if a missing blob was successfully downloaded
     */
    public boolean checkForBlobOrDownload(String key) {
        boolean checkBlobDownload = false;
        CuratorFramework zkClient = null;
        try {
            List<String> keyList = BlobStoreUtils.getKeyListFromBlobStore(this);
            if (!keyList.contains(key)) {
                zkClient = BlobStoreUtils.createZKClient(conf);
                if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) != null) {
                    Set<NimbusInfo> nimbusSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
                    if (BlobStoreUtils.downloadMissingBlob(conf, this, key, nimbusSet)) {
                        LOG.debug("Updating blobs state");
                        BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo);
                        checkBlobDownload = true;
                    }
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
        return checkBlobDownload;
    }

    /** Nimbus-HA helper: refreshes a locally present blob if a peer holds a newer version. */
    public void checkForBlobUpdate(String key) {
        CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
        BlobStoreUtils.updateKeyForBlobStore(conf, this, zkClient, key, nimbusInfo);
        zkClient.close();
    }

    /** Removes leftover temporary files older than {@code age} milliseconds. */
    public void fullCleanup(long age) throws IOException {
        fbs.fullCleanup(age);
    }
}
package backtype.storm.blobstore;

import backtype.storm.generated.SettableBlobMeta;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.regex.Matcher;

/**
 * A single blob file on the local file system.
 *
 * A blob lives either as the final data file ({@code BLOBSTORE_DATA_FILE}) or
 * as a timestamped temporary part file that is atomically moved into place on
 * {@link #commit()}. Reading is only allowed on final files; writing only on
 * temporary ones.
 */
public class LocalFsBlobStoreFile extends BlobStoreFile {

    private final String blobKey;
    private final boolean temporary;
    private final File file;
    private final boolean requireNew;
    private Long cachedModTime = null;   // lazily captured on first getModTime() call
    private SettableBlobMeta settableMeta;

    /**
     * Wraps an existing file inside {@code base}. The name must be either the
     * final data file name or match the temporary-file pattern.
     */
    public LocalFsBlobStoreFile(File base, String name) {
        if (BlobStoreFile.BLOBSTORE_DATA_FILE.equals(name)) {
            temporary = false;
        } else {
            Matcher m = TMP_NAME_PATTERN.matcher(name);
            if (!m.matches()) {
                throw new IllegalArgumentException("File name does not match '"+name+"' !~ "+TMP_NAME_PATTERN);
            }
            temporary = true;
        }
        blobKey = base.getName();
        file = new File(base, name);
        requireNew = false;
    }

    /**
     * Creates a handle for a new file inside {@code base}: a fresh timestamped
     * part file when {@code isTmp} is set, otherwise the final data file.
     */
    public LocalFsBlobStoreFile(File base, boolean isTmp, boolean mustBeNew) {
        blobKey = base.getName();
        temporary = isTmp;
        requireNew = mustBeNew;
        file = temporary
                ? new File(base, System.currentTimeMillis() + TMP_EXT)
                : new File(base, BlobStoreFile.BLOBSTORE_DATA_FILE);
    }

    @Override
    public void delete() throws IOException {
        file.delete();
    }

    @Override
    public boolean isTmp() {
        return temporary;
    }

    @Override
    public String getKey() {
        return blobKey;
    }

    @Override
    public long getModTime() throws IOException {
        if (cachedModTime == null) {
            cachedModTime = file.lastModified();
        }
        return cachedModTime;
    }

    @Override
    public InputStream getInputStream() throws IOException {
        if (isTmp()) {
            throw new IllegalStateException("Cannot read from a temporary part file.");
        }
        return new FileInputStream(file);
    }

    @Override
    public OutputStream getOutputStream() throws IOException {
        if (!isTmp()) {
            throw new IllegalStateException("Can only write to a temporary part file.");
        }
        boolean created;
        try {
            created = file.createNewFile();
        } catch (IOException e) {
            // The parent directory may not exist yet; create it and retry once.
            file.getParentFile().mkdirs();
            created = file.createNewFile();
        }
        if (!created) {
            throw new IOException(file + " already exists");
        }
        return new FileOutputStream(file);
    }

    /**
     * Atomically promotes this temporary part file to the final data file.
     * When the file must be new, an existing destination makes the move fail.
     */
    @Override
    public void commit() throws IOException {
        if (!isTmp()) {
            throw new IllegalStateException("Can only write to a temporary part file.");
        }

        File dest = new File(file.getParentFile(), BlobStoreFile.BLOBSTORE_DATA_FILE);
        if (requireNew) {
            Files.move(file.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE);
        } else {
            Files.move(file.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    /** Discards this temporary part file without promoting it. */
    @Override
    public void cancel() throws IOException {
        if (!isTmp()) {
            throw new IllegalStateException("Can only write to a temporary part file.");
        }
        delete();
    }

    @Override
    public SettableBlobMeta getMetadata() {
        return settableMeta;
    }

    @Override
    public void setMetadata(SettableBlobMeta meta) {
        this.settableMeta = meta;
    }

    @Override
    public String toString() {
        String kind = temporary ? "tmp" : BlobStoreFile.BLOBSTORE_DATA_FILE;
        return file + ":" + kind + ":" + blobKey;
    }

    @Override
    public long getFileLength() {
        return file.length();
    }
}
package backtype.storm.blobstore;

import backtype.storm.Config;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.BeginDownloadResult;
import backtype.storm.generated.ListBlobsResult;
import backtype.storm.generated.ReadableBlobMeta;
import backtype.storm.generated.SettableBlobMeta;
import backtype.storm.generated.KeyAlreadyExistsException;
import backtype.storm.generated.KeyNotFoundException;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

/**
 * Client-side blob store that proxies every operation to Nimbus over thrift.
 *
 * The single {@link NimbusClient} is shared by all iterators/streams produced
 * here, so every thrift call is made inside {@code synchronized(client)}.
 */
public class NimbusBlobStore extends ClientBlobStore {
    private static final Logger LOG = LoggerFactory.getLogger(NimbusBlobStore.class);

    /** Pages through blob keys via Nimbus list-blobs sessions. */
    public class NimbusKeyIterator implements Iterator<String> {
        private ListBlobsResult listBlobs = null;
        private int offset = 0;      // position within the current page
        private boolean eof = false;

        public NimbusKeyIterator(ListBlobsResult listBlobs) {
            this.listBlobs = listBlobs;
            this.eof = (listBlobs.get_keys_size() == 0);
        }

        private boolean isCacheEmpty() {
            return listBlobs.get_keys_size() <= offset;
        }

        // Fetches the next page for this session; an empty page signals end-of-listing.
        private void readMore() throws TException {
            if (!eof) {
                offset = 0;
                synchronized(client) {
                    listBlobs = client.getClient().listBlobs(listBlobs.get_session());
                }
                if (listBlobs.get_keys_size() == 0) {
                    eof = true;
                }
            }
        }

        @Override
        public synchronized boolean hasNext() {
            try {
                if (isCacheEmpty()) {
                    readMore();
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
            return !eof;
        }

        @Override
        public synchronized String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            String ret = listBlobs.get_keys().get(offset);
            offset++;
            return ret;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("Delete Not Supported");
        }
    }

    /** Streams blob content downloaded from Nimbus in chunks. */
    public class NimbusDownloadInputStream extends InputStreamWithMeta {
        private BeginDownloadResult beginBlobDownload;
        private byte[] buffer = null;
        private int offset = 0;  // next unread position in buffer
        private int end = 0;     // exclusive end of valid data in buffer
        private boolean eof = false;

        public NimbusDownloadInputStream(BeginDownloadResult beginBlobDownload) {
            this.beginBlobDownload = beginBlobDownload;
        }

        @Override
        public long getVersion() throws IOException {
            return beginBlobDownload.get_version();
        }

        @Override
        public synchronized int read() throws IOException {
            try {
                if (isEmpty()) {
                    readMore();
                    if (eof) {
                        return -1;
                    }
                }
                int length = Math.min(1, available());
                if (length == 0) {
                    return -1;
                }
                // BUG FIX: the raw byte was returned sign-extended, so values >= 0x80
                // came back negative (and 0xFF was indistinguishable from EOF). The
                // InputStream contract requires a value in 0..255, or -1 for EOF.
                int ret = buffer[offset] & 0xff;
                offset += length;
                return ret;
            } catch(TException exp) {
                throw new IOException(exp);
            }
        }

        @Override
        public synchronized int read(byte[] b, int off, int len) throws IOException {
            try {
                if (isEmpty()) {
                    readMore();
                    if (eof) {
                        return -1;
                    }
                }
                int length = Math.min(len, available());
                System.arraycopy(buffer, offset, b, off, length);
                offset += length;
                return length;
            } catch(TException exp) {
                throw new IOException(exp);
            }
        }

        private boolean isEmpty() {
            return buffer == null || offset >= end;
        }

        // Pulls the next chunk for this download session; an empty chunk means EOF.
        private void readMore() throws TException {
            if (!eof) {
                ByteBuffer buff;
                synchronized(client) {
                    buff = client.getClient().downloadBlobChunk(beginBlobDownload.get_session());
                }
                buffer = buff.array();
                offset = buff.arrayOffset() + buff.position();
                int length = buff.remaining();
                end = offset + length;
                if (length == 0) {
                    eof = true;
                }
            }
        }

        @Override
        public synchronized int read(byte[] b) throws IOException {
            return read(b, 0, b.length);
        }

        @Override
        public synchronized int available() {
            return buffer == null ? 0 : (end - offset);
        }

        @Override
        public long getFileLength() {
            return beginBlobDownload.get_data_size();
        }
    }

    /**
     * Uploads blob content to Nimbus in chunks; {@link #close()} finishes the
     * upload and registers the new state in ZooKeeper.
     */
    public class NimbusUploadAtomicOutputStream extends AtomicOutputStream {
        private String session;
        private int maxChunkSize = 4096;
        private String key;

        public NimbusUploadAtomicOutputStream(String session, int bufferSize, String key) {
            this.session = session;
            this.maxChunkSize = bufferSize;
            this.key = key;
        }

        @Override
        public void cancel() throws IOException {
            try {
                synchronized(client) {
                    client.getClient().cancelBlobUpload(session);
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void write(int b) throws IOException {
            try {
                synchronized(client) {
                    client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(new byte[] {(byte)b}));
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void write(byte []b) throws IOException {
            write(b, 0, b.length);
        }

        @Override
        public void write(byte []b, int offset, int len) throws IOException {
            try {
                int end = offset + len;
                // Split the write into maxChunkSize-sized thrift calls.
                for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) {
                    int realLen = Math.min(end - realOffset, maxChunkSize);
                    LOG.debug("Writing {} bytes of {} remaining", realLen, (end - realOffset));
                    synchronized(client) {
                        client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(b, realOffset, realLen));
                    }
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void close() throws IOException {
            try {
                synchronized(client) {
                    client.getClient().finishBlobUpload(session);
                    client.getClient().createStateInZookeeper(key);
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private NimbusClient client;
    private int bufferSize = 4096;

    @Override
    public void prepare(Map conf) {
        this.client = NimbusClient.getConfiguredClient(conf);
        if (conf != null) {
            this.bufferSize = Utils.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize);
        }
    }

    @Override
    protected AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta)
            throws AuthorizationException, KeyAlreadyExistsException {
        try {
            synchronized(client) {
                return new NimbusUploadAtomicOutputStream(client.getClient().beginCreateBlob(key, meta), this.bufferSize, key);
            }
        } catch (AuthorizationException | KeyAlreadyExistsException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public AtomicOutputStream updateBlob(String key)
            throws AuthorizationException, KeyNotFoundException {
        try {
            synchronized(client) {
                return new NimbusUploadAtomicOutputStream(client.getClient().beginUpdateBlob(key), this.bufferSize, key);
            }
        } catch (AuthorizationException | KeyNotFoundException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException {
        try {
            synchronized(client) {
                return client.getClient().getBlobMeta(key);
            }
        } catch (AuthorizationException | KeyNotFoundException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void setBlobMetaToExtend(String key, SettableBlobMeta meta)
            throws AuthorizationException, KeyNotFoundException {
        try {
            synchronized(client) {
                client.getClient().setBlobMeta(key, meta);
            }
        } catch (AuthorizationException | KeyNotFoundException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException {
        try {
            synchronized(client) {
                client.getClient().deleteBlob(key);
            }
        } catch (AuthorizationException | KeyNotFoundException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void createStateInZookeeper(String key) {
        try {
            synchronized(client) {
                client.getClient().createStateInZookeeper(key);
            }
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public InputStreamWithMeta getBlob(String key) throws AuthorizationException, KeyNotFoundException {
        try {
            synchronized(client) {
                return new NimbusDownloadInputStream(client.getClient().beginBlobDownload(key));
            }
        } catch (AuthorizationException | KeyNotFoundException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Iterator<String> listKeys() {
        try {
            synchronized(client) {
                return new NimbusKeyIterator(client.getClient().listBlobs(""));
            }
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException {
        try {
            // CONSISTENCY FIX: every other thrift call here locks on client; this one
            // and updateBlobReplication did not, racing with concurrent iterators/streams.
            synchronized(client) {
                return client.getClient().getBlobReplication(key);
            }
        } catch (AuthorizationException | KeyNotFoundException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException {
        try {
            // CONSISTENCY FIX: lock the shared thrift client like all other operations.
            synchronized(client) {
                return client.getClient().updateBlobReplication(key, replication);
            }
        } catch (AuthorizationException | KeyNotFoundException exp) {
            throw exp;
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean setClient(Map conf, NimbusClient client) {
        this.client = client;
        if (conf != null) {
            this.bufferSize = Utils.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize);
        }
        return true;
    }

    // Safety net in case shutdown() is never called; shutdown() is idempotent.
    @Override
    protected void finalize() {
        shutdown();
    }

    @Override
    public void shutdown() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java b/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java index 1bcc645..1960371 100644 --- a/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java +++ b/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java @@ -205,4 +205,13 @@ public interface ClusterState { * @param path The path to synchronize. */ void sync_path(String path); + + /** + * Allows us to delete the znodes within /storm/blobstore/key_name + * whose znodes start with the corresponding nimbusHostPortInfo + * @param path /storm/blobstore/key_name + * @param nimbusHostPortInfo Contains the host port information of + * a nimbus node. + */ + void delete_node_blobstore(String path, String nimbusHostPortInfo); } http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java b/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java deleted file mode 100644 index c46688f..0000000 --- a/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.codedistributor; - - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * Interface responsible to distribute code in the cluster. - */ -public interface ICodeDistributor { - /** - * Prepare this code distributor. - * @param conf - */ - void prepare(Map conf) throws Exception; - - /** - * This API will perform the actual upload of the code to the distribution implementation. - * The API should return a Meta file which should have enough information for downloader - * so it can download the code e.g. for bittorrent it will be a torrent file, in case of something like HDFS or s3 - * it might have the actual directory where all the code is put. - * @param dirPath directory where all the code to be distributed exists. - * @param topologyId the topologyId for which the meta file needs to be created. - * @return metaFile - */ - File upload(String dirPath, String topologyId) throws Exception; - - /** - * Given the topologyId and metafile, download the actual code and return the downloaded file's list. - * @param topologyid - * @param metafile - * @return - */ - List<File> download(String topologyid, File metafile) throws Exception; - - /** - * returns number of nodes to which the code is already replicated for the topology. - * @param topologyId - * @return - */ - short getReplicationCount(String topologyId) throws Exception; - - /** - * Performs the cleanup. - * @param topologyid - */ - void cleanup(String topologyid) throws IOException; - - /** - * Close this distributor. 
- * @param conf - */ - void close(Map conf); -} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java b/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java deleted file mode 100644 index 76993e2..0000000 --- a/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package backtype.storm.codedistributor; - -import backtype.storm.nimbus.NimbusInfo; -import backtype.storm.utils.ZookeeperAuthInfo; -import com.google.common.collect.Lists; -import org.apache.commons.io.FileUtils; -import org.apache.curator.framework.CuratorFramework; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import static backtype.storm.Config.*; -import static backtype.storm.utils.Utils.downloadFromHost; -import static backtype.storm.utils.Utils.newCurator; - - -public class LocalFileSystemCodeDistributor implements ICodeDistributor { - private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystemCodeDistributor.class); - private CuratorFramework zkClient; - private Map conf; - - @Override - public void prepare(Map conf) throws Exception { - this.conf = conf; - List<String> zkServers = (List<String>) conf.get(STORM_ZOOKEEPER_SERVERS); - int port = (Integer) conf.get(STORM_ZOOKEEPER_PORT); - ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf); - zkClient = newCurator(conf, zkServers, port, (String) conf.get(STORM_ZOOKEEPER_ROOT), zkAuthInfo); - zkClient.start(); - } - - @Override - public File upload(String dirPath, String topologyId) throws Exception { - ArrayList<File> files = new ArrayList<File>(); - File destDir = new File(dirPath); - File[] localFiles = destDir.listFiles(); - - List<String> filePaths = new ArrayList<String>(3); - for (File file : localFiles) { - filePaths.add(file.getAbsolutePath()); - } - - File metaFile = new File(destDir, "storm-code-distributor.meta"); - boolean isCreated = metaFile.createNewFile(); - if (isCreated) { - FileUtils.writeLines(metaFile, filePaths); - } else { - LOG.warn("metafile " + metaFile.getAbsolutePath() + " already exists."); - } - - LOG.info("Created meta file " + metaFile.getAbsolutePath() + " upload successful."); - - return metaFile; - } - - 
@Override - public List<File> download(String topologyid, File metafile) throws Exception { - List<String> hostInfos = zkClient.getChildren().forPath("/code-distributor/" + topologyid); - File destDir = metafile.getParentFile(); - List<File> downloadedFiles = Lists.newArrayList(); - for (String absolutePathOnRemote : FileUtils.readLines(metafile)) { - - File localFile = new File(destDir, new File(absolutePathOnRemote).getName()); - - boolean isSuccess = false; - for (String hostAndPort : hostInfos) { - NimbusInfo nimbusInfo = NimbusInfo.parse(hostAndPort); - try { - LOG.info("Attempting to download meta file {} from remote {}", absolutePathOnRemote, nimbusInfo.toHostPortString()); - downloadFromHost(conf, absolutePathOnRemote, localFile.getAbsolutePath(), nimbusInfo.getHost(), nimbusInfo.getPort()); - downloadedFiles.add(localFile); - isSuccess = true; - break; - } catch (Exception e) { - LOG.error("download failed from {}:{}, will try another endpoint ", nimbusInfo.getHost(), nimbusInfo.getPort(), e); - } - } - - if(!isSuccess) { - throw new RuntimeException("File " + absolutePathOnRemote +" could not be downloaded from any endpoint"); - } - } - - return downloadedFiles; - } - - @Override - public short getReplicationCount(String topologyId) throws Exception { - return (short) zkClient.getChildren().forPath("/code-distributor/" + topologyId).size(); - } - - @Override - public void cleanup(String topologyid) throws IOException { - //no op. - } - - @Override - public void close(Map conf) { - zkClient.close(); - } -}
