http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/osgi/Osgis.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/osgi/Osgis.java b/core/src/main/java/org/apache/brooklyn/core/util/osgi/Osgis.java new file mode 100644 index 0000000..849a33b --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/osgi/Osgis.java @@ -0,0 +1,720 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.core.util.osgi;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

import javax.annotation.Nullable;

import org.apache.felix.framework.FrameworkFactory;
import org.apache.felix.framework.util.StringMap;
import org.apache.felix.framework.util.manifestparser.ManifestParser;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleException;
import org.osgi.framework.Constants;
import org.osgi.framework.Version;
import org.osgi.framework.launch.Framework;
import org.osgi.framework.namespace.PackageNamespace;
import org.osgi.framework.wiring.BundleCapability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.brooklyn.api.catalog.CatalogItem.CatalogBundle;
import org.apache.brooklyn.core.util.ResourceUtils;
import org.apache.brooklyn.core.util.osgi.Osgis;

import brooklyn.util.collections.MutableList;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.collections.MutableSet;
import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.exceptions.ReferenceWithError;
import brooklyn.util.guava.Maybe;
import brooklyn.util.net.Urls;
import brooklyn.util.os.Os;
import brooklyn.util.stream.Streams;
import brooklyn.util.text.Strings;
import brooklyn.util.time.Duration;
import brooklyn.util.time.Time;

import com.google.common.annotations.Beta;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Stopwatch;

/**
 * Utilities for working with OSGi.
 * OSGi support is in early days (June 2014) so this class is beta, subject to change,
 * particularly in how the framework is started and bundles installed.
 *
 * @since 0.7.0
 */
@Beta
public class Osgis {
    private static final Logger LOG = LoggerFactory.getLogger(Osgis.class);

    /** Location prefix used to mark bundles installed as framework extensions by this class. */
    private static final String EXTENSION_PROTOCOL = "system";
    private static final String MANIFEST_PATH = "META-INF/MANIFEST.MF";
    /** Versioned ids ("symbolic-name:version") of the bundles recorded by {@link #installBootBundles(Framework)}. */
    private static final Set<String> SYSTEM_BUNDLES = MutableSet.of();

    /** A bundle symbolic name paired with an optional (possibly null) {@link Version}. */
    public static class VersionedName {
        private final String symbolicName;
        private final Version version;

        public VersionedName(Bundle b) {
            this.symbolicName = b.getSymbolicName();
            this.version = b.getVersion();
        }

        public VersionedName(String symbolicName, Version version) {
            this.symbolicName = symbolicName;
            this.version = version;
        }

        @Override
        public String toString() {
            return symbolicName + ":" + Strings.toString(version);
        }

        /**
         * Compares against a symbolic name and the string form of a version.
         * Null-safe on both parts: a null version here only matches a null {@code v}.
         */
        public boolean equals(String sn, String v) {
            return Objects.equal(symbolicName, sn)
                && (version == null ? v == null : version.toString().equals(v));
        }

        /** Compares against a symbolic name and a version; null-safe on both parts. */
        public boolean equals(String sn, Version v) {
            return Objects.equal(symbolicName, sn) && Objects.equal(version, v);
        }

        public String getSymbolicName() {
            return symbolicName;
        }

        public Version getVersion() {
            return version;
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(symbolicName, version);
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof VersionedName)) return false;
            VersionedName o = (VersionedName) other;
            return Objects.equal(symbolicName, o.symbolicName) && Objects.equal(version, o.version);
        }
    }

    /**
     * Fluent query over the bundles installed in a {@link Framework}, filtering by symbolic name,
     * version, location URL and arbitrary predicates.
     * Obtain an instance with {@link Osgis#bundleFinder(Framework)}.
     */
    public static class BundleFinder {
        protected final Framework framework;
        protected String symbolicName;
        protected String version;
        protected String url;
        protected boolean urlMandatory = false;
        protected final List<Predicate<? super Bundle>> predicates = MutableList.of();

        protected BundleFinder(Framework framework) {
            this.framework = framework;
        }

        public BundleFinder symbolicName(String symbolicName) {
            this.symbolicName = symbolicName;
            return this;
        }

        public BundleFinder version(String version) {
            this.version = version;
            return this;
        }

        /**
         * Sets the symbolic name, and the version if present, from a "name" or "name:version" string.
         * A blank argument leaves this finder unchanged.
         *
         * @throws IllegalArgumentException if the string is not blank but cannot be parsed
         */
        public BundleFinder id(String symbolicNameOptionallyWithVersion) {
            if (Strings.isBlank(symbolicNameOptionallyWithVersion))
                return this;

            Maybe<VersionedName> nv = parseOsgiIdentifier(symbolicNameOptionallyWithVersion);
            if (nv.isAbsent())
                throw new IllegalArgumentException("Cannot parse symbolic-name:version string '"+symbolicNameOptionallyWithVersion+"'");

            return id(nv.get());
        }

        private BundleFinder id(VersionedName nv) {
            symbolicName(nv.getSymbolicName());
            if (nv.getVersion() != null) {
                version(nv.getVersion().toString());
            }
            return this;
        }

        /** Copies the name/version (if the bundle is named) and the URL (if set, as a required URL) from the catalog bundle. */
        public BundleFinder bundle(CatalogBundle bundle) {
            if (bundle.isNamed()) {
                symbolicName(bundle.getSymbolicName());
                version(bundle.getVersion());
            }
            if (bundle.getUrl() != null) {
                requiringFromUrl(bundle.getUrl());
            }
            return this;
        }

        /** Looks for a bundle matching the given URL;
         * unlike {@link #requiringFromUrl(String)} however, if the URL does not match any bundles
         * it will return other matching bundles <i>if</i> a {@link #symbolicName(String)} is specified.
         */
        public BundleFinder preferringFromUrl(String url) {
            this.url = url;
            urlMandatory = false;
            return this;
        }

        /** Requires the bundle to have the given URL set as its location. */
        public BundleFinder requiringFromUrl(String url) {
            this.url = url;
            urlMandatory = true;
            return this;
        }

        /** Finds the best matching bundle. */
        public Maybe<Bundle> find() {
            return findOne(false);
        }

        /** Finds the matching bundle, requiring it to be unique. */
        public Maybe<Bundle> findUnique() {
            return findOne(true);
        }

        /**
         * Returns the first result of {@link #findAll()}, or absent (with an explanatory message)
         * if there are no matches, or if there are several and {@code requireExactlyOne} is set.
         *
         * @throws IllegalStateException if neither a symbolic name nor a URL has been supplied
         */
        protected Maybe<Bundle> findOne(boolean requireExactlyOne) {
            if (symbolicName==null && url==null)
                throw new IllegalStateException(this+" must be given either a symbolic name or a URL");

            List<Bundle> result = findAll();
            if (result.isEmpty())
                return Maybe.absent("No bundle matching "+getConstraintsDescription());
            if (requireExactlyOne && result.size()>1)
                return Maybe.absent("Multiple bundles ("+result.size()+") matching "+getConstraintsDescription());

            return Maybe.of(result.get(0));
        }

        /** Finds all matching bundles, in decreasing version order. */
        public List<Bundle> findAll() {
            boolean urlMatched = false;
            List<Bundle> result = MutableList.of();
            for (Bundle b: framework.getBundleContext().getBundles()) {
                if (symbolicName!=null && !symbolicName.equals(b.getSymbolicName())) continue;
                if (version!=null && !Version.parseVersion(version).equals(b.getVersion())) continue;
                // previously a `continue` nested inside the predicate loop only skipped to the next
                // predicate, so predicates never excluded a bundle; the check now rejects the bundle itself
                if (!satisfiesAllPredicates(b)) continue;

                // check url last, because if it isn't mandatory we should only clear if we find a url
                // for which the other items also match
                if (url!=null) {
                    boolean matches = url.equals(b.getLocation());
                    if (urlMandatory) {
                        if (!matches) continue;
                        else urlMatched = true;
                    } else {
                        if (matches) {
                            if (!urlMatched) {
                                // first bundle at the preferred url: discard the non-preferred ones collected so far
                                result.clear();
                                urlMatched = true;
                            }
                        } else {
                            if (urlMatched) {
                                // can't use this bundle as we have previously found a preferred bundle, with a matching url
                                continue;
                            }
                        }
                    }
                }

                result.add(b);
            }

            if (symbolicName==null && url!=null && !urlMatched) {
                // if we only "preferred" the url, and we did not match it, and we did not have a symbolic name,
                // then clear the results list!
                result.clear();
            }

            Collections.sort(result, new Comparator<Bundle>() {
                @Override
                public int compare(Bundle o1, Bundle o2) {
                    return o2.getVersion().compareTo(o1.getVersion());
                }
            });

            return result;
        }

        /** Returns true if every predicate registered via {@link #satisfying(Predicate)} accepts the bundle. */
        private boolean satisfiesAllPredicates(Bundle b) {
            for (Predicate<? super Bundle> predicate: predicates) {
                if (!predicate.apply(b)) return false;
            }
            return true;
        }

        /** Describes the configured constraints as a ';'-separated list, for use in messages. */
        public String getConstraintsDescription() {
            List<String> parts = MutableList.of();
            if (symbolicName!=null) parts.add("symbolicName="+symbolicName);
            if (version!=null) parts.add("version="+version);
            if (url!=null)
                parts.add("url["+(urlMandatory ? "required" : "preferred")+"]="+url);
            if (!predicates.isEmpty())
                parts.add("predicates="+predicates);
            return Joiner.on(";").join(parts);
        }

        @Override
        public String toString() {
            return getClass().getCanonicalName()+"["+getConstraintsDescription()+"]";
        }

        /** Adds a constraint that the bundle's version must satisfy the given predicate. */
        public BundleFinder version(final Predicate<Version> versionPredicate) {
            return satisfying(new Predicate<Bundle>() {
                @Override
                public boolean apply(Bundle input) {
                    return versionPredicate.apply(input.getVersion());
                }
            });
        }

        /** Adds an arbitrary constraint which candidate bundles must satisfy. */
        public BundleFinder satisfying(Predicate<? super Bundle> predicate) {
            predicates.add(predicate);
            return this;
        }
    }

    public static BundleFinder bundleFinder(Framework framework) {
        return new BundleFinder(framework);
    }

    /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated
    public static List<Bundle> getBundlesByName(Framework framework, String symbolicName, Predicate<Version> versionMatcher) {
        return bundleFinder(framework).symbolicName(symbolicName).version(versionMatcher).findAll();
    }

    /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated
    public static List<Bundle> getBundlesByName(Framework framework, String symbolicName) {
        return bundleFinder(framework).symbolicName(symbolicName).findAll();
    }

    /**
     * Tries to find a bundle in the given framework with name matching either `name' or `name:version'.
     * @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated
    public static Maybe<Bundle> getBundle(Framework framework, String symbolicNameOptionallyWithVersion) {
        return bundleFinder(framework).id(symbolicNameOptionallyWithVersion).find();
    }

    /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated
    public static Maybe<Bundle> getBundle(Framework framework, String symbolicName, String version) {
        return bundleFinder(framework).symbolicName(symbolicName).version(version).find();
    }

    /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated
    public static Maybe<Bundle> getBundle(Framework framework, String symbolicName, Version version) {
        return bundleFinder(framework).symbolicName(symbolicName).version(Predicates.equalTo(version)).findUnique();
    }

    // -------- creating

    /*
     * loading framework factory and starting framework based on:
     * http://felix.apache.org/documentation/subprojects/apache-felix-framework/apache-felix-framework-launching-and-embedding.html
     */

    /**
     * Instantiates the framework factory named by the first non-empty, non-comment line of the
     * {@code META-INF/services/org.osgi.framework.launch.FrameworkFactory} resource on this class's class loader.
     *
     * @throws IllegalStateException if the resource is missing or names no class
     */
    public static FrameworkFactory newFrameworkFactory() {
        URL url = Osgis.class.getClassLoader().getResource(
                "META-INF/services/org.osgi.framework.launch.FrameworkFactory");
        if (url != null) {
            try {
                BufferedReader br = new BufferedReader(new InputStreamReader(url.openStream()));
                try {
                    for (String s = br.readLine(); s != null; s = br.readLine()) {
                        s = s.trim();
                        // load the first non-empty, non-commented line
                        if ((s.length() > 0) && (s.charAt(0) != '#')) {
                            return (FrameworkFactory) Class.forName(s).newInstance();
                        }
                    }
                } finally {
                    br.close();
                }
            } catch (Exception e) {
                // class creation exceptions are not interesting to caller...
                throw Exceptions.propagate(e);
            }
        }
        throw new IllegalStateException("Could not find framework factory.");
    }

    /**
     * Creates, initialises and starts a framework, installing the boot bundles found on the classpath
     * between init and start.
     *
     * @param felixCacheDir value for {@link Constants#FRAMEWORK_STORAGE}, or null to leave it unset
     * @param clean whether to set {@link Constants#FRAMEWORK_STORAGE_CLEAN} to "onFirstInit"
     * @param extraStartupConfig additional framework configuration; copied, not modified
     */
    public static Framework newFrameworkStarted(String felixCacheDir, boolean clean, Map<?,?> extraStartupConfig) {
        Map<Object,Object> cfg = MutableMap.copyOf(extraStartupConfig);
        if (clean) cfg.put(Constants.FRAMEWORK_STORAGE_CLEAN, "onFirstInit");
        if (felixCacheDir!=null) cfg.put(Constants.FRAMEWORK_STORAGE, felixCacheDir);
        cfg.put(Constants.FRAMEWORK_BSNVERSION, Constants.FRAMEWORK_BSNVERSION_MULTIPLE);
        FrameworkFactory factory = newFrameworkFactory();

        Stopwatch timer = Stopwatch.createStarted();
        Framework framework = factory.newFramework(cfg);
        try {
            framework.init();
            installBootBundles(framework);
            framework.start();
        } catch (Exception e) {
            // framework bundle start exceptions are not interesting to caller...
            throw Exceptions.propagate(e);
        }
        LOG.debug("System bundles are: "+SYSTEM_BUNDLES);
        LOG.debug("OSGi framework started in " + Duration.of(timer));
        return framework;
    }

    /**
     * Scans every {@code META-INF/MANIFEST.MF} visible to this class's class loader and installs each
     * one that is an OSGi bundle as an extension bundle, recording its versioned id in {@link #SYSTEM_BUNDLES}.
     */
    private static void installBootBundles(Framework framework) {
        Stopwatch timer = Stopwatch.createStarted();
        LOG.debug("Installing OSGi boot bundles from "+Osgis.class.getClassLoader()+"...");
        Enumeration<URL> resources;
        try {
            resources = Osgis.class.getClassLoader().getResources(MANIFEST_PATH);
        } catch (IOException e) {
            throw Exceptions.propagate(e);
        }
        BundleContext bundleContext = framework.getBundleContext();
        Map<String, Bundle> installedBundles = getInstalledBundlesById(bundleContext);
        while(resources.hasMoreElements()) {
            URL url = resources.nextElement();
            ReferenceWithError<?> installResult = installExtensionBundle(bundleContext, url, installedBundles, getVersionedId(framework));
            if (installResult.hasError() && !installResult.masksErrorIfPresent()) {
                // it's reported as a critical error, so warn here
                LOG.warn("Unable to install manifest from "+url+": "+installResult.getError(), installResult.getError());
            } else {
                Object result = installResult.getWithoutError();
                if (result instanceof Bundle) {
                    String v = getVersionedId( (Bundle)result );
                    SYSTEM_BUNDLES.add(v);
                    if (installResult.hasError()) {
                        // masked error: the bundle was already present
                        LOG.debug(installResult.getError().getMessage()+" ("+result+"/"+v+")");
                    } else {
                        LOG.debug("Installed "+v+" from "+url);
                    }
                } else if (installResult.hasError()) {
                    LOG.debug(installResult.getError().getMessage());
                }
            }
        }
        LOG.debug("Installed OSGi boot bundles in "+Time.makeTimeStringRounded(timer)+": "+Arrays.asList(framework.getBundleContext().getBundles()));
    }

    /** Indexes the bundles currently known to the context by their versioned id. */
    private static Map<String, Bundle> getInstalledBundlesById(BundleContext bundleContext) {
        Map<String, Bundle> installedBundles = new HashMap<String, Bundle>();
        Bundle[] bundles = bundleContext.getBundles();
        for (Bundle b : bundles) {
            installedBundles.put(getVersionedId(b), b);
        }
        return installedBundles;
    }

    /** Wraps the bundle if successful or already installed, wraps null if it's the system entry,
     * if the resource is not an OSGi bundle, or if the bundle is already installed from somewhere else;
     * in all these cases <i>masking</i> an explanatory error where nothing new was installed.
     * <p>
     * Returns an instance wrapping null and <i>throwing</i> an error if the bundle could not be installed.
     */
    private static ReferenceWithError<?> installExtensionBundle(BundleContext bundleContext, URL manifestUrl, Map<String, Bundle> installedBundles, String frameworkVersionedId) {
        //ignore http://felix.extensions:9/ system entry
        if("felix.extensions".equals(manifestUrl.getHost()))
            return ReferenceWithError.newInstanceMaskingError(null, new IllegalArgumentException("Skipping install of internal extension bundle from "+manifestUrl));

        try {
            Manifest manifest = readManifest(manifestUrl);
            if (!isValidBundle(manifest))
                return ReferenceWithError.newInstanceMaskingError(null, new IllegalArgumentException("Resource at "+manifestUrl+" is not an OSGi bundle: no valid manifest"));

            String versionedId = getVersionedId(manifest);
            URL bundleUrl = ResourceUtils.getContainerUrl(manifestUrl, MANIFEST_PATH);

            Bundle existingBundle = installedBundles.get(versionedId);
            if (existingBundle != null) {
                // Bundle.getLocation() is a String; the previous URL.equals(String) comparison could never match.
                // Bundles installed by this method carry the EXTENSION_PROTOCOL prefix, so accept either form.
                String existingLocation = existingBundle.getLocation();
                String plainLocation = bundleUrl.toString();
                boolean sameLocation = plainLocation.equals(existingLocation)
                    || (EXTENSION_PROTOCOL + ":" + plainLocation).equals(existingLocation);
                if (!sameLocation &&
                        //the framework bundle is always pre-installed, don't display duplicate info
                        !versionedId.equals(frameworkVersionedId)) {
                    return ReferenceWithError.newInstanceMaskingError(null, new IllegalArgumentException("Bundle "+versionedId+" (from manifest " + manifestUrl + ") is already installed, from " + existingBundle.getLocation()));
                }
                return ReferenceWithError.newInstanceMaskingError(existingBundle, new IllegalArgumentException("Bundle "+versionedId+" from manifest " + manifestUrl + " is already installed"));
            }

            byte[] jar = buildExtensionBundle(manifest);
            LOG.debug("Installing boot bundle " + bundleUrl);
            //mark the bundle as extension so we can detect it later using the "system:" protocol
            //(since we cannot access BundleImpl.isExtension)
            Bundle newBundle = bundleContext.installBundle(EXTENSION_PROTOCOL + ":" + bundleUrl.toString(), new ByteArrayInputStream(jar));
            installedBundles.put(versionedId, newBundle);
            return ReferenceWithError.newInstanceWithoutError(newBundle);
        } catch (Exception e) {
            Exceptions.propagateIfFatal(e);
            return ReferenceWithError.newInstanceThrowingError(null,
                new IllegalStateException("Problem installing extension bundle " + manifestUrl + ": "+e, e));
        }
    }

    /** Reads and parses the manifest at the given URL, always closing the stream. */
    private static Manifest readManifest(URL manifestUrl) throws IOException {
        InputStream in = manifestUrl.openStream();
        try {
            return new Manifest(in);
        } finally {
            try {
                in.close();
            } catch (Exception ignored) {
                // best-effort close: a failure here must not mask the parse result or a parse error
            }
        }
    }

    /**
     * Builds the bytes of a jar containing only the given manifest, rewritten as a framework extension bundle.
     * Note the main attributes of the supplied manifest are modified in place.
     */
    private static byte[] buildExtensionBundle(Manifest manifest) throws IOException {
        Attributes atts = manifest.getMainAttributes();

        //the following properties are invalid in extension bundles
        atts.remove(new Attributes.Name(Constants.IMPORT_PACKAGE));
        atts.remove(new Attributes.Name(Constants.REQUIRE_BUNDLE));
        atts.remove(new Attributes.Name(Constants.BUNDLE_NATIVECODE));
        atts.remove(new Attributes.Name(Constants.DYNAMICIMPORT_PACKAGE));
        atts.remove(new Attributes.Name(Constants.BUNDLE_ACTIVATOR));

        //mark as extension bundle
        atts.putValue(Constants.FRAGMENT_HOST, "system.bundle; extension:=framework");

        //create the jar containing the manifest
        ByteArrayOutputStream jar = new ByteArrayOutputStream();
        JarOutputStream out = new JarOutputStream(jar, manifest);
        out.close();
        return jar.toByteArray();
    }

    /** A manifest is treated as an OSGi bundle manifest if it declares a Bundle-ManifestVersion. */
    private static boolean isValidBundle(Manifest manifest) {
        Attributes atts = manifest.getMainAttributes();
        return atts.containsKey(new Attributes.Name(Constants.BUNDLE_MANIFESTVERSION));
    }

    private static String getVersionedId(Bundle b) {
        return b.getSymbolicName() + ":" + b.getVersion();
    }

    // NOTE(review): this uses the raw header values, so a Bundle-SymbolicName carrying directives
    // or a short-form Bundle-Version may not produce the same id as getVersionedId(Bundle) -- confirm
    private static String getVersionedId(Manifest manifest) {
        Attributes atts = manifest.getMainAttributes();
        return atts.getValue(Constants.BUNDLE_SYMBOLICNAME) + ":" +
            atts.getValue(Constants.BUNDLE_VERSION);
    }

    /**
     * Installs a bundle from the given URL, doing a check if already installed, and
     * using the {@link ResourceUtils} loader for this project (brooklyn core).
     * <p>
     * URLs which are not local (file, classpath or jar) are first copied to a temporary file,
     * which is deleted again before this method returns; the bundle is nevertheless registered
     * under the originally requested {@code url}.
     */
    public static Bundle install(Framework framework, String url) throws BundleException {
        boolean isLocal = isLocalUrl(url);
        String localUrl = url;
        if (!isLocal) {
            localUrl = cacheFile(url);
        }

        try {
            Bundle bundle = getInstalledBundle(framework, localUrl);
            if (bundle != null) {
                return bundle;
            }

            // use our URL resolution so we get classpath items
            LOG.debug("Installing bundle into {} from url: {}", framework, url);
            InputStream stream = getUrlStream(localUrl);
            Bundle installedBundle = framework.getBundleContext().installBundle(url, stream);

            return installedBundle;
        } finally {
            if (!isLocal) {
                try {
                    new File(new URI(localUrl)).delete();
                } catch (URISyntaxException e) {
                    throw Exceptions.propagate(e);
                }
            }
        }
    }

    /** Copies the content at the URL to a temporary file and returns that file's URI as a string. */
    private static String cacheFile(String url) {
        InputStream in = getUrlStream(url);
        File cache = Os.writeToTempFile(in, "bundle-cache", "jar");
        return cache.toURI().toString();
    }

    private static boolean isLocalUrl(String url) {
        String protocol = Urls.getProtocol(url);
        return "file".equals(protocol) ||
            "classpath".equals(protocol) ||
            "jar".equals(protocol);
    }

    /**
     * Returns the bundle already installed at the given location, or an installed <i>system</i> bundle
     * with the same versioned id as the jar at the URL; otherwise null, meaning the caller should install.
     *
     * @throws IllegalStateException if the content at the URL has no manifest
     */
    private static Bundle getInstalledBundle(Framework framework, String url) {
        Bundle bundle = framework.getBundleContext().getBundle(url);
        if (bundle != null) {
            return bundle;
        }

        // We now support same version installed multiple times (avail since OSGi 4.3+).
        // However we do not support overriding *system* bundles, ie anything already on the classpath.
        // If we wanted to disable multiple versions, see comments below, and reference to FRAMEWORK_BSNVERSION_MULTIPLE above.

        // Felix already assumes the stream is pointing to a JAR
        InputStream raw = getUrlStream(url);
        JarInputStream stream = null;
        Manifest manifest;
        try {
            stream = new JarInputStream(raw);
            manifest = stream.getManifest();
        } catch (IOException e) {
            throw Exceptions.propagate(e);
        } finally {
            // close the raw stream too if wrapping it failed, so it does not leak
            Streams.closeQuietly(stream != null ? stream : raw);
        }
        if (manifest == null) {
            throw new IllegalStateException("Missing manifest file in bundle or not a jar file.");
        }
        String versionedId = getVersionedId(manifest);
        for (Bundle installedBundle : framework.getBundleContext().getBundles()) {
            if (versionedId.equals(getVersionedId(installedBundle))) {
                if (SYSTEM_BUNDLES.contains(versionedId)) {
                    LOG.debug("Already have system bundle "+versionedId+" from "+installedBundle+"/"+installedBundle.getLocation()+" when requested "+url+"; not installing");
                    // "System bundles" (ie things on the classpath) cannot be overridden
                    return installedBundle;
                } else {
                    LOG.debug("Already have bundle "+versionedId+" from "+installedBundle+"/"+installedBundle.getLocation()+" when requested "+url+"; but it is not a system bundle so proceeding");
                    // Other bundles can be installed multiple times. To ignore multiples and continue to use the old one,
                    // just return the installedBundle as done just above for system bundles.
                }
            }
        }
        return null;
    }

    private static InputStream getUrlStream(String url) {
        return ResourceUtils.create(Osgis.class).getResourceFromUrl(url);
    }

    /** Whether the bundle's location uses the "system" protocol, which this class applies to extension bundles. */
    public static boolean isExtensionBundle(Bundle bundle) {
        String location = bundle.getLocation();
        return location != null &&
            EXTENSION_PROTOCOL.equals(Urls.getProtocol(location));
    }

    /** Takes a string which might be of the form "symbolic-name" or "symbolic-name:version" (or something else entirely)
     * and returns a VersionedName. The versionedName.getVersion() will be null if there was no version in the input
     * (or returning {@link Maybe#absent()} if not valid, with a suitable error message). */
    public static Maybe<VersionedName> parseOsgiIdentifier(String symbolicNameOptionalWithVersion) {
        if (Strings.isBlank(symbolicNameOptionalWithVersion))
            return Maybe.absent("OSGi identifier is blank");

        String[] parts = symbolicNameOptionalWithVersion.split(":");
        if (parts.length>2)
            return Maybe.absent("OSGi identifier has too many parts; max one ':' symbol");
        // String.split drops trailing empty strings, so an input such as ":" yields no parts at all
        if (parts.length==0 || Strings.isBlank(parts[0]))
            return Maybe.absent("OSGi identifier is missing the symbolic name");

        Version v = null;
        if (parts.length == 2) {
            try {
                v = Version.parseVersion(parts[1]);
            } catch (IllegalArgumentException e) {
                return Maybe.absent("OSGi identifier has invalid version string ("+e.getMessage()+")");
            }
        }

        return Maybe.of(new VersionedName(parts[0], v));
    }

    /**
     * The class is not used, staying for future reference.
     * Remove after OSGi transition is completed.
     */
    public static class ManifestHelper {

        // per-instance: this was static, which made every helper report the most recently parsed manifest
        private ManifestParser parse;
        private Manifest manifest;
        private String source;

        private static final String WIRING_PACKAGE = PackageNamespace.PACKAGE_NAMESPACE;

        public static ManifestHelper forManifestContents(String contents) throws IOException, BundleException {
            ManifestHelper result = forManifest(Streams.newInputStreamWithContents(contents));
            result.source = contents;
            return result;
        }

        public static ManifestHelper forManifest(URL url) throws IOException, BundleException {
            InputStream in = null;
            try {
                in = url.openStream();
                return forManifest(in);
            } finally {
                if (in != null) in.close();
            }
        }

        public static ManifestHelper forManifest(InputStream in) throws IOException, BundleException {
            return forManifest(new Manifest(in));
        }

        public static ManifestHelper forManifest(Manifest manifest) throws BundleException {
            ManifestHelper result = new ManifestHelper();
            result.manifest = manifest;
            result.parse = new ManifestParser(null, null, null, new StringMap(manifest.getMainAttributes()));
            return result;
        }

        public String getSymbolicName() {
            return parse.getSymbolicName();
        }

        public Version getVersion() {
            return parse.getBundleVersion();
        }

        public String getSymbolicNameVersion() {
            return getSymbolicName()+":"+getVersion();
        }

        /** Returns the package-namespace attribute of each package capability declared by the manifest. */
        public List<String> getExportedPackages() {
            MutableList<String> result = MutableList.of();
            for (BundleCapability c: parse.getCapabilities()) {
                if (WIRING_PACKAGE.equals(c.getNamespace())) {
                    result.add((String)c.getAttributes().get(WIRING_PACKAGE));
                }
            }
            return result;
        }

        /** The manifest text, set only when created via {@link #forManifestContents(String)}. */
        @Nullable public String getSource() {
            return source;
        }

        public Manifest getManifest() {
            return manifest;
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/AbstractExecutionContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/AbstractExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/util/task/AbstractExecutionContext.java new file mode 100644 index 0000000..3eea5d9 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/AbstractExecutionContext.java @@ -0,0 +1,75 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.core.util.task;

import java.util.Map;
import java.util.concurrent.Callable;

import org.apache.brooklyn.api.management.ExecutionContext;
import org.apache.brooklyn.api.management.ExecutionManager;
import org.apache.brooklyn.api.management.Task;
import org.apache.brooklyn.api.management.TaskAdaptable;

import com.google.common.collect.Maps;

/**
 * Skeleton {@link ExecutionContext} which funnels every {@code submit} overload into
 * {@link #submitInternal(Map, Object)}, leaving subclasses a single method to implement.
 */
public abstract class AbstractExecutionContext implements ExecutionContext {

    /**
     * Submits the given runnable/callable/task for execution (in a separate thread);
     * supported keys in the map include: tags (add'l tags to put on the resulting task),
     * description (string), and others as described in the reference below
     *
     * @see ExecutionManager#submit(Map, Task)
     */
    @Override
    public Task<?> submit(Map<?,?> properties, Runnable runnable) {
        return submitInternal(properties, runnable);
    }

    /** @see #submit(Map, Runnable) */
    @Override
    public Task<?> submit(Runnable runnable) {
        return submitInternal(noProperties(), runnable);
    }

    /** @see #submit(Map, Runnable) */
    @Override
    public <T> Task<T> submit(Callable<T> callable) {
        return submitInternal(noProperties(), callable);
    }

    /** @see #submit(Map, Runnable) */
    @Override
    public <T> Task<T> submit(Map<?,?> properties, Callable<T> callable) {
        return submitInternal(properties, callable);
    }

    /** @see #submit(Map, Runnable) */
    @Override
    public <T> Task<T> submit(TaskAdaptable<T> task) {
        Task<T> asTask = task.asTask();
        return submitInternal(noProperties(), asTask);
    }

    /** @see #submit(Map, Runnable) */
    @Override
    public <T> Task<T> submit(Map<?,?> properties, TaskAdaptable<T> task) {
        Task<T> asTask = task.asTask();
        return submitInternal(properties, asTask);
    }

    /**
     * Provided for compatibility
     *
     * Submit is preferred if a handle on the resulting Task is desired (although a task can be passed in so this is not always necessary)
     *
     * @see #submit(Map, Runnable)
     */
    public void execute(Runnable r) {
        submit(r);
    }

    /** creates the fresh, empty, mutable property map used by the overloads which take no properties */
    private static Map<Object,Object> noProperties() {
        return Maps.newLinkedHashMap();
    }

    /** does the work internally of submitting the task; note that the return value may be a wrapper task even if a task is passed in,
     * if the execution context where the target should run is different (e.g. submitting an effector task cross-context) */
    protected abstract <T> Task<T> submitInternal(Map<?,?> properties, Object task);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionContext.java new file mode 100644 index 0000000..1d28b4e --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionContext.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task; + +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.management.ExecutionContext; +import org.apache.brooklyn.api.management.ExecutionManager; +import org.apache.brooklyn.api.management.HasTaskChildren; +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.management.TaskAdaptable; +import org.apache.brooklyn.api.management.entitlement.EntitlementContext; +import org.apache.brooklyn.core.management.entitlement.Entitlements; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.BrooklynTaskTags; +import brooklyn.entity.basic.BrooklynTaskTags.WrappedEntity; +import brooklyn.entity.basic.EntityInternal; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; + +/** + * A means of executing tasks against an ExecutionManager with a given bucket/set of tags pre-defined + * (so that it can look like an {@link Executor} and also supply {@link ExecutorService#submit(Callable)} + */ +public class BasicExecutionContext extends AbstractExecutionContext { + + private static final Logger log = LoggerFactory.getLogger(BasicExecutionContext.class); + + static final ThreadLocal<BasicExecutionContext> perThreadExecutionContext = new ThreadLocal<BasicExecutionContext>(); + + public static BasicExecutionContext getCurrentExecutionContext() { return perThreadExecutionContext.get(); } + + final ExecutionManager executionManager; + final Set<Object> tags = new LinkedHashSet<Object>(); + + public BasicExecutionContext(ExecutionManager executionManager) { + this(Collections.emptyMap(), 
executionManager); + } + + /** + * Supported flags are {@code tag} and {@code tags} + * + * @see ExecutionManager#submit(Map, Task) + */ + public BasicExecutionContext(Map<?, ?> flags, ExecutionManager executionManager) { + this.executionManager = executionManager; + + if (flags.get("tag") != null) tags.add(flags.remove("tag")); + if (flags.containsKey("tags")) tags.addAll((Collection<?>)flags.remove("tags")); + + // FIXME brooklyn-specific check, just for sanity + // the context tag should always be a non-proxy entity, because that is what is passed to effector tasks + // which may require access to internal methods + for (Object tag: tags) { + if (tag instanceof BrooklynTaskTags.WrappedEntity) { + if (Proxy.isProxyClass(((WrappedEntity)tag).entity.getClass())) { + log.warn(""+this+" has entity proxy in "+tag); + } + } + } + } + + public ExecutionManager getExecutionManager() { + return executionManager; + } + + /** returns tasks started by this context (or tasks which have all the tags on this object) */ + public Set<Task<?>> getTasks() { return executionManager.getTasksWithAllTags((Set<?>)tags); } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + protected <T> Task<T> submitInternal(Map<?,?> propertiesQ, final Object task) { + if (task instanceof TaskAdaptable<?> && !(task instanceof Task<?>)) + return submitInternal(propertiesQ, ((TaskAdaptable<?>)task).asTask()); + + Map properties = propertiesQ; + if (properties.get("tags")==null) properties.put("tags", new ArrayList()); + Collection taskTags = (Collection)properties.get("tags"); + + // FIXME some of this is brooklyn-specific logic, should be moved to a BrooklynExecContext subclass; + // the issue is that we want to ensure that cross-entity calls switch execution contexts; + // previously it was all very messy how that was handled (and it didn't really handle it in many cases) + if (task instanceof Task<?>) taskTags.addAll( ((Task<?>)task).getTags() ); + Entity target = 
BrooklynTaskTags.getWrappedEntityOfType(taskTags, BrooklynTaskTags.TARGET_ENTITY); + + if (target!=null && !tags.contains(BrooklynTaskTags.tagForContextEntity(target))) { + // task is switching execution context boundaries + /* + * longer notes: + * you fall in to this block if the caller requests a target entity different to the current context + * (e.g. where entity X is invoking an effector on Y, it will start in X's context, + * but the effector should run in Y's context). + * + * if X is invoking an effector on himself in his own context, or a sensor or other task, it will not come in to this block. + */ + final ExecutionContext tc = ((EntityInternal)target).getExecutionContext(); + if (log.isDebugEnabled()) + log.debug("Switching task context on execution of "+task+": from "+this+" to "+target+" (in "+Tasks.current()+")"); + + if (task instanceof Task<?>) { + final Task<T> t = (Task<T>)task; + if (!Tasks.isQueuedOrSubmitted(t) && (!(Tasks.current() instanceof HasTaskChildren) || + !Iterables.contains( ((HasTaskChildren)Tasks.current()).getChildren(), t ))) { + // this task is switching execution context boundaries _and_ it is not a child and not yet queued, + // so wrap it in a task running in this context to keep a reference to the child + // (this matters when we are navigating in the GUI; without it we lose the reference to the child + // when browsing in the context of the parent) + return submit(Tasks.<T>builder().name("Cross-context execution: "+t.getDescription()).dynamic(true).body(new Callable<T>() { + public T call() { + return DynamicTasks.get(t); + } + }).build()); + } else { + // if we are already tracked by parent, just submit it + return tc.submit(t); + } + } else { + // as above, but here we are definitely not a child (what we are submitting isn't even a task) + // (will only come here if properties defines tags including a target entity, which probably never happens) + submit(Tasks.<T>builder().name("Cross-context 
execution").dynamic(true).body(new Callable<T>() { + public T call() { + if (task instanceof Callable) { + return DynamicTasks.queue( Tasks.<T>builder().dynamic(false).body((Callable<T>)task).build() ).getUnchecked(); + } else if (task instanceof Runnable) { + return DynamicTasks.queue( Tasks.<T>builder().dynamic(false).body((Runnable)task).build() ).getUnchecked(); + } else { + throw new IllegalArgumentException("Unhandled task type: "+task+"; type="+(task!=null ? task.getClass() : "null")); + } + } + }).build()); + } + } + + EntitlementContext entitlementContext = BrooklynTaskTags.getEntitlement(taskTags); + if (entitlementContext==null) + entitlementContext = Entitlements.getEntitlementContext(); + if (entitlementContext!=null) { + taskTags.add(BrooklynTaskTags.tagForEntitlement(entitlementContext)); + } + + taskTags.addAll(tags); + + if (Tasks.current()!=null && BrooklynTaskTags.isTransient(Tasks.current()) + && !taskTags.contains(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) && !taskTags.contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) { + // tag as transient if submitter is transient, unless explicitly tagged as non-transient + taskTags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG); + } + + final Object startCallback = properties.get("newTaskStartCallback"); + properties.put("newTaskStartCallback", new Function<Object,Void>() { + public Void apply(Object it) { + registerPerThreadExecutionContext(); + if (startCallback!=null) ExecutionUtils.invoke(startCallback, it); + return null; + }}); + + final Object endCallback = properties.get("newTaskEndCallback"); + properties.put("newTaskEndCallback", new Function<Object,Void>() { + public Void apply(Object it) { + try { + if (endCallback!=null) ExecutionUtils.invoke(endCallback, it); + } finally { + clearPerThreadExecutionContext(); + } + return null; + }}); + + if (task instanceof Task) { + return executionManager.submit(properties, (Task)task); + } else if (task instanceof Callable) { + return 
executionManager.submit(properties, (Callable)task); + } else if (task instanceof Runnable) { + return (Task<T>) executionManager.submit(properties, (Runnable)task); + } else { + throw new IllegalArgumentException("Unhandled task type: task="+task+"; type="+(task!=null ? task.getClass() : "null")); + } + } + + private void registerPerThreadExecutionContext() { perThreadExecutionContext.set(this); } + + private void clearPerThreadExecutionContext() { perThreadExecutionContext.remove(); } + + @Override + public boolean isShutdown() { + return getExecutionManager().isShutdown(); + } + + @Override + public String toString() { + return super.toString()+"("+tags+")"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionManager.java new file mode 100644 index 0000000..f93abe1 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionManager.java @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.brooklyn.api.management.ExecutionManager; +import org.apache.brooklyn.api.management.HasTaskChildren; +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.management.TaskAdaptable; +import org.apache.brooklyn.core.internal.BrooklynFeatureEnablement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.util.collections.MutableList; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.text.Identifiers; + +import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.CaseFormat; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import 
com.google.common.util.concurrent.ExecutionList; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Manages the execution of atomic tasks and scheduled (recurring) tasks, + * including setting tags and invoking callbacks. + */ +public class BasicExecutionManager implements ExecutionManager { + private static final Logger log = LoggerFactory.getLogger(BasicExecutionManager.class); + + private static final boolean RENAME_THREADS = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_RENAME_THREADS); + + private static class PerThreadCurrentTaskHolder { + public static final ThreadLocal<Task<?>> perThreadCurrentTask = new ThreadLocal<Task<?>>(); + } + + public static ThreadLocal<Task<?>> getPerThreadCurrentTask() { + return PerThreadCurrentTaskHolder.perThreadCurrentTask; + } + + private final ThreadFactory threadFactory; + + private final ThreadFactory daemonThreadFactory; + + private final ExecutorService runner; + + private final ScheduledExecutorService delayedRunner; + + // TODO Could have a set of all knownTasks; but instead we're having a separate set per tag, + // so the same task could be listed multiple times if it has multiple tags... + + //access to this field AND to members in this field is synchronized, + //to allow us to preserve order while guaranteeing thread-safe + //(but more testing is needed before we are completely sure it is thread-safe!) 
+ //synch blocks are as finely grained as possible for efficiency; + //NB CopyOnWriteArraySet is a perf bottleneck, and the simple map makes it easier to remove when a tag is empty + private Map<Object,Set<Task<?>>> tasksByTag = new HashMap<Object,Set<Task<?>>>(); + + private ConcurrentMap<String,Task<?>> tasksById = new ConcurrentHashMap<String,Task<?>>(); + + private ConcurrentMap<Object, TaskScheduler> schedulerByTag = new ConcurrentHashMap<Object, TaskScheduler>(); + + /** count of all tasks submitted, including finished */ + private final AtomicLong totalTaskCount = new AtomicLong(); + + /** tasks submitted but not yet done (or in cases of interruption/cancelled not yet GC'd) */ + private Map<String,String> incompleteTaskIds = new ConcurrentHashMap<String,String>(); + + /** tasks started but not yet finished */ + private final AtomicInteger activeTaskCount = new AtomicInteger(); + + private final List<ExecutionListener> listeners = new CopyOnWriteArrayList<ExecutionListener>(); + + private final static ThreadLocal<String> threadOriginalName = new ThreadLocal<String>() { + protected String initialValue() { + // should not happen, as only access is in _afterEnd with a check that _beforeStart was invoked + log.warn("No original name recorded for thread "+Thread.currentThread().getName()+"; task "+Tasks.current()); + return "brooklyn-thread-pool-"+Identifiers.makeRandomId(8); + } + }; + + public BasicExecutionManager(String contextid) { + threadFactory = newThreadFactory(contextid); + daemonThreadFactory = new ThreadFactoryBuilder() + .setThreadFactory(threadFactory) + .setDaemon(true) + .build(); + + // use Executors.newCachedThreadPool(daemonThreadFactory), but timeout of 1s rather than 60s for better shutdown! 
+ runner = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), + daemonThreadFactory); + + delayedRunner = new ScheduledThreadPoolExecutor(1, daemonThreadFactory); + } + + private final static class UncaughtExceptionHandlerImplementation implements Thread.UncaughtExceptionHandler { + @Override + public void uncaughtException(Thread t, Throwable e) { + log.error("Uncaught exception in thread "+t.getName(), e); + } + } + + /** + * For use by overriders to use custom thread factory. + * But be extremely careful: called by constructor, so before sub-class' constructor will + * have been invoked! + */ + protected ThreadFactory newThreadFactory(String contextid) { + return new ThreadFactoryBuilder() + .setNameFormat("brooklyn-execmanager-"+contextid+"-%d") + .setUncaughtExceptionHandler(new UncaughtExceptionHandlerImplementation()) + .build(); + } + + public void shutdownNow() { + runner.shutdownNow(); + delayedRunner.shutdownNow(); + } + + public void addListener(ExecutionListener listener) { + listeners.add(listener); + } + + public void removeListener(ExecutionListener listener) { + listeners.remove(listener); + } + + /** + * Deletes the given tag, including all tasks using this tag. + * + * Useful, for example, if an entity is being expunged so that we don't keep holding + * a reference to it as a tag. 
+ */ + public void deleteTag(Object tag) { + Set<Task<?>> tasks; + synchronized (tasksByTag) { + tasks = tasksByTag.remove(tag); + } + if (tasks != null) { + for (Task<?> task : tasks) { + deleteTask(task); + } + } + } + + public void deleteTask(Task<?> task) { + boolean removed = deleteTaskNonRecursive(task); + if (!removed) return; + + if (task instanceof HasTaskChildren) { + List<Task<?>> children = ImmutableList.copyOf(((HasTaskChildren)task).getChildren()); + for (Task<?> child : children) { + deleteTask(child); + } + } + } + + protected boolean deleteTaskNonRecursive(Task<?> task) { + Set<?> tags = checkNotNull(task, "task").getTags(); + for (Object tag : tags) { + synchronized (tasksByTag) { + Set<Task<?>> tasks = tasksWithTagLiveOrNull(tag); + if (tasks != null) { + tasks.remove(task); + if (tasks.isEmpty()) { + tasksByTag.remove(tag); + } + } + } + } + Task<?> removed = tasksById.remove(task.getId()); + incompleteTaskIds.remove(task.getId()); + if (removed!=null && removed.isSubmitted() && !removed.isDone()) { + log.warn("Deleting submitted task before completion: "+removed+"; this task will continue to run in the background outwith "+this+", but perhaps it should have been cancelled?"); + } + return removed != null; + } + + public boolean isShutdown() { + return runner.isShutdown(); + } + + /** count of all tasks submitted */ + public long getTotalTasksSubmitted() { + return totalTaskCount.get(); + } + + /** count of tasks submitted but not ended */ + public long getNumIncompleteTasks() { + return incompleteTaskIds.size(); + } + + /** count of tasks started but not ended */ + public long getNumActiveTasks() { + return activeTaskCount.get(); + } + + /** count of tasks kept in memory, often including ended tasks */ + public long getNumInMemoryTasks() { + return tasksById.size(); + } + + private Set<Task<?>> tasksWithTagCreating(Object tag) { + Preconditions.checkNotNull(tag); + synchronized (tasksByTag) { + Set<Task<?>> result = tasksWithTagLiveOrNull(tag); 
+ if (result==null) { + result = Collections.synchronizedSet(new LinkedHashSet<Task<?>>()); + tasksByTag.put(tag, result); + } + return result; + } + } + + /** exposes live view, for internal use only */ + @Beta + public Set<Task<?>> tasksWithTagLiveOrNull(Object tag) { + synchronized (tasksByTag) { + return tasksByTag.get(tag); + } + } + + @Override + public Task<?> getTask(String id) { + return tasksById.get(id); + } + + /** not on interface because potentially expensive */ + public List<Task<?>> getAllTasks() { + // not sure if synching makes any difference; have not observed CME's yet + // (and so far this is only called when a CME was caught on a previous operation) + synchronized (tasksById) { + return MutableList.copyOf(tasksById.values()); + } + } + + @Override + public Set<Task<?>> getTasksWithTag(Object tag) { + Set<Task<?>> result = tasksWithTagLiveOrNull(tag); + if (result==null) return Collections.emptySet(); + synchronized (result) { + return (Set<Task<?>>)Collections.unmodifiableSet(new LinkedHashSet<Task<?>>(result)); + } + } + + @Override + public Set<Task<?>> getTasksWithAnyTag(Iterable<?> tags) { + Set<Task<?>> result = new LinkedHashSet<Task<?>>(); + Iterator<?> ti = tags.iterator(); + while (ti.hasNext()) { + Set<Task<?>> tasksForTag = tasksWithTagLiveOrNull(ti.next()); + if (tasksForTag!=null) { + synchronized (tasksForTag) { + result.addAll(tasksForTag); + } + } + } + return Collections.unmodifiableSet(result); + } + + /** only works with at least one tag; returns empty if no tags */ + @Override + public Set<Task<?>> getTasksWithAllTags(Iterable<?> tags) { + //NB: for this method retrieval for multiple tags could be made (much) more efficient (if/when it is used with multiple tags!) 
+ //by first looking for the least-used tag, getting those tasks, and then for each of those tasks + //checking whether it contains the other tags (looking for second-least used, then third-least used, etc) + Set<Task<?>> result = new LinkedHashSet<Task<?>>(); + boolean first = true; + Iterator<?> ti = tags.iterator(); + while (ti.hasNext()) { + Object tag = ti.next(); + if (first) { + first = false; + result.addAll(getTasksWithTag(tag)); + } else { + result.retainAll(getTasksWithTag(tag)); + } + } + return Collections.unmodifiableSet(result); + } + + /** live view of all tasks, for internal use only */ + @Beta + public Collection<Task<?>> allTasksLive() { return tasksById.values(); } + + public Set<Object> getTaskTags() { + synchronized (tasksByTag) { + return Collections.unmodifiableSet(Sets.newLinkedHashSet(tasksByTag.keySet())); + } + } + + public Task<?> submit(Runnable r) { return submit(new LinkedHashMap<Object,Object>(1), r); } + public Task<?> submit(Map<?,?> flags, Runnable r) { return submit(flags, new BasicTask<Void>(flags, r)); } + + public <T> Task<T> submit(Callable<T> c) { return submit(new LinkedHashMap<Object,Object>(1), c); } + public <T> Task<T> submit(Map<?,?> flags, Callable<T> c) { return submit(flags, new BasicTask<T>(flags, c)); } + + public <T> Task<T> submit(TaskAdaptable<T> t) { return submit(new LinkedHashMap<Object,Object>(1), t); } + public <T> Task<T> submit(Map<?,?> flags, TaskAdaptable<T> task) { + if (!(task instanceof Task)) + task = task.asTask(); + synchronized (task) { + if (((TaskInternal<?>)task).getInternalFuture()!=null) return (Task<T>)task; + return submitNewTask(flags, (Task<T>) task); + } + } + + public <T> Task<T> scheduleWith(Task<T> task) { return scheduleWith(Collections.emptyMap(), task); } + public <T> Task<T> scheduleWith(Map<?,?> flags, Task<T> task) { + synchronized (task) { + if (((TaskInternal<?>)task).getInternalFuture()!=null) return task; + return submitNewTask(flags, task); + } + } + + protected Task<?> 
submitNewScheduledTask(final Map<?,?> flags, final ScheduledTask task) { + tasksById.put(task.getId(), task); + totalTaskCount.incrementAndGet(); + + beforeSubmitScheduledTaskAllIterations(flags, task); + + return submitSubsequentScheduledTask(flags, task); + } + + @SuppressWarnings("unchecked") + protected Task<?> submitSubsequentScheduledTask(final Map<?,?> flags, final ScheduledTask task) { + if (!task.isDone()) { + task.internalFuture = delayedRunner.schedule(new ScheduledTaskCallable(task, flags), + task.delay.toNanoseconds(), TimeUnit.NANOSECONDS); + } else { + afterEndScheduledTaskAllIterations(flags, task); + } + return task; + } + + protected class ScheduledTaskCallable implements Callable<Object> { + public ScheduledTask task; + public Map<?,?> flags; + + public ScheduledTaskCallable(ScheduledTask task, Map<?, ?> flags) { + this.task = task; + this.flags = flags; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public Object call() { + if (task.startTimeUtc==-1) task.startTimeUtc = System.currentTimeMillis(); + TaskInternal<?> taskScheduled = null; + try { + beforeStartScheduledTaskSubmissionIteration(flags, task); + taskScheduled = (TaskInternal<?>) task.newTask(); + taskScheduled.setSubmittedByTask(task); + final Callable<?> oldJob = taskScheduled.getJob(); + final TaskInternal<?> taskScheduledF = taskScheduled; + taskScheduled.setJob(new Callable() { public Object call() { + boolean resubmitted = false; + task.recentRun = taskScheduledF; + try { + synchronized (task) { + task.notifyAll(); + } + Object result; + try { + result = oldJob.call(); + } catch (Exception e) { + if (!Tasks.isInterrupted()) { + log.warn("Error executing "+oldJob+" (scheduled job of "+task+" - "+task.getDescription()+"); cancelling scheduled execution", e); + } else { + log.debug("Interrupted executing "+oldJob+" (scheduled job of "+task+" - "+task.getDescription()+"); cancelling scheduled execution: "+e); + } + throw Exceptions.propagate(e); + } + task.runCount++; + if 
(task.period!=null && !task.isCancelled()) { + task.delay = task.period; + submitSubsequentScheduledTask(flags, task); + resubmitted = true; + } + return result; + } finally { + // do in finally block in case we were interrupted + if (!resubmitted) + afterEndScheduledTaskAllIterations(flags, task); + } + }}); + task.nextRun = taskScheduled; + BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext(); + if (ec!=null) return ec.submit(taskScheduled); + else return submit(taskScheduled); + } finally { + afterEndScheduledTaskSubmissionIteration(flags, task, taskScheduled); + } + } + + @Override + public String toString() { + return "ScheduledTaskCallable["+task+","+flags+"]"; + } + } + + private final class SubmissionCallable<T> implements Callable<T> { + private final Map<?, ?> flags; + private final Task<T> task; + + private SubmissionCallable(Map<?, ?> flags, Task<T> task) { + this.flags = flags; + this.task = task; + } + + public T call() { + try { + T result = null; + Throwable error = null; + String oldThreadName = Thread.currentThread().getName(); + try { + if (RENAME_THREADS) { + String newThreadName = oldThreadName+"-"+task.getDisplayName()+ + "["+task.getId().substring(0, 8)+"]"; + Thread.currentThread().setName(newThreadName); + } + beforeStartAtomicTask(flags, task); + if (!task.isCancelled()) { + result = ((TaskInternal<T>)task).getJob().call(); + } else throw new CancellationException(); + } catch(Throwable e) { + error = e; + } finally { + if (RENAME_THREADS) { + Thread.currentThread().setName(oldThreadName); + } + afterEndAtomicTask(flags, task); + } + if (error!=null) { + /* we throw, after logging debug. + * the throw means the error is available for task submitters to monitor. + * however it is possible no one is monitoring it, in which case we will have debug logging only for errors. + * (the alternative, of warn-level logging in lots of places where we don't want it, seems worse!) 
+ */ + if (log.isDebugEnabled()) { + // debug only here, because most submitters will handle failures + log.debug("Exception running task "+task+" (rethrowing): "+error.getMessage(), error); + if (log.isTraceEnabled()) + log.trace("Trace for exception running task "+task+" (rethrowing): "+error.getMessage(), error); + } + throw Exceptions.propagate(error); + } + return result; + } finally { + ((TaskInternal<?>)task).runListeners(); + } + } + + @Override + public String toString() { + return "BEM.call("+task+","+flags+")"; + } + } + + private final static class ListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> { + private final Task<T> task; + + private ListenableForwardingFutureForTask(Future<T> delegate, ExecutionList list, Task<T> task) { + super(delegate, list); + this.task = task; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean result = false; + if (!task.isCancelled()) result |= task.cancel(mayInterruptIfRunning); + result |= super.cancel(mayInterruptIfRunning); + ((TaskInternal<?>)task).runListeners(); + return result; + } + } + + private final class SubmissionListenerToCallOtherListeners<T> implements Runnable { + private final Task<T> task; + + private SubmissionListenerToCallOtherListeners(Task<T> task) { + this.task = task; + } + + @Override + public void run() { + try { + ((TaskInternal<?>)task).runListeners(); + } catch (Exception e) { + log.warn("Error running task listeners for task "+task+" done", e); + } + + for (ExecutionListener listener : listeners) { + try { + listener.onTaskDone(task); + } catch (Exception e) { + log.warn("Error running execution listener "+listener+" of task "+task+" done", e); + } + } + } + } + + @SuppressWarnings("unchecked") + protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) { + if (task instanceof ScheduledTask) + return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask)task); + + tasksById.put(task.getId(), task); + 
totalTaskCount.incrementAndGet(); + + beforeSubmitAtomicTask(flags, task); + + if (((TaskInternal<T>)task).getJob() == null) + throw new NullPointerException("Task "+task+" submitted with with null job: job must be supplied."); + + Callable<T> job = new SubmissionCallable<T>(flags, task); + + // If there's a scheduler then use that; otherwise execute it directly + Set<TaskScheduler> schedulers = null; + for (Object tago: task.getTags()) { + TaskScheduler scheduler = getTaskSchedulerForTag(tago); + if (scheduler!=null) { + if (schedulers==null) schedulers = new LinkedHashSet<TaskScheduler>(2); + schedulers.add(scheduler); + } + } + Future<T> future; + if (schedulers!=null && !schedulers.isEmpty()) { + if (schedulers.size()>1) log.warn("multiple schedulers detected, using only the first, for "+task+": "+schedulers); + future = schedulers.iterator().next().submit(job); + } else { + future = runner.submit(job); + } + // on completion, listeners get triggered above; here, below we ensure they get triggered on cancel + // (and we make sure the same ExecutionList is used in the future as in the task) + ListenableFuture<T> listenableFuture = new ListenableForwardingFutureForTask<T>(future, ((TaskInternal<T>)task).getListeners(), task); + // doesn't matter whether the listener is added to the listenableFuture or the task, + // except that for the task we can more easily wrap it so that it only logs debug if the executor is shutdown + // (avoid a bunch of ugly warnings in tests which start and stop things a lot!) 
+ // [probably even nicer to run this in the same thread, it doesn't do much; but that is messier to implement] + ((TaskInternal<T>)task).addListener(new SubmissionListenerToCallOtherListeners<T>(task), runner); + + ((TaskInternal<T>)task).initInternalFuture(listenableFuture); + + return task; + } + + protected void beforeSubmitScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) { + internalBeforeSubmit(flags, task); + } + protected void beforeSubmitAtomicTask(Map<?,?> flags, Task<?> task) { + internalBeforeSubmit(flags, task); + } + /** invoked when a task is submitted */ + protected void internalBeforeSubmit(Map<?,?> flags, Task<?> task) { + incompleteTaskIds.put(task.getId(), task.getId()); + + Task<?> currentTask = Tasks.current(); + if (currentTask!=null) ((TaskInternal<?>)task).setSubmittedByTask(currentTask); + ((TaskInternal<?>)task).setSubmitTimeUtc(System.currentTimeMillis()); + + if (flags.get("tag")!=null) ((TaskInternal<?>)task).getMutableTags().add(flags.remove("tag")); + if (flags.get("tags")!=null) ((TaskInternal<?>)task).getMutableTags().addAll((Collection<?>)flags.remove("tags")); + + for (Object tag: ((TaskInternal<?>)task).getTags()) { + tasksWithTagCreating(tag).add(task); + } + } + + protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> task) { + internalBeforeStart(flags, task); + } + protected void beforeStartAtomicTask(Map<?,?> flags, Task<?> task) { + internalBeforeStart(flags, task); + } + + /** invoked in a task's thread when a task is starting to run (may be some time after submitted), + * but before doing any of the task's work, so that we can update bookkeeping and notify callbacks */ + protected void internalBeforeStart(Map<?,?> flags, Task<?> task) { + activeTaskCount.incrementAndGet(); + + //set thread _before_ start time, so we won't get a null thread when there is a start-time + if (log.isTraceEnabled()) log.trace(""+this+" beforeStart, task: "+task); + if (!task.isCancelled()) { + Thread thread 
= Thread.currentThread(); + ((TaskInternal<?>)task).setThread(thread); + if (RENAME_THREADS) { + threadOriginalName.set(thread.getName()); + String newThreadName = "brooklyn-" + CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, task.getDisplayName().replace(" ", "")) + "-" + task.getId().substring(0, 8); + thread.setName(newThreadName); + } + PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task); + ((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis()); + } + ExecutionUtils.invoke(flags.get("newTaskStartCallback"), task); + } + + /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */ + protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) { + internalAfterEnd(flags, task, false, true); + } + /** called once for each call to {@link #beforeStartScheduledTaskSubmissionIteration(Map, Task)}, + * with a per-iteration task generated by the surrounding scheduled task */ + protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> scheduledTask, Task<?> taskIteration) { + internalAfterEnd(flags, scheduledTask, true, false); + } + /** called once for each task on which {@link #beforeStartAtomicTask(Map, Task)} is invoked, + * and normally (if not interrupted prior to start) + * called once for each task on which {@link #beforeSubmitAtomicTask(Map, Task)} */ + protected void afterEndAtomicTask(Map<?,?> flags, Task<?> task) { + internalAfterEnd(flags, task, true, true); + } + /** normally (if not interrupted) called once for each call to {@link #internalBeforeSubmit(Map, Task)}, + * and, for atomic tasks and scheduled-task submission iterations where + * always called once if {@link #internalBeforeStart(Map, Task)} is invoked and in the same thread as that method */ + protected void internalAfterEnd(Map<?,?> flags, Task<?> task, boolean startedInThisThread, boolean isEndingAllIterations) { + if (log.isTraceEnabled()) log.trace(this+" afterEnd, 
task: "+task); + if (startedInThisThread) { + activeTaskCount.decrementAndGet(); + } + if (isEndingAllIterations) { + incompleteTaskIds.remove(task.getId()); + ExecutionUtils.invoke(flags.get("newTaskEndCallback"), task); + ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis()); + } + + if (startedInThisThread) { + PerThreadCurrentTaskHolder.perThreadCurrentTask.remove(); + //clear thread _after_ endTime set, so we won't get a null thread when there is no end-time + if (RENAME_THREADS && startedInThisThread) { + Thread thread = task.getThread(); + if (thread==null) { + log.warn("BasicTask.afterEnd invoked without corresponding beforeStart"); + } else { + thread.setName(threadOriginalName.get()); + threadOriginalName.remove(); + } + } + ((TaskInternal<?>)task).setThread(null); + } + synchronized (task) { task.notifyAll(); } + } + + public TaskScheduler getTaskSchedulerForTag(Object tag) { + return schedulerByTag.get(tag); + } + + public void setTaskSchedulerForTag(Object tag, Class<? extends TaskScheduler> scheduler) { + synchronized (schedulerByTag) { + TaskScheduler old = getTaskSchedulerForTag(tag); + if (old!=null) { + if (scheduler.isAssignableFrom(old.getClass())) { + /* already have such an instance */ + return; + } + //might support multiple in future... + throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag "+tag+", has "+old+", setting new "+scheduler+")"); + } + try { + TaskScheduler schedulerI = scheduler.newInstance(); + // allow scheduler to have a nice name, for logging etc + if (schedulerI instanceof CanSetName) ((CanSetName)schedulerI).setName(""+tag); + setTaskSchedulerForTag(tag, schedulerI); + } catch (InstantiationException e) { + throw Exceptions.propagate(e); + } catch (IllegalAccessException e) { + throw Exceptions.propagate(e); + } + } + } + + /** + * Defines a {@link TaskScheduler} to run on all subsequently submitted jobs with the given tag. 
+ * + * Maximum of one allowed currently. Resubmissions of the same scheduler (or scheduler class) + * allowed. If changing, you must call {@link #clearTaskSchedulerForTag(Object)} between the two. + * + * @see #setTaskSchedulerForTag(Object, Class) + */ + public void setTaskSchedulerForTag(Object tag, TaskScheduler scheduler) { + synchronized (schedulerByTag) { + scheduler.injectExecutor(runner); + + Object old = schedulerByTag.put(tag, scheduler); + if (old!=null && old!=scheduler) { + //might support multiple in future... + throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag "+tag+")"); + } + } + } + + /** + * Forgets that any scheduler was associated with a tag. + * + * @see #setTaskSchedulerForTag(Object, TaskScheduler) + * @see #setTaskSchedulerForTag(Object, Class) + */ + public boolean clearTaskSchedulerForTag(Object tag) { + synchronized (schedulerByTag) { + Object old = schedulerByTag.remove(tag); + return (old!=null); + } + } + + @VisibleForTesting + public ConcurrentMap<Object, TaskScheduler> getSchedulerByTag() { + return schedulerByTag; + } + +}
