http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/localizer/Localizer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/localizer/Localizer.java b/storm-core/src/jvm/backtype/storm/localizer/Localizer.java new file mode 100644 index 0000000..ef5684f --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/localizer/Localizer.java @@ -0,0 +1,695 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.localizer; + +import backtype.storm.Config; +import backtype.storm.blobstore.ClientBlobStore; +import backtype.storm.blobstore.InputStreamWithMeta; +import backtype.storm.generated.AuthorizationException; +import backtype.storm.generated.KeyNotFoundException; +import backtype.storm.utils.ShellUtils.ExitCodeException; +import backtype.storm.utils.ShellUtils.ShellCommandExecutor; +import backtype.storm.utils.Utils; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; + +/** + * Class to download and manage files from the blobstore. It uses an LRU cache + * to determine which files to keep so they can be reused and which files to delete. + */ +public class Localizer { + public static final Logger LOG = LoggerFactory.getLogger(Localizer.class); + + private Map _conf; + private int _threadPoolSize; + // thread pool for initial download + private ExecutorService _execService; + // thread pool for updates + private ExecutorService _updateExecService; + private int _blobDownloadRetries; + + // track resources - user to resourceSet + private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new + ConcurrentHashMap<String, LocalizedResourceSet>(); + + private String _localBaseDir; + public static final String USERCACHE = "usercache"; + public static final String FILECACHE = "filecache"; + + // sub directories to store either files or uncompressed archives respectively + public static final String FILESDIR = "files"; + public static final String ARCHIVESDIR = "archives"; + + private static final String TO_UNCOMPRESS = "_tmp_"; + + // cleanup + private long _cacheTargetSize; + private long _cacheCleanupPeriod; + private ScheduledExecutorService _cacheCleanupService; + + public Localizer(Map conf, String baseDir) { + _conf = conf; + _localBaseDir = baseDir; + // default cache size 10GB, converted to Bytes + _cacheTargetSize = Utils.getInt(_conf.get(Config.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB), + 10 * 1024).longValue() << 20; + // default 10 minutes. + _cacheCleanupPeriod = Utils.getInt(_conf.get( + Config.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 10 * 60 * 1000).longValue(); + + // if we needed we could make config for update thread pool size + _threadPoolSize = Utils.getInt(_conf.get(Config.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5); + _blobDownloadRetries = Utils.getInt(_conf.get( + Config.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3); + + _execService = Executors.newFixedThreadPool(_threadPoolSize); + _updateExecService = Executors.newFixedThreadPool(_threadPoolSize); + reconstructLocalizedResources(); + } + + // For testing, it allows setting size in bytes + protected void setTargetCacheSize(long size) { + _cacheTargetSize = size; + } + + // For testing, be careful as it doesn't clone + ConcurrentMap<String, LocalizedResourceSet> getUserResources() { + return _userRsrc; + } + + public void startCleaner() { + _cacheCleanupService = new ScheduledThreadPoolExecutor(1, + new ThreadFactoryBuilder() + .setNameFormat("Localizer Cache Cleanup") + .build()); + + _cacheCleanupService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + handleCacheCleanup(); + } + }, _cacheCleanupPeriod, _cacheCleanupPeriod, TimeUnit.MILLISECONDS); + } + + public void shutdown() { + if (_cacheCleanupService != null) { + _cacheCleanupService.shutdown(); + } + if (_execService != null) { + _execService.shutdown(); + } + if (_updateExecService != null) { + _updateExecService.shutdown(); + } + } + + // baseDir/supervisor/usercache/ + protected File getUserCacheDir() { + return new File(_localBaseDir, USERCACHE); + } + + // baseDir/supervisor/usercache/user1/ + protected File getLocalUserDir(String userName) { + return new File(getUserCacheDir(), userName); + } + + // baseDir/supervisor/usercache/user1/filecache + public File getLocalUserFileCacheDir(String userName) { + return new File(getLocalUserDir(userName), FILECACHE); + } + + // baseDir/supervisor/usercache/user1/filecache/files + protected File getCacheDirForFiles(File dir) { + return new File(dir, FILESDIR); + } + + // get the directory to put uncompressed archives in + // baseDir/supervisor/usercache/user1/filecache/archives + protected File getCacheDirForArchives(File dir) { + return new File(dir, ARCHIVESDIR); + } + + protected void addLocalizedResourceInDir(String dir, LocalizedResourceSet lrsrcSet, + boolean uncompress) { + File[] lrsrcs = readCurrentBlobs(dir); + + if (lrsrcs != null) { + for (File rsrc : lrsrcs) { + LOG.info("add localized in dir found: " + rsrc); + /// strip off .suffix + String path = rsrc.getPath(); + int p = path.lastIndexOf('.'); + if (p > 0) { + path = path.substring(0, p); + } + LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path); + LocalizedResource lrsrc = new LocalizedResource(new File(path).getName(), path, + uncompress); + lrsrcSet.addResource(lrsrc.getKey(), lrsrc, uncompress); + } + } + } + + protected File[] readDirContents(String location) { + File dir = new File(location); + File[] files = null; + if (dir.exists()) { + files = dir.listFiles(); + } + return files; + } + + // Looks for files in the directory with .current suffix + protected File[] readCurrentBlobs(String location) { + File dir = new File(location); + File[] files = null; + if (dir.exists()) { + files = dir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.toLowerCase().endsWith(Utils.DEFAULT_CURRENT_BLOB_SUFFIX); + } + }); + } + return files; + } + + // Check to see if there are any existing files already localized. + protected void reconstructLocalizedResources() { + try { + LOG.info("Reconstruct localized resource: " + getUserCacheDir().getPath()); + File[] users = readDirContents(getUserCacheDir().getPath()); + + if (users != null) { + for (File userDir : users) { + String user = userDir.getName(); + LOG.debug("looking in: {} for user: {}", userDir.getPath(), user); + LocalizedResourceSet newSet = new LocalizedResourceSet(user); + LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet); + if (lrsrcSet == null) { + lrsrcSet = newSet; + } + addLocalizedResourceInDir(getCacheDirForFiles(getLocalUserFileCacheDir(user)).getPath(), + lrsrcSet, false); + addLocalizedResourceInDir( + getCacheDirForArchives(getLocalUserFileCacheDir(user)).getPath(), + lrsrcSet, true); + } + } else { + LOG.warn("No left over resources found for any user during reconstructing of local resources at: {}", getUserCacheDir().getPath()); + } + } catch (Exception e) { + LOG.error("ERROR reconstructing localized resources", e); + } + } + + // ignores invalid user/topo/key + public synchronized void removeBlobReference(String key, String user, String topo, + boolean uncompress) throws AuthorizationException, KeyNotFoundException { + LocalizedResourceSet lrsrcSet = _userRsrc.get(user); + if (lrsrcSet != null) { + LocalizedResource lrsrc = lrsrcSet.get(key, uncompress); + if (lrsrc != null) { + LOG.debug("removing blob reference to: {} for topo: {}", key, topo); + lrsrc.removeReference(topo); + } else { + LOG.warn("trying to remove non-existent blob, key: " + key + " for user: " + user + + " topo: " + topo); + } + } else { + LOG.warn("trying to remove blob for non-existent resource set for user: " + user + " key: " + + key + " topo: " + topo); + } + } + + public synchronized void addReferences(List<LocalResource> localresource, String user, + String topo) { + LocalizedResourceSet lrsrcSet = _userRsrc.get(user); + if (lrsrcSet != null) { + for (LocalResource blob : localresource) { + LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName(), blob.shouldUncompress()); + if (lrsrc != null) { + lrsrc.addReference(topo); + LOG.debug("added reference for topo: {} key: {}", topo, blob); + } else { + LOG.warn("trying to add reference to non-existent blob, key: " + blob + " topo: " + topo); + } + } + } else { + LOG.warn("trying to add reference to non-existent local resource set, " + + "user: " + user + " topo: " + topo); + } + } + + /** + * This function either returns the blob in the existing cache or if it doesn't exist in the + * cache, it will download the blob and will block until the download is complete. + */ + public LocalizedResource getBlob(LocalResource localResource, String user, String topo, + File userFileDir) throws AuthorizationException, KeyNotFoundException, IOException { + ArrayList<LocalResource> arr = new ArrayList<LocalResource>(); + arr.add(localResource); + List<LocalizedResource> results = getBlobs(arr, user, topo, userFileDir); + if (results.isEmpty() || results.size() != 1) { + throw new IOException("Unknown error getting blob: " + localResource + ", for user: " + user + + ", topo: " + topo); + } + return results.get(0); + } + + protected boolean isLocalizedResourceDownloaded(LocalizedResource lrsrc) { + File rsrcFileCurrent = new File(lrsrc.getCurrentSymlinkPath()); + File rsrcFileWithVersion = new File(lrsrc.getFilePathWithVersion()); + File versionFile = new File(lrsrc.getVersionFilePath()); + return (rsrcFileWithVersion.exists() && rsrcFileCurrent.exists() && versionFile.exists()); + } + + protected boolean isLocalizedResourceUpToDate(LocalizedResource lrsrc, + ClientBlobStore blobstore) throws AuthorizationException, KeyNotFoundException { + String localFile = lrsrc.getFilePath(); + long nimbusBlobVersion = Utils.nimbusVersionOfBlob(lrsrc.getKey(), blobstore); + long currentBlobVersion = Utils.localVersionOfBlob(localFile); + return (nimbusBlobVersion == currentBlobVersion); + } + + protected ClientBlobStore getClientBlobStore() { + return Utils.getClientBlobStoreForSupervisor(_conf); + } + + /** + * This function updates blobs on the supervisor. It uses a separate thread pool and runs + * asynchronously of the download and delete. + */ + public List<LocalizedResource> updateBlobs(List<LocalResource> localResources, + String user) throws AuthorizationException, KeyNotFoundException, IOException { + LocalizedResourceSet lrsrcSet = _userRsrc.get(user); + ArrayList<LocalizedResource> results = new ArrayList<>(); + ArrayList<Callable<LocalizedResource>> updates = new ArrayList<>(); + + if (lrsrcSet == null) { + // resource set must have been removed + return results; + } + ClientBlobStore blobstore = null; + try { + blobstore = getClientBlobStore(); + for (LocalResource localResource: localResources) { + String key = localResource.getBlobName(); + LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress()); + if (lrsrc == null) { + LOG.warn("blob requested for update doesn't exist: {}", key); + continue; + } else { + // update it if either the version isn't the latest or if any local blob files are missing + if (!isLocalizedResourceUpToDate(lrsrc, blobstore) || + !isLocalizedResourceDownloaded(lrsrc)) { + LOG.debug("updating blob: {}", key); + updates.add(new DownloadBlob(this, _conf, key, new File(lrsrc.getFilePath()), user, + lrsrc.isUncompressed(), true)); + } + } + } + } finally { + if(blobstore != null) { + blobstore.shutdown(); + } + } + try { + List<Future<LocalizedResource>> futures = _updateExecService.invokeAll(updates); + for (Future<LocalizedResource> futureRsrc : futures) { + try { + LocalizedResource lrsrc = futureRsrc.get(); + // put the resource just in case it was removed at same time by the cleaner + LocalizedResourceSet newSet = new LocalizedResourceSet(user); + LocalizedResourceSet newlrsrcSet = _userRsrc.putIfAbsent(user, newSet); + if (newlrsrcSet == null) { + newlrsrcSet = newSet; + } + newlrsrcSet.updateResource(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed()); + results.add(lrsrc); + } + catch (ExecutionException e) { + LOG.error("Error updating blob: ", e); + if (e.getCause() instanceof AuthorizationException) { + throw (AuthorizationException)e.getCause(); + } + if (e.getCause() instanceof KeyNotFoundException) { + throw (KeyNotFoundException)e.getCause(); + } + } + } + } catch (RejectedExecutionException re) { + LOG.error("Error updating blobs : ", re); + } catch (InterruptedException ie) { + throw new IOException("Interrupted Exception", ie); + } + return results; + } + + /** + * This function either returns the blobs in the existing cache or if they don't exist in the + * cache, it downloads them in parallel (up to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT) + * and will block until all of them have been downloaded + */ + public synchronized List<LocalizedResource> getBlobs(List<LocalResource> localResources, + String user, String topo, File userFileDir) + throws AuthorizationException, KeyNotFoundException, IOException { + + LocalizedResourceSet newSet = new LocalizedResourceSet(user); + LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet); + if (lrsrcSet == null) { + lrsrcSet = newSet; + } + ArrayList<LocalizedResource> results = new ArrayList<>(); + ArrayList<Callable<LocalizedResource>> downloads = new ArrayList<>(); + + ClientBlobStore blobstore = null; + try { + blobstore = getClientBlobStore(); + for (LocalResource localResource: localResources) { + String key = localResource.getBlobName(); + boolean uncompress = localResource.shouldUncompress(); + LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress()); + boolean isUpdate = false; + if ((lrsrc != null) && (lrsrc.isUncompressed() == localResource.shouldUncompress()) && + (isLocalizedResourceDownloaded(lrsrc))) { + if (isLocalizedResourceUpToDate(lrsrc, blobstore)) { + LOG.debug("blob already exists: {}", key); + lrsrc.addReference(topo); + results.add(lrsrc); + continue; + } + LOG.debug("blob exists but isn't up to date: {}", key); + isUpdate = true; + } + + // go off to blobstore and get it + // assume dir passed in exists and has correct permission + LOG.debug("fetching blob: {}", key); + File downloadDir = getCacheDirForFiles(userFileDir); + File localFile = new File(downloadDir, key); + if (uncompress) { + // for compressed file, download to archives dir + downloadDir = getCacheDirForArchives(userFileDir); + localFile = new File(downloadDir, key); + } + downloadDir.mkdir(); + downloads.add(new DownloadBlob(this, _conf, key, localFile, user, uncompress, + isUpdate)); + } + } finally { + if(blobstore !=null) { + blobstore.shutdown(); + } + } + try { + List<Future<LocalizedResource>> futures = _execService.invokeAll(downloads); + for (Future<LocalizedResource> futureRsrc: futures) { + LocalizedResource lrsrc = futureRsrc.get(); + lrsrc.addReference(topo); + lrsrcSet.addResource(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed()); + results.add(lrsrc); + } + } catch (ExecutionException e) { + if (e.getCause() instanceof AuthorizationException) + throw (AuthorizationException)e.getCause(); + else if (e.getCause() instanceof KeyNotFoundException) { + throw (KeyNotFoundException)e.getCause(); + } else { + throw new IOException("Error getting blobs", e); + } + } catch (RejectedExecutionException re) { + throw new IOException("RejectedExecutionException: ", re); + } catch (InterruptedException ie) { + throw new IOException("Interrupted Exception", ie); + } + return results; + } + + static class DownloadBlob implements Callable<LocalizedResource> { + + private Localizer _localizer; + private Map _conf; + private String _key; + private File _localFile; + private String _user; + private boolean _uncompress; + private boolean _isUpdate; + + public DownloadBlob(Localizer localizer, Map conf, String key, File localFile, + String user, boolean uncompress, boolean update) { + _localizer = localizer; + _conf = conf; + _key = key; + _localFile = localFile; + _user = user; + _uncompress = uncompress; + _isUpdate = update; + } + + @Override + public LocalizedResource call() + throws AuthorizationException, KeyNotFoundException, IOException { + return _localizer.downloadBlob(_conf, _key, _localFile, _user, _uncompress, + _isUpdate); + } + } + + private LocalizedResource downloadBlob(Map conf, String key, File localFile, + String user, boolean uncompress, boolean isUpdate) + throws AuthorizationException, KeyNotFoundException, IOException { + ClientBlobStore blobstore = null; + try { + blobstore = getClientBlobStore(); + long nimbusBlobVersion = Utils.nimbusVersionOfBlob(key, blobstore); + long oldVersion = Utils.localVersionOfBlob(localFile.toString()); + FileOutputStream out = null; + PrintWriter writer = null; + int numTries = 0; + String localizedPath = localFile.toString(); + String localFileWithVersion = Utils.constructBlobWithVersionFileName(localFile.toString(), + nimbusBlobVersion); + String localVersionFile = Utils.constructVersionFileName(localFile.toString()); + String downloadFile = localFileWithVersion; + if (uncompress) { + // we need to download to temp file and then unpack into the one requested + downloadFile = new File(localFile.getParent(), TO_UNCOMPRESS + localFile.getName()).toString(); + } + while (numTries < _blobDownloadRetries) { + out = new FileOutputStream(downloadFile); + numTries++; + try { + if (!Utils.canUserReadBlob(blobstore.getBlobMeta(key), user)) { + throw new AuthorizationException(user + " does not have READ access to " + key); + } + InputStreamWithMeta in = blobstore.getBlob(key); + byte[] buffer = new byte[1024]; + int len; + while ((len = in.read(buffer)) >= 0) { + out.write(buffer, 0, len); + } + out.close(); + in.close(); + if (uncompress) { + Utils.unpack(new File(downloadFile), new File(localFileWithVersion)); + LOG.debug("uncompressed " + downloadFile + " to: " + localFileWithVersion); + } + + // Next write the version. + LOG.info("Blob: " + key + " updated with new Nimbus-provided version: " + + nimbusBlobVersion + " local version was: " + oldVersion); + // The false parameter ensures overwriting the version file, not appending + writer = new PrintWriter( + new BufferedWriter(new FileWriter(localVersionFile, false))); + writer.println(nimbusBlobVersion); + writer.close(); + + try { + setBlobPermissions(conf, user, localFileWithVersion); + setBlobPermissions(conf, user, localVersionFile); + + // Update the key.current symlink. First create tmp symlink and do + // move of tmp to current so that the operation is atomic. + String tmp_uuid_local = java.util.UUID.randomUUID().toString(); + LOG.debug("Creating a symlink @" + localFile + "." + tmp_uuid_local + " , " + + "linking to: " + localFile + "." + nimbusBlobVersion); + File uuid_symlink = new File(localFile + "." + tmp_uuid_local); + + Files.createSymbolicLink(uuid_symlink.toPath(), + Paths.get(Utils.constructBlobWithVersionFileName(localFile.toString(), + nimbusBlobVersion))); + File current_symlink = new File(Utils.constructBlobCurrentSymlinkName( + localFile.toString())); + Files.move(uuid_symlink.toPath(), current_symlink.toPath(), ATOMIC_MOVE); + } catch (IOException e) { + // if we fail after writing the version file but before we move current link we need to + // restore the old version to the file + try { + PrintWriter restoreWriter = new PrintWriter( + new BufferedWriter(new FileWriter(localVersionFile, false))); + restoreWriter.println(oldVersion); + restoreWriter.close(); + } catch (IOException ignore) {} + throw e; + } + + String oldBlobFile = localFile + "." + oldVersion; + try { + // Remove the old version. Note that if a number of processes have that file open, + // the OS will keep the old blob file around until they all close the handle and only + // then deletes it. No new process will open the old blob, since the users will open the + // blob through the "blob.current" symlink, which always points to the latest version of + // a blob. Remove the old version after the current symlink is updated as to not affect + // anyone trying to read it. + if ((oldVersion != -1) && (oldVersion != nimbusBlobVersion)) { + LOG.info("Removing an old blob file:" + oldBlobFile); + Files.delete(Paths.get(oldBlobFile)); + } + } catch (IOException e) { + // At this point we have downloaded everything and moved symlinks. If the remove of + // old fails just log an error + LOG.error("Exception removing old blob version: " + oldBlobFile); + } + + break; + } catch (AuthorizationException ae) { + // we consider this non-retriable exceptions + if (out != null) { + out.close(); + } + new File(downloadFile).delete(); + throw ae; + } catch (IOException | KeyNotFoundException e) { + if (out != null) { + out.close(); + } + if (writer != null) { + writer.close(); + } + new File(downloadFile).delete(); + if (uncompress) { + try { + FileUtils.deleteDirectory(new File(localFileWithVersion)); + } catch (IOException ignore) {} + } + if (!isUpdate) { + // don't want to remove existing version file if its an update + new File(localVersionFile).delete(); + } + + if (numTries < _blobDownloadRetries) { + LOG.error("Failed to download blob, retrying", e); + } else { + throw e; + } + } + } + return new LocalizedResource(key, localizedPath, uncompress); + } finally { + if(blobstore != null) { + blobstore.shutdown(); + } + } + } + + public void setBlobPermissions(Map conf, String user, String path) + throws IOException { + + if (!Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + return; + } + String wlCommand = Utils.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), ""); + if (wlCommand.isEmpty()) { + String stormHome = System.getProperty("storm.home"); + wlCommand = stormHome + "/bin/worker-launcher"; + } + List<String> command = new ArrayList<String>(Arrays.asList(wlCommand, user, "blob", path)); + + String[] commandArray = command.toArray(new String[command.size()]); + ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray); + LOG.info("Setting blob permissions, command: {}", Arrays.toString(commandArray)); + + try { + shExec.execute(); + LOG.debug("output: {}", shExec.getOutput()); + } catch (ExitCodeException e) { + int exitCode = shExec.getExitCode(); + LOG.warn("Exit code from worker-launcher is : " + exitCode, e); + LOG.debug("output: {}", shExec.getOutput()); + throw new IOException("Setting blob permissions failed" + + " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e); + } + } + + + public synchronized void handleCacheCleanup() { + LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(_cacheTargetSize); + // need one large set of all and then clean via LRU + for (LocalizedResourceSet t : _userRsrc.values()) { + toClean.addResources(t); + LOG.debug("Resources to be cleaned after adding {} : {}", t.getUser(), toClean); + } + toClean.cleanup(); + LOG.debug("Resource cleanup: {}", toClean); + for (LocalizedResourceSet t : _userRsrc.values()) { + if (t.getSize() == 0) { + String user = t.getUser(); + + LOG.debug("removing empty set: {}", user); + File userFileCacheDir = getLocalUserFileCacheDir(user); + getCacheDirForFiles(userFileCacheDir).delete(); + getCacheDirForArchives(userFileCacheDir).delete(); + getLocalUserFileCacheDir(user).delete(); + boolean dirsRemoved = getLocalUserDir(user).delete(); + // to catch race with update thread + if (dirsRemoved) { + _userRsrc.remove(user); + } + } + } + } +}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/security/auth/NimbusPrincipal.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/security/auth/NimbusPrincipal.java b/storm-core/src/jvm/backtype/storm/security/auth/NimbusPrincipal.java new file mode 100644 index 0000000..c07ae84 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/security/auth/NimbusPrincipal.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package backtype.storm.security.auth; + +import java.security.Principal; + +public class NimbusPrincipal implements Principal { + + @Override + public String getName() { + return NimbusPrincipal.class.toString(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/utils/BufferInputStream.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/BufferInputStream.java b/storm-core/src/jvm/backtype/storm/utils/BufferInputStream.java new file mode 100644 index 0000000..c718858 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/utils/BufferInputStream.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.utils; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; + + +public class BufferInputStream { + byte[] buffer; + InputStream stream; + + public BufferInputStream(InputStream stream, int bufferSize) { + this.stream = stream; + buffer = new byte[bufferSize]; + } + + public BufferInputStream(InputStream stream) { + this(stream, 15*1024); + } + + public byte[] read() throws IOException { + int length = stream.read(buffer); + if(length==-1) { + close(); + return new byte[0]; + } else if(length==buffer.length) { + return buffer; + } else { + return Arrays.copyOf(buffer, length); + } + } + + public void close() throws IOException { + stream.close(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java b/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java index 8595b71..55e3866 100644 --- a/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java +++ b/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java @@ -102,6 +102,13 @@ abstract public class ShellUtils { this(interval, false); } + /** get the exit code + * @return the exit code of the process + */ + public int getExitCode() { + return exitCode; + } + /** * @param interval the minimum duration to wait before re-executing the * command. http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/utils/Utils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index c086be2..e0bbb1f 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -18,18 +18,35 @@ package backtype.storm.utils; import backtype.storm.Config; +import backtype.storm.blobstore.BlobStore; +import backtype.storm.blobstore.BlobStoreAclHandler; +import backtype.storm.blobstore.ClientBlobStore; +import backtype.storm.blobstore.InputStreamWithMeta; +import backtype.storm.blobstore.LocalFsBlobStore; +import backtype.storm.generated.AccessControl; +import backtype.storm.generated.AccessControlType; import backtype.storm.generated.AuthorizationException; import backtype.storm.generated.ComponentCommon; import backtype.storm.generated.ComponentObject; +import backtype.storm.generated.KeyNotFoundException; +import backtype.storm.generated.ReadableBlobMeta; +import backtype.storm.generated.SettableBlobMeta; import backtype.storm.generated.StormTopology; +import backtype.storm.localizer.Localizer; +import backtype.storm.nimbus.NimbusInfo; import backtype.storm.serialization.DefaultSerializationDelegate; import backtype.storm.serialization.SerializationDelegate; import clojure.lang.IFn; import clojure.lang.RT; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; @@ -39,13 +56,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.constructor.SafeConstructor; - import java.net.URL; import java.net.URLDecoder; import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFilePermission; + import java.io.File; +import java.io.FileReader; import java.io.FileInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; @@ -57,13 +77,21 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.FileOutputStream; import java.io.BufferedReader; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.PrintStream; import java.io.RandomAccessFile; import java.io.Serializable; import java.io.IOException; + +import java.util.jar.JarEntry; +import java.util.jar.JarFile; import java.util.Map; +import java.util.Set; import java.util.Iterator; import java.util.Enumeration; import java.util.List; +import java.util.Arrays; import java.util.ArrayList; import java.util.HashSet; import java.util.HashMap; @@ -83,6 +111,9 @@ import org.apache.thrift.TSerializer; public class Utils { private static final Logger LOG = LoggerFactory.getLogger(Utils.class); public static final String DEFAULT_STREAM_ID = "default"; + public static final String DEFAULT_BLOB_VERSION_SUFFIX = ".version"; + public static final String CURRENT_BLOB_SUFFIX_ID = "current"; + public static final String DEFAULT_CURRENT_BLOB_SUFFIX = "." + CURRENT_BLOB_SUFFIX_ID; private static ThreadLocal<TSerializer> threadSer = new ThreadLocal<TSerializer>(); private static ThreadLocal<TDeserializer> threadDes = new ThreadLocal<TDeserializer>(); @@ -110,28 +141,6 @@ public class Utils { return serializationDelegate.deserialize(serialized, clazz); } - public static byte[] thriftSerialize(TBase t) { - try { - TSerializer ser = threadSer.get(); - if (ser == null) { - ser = new TSerializer(); - threadSer.set(ser); - } - return ser.serialize(t); - } catch (TException e) { - throw new RuntimeException(e); - } - } - - private static TDeserializer getDes() { - TDeserializer des = threadDes.get(); - if(des == null) { - des = new TDeserializer(); - threadDes.set(des); - } - return des; - } - public static <T> T thriftDeserialize(Class c, byte[] b, int offset, int length) { try { T ret = (T) c.newInstance(); @@ -143,18 +152,6 @@ public class Utils { } } - public static <T> T thriftDeserialize(Class c, byte[] b) { - try { - T ret = (T) c.newInstance(); - TDeserializer des = getDes(); - des.deserialize((TBase) ret, b); - return ret; - } catch (Exception e) { - throw new RuntimeException(e); - } - - } - public static byte[] javaSerialize(Object obj) { try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -431,38 +428,170 @@ public class Utils { return ret; } - public static void downloadFromMaster(Map conf, String file, String localFile) throws AuthorizationException, IOException, TException { - NimbusClient client = NimbusClient.getConfiguredClient(conf); - try { - download(client, file, localFile); - } finally { - client.close(); + + public static Localizer createLocalizer(Map conf, String baseDir) { + return new Localizer(conf, baseDir); + } + + public static ClientBlobStore getClientBlobStoreForSupervisor(Map conf) { + ClientBlobStore store = (ClientBlobStore) newInstance( + (String) conf.get(Config.SUPERVISOR_BLOBSTORE)); + store.prepare(conf); + return store; + } + + public static BlobStore getNimbusBlobStore(Map conf, NimbusInfo nimbusInfo) { + return getNimbusBlobStore(conf, null, nimbusInfo); + } + + public static BlobStore getNimbusBlobStore(Map conf, String baseDir, NimbusInfo nimbusInfo) { + String type = (String)conf.get(Config.NIMBUS_BLOBSTORE); + if (type == null) { + type = LocalFsBlobStore.class.getName(); } + BlobStore store = (BlobStore) newInstance(type); + HashMap nconf = new HashMap(conf); + // only enable cleanup of blobstore on nimbus + nconf.put(Config.BLOBSTORE_CLEANUP_ENABLE, Boolean.TRUE); + store.prepare(nconf, baseDir, nimbusInfo); + return store; } - public static void downloadFromHost(Map conf, String file, String localFile, String host, int port) throws IOException, TException, AuthorizationException { - NimbusClient client = new NimbusClient (conf, host, port, null); - try { - download(client, file, localFile); - } finally { - client.close(); + /** + * Meant to be called only by the supervisor for stormjar/stormconf/stormcode files. + * @param key + * @param localFile + * @param cb + * @throws AuthorizationException + * @throws KeyNotFoundException + * @throws IOException + */ + public static void downloadResourcesAsSupervisor(String key, String localFile, + ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException { + final int MAX_RETRY_ATTEMPTS = 2; + final int ATTEMPTS_INTERVAL_TIME = 100; + for (int retryAttempts = 0; retryAttempts < MAX_RETRY_ATTEMPTS; retryAttempts++) { + if (downloadResourcesAsSupervisorAttempt(cb, key, localFile)) { + break; + } + Utils.sleep(ATTEMPTS_INTERVAL_TIME); } } - private static void download(NimbusClient client, String file, String localFile) throws IOException, TException, AuthorizationException { - WritableByteChannel out = Channels.newChannel(new FileOutputStream(localFile)); + public static ClientBlobStore getClientBlobStore(Map conf) { + ClientBlobStore store = (ClientBlobStore) Utils.newInstance((String) conf.get(Config.CLIENT_BLOBSTORE)); + store.prepare(conf); + return store; + } + + private static boolean downloadResourcesAsSupervisorAttempt(ClientBlobStore cb, String key, String localFile) { + boolean isSuccess = false; + FileOutputStream out = null; + InputStreamWithMeta in = null; try { - String id = client.getClient().beginFileDownload(file); - while (true) { - ByteBuffer chunk = client.getClient().downloadChunk(id); - int written = out.write(chunk); - if (written == 0) break; + out = new FileOutputStream(localFile); + in = cb.getBlob(key); + long fileSize = in.getFileLength(); + + byte[] buffer = new byte[1024]; + int len; + int downloadFileSize = 0; + while ((len = in.read(buffer)) >= 0) { + out.write(buffer, 0, len); + downloadFileSize += len; } + + isSuccess = (fileSize == downloadFileSize); + } catch (TException | IOException e) { + LOG.error("An exception happened while downloading {} from blob store.", localFile, e); } finally { - out.close(); + try { + if (out != null) { + out.close(); + } + } catch (IOException ignored) {} + try { + if (in != null) { + in.close(); + } + } catch (IOException ignored) {} + } + if (!isSuccess) { + try { + Files.deleteIfExists(Paths.get(localFile)); + } catch (IOException ex) { + LOG.error("Failed trying to delete the partially downloaded {}", localFile, ex); + } + } + return isSuccess; + } + + public static boolean checkFileExists(String dir, String file) { + return Files.exists(new File(dir, file).toPath()); + } + + public static long nimbusVersionOfBlob(String key, ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException { + long nimbusBlobVersion = 0; + ReadableBlobMeta metadata = cb.getBlobMeta(key); + nimbusBlobVersion = metadata.get_version(); + return nimbusBlobVersion; + } + + public static String getFileOwner(String path) throws IOException { + return Files.getOwner(FileSystems.getDefault().getPath(path)).getName(); + } + + public static long localVersionOfBlob(String localFile) { + File f = new File(localFile + DEFAULT_BLOB_VERSION_SUFFIX); + long currentVersion = 0; + if (f.exists() && !(f.isDirectory())) { + BufferedReader br = null; + try { + br = new BufferedReader(new FileReader(f)); + String line = br.readLine(); + currentVersion = Long.parseLong(line); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + if (br != null) { + br.close(); + } + } catch (Exception ignore) { + LOG.error("Exception trying to cleanup", ignore); + } + } + return currentVersion; + } else { + return -1; + } + } + + public static String constructBlobWithVersionFileName(String fileName, long version) { + return fileName + "." + version; + } + + public static String constructBlobCurrentSymlinkName(String fileName) { + return fileName + Utils.DEFAULT_CURRENT_BLOB_SUFFIX; + } + + public static String constructVersionFileName(String fileName) { + return fileName + Utils.DEFAULT_BLOB_VERSION_SUFFIX; + } + // only works on operating systems that support posix + public static void restrictPermissions(String baseDir) { + try { + Set<PosixFilePermission> perms = new HashSet<PosixFilePermission>( + Arrays.asList(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE, + PosixFilePermission.OWNER_EXECUTE, PosixFilePermission.GROUP_READ, + PosixFilePermission.GROUP_EXECUTE)); + Files.setPosixFilePermissions(FileSystems.getDefault().getPath(baseDir), perms); + } catch (IOException e) { + throw new RuntimeException(e); } } + public static IFn loadClojureFn(String namespace, String name) { try { clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")")); @@ -505,6 +634,37 @@ public class Utils { return result; } + private static TDeserializer getDes() { + TDeserializer des = threadDes.get(); + if(des == null) { + des = new TDeserializer(); + threadDes.set(des); + } + return des; + } + + public static byte[] thriftSerialize(TBase t) { + try { + TSerializer ser = threadSer.get(); + if (ser == null) { + ser = new TSerializer(); + threadSer.set(ser); + } + return ser.serialize(t); + } catch (TException e) { + LOG.error("Failed to serialize to thrift: ", e); + throw new RuntimeException(e); + } + } + + public static <T> T thriftDeserialize(Class c, byte[] b) { + try { + return Utils.thriftDeserialize(c, b, 0, b.length); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + public static Integer getInt(Object o, Integer defaultValue) { if (null == o) { return defaultValue; @@ -571,6 +731,245 @@ public class Utils { return UUID.randomUUID().getLeastSignificantBits(); } + /** + * Unpack matching files from a jar. Entries inside the jar that do + * not match the given pattern will be skipped. + * + * @param jarFile the .jar file to unpack + * @param toDir the destination directory into which to unpack the jar + */ + public static void unJar(File jarFile, File toDir) + throws IOException { + JarFile jar = new JarFile(jarFile); + try { + Enumeration<JarEntry> entries = jar.entries(); + while (entries.hasMoreElements()) { + final JarEntry entry = entries.nextElement(); + if (!entry.isDirectory()) { + InputStream in = jar.getInputStream(entry); + try { + File file = new File(toDir, entry.getName()); + ensureDirectory(file.getParentFile()); + OutputStream out = new FileOutputStream(file); + try { + copyBytes(in, out, 8192); + } finally { + out.close(); + } + } finally { + in.close(); + } + } + } + } finally { + jar.close(); + } + } + + /** + * Copies from one stream to another. + * + * @param in InputStream to read from + * @param out OutputStream to write to + * @param buffSize the size of the buffer + */ + public static void copyBytes(InputStream in, OutputStream out, int buffSize) + throws IOException { + PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; + byte buf[] = new byte[buffSize]; + int bytesRead = in.read(buf); + while (bytesRead >= 0) { + out.write(buf, 0, bytesRead); + if ((ps != null) && ps.checkError()) { + throw new IOException("Unable to write to output stream."); + } + bytesRead = in.read(buf); + } + } + + /** + * Ensure the existence of a given directory. + * + * @throws IOException if it cannot be created and does not already exist + */ + private static void ensureDirectory(File dir) throws IOException { + if (!dir.mkdirs() && !dir.isDirectory()) { + throw new IOException("Mkdirs failed to create " + + dir.toString()); + } + } + + /** + * Given a Tar File as input it will untar the file in a the untar directory + * passed as the second parameter + * <p/> + * This utility will untar ".tar" files and ".tar.gz","tgz" files. + * + * @param inFile The tar file as input. + * @param untarDir The untar directory where to untar the tar file. + * @throws IOException + */ + public static void unTar(File inFile, File untarDir) throws IOException { + if (!untarDir.mkdirs()) { + if (!untarDir.isDirectory()) { + throw new IOException("Mkdirs failed to create " + untarDir); + } + } + + boolean gzipped = inFile.toString().endsWith("gz"); + if (onWindows()) { + // Tar is not native to Windows. Use simple Java based implementation for + // tests and simple tar archives + unTarUsingJava(inFile, untarDir, gzipped); + } else { + // spawn tar utility to untar archive for full fledged unix behavior such + // as resolving symlinks in tar archives + unTarUsingTar(inFile, untarDir, gzipped); + } + } + + private static void unTarUsingTar(File inFile, File untarDir, + boolean gzipped) throws IOException { + StringBuffer untarCommand = new StringBuffer(); + if (gzipped) { + untarCommand.append(" gzip -dc '"); + untarCommand.append(inFile.toString()); + untarCommand.append("' | ("); + } + untarCommand.append("cd '"); + untarCommand.append(untarDir.toString()); + untarCommand.append("' ; "); + untarCommand.append("tar -xf "); + + if (gzipped) { + untarCommand.append(" -)"); + } else { + untarCommand.append(inFile.toString()); + } + String[] shellCmd = {"bash", "-c", untarCommand.toString()}; + ShellUtils.ShellCommandExecutor shexec = new ShellUtils.ShellCommandExecutor(shellCmd); + shexec.execute(); + int exitcode = shexec.getExitCode(); + if (exitcode != 0) { + throw new IOException("Error untarring file " + inFile + + ". Tar process exited with exit code " + exitcode); + } + } + + private static void unTarUsingJava(File inFile, File untarDir, + boolean gzipped) throws IOException { + InputStream inputStream = null; + TarArchiveInputStream tis = null; + try { + if (gzipped) { + inputStream = new BufferedInputStream(new GZIPInputStream( + new FileInputStream(inFile))); + } else { + inputStream = new BufferedInputStream(new FileInputStream(inFile)); + } + tis = new TarArchiveInputStream(inputStream); + for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) { + unpackEntries(tis, entry, untarDir); + entry = tis.getNextTarEntry(); + } + } finally { + cleanup(tis, inputStream); + } + } + + /** + * Close the Closeable objects and <b>ignore</b> any {@link IOException} or + * null pointers. Must only be used for cleanup in exception handlers. + * + * @param closeables the objects to close + */ + private static void cleanup(java.io.Closeable... closeables) { + for (java.io.Closeable c : closeables) { + if (c != null) { + try { + c.close(); + } catch (IOException e) { + LOG.debug("Exception in closing " + c, e); + + } + } + } + } + + private static void unpackEntries(TarArchiveInputStream tis, + TarArchiveEntry entry, File outputDir) throws IOException { + if (entry.isDirectory()) { + File subDir = new File(outputDir, entry.getName()); + if (!subDir.mkdirs() && !subDir.isDirectory()) { + throw new IOException("Mkdirs failed to create tar internal dir " + + outputDir); + } + for (TarArchiveEntry e : entry.getDirectoryEntries()) { + unpackEntries(tis, e, subDir); + } + return; + } + File outputFile = new File(outputDir, entry.getName()); + if (!outputFile.getParentFile().exists()) { + if (!outputFile.getParentFile().mkdirs()) { + throw new IOException("Mkdirs failed to create tar internal dir " + + outputDir); + } + } + int count; + byte data[] = new byte[2048]; + BufferedOutputStream outputStream = new BufferedOutputStream( + new FileOutputStream(outputFile)); + + while ((count = tis.read(data)) != -1) { + outputStream.write(data, 0, count); + } + outputStream.flush(); + outputStream.close(); + } + + public static boolean onWindows() { + if (System.getenv("OS") != null) { + return System.getenv("OS").equals("Windows_NT"); + } + return false; + } + + public static void unpack(File localrsrc, File dst) throws IOException { + String lowerDst = localrsrc.getName().toLowerCase(); + if (lowerDst.endsWith(".jar")) { + unJar(localrsrc, dst); + } else if (lowerDst.endsWith(".zip")) { + unZip(localrsrc, dst); + } else if (lowerDst.endsWith(".tar.gz") || + lowerDst.endsWith(".tgz") || + lowerDst.endsWith(".tar")) { + unTar(localrsrc, dst); + } else { + LOG.warn("Cannot unpack " + localrsrc); + if (!localrsrc.renameTo(dst)) { + throw new IOException("Unable to rename file: [" + localrsrc + + "] to [" + dst + "]"); + } + } + if (localrsrc.isFile()) { + localrsrc.delete(); + } + } + + public static boolean canUserReadBlob(ReadableBlobMeta meta, String user) { + SettableBlobMeta settable = meta.get_settable(); + for (AccessControl acl : settable.get_acl()) { + if (acl.get_type().equals(AccessControlType.OTHER) && (acl.get_access() & BlobStoreAclHandler.READ) > 0) { + return true; + } + if (acl.get_name().equals(user) && (acl.get_access() & BlobStoreAclHandler.READ) > 0) { + return true; + } + } + return false; + } + public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) { return newCurator(conf, servers, port, root, null); } @@ -702,6 +1101,38 @@ public class Utils { return ret; } + /** + * Takes an input dir or file and returns the disk usage on that local directory. + * Very basic implementation. + * + * @param dir The input dir to get the disk space of this local dir + * @return The total disk space of the input local directory + */ + public static long getDU(File dir) { + long size = 0; + if (!dir.exists()) + return 0; + if (!dir.isDirectory()) { + return dir.length(); + } else { + File[] allFiles = dir.listFiles(); + if(allFiles != null) { + for (int i = 0; i < allFiles.length; i++) { + boolean isSymLink; + try { + isSymLink = org.apache.commons.io.FileUtils.isSymlink(allFiles[i]); + } catch(IOException ioe) { + isSymLink = true; + } + if(!isSymLink) { + size += getDU(allFiles[i]); + } + } + } + return size; + } + } + public static String threadDump() { final StringBuilder dump = new StringBuilder(); final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean(); http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java index 44ec967..f1be007 100644 --- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java +++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java @@ -477,8 +477,16 @@ public class ConfigValidation { } } - public static class PacemakerAuthTypeValidator extends Validator { + public static class MapOfStringToMapOfStringToObjectValidator extends Validator { + @Override + public void validateField(String name, Object o) { + ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false), + ConfigValidationUtils.mapFv(String.class, Object.class,true), true); + validator.validateField(name, o); + } + } + public static class PacemakerAuthTypeValidator extends Validator { @Override public void validateField(String name, Object o) { if(o == null) { @@ -486,9 +494,9 @@ public class ConfigValidation { } if(o instanceof String && - (((String)o).equals("NONE") || - ((String)o).equals("DIGEST") || - ((String)o).equals("KERBEROS"))) { + (((String)o).equals("NONE") || + ((String)o).equals("DIGEST") || + ((String)o).equals("KERBEROS"))) { return; } throw new IllegalArgumentException( "Field " + name + " must be one of \"NONE\", \"DIGEST\", or \"KERBEROS\""); http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java index ed93370..cb742fa 100644 --- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java +++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java @@ -51,7 +51,6 @@ public class ConfigValidationAnnotations { /** * Validators with fields: validatorClass and type */ - @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) public @interface isType { @@ -82,7 +81,6 @@ public class ConfigValidationAnnotations { /** * Validators with fields: validatorClass */ - @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) public @interface isString { @@ -109,7 +107,7 @@ public class ConfigValidationAnnotations { } /** - * validates on object is not null + * Validates on object is not null */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) @@ -118,7 +116,7 @@ public class ConfigValidationAnnotations { } /** - * validates that there are no duplicates in a list + * Validates that there are no duplicates in a list */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) @@ -142,7 +140,6 @@ public class ConfigValidationAnnotations { * Validates the type of each key and value in a map * Validator with fields: validatorClass, keyValidatorClass, valueValidatorClass */ - @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) public @interface isMapEntryType { @@ -168,7 +165,7 @@ public class ConfigValidationAnnotations { } /** - * checks if a number is positive and whether zero inclusive + * Checks if a number is positive and whether zero inclusive * Validator with fields: validatorClass, includeZero */ @Retention(RetentionPolicy.RUNTIME) @@ -182,7 +179,6 @@ public class ConfigValidationAnnotations { /** * Complex/custom type validators */ - @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) public @interface isStringOrStringList { @@ -204,7 +200,6 @@ public class ConfigValidationAnnotations { /** * For custom validators */ - @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) public @interface CustomValidator { http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/py/storm/Nimbus-remote ---------------------------------------------------------------------- diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote index 63e7dce..5b8e396 100644 --- a/storm-core/src/py/storm/Nimbus-remote +++ b/storm-core/src/py/storm/Nimbus-remote @@ -55,6 +55,20 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help': print(' void setWorkerProfiler(string id, ProfileRequest profileRequest)') print(' getComponentPendingProfileActions(string id, string component_id, ProfileAction action)') print(' void uploadNewCredentials(string name, Credentials creds)') + print(' string beginCreateBlob(string key, SettableBlobMeta meta)') + print(' string beginUpdateBlob(string key)') + print(' void uploadBlobChunk(string session, string chunk)') + print(' void finishBlobUpload(string session)') + print(' void cancelBlobUpload(string session)') + print(' ReadableBlobMeta getBlobMeta(string key)') + print(' void setBlobMeta(string key, SettableBlobMeta meta)') + print(' BeginDownloadResult beginBlobDownload(string key)') + print(' string downloadBlobChunk(string session)') + print(' void deleteBlob(string key)') + print(' ListBlobsResult listBlobs(string session)') + print(' i32 getBlobReplication(string key)') + print(' i32 updateBlobReplication(string key, i32 replication)') + print(' void createStateInZookeeper(string key)') print(' string beginFileUpload()') print(' void uploadChunk(string location, string chunk)') print(' void finishFileUpload(string location)') @@ -204,6 +218,90 @@ elif cmd == 'uploadNewCredentials': sys.exit(1) pp.pprint(client.uploadNewCredentials(args[0],eval(args[1]),)) +elif cmd == 'beginCreateBlob': + if len(args) != 2: + print('beginCreateBlob requires 2 args') + sys.exit(1) + pp.pprint(client.beginCreateBlob(args[0],eval(args[1]),)) + +elif cmd == 'beginUpdateBlob': + if len(args) != 1: + print('beginUpdateBlob requires 1 args') + sys.exit(1) + pp.pprint(client.beginUpdateBlob(args[0],)) + +elif cmd == 'uploadBlobChunk': + if len(args) != 2: + print('uploadBlobChunk requires 2 args') + sys.exit(1) + pp.pprint(client.uploadBlobChunk(args[0],args[1],)) + +elif cmd == 'finishBlobUpload': + if len(args) != 1: + print('finishBlobUpload requires 1 args') + sys.exit(1) + pp.pprint(client.finishBlobUpload(args[0],)) + +elif cmd == 'cancelBlobUpload': + if len(args) != 1: + print('cancelBlobUpload requires 1 args') + sys.exit(1) + pp.pprint(client.cancelBlobUpload(args[0],)) + +elif cmd == 'getBlobMeta': + if len(args) != 1: + print('getBlobMeta requires 1 args') + sys.exit(1) + pp.pprint(client.getBlobMeta(args[0],)) + +elif cmd == 'setBlobMeta': + if len(args) != 2: + print('setBlobMeta requires 2 args') + sys.exit(1) + pp.pprint(client.setBlobMeta(args[0],eval(args[1]),)) + +elif cmd == 'beginBlobDownload': + if len(args) != 1: + print('beginBlobDownload requires 1 args') + sys.exit(1) + pp.pprint(client.beginBlobDownload(args[0],)) + +elif cmd == 'downloadBlobChunk': + if len(args) != 1: + print('downloadBlobChunk requires 1 args') + sys.exit(1) + pp.pprint(client.downloadBlobChunk(args[0],)) + +elif cmd == 'deleteBlob': + if len(args) != 1: + print('deleteBlob requires 1 args') + sys.exit(1) + pp.pprint(client.deleteBlob(args[0],)) + +elif cmd == 'listBlobs': + if len(args) != 1: + print('listBlobs requires 1 args') + sys.exit(1) + pp.pprint(client.listBlobs(args[0],)) + +elif cmd == 'getBlobReplication': + if len(args) != 1: + print('getBlobReplication requires 1 args') + sys.exit(1) + pp.pprint(client.getBlobReplication(args[0],)) + +elif cmd == 'updateBlobReplication': + if len(args) != 2: + print('updateBlobReplication requires 2 args') + sys.exit(1) + pp.pprint(client.updateBlobReplication(args[0],eval(args[1]),)) + +elif cmd == 'createStateInZookeeper': + if len(args) != 1: + print('createStateInZookeeper requires 1 args') + sys.exit(1) + pp.pprint(client.createStateInZookeeper(args[0],)) + elif cmd == 'beginFileUpload': if len(args) != 0: print('beginFileUpload requires 0 args')
