Blobstore API STORM-876
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7029aee5 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7029aee5 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7029aee5 Branch: refs/heads/master Commit: 7029aee576cff6d7159e9e1a991bb02aaf0d4cd8 Parents: 0acc1ce Author: Sanket <schintap@untilservice-lm> Authored: Mon Nov 30 14:56:15 2015 -0600 Committer: Sanket <schintap@untilservice-lm> Committed: Mon Nov 30 14:56:15 2015 -0600 ---------------------------------------------------------------------- bin/storm.py | 28 +- conf/defaults.yaml | 16 +- external/storm-hdfs/pom.xml | 70 + .../storm/hdfs/blobstore/HdfsBlobStore.java | 370 + .../storm/hdfs/blobstore/HdfsBlobStoreFile.java | 196 + .../storm/hdfs/blobstore/HdfsBlobStoreImpl.java | 312 + .../hdfs/blobstore/HdfsClientBlobStore.java | 115 + .../ha/codedistributor/HDFSCodeDistributor.java | 118 - .../storm/hdfs/blobstore/BlobStoreTest.java | 530 + .../hdfs/blobstore/HdfsBlobStoreImplTest.java | 231 + pom.xml | 16 +- storm-core/pom.xml | 36 +- storm-core/src/clj/backtype/storm/blobstore.clj | 28 + storm-core/src/clj/backtype/storm/cluster.clj | 92 +- .../cluster_state/zookeeper_state_factory.clj | 4 + .../clj/backtype/storm/command/blobstore.clj | 162 + storm-core/src/clj/backtype/storm/config.clj | 24 +- .../src/clj/backtype/storm/daemon/nimbus.clj | 678 +- .../clj/backtype/storm/daemon/supervisor.clj | 480 +- storm-core/src/clj/backtype/storm/testing.clj | 5 +- storm-core/src/clj/backtype/storm/util.clj | 16 +- storm-core/src/clj/backtype/storm/zookeeper.clj | 31 +- storm-core/src/jvm/backtype/storm/Config.java | 123 +- .../storm/blobstore/AtomicOutputStream.java | 32 + .../storm/blobstore/BlobKeySequenceInfo.java | 40 + .../jvm/backtype/storm/blobstore/BlobStore.java | 445 + .../storm/blobstore/BlobStoreAclHandler.java | 399 + .../backtype/storm/blobstore/BlobStoreFile.java | 50 + .../storm/blobstore/BlobStoreUtils.java | 257 + 
.../storm/blobstore/BlobSynchronizer.java | 124 + .../storm/blobstore/ClientBlobStore.java | 62 + .../storm/blobstore/FileBlobStoreImpl.java | 248 + .../storm/blobstore/InputStreamWithMeta.java | 26 + .../jvm/backtype/storm/blobstore/KeyFilter.java | 22 + .../storm/blobstore/KeySequenceNumber.java | 229 + .../storm/blobstore/LocalFsBlobStore.java | 308 + .../storm/blobstore/LocalFsBlobStoreFile.java | 159 + .../storm/blobstore/NimbusBlobStore.java | 412 + .../backtype/storm/cluster/ClusterState.java | 9 + .../storm/codedistributor/ICodeDistributor.java | 73 - .../LocalFileSystemCodeDistributor.java | 123 - .../backtype/storm/generated/AccessControl.java | 627 + .../storm/generated/AccessControlType.java | 62 + .../backtype/storm/generated/Assignment.java | 244 +- .../storm/generated/BeginDownloadResult.java | 608 + .../storm/generated/ClusterWorkerHeartbeat.java | 52 +- .../jvm/backtype/storm/generated/HBNodes.java | 32 +- .../jvm/backtype/storm/generated/HBRecords.java | 36 +- .../generated/KeyAlreadyExistsException.java | 406 + .../storm/generated/KeyNotFoundException.java | 406 + .../storm/generated/LSApprovedWorkers.java | 44 +- .../generated/LSSupervisorAssignments.java | 48 +- .../backtype/storm/generated/LSTopoHistory.java | 64 +- .../storm/generated/LSTopoHistoryList.java | 36 +- .../storm/generated/LSWorkerHeartbeat.java | 36 +- .../storm/generated/ListBlobsResult.java | 556 + .../storm/generated/LocalAssignment.java | 36 +- .../storm/generated/LocalStateData.java | 48 +- .../jvm/backtype/storm/generated/LogConfig.java | 48 +- .../jvm/backtype/storm/generated/Nimbus.java | 26917 +++++++++++++---- .../jvm/backtype/storm/generated/NodeInfo.java | 32 +- .../storm/generated/ReadableBlobMeta.java | 510 + .../storm/generated/SettableBlobMeta.java | 567 + .../jvm/backtype/storm/generated/StormBase.java | 92 +- .../storm/generated/SupervisorInfo.java | 152 +- .../storm/generated/TopologyHistoryInfo.java | 32 +- .../backtype/storm/localizer/LocalResource.java | 44 
+ .../storm/localizer/LocalizedResource.java | 130 + .../LocalizedResourceRetentionSet.java | 140 + .../storm/localizer/LocalizedResourceSet.java | 101 + .../jvm/backtype/storm/localizer/Localizer.java | 695 + .../storm/security/auth/NimbusPrincipal.java | 29 + .../backtype/storm/utils/BufferInputStream.java | 53 + .../jvm/backtype/storm/utils/ShellUtils.java | 7 + .../src/jvm/backtype/storm/utils/Utils.java | 545 +- .../storm/validation/ConfigValidation.java | 16 +- .../validation/ConfigValidationAnnotations.java | 11 +- storm-core/src/py/storm/Nimbus-remote | 98 + storm-core/src/py/storm/Nimbus.py | 5991 +++- storm-core/src/py/storm/ttypes.py | 996 +- storm-core/src/storm.thrift | 59 + .../test/clj/backtype/storm/cluster_test.clj | 20 +- .../test/clj/backtype/storm/nimbus_test.clj | 43 +- .../storm/security/auth/ReqContext_test.clj | 1 + .../test/clj/backtype/storm/supervisor_test.clj | 18 +- .../backtype/storm/blobstore/BlobStoreTest.java | 461 + .../storm/blobstore/BlobSynchronizerTest.java | 137 + .../storm/blobstore/ClientBlobStoreTest.java | 179 + .../LocalizedResourceRetentionSetTest.java | 85 + .../localizer/LocalizedResourceSetTest.java | 74 + .../backtype/storm/localizer/LocalizerTest.java | 671 + .../jvm/backtype/storm/localizer/localtest.zip | Bin 0 -> 6378 bytes .../storm/localizer/localtestwithsymlink.jar | Bin 0 -> 6591 bytes .../storm/localizer/localtestwithsymlink.tar | Bin 0 -> 24576 bytes .../storm/localizer/localtestwithsymlink.tar.gz | Bin 0 -> 6106 bytes .../storm/localizer/localtestwithsymlink.tgz | Bin 0 -> 6106 bytes 96 files changed, 39479 insertions(+), 9515 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/bin/storm.py ---------------------------------------------------------------------- diff --git a/bin/storm.py b/bin/storm.py index 44d54d5..f943778 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -263,6 +263,32 @@ def 
upload_credentials(*args): jvmtype="-client", extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) +def blobstore(*args): + """Syntax: [storm blobstore cmd] + + list [KEY...] - lists blobs currently in the blob store + cat [-f FILE] KEY - read a blob and then either write it to a file, or STDOUT (requires read access). + create [-f FILE] [-a ACL ...] [--replication-factor NUMBER] KEY - create a new blob. Contents comes from a FILE + or STDIN. ACL is in the form [uo]:[username]:[r-][w-][a-] can be comma separated list. + update [-f FILE] KEY - update the contents of a blob. Contents comes from + a FILE or STDIN (requires write access). + delete KEY - delete an entry from the blob store (requires write access). + set-acl [-s ACL] KEY - ACL is in the form [uo]:[username]:[r-][w-][a-] can be comma + separated list (requires admin access). + replication --read KEY - Used to read the replication factor of the blob. + replication --update --replication-factor NUMBER KEY where NUMBER > 0. It is used to update the + replication factor of a blob. + For example, the following would create a mytopo:data.tgz key using the data + stored in data.tgz. User alice would have full access, bob would have + read/write access and everyone else would have read access. 
+ storm blobstore create mytopo:data.tgz -f data.tgz -a u:alice:rwa,u:bob:rw,o::r + """ + exec_storm_class( + "backtype.storm.command.blobstore", + args=args, + jvmtype="-client", + extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + def heartbeats(*args): """Syntax: [storm heartbeats [cmd]] @@ -658,7 +684,7 @@ COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath, "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage, "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor, - "upload-credentials": upload_credentials, "pacemaker": pacemaker, "heartbeats": heartbeats, + "upload-credentials": upload_credentials, "pacemaker": pacemaker, "heartbeats": heartbeats, "blobstore": blobstore, "get-errors": get_errors, "set_log_level": set_log_level, "kill_workers": kill_workers, "node-health-check": healthcheck} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 295ac7c..aeda11c 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -66,7 +66,7 @@ nimbus.supervisor.timeout.secs: 60 nimbus.monitor.freq.secs: 10 nimbus.cleanup.inbox.freq.secs: 600 nimbus.inbox.jar.expiration.secs: 3600 -nimbus.code.sync.freq.secs: 300 +nimbus.code.sync.freq.secs: 120 nimbus.task.launch.secs: 120 nimbus.file.copy.expiration.secs: 600 nimbus.topology.validator: "backtype.storm.nimbus.DefaultTopologyValidator" @@ -117,6 +117,20 @@ transactional.zookeeper.root: "/transactional" transactional.zookeeper.servers: null transactional.zookeeper.port: null +## blobstore configs +supervisor.blobstore.class: "backtype.storm.blobstore.NimbusBlobStore" +supervisor.blobstore.download.thread.count: 5 +supervisor.blobstore.download.max_retries: 3 
+supervisor.localizer.cache.target.size.mb: 10240 +supervisor.localizer.cleanup.interval.ms: 600000 + +nimbus.blobstore.class: "backtype.storm.blobstore.LocalFsBlobStore" +nimbus.blobstore.expiration.secs: 600 + +storm.blobstore.inputstream.buffer.size.bytes: 65536 +client.blobstore.class: "backtype.storm.blobstore.NimbusBlobStore" +storm.blobstore.replication.factor: 3 + ### supervisor.* configs are for node supervisors # Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication supervisor.slots.ports: http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml index 1765be9..7fad1a3 100644 --- a/external/storm-hdfs/pom.xml +++ b/external/storm-hdfs/pom.xml @@ -93,6 +93,17 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> <version>${hadoop.version}</version> <exclusions> @@ -103,6 +114,65 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <version>2.4.0</version> + <exclusions> + <exclusion> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </exclusion> + <exclusion> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + 
<artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minikdc</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.directory.server</groupId> + <artifactId>apacheds-kerberos-codec</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java new file mode 100644 index 0000000..144ad71 --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.hdfs.blobstore; + +import backtype.storm.Config; +import backtype.storm.blobstore.AtomicOutputStream; +import backtype.storm.blobstore.BlobStore; +import backtype.storm.blobstore.BlobStoreAclHandler; +import backtype.storm.blobstore.BlobStoreFile; +import backtype.storm.blobstore.InputStreamWithMeta; +import backtype.storm.generated.AuthorizationException; +import backtype.storm.generated.KeyNotFoundException; +import backtype.storm.generated.KeyAlreadyExistsException; +import backtype.storm.generated.ReadableBlobMeta; +import backtype.storm.generated.SettableBlobMeta; +import backtype.storm.nimbus.NimbusInfo; +import backtype.storm.utils.Utils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.Subject; +import java.io.ByteArrayOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Iterator; +import java.util.Map; + +import static backtype.storm.blobstore.BlobStoreAclHandler.ADMIN; +import static backtype.storm.blobstore.BlobStoreAclHandler.READ; +import static backtype.storm.blobstore.BlobStoreAclHandler.WRITE; + +/** + * Provides a HDFS file system backed blob store implementation. 
+ * Note that this provides an api for having HDFS be the backing store for the blobstore, + * it is not a service/daemon. + */ +public class HdfsBlobStore extends BlobStore { + public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStore.class); + private static final String DATA_PREFIX = "data_"; + private static final String META_PREFIX = "meta_"; + private BlobStoreAclHandler _aclHandler; + private HdfsBlobStoreImpl _hbs; + private Subject _localSubject; + private Map conf; + + /** + * Get the subject from Hadoop so we can use it to validate the acls. There is no direct + * interface from UserGroupInformation to get the subject, so do a doAs and get the context. + * We could probably run everything in the doAs but for now just grab the subject. + */ + private Subject getHadoopUser() { + Subject subj; + try { + subj = UserGroupInformation.getCurrentUser().doAs( + new PrivilegedAction<Subject>() { + @Override + public Subject run() { + return Subject.getSubject(AccessController.getContext()); + } + }); + } catch (IOException e) { + throw new RuntimeException("Error creating subject and logging user in!", e); + } + return subj; + } + + /** + * If who is null then we want to use the user hadoop says we are. + * Required for the supervisor to call these routines as its not + * logged in as anyone. + */ + private Subject checkAndGetSubject(Subject who) { + if (who == null) { + return _localSubject; + } + return who; + } + + @Override + public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) { + this.conf = conf; + prepareInternal(conf, overrideBase, null); + } + + /** + * Allow a Hadoop Configuration to be passed for testing. If it's null then the hadoop configs + * must be in your classpath. 
+ */ + protected void prepareInternal(Map conf, String overrideBase, Configuration hadoopConf) { + this.conf = conf; + if (overrideBase == null) { + overrideBase = (String)conf.get(Config.BLOBSTORE_DIR); + } + if (overrideBase == null) { + throw new RuntimeException("You must specify a blobstore directory for HDFS to use!"); + } + LOG.debug("directory is: {}", overrideBase); + try { + // if a HDFS keytab/principal have been supplied login, otherwise assume they are + // logged in already or running insecure HDFS. + String principal = (String) conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL); + String keyTab = (String) conf.get(Config.BLOBSTORE_HDFS_KEYTAB); + + if (principal != null && keyTab != null) { + UserGroupInformation.loginUserFromKeytab(principal, keyTab); + } else { + if (principal == null && keyTab != null) { + throw new RuntimeException("You must specify an HDFS principal to go with the keytab!"); + + } else { + if (principal != null && keyTab == null) { + throw new RuntimeException("You must specify HDFS keytab go with the principal!"); + } + } + } + } catch (IOException e) { + throw new RuntimeException("Error logging in from keytab!", e); + } + Path baseDir = new Path(overrideBase, BASE_BLOBS_DIR_NAME); + try { + if (hadoopConf != null) { + _hbs = new HdfsBlobStoreImpl(baseDir, conf, hadoopConf); + } else { + _hbs = new HdfsBlobStoreImpl(baseDir, conf); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + _localSubject = getHadoopUser(); + _aclHandler = new BlobStoreAclHandler(conf); + } + + @Override + public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) + throws AuthorizationException, KeyAlreadyExistsException { + if (meta.get_replication_factor() <= 0) { + meta.set_replication_factor((int)conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR)); + } + who = checkAndGetSubject(who); + validateKey(key); + _aclHandler.normalizeSettableBlobMeta(key, meta, who, READ | WRITE | ADMIN); + 
BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); + _aclHandler.hasPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key); + if (_hbs.exists(DATA_PREFIX+key)) { + throw new KeyAlreadyExistsException(key); + } + BlobStoreFileOutputStream mOut = null; + try { + BlobStoreFile metaFile = _hbs.write(META_PREFIX + key, true); + metaFile.setMetadata(meta); + mOut = new BlobStoreFileOutputStream(metaFile); + mOut.write(Utils.thriftSerialize(meta)); + mOut.close(); + mOut = null; + BlobStoreFile dataFile = _hbs.write(DATA_PREFIX + key, true); + dataFile.setMetadata(meta); + return new BlobStoreFileOutputStream(dataFile); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (mOut != null) { + try { + mOut.cancel(); + } catch (IOException e) { + //Ignored + } + } + } + } + + @Override + public AtomicOutputStream updateBlob(String key, Subject who) + throws AuthorizationException, KeyNotFoundException { + who = checkAndGetSubject(who); + SettableBlobMeta meta = getStoredBlobMeta(key); + validateKey(key); + _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key); + try { + BlobStoreFile dataFile = _hbs.write(DATA_PREFIX + key, false); + dataFile.setMetadata(meta); + return new BlobStoreFileOutputStream(dataFile); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private SettableBlobMeta getStoredBlobMeta(String key) throws KeyNotFoundException { + InputStream in = null; + try { + BlobStoreFile pf = _hbs.read(META_PREFIX + key); + try { + in = pf.getInputStream(); + } catch (FileNotFoundException fnf) { + throw new KeyNotFoundException(key); + } + ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] buffer = new byte[2048]; + int len; + while ((len = in.read(buffer)) > 0) { + out.write(buffer, 0, len); + } + in.close(); + in = null; + return Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (in != 
null) { + try { + in.close(); + } catch (IOException e) { + //Ignored + } + } + } + } + + @Override + public ReadableBlobMeta getBlobMeta(String key, Subject who) + throws AuthorizationException, KeyNotFoundException { + who = checkAndGetSubject(who); + validateKey(key); + SettableBlobMeta meta = getStoredBlobMeta(key); + _aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key); + ReadableBlobMeta rbm = new ReadableBlobMeta(); + rbm.set_settable(meta); + try { + BlobStoreFile pf = _hbs.read(DATA_PREFIX + key); + rbm.set_version(pf.getModTime()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return rbm; + } + + @Override + public void setBlobMeta(String key, SettableBlobMeta meta, Subject who) + throws AuthorizationException, KeyNotFoundException { + if (meta.get_replication_factor() <= 0) { + meta.set_replication_factor((int)conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR)); + } + who = checkAndGetSubject(who); + validateKey(key); + _aclHandler.normalizeSettableBlobMeta(key, meta, who, ADMIN); + BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); + SettableBlobMeta orig = getStoredBlobMeta(key); + _aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key); + BlobStoreFileOutputStream mOut = null; + writeMetadata(key, meta); + } + + @Override + public void deleteBlob(String key, Subject who) + throws AuthorizationException, KeyNotFoundException { + who = checkAndGetSubject(who); + validateKey(key); + SettableBlobMeta meta = getStoredBlobMeta(key); + _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key); + try { + _hbs.deleteKey(DATA_PREFIX + key); + _hbs.deleteKey(META_PREFIX + key); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public InputStreamWithMeta getBlob(String key, Subject who) + throws AuthorizationException, KeyNotFoundException { + who = checkAndGetSubject(who); + validateKey(key); + SettableBlobMeta meta = getStoredBlobMeta(key); + 
_aclHandler.hasPermissions(meta.get_acl(), READ, who, key); + try { + return new BlobStoreFileInputStream(_hbs.read(DATA_PREFIX + key)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Iterator<String> listKeys() { + try { + return new KeyTranslationIterator(_hbs.listKeys(), DATA_PREFIX); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void shutdown() { + //Empty + } + + @Override + public int getBlobReplication(String key, Subject who) throws AuthorizationException, KeyNotFoundException { + who = checkAndGetSubject(who); + validateKey(key); + SettableBlobMeta meta = getStoredBlobMeta(key); + _aclHandler.hasAnyPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key); + try { + return _hbs.getBlobReplication(DATA_PREFIX + key); + } catch (IOException exp) { + throw new RuntimeException(exp); + } + } + + @Override + public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException { + who = checkAndGetSubject(who); + validateKey(key); + SettableBlobMeta meta = getStoredBlobMeta(key); + meta.set_replication_factor(replication); + _aclHandler.hasAnyPermissions(meta.get_acl(), WRITE | ADMIN, who, key); + try { + writeMetadata(key, meta); + return _hbs.updateBlobReplication(DATA_PREFIX + key, replication); + } catch (IOException exp) { + throw new RuntimeException(exp); + } + } + + public void writeMetadata(String key, SettableBlobMeta meta) + throws AuthorizationException, KeyNotFoundException { + BlobStoreFileOutputStream mOut = null; + try { + BlobStoreFile hdfsFile = _hbs.write(META_PREFIX + key, false); + hdfsFile.setMetadata(meta); + mOut = new BlobStoreFileOutputStream(hdfsFile); + mOut.write(Utils.thriftSerialize(meta)); + mOut.close(); + mOut = null; + } catch (IOException exp) { + throw new RuntimeException(exp); + } finally { + if (mOut != null) { + try { + mOut.cancel(); + } catch (IOException e) { + //Ignored 
+ } + } + } + } + + public void fullCleanup(long age) throws IOException { + _hbs.fullCleanup(age); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java new file mode 100644 index 0000000..93b56c1 --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.hdfs.blobstore; + +import backtype.storm.blobstore.BlobStoreFile; +import backtype.storm.generated.SettableBlobMeta; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.regex.Matcher; + +public class HdfsBlobStoreFile extends BlobStoreFile { + public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreFile.class); + + private final String _key; + private final boolean _isTmp; + private final Path _path; + private Long _modTime = null; + private final boolean _mustBeNew; + private final Configuration _hadoopConf; + private final FileSystem _fs; + private SettableBlobMeta meta; + + // files are world-wide readable and owner writable + final public static FsPermission BLOBSTORE_FILE_PERMISSION = + FsPermission.createImmutable((short) 0644); // rw-r--r-- + + public HdfsBlobStoreFile(Path base, String name, Configuration hconf) { + if (BLOBSTORE_DATA_FILE.equals(name)) { + _isTmp = false; + } else { + Matcher m = TMP_NAME_PATTERN.matcher(name); + if (!m.matches()) { + throw new IllegalArgumentException("File name does not match '"+name+"' !~ "+TMP_NAME_PATTERN); + } + _isTmp = true; + } + _hadoopConf = hconf; + _key = base.getName(); + _path = new Path(base, name); + _mustBeNew = false; + try { + _fs = _path.getFileSystem(_hadoopConf); + } catch (IOException e) { + throw new RuntimeException("Error getting filesystem for path: " + _path, e); + } + } + + public HdfsBlobStoreFile(Path base, boolean isTmp, boolean mustBeNew, Configuration hconf) { + _key = base.getName(); + _hadoopConf = hconf; + _isTmp = isTmp; + _mustBeNew = mustBeNew; + if (_isTmp) { + _path = new 
Path(base, System.currentTimeMillis()+TMP_EXT); + } else { + _path = new Path(base, BLOBSTORE_DATA_FILE); + } + try { + _fs = _path.getFileSystem(_hadoopConf); + } catch (IOException e) { + throw new RuntimeException("Error getting filesystem for path: " + _path, e); + } + } + + @Override + public void delete() throws IOException { + _fs.delete(_path, true); + } + + @Override + public boolean isTmp() { + return _isTmp; + } + + @Override + public String getKey() { + return _key; + } + + @Override + public long getModTime() throws IOException { + if (_modTime == null) { + FileSystem fs = _path.getFileSystem(_hadoopConf); + _modTime = fs.getFileStatus(_path).getModificationTime(); + } + return _modTime; + } + + private void checkIsNotTmp() { + if (!isTmp()) { + throw new IllegalStateException("Can only operate on a temporary blobstore file."); + } + } + + private void checkIsTmp() { + if (isTmp()) { + throw new IllegalStateException("Cannot operate on a temporary blobstore file."); + } + } + + @Override + public InputStream getInputStream() throws IOException { + checkIsTmp(); + return _fs.open(_path); + } + + @Override + public OutputStream getOutputStream() throws IOException { + checkIsNotTmp(); + OutputStream out = null; + FsPermission fileperms = new FsPermission(BLOBSTORE_FILE_PERMISSION); + try { + out = _fs.create(_path, (short)this.getMetadata().get_replication_factor()); + _fs.setPermission(_path, fileperms); + _fs.setReplication(_path, (short)this.getMetadata().get_replication_factor()); + } catch (IOException e) { + //Try to create the parent directory, may not work + FsPermission dirperms = new FsPermission(HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION); + if (!_fs.mkdirs(_path.getParent(), dirperms)) { + LOG.warn("error creating parent dir: " + _path.getParent()); + } + out = _fs.create(_path, (short)this.getMetadata().get_replication_factor()); + _fs.setPermission(_path, dirperms); + _fs.setReplication(_path, 
(short)this.getMetadata().get_replication_factor()); + } + if (out == null) { + throw new IOException("Error in creating: " + _path); + } + return out; + } + + @Override + public void commit() throws IOException { + checkIsNotTmp(); + // FileContext supports atomic rename, whereas FileSystem doesn't + FileContext fc = FileContext.getFileContext(_hadoopConf); + Path dest = new Path(_path.getParent(), BLOBSTORE_DATA_FILE); + if (_mustBeNew) { + fc.rename(_path, dest); + } else { + fc.rename(_path, dest, Options.Rename.OVERWRITE); + } + // Note, we could add support for setting the replication factor + } + + @Override + public void cancel() throws IOException { + checkIsNotTmp(); + delete(); + } + + @Override + public String toString() { + return _path+":"+(_isTmp ? "tmp": BlobStoreFile.BLOBSTORE_DATA_FILE)+":"+_key; + } + + @Override + public long getFileLength() throws IOException { + return _fs.getFileStatus(_path).getLen(); + } + + @Override + public SettableBlobMeta getMetadata() { + return meta; + } + + @Override + public void setMetadata(SettableBlobMeta meta) { + this.meta = meta; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java new file mode 100644 index 0000000..e434752 --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.hdfs.blobstore; + +import backtype.storm.Config; +import backtype.storm.blobstore.BlobStoreFile; +import backtype.storm.utils.Utils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Timer; +import java.util.TimerTask; + +/** + * HDFS blob store impl. 
+ */ +public class HdfsBlobStoreImpl { + private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImpl.class); + + private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000l; + private static final int BUCKETS = 1024; + private static final Timer timer = new Timer("HdfsBlobStore cleanup thread", true); + private static final String BLOBSTORE_DATA = "data"; + + public class KeyInHashDirIterator implements Iterator<String> { + private int currentBucket = 0; + private Iterator<String> it = null; + private String next = null; + + public KeyInHashDirIterator() throws IOException { + primeNext(); + } + + private void primeNext() throws IOException { + while (it == null && currentBucket < BUCKETS) { + String name = String.valueOf(currentBucket); + Path dir = new Path(_fullPath, name); + try { + it = listKeys(dir); + } catch (FileNotFoundException e) { + it = null; + } + if (it == null || !it.hasNext()) { + it = null; + currentBucket++; + } else { + next = it.next(); + } + } + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public String next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + String current = next; + next = null; + if (it != null) { + if (!it.hasNext()) { + it = null; + currentBucket++; + try { + primeNext(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + next = it.next(); + } + } + return current; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Delete Not Supported"); + } + } + + + private Path _fullPath; + private FileSystem _fs; + private TimerTask _cleanup = null; + private Configuration _hadoopConf; + + // blobstore directory is private! 
+ final public static FsPermission BLOBSTORE_DIR_PERMISSION = + FsPermission.createImmutable((short) 0700); // rwx-------- + + public HdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws IOException { + this(path, conf, new Configuration()); + } + + public HdfsBlobStoreImpl(Path path, Map<String, Object> conf, + Configuration hconf) throws IOException { + LOG.info("Blob store based in {}", path); + _fullPath = path; + _hadoopConf = hconf; + _fs = path.getFileSystem(_hadoopConf); + + if (!_fs.exists(_fullPath)) { + FsPermission perms = new FsPermission(BLOBSTORE_DIR_PERMISSION); + boolean success = _fs.mkdirs(_fullPath, perms); + if (!success) { + throw new IOException("Error creating blobstore directory: " + _fullPath); + } + } + + Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE); + if (Utils.getBoolean(shouldCleanup, false)) { + LOG.debug("Starting hdfs blobstore cleaner"); + _cleanup = new TimerTask() { + @Override + public void run() { + try { + fullCleanup(FULL_CLEANUP_FREQ); + } catch (IOException e) { + LOG.error("Error trying to cleanup", e); + } + } + }; + timer.scheduleAtFixedRate(_cleanup, 0, FULL_CLEANUP_FREQ); + } + } + + /** + * @return all keys that are available for reading. + * @throws IOException on any error. + */ + public Iterator<String> listKeys() throws IOException { + return new KeyInHashDirIterator(); + } + + /** + * Get an input stream for reading a part. + * + * @param key the key of the part to read. + * @return the where to read the data from. + * @throws IOException on any error + */ + public BlobStoreFile read(String key) throws IOException { + return new HdfsBlobStoreFile(getKeyDir(key), BLOBSTORE_DATA, _hadoopConf); + } + + /** + * Get an object tied to writing the data. + * + * @param key the key of the part to write to. + * @param create whether the file needs to be new or not. + * @return an object that can be used to both write to, but also commit/cancel the operation. 
+ * @throws IOException on any error + */ + public BlobStoreFile write(String key, boolean create) throws IOException { + return new HdfsBlobStoreFile(getKeyDir(key), true, create, _hadoopConf); + } + + /** + * Check if the key exists in the blob store. + * + * @param key the key to check for + * @return true if it exists else false. + */ + public boolean exists(String key) { + Path dir = getKeyDir(key); + boolean res = false; + try { + _fs = dir.getFileSystem(_hadoopConf); + res = _fs.exists(dir); + } catch (IOException e) { + LOG.warn("Exception checking for exists on: " + key); + } + return res; + } + + /** + * Delete a key from the blob store + * + * @param key the key to delete + * @throws IOException on any error + */ + public void deleteKey(String key) throws IOException { + Path keyDir = getKeyDir(key); + HdfsBlobStoreFile pf = new HdfsBlobStoreFile(keyDir, BLOBSTORE_DATA, + _hadoopConf); + pf.delete(); + delete(keyDir); + } + + protected Path getKeyDir(String key) { + String hash = String.valueOf(Math.abs((long) key.hashCode()) % BUCKETS); + Path hashDir = new Path(_fullPath, hash); + + Path ret = new Path(hashDir, key); + LOG.debug("{} Looking for {} in {}", new Object[]{_fullPath, key, hash}); + return ret; + } + + public void fullCleanup(long age) throws IOException { + long cleanUpIfBefore = System.currentTimeMillis() - age; + Iterator<String> keys = new KeyInHashDirIterator(); + while (keys.hasNext()) { + String key = keys.next(); + Path keyDir = getKeyDir(key); + Iterator<BlobStoreFile> i = listBlobStoreFiles(keyDir); + if (!i.hasNext()) { + //The dir is empty, so try to delete it, may fail, but that is OK + try { + _fs.delete(keyDir, true); + } catch (Exception e) { + LOG.warn("Could not delete " + keyDir + " will try again later"); + } + } + while (i.hasNext()) { + BlobStoreFile f = i.next(); + if (f.isTmp()) { + if (f.getModTime() <= cleanUpIfBefore) { + f.delete(); + } + } + } + } + } + + protected Iterator<BlobStoreFile> listBlobStoreFiles(Path 
path) throws IOException { + ArrayList<BlobStoreFile> ret = new ArrayList<BlobStoreFile>(); + FileStatus[] files = _fs.listStatus(new Path[]{path}); + if (files != null) { + for (FileStatus sub : files) { + try { + ret.add(new HdfsBlobStoreFile(sub.getPath().getParent(), sub.getPath().getName(), + _hadoopConf)); + } catch (IllegalArgumentException e) { + //Ignored the file did not match + LOG.warn("Found an unexpected file in {} {}", path, sub.getPath().getName()); + } + } + } + return ret.iterator(); + } + + protected Iterator<String> listKeys(Path path) throws IOException { + ArrayList<String> ret = new ArrayList<String>(); + FileStatus[] files = _fs.listStatus(new Path[]{path}); + if (files != null) { + for (FileStatus sub : files) { + try { + ret.add(sub.getPath().getName().toString()); + } catch (IllegalArgumentException e) { + //Ignored the file did not match + LOG.debug("Found an unexpected file in {} {}", path, sub.getPath().getName()); + } + } + } + return ret.iterator(); + } + + protected int getBlobReplication(String key) throws IOException { + Path path = getKeyDir(key); + Path dest = new Path(path, BLOBSTORE_DATA); + return _fs.getFileStatus(dest).getReplication(); + } + + protected int updateBlobReplication(String key, int replication) throws IOException { + Path path = getKeyDir(key); + Path dest = new Path(path, BLOBSTORE_DATA); + _fs.setReplication(dest, (short) replication); + return _fs.getFileStatus(dest).getReplication(); + } + + protected void delete(Path path) throws IOException { + _fs.delete(path, true); + } + + public void shutdown() { + if (_cleanup != null) { + _cleanup.cancel(); + _cleanup = null; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java new file mode 100644 index 0000000..ec17dae --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.hdfs.blobstore; + +import backtype.storm.blobstore.AtomicOutputStream; +import backtype.storm.blobstore.ClientBlobStore; +import backtype.storm.blobstore.InputStreamWithMeta; +import backtype.storm.generated.AuthorizationException; +import backtype.storm.generated.ReadableBlobMeta; +import backtype.storm.generated.SettableBlobMeta; +import backtype.storm.generated.KeyAlreadyExistsException; +import backtype.storm.generated.KeyNotFoundException; +import backtype.storm.utils.NimbusClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.Map; + +/** + * Client to access the HDFS blobStore. At this point, this is meant to only be used by the + * supervisor. Don't trust who the client says they are so pass null for all Subjects. 
+ */ +public class HdfsClientBlobStore extends ClientBlobStore { + private static final Logger LOG = LoggerFactory.getLogger(HdfsClientBlobStore.class); + private HdfsBlobStore _blobStore; + private Map _conf; + + @Override + public void prepare(Map conf) { + this._conf = conf; + _blobStore = new HdfsBlobStore(); + _blobStore.prepare(conf, null, null); + } + + @Override + public AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta) + throws AuthorizationException, KeyAlreadyExistsException { + return _blobStore.createBlob(key, meta, null); + } + + @Override + public AtomicOutputStream updateBlob(String key) + throws AuthorizationException, KeyNotFoundException { + return _blobStore.updateBlob(key, null); + } + + @Override + public ReadableBlobMeta getBlobMeta(String key) + throws AuthorizationException, KeyNotFoundException { + return _blobStore.getBlobMeta(key, null); + } + + @Override + public void setBlobMetaToExtend(String key, SettableBlobMeta meta) + throws AuthorizationException, KeyNotFoundException { + _blobStore.setBlobMeta(key, meta, null); + } + + @Override + public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException { + _blobStore.deleteBlob(key, null); + } + + @Override + public InputStreamWithMeta getBlob(String key) + throws AuthorizationException, KeyNotFoundException { + return _blobStore.getBlob(key, null); + } + + @Override + public Iterator<String> listKeys() { + return _blobStore.listKeys(); + } + + @Override + public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException { + return _blobStore.getBlobReplication(key, null); + } + + @Override + public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException { + return _blobStore.updateBlobReplication(key, replication, null); + } + + @Override + public boolean setClient(Map conf, NimbusClient client) { + return true; + } + + @Override + public void 
createStateInZookeeper(String key) { + // Do nothing + } + + @Override + public void shutdown() { + // do nothing + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java deleted file mode 100644 index 1e38051..0000000 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.storm.hdfs.ha.codedistributor; - -import backtype.storm.codedistributor.ICodeDistributor; -import com.google.common.collect.Lists; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.Validate; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.storm.hdfs.common.security.HdfsSecurityUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.util.List; -import java.util.Map; - -public class HDFSCodeDistributor implements ICodeDistributor { - private static final Logger LOG = LoggerFactory.getLogger(HDFSCodeDistributor.class); - - private final static String HDFS_STORM_DIR = "hdfs.storm.dir"; - - private FileSystem fs; - private Path stormDir; - - @Override - public void prepare(Map conf) throws Exception { - Validate.notNull(conf.get(HDFS_STORM_DIR), "you must specify " + HDFS_STORM_DIR); - - Configuration configuration = new Configuration(); - HdfsSecurityUtil.login(conf, configuration); - this.fs = FileSystem.get(configuration); - this.stormDir = new Path(String.valueOf(conf.get(HDFS_STORM_DIR))); - if(!this.fs.exists(stormDir)) { - this.fs.mkdirs(this.stormDir); - } - } - - @Override - public File upload(String dirPath, String topologyId) throws Exception { - File localStormDir = new File(dirPath); - LOG.info("Copying the storm code from directory: {} to {}{}{}", localStormDir.getAbsolutePath(), - stormDir.toString(), Path.SEPARATOR , topologyId); - - File[] files = localStormDir.listFiles(); - - Path hdfsDestPath = new Path(stormDir, new Path(topologyId)); - fs.mkdirs(hdfsDestPath); - - for(File file : files) { - fs.copyFromLocalFile(new Path(file.getAbsolutePath()), hdfsDestPath); - } - - File file = new File(dirPath, "storm-code-distributor.meta"); - - RemoteIterator<LocatedFileStatus> hdfsFileIterator = fs.listFiles(hdfsDestPath, false); - - BufferedWriter writer = new BufferedWriter(new 
FileWriter(file)); - while(hdfsFileIterator.hasNext()) { - writer.write(hdfsFileIterator.next().getPath().toString()); - writer.newLine(); - } - writer.close(); - - return file; - } - - @Override - public List<File> download(String topologyId, File metaFile) throws Exception { - File destDir = metaFile.getParentFile(); - - List<String> hdfsPaths = IOUtils.readLines(new FileInputStream(metaFile)); - for(String hdfsFilePath : hdfsPaths) { - fs.copyToLocalFile(new Path(hdfsFilePath), new Path(destDir.getAbsolutePath())); - } - - return Lists.newArrayList(destDir.listFiles()); - } - - @Override - public short getReplicationCount(String topologyId) throws IOException { - Path hdfsDestPath = new Path(stormDir, new Path(topologyId)); - if(fs.exists(hdfsDestPath)) { - FileStatus fileStatus = fs.getFileStatus(hdfsDestPath); - return fileStatus.getReplication(); - } else { - LOG.warn("getReplicationCount called for {} but no such directory exists, returning 0", topologyId); - return 0; - } - } - - @Override - public void cleanup(String topologyId) throws IOException { - Path hdfsDestPath = new Path(stormDir, new Path(topologyId)); - fs.delete(hdfsDestPath, true); - } - - @Override - public void close(Map conf) { - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java new file mode 100644 index 0000000..a8d6172 --- /dev/null +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java @@ -0,0 +1,530 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.hdfs.blobstore; + +import backtype.storm.Config; +import backtype.storm.blobstore.AtomicOutputStream; +import backtype.storm.blobstore.BlobStore; +import backtype.storm.blobstore.BlobStoreAclHandler; +import backtype.storm.generated.AccessControl; +import backtype.storm.generated.AuthorizationException; +import backtype.storm.generated.KeyAlreadyExistsException; +import backtype.storm.generated.KeyNotFoundException; +import backtype.storm.generated.ReadableBlobMeta; +import backtype.storm.generated.SettableBlobMeta; +import backtype.storm.generated.AccessControlType; + +import backtype.storm.security.auth.NimbusPrincipal; +import backtype.storm.security.auth.SingleUserPrincipal; +import backtype.storm.utils.Utils; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.Subject; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Map; +import java.util.HashMap; +import java.util.UUID; 
+import java.util.HashSet; +import java.util.Set; +import java.util.Iterator; +import java.util.Arrays; +import java.util.List; +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.*; + +public class BlobStoreTest { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class); + protected static MiniDFSCluster dfscluster = null; + protected static Configuration hadoopConf = null; + URI base; + File baseFile; + private static Map conf = new HashMap(); + public static final int READ = 0x01; + public static final int WRITE = 0x02; + public static final int ADMIN = 0x04; + + @Before + public void init() { + initializeConfigs(); + baseFile = new File("/tmp/blob-store-test-"+UUID.randomUUID()); + base = baseFile.toURI(); + } + + @After + public void cleanup() + throws IOException { + FileUtils.deleteDirectory(baseFile); + } + + @AfterClass + public static void cleanupAfterClass() throws IOException { + if (dfscluster != null) { + dfscluster.shutdown(); + } + } + + // Method which initializes nimbus admin + public static void initializeConfigs() { + conf.put(Config.NIMBUS_ADMINS,"admin"); + conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor"); + } + + //Gets Nimbus Subject with NimbusPrincipal set on it + public static Subject getNimbusSubject() { + Subject nimbus = new Subject(); + nimbus.getPrincipals().add(new NimbusPrincipal()); + return nimbus; + } + + // Overloading the assertStoreHasExactly method accomodate Subject in order to check for authorization + public static void assertStoreHasExactly(BlobStore store, Subject who, String ... 
keys) + throws IOException, KeyNotFoundException, AuthorizationException { + Set<String> expected = new HashSet<String>(Arrays.asList(keys)); + Set<String> found = new HashSet<String>(); + Iterator<String> c = store.listKeys(); + while (c.hasNext()) { + String keyName = c.next(); + found.add(keyName); + } + Set<String> extra = new HashSet<String>(found); + extra.removeAll(expected); + assertTrue("Found extra keys in the blob store "+extra, extra.isEmpty()); + Set<String> missing = new HashSet<String>(expected); + missing.removeAll(found); + assertTrue("Found keys missing from the blob store "+missing, missing.isEmpty()); + } + + public static void assertStoreHasExactly(BlobStore store, String ... keys) + throws IOException, KeyNotFoundException, AuthorizationException { + assertStoreHasExactly(store, null, keys); + } + + // Overloading the readInt method accomodate Subject in order to check for authorization (security turned on) + public static int readInt(BlobStore store, Subject who, String key) throws IOException, KeyNotFoundException, AuthorizationException { + InputStream in = store.getBlob(key, who); + try { + return in.read(); + } finally { + in.close(); + } + } + + public static int readInt(BlobStore store, String key) + throws IOException, KeyNotFoundException, AuthorizationException { + return readInt(store, null, key); + } + + public static void readAssertEquals(BlobStore store, String key, int value) + throws IOException, KeyNotFoundException, AuthorizationException { + assertEquals(value, readInt(store, key)); + } + + // Checks for assertion when we turn on security + public void readAssertEqualsWithAuth(BlobStore store, Subject who, String key, int value) + throws IOException, KeyNotFoundException, AuthorizationException { + assertEquals(value, readInt(store, who, key)); + } + + private HdfsBlobStore initHdfs(String dirName) + throws Exception { + if (hadoopConf == null) { + hadoopConf = new Configuration(); + } + try { + if (dfscluster == null) { + 
dfscluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(3).build(); + dfscluster.waitActive(); + } + } catch (IOException e) { + LOG.error("error creating MiniDFSCluster"); + } + Map conf = new HashMap(); + conf.put(Config.BLOBSTORE_DIR, dirName); + conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"backtype.storm.security.auth.DefaultPrincipalToLocal"); + conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3); + HdfsBlobStore store = new HdfsBlobStore(); + store.prepareInternal(conf, null, dfscluster.getConfiguration(0)); + return store; + } + + @Test + public void testHdfsReplication() + throws Exception { + BlobStore store = initHdfs("/storm/blobstoreReplication"); + testReplication("/storm/blobstoreReplication/test", store); + } + + @Test + public void testBasicHdfs() + throws Exception { + testBasic(initHdfs("/storm/blobstore1")); + } + + @Test + public void testMultipleHdfs() + throws Exception { + // use different blobstore dir so it doesn't conflict with other test + testMultiple(initHdfs("/storm/blobstore2")); + } + + @Test + public void testHdfsWithAuth() + throws Exception { + // use different blobstore dir so it doesn't conflict with other tests + testWithAuthentication(initHdfs("/storm/blobstore3")); + } + + // Test for replication. 
+ public void testReplication(String path, BlobStore store) + throws Exception { + SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + metadata.set_replication_factor(4); + AtomicOutputStream out = store.createBlob("test", metadata, null); + out.write(1); + out.close(); + assertStoreHasExactly(store, "test"); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", null), 4); + store.deleteBlob("test", null); + + //Test for replication with NIMBUS as user + Subject admin = getSubject("admin"); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + metadata.set_replication_factor(4); + out = store.createBlob("test", metadata, admin); + out.write(1); + out.close(); + assertStoreHasExactly(store, "test"); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", admin), 4); + store.updateBlobReplication("test", 5, admin); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", admin), 5); + store.deleteBlob("test", admin); + + //Test for replication using SUPERVISOR access + Subject supervisor = getSubject("supervisor"); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + metadata.set_replication_factor(4); + out = store.createBlob("test", metadata, supervisor); + out.write(1); + out.close(); + assertStoreHasExactly(store, "test"); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 4); + store.updateBlobReplication("test", 5, supervisor); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 5); + store.deleteBlob("test", supervisor); + + //Test for a user having read or write or admin access to read replication for a blob + String createSubject = "createSubject"; + String writeSubject = "writeSubject"; + String adminSubject = "adminSubject"; + Subject who = getSubject(createSubject); + AccessControl writeAccess = new 
AccessControl(AccessControlType.USER, READ); + AccessControl adminAccess = new AccessControl(AccessControlType.USER, ADMIN); + writeAccess.set_name(writeSubject); + adminAccess.set_name(adminSubject); + List<AccessControl> acl = Arrays.asList(writeAccess, adminAccess); + metadata = new SettableBlobMeta(acl); + metadata.set_replication_factor(4); + out = store.createBlob("test", metadata, who); + out.write(1); + out.close(); + assertStoreHasExactly(store, "test"); + who = getSubject(writeSubject); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", who), 4); + + //Test for a user having WRITE or ADMIN privileges to change replication of a blob + who = getSubject(adminSubject); + store.updateBlobReplication("test", 5, who); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", who), 5); + store.deleteBlob("test", getSubject(createSubject)); + } + + public Subject getSubject(String name) { + Subject subject = new Subject(); + SingleUserPrincipal user = new SingleUserPrincipal(name); + subject.getPrincipals().add(user); + return subject; + } + + // Check for Blobstore with authentication + public void testWithAuthentication(BlobStore store) + throws Exception { + //Test for Nimbus Admin + Subject admin = getSubject("admin"); + assertStoreHasExactly(store); + SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + AtomicOutputStream out = store.createBlob("test", metadata, admin); + assertStoreHasExactly(store, "test"); + out.write(1); + out.close(); + store.deleteBlob("test", admin); + + //Test for Supervisor Admin + Subject supervisor = getSubject("supervisor"); + assertStoreHasExactly(store); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + out = store.createBlob("test", metadata, supervisor); + assertStoreHasExactly(store, "test"); + out.write(1); + out.close(); + store.deleteBlob("test", supervisor); + + //Test for Nimbus itself as a user + Subject nimbus = 
getNimbusSubject(); + assertStoreHasExactly(store); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + out = store.createBlob("test", metadata, nimbus); + assertStoreHasExactly(store, "test"); + out.write(1); + out.close(); + store.deleteBlob("test", nimbus); + + // Test with a dummy test_subject for cases where subject !=null (security turned on) + Subject who = getSubject("test_subject"); + assertStoreHasExactly(store); + + // Tests for case when subject != null (security turned on) and + // acls for the blob are set to WORLD_EVERYTHING + metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + out = store.createBlob("test", metadata, who); + out.write(1); + out.close(); + assertStoreHasExactly(store, "test"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test", 1); + + LOG.info("Deleting test"); + store.deleteBlob("test", who); + assertStoreHasExactly(store); + + // Tests for case when subject != null (security turned on) and + // acls are not set for the blob (DEFAULT) + LOG.info("Creating test again"); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + out = store.createBlob("test", metadata, who); + out.write(2); + out.close(); + assertStoreHasExactly(store, "test"); + // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because + // the subject is neither null nor empty. 
The ACL should however contain USER_EVERYTHING as user needs to have + // complete access to the blob + /* NOTE(review): asserts WORLD_EVERYTHING is ABSENT; the failure message reads backwards */ assertTrue("ACL does not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test", 2); + + LOG.info("Updating test"); + out = store.updateBlob("test", who); + out.write(3); + out.close(); + assertStoreHasExactly(store, "test"); + readAssertEqualsWithAuth(store, who, "test", 3); + + LOG.info("Updating test again"); + out = store.updateBlob("test", who); + out.write(4); + out.flush(); + LOG.info("SLEEPING"); + Thread.sleep(2); /* NOTE(review): ages the flushed-but-unclosed update so fullCleanup(1) below can discard it — presumably cleanup is mtime-based; confirm */ + assertStoreHasExactly(store, "test"); + readAssertEqualsWithAuth(store, who, "test", 3); + + //Test for subject with no principals and acls set to WORLD_EVERYTHING + who = new Subject(); + metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + LOG.info("Creating test"); + out = store.createBlob("test-empty-subject-WE", metadata, who); + out.write(2); + out.close(); + assertStoreHasExactly(store, "test-empty-subject-WE", "test"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2); + + //Test for subject with no principals and acls set to DEFAULT + who = new Subject(); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + LOG.info("Creating other"); + out = store.createBlob("test-empty-subject-DEF", metadata, who); + out.write(2); + out.close(); + assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2); + + if (store instanceof HdfsBlobStore) { + ((HdfsBlobStore) 
store).fullCleanup(1); + } else { + fail("Error the blobstore is of unknowntype"); /* TODO(review): "unknowntype" -> "unknown type" */ + } + try { + out.close(); + } catch (IOException e) { + //This is likely to happen when we try to commit something that + // was cleaned up. This is expected and acceptable. + } + } + + /** CRUD round-trip with security off (null Subject) and WORLD_EVERYTHING ACLs; ends by verifying that fullCleanup(1) discards an update that was flushed but never closed. */ public void testBasic(BlobStore store) + throws Exception { + assertStoreHasExactly(store); + LOG.info("Creating test"); + // Tests for case when subject == null (security turned off) and + // acls for the blob are set to WORLD_EVERYTHING + SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler + .WORLD_EVERYTHING); + AtomicOutputStream out = store.createBlob("test", metadata, null); + out.write(1); + out.close(); + assertStoreHasExactly(store, "test"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEquals(store, "test", 1); + + LOG.info("Deleting test"); + store.deleteBlob("test", null); + assertStoreHasExactly(store); + + // The following tests are run for both hdfs and local store to test the + // update blob interface + metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + LOG.info("Creating test again"); + out = store.createBlob("test", metadata, null); + out.write(2); + out.close(); + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 2); + LOG.info("Updating test"); + out = store.updateBlob("test", null); + out.write(3); + out.close(); + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 3); + + LOG.info("Updating test again"); + out = store.updateBlob("test", null); + out.write(4); + out.flush(); + LOG.info("SLEEPING"); + Thread.sleep(2); + + if (store instanceof HdfsBlobStore) { + ((HdfsBlobStore) store).fullCleanup(1); + } else { + fail("Error the blobstore is of unknowntype"); + } + try { + out.close(); + } catch (IOException e) { + //This is likely to happen when we try to 
commit something that + // was cleaned up. This is expected and acceptable. + } + } + + + /** Interleaved create/update/delete of two keys ("test", "other"), checking each key's contents stay independent; ends with the same uncommitted-update cleanup check as testBasic. */ public void testMultiple(BlobStore store) + throws Exception { + assertStoreHasExactly(store); + LOG.info("Creating test"); + AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler + .WORLD_EVERYTHING), null); + out.write(1); + out.close(); + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 1); + + LOG.info("Creating other"); + out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), + null); + out.write(2); + out.close(); + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 1); + readAssertEquals(store, "other", 2); + + LOG.info("Updating other"); + out = store.updateBlob("other", null); + out.write(5); + out.close(); + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 1); + readAssertEquals(store, "other", 5); + + LOG.info("Deleting test"); + store.deleteBlob("test", null); + assertStoreHasExactly(store, "other"); + readAssertEquals(store, "other", 5); + + LOG.info("Creating test again"); + out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), + null); + out.write(2); + out.close(); + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 2); + readAssertEquals(store, "other", 5); + + LOG.info("Updating test"); + out = store.updateBlob("test", null); + out.write(3); + out.close(); + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 3); + readAssertEquals(store, "other", 5); + + LOG.info("Deleting other"); + store.deleteBlob("other", null); + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 3); + + LOG.info("Updating test again"); + out = store.updateBlob("test", null); + out.write(4); + out.flush(); + LOG.info("SLEEPING"); + Thread.sleep(2); + + if (store instanceof HdfsBlobStore) { + ((HdfsBlobStore) 
store).fullCleanup(1); + } else { + fail("Error the blobstore is of unknowntype"); + } assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 3); + try { + out.close(); + } catch (IOException e) { + //This is likely to happen when we try to commit something that + // was cleaned up. This is expected and acceptable. + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java new file mode 100644 index 0000000..cf2f7c1 --- /dev/null +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.hdfs.blobstore; + +import backtype.storm.blobstore.BlobStoreFile; +import backtype.storm.generated.SettableBlobMeta; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.junit.Assert.*; + +public class HdfsBlobStoreImplTest { + private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImplTest.class); + + protected static Configuration hadoopConf; + protected static MiniDFSCluster dfscluster; + // key dir needs to be number 0 to number of buckets, choose one so we know where to look + private static String KEYDIR = "0"; + private Path blobDir = new Path("/storm/blobstore1"); + private Path fullKeyDir = new Path(blobDir, KEYDIR); + private String BLOBSTORE_DATA = "data"; + + public class TestHdfsBlobStoreImpl extends HdfsBlobStoreImpl { + + public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws IOException { + super(path, conf); + } + + public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf, + Configuration hconf) throws IOException { + super(path, conf, hconf); + } + + protected Path getKeyDir(String key) { + return new Path(new Path(blobDir, KEYDIR), key); + } + } + + @BeforeClass + public static void init() { + if (hadoopConf == null) { + hadoopConf = new Configuration(); + } + try { + if (dfscluster == null) { + dfscluster = new MiniDFSCluster.Builder(hadoopConf).build(); + dfscluster.waitActive(); + } + } catch (IOException e) { + LOG.error("error creating MiniDFSCluster"); + } + } + + @AfterClass + public 
static void cleanup() throws IOException { + if (dfscluster != null) { + dfscluster.shutdown(); + } + } + + // Be careful about adding additional tests as the dfscluster will be shared + + @Test + public void testMultiple() throws Exception { + String testString = "testingblob"; + String validKey = "validkeyBasic"; + + FileSystem fs = dfscluster.getFileSystem(); + Map conf = new HashMap(); + + TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, hadoopConf); + // should have created blobDir + assertTrue("BlobStore dir wasn't created", fs.exists(blobDir)); + assertEquals("BlobStore dir was created with wrong permissions", + HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION, fs.getFileStatus(blobDir).getPermission()); + + // test exist with non-existent key + assertFalse("file exists but shouldn't", hbs.exists("bogus")); + + // test write + BlobStoreFile pfile = hbs.write(validKey, false); + // Adding metadata to avoid null pointer exception + SettableBlobMeta meta = new SettableBlobMeta(); + meta.set_replication_factor(1); + pfile.setMetadata(meta); + OutputStream ios = pfile.getOutputStream(); + ios.write(testString.getBytes(Charset.forName("UTF-8"))); + ios.close(); + + // test commit creates properly + assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir)); + pfile.commit(); + Path dataFile = new Path(new Path(fullKeyDir, validKey), BLOBSTORE_DATA); + assertTrue("blob data not committed", fs.exists(dataFile)); + assertEquals("BlobStore dir was created with wrong permissions", + HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission()); + assertTrue("key doesn't exist but should", hbs.exists(validKey)); + + // test read + BlobStoreFile readpFile = hbs.read(validKey); + String readString = IOUtils.toString(readpFile.getInputStream(), "UTF-8"); + assertEquals("string read from blob doesn't match", testString, readString); + + // test listkeys + Iterator<String> keys = hbs.listKeys(); + assertTrue("blob has one 
key", keys.hasNext()); + assertEquals("one key in blobstore", validKey, keys.next()); + + // delete + hbs.deleteKey(validKey); + assertFalse("key not deleted", fs.exists(dataFile)); + assertFalse("key not deleted", hbs.exists(validKey)); + + // Now do multiple + String testString2 = "testingblob2"; + String validKey2= "validkey2"; + + // test write + pfile = hbs.write(validKey, false); + pfile.setMetadata(meta); + ios = pfile.getOutputStream(); + ios.write(testString.getBytes(Charset.forName("UTF-8"))); + ios.close(); + + // test commit creates properly + assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir)); + pfile.commit(); + assertTrue("blob data not committed", fs.exists(dataFile)); + assertEquals("BlobStore dir was created with wrong permissions", + HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission()); + assertTrue("key doesn't exist but should", hbs.exists(validKey)); + + // test write again + pfile = hbs.write(validKey2, false); + pfile.setMetadata(meta); + OutputStream ios2 = pfile.getOutputStream(); + ios2.write(testString2.getBytes(Charset.forName("UTF-8"))); + ios2.close(); + + // test commit second creates properly + pfile.commit(); + Path dataFile2 = new Path(new Path(fullKeyDir, validKey2), BLOBSTORE_DATA); + assertTrue("blob data not committed", fs.exists(dataFile2)); + assertEquals("BlobStore dir was created with wrong permissions", + HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile2).getPermission()); + assertTrue("key doesn't exist but should", hbs.exists(validKey2)); + + // test listkeys + keys = hbs.listKeys(); + int total = 0; + boolean key1Found = false; + boolean key2Found = false; + while(keys.hasNext()) { + total++; + String key = keys.next(); + if (key.equals(validKey)) { + key1Found = true; + } else if (key.equals(validKey2)) { + key2Found = true; + } else { + fail("Found key that wasn't expected: " + key); + } + } + assertEquals("number of keys is wrong", 2, 
total); + assertTrue("blobstore missing key1", key1Found); + assertTrue("blobstore missing key2", key2Found); + + // test read + readpFile = hbs.read(validKey); + readString = IOUtils.toString(readpFile.getInputStream(), "UTF-8"); + assertEquals("string read from blob doesn't match", testString, readString); + + // test read + readpFile = hbs.read(validKey2); + readString = IOUtils.toString(readpFile.getInputStream(), "UTF-8"); + assertEquals("string read from blob doesn't match", testString2, readString); + + hbs.deleteKey(validKey); + assertFalse("key not deleted", hbs.exists(validKey)); + hbs.deleteKey(validKey2); + assertFalse("key not deleted", hbs.exists(validKey2)); + } + + @Test + public void testGetFileLength() throws IOException { + FileSystem fs = dfscluster.getFileSystem(); + Map conf = new HashMap(); + String validKey = "validkeyBasic"; + String testString = "testingblob"; + TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, hadoopConf); + BlobStoreFile pfile = hbs.write(validKey, false); + // Adding metadata to avoid null pointer exception + SettableBlobMeta meta = new SettableBlobMeta(); + meta.set_replication_factor(1); + pfile.setMetadata(meta); + OutputStream ios = pfile.getOutputStream(); + ios.write(testString.getBytes(Charset.forName("UTF-8"))); + ios.close(); + assertEquals(testString.getBytes(Charset.forName("UTF-8")).length, pfile.getFileLength()); + } +} \ No newline at end of file
