http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/osgi/Osgis.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/osgi/Osgis.java b/core/src/main/java/brooklyn/util/osgi/Osgis.java deleted file mode 100644 index 3bf972e..0000000 --- a/core/src/main/java/brooklyn/util/osgi/Osgis.java +++ /dev/null @@ -1,719 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.osgi; - -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.jar.Attributes; -import java.util.jar.JarInputStream; -import java.util.jar.JarOutputStream; -import java.util.jar.Manifest; - -import javax.annotation.Nullable; - -import org.apache.felix.framework.FrameworkFactory; -import org.apache.felix.framework.util.StringMap; -import org.apache.felix.framework.util.manifestparser.ManifestParser; -import org.osgi.framework.Bundle; -import org.osgi.framework.BundleContext; -import org.osgi.framework.BundleException; -import org.osgi.framework.Constants; -import org.osgi.framework.Version; -import org.osgi.framework.launch.Framework; -import org.osgi.framework.namespace.PackageNamespace; -import org.osgi.framework.wiring.BundleCapability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.brooklyn.api.catalog.CatalogItem.CatalogBundle; - -import brooklyn.util.ResourceUtils; -import brooklyn.util.collections.MutableList; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.collections.MutableSet; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.exceptions.ReferenceWithError; -import brooklyn.util.guava.Maybe; -import brooklyn.util.net.Urls; -import brooklyn.util.os.Os; -import brooklyn.util.stream.Streams; -import brooklyn.util.text.Strings; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.annotations.Beta; -import com.google.common.base.Joiner; -import com.google.common.base.Objects; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.base.Stopwatch; - -/** - * utilities for working with osgi. - * osgi support is in early days (June 2014) so this class is beta, subject to change, - * particularly in how framework is started and bundles installed. - * - * @since 0.7.0 */ -@Beta -public class Osgis { - private static final Logger LOG = LoggerFactory.getLogger(Osgis.class); - - private static final String EXTENSION_PROTOCOL = "system"; - private static final String MANIFEST_PATH = "META-INF/MANIFEST.MF"; - private static final Set<String> SYSTEM_BUNDLES = MutableSet.of(); - - public static class VersionedName { - private final String symbolicName; - private final Version version; - public VersionedName(Bundle b) { - this.symbolicName = b.getSymbolicName(); - this.version = b.getVersion(); - } - public VersionedName(String symbolicName, Version version) { - this.symbolicName = symbolicName; - this.version = version; - } - @Override public String toString() { - return symbolicName + ":" + Strings.toString(version); - } - public boolean equals(String sn, String v) { - return symbolicName.equals(sn) && (version == null && v == null || version != null && version.toString().equals(v)); - } - public boolean equals(String sn, Version v) { - return symbolicName.equals(sn) && (version == null && v == null || version != null && version.equals(v)); - } - public String getSymbolicName() { - return symbolicName; - } - public Version getVersion() { - return version; - } - @Override - public int hashCode() { - return Objects.hashCode(symbolicName, version); - } - @Override - public boolean equals(Object other) { - if (!(other instanceof VersionedName)) return false; - VersionedName o = (VersionedName) other; - return Objects.equal(symbolicName, o.symbolicName) && Objects.equal(version, o.version); - } - } - - public static class BundleFinder { - protected final Framework framework; - protected String symbolicName; - protected String version; - protected String url; - protected boolean urlMandatory = false; - protected final List<Predicate<? super Bundle>> predicates = MutableList.of(); - - protected BundleFinder(Framework framework) { - this.framework = framework; - } - - public BundleFinder symbolicName(String symbolicName) { - this.symbolicName = symbolicName; - return this; - } - - public BundleFinder version(String version) { - this.version = version; - return this; - } - - public BundleFinder id(String symbolicNameOptionallyWithVersion) { - if (Strings.isBlank(symbolicNameOptionallyWithVersion)) - return this; - - Maybe<VersionedName> nv = parseOsgiIdentifier(symbolicNameOptionallyWithVersion); - if (nv.isAbsent()) - throw new IllegalArgumentException("Cannot parse symbolic-name:version string '"+symbolicNameOptionallyWithVersion+"'"); - - return id(nv.get()); - } - - private BundleFinder id(VersionedName nv) { - symbolicName(nv.getSymbolicName()); - if (nv.getVersion() != null) { - version(nv.getVersion().toString()); - } - return this; - } - - public BundleFinder bundle(CatalogBundle bundle) { - if (bundle.isNamed()) { - symbolicName(bundle.getSymbolicName()); - version(bundle.getVersion()); - } - if (bundle.getUrl() != null) { - requiringFromUrl(bundle.getUrl()); - } - return this; - } - - /** Looks for a bundle matching the given URL; - * unlike {@link #requiringFromUrl(String)} however, if the URL does not match any bundles - * it will return other matching bundles <i>if</if> a {@link #symbolicName(String)} is specified. - */ - public BundleFinder preferringFromUrl(String url) { - this.url = url; - urlMandatory = false; - return this; - } - - /** Requires the bundle to have the given URL set as its location. */ - public BundleFinder requiringFromUrl(String url) { - this.url = url; - urlMandatory = true; - return this; - } - - /** Finds the best matching bundle. */ - public Maybe<Bundle> find() { - return findOne(false); - } - - /** Finds the matching bundle, requiring it to be unique. */ - public Maybe<Bundle> findUnique() { - return findOne(true); - } - - protected Maybe<Bundle> findOne(boolean requireExactlyOne) { - if (symbolicName==null && url==null) - throw new IllegalStateException(this+" must be given either a symbolic name or a URL"); - - List<Bundle> result = findAll(); - if (result.isEmpty()) - return Maybe.absent("No bundle matching "+getConstraintsDescription()); - if (requireExactlyOne && result.size()>1) - return Maybe.absent("Multiple bundles ("+result.size()+") matching "+getConstraintsDescription()); - - return Maybe.of(result.get(0)); - } - - /** Finds all matching bundles, in decreasing version order. */ - public List<Bundle> findAll() { - boolean urlMatched = false; - List<Bundle> result = MutableList.of(); - for (Bundle b: framework.getBundleContext().getBundles()) { - if (symbolicName!=null && !symbolicName.equals(b.getSymbolicName())) continue; - if (version!=null && !Version.parseVersion(version).equals(b.getVersion())) continue; - for (Predicate<? super Bundle> predicate: predicates) { - if (!predicate.apply(b)) continue; - } - - // check url last, because if it isn't mandatory we should only clear if we find a url - // for which the other items also match - if (url!=null) { - boolean matches = url.equals(b.getLocation()); - if (urlMandatory) { - if (!matches) continue; - else urlMatched = true; - } else { - if (matches) { - if (!urlMatched) { - result.clear(); - urlMatched = true; - } - } else { - if (urlMatched) { - // can't use this bundle as we have previously found a preferred bundle, with a matching url - continue; - } - } - } - } - - result.add(b); - } - - if (symbolicName==null && url!=null && !urlMatched) { - // if we only "preferred" the url, and we did not match it, and we did not have a symbolic name, - // then clear the results list! - result.clear(); - } - - Collections.sort(result, new Comparator<Bundle>() { - @Override - public int compare(Bundle o1, Bundle o2) { - return o2.getVersion().compareTo(o1.getVersion()); - } - }); - - return result; - } - - public String getConstraintsDescription() { - List<String> parts = MutableList.of(); - if (symbolicName!=null) parts.add("symbolicName="+symbolicName); - if (version!=null) parts.add("version="+version); - if (url!=null) - parts.add("url["+(urlMandatory ? "required" : "preferred")+"]="+url); - if (!predicates.isEmpty()) - parts.add("predicates="+predicates); - return Joiner.on(";").join(parts); - } - - public String toString() { - return getClass().getCanonicalName()+"["+getConstraintsDescription()+"]"; - } - - public BundleFinder version(final Predicate<Version> versionPredicate) { - return satisfying(new Predicate<Bundle>() { - @Override - public boolean apply(Bundle input) { - return versionPredicate.apply(input.getVersion()); - } - }); - } - - public BundleFinder satisfying(Predicate<? super Bundle> predicate) { - predicates.add(predicate); - return this; - } - } - - public static BundleFinder bundleFinder(Framework framework) { - return new BundleFinder(framework); - } - - /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated - public static List<Bundle> getBundlesByName(Framework framework, String symbolicName, Predicate<Version> versionMatcher) { - return bundleFinder(framework).symbolicName(symbolicName).version(versionMatcher).findAll(); - } - - /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated - public static List<Bundle> getBundlesByName(Framework framework, String symbolicName) { - return bundleFinder(framework).symbolicName(symbolicName).findAll(); - } - - /** - * Tries to find a bundle in the given framework with name matching either `name' or `name:version'. - * @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated - public static Maybe<Bundle> getBundle(Framework framework, String symbolicNameOptionallyWithVersion) { - return bundleFinder(framework).id(symbolicNameOptionallyWithVersion).find(); - } - - /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated - public static Maybe<Bundle> getBundle(Framework framework, String symbolicName, String version) { - return bundleFinder(framework).symbolicName(symbolicName).version(version).find(); - } - - /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated - public static Maybe<Bundle> getBundle(Framework framework, String symbolicName, Version version) { - return bundleFinder(framework).symbolicName(symbolicName).version(Predicates.equalTo(version)).findUnique(); - } - - // -------- creating - - /* - * loading framework factory and starting framework based on: - * http://felix.apache.org/documentation/subprojects/apache-felix-framework/apache-felix-framework-launching-and-embedding.html - */ - - public static FrameworkFactory newFrameworkFactory() { - URL url = Osgis.class.getClassLoader().getResource( - "META-INF/services/org.osgi.framework.launch.FrameworkFactory"); - if (url != null) { - try { - BufferedReader br = new BufferedReader(new InputStreamReader(url.openStream())); - try { - for (String s = br.readLine(); s != null; s = br.readLine()) { - s = s.trim(); - // load the first non-empty, non-commented line - if ((s.length() > 0) && (s.charAt(0) != '#')) { - return (FrameworkFactory) Class.forName(s).newInstance(); - } - } - } finally { - if (br != null) br.close(); - } - } catch (Exception e) { - // class creation exceptions are not interesting to caller... - throw Exceptions.propagate(e); - } - } - throw new IllegalStateException("Could not find framework factory."); - } - - public static Framework newFrameworkStarted(String felixCacheDir, boolean clean, Map<?,?> extraStartupConfig) { - Map<Object,Object> cfg = MutableMap.copyOf(extraStartupConfig); - if (clean) cfg.put(Constants.FRAMEWORK_STORAGE_CLEAN, "onFirstInit"); - if (felixCacheDir!=null) cfg.put(Constants.FRAMEWORK_STORAGE, felixCacheDir); - cfg.put(Constants.FRAMEWORK_BSNVERSION, Constants.FRAMEWORK_BSNVERSION_MULTIPLE); - FrameworkFactory factory = newFrameworkFactory(); - - Stopwatch timer = Stopwatch.createStarted(); - Framework framework = factory.newFramework(cfg); - try { - framework.init(); - installBootBundles(framework); - framework.start(); - } catch (Exception e) { - // framework bundle start exceptions are not interesting to caller... - throw Exceptions.propagate(e); - } - LOG.debug("System bundles are: "+SYSTEM_BUNDLES); - LOG.debug("OSGi framework started in " + Duration.of(timer)); - return framework; - } - - private static void installBootBundles(Framework framework) { - Stopwatch timer = Stopwatch.createStarted(); - LOG.debug("Installing OSGi boot bundles from "+Osgis.class.getClassLoader()+"..."); - Enumeration<URL> resources; - try { - resources = Osgis.class.getClassLoader().getResources(MANIFEST_PATH); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - BundleContext bundleContext = framework.getBundleContext(); - Map<String, Bundle> installedBundles = getInstalledBundlesById(bundleContext); - while(resources.hasMoreElements()) { - URL url = resources.nextElement(); - ReferenceWithError<?> installResult = installExtensionBundle(bundleContext, url, installedBundles, getVersionedId(framework)); - if (installResult.hasError() && !installResult.masksErrorIfPresent()) { - // it's reported as a critical error, so warn here - LOG.warn("Unable to install manifest from "+url+": "+installResult.getError(), installResult.getError()); - } else { - Object result = installResult.getWithoutError(); - if (result instanceof Bundle) { - String v = getVersionedId( (Bundle)result ); - SYSTEM_BUNDLES.add(v); - if (installResult.hasError()) { - LOG.debug(installResult.getError().getMessage()+(result!=null ? " ("+result+"/"+v+")" : "")); - } else { - LOG.debug("Installed "+v+" from "+url); - } - } else if (installResult.hasError()) { - LOG.debug(installResult.getError().getMessage()); - } - } - } - LOG.debug("Installed OSGi boot bundles in "+Time.makeTimeStringRounded(timer)+": "+Arrays.asList(framework.getBundleContext().getBundles())); - } - - private static Map<String, Bundle> getInstalledBundlesById(BundleContext bundleContext) { - Map<String, Bundle> installedBundles = new HashMap<String, Bundle>(); - Bundle[] bundles = bundleContext.getBundles(); - for (Bundle b : bundles) { - installedBundles.put(getVersionedId(b), b); - } - return installedBundles; - } - - /** Wraps the bundle if successful or already installed, wraps TRUE if it's the system entry, - * wraps null if the bundle is already installed from somewhere else; - * in all these cases <i>masking</i> an explanatory error if already installed or it's the system entry. - * <p> - * Returns an instance wrapping null and <i>throwing</i> an error if the bundle could not be installed. - */ - private static ReferenceWithError<?> installExtensionBundle(BundleContext bundleContext, URL manifestUrl, Map<String, Bundle> installedBundles, String frameworkVersionedId) { - //ignore http://felix.extensions:9/ system entry - if("felix.extensions".equals(manifestUrl.getHost())) - return ReferenceWithError.newInstanceMaskingError(null, new IllegalArgumentException("Skipping install of internal extension bundle from "+manifestUrl)); - - try { - Manifest manifest = readManifest(manifestUrl); - if (!isValidBundle(manifest)) - return ReferenceWithError.newInstanceMaskingError(null, new IllegalArgumentException("Resource at "+manifestUrl+" is not an OSGi bundle: no valid manifest")); - - String versionedId = getVersionedId(manifest); - URL bundleUrl = ResourceUtils.getContainerUrl(manifestUrl, MANIFEST_PATH); - - Bundle existingBundle = installedBundles.get(versionedId); - if (existingBundle != null) { - if (!bundleUrl.equals(existingBundle.getLocation()) && - //the framework bundle is always pre-installed, don't display duplicate info - !versionedId.equals(frameworkVersionedId)) { - return ReferenceWithError.newInstanceMaskingError(null, new IllegalArgumentException("Bundle "+versionedId+" (from manifest " + manifestUrl + ") is already installed, from " + existingBundle.getLocation())); - } - return ReferenceWithError.newInstanceMaskingError(existingBundle, new IllegalArgumentException("Bundle "+versionedId+" from manifest " + manifestUrl + " is already installed")); - } - - byte[] jar = buildExtensionBundle(manifest); - LOG.debug("Installing boot bundle " + bundleUrl); - //mark the bundle as extension so we can detect it later using the "system:" protocol - //(since we cannot access BundleImpl.isExtension) - Bundle newBundle = bundleContext.installBundle(EXTENSION_PROTOCOL + ":" + bundleUrl.toString(), new ByteArrayInputStream(jar)); - installedBundles.put(versionedId, newBundle); - return ReferenceWithError.newInstanceWithoutError(newBundle); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - return ReferenceWithError.newInstanceThrowingError(null, - new IllegalStateException("Problem installing extension bundle " + manifestUrl + ": "+e, e)); - } - } - - private static Manifest readManifest(URL manifestUrl) throws IOException { - Manifest manifest; - InputStream in = null; - try { - in = manifestUrl.openStream(); - manifest = new Manifest(in); - } finally { - if (in != null) { - try {in.close();} - catch (Exception e) {}; - } - } - return manifest; - } - - private static byte[] buildExtensionBundle(Manifest manifest) throws IOException { - Attributes atts = manifest.getMainAttributes(); - - //the following properties are invalid in extension bundles - atts.remove(new Attributes.Name(Constants.IMPORT_PACKAGE)); - atts.remove(new Attributes.Name(Constants.REQUIRE_BUNDLE)); - atts.remove(new Attributes.Name(Constants.BUNDLE_NATIVECODE)); - atts.remove(new Attributes.Name(Constants.DYNAMICIMPORT_PACKAGE)); - atts.remove(new Attributes.Name(Constants.BUNDLE_ACTIVATOR)); - - //mark as extension bundle - atts.putValue(Constants.FRAGMENT_HOST, "system.bundle; extension:=framework"); - - //create the jar containing the manifest - ByteArrayOutputStream jar = new ByteArrayOutputStream(); - JarOutputStream out = new JarOutputStream(jar, manifest); - out.close(); - return jar.toByteArray(); - } - - private static boolean isValidBundle(Manifest manifest) { - Attributes atts = manifest.getMainAttributes(); - return atts.containsKey(new Attributes.Name(Constants.BUNDLE_MANIFESTVERSION)); - } - - private static String getVersionedId(Bundle b) { - return b.getSymbolicName() + ":" + b.getVersion(); - } - - private static String getVersionedId(Manifest manifest) { - Attributes atts = manifest.getMainAttributes(); - return atts.getValue(Constants.BUNDLE_SYMBOLICNAME) + ":" + - atts.getValue(Constants.BUNDLE_VERSION); - } - - /** - * Installs a bundle from the given URL, doing a check if already installed, and - * using the {@link ResourceUtils} loader for this project (brooklyn core) - */ - public static Bundle install(Framework framework, String url) throws BundleException { - boolean isLocal = isLocalUrl(url); - String localUrl = url; - if (!isLocal) { - localUrl = cacheFile(url); - } - - try { - Bundle bundle = getInstalledBundle(framework, localUrl); - if (bundle != null) { - return bundle; - } - - // use our URL resolution so we get classpath items - LOG.debug("Installing bundle into {} from url: {}", framework, url); - InputStream stream = getUrlStream(localUrl); - Bundle installedBundle = framework.getBundleContext().installBundle(url, stream); - - return installedBundle; - } finally { - if (!isLocal) { - try { - new File(new URI(localUrl)).delete(); - } catch (URISyntaxException e) { - throw Exceptions.propagate(e); - } - } - } - } - - private static String cacheFile(String url) { - InputStream in = getUrlStream(url); - File cache = Os.writeToTempFile(in, "bundle-cache", "jar"); - return cache.toURI().toString(); - } - - private static boolean isLocalUrl(String url) { - String protocol = Urls.getProtocol(url); - return "file".equals(protocol) || - "classpath".equals(protocol) || - "jar".equals(protocol); - } - - private static Bundle getInstalledBundle(Framework framework, String url) { - Bundle bundle = framework.getBundleContext().getBundle(url); - if (bundle != null) { - return bundle; - } - - // We now support same version installed multiple times (avail since OSGi 4.3+). - // However we do not support overriding *system* bundles, ie anything already on the classpath. - // If we wanted to disable multiple versions, see comments below, and reference to FRAMEWORK_BSNVERSION_MULTIPLE above. - - // Felix already assumes the stream is pointing to a JAR - JarInputStream stream; - try { - stream = new JarInputStream(getUrlStream(url)); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - Manifest manifest = stream.getManifest(); - Streams.closeQuietly(stream); - if (manifest == null) { - throw new IllegalStateException("Missing manifest file in bundle or not a jar file."); - } - String versionedId = getVersionedId(manifest); - for (Bundle installedBundle : framework.getBundleContext().getBundles()) { - if (versionedId.equals(getVersionedId(installedBundle))) { - if (SYSTEM_BUNDLES.contains(versionedId)) { - LOG.debug("Already have system bundle "+versionedId+" from "+installedBundle+"/"+installedBundle.getLocation()+" when requested "+url+"; not installing"); - // "System bundles" (ie things on the classpath) cannot be overridden - return installedBundle; - } else { - LOG.debug("Already have bundle "+versionedId+" from "+installedBundle+"/"+installedBundle.getLocation()+" when requested "+url+"; but it is not a system bundle so proceeding"); - // Other bundles can be installed multiple times. To ignore multiples and continue to use the old one, - // just return the installedBundle as done just above for system bundles. - } - } - } - return null; - } - - private static InputStream getUrlStream(String url) { - return ResourceUtils.create(Osgis.class).getResourceFromUrl(url); - } - - public static boolean isExtensionBundle(Bundle bundle) { - String location = bundle.getLocation(); - return location != null && - EXTENSION_PROTOCOL.equals(Urls.getProtocol(location)); - } - - /** Takes a string which might be of the form "symbolic-name" or "symbolic-name:version" (or something else entirely) - * and returns a VersionedName. The versionedName.getVersion() will be null if if there was no version in the input - * (or returning {@link Maybe#absent()} if not valid, with a suitable error message). */ - public static Maybe<VersionedName> parseOsgiIdentifier(String symbolicNameOptionalWithVersion) { - if (Strings.isBlank(symbolicNameOptionalWithVersion)) - return Maybe.absent("OSGi identifier is blank"); - - String[] parts = symbolicNameOptionalWithVersion.split(":"); - if (parts.length>2) - return Maybe.absent("OSGi identifier has too many parts; max one ':' symbol"); - - Version v = null; - if (parts.length == 2) { - try { - v = Version.parseVersion(parts[1]); - } catch (IllegalArgumentException e) { - return Maybe.absent("OSGi identifier has invalid version string ("+e.getMessage()+")"); - } - } - - return Maybe.of(new VersionedName(parts[0], v)); - } - - /** - * The class is not used, staying for future reference. - * Remove after OSGi transition is completed. - */ - public static class ManifestHelper { - - private static ManifestParser parse; - private Manifest manifest; - private String source; - - private static final String WIRING_PACKAGE = PackageNamespace.PACKAGE_NAMESPACE; - - public static ManifestHelper forManifestContents(String contents) throws IOException, BundleException { - ManifestHelper result = forManifest(Streams.newInputStreamWithContents(contents)); - result.source = contents; - return result; - } - - public static ManifestHelper forManifest(URL url) throws IOException, BundleException { - InputStream in = null; - try { - in = url.openStream(); - return forManifest(in); - } finally { - if (in != null) in.close(); - } - } - - public static ManifestHelper forManifest(InputStream in) throws IOException, BundleException { - return forManifest(new Manifest(in)); - } - - public static ManifestHelper forManifest(Manifest manifest) throws BundleException { - ManifestHelper result = new ManifestHelper(); - result.manifest = manifest; - parse = new ManifestParser(null, null, null, new StringMap(manifest.getMainAttributes())); - return result; - } - - public String getSymbolicName() { - return parse.getSymbolicName(); - } - - public Version getVersion() { - return parse.getBundleVersion(); - } - - public String getSymbolicNameVersion() { - return getSymbolicName()+":"+getVersion(); - } - - public List<String> getExportedPackages() { - MutableList<String> result = MutableList.of(); - for (BundleCapability c: parse.getCapabilities()) { - if (WIRING_PACKAGE.equals(c.getNamespace())) { - result.add((String)c.getAttributes().get(WIRING_PACKAGE)); - } - } - return result; - } - - @Nullable public String getSource() { - return source; - } - - public Manifest getManifest() { - return manifest; - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/AbstractExecutionContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/AbstractExecutionContext.java b/core/src/main/java/brooklyn/util/task/AbstractExecutionContext.java deleted file mode 100644 index f3511d7..0000000 --- a/core/src/main/java/brooklyn/util/task/AbstractExecutionContext.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.Map; -import java.util.concurrent.Callable; - -import org.apache.brooklyn.api.management.ExecutionContext; -import org.apache.brooklyn.api.management.ExecutionManager; -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.management.TaskAdaptable; - -import com.google.common.collect.Maps; - -public abstract class AbstractExecutionContext implements ExecutionContext { - - /** - * Submits the given runnable/callable/task for execution (in a separate thread); - * supported keys in the map include: tags (add'l tags to put on the resulting task), - * description (string), and others as described in the reference below - * - * @see ExecutionManager#submit(Map, Task) - */ - @Override - public Task<?> submit(Map<?,?> properties, Runnable runnable) { return submitInternal(properties, runnable); } - - /** @see #submit(Map, Runnable) */ - @Override - public Task<?> submit(Runnable runnable) { return submitInternal(Maps.newLinkedHashMap(), runnable); } - - /** @see #submit(Map, Runnable) */ - @Override - public <T> Task<T> submit(Callable<T> callable) { return submitInternal(Maps.newLinkedHashMap(), callable); } - - /** @see #submit(Map, Runnable) */ - @Override - public <T> Task<T> submit(Map<?,?> properties, Callable<T> callable) { return submitInternal(properties, callable); } - - /** @see #submit(Map, Runnable) */ - @Override - public <T> Task<T> submit(TaskAdaptable<T> task) { return submitInternal(Maps.newLinkedHashMap(), task.asTask()); } - - /** @see #submit(Map, Runnable) */ - @Override - public <T> Task<T> submit(Map<?,?> properties, TaskAdaptable<T> task) { return submitInternal(properties, task.asTask()); } - - /** - * Provided for compatibility - * - * Submit is preferred if a handle on the resulting Task is desired (although a task can be passed in so this is not always necessary) - * - * @see #submit(Map, Runnable) - */ - public void execute(Runnable r) { submit(r); } - - /** does the work internally of submitting the task; note that the return value may be a wrapper task even if a task is passed in, - * if the execution context where the target should run is different (e.g. submitting an effector task cross-context) */ - protected abstract <T> Task<T> submitInternal(Map<?,?> properties, Object task); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/BasicExecutionContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/BasicExecutionContext.java b/core/src/main/java/brooklyn/util/task/BasicExecutionContext.java deleted file mode 100644 index 8942a18..0000000 --- a/core/src/main/java/brooklyn/util/task/BasicExecutionContext.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.lang.reflect.Proxy; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.management.ExecutionContext; -import org.apache.brooklyn.api.management.ExecutionManager; -import org.apache.brooklyn.api.management.HasTaskChildren; -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.management.TaskAdaptable; -import org.apache.brooklyn.api.management.entitlement.EntitlementContext; -import org.apache.brooklyn.core.management.entitlement.Entitlements; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.BrooklynTaskTags; -import brooklyn.entity.basic.BrooklynTaskTags.WrappedEntity; -import brooklyn.entity.basic.EntityInternal; - -import com.google.common.base.Function; -import com.google.common.collect.Iterables; - -/** - * A means of executing tasks against an ExecutionManager with a given bucket/set of tags pre-defined - * (so that it can look like an {@link Executor} and also supply {@link ExecutorService#submit(Callable)} - */ -public class BasicExecutionContext extends AbstractExecutionContext { - - private static final Logger log = LoggerFactory.getLogger(BasicExecutionContext.class); - - static final ThreadLocal<BasicExecutionContext> perThreadExecutionContext = new ThreadLocal<BasicExecutionContext>(); - - public static BasicExecutionContext getCurrentExecutionContext() { return perThreadExecutionContext.get(); } - - final ExecutionManager executionManager; - final Set<Object> tags = new LinkedHashSet<Object>(); - - public BasicExecutionContext(ExecutionManager executionManager) { - this(Collections.emptyMap(), executionManager); - } - - /** - * Supported flags are {@code tag} and {@code tags} - * - * @see ExecutionManager#submit(Map, Task) - */ - public BasicExecutionContext(Map<?, ?> flags, ExecutionManager executionManager) { - this.executionManager = executionManager; - - if (flags.get("tag") != null) tags.add(flags.remove("tag")); - if (flags.containsKey("tags")) tags.addAll((Collection<?>)flags.remove("tags")); - - // FIXME brooklyn-specific check, just for sanity - // the context tag should always be a non-proxy entity, because that is what is passed to effector tasks - // which may require access to internal methods - for (Object tag: tags) { - if (tag instanceof BrooklynTaskTags.WrappedEntity) { - if (Proxy.isProxyClass(((WrappedEntity)tag).entity.getClass())) { - log.warn(""+this+" has entity proxy in "+tag); - } - } - } - } - - public ExecutionManager getExecutionManager() { - return executionManager; - } - - /** returns tasks started by this context (or tasks which have all the tags on this object) */ - public Set<Task<?>> getTasks() { return executionManager.getTasksWithAllTags((Set<?>)tags); } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - protected <T> Task<T> submitInternal(Map<?,?> propertiesQ, final Object task) { - if (task instanceof TaskAdaptable<?> && !(task instanceof Task<?>)) - return submitInternal(propertiesQ, ((TaskAdaptable<?>)task).asTask()); - - Map properties = propertiesQ; - if (properties.get("tags")==null) properties.put("tags", new ArrayList()); - Collection taskTags = (Collection)properties.get("tags"); - - // FIXME some of this is brooklyn-specific logic, should be moved to a BrooklynExecContext subclass; - // the issue is that we want to ensure that cross-entity calls switch execution contexts; - // previously it was all very messy how that was handled (and it didn't really handle it in many cases) - if (task instanceof Task<?>) taskTags.addAll( ((Task<?>)task).getTags() ); - Entity target = BrooklynTaskTags.getWrappedEntityOfType(taskTags, BrooklynTaskTags.TARGET_ENTITY); - - if (target!=null && !tags.contains(BrooklynTaskTags.tagForContextEntity(target))) { - // task is switching execution context boundaries - /* - * longer notes: - * you fall in to this block if the caller requests a target entity different to the current context - * (e.g. where entity X is invoking an effector on Y, it will start in X's context, - * but the effector should run in Y's context). - * - * if X is invoking an effector on himself in his own context, or a sensor or other task, it will not come in to this block. - */ - final ExecutionContext tc = ((EntityInternal)target).getExecutionContext(); - if (log.isDebugEnabled()) - log.debug("Switching task context on execution of "+task+": from "+this+" to "+target+" (in "+Tasks.current()+")"); - - if (task instanceof Task<?>) { - final Task<T> t = (Task<T>)task; - if (!Tasks.isQueuedOrSubmitted(t) && (!(Tasks.current() instanceof HasTaskChildren) || - !Iterables.contains( ((HasTaskChildren)Tasks.current()).getChildren(), t ))) { - // this task is switching execution context boundaries _and_ it is not a child and not yet queued, - // so wrap it in a task running in this context to keep a reference to the child - // (this matters when we are navigating in the GUI; without it we lose the reference to the child - // when browsing in the context of the parent) - return submit(Tasks.<T>builder().name("Cross-context execution: "+t.getDescription()).dynamic(true).body(new Callable<T>() { - public T call() { - return DynamicTasks.get(t); - } - }).build()); - } else { - // if we are already tracked by parent, just submit it - return tc.submit(t); - } - } else { - // as above, but here we are definitely not a child (what we are submitting isn't even a task) - // (will only come here if properties defines tags including a target entity, which probably never happens) - submit(Tasks.<T>builder().name("Cross-context execution").dynamic(true).body(new Callable<T>() { - public T call() { - if (task instanceof Callable) { - return DynamicTasks.queue( Tasks.<T>builder().dynamic(false).body((Callable<T>)task).build() ).getUnchecked(); - } else if (task instanceof Runnable) { - return DynamicTasks.queue( Tasks.<T>builder().dynamic(false).body((Runnable)task).build() ).getUnchecked(); - } else { - throw new IllegalArgumentException("Unhandled task type: "+task+"; type="+(task!=null ? task.getClass() : "null")); - } - } - }).build()); - } - } - - EntitlementContext entitlementContext = BrooklynTaskTags.getEntitlement(taskTags); - if (entitlementContext==null) - entitlementContext = Entitlements.getEntitlementContext(); - if (entitlementContext!=null) { - taskTags.add(BrooklynTaskTags.tagForEntitlement(entitlementContext)); - } - - taskTags.addAll(tags); - - if (Tasks.current()!=null && BrooklynTaskTags.isTransient(Tasks.current()) - && !taskTags.contains(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) && !taskTags.contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) { - // tag as transient if submitter is transient, unless explicitly tagged as non-transient - taskTags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG); - } - - final Object startCallback = properties.get("newTaskStartCallback"); - properties.put("newTaskStartCallback", new Function<Object,Void>() { - public Void apply(Object it) { - registerPerThreadExecutionContext(); - if (startCallback!=null) ExecutionUtils.invoke(startCallback, it); - return null; - }}); - - final Object endCallback = properties.get("newTaskEndCallback"); - properties.put("newTaskEndCallback", new Function<Object,Void>() { - public Void apply(Object it) { - try { - if (endCallback!=null) ExecutionUtils.invoke(endCallback, it); - } finally { - clearPerThreadExecutionContext(); - } - return null; - }}); - - if (task instanceof Task) { - return executionManager.submit(properties, (Task)task); - } else if (task instanceof Callable) { - return executionManager.submit(properties, (Callable)task); - } else if (task instanceof Runnable) { - return (Task<T>) executionManager.submit(properties, (Runnable)task); - } else { - throw new IllegalArgumentException("Unhandled task type: task="+task+"; type="+(task!=null ? task.getClass() : "null")); - } - } - - private void registerPerThreadExecutionContext() { perThreadExecutionContext.set(this); } - - private void clearPerThreadExecutionContext() { perThreadExecutionContext.remove(); } - - @Override - public boolean isShutdown() { - return getExecutionManager().isShutdown(); - } - - @Override - public String toString() { - return super.toString()+"("+tags+")"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java b/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java deleted file mode 100644 index 13d035b..0000000 --- a/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java +++ /dev/null @@ -1,755 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.brooklyn.api.management.ExecutionManager; -import org.apache.brooklyn.api.management.HasTaskChildren; -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.management.TaskAdaptable; -import org.apache.brooklyn.core.internal.BrooklynFeatureEnablement; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.util.collections.MutableList; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.text.Identifiers; - -import com.google.common.annotations.Beta; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.CaseFormat; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ExecutionList; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * Manages the execution of atomic tasks and scheduled (recurring) tasks, - * including setting tags and invoking callbacks. - */ -public class BasicExecutionManager implements ExecutionManager { - private static final Logger log = LoggerFactory.getLogger(BasicExecutionManager.class); - - private static final boolean RENAME_THREADS = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_RENAME_THREADS); - - private static class PerThreadCurrentTaskHolder { - public static final ThreadLocal<Task<?>> perThreadCurrentTask = new ThreadLocal<Task<?>>(); - } - - public static ThreadLocal<Task<?>> getPerThreadCurrentTask() { - return PerThreadCurrentTaskHolder.perThreadCurrentTask; - } - - private final ThreadFactory threadFactory; - - private final ThreadFactory daemonThreadFactory; - - private final ExecutorService runner; - - private final ScheduledExecutorService delayedRunner; - - // TODO Could have a set of all knownTasks; but instead we're having a separate set per tag, - // so the same task could be listed multiple times if it has multiple tags... - - //access to this field AND to members in this field is synchronized, - //to allow us to preserve order while guaranteeing thread-safe - //(but more testing is needed before we are completely sure it is thread-safe!) - //synch blocks are as finely grained as possible for efficiency; - //NB CopyOnWriteArraySet is a perf bottleneck, and the simple map makes it easier to remove when a tag is empty - private Map<Object,Set<Task<?>>> tasksByTag = new HashMap<Object,Set<Task<?>>>(); - - private ConcurrentMap<String,Task<?>> tasksById = new ConcurrentHashMap<String,Task<?>>(); - - private ConcurrentMap<Object, TaskScheduler> schedulerByTag = new ConcurrentHashMap<Object, TaskScheduler>(); - - /** count of all tasks submitted, including finished */ - private final AtomicLong totalTaskCount = new AtomicLong(); - - /** tasks submitted but not yet done (or in cases of interruption/cancelled not yet GC'd) */ - private Map<String,String> incompleteTaskIds = new ConcurrentHashMap<String,String>(); - - /** tasks started but not yet finished */ - private final AtomicInteger activeTaskCount = new AtomicInteger(); - - private final List<ExecutionListener> listeners = new CopyOnWriteArrayList<ExecutionListener>(); - - private final static ThreadLocal<String> threadOriginalName = new ThreadLocal<String>() { - protected String initialValue() { - // should not happen, as only access is in _afterEnd with a check that _beforeStart was invoked - log.warn("No original name recorded for thread "+Thread.currentThread().getName()+"; task "+Tasks.current()); - return "brooklyn-thread-pool-"+Identifiers.makeRandomId(8); - } - }; - - public BasicExecutionManager(String contextid) { - threadFactory = newThreadFactory(contextid); - daemonThreadFactory = new ThreadFactoryBuilder() - .setThreadFactory(threadFactory) - .setDaemon(true) - .build(); - - // use Executors.newCachedThreadPool(daemonThreadFactory), but timeout of 1s rather than 60s for better shutdown! - runner = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), - daemonThreadFactory); - - delayedRunner = new ScheduledThreadPoolExecutor(1, daemonThreadFactory); - } - - private final static class UncaughtExceptionHandlerImplementation implements Thread.UncaughtExceptionHandler { - @Override - public void uncaughtException(Thread t, Throwable e) { - log.error("Uncaught exception in thread "+t.getName(), e); - } - } - - /** - * For use by overriders to use custom thread factory. - * But be extremely careful: called by constructor, so before sub-class' constructor will - * have been invoked! - */ - protected ThreadFactory newThreadFactory(String contextid) { - return new ThreadFactoryBuilder() - .setNameFormat("brooklyn-execmanager-"+contextid+"-%d") - .setUncaughtExceptionHandler(new UncaughtExceptionHandlerImplementation()) - .build(); - } - - public void shutdownNow() { - runner.shutdownNow(); - delayedRunner.shutdownNow(); - } - - public void addListener(ExecutionListener listener) { - listeners.add(listener); - } - - public void removeListener(ExecutionListener listener) { - listeners.remove(listener); - } - - /** - * Deletes the given tag, including all tasks using this tag. - * - * Useful, for example, if an entity is being expunged so that we don't keep holding - * a reference to it as a tag. - */ - public void deleteTag(Object tag) { - Set<Task<?>> tasks; - synchronized (tasksByTag) { - tasks = tasksByTag.remove(tag); - } - if (tasks != null) { - for (Task<?> task : tasks) { - deleteTask(task); - } - } - } - - public void deleteTask(Task<?> task) { - boolean removed = deleteTaskNonRecursive(task); - if (!removed) return; - - if (task instanceof HasTaskChildren) { - List<Task<?>> children = ImmutableList.copyOf(((HasTaskChildren)task).getChildren()); - for (Task<?> child : children) { - deleteTask(child); - } - } - } - - protected boolean deleteTaskNonRecursive(Task<?> task) { - Set<?> tags = checkNotNull(task, "task").getTags(); - for (Object tag : tags) { - synchronized (tasksByTag) { - Set<Task<?>> tasks = tasksWithTagLiveOrNull(tag); - if (tasks != null) { - tasks.remove(task); - if (tasks.isEmpty()) { - tasksByTag.remove(tag); - } - } - } - } - Task<?> removed = tasksById.remove(task.getId()); - incompleteTaskIds.remove(task.getId()); - if (removed!=null && removed.isSubmitted() && !removed.isDone()) { - log.warn("Deleting submitted task before completion: "+removed+"; this task will continue to run in the background outwith "+this+", but perhaps it should have been cancelled?"); - } - return removed != null; - } - - public boolean isShutdown() { - return runner.isShutdown(); - } - - /** count of all tasks submitted */ - public long getTotalTasksSubmitted() { - return totalTaskCount.get(); - } - - /** count of tasks submitted but not ended */ - public long getNumIncompleteTasks() { - return incompleteTaskIds.size(); - } - - /** count of tasks started but not ended */ - public long getNumActiveTasks() { - return activeTaskCount.get(); - } - - /** count of tasks kept in memory, often including ended tasks */ - public long getNumInMemoryTasks() { - return tasksById.size(); - } - - private Set<Task<?>> tasksWithTagCreating(Object tag) { - Preconditions.checkNotNull(tag); - synchronized (tasksByTag) { - Set<Task<?>> result = tasksWithTagLiveOrNull(tag); - if (result==null) { - result = Collections.synchronizedSet(new LinkedHashSet<Task<?>>()); - tasksByTag.put(tag, result); - } - return result; - } - } - - /** exposes live view, for internal use only */ - @Beta - public Set<Task<?>> tasksWithTagLiveOrNull(Object tag) { - synchronized (tasksByTag) { - return tasksByTag.get(tag); - } - } - - @Override - public Task<?> getTask(String id) { - return tasksById.get(id); - } - - /** not on interface because potentially expensive */ - public List<Task<?>> getAllTasks() { - // not sure if synching makes any difference; have not observed CME's yet - // (and so far this is only called when a CME was caught on a previous operation) - synchronized (tasksById) { - return MutableList.copyOf(tasksById.values()); - } - } - - @Override - public Set<Task<?>> getTasksWithTag(Object tag) { - Set<Task<?>> result = tasksWithTagLiveOrNull(tag); - if (result==null) return Collections.emptySet(); - synchronized (result) { - return (Set<Task<?>>)Collections.unmodifiableSet(new LinkedHashSet<Task<?>>(result)); - } - } - - @Override - public Set<Task<?>> getTasksWithAnyTag(Iterable<?> tags) { - Set<Task<?>> result = new LinkedHashSet<Task<?>>(); - Iterator<?> ti = tags.iterator(); - while (ti.hasNext()) { - Set<Task<?>> tasksForTag = tasksWithTagLiveOrNull(ti.next()); - if (tasksForTag!=null) { - synchronized (tasksForTag) { - result.addAll(tasksForTag); - } - } - } - return Collections.unmodifiableSet(result); - } - - /** only works with at least one tag; returns empty if no tags */ - @Override - public Set<Task<?>> getTasksWithAllTags(Iterable<?> tags) { - //NB: for this method retrieval for multiple tags could be made (much) more efficient (if/when it is used with multiple tags!) - //by first looking for the least-used tag, getting those tasks, and then for each of those tasks - //checking whether it contains the other tags (looking for second-least used, then third-least used, etc) - Set<Task<?>> result = new LinkedHashSet<Task<?>>(); - boolean first = true; - Iterator<?> ti = tags.iterator(); - while (ti.hasNext()) { - Object tag = ti.next(); - if (first) { - first = false; - result.addAll(getTasksWithTag(tag)); - } else { - result.retainAll(getTasksWithTag(tag)); - } - } - return Collections.unmodifiableSet(result); - } - - /** live view of all tasks, for internal use only */ - @Beta - public Collection<Task<?>> allTasksLive() { return tasksById.values(); } - - public Set<Object> getTaskTags() { - synchronized (tasksByTag) { - return Collections.unmodifiableSet(Sets.newLinkedHashSet(tasksByTag.keySet())); - } - } - - public Task<?> submit(Runnable r) { return submit(new LinkedHashMap<Object,Object>(1), r); } - public Task<?> submit(Map<?,?> flags, Runnable r) { return submit(flags, new BasicTask<Void>(flags, r)); } - - public <T> Task<T> submit(Callable<T> c) { return submit(new LinkedHashMap<Object,Object>(1), c); } - public <T> Task<T> submit(Map<?,?> flags, Callable<T> c) { return submit(flags, new BasicTask<T>(flags, c)); } - - public <T> Task<T> submit(TaskAdaptable<T> t) { return submit(new LinkedHashMap<Object,Object>(1), t); } - public <T> Task<T> submit(Map<?,?> flags, TaskAdaptable<T> task) { - if (!(task instanceof Task)) - task = task.asTask(); - synchronized (task) { - if (((TaskInternal<?>)task).getInternalFuture()!=null) return (Task<T>)task; - return submitNewTask(flags, (Task<T>) task); - } - } - - public <T> Task<T> scheduleWith(Task<T> task) { return scheduleWith(Collections.emptyMap(), task); } - public <T> Task<T> scheduleWith(Map<?,?> flags, Task<T> task) { - synchronized (task) { - if (((TaskInternal<?>)task).getInternalFuture()!=null) return task; - return submitNewTask(flags, task); - } - } - - protected Task<?> submitNewScheduledTask(final Map<?,?> flags, final ScheduledTask task) { - tasksById.put(task.getId(), task); - totalTaskCount.incrementAndGet(); - - beforeSubmitScheduledTaskAllIterations(flags, task); - - return submitSubsequentScheduledTask(flags, task); - } - - @SuppressWarnings("unchecked") - protected Task<?> submitSubsequentScheduledTask(final Map<?,?> flags, final ScheduledTask task) { - if (!task.isDone()) { - task.internalFuture = delayedRunner.schedule(new ScheduledTaskCallable(task, flags), - task.delay.toNanoseconds(), TimeUnit.NANOSECONDS); - } else { - afterEndScheduledTaskAllIterations(flags, task); - } - return task; - } - - protected class ScheduledTaskCallable implements Callable<Object> { - public ScheduledTask task; - public Map<?,?> flags; - - public ScheduledTaskCallable(ScheduledTask task, Map<?, ?> flags) { - this.task = task; - this.flags = flags; - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public Object call() { - if (task.startTimeUtc==-1) task.startTimeUtc = System.currentTimeMillis(); - TaskInternal<?> taskScheduled = null; - try { - beforeStartScheduledTaskSubmissionIteration(flags, task); - taskScheduled = (TaskInternal<?>) task.newTask(); - taskScheduled.setSubmittedByTask(task); - final Callable<?> oldJob = taskScheduled.getJob(); - final TaskInternal<?> taskScheduledF = taskScheduled; - taskScheduled.setJob(new Callable() { public Object call() { - boolean resubmitted = false; - task.recentRun = taskScheduledF; - try { - synchronized (task) { - task.notifyAll(); - } - Object result; - try { - result = oldJob.call(); - } catch (Exception e) { - if (!Tasks.isInterrupted()) { - log.warn("Error executing "+oldJob+" (scheduled job of "+task+" - "+task.getDescription()+"); cancelling scheduled execution", e); - } else { - log.debug("Interrupted executing "+oldJob+" (scheduled job of "+task+" - "+task.getDescription()+"); cancelling scheduled execution: "+e); - } - throw Exceptions.propagate(e); - } - task.runCount++; - if (task.period!=null && !task.isCancelled()) { - task.delay = task.period; - submitSubsequentScheduledTask(flags, task); - resubmitted = true; - } - return result; - } finally { - // do in finally block in case we were interrupted - if (!resubmitted) - afterEndScheduledTaskAllIterations(flags, task); - } - }}); - task.nextRun = taskScheduled; - BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext(); - if (ec!=null) return ec.submit(taskScheduled); - else return submit(taskScheduled); - } finally { - afterEndScheduledTaskSubmissionIteration(flags, task, taskScheduled); - } - } - - @Override - public String toString() { - return "ScheduledTaskCallable["+task+","+flags+"]"; - } - } - - private final class SubmissionCallable<T> implements Callable<T> { - private final Map<?, ?> flags; - private final Task<T> task; - - private SubmissionCallable(Map<?, ?> flags, Task<T> task) { - this.flags = flags; - this.task = task; - } - - public T call() { - try { - T result = null; - Throwable error = null; - String oldThreadName = Thread.currentThread().getName(); - try { - if (RENAME_THREADS) { - String newThreadName = oldThreadName+"-"+task.getDisplayName()+ - "["+task.getId().substring(0, 8)+"]"; - Thread.currentThread().setName(newThreadName); - } - beforeStartAtomicTask(flags, task); - if (!task.isCancelled()) { - result = ((TaskInternal<T>)task).getJob().call(); - } else throw new CancellationException(); - } catch(Throwable e) { - error = e; - } finally { - if (RENAME_THREADS) { - Thread.currentThread().setName(oldThreadName); - } - afterEndAtomicTask(flags, task); - } - if (error!=null) { - /* we throw, after logging debug. - * the throw means the error is available for task submitters to monitor. - * however it is possible no one is monitoring it, in which case we will have debug logging only for errors. - * (the alternative, of warn-level logging in lots of places where we don't want it, seems worse!) - */ - if (log.isDebugEnabled()) { - // debug only here, because most submitters will handle failures - log.debug("Exception running task "+task+" (rethrowing): "+error.getMessage(), error); - if (log.isTraceEnabled()) - log.trace("Trace for exception running task "+task+" (rethrowing): "+error.getMessage(), error); - } - throw Exceptions.propagate(error); - } - return result; - } finally { - ((TaskInternal<?>)task).runListeners(); - } - } - - @Override - public String toString() { - return "BEM.call("+task+","+flags+")"; - } - } - - private final static class ListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> { - private final Task<T> task; - - private ListenableForwardingFutureForTask(Future<T> delegate, ExecutionList list, Task<T> task) { - super(delegate, list); - this.task = task; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean result = false; - if (!task.isCancelled()) result |= task.cancel(mayInterruptIfRunning); - result |= super.cancel(mayInterruptIfRunning); - ((TaskInternal<?>)task).runListeners(); - return result; - } - } - - private final class SubmissionListenerToCallOtherListeners<T> implements Runnable { - private final Task<T> task; - - private SubmissionListenerToCallOtherListeners(Task<T> task) { - this.task = task; - } - - @Override - public void run() { - try { - ((TaskInternal<?>)task).runListeners(); - } catch (Exception e) { - log.warn("Error running task listeners for task "+task+" done", e); - } - - for (ExecutionListener listener : listeners) { - try { - listener.onTaskDone(task); - } catch (Exception e) { - log.warn("Error running execution listener "+listener+" of task "+task+" done", e); - } - } - } - } - - @SuppressWarnings("unchecked") - protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) { - if (task instanceof ScheduledTask) - return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask)task); - - tasksById.put(task.getId(), task); - totalTaskCount.incrementAndGet(); - - beforeSubmitAtomicTask(flags, task); - - if (((TaskInternal<T>)task).getJob() == null) - throw new NullPointerException("Task "+task+" submitted with with null job: job must be supplied."); - - Callable<T> job = new SubmissionCallable<T>(flags, task); - - // If there's a scheduler then use that; otherwise execute it directly - Set<TaskScheduler> schedulers = null; - for (Object tago: task.getTags()) { - TaskScheduler scheduler = getTaskSchedulerForTag(tago); - if (scheduler!=null) { - if (schedulers==null) schedulers = new LinkedHashSet<TaskScheduler>(2); - schedulers.add(scheduler); - } - } - Future<T> future; - if (schedulers!=null && !schedulers.isEmpty()) { - if (schedulers.size()>1) log.warn("multiple schedulers detected, using only the first, for "+task+": "+schedulers); - future = schedulers.iterator().next().submit(job); - } else { - future = runner.submit(job); - } - // on completion, listeners get triggered above; here, below we ensure they get triggered on cancel - // (and we make sure the same ExecutionList is used in the future as in the task) - ListenableFuture<T> listenableFuture = new ListenableForwardingFutureForTask<T>(future, ((TaskInternal<T>)task).getListeners(), task); - // doesn't matter whether the listener is added to the listenableFuture or the task, - // except that for the task we can more easily wrap it so that it only logs debug if the executor is shutdown - // (avoid a bunch of ugly warnings in tests which start and stop things a lot!) - // [probably even nicer to run this in the same thread, it doesn't do much; but that is messier to implement] - ((TaskInternal<T>)task).addListener(new SubmissionListenerToCallOtherListeners<T>(task), runner); - - ((TaskInternal<T>)task).initInternalFuture(listenableFuture); - - return task; - } - - protected void beforeSubmitScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) { - internalBeforeSubmit(flags, task); - } - protected void beforeSubmitAtomicTask(Map<?,?> flags, Task<?> task) { - internalBeforeSubmit(flags, task); - } - /** invoked when a task is submitted */ - protected void internalBeforeSubmit(Map<?,?> flags, Task<?> task) { - incompleteTaskIds.put(task.getId(), task.getId()); - - Task<?> currentTask = Tasks.current(); - if (currentTask!=null) ((TaskInternal<?>)task).setSubmittedByTask(currentTask); - ((TaskInternal<?>)task).setSubmitTimeUtc(System.currentTimeMillis()); - - if (flags.get("tag")!=null) ((TaskInternal<?>)task).getMutableTags().add(flags.remove("tag")); - if (flags.get("tags")!=null) ((TaskInternal<?>)task).getMutableTags().addAll((Collection<?>)flags.remove("tags")); - - for (Object tag: ((TaskInternal<?>)task).getTags()) { - tasksWithTagCreating(tag).add(task); - } - } - - protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> task) { - internalBeforeStart(flags, task); - } - protected void beforeStartAtomicTask(Map<?,?> flags, Task<?> task) { - internalBeforeStart(flags, task); - } - - /** invoked in a task's thread when a task is starting to run (may be some time after submitted), - * but before doing any of the task's work, so that we can update bookkeeping and notify callbacks */ - protected void internalBeforeStart(Map<?,?> flags, Task<?> task) { - activeTaskCount.incrementAndGet(); - - //set thread _before_ start time, so we won't get a null thread when there is a start-time - if (log.isTraceEnabled()) log.trace(""+this+" beforeStart, task: "+task); - if (!task.isCancelled()) { - Thread thread = Thread.currentThread(); - ((TaskInternal<?>)task).setThread(thread); - if (RENAME_THREADS) { - threadOriginalName.set(thread.getName()); - String newThreadName = "brooklyn-" + CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, task.getDisplayName().replace(" ", "")) + "-" + task.getId().substring(0, 8); - thread.setName(newThreadName); - } - PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task); - ((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis()); - } - ExecutionUtils.invoke(flags.get("newTaskStartCallback"), task); - } - - /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */ - protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) { - internalAfterEnd(flags, task, false, true); - } - /** called once for each call to {@link #beforeStartScheduledTaskSubmissionIteration(Map, Task)}, - * with a per-iteration task generated by the surrounding scheduled task */ - protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> scheduledTask, Task<?> taskIteration) { - internalAfterEnd(flags, scheduledTask, true, false); - } - /** called once for each task on which {@link #beforeStartAtomicTask(Map, Task)} is invoked, - * and normally (if not interrupted prior to start) - * called once for each task on which {@link #beforeSubmitAtomicTask(Map, Task)} */ - protected void afterEndAtomicTask(Map<?,?> flags, Task<?> task) { - internalAfterEnd(flags, task, true, true); - } - /** normally (if not interrupted) called once for each call to {@link #internalBeforeSubmit(Map, Task)}, - * and, for atomic tasks and scheduled-task submission iterations where - * always called once if {@link #internalBeforeStart(Map, Task)} is invoked and in the same thread as that method */ - protected void internalAfterEnd(Map<?,?> flags, Task<?> task, boolean startedInThisThread, boolean isEndingAllIterations) { - if (log.isTraceEnabled()) log.trace(this+" afterEnd, task: "+task); - if (startedInThisThread) { - activeTaskCount.decrementAndGet(); - } - if (isEndingAllIterations) { - incompleteTaskIds.remove(task.getId()); - ExecutionUtils.invoke(flags.get("newTaskEndCallback"), task); - ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis()); - } - - if (startedInThisThread) { - PerThreadCurrentTaskHolder.perThreadCurrentTask.remove(); - //clear thread _after_ endTime set, so we won't get a null thread when there is no end-time - if (RENAME_THREADS && startedInThisThread) { - Thread thread = task.getThread(); - if (thread==null) { - log.warn("BasicTask.afterEnd invoked without corresponding beforeStart"); - } else { - thread.setName(threadOriginalName.get()); - threadOriginalName.remove(); - } - } - ((TaskInternal<?>)task).setThread(null); - } - synchronized (task) { task.notifyAll(); } - } - - public TaskScheduler getTaskSchedulerForTag(Object tag) { - return schedulerByTag.get(tag); - } - - public void setTaskSchedulerForTag(Object tag, Class<? extends TaskScheduler> scheduler) { - synchronized (schedulerByTag) { - TaskScheduler old = getTaskSchedulerForTag(tag); - if (old!=null) { - if (scheduler.isAssignableFrom(old.getClass())) { - /* already have such an instance */ - return; - } - //might support multiple in future... - throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag "+tag+", has "+old+", setting new "+scheduler+")"); - } - try { - TaskScheduler schedulerI = scheduler.newInstance(); - // allow scheduler to have a nice name, for logging etc - if (schedulerI instanceof CanSetName) ((CanSetName)schedulerI).setName(""+tag); - setTaskSchedulerForTag(tag, schedulerI); - } catch (InstantiationException e) { - throw Exceptions.propagate(e); - } catch (IllegalAccessException e) { - throw Exceptions.propagate(e); - } - } - } - - /** - * Defines a {@link TaskScheduler} to run on all subsequently submitted jobs with the given tag. - * - * Maximum of one allowed currently. Resubmissions of the same scheduler (or scheduler class) - * allowed. If changing, you must call {@link #clearTaskSchedulerForTag(Object)} between the two. - * - * @see #setTaskSchedulerForTag(Object, Class) - */ - public void setTaskSchedulerForTag(Object tag, TaskScheduler scheduler) { - synchronized (schedulerByTag) { - scheduler.injectExecutor(runner); - - Object old = schedulerByTag.put(tag, scheduler); - if (old!=null && old!=scheduler) { - //might support multiple in future... - throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag "+tag+")"); - } - } - } - - /** - * Forgets that any scheduler was associated with a tag. - * - * @see #setTaskSchedulerForTag(Object, TaskScheduler) - * @see #setTaskSchedulerForTag(Object, Class) - */ - public boolean clearTaskSchedulerForTag(Object tag) { - synchronized (schedulerByTag) { - Object old = schedulerByTag.remove(tag); - return (old!=null); - } - } - - @VisibleForTesting - public ConcurrentMap<Object, TaskScheduler> getSchedulerByTag() { - return schedulerByTag; - } - -}
