This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new 6549a51 STORM-3418: Fix checkstyle violations in storm-hdfs-blobstore new bba702e Merge pull request #3042 from krichter722/checkstyle-hdfs-blobstore 6549a51 is described below commit 6549a5107137a79585f5b81ab61e2b7b5df92aee Author: Karl-Philipp Richter <krich...@posteo.de> AuthorDate: Sat Jun 15 20:32:25 2019 +0200 STORM-3418: Fix checkstyle violations in storm-hdfs-blobstore --- external/storm-hdfs-blobstore/pom.xml | 2 +- .../apache/storm/hdfs/blobstore/HdfsBlobStore.java | 45 ++++---- .../storm/hdfs/blobstore/HdfsBlobStoreFile.java | 122 +++++++++++---------- .../storm/hdfs/blobstore/HdfsBlobStoreImpl.java | 102 ++++++++--------- .../storm/hdfs/blobstore/HdfsClientBlobStore.java | 51 ++++----- .../apache/storm/blobstore/ClientBlobStore.java | 1 - .../storm/blobstore/ClientBlobStoreTest.java | 1 - 7 files changed, 164 insertions(+), 160 deletions(-) diff --git a/external/storm-hdfs-blobstore/pom.xml b/external/storm-hdfs-blobstore/pom.xml index a369ebe..e2b948b 100644 --- a/external/storm-hdfs-blobstore/pom.xml +++ b/external/storm-hdfs-blobstore/pom.xml @@ -255,7 +255,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>80</maxAllowedViolations> + <maxAllowedViolations>0</maxAllowedViolations> </configuration> </plugin> <plugin> diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java index 814c5d4..da918c4 100644 --- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java +++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java @@ -18,6 +18,10 @@ package org.apache.storm.hdfs.blobstore; +import static org.apache.storm.blobstore.BlobStoreAclHandler.ADMIN; +import static org.apache.storm.blobstore.BlobStoreAclHandler.READ; +import static org.apache.storm.blobstore.BlobStoreAclHandler.WRITE; + import java.io.ByteArrayOutputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -50,20 +54,18 @@ import org.apache.storm.utils.WrappedKeyNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.storm.blobstore.BlobStoreAclHandler.*; - /** * Provides a HDFS file system backed blob store implementation. * Note that this provides an api for having HDFS be the backing store for the blobstore, * it is not a service/daemon. * - * We currently have NIMBUS_ADMINS and SUPERVISOR_ADMINS configuration. NIMBUS_ADMINS are given READ, WRITE and ADMIN + * <p>We currently have NIMBUS_ADMINS and SUPERVISOR_ADMINS configuration. NIMBUS_ADMINS are given READ, WRITE and ADMIN * access whereas the SUPERVISOR_ADMINS are given READ access in order to read and download the blobs form the nimbus. * - * The ACLs for the blob store are validated against whether the subject is a NIMBUS_ADMIN, SUPERVISOR_ADMIN or USER + * <p>The ACLs for the blob store are validated against whether the subject is a NIMBUS_ADMIN, SUPERVISOR_ADMIN or USER * who has read, write or admin privileges in order to perform respective operations on the blob. * - * For hdfs blob store + * <p>For hdfs blob store * 1. The USER interacts with nimbus to upload and access blobs through NimbusBlobStore Client API. Here, unlike * local blob store which stores the blobs locally, the nimbus talks to HDFS to upload the blobs. * 2. The USER sets the ACLs, and the blob access is validated against these ACLs. @@ -191,23 +193,23 @@ public class HdfsBlobStore extends BlobStore { if (hbs.exists(DATA_PREFIX + key)) { throw new WrappedKeyAlreadyExistsException(key); } - BlobStoreFileOutputStream mOut = null; + BlobStoreFileOutputStream outputStream = null; try { BlobStoreFile metaFile = hbs.write(META_PREFIX + key, true); metaFile.setMetadata(meta); - mOut = new BlobStoreFileOutputStream(metaFile); - mOut.write(Utils.thriftSerialize(meta)); - mOut.close(); - mOut = null; + outputStream = new BlobStoreFileOutputStream(metaFile); + outputStream.write(Utils.thriftSerialize(meta)); + outputStream.close(); + outputStream = null; BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, true); dataFile.setMetadata(meta); return new BlobStoreFileOutputStream(dataFile); } catch (IOException e) { throw new RuntimeException(e); } finally { - if (mOut != null) { + if (outputStream != null) { try { - mOut.cancel(); + outputStream.cancel(); } catch (IOException e) { //Ignored } @@ -281,9 +283,9 @@ public class HdfsBlobStore extends BlobStore { } /** - * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi + * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi. * - * @param leaderElector + * @param leaderElector the leader elector */ @Override public void setLeaderElector(ILeaderElector leaderElector) { @@ -302,7 +304,6 @@ public class HdfsBlobStore extends BlobStore { BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); SettableBlobMeta orig = getStoredBlobMeta(key); aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key); - BlobStoreFileOutputStream mOut = null; writeMetadata(key, meta); } @@ -379,20 +380,20 @@ public class HdfsBlobStore extends BlobStore { public void writeMetadata(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException { - BlobStoreFileOutputStream mOut = null; + BlobStoreFileOutputStream outputStream = null; try { BlobStoreFile hdfsFile = hbs.write(META_PREFIX + key, false); hdfsFile.setMetadata(meta); - mOut = new BlobStoreFileOutputStream(hdfsFile); - mOut.write(Utils.thriftSerialize(meta)); - mOut.close(); - mOut = null; + outputStream = new BlobStoreFileOutputStream(hdfsFile); + outputStream.write(Utils.thriftSerialize(meta)); + outputStream.close(); + outputStream = null; } catch (IOException exp) { throw new RuntimeException(exp); } finally { - if (mOut != null) { + if (outputStream != null) { try { - mOut.cancel(); + outputStream.cancel(); } catch (IOException e) { //Ignored } diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java index 3021e66..564a1e3 100644 --- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java +++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java @@ -15,8 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.hdfs.blobstore; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.regex.Matcher; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; @@ -28,87 +34,83 @@ import org.apache.storm.generated.SettableBlobMeta; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.regex.Matcher; - public class HdfsBlobStoreFile extends BlobStoreFile { - public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreFile.class); - - private final String _key; - private final boolean _isTmp; - private final Path _path; - private Long _modTime = null; - private final boolean _mustBeNew; - private final Configuration _hadoopConf; - private final FileSystem _fs; - private SettableBlobMeta meta; // files are world-wide readable and owner writable - final public static FsPermission BLOBSTORE_FILE_PERMISSION = + public static final FsPermission BLOBSTORE_FILE_PERMISSION = FsPermission.createImmutable((short) 0644); // rw-r--r-- + private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreFile.class); + + private final String key; + private final boolean isTmp; + private final Path path; + private Long modTime = null; + private final boolean mustBeNew; + private final Configuration hadoopConf; + private final FileSystem fileSystem; + private SettableBlobMeta settableBlobMeta; + public HdfsBlobStoreFile(Path base, String name, Configuration hconf) { if (BLOBSTORE_DATA_FILE.equals(name)) { - _isTmp = false; + isTmp = false; } else { Matcher m = TMP_NAME_PATTERN.matcher(name); if (!m.matches()) { - throw new IllegalArgumentException("File name does not match '"+name+"' !~ "+TMP_NAME_PATTERN); + throw new IllegalArgumentException("File name does not match '" + name + "' !~ " + TMP_NAME_PATTERN); } - _isTmp = true; + isTmp = true; } - _hadoopConf = hconf; - _key = base.getName(); - _path = new Path(base, name); - _mustBeNew = false; + hadoopConf = hconf; + key = base.getName(); + path = new Path(base, name); + mustBeNew = false; try { - _fs = _path.getFileSystem(_hadoopConf); + fileSystem = path.getFileSystem(hadoopConf); } catch (IOException e) { - throw new RuntimeException("Error getting filesystem for path: " + _path, e); + throw new RuntimeException("Error getting filesystem for path: " + path, e); } } public HdfsBlobStoreFile(Path base, boolean isTmp, boolean mustBeNew, Configuration hconf) { - _key = base.getName(); - _hadoopConf = hconf; - _isTmp = isTmp; - _mustBeNew = mustBeNew; - if (_isTmp) { - _path = new Path(base, System.currentTimeMillis()+TMP_EXT); + key = base.getName(); + hadoopConf = hconf; + this.isTmp = isTmp; + this.mustBeNew = mustBeNew; + if (this.isTmp) { + path = new Path(base, System.currentTimeMillis() + TMP_EXT); } else { - _path = new Path(base, BLOBSTORE_DATA_FILE); + path = new Path(base, BLOBSTORE_DATA_FILE); } try { - _fs = _path.getFileSystem(_hadoopConf); + fileSystem = path.getFileSystem(hadoopConf); } catch (IOException e) { - throw new RuntimeException("Error getting filesystem for path: " + _path, e); + throw new RuntimeException("Error getting filesystem for path: " + path, e); } } @Override public void delete() throws IOException { - _fs.delete(_path, true); + fileSystem.delete(path, true); } @Override public boolean isTmp() { - return _isTmp; + return isTmp; } @Override public String getKey() { - return _key; + return key; } @Override public long getModTime() throws IOException { - if (_modTime == null) { - FileSystem fs = _path.getFileSystem(_hadoopConf); - _modTime = fs.getFileStatus(_path).getModificationTime(); + if (modTime == null) { + FileSystem fs = path.getFileSystem(hadoopConf); + modTime = fs.getFileStatus(path).getModificationTime(); } - return _modTime; + return modTime; } private void checkIsNotTmp() { @@ -126,7 +128,7 @@ public class HdfsBlobStoreFile extends BlobStoreFile { @Override public InputStream getInputStream() throws IOException { checkIsTmp(); - return _fs.open(_path); + return fileSystem.open(path); } @Override @@ -135,21 +137,21 @@ public class HdfsBlobStoreFile extends BlobStoreFile { OutputStream out = null; FsPermission fileperms = new FsPermission(BLOBSTORE_FILE_PERMISSION); try { - out = _fs.create(_path, (short)this.getMetadata().get_replication_factor()); - _fs.setPermission(_path, fileperms); - _fs.setReplication(_path, (short)this.getMetadata().get_replication_factor()); + out = fileSystem.create(path, (short)this.getMetadata().get_replication_factor()); + fileSystem.setPermission(path, fileperms); + fileSystem.setReplication(path, (short)this.getMetadata().get_replication_factor()); } catch (IOException e) { //Try to create the parent directory, may not work FsPermission dirperms = new FsPermission(HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION); - if (!_fs.mkdirs(_path.getParent(), dirperms)) { - LOG.warn("error creating parent dir: " + _path.getParent()); + if (!fileSystem.mkdirs(path.getParent(), dirperms)) { + LOG.warn("error creating parent dir: " + path.getParent()); } - out = _fs.create(_path, (short)this.getMetadata().get_replication_factor()); - _fs.setPermission(_path, dirperms); - _fs.setReplication(_path, (short)this.getMetadata().get_replication_factor()); + out = fileSystem.create(path, (short)this.getMetadata().get_replication_factor()); + fileSystem.setPermission(path, dirperms); + fileSystem.setReplication(path, (short)this.getMetadata().get_replication_factor()); } if (out == null) { - throw new IOException("Error in creating: " + _path); + throw new IOException("Error in creating: " + path); } return out; } @@ -158,12 +160,12 @@ public class HdfsBlobStoreFile extends BlobStoreFile { public void commit() throws IOException { checkIsNotTmp(); // FileContext supports atomic rename, whereas FileSystem doesn't - FileContext fc = FileContext.getFileContext(_hadoopConf); - Path dest = new Path(_path.getParent(), BLOBSTORE_DATA_FILE); - if (_mustBeNew) { - fc.rename(_path, dest); + FileContext fc = FileContext.getFileContext(hadoopConf); + Path dest = new Path(path.getParent(), BLOBSTORE_DATA_FILE); + if (mustBeNew) { + fc.rename(path, dest); } else { - fc.rename(_path, dest, Options.Rename.OVERWRITE); + fc.rename(path, dest, Options.Rename.OVERWRITE); } // Note, we could add support for setting the replication factor } @@ -176,21 +178,21 @@ public class HdfsBlobStoreFile extends BlobStoreFile { @Override public String toString() { - return _path+":"+(_isTmp ? "tmp": BlobStoreFile.BLOBSTORE_DATA_FILE)+":"+_key; + return path + ":" + (isTmp ? "tmp" : BlobStoreFile.BLOBSTORE_DATA_FILE) + ":" + key; } @Override public long getFileLength() throws IOException { - return _fs.getFileStatus(_path).getLen(); + return fileSystem.getFileStatus(path).getLen(); } @Override public SettableBlobMeta getMetadata() { - return meta; + return settableBlobMeta; } @Override public void setMetadata(SettableBlobMeta meta) { - this.meta = meta; + this.settableBlobMeta = meta; } } diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java index 8a97572..e197c39 100644 --- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java +++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java @@ -42,6 +42,11 @@ import org.slf4j.LoggerFactory; * HDFS blob store impl. */ public class HdfsBlobStoreImpl { + + // blobstore directory is private! + public static final FsPermission BLOBSTORE_DIR_PERMISSION = + FsPermission.createImmutable((short) 0700); // rwx-------- + private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImpl.class); private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000L; @@ -50,6 +55,10 @@ public class HdfsBlobStoreImpl { private Timer timer; + private Path fullPath; + private FileSystem fileSystem; + private Configuration hadoopConf; + public class KeyInHashDirIterator implements Iterator<String> { private int currentBucket = 0; private Iterator<String> it = null; @@ -62,7 +71,7 @@ public class HdfsBlobStoreImpl { private void primeNext() throws IOException { while (it == null && currentBucket < BUCKETS) { String name = String.valueOf(currentBucket); - Path dir = new Path(_fullPath, name); + Path dir = new Path(fullPath, name); try { it = listKeys(dir); } catch (FileNotFoundException e) { @@ -111,15 +120,6 @@ public class HdfsBlobStoreImpl { } } - - private Path _fullPath; - private FileSystem _fs; - private Configuration _hadoopConf; - - // blobstore directory is private! - final public static FsPermission BLOBSTORE_DIR_PERMISSION = - FsPermission.createImmutable((short) 0700); // rwx-------- - public HdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws IOException { this(path, conf, new Configuration()); } @@ -127,15 +127,15 @@ public class HdfsBlobStoreImpl { public HdfsBlobStoreImpl(Path path, Map<String, Object> conf, Configuration hconf) throws IOException { LOG.debug("Blob store based in {}", path); - _fullPath = path; - _hadoopConf = hconf; - _fs = path.getFileSystem(_hadoopConf); + fullPath = path; + hadoopConf = hconf; + fileSystem = path.getFileSystem(hadoopConf); - if (!_fs.exists(_fullPath)) { + if (!fileSystem.exists(fullPath)) { FsPermission perms = new FsPermission(BLOBSTORE_DIR_PERMISSION); - boolean success = _fs.mkdirs(_fullPath, perms); + boolean success = fileSystem.mkdirs(fullPath, perms); if (!success) { - throw new IOException("Error creating blobstore directory: " + _fullPath); + throw new IOException("Error creating blobstore directory: " + fullPath); } } @@ -158,22 +158,40 @@ public class HdfsBlobStoreImpl { } /** - * @return all keys that are available for reading. - * @throws IOException on any error. + * List relevant keys. + * + * @return all keys that are available for reading + * @throws IOException on any error */ public Iterator<String> listKeys() throws IOException { return new KeyInHashDirIterator(); } + protected Iterator<String> listKeys(Path path) throws IOException { + ArrayList<String> ret = new ArrayList<String>(); + FileStatus[] files = fileSystem.listStatus(new Path[]{path}); + if (files != null) { + for (FileStatus sub : files) { + try { + ret.add(sub.getPath().getName().toString()); + } catch (IllegalArgumentException e) { + //Ignored the file did not match + LOG.debug("Found an unexpected file in {} {}", path, sub.getPath().getName()); + } + } + } + return ret.iterator(); + } + /** * Get an input stream for reading a part. * - * @param key the key of the part to read. - * @return the where to read the data from. + * @param key the key of the part to read + * @return the where to read the data from * @throws IOException on any error */ public BlobStoreFile read(String key) throws IOException { - return new HdfsBlobStoreFile(getKeyDir(key), BLOBSTORE_DATA, _hadoopConf); + return new HdfsBlobStoreFile(getKeyDir(key), BLOBSTORE_DATA, hadoopConf); } /** @@ -185,7 +203,7 @@ public class HdfsBlobStoreImpl { * @throws IOException on any error */ public BlobStoreFile write(String key, boolean create) throws IOException { - return new HdfsBlobStoreFile(getKeyDir(key), true, create, _hadoopConf); + return new HdfsBlobStoreFile(getKeyDir(key), true, create, hadoopConf); } /** @@ -198,8 +216,8 @@ public class HdfsBlobStoreImpl { Path dir = getKeyDir(key); boolean res = false; try { - _fs = dir.getFileSystem(_hadoopConf); - res = _fs.exists(dir); + fileSystem = dir.getFileSystem(hadoopConf); + res = fileSystem.exists(dir); } catch (IOException e) { LOG.warn("Exception checking for exists on: " + key); } @@ -215,17 +233,17 @@ public class HdfsBlobStoreImpl { public void deleteKey(String key) throws IOException { Path keyDir = getKeyDir(key); HdfsBlobStoreFile pf = new HdfsBlobStoreFile(keyDir, BLOBSTORE_DATA, - _hadoopConf); + hadoopConf); pf.delete(); delete(keyDir); } protected Path getKeyDir(String key) { String hash = String.valueOf(Math.abs((long) key.hashCode()) % BUCKETS); - Path hashDir = new Path(_fullPath, hash); + Path hashDir = new Path(fullPath, hash); Path ret = new Path(hashDir, key); - LOG.debug("{} Looking for {} in {}", new Object[]{_fullPath, key, hash}); + LOG.debug("{} Looking for {} in {}", new Object[]{fullPath, key, hash}); return ret; } @@ -239,7 +257,7 @@ public class HdfsBlobStoreImpl { if (!i.hasNext()) { //The dir is empty, so try to delete it, may fail, but that is OK try { - _fs.delete(keyDir, true); + fileSystem.delete(keyDir, true); } catch (Exception e) { LOG.warn("Could not delete " + keyDir + " will try again later"); } @@ -257,12 +275,12 @@ public class HdfsBlobStoreImpl { protected Iterator<BlobStoreFile> listBlobStoreFiles(Path path) throws IOException { ArrayList<BlobStoreFile> ret = new ArrayList<BlobStoreFile>(); - FileStatus[] files = _fs.listStatus(new Path[]{path}); + FileStatus[] files = fileSystem.listStatus(new Path[]{path}); if (files != null) { for (FileStatus sub : files) { try { ret.add(new HdfsBlobStoreFile(sub.getPath().getParent(), sub.getPath().getName(), - _hadoopConf)); + hadoopConf)); } catch (IllegalArgumentException e) { //Ignored the file did not match LOG.warn("Found an unexpected file in {} {}", path, sub.getPath().getName()); @@ -272,37 +290,21 @@ public class HdfsBlobStoreImpl { return ret.iterator(); } - protected Iterator<String> listKeys(Path path) throws IOException { - ArrayList<String> ret = new ArrayList<String>(); - FileStatus[] files = _fs.listStatus(new Path[]{path}); - if (files != null) { - for (FileStatus sub : files) { - try { - ret.add(sub.getPath().getName().toString()); - } catch (IllegalArgumentException e) { - //Ignored the file did not match - LOG.debug("Found an unexpected file in {} {}", path, sub.getPath().getName()); - } - } - } - return ret.iterator(); - } - protected int getBlobReplication(String key) throws IOException { Path path = getKeyDir(key); Path dest = new Path(path, BLOBSTORE_DATA); - return _fs.getFileStatus(dest).getReplication(); + return fileSystem.getFileStatus(dest).getReplication(); } protected int updateBlobReplication(String key, int replication) throws IOException { Path path = getKeyDir(key); Path dest = new Path(path, BLOBSTORE_DATA); - _fs.setReplication(dest, (short) replication); - return _fs.getFileStatus(dest).getReplication(); + fileSystem.setReplication(dest, (short) replication); + return fileSystem.getFileStatus(dest).getReplication(); } protected void delete(Path path) throws IOException { - _fs.delete(path, true); + fileSystem.delete(path, true); } public void shutdown() { diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java index 419cf99..ce159d5 100644 --- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java +++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java @@ -15,8 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.hdfs.blobstore; +import java.util.Iterator; +import java.util.Map; + import org.apache.storm.blobstore.AtomicOutputStream; import org.apache.storm.blobstore.ClientBlobStore; import org.apache.storm.blobstore.InputStreamWithMeta; @@ -29,53 +33,50 @@ import org.apache.storm.utils.NimbusClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; -import java.util.Map; - /** - * Client to access the HDFS blobStore. At this point, this is meant to only be used by the - * supervisor. Don't trust who the client says they are so pass null for all Subjects. + * Client to access the HDFS blobStore. At this point, this is meant to only be used by the + * supervisor. Don't trust who the client says they are so pass null for all Subjects. * - * The HdfsBlobStore implementation takes care of the null Subjects. It assigns Subjects - * based on what hadoop says who the users are. These users must be configured accordingly - * in the SUPERVISOR_ADMINS for ACL validation and for the supervisors to download the blobs. - * This API is only used by the supervisor in order to talk directly to HDFS. + * <p>The HdfsBlobStore implementation takes care of the null Subjects. It assigns Subjects + * based on what hadoop says who the users are. These users must be configured accordingly + * in the SUPERVISOR_ADMINS for ACL validation and for the supervisors to download the blobs. + * This API is only used by the supervisor in order to talk directly to HDFS. */ public class HdfsClientBlobStore extends ClientBlobStore { private static final Logger LOG = LoggerFactory.getLogger(HdfsClientBlobStore.class); - private HdfsBlobStore _blobStore; - private Map _conf; + private HdfsBlobStore blobStore; + private Map conf; private NimbusClient client; @Override public void prepare(Map<String, Object> conf) { - this._conf = conf; - _blobStore = new HdfsBlobStore(); - _blobStore.prepare(conf, null, null, null); + this.conf = conf; + blobStore = new HdfsBlobStore(); + blobStore.prepare(conf, null, null, null); } @Override public AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException { - return _blobStore.createBlob(key, meta, null); + return blobStore.createBlob(key, meta, null); } @Override public AtomicOutputStream updateBlob(String key) throws AuthorizationException, KeyNotFoundException { - return _blobStore.updateBlob(key, null); + return blobStore.updateBlob(key, null); } @Override public ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException { - return _blobStore.getBlobMeta(key, null); + return blobStore.getBlobMeta(key, null); } @Override public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException { try { - _blobStore.getBlob(blobKey, null); + blobStore.getBlob(blobKey, null); } catch (KeyNotFoundException e) { return false; } @@ -85,33 +86,33 @@ public class HdfsClientBlobStore extends ClientBlobStore { @Override public void setBlobMetaToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException { - _blobStore.setBlobMeta(key, meta, null); + blobStore.setBlobMeta(key, meta, null); } @Override public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException { - _blobStore.deleteBlob(key, null); + blobStore.deleteBlob(key, null); } @Override public InputStreamWithMeta getBlob(String key) throws AuthorizationException, KeyNotFoundException { - return _blobStore.getBlob(key, null); + return blobStore.getBlob(key, null); } @Override public Iterator<String> listKeys() { - return _blobStore.listKeys(); + return blobStore.listKeys(); } @Override public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException { - return _blobStore.getBlobReplication(key, null); + return blobStore.getBlobReplication(key, null); } @Override public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException { - return _blobStore.updateBlobReplication(key, replication, null); + return blobStore.updateBlobReplication(key, replication, null); } @Override @@ -132,7 +133,7 @@ public class HdfsClientBlobStore extends ClientBlobStore { @Override public void close() { - if(client != null) { + if (client != null) { client.close(); client = null; } diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java index b9290cf..ee4387f 100644 --- a/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java +++ b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java @@ -39,7 +39,6 @@ import org.apache.storm.utils.Utils; * @see org.apache.storm.blobstore.NimbusBlobStore */ public abstract class ClientBlobStore implements Shutdownable, AutoCloseable { - protected Map<String, Object> conf; public static void withConfiguredClient(WithBlobstore withBlobstore) throws Exception { Map<String, Object> conf = ConfigUtils.readStormConfig(); diff --git a/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java b/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java index 081cc7b..65a37df 100644 --- a/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java +++ b/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java @@ -111,7 +111,6 @@ public class ClientBlobStoreTest { @Override public void prepare(Map<String, Object> conf) { - this.conf = conf; allBlobs = new HashMap<String, SettableBlobMeta>(); }