http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/cf2f7a93/utils/common/src/main/java/org/apache/brooklyn/util/os/Os.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/os/Os.java b/utils/common/src/main/java/org/apache/brooklyn/util/os/Os.java new file mode 100644 index 0000000..13ba63b --- /dev/null +++ b/utils/common/src/main/java/org/apache/brooklyn/util/os/Os.java @@ -0,0 +1,572 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.util.os; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.javalang.JavaClassNames; +import org.apache.brooklyn.util.net.Urls; +import org.apache.brooklyn.util.os.Os; +import org.apache.brooklyn.util.stream.Streams; +import org.apache.brooklyn.util.text.Identifiers; +import org.apache.brooklyn.util.text.Strings; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.Beta; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.io.ByteStreams; +import com.google.common.io.Files; + +public class Os { + + private static final Logger log = LoggerFactory.getLogger(Os.class); + + private static final int TEMP_DIR_ATTEMPTS = 1000; + + private static final char SEPARATOR_UNIX = '/'; + private static final char SEPARATOR_WIN = '\\'; + + public static final String LINE_SEPARATOR = System.getProperty("line.separator"); + + /** returns the best tmp dir to use; see {@link TmpDirFinder} for the logic + * (and the explanation why this is needed!) */ + public static String tmp() { + Maybe<String> tmp = tmpdir.get(); + if (tmp.isPresent()) return tmp.get(); + + tmpdir.useWithWarning(System.getProperty("java.io.tmpdir")); + return tmp.get(); + } + + private static TmpDirFinder tmpdir = new TmpDirFinder(); + + /** utility for finding a usable (writable) tmp dir, preferring java.io.tmpdir + * (unless it's weird, e.g. /private/tmp/xxx or /var/tmp/... as under OS X, and /tmp is valid), + * falling back to ~/.tmp/ (and creating that) if the others are not usable + * <p> + * it is weird if /tmp is not writable, but it does happen, hence this check + * <p> + * note you can also set java system property {@value #BROOKLYN_OS_TMPDIR_PROPERTY} + * to force the use of a specific tmp space */ + public static class TmpDirFinder { + /** can be set as a jvm system property to force a particular tmp dir; directory must exist with the right permissions */ + public static String BROOKLYN_OS_TMPDIR_PROPERTY = "brooklyn.os.tmpdir"; + + private String tmpdir = null; + private boolean isFallback = false; + + public Maybe<String> get() { + if (isFallback()) + log.debug("TmpDirFinder: using fallback tmp directory "+tmpdir, new Throwable("Caller using fallback tmp dir")); + if (isFound()) return Maybe.of(tmpdir); + if (find()) return Maybe.of(tmpdir); + return Maybe.absent(newFailure("TmpDirFinder: No valid tmp dir can be found")); + } + + public boolean isFallback() { + return isFallback; + } + + public boolean useWithWarning(String dir) { + if (tmpdir==null) { + tmpdir = dir; + isFallback = true; + log.warn("Unable to find a valid tmp dir; will use "+dir+" but with caution! See (debug) messages marked TmpDirFinder for more information."); + return true; + } + return false; + } + + public boolean isFound() { + return tmpdir!=null; + } + protected synchronized boolean find() { + if (isFound()) return true; + + String customtmp = System.getProperty(BROOKLYN_OS_TMPDIR_PROPERTY); + if (customtmp!=null) { + if (checkAndSet(customtmp)) return true; + log.warn("TmpDirFinder: Custom tmp directory '"+customtmp+"' in "+BROOKLYN_OS_TMPDIR_PROPERTY+" is not a valid tmp dir; ignoring"); + } + + String systmp = System.getProperty("java.io.tmpdir"); + boolean systmpWeird = (systmp.contains("/var/") || systmp.startsWith("/private")); + if (!systmpWeird) if (checkAndSet(systmp)) return true; + + if (checkAndSet(File.separator+"tmp")) return true; + if (systmpWeird) if (checkAndSet(systmp)) return true; + + try { + String hometmp = mergePaths(home(), ".tmp"); + File hometmpF = new File(hometmp); + hometmpF.mkdirs(); + if (checkAndSet(hometmp)) return true; + } catch (Exception e) { + log.debug("TmpDirFinder: Cannot create tmp dir in user's home dir: "+e); + } + + return false; + } + + protected boolean checkAndSet(String candidate) { + if (!check(candidate)) return false; + // seems okay + tmpdir = candidate; + log.debug("TmpDirFinder: Selected tmp dir '"+candidate+"' as the best tmp working space"); + return true; + } + + protected boolean check(String candidate) { + try { + File f = new File(candidate); + if (!f.exists()) { + log.debug("TmpDirFinder: Candidate tmp dir '"+candidate+"' does not exist"); + return false; + } + if (!f.isDirectory()) { + log.debug("TmpDirFinder: Candidate tmp dir '"+candidate+"' is not a directory"); + return false; + } + File f2 = new File(f, "brooklyn-tmp-check-"+Strings.makeRandomId(4)); + if (!f2.createNewFile()) { + log.debug("TmpDirFinder: Candidate tmp dir '"+candidate+"' cannot have files created inside it ("+f2+")"); + return false; + } + if (!f2.delete()) { + log.debug("TmpDirFinder: Candidate tmp dir '"+candidate+"' cannot have files deleted inside it ("+f2+")"); + return false; + } + + return true; + } catch (Exception e) { + log.debug("TmpDirFinder: Candidate tmp dir '"+candidate+"' is not valid: "+e); + return false; + } + } + + protected IllegalStateException newFailure(String message) { + return new IllegalStateException(message); + } + } + + /** user name */ + public static String user() { + return System.getProperty("user.name"); + } + + /** user's home directory */ + public static String home() { + return System.getProperty("user.home"); + } + + /** merges paths using forward slash (unix way); + * now identical to {@link Os#mergePaths(String...)} but kept for contexts + * where caller wants to indicate the target system should definitely be unix */ + public static String mergePathsUnix(String ...items) { + return Urls.mergePaths(items); + } + + /** merges paths using forward slash as the "local OS file separator", because it is recognised on windows, + * making paths more consistent and avoiding problems with backslashes being escaped */ + public static String mergePaths(String ...items) { + char separatorChar = '/'; + StringBuilder result = new StringBuilder(); + for (String item: items) { + if (Strings.isEmpty(item)) continue; + if (result.length() > 0 && !isSeparator(result.codePointAt(result.length()-1))) result.append(separatorChar); + result.append(item); + } + return result.toString(); + } + + /** tries to delete a directory recursively; + * use with care - see http://stackoverflow.com/questions/8320376/why-is-files-deletedirectorycontents-deprecated-in-guava. + * <p> + * also note this implementation refuses to delete / or ~ or anything else not passing {@link #checkSafe(File)}. + * if you might really want to delete something like that, use {@link #deleteRecursively(File, boolean)}. + */ + @Beta + public static DeletionResult deleteRecursively(File dir) { + return deleteRecursively(dir, false); + } + + /** + * as {@link #deleteRecursively(File)} but includes safety checks to prevent deletion of / or ~ + * or anything else not passing {@link #checkSafe(File)}, unless the skipSafetyChecks parameter is set + */ + @Beta + public static DeletionResult deleteRecursively(File dir, boolean skipSafetyChecks) { + if (dir==null) return new DeletionResult(null, true, null); + + try { + if (!skipSafetyChecks) checkSafe(dir); + + FileUtils.deleteDirectory(dir); + return new DeletionResult(dir, true, null); + } catch (IllegalArgumentException e) { + // See exception reported in https://issues.apache.org/jira/browse/BROOKLYN-72 + // If another thread is changing the contents of the directory at the same time as + // we delete it, then can get this exception. + return new DeletionResult(dir, false, e); + } catch (IOException e) { + return new DeletionResult(dir, false, e); + } + } + + /** fails if the dir is not "safe" for deletion, currently length <= 2 or the home directory */ + protected static void checkSafe(File dir) throws IOException { + String dp = dir.getAbsolutePath(); + dp = Strings.removeFromEnd(dp, "/"); + if (dp.length()<=2) + throw new IOException("Refusing instruction to delete "+dir+": name too short"); + + if (Os.home().equals(dp)) + throw new IOException("Refusing instruction to delete "+dir+": it's the home directory"); + } + + /** + * @see {@link #deleteRecursively(File)} + */ + @Beta + public static DeletionResult deleteRecursively(String dir) { + if (dir==null) return new DeletionResult(null, true, null); + return deleteRecursively(new File(dir)); + } + + public static class DeletionResult { + private final File file; + private final boolean successful; + private final Throwable throwable; + public DeletionResult(File file, boolean successful, Throwable throwable) { + this.file = file; + this.successful = successful; + this.throwable = throwable; + } + public boolean wasSuccessful() { return successful; } + public DeletionResult throwIfFailed() { + if (!successful) + throw Exceptions.propagate(new IOException("Unable to delete '"+file+"': delete returned false", throwable)); + return this; + } + public File getFile() { return file; } + public Throwable getThrowable() { return throwable; } + public <T> T asNullIgnoringError() { return null; } + public <T> T asNullOrThrowing() { + throwIfFailed(); + return null; + } + } + + private static class FileDeletionHook { + public FileDeletionHook(File f, boolean recursively) { + this.path = f; + this.recursively = recursively; + } + final File path; + final boolean recursively; + + public void run() throws IOException { + if (path.exists()) { + if (recursively && path.isDirectory()) { + Os.deleteRecursively(path); + } else { + path.delete(); + } + } + } + } + + private static final Map<String,FileDeletionHook> deletions = new LinkedHashMap<String, Os.FileDeletionHook>(); + + private static void addShutdownFileDeletionHook(String path, FileDeletionHook hook) { + synchronized (deletions) { + if (deletions.isEmpty()) { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + synchronized (deletions) { + List<String> pathsToDelete = new ArrayList<String>(deletions.keySet()); + Collections.sort(pathsToDelete, Strings.lengthComparator().reverse()); + for (String path: pathsToDelete) { + try { + deletions.remove(path).run(); + } catch (Exception e) { + log.warn("Unable to delete '"+path+"' on shutdown: "+e); + } + } + } + } + }); + } + FileDeletionHook oldHook = deletions.put(path, hook); + if (oldHook!=null && oldHook.recursively) + // prefer any hook which is recursive + deletions.put(path, oldHook); + } + } + + /** deletes the given file or empty directory on exit + * <p> + * similar to {@link File#deleteOnExit()} except it is smart about trying to delete longer filenames first + * (and the shutdown hook order does not use proprietary java hooks) + * <p> + * note this does not delete non-empty directories; see {@link #deleteOnExitRecursively(File)} for that */ + public static void deleteOnExit(File directoryToDeleteIfEmptyOrFile) { + addShutdownFileDeletionHook(directoryToDeleteIfEmptyOrFile.getAbsolutePath(), new FileDeletionHook(directoryToDeleteIfEmptyOrFile, false)); + } + + /** deletes the given file or directory and, in the case of directories, any contents; + * similar to apache commons FileUtils.cleanDirectoryOnExit but corrects a bug in that implementation + * which causes it to fail if content is added to that directory after the hook is registered */ + public static void deleteOnExitRecursively(File directoryToCleanOrFile) { + addShutdownFileDeletionHook(directoryToCleanOrFile.getAbsolutePath(), new FileDeletionHook(directoryToCleanOrFile, true)); + } + + /** causes empty directories from subsubdir up to and including dir to be deleted on exit; + * useful e.g. if we create /tmp/brooklyn-test/foo/test1/ and someone else might create + * /tmp/brooklyn-test/foo/test2/ and we'd like the last tear-down to result in /tmp/brooklyn-test being deleted! + * <p> + * returns number of directories queued for deletion so caller can check for errors if desired; + * if dir is not an ancestor of subsubdir this logs a warning but does not throw */ + public static int deleteOnExitEmptyParentsUpTo(File subsubDirOrFile, File dir) { + if (subsubDirOrFile==null || dir==null) + return 0; + + List<File> dirsToDelete = new ArrayList<File>(); + File d = subsubDirOrFile; + do { + dirsToDelete.add(d); + if (d.equals(dir)) break; + d = d.getParentFile(); + } while (d!=null); + + if (d==null) { + log.warn("File "+subsubDirOrFile+" has no ancestor "+dir+": will not attempt to clean up with ancestors on exit"); + // dir is not an ancestor if subsubdir + return 0; + } + + for (File f: dirsToDelete) + deleteOnExit(f); + + return dirsToDelete.size(); + } + + /** like {@link #deleteOnExitRecursively(File)} followed by {@link #deleteOnExitEmptyParentsUpTo(File, File)} */ + public static void deleteOnExitRecursivelyAndEmptyParentsUpTo(File directoryToCleanOrFile, File highestAncestorToDelete) { + deleteOnExitRecursively(directoryToCleanOrFile); + deleteOnExitEmptyParentsUpTo(directoryToCleanOrFile, highestAncestorToDelete); + } + + /** as {@link File#mkdirs()} but throwing on failure and returning the directory made for fluent convenience */ + public static File mkdirs(File dir) { + dir.mkdirs(); + if (dir.isDirectory()) { + return dir; + } + throw Exceptions.propagate(new IOException("Failed to create directory " + dir + + (dir.isFile() ? "(is file)" : ""))); + } + + /** writes given contents to a temporary file which will be deleted on exit */ + public static File writeToTempFile(InputStream is, String prefix, String suffix) { + return writeToTempFile(is, new File(Os.tmp()), prefix, suffix); + } + + /** writes given contents to a temporary file which will be deleted on exit, located under the given directory */ + public static File writeToTempFile(InputStream is, File tempDir, String prefix, String suffix) { + Preconditions.checkNotNull(is, "Input stream required to create temp file for %s*%s", prefix, suffix); + mkdirs(tempDir); + File tempFile = newTempFile(prefix, suffix); + OutputStream out = null; + try { + out = new FileOutputStream(tempFile); + ByteStreams.copy(is, out); + } catch (IOException e) { + throw Throwables.propagate(e); + } finally { + Streams.closeQuietly(is); + Streams.closeQuietly(out); + } + return tempFile; + } + + public static File writePropertiesToTempFile(Properties props, String prefix, String suffix) { + return writePropertiesToTempFile(props, new File(Os.tmp()), prefix, suffix); + } + + public static File writePropertiesToTempFile(Properties props, File tempDir, String prefix, String suffix) { + Preconditions.checkNotNull(props, "Properties required to create temp file for %s*%s", prefix, suffix); + File tempFile; + try { + tempFile = File.createTempFile(prefix, suffix, tempDir); + } catch (IOException e) { + throw Throwables.propagate(e); + } + tempFile.deleteOnExit(); + + OutputStream out = null; + try { + out = new FileOutputStream(tempFile); + props.store(out, "Auto-generated by Brooklyn"); + } catch (IOException e) { + throw Throwables.propagate(e); + } finally { + Streams.closeQuietly(out); + } + return tempFile; + } + + /** + * Tidy up a file path. + * <p> + * Removes duplicate or trailing path separators (Unix style forward + * slashes only), replaces initial {@literal ~} with the + * value of {@link #home()} and folds out use of {@literal ..} and + * {@literal .} path segments. + * + * @see com.google.common.io.Files#simplifyPath(String) + */ + public static String tidyPath(String path) { + Preconditions.checkNotNull(path, "path"); + Iterable<String> segments = Splitter.on("/").split(Files.simplifyPath(path)); + if (Iterables.get(segments, 0).equals("~")) { // Always at least one segment after simplifyPath + segments = Iterables.concat(ImmutableSet.of(Os.home()), Iterables.skip(segments, 1)); + } + String result = Joiner.on("/").join(segments); + if (log.isTraceEnabled() && !result.equals(path)) log.trace("Quietly changing '{}' to '{}'", path, result); + return result; + } + + /** + * Checks whether a file system path is absolute in a platform neutral way. + * <p> + * As a consequence of the platform neutrality some edge cases are + * not handled correctly: + * <ul> + * <li>On Windows relative paths starting with slash (either forward or backward) or ~ are treated as absolute. + * <li>On UNIX relative paths starting with X:/ are treated as absolute. + * </ul> + * + * @param path A string representing a file system path. + * @return whether the path is absolute under the above constraints. + */ + public static boolean isAbsolutish(String path) { + return + path.codePointAt(0) == SEPARATOR_UNIX || + path.equals("~") || path.startsWith("~" + SEPARATOR_UNIX) || + path.length()>=3 && path.codePointAt(1) == ':' && + isSeparator(path.codePointAt(2)); + } + + /** @deprecated since 0.7.0, use {@link #isAbsolutish(String)} */ + @Deprecated + public static boolean isAbsolute(String path) { + return isAbsolutish(path); + } + + private static boolean isSeparator(int sep) { + return sep == SEPARATOR_UNIX || + sep == SEPARATOR_WIN; + } + + public static String fromHome(String path) { + return new File(Os.home(), path).getAbsolutePath(); + } + + public static String nativePath(String path) { + return new File(path).getPath(); + } + + public static boolean isMicrosoftWindows() { + String os = System.getProperty("os.name").toLowerCase(); + //see org.apache.commons.lang.SystemUtils.IS_WINDOWS + return os.startsWith("windows"); + } + + /** creates a private temp file which will be deleted on exit; + * either prefix or ext may be null; + * if ext is non-empty and not > 4 chars and not starting with a ., then a dot will be inserted */ + public static File newTempFile(String prefix, String ext) { + String sanitizedPrefix = (Strings.isNonEmpty(prefix) ? Strings.makeValidFilename(prefix) + "-" : ""); + String extWithPrecedingSeparator = (Strings.isNonEmpty(ext) ? ext.startsWith(".") || ext.length()>4 ? ext : "."+ext : ""); + try { + File tempFile = File.createTempFile(sanitizedPrefix, extWithPrecedingSeparator, new File(tmp())); + tempFile.deleteOnExit(); + return tempFile; + } catch (IOException e) { + throw Exceptions.propagate(e); + } + } + + /** as {@link #newTempFile(String, String)} using the class as the basis for a prefix */ + public static File newTempFile(Class<?> clazz, String ext) { + return newTempFile(JavaClassNames.cleanSimpleClassName(clazz), ext); + } + + /** creates a temp dir which will be deleted on exit */ + public static File newTempDir(String prefix) { + String sanitizedPrefix = (prefix==null ? "" : prefix + "-"); + String tmpParent = tmp(); + + //With lots of stale temp dirs it is possible to have + //name collisions so we need to retry until a unique + //name is found + for (int i = 0; i < TEMP_DIR_ATTEMPTS; i++) { + String baseName = sanitizedPrefix + Identifiers.makeRandomId(4); + File tempDir = new File(tmpParent, baseName); + if (!tempDir.exists()) { + if (tempDir.mkdir()) { + Os.deleteOnExitRecursively(tempDir); + return tempDir; + } else { + log.warn("Attempt to create temp dir failed " + tempDir + ". Either an IO error (disk full, no rights) or someone else created the folder after the !exists() check."); + } + } else { + log.debug("Attempt to create temp dir failed, already exists " + tempDir + ". With ID of length 4 it is not unusual (15% chance) to have duplicate names at the 2000 samples mark."); + } + } + throw new IllegalStateException("cannot create temporary folders in parent " + tmpParent + " after " + TEMP_DIR_ATTEMPTS + " attempts."); + } + + /** as {@link #newTempDir(String)}, using the class as the basis for a prefix */ + public static File newTempDir(Class<?> clazz) { + return newTempDir(JavaClassNames.cleanSimpleClassName(clazz)); + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/cf2f7a93/utils/common/src/main/java/org/apache/brooklyn/util/pool/BasicPool.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/pool/BasicPool.java b/utils/common/src/main/java/org/apache/brooklyn/util/pool/BasicPool.java new file mode 100644 index 0000000..8efa316 --- /dev/null +++ b/utils/common/src/main/java/org/apache/brooklyn/util/pool/BasicPool.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.util.pool; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.Deque; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.brooklyn.util.stream.Streams; +import org.apache.brooklyn.util.text.Identifiers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Objects; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; + +public class BasicPool<T> implements Pool<T> { + + // TODO Implement expiry of pooled resources + + private static final Logger LOG = LoggerFactory.getLogger(BasicPool.class); + + public static <T> Builder<T> builder() { + return new Builder<T>(); + } + + public static class Builder<T> { + private String name; + private Supplier<? extends T> supplier; + private Predicate<? super T> viabilityChecker = Predicates.alwaysTrue(); + private Function<? super T, ?> closer = Functions.identity(); + + public Builder<T> name(String val) { + this.name = val; + return this; + } + + public Builder<T> supplier(Supplier<? extends T> val) { + this.supplier = val; + return this; + } + + public Builder<T> viabilityChecker(Predicate<? super T> val) { + this.viabilityChecker = val; + return this; + } + + public Builder<T> closer(Function<? super T, ?> val) { + this.closer = val; + return this; + } + + public BasicPool<T> build() { + return new BasicPool<T>(this); + } + } + + private final String name; + private final Supplier<? extends T> supplier; + private final Predicate<? super T> viabilityChecker; + private Function<? super T, ?> closer; + private final Deque<T> pool = Lists.newLinkedList(); + private AtomicBoolean closed = new AtomicBoolean(false); + + private AtomicInteger currentLeasedCount = new AtomicInteger(0); + private AtomicInteger totalLeasedCount = new AtomicInteger(0); + private AtomicInteger totalCreatedCount = new AtomicInteger(0); + private AtomicInteger totalClosedCount = new AtomicInteger(0); + + private BasicPool(Builder<T> builder) { + this.name = (builder.name != null) ? "Pool("+builder.name+")" : "Pool-"+Identifiers.makeRandomId(8); + this.supplier = checkNotNull(builder.supplier, "supplier"); + this.viabilityChecker = checkNotNull(builder.viabilityChecker, "viabilityChecker"); + this.closer = checkNotNull(builder.closer, closer); + } + + @Override + public String toString() { + return Objects.toStringHelper(this).add("name", name).toString(); + } + + @Override + public Lease<T> leaseObject() { + totalLeasedCount.incrementAndGet(); + T existing; + do { + existing = null; + synchronized (pool) { + if (closed.get()) { + throw new IllegalStateException("Pool closed for "+this); + } + if (pool.size() > 0) { + existing = pool.removeLast(); + } + } + + if (existing != null) { + if (viabilityChecker.apply(existing)) { + currentLeasedCount.incrementAndGet(); + if (LOG.isTraceEnabled()) LOG.trace("{} reusing existing pool entry {} ({})", new Object[] {this, existing, getMetrics()}); + return new BasicLease(existing); + } else { + totalClosedCount.incrementAndGet(); + if (LOG.isDebugEnabled()) LOG.debug("{} not reusing entry {} as no longer viable; discarding and trying again", this, existing); + closer.apply(existing); + } + } + } while (existing != null); + + T result = supplier.get(); + totalCreatedCount.incrementAndGet(); + currentLeasedCount.incrementAndGet(); + if (LOG.isDebugEnabled()) LOG.debug("{} acquired and returning new entry {} ({})", new Object[] {this, result, getMetrics()}); + return new BasicLease(result); + } + + @Override + public <R> R exec(Function<? super T,R> receiver) { + Lease<T> lease = leaseObject(); + try { + if (LOG.isTraceEnabled()) LOG.trace("{} executing {} with leasee {}", new Object[] {this, receiver, lease.leasedObject()}); + return receiver.apply(lease.leasedObject()); + } finally { + Streams.closeQuietly(lease); + } + } + + @Override + public void close() throws IOException { + synchronized (pool) { + if (LOG.isDebugEnabled()) LOG.debug("{} closing, with {} resources ({})", new Object[] {this, pool.size(), getMetrics()}); + closed.set(true); + for (T resource : pool) { + totalClosedCount.incrementAndGet(); + closer.apply(resource); + } + pool.clear(); + } + + } + + private void returnLeasee(T val) { + currentLeasedCount.decrementAndGet(); + synchronized (pool) { + if (closed.get()) { + totalClosedCount.incrementAndGet(); + if (LOG.isDebugEnabled()) LOG.debug("{} closing returned leasee {}, because pool closed ({})", new Object[] {this, val, getMetrics()}); + closer.apply(val); + } else { + if (LOG.isTraceEnabled()) LOG.trace("{} adding {} back into pool ({})", new Object[] {this, val, getMetrics()}); + pool.addLast(val); + } + } + } + + private String getMetrics() { + return String.format("currentLeased=%s; totalLeased=%s; totalCreated=%s; totalClosed=%s", + currentLeasedCount, totalLeasedCount, totalCreatedCount, totalClosedCount); + + } + private class BasicLease implements Lease<T> { + private final T val; + + BasicLease(T val) { + this.val = val; + } + + @Override + public T leasedObject() { + return val; + } + + @Override + public void close() { + BasicPool.this.returnLeasee(val); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/cf2f7a93/utils/common/src/main/java/org/apache/brooklyn/util/pool/Lease.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/pool/Lease.java b/utils/common/src/main/java/org/apache/brooklyn/util/pool/Lease.java new file mode 100644 index 0000000..38eee26 --- /dev/null +++ b/utils/common/src/main/java/org/apache/brooklyn/util/pool/Lease.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.util.pool; + +import java.io.Closeable; +import java.io.IOException; + +public interface Lease<T> extends Closeable { + + T leasedObject(); + + void close() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/cf2f7a93/utils/common/src/main/java/org/apache/brooklyn/util/pool/Pool.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/pool/Pool.java b/utils/common/src/main/java/org/apache/brooklyn/util/pool/Pool.java new file mode 100644 index 0000000..6575038 --- /dev/null +++ b/utils/common/src/main/java/org/apache/brooklyn/util/pool/Pool.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.util.pool; + +import java.io.Closeable; +import java.io.IOException; + +import com.google.common.base.Function; + +/** + * See discussion at http://code.google.com/p/guava-libraries/issues/detail?id=683. + * This API is inspired by that proposed by [email protected] + * + * There are two ways to use the pool. + * + * Passive: + * + * <pre> + * {@code + * Pool<Expensive> pool = ... + * Lease<Expensive> lease = pool.leaseObject(); + * try { + * Expensive o = lease.leasedObject(); + * doSomethingWith(o); + * } finally { + * lease.close(); + * } + * } + * </pre> + * + * Or active: + * + * <pre> + * {@code + * Pool<Expensive> pool = ... + * pool.exec( + * new Function<Expensive,Void>() { + * public Void apply(Expensive o) { + * doSomethingWith(o); + * return null; + * } + * }); + * } + * </pre> + * + * @see BasicPool + * + * @author aled + */ +public interface Pool<T> extends Closeable { + + Lease<T> leaseObject(); + + <R> R exec(Function<? super T,R> receiver); + + @Override + void close() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/cf2f7a93/utils/common/src/main/java/org/apache/brooklyn/util/repeat/Repeater.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/repeat/Repeater.java b/utils/common/src/main/java/org/apache/brooklyn/util/repeat/Repeater.java new file mode 100644 index 0000000..d8ac013 --- /dev/null +++ b/utils/common/src/main/java/org/apache/brooklyn/util/repeat/Repeater.java @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.util.repeat; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.ReferenceWithError; +import org.apache.brooklyn.util.repeat.Repeater; +import org.apache.brooklyn.util.time.CountdownTimer; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.Callables; + +/** + * Simple DSL to repeat a fragment of code periodically until a condition is satisfied. + * + * In its simplest case, it is passed two {@link groovy.lang.Closure}s / {@link Callable} - + * the first is executed, then the second. If the second closure returns false, the loop + * is repeated; if true, it finishes. Further customization can be applied to set the period + * between loops and place a maximum limit on how long the loop should run for. + * <p> + * It is configured in a <em>fluent</em> manner. For example, in Groovy: + * <pre> + * {@code + * Repeater.create("Wait until the Frobnitzer is ready") + * .repeat { + * status = frobnitzer.getStatus() + * } + * .until { + * status == "Ready" || status == "Failed" + * } + * .limitIterationsTo(30) + * .run() + * } + * </pre> + * + * Or in Java: + * <pre> + * {@code + * Repeater.create("Wait until the Frobnitzer is ready") + * .until(new Callable<Boolean>() { + * public Boolean call() { + * String status = frobnitzer.getStatus() + * return "Ready".equals(status) || "Failed".equals(status); + * }}) + * .limitIterationsTo(30) + * .run() + * } + * </pre> + */ +public class Repeater { + + private static final Logger log = LoggerFactory.getLogger(Repeater.class); + + /** A small initial duration that something should wait between repeats, + * e.g. when doing {@link #backoffTo(Duration)}. + * <p> + * Chosen to be small enough that a user won't notice at all, + * but we're not going to be chewing up CPU while waiting. */ + public static final Duration DEFAULT_REAL_QUICK_PERIOD = Duration.millis(10); + + private final String description; + private Callable<?> body = Callables.returning(null); + private Callable<Boolean> exitCondition; + private Function<? super Integer,Duration> delayOnIteration = null; + private Duration timeLimit = null; + private int iterationLimit = 0; + private boolean rethrowException = false; + private boolean rethrowExceptionImmediately = false; + private boolean warnOnUnRethrownException = true; + + public Repeater() { + this(null); + } + + /** + * Construct a new instance of Repeater. + * + * @param description a description of the operation that will appear in debug logs. + */ + public Repeater(String description) { + this.description = description != null ? description : "Repeater"; + } + + public static Repeater create() { + return create(null); + } + public static Repeater create(String description) { + return new Repeater(description); + } + + /** + * Sets the main body of the loop to be a no-op; useful if using {@link #until(Callable)} instead + * + * @return {@literal this} to aid coding in a fluent style. + * @deprecated since 0.7.0 this is no-op, as the repeater defaults to repeating nothing, simply remove the call, + * using just <code>Repeater.until(...)</code>. + */ + public Repeater repeat() { + return repeat(Callables.returning(null)); + } + + /** + * Sets the main body of the loop. + * + * @param body a closure or other Runnable that is executed in the main body of the loop. + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater repeat(Runnable body) { + checkNotNull(body, "body must not be null"); + this.body = (body instanceof Callable) ? (Callable<?>)body : Executors.callable(body); + return this; + } + + /** + * Sets the main body of the loop. + * + * @param body a closure or other Callable that is executed in the main body of the loop. + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater repeat(Callable<?> body) { + checkNotNull(body, "body must not be null"); + this.body = body; + return this; + } + + /** + * Set how long to wait between loop iterations. + * + * @param period how long to wait between loop iterations. + * @param unit the unit of measurement of the period. + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater every(long period, TimeUnit unit) { + return every(Duration.of(period, unit)); + } + + /** + * Set how long to wait between loop iterations, as a constant function in {@link #delayOnIteration} + */ + public Repeater every(Duration duration) { + Preconditions.checkNotNull(duration, "duration must not be null"); + Preconditions.checkArgument(duration.toMilliseconds()>0, "period must be positive: %s", duration); + return delayOnIteration(Functions.constant(duration)); + } + + public Repeater every(groovy.time.Duration duration) { + return every(Duration.of(duration)); + } + + /** sets a function which determines how long to delay on a given iteration between checks, + * with 0 being mapped to the initial delay (after the initial check) */ + public Repeater delayOnIteration(Function<? super Integer,Duration> delayFunction) { + Preconditions.checkNotNull(delayFunction, "delayFunction must not be null"); + this.delayOnIteration = delayFunction; + return this; + } + + /** sets the {@link #delayOnIteration(Function)} function to be an exponential backoff as follows: + * @param initialDelay the delay on the first iteration, after the initial check + * @param multiplier the rate at which to increase the loop delay, must be >= 1 + * @param finalDelay an optional cap on the loop delay */ + public Repeater backoff(final Duration initialDelay, final double multiplier, @Nullable final Duration finalDelay) { + Preconditions.checkNotNull(initialDelay, "initialDelay"); + Preconditions.checkArgument(multiplier>=1.0, "multiplier >= 1.0"); + return delayOnIteration(new Function<Integer, Duration>() { + @Override + public Duration apply(Integer iteration) { + /* we iterate because otherwise we risk overflow errors by using multiplier^iteration; + * e.g. with: + * return Duration.min(initialDelay.multiply(Math.pow(multiplier, iteration)), finalDelay); */ + Duration result = initialDelay; + for (int i=0; i<iteration; i++) { + result = result.multiply(multiplier); + if (finalDelay!=null && result.compareTo(finalDelay)>0) + return finalDelay; + } + return result; + } + }); + } + + /** convenience to start with a 10ms delay and exponentially back-off at a rate of 1.2 + * up to a max per-iteration delay as supplied here. + * 1.2 chosen because it decays nicely, going from 10ms to 1s in approx 25 iterations totalling 5s elapsed time. */ + public Repeater backoffTo(final Duration finalDelay) { + return backoff(Duration.millis(10), 1.2, finalDelay); + } + + /** + * Set code fragment that tests if the loop has completed. + * + * @param exitCondition a closure or other Callable that returns a boolean. If this code returns {@literal true} then the + * loop will stop executing. + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater until(Callable<Boolean> exitCondition) { + Preconditions.checkNotNull(exitCondition, "exitCondition must not be null"); + this.exitCondition = exitCondition; + return this; + } + + public <T> Repeater until(final T target, final Predicate<T> exitCondition) { + Preconditions.checkNotNull(exitCondition, "exitCondition must not be null"); + return until(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return exitCondition.apply(target); + } + }); + } + + /** + * If the exit condition check throws an exception, it will be recorded and the last exception will be thrown on failure. + * + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater rethrowException() { + this.rethrowException = true; + return this; + } + + /** + * If the repeated body or the exit condition check throws an exception, then propagate that exception immediately. + * + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater rethrowExceptionImmediately() { + this.rethrowExceptionImmediately = true; + return this; + } + + public Repeater suppressWarnings() { + this.warnOnUnRethrownException = false; + return this; + } + + /** + * Set the maximum number of iterations. + * + * The loop will exit if the condition has not been satisfied after this number of iterations. + * + * @param iterationLimit the maximum number of iterations. + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater limitIterationsTo(int iterationLimit) { + Preconditions.checkArgument(iterationLimit > 0, "iterationLimit must be positive: %s", iterationLimit); + this.iterationLimit = iterationLimit; + return this; + } + + /** + * @see #limitTimeTo(Duration) + * + * @param deadline the time that the loop should wait. + * @param unit the unit of measurement of the period. + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater limitTimeTo(long deadline, TimeUnit unit) { + return limitTimeTo(Duration.of(deadline, unit)); + } + + /** + * Set the amount of time to wait for the condition. + * The repeater will wait at least this long for the condition to be true, + * and will exit soon after even if the condition is false. + */ + public Repeater limitTimeTo(Duration duration) { + Preconditions.checkNotNull(duration, "duration must not be null"); + Preconditions.checkArgument(duration.toMilliseconds() > 0, "deadline must be positive: %s", duration); + this.timeLimit = duration; + return this; + } + + /** + * Run the loop. + * + * @return true if the exit condition was satisfied; false if the loop terminated for any other reason. + */ + public boolean run() { + return runKeepingError().getWithoutError(); + } + + public void runRequiringTrue() { + Stopwatch timer = Stopwatch.createStarted(); + ReferenceWithError<Boolean> result = runKeepingError(); + result.checkNoError(); + if (!result.get()) + throw new IllegalStateException(description+" unsatisfied after "+Duration.of(timer)); + } + + public ReferenceWithError<Boolean> runKeepingError() { + Preconditions.checkState(body != null, "repeat() method has not been called to set the body"); + Preconditions.checkState(exitCondition != null, "until() method has not been called to set the exit condition"); + Preconditions.checkState(delayOnIteration != null, "every() method (or other delaySupplier() / backoff() method) has not been called to set the loop delay"); + + Throwable lastError = null; + int iterations = 0; + CountdownTimer timer = timeLimit!=null ? CountdownTimer.newInstanceStarted(timeLimit) : CountdownTimer.newInstancePaused(Duration.PRACTICALLY_FOREVER); + + while (true) { + Duration delayThisIteration = delayOnIteration.apply(iterations); + iterations++; + + try { + body.call(); + } catch (Exception e) { + log.warn(description, e); + if (rethrowExceptionImmediately) throw Exceptions.propagate(e); + } + + boolean done = false; + try { + lastError = null; + done = exitCondition.call(); + } catch (Exception e) { + if (log.isDebugEnabled()) log.debug(description, e); + lastError = e; + if (rethrowExceptionImmediately) throw Exceptions.propagate(e); + } + if (done) { + if (log.isDebugEnabled()) log.debug("{}: condition satisfied", description); + return ReferenceWithError.newInstanceWithoutError(true); + } else { + if (log.isDebugEnabled()) { + String msg = String.format("%s: unsatisfied during iteration %s %s", description, iterations, + (iterationLimit > 0 ? "(max "+iterationLimit+" attempts)" : "") + + (timer.isRunning() ? "("+Time.makeTimeStringRounded(timer.getDurationRemaining())+" remaining)" : "")); + if (iterations == 1) { + log.debug(msg); + } else { + log.trace(msg); + } + } + } + + if (iterationLimit > 0 && iterations >= iterationLimit) { + if (log.isDebugEnabled()) log.debug("{}: condition not satisfied and exceeded iteration limit", description); + if (rethrowException && lastError != null) { + log.warn("{}: error caught checking condition (rethrowing): {}", description, lastError.getMessage()); + throw Exceptions.propagate(lastError); + } + if (warnOnUnRethrownException && lastError != null) + log.warn("{}: error caught checking condition: {}", description, lastError.getMessage()); + return ReferenceWithError.newInstanceMaskingError(false, lastError); + } + + if (timer.isExpired()) { + if (log.isDebugEnabled()) log.debug("{}: condition not satisfied, with {} elapsed (limit {})", + new Object[] { description, Time.makeTimeStringRounded(timer.getDurationElapsed()), Time.makeTimeStringRounded(timeLimit) }); + if (rethrowException && lastError != null) { + log.error("{}: error caught checking condition: {}", description, lastError.getMessage()); + throw Exceptions.propagate(lastError); + } + return ReferenceWithError.newInstanceMaskingError(false, lastError); + } + + Time.sleep(delayThisIteration); + } + } + + public String getDescription() { + return description; + } + + public Duration getTimeLimit() { + return timeLimit; + } + +}
