http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/osgi/Osgis.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/osgi/Osgis.java 
b/core/src/main/java/org/apache/brooklyn/core/util/osgi/Osgis.java
new file mode 100644
index 0000000..849a33b
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/osgi/Osgis.java
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.osgi;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.jar.Attributes;
+import java.util.jar.JarInputStream;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+
+import javax.annotation.Nullable;
+
+import org.apache.felix.framework.FrameworkFactory;
+import org.apache.felix.framework.util.StringMap;
+import org.apache.felix.framework.util.manifestparser.ManifestParser;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.BundleException;
+import org.osgi.framework.Constants;
+import org.osgi.framework.Version;
+import org.osgi.framework.launch.Framework;
+import org.osgi.framework.namespace.PackageNamespace;
+import org.osgi.framework.wiring.BundleCapability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.catalog.CatalogItem.CatalogBundle;
+import org.apache.brooklyn.core.util.ResourceUtils;
+import org.apache.brooklyn.core.util.osgi.Osgis;
+
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.ReferenceWithError;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.net.Urls;
+import brooklyn.util.os.Os;
+import brooklyn.util.stream.Streams;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Stopwatch;
+
+/** 
+ * utilities for working with osgi.
+ * osgi support is in early days (June 2014) so this class is beta, subject to 
change,
+ * particularly in how framework is started and bundles installed.
+ * 
+ * @since 0.7.0  */
+@Beta
+public class Osgis {
+    private static final Logger LOG = LoggerFactory.getLogger(Osgis.class);
+
+    private static final String EXTENSION_PROTOCOL = "system";
+    private static final String MANIFEST_PATH = "META-INF/MANIFEST.MF";
+    private static final Set<String> SYSTEM_BUNDLES = MutableSet.of();
+
+    public static class VersionedName {
+        private final String symbolicName;
+        private final Version version;
+        public VersionedName(Bundle b) {
+            this.symbolicName = b.getSymbolicName();
+            this.version = b.getVersion();
+        }
+        public VersionedName(String symbolicName, Version version) {
+            this.symbolicName = symbolicName;
+            this.version = version;
+        }
+        @Override public String toString() {
+            return symbolicName + ":" + Strings.toString(version);
+        }
+        public boolean equals(String sn, String v) {
+            return symbolicName.equals(sn) && (version == null && v == null || 
version != null && version.toString().equals(v));
+        }
+        public boolean equals(String sn, Version v) {
+            return symbolicName.equals(sn) && (version == null && v == null || 
version != null && version.equals(v));
+        }
+        public String getSymbolicName() {
+            return symbolicName;
+        }
+        public Version getVersion() {
+            return version;
+        }
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(symbolicName, version);
+        }
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof VersionedName)) return false;
+            VersionedName o = (VersionedName) other;
+            return Objects.equal(symbolicName, o.symbolicName) && 
Objects.equal(version, o.version);
+        }
+    }
+    
+    public static class BundleFinder {
+        protected final Framework framework;
+        protected String symbolicName;
+        protected String version;
+        protected String url;
+        protected boolean urlMandatory = false;
+        protected final List<Predicate<? super Bundle>> predicates = 
MutableList.of();
+        
+        protected BundleFinder(Framework framework) {
+            this.framework = framework;
+        }
+
+        public BundleFinder symbolicName(String symbolicName) {
+            this.symbolicName = symbolicName;
+            return this;
+        }
+
+        public BundleFinder version(String version) {
+            this.version = version;
+            return this;
+        }
+        
+        public BundleFinder id(String symbolicNameOptionallyWithVersion) {
+            if (Strings.isBlank(symbolicNameOptionallyWithVersion))
+                return this;
+            
+            Maybe<VersionedName> nv = 
parseOsgiIdentifier(symbolicNameOptionallyWithVersion);
+            if (nv.isAbsent())
+                throw new IllegalArgumentException("Cannot parse 
symbolic-name:version string '"+symbolicNameOptionallyWithVersion+"'");
+
+            return id(nv.get());
+        }
+
+        private BundleFinder id(VersionedName nv) {
+            symbolicName(nv.getSymbolicName());
+            if (nv.getVersion() != null) {
+                version(nv.getVersion().toString());
+            }
+            return this;
+        }
+
+        public BundleFinder bundle(CatalogBundle bundle) {
+            if (bundle.isNamed()) {
+                symbolicName(bundle.getSymbolicName());
+                version(bundle.getVersion());
+            }
+            if (bundle.getUrl() != null) {
+                requiringFromUrl(bundle.getUrl());
+            }
+            return this;
+        }
+
+        /** Looks for a bundle matching the given URL;
+         * unlike {@link #requiringFromUrl(String)} however, if the URL does 
not match any bundles
+         * it will return other matching bundles <i>if</if> a {@link 
#symbolicName(String)} is specified.
+         */
+        public BundleFinder preferringFromUrl(String url) {
+            this.url = url;
+            urlMandatory = false;
+            return this;
+        }
+
+        /** Requires the bundle to have the given URL set as its location. */
+        public BundleFinder requiringFromUrl(String url) {
+            this.url = url;
+            urlMandatory = true;
+            return this;
+        }
+
+        /** Finds the best matching bundle. */
+        public Maybe<Bundle> find() {
+            return findOne(false);
+        }
+        
+        /** Finds the matching bundle, requiring it to be unique. */
+        public Maybe<Bundle> findUnique() {
+            return findOne(true);
+        }
+
+        protected Maybe<Bundle> findOne(boolean requireExactlyOne) {
+            if (symbolicName==null && url==null)
+                throw new IllegalStateException(this+" must be given either a 
symbolic name or a URL");
+            
+            List<Bundle> result = findAll();
+            if (result.isEmpty())
+                return Maybe.absent("No bundle matching 
"+getConstraintsDescription());
+            if (requireExactlyOne && result.size()>1)
+                return Maybe.absent("Multiple bundles ("+result.size()+") 
matching "+getConstraintsDescription());
+            
+            return Maybe.of(result.get(0));
+        }
+        
+        /** Finds all matching bundles, in decreasing version order. */
+        public List<Bundle> findAll() {
+            boolean urlMatched = false;
+            List<Bundle> result = MutableList.of();
+            for (Bundle b: framework.getBundleContext().getBundles()) {
+                if (symbolicName!=null && 
!symbolicName.equals(b.getSymbolicName())) continue;
+                if (version!=null && 
!Version.parseVersion(version).equals(b.getVersion())) continue;
+                for (Predicate<? super Bundle> predicate: predicates) {
+                    if (!predicate.apply(b)) continue;
+                }
+
+                // check url last, because if it isn't mandatory we should 
only clear if we find a url
+                // for which the other items also match
+                if (url!=null) {
+                    boolean matches = url.equals(b.getLocation());
+                    if (urlMandatory) {
+                        if (!matches) continue;
+                        else urlMatched = true;
+                    } else {
+                        if (matches) {
+                            if (!urlMatched) {
+                                result.clear();
+                                urlMatched = true;
+                            }
+                        } else {
+                            if (urlMatched) {
+                                // can't use this bundle as we have previously 
found a preferred bundle, with a matching url
+                                continue;
+                            }
+                        }
+                    }
+                }
+                                
+                result.add(b);
+            }
+            
+            if (symbolicName==null && url!=null && !urlMatched) {
+                // if we only "preferred" the url, and we did not match it, 
and we did not have a symbolic name,
+                // then clear the results list!
+                result.clear();
+            }
+
+            Collections.sort(result, new Comparator<Bundle>() {
+                @Override
+                public int compare(Bundle o1, Bundle o2) {
+                    return o2.getVersion().compareTo(o1.getVersion());
+                }
+            });
+            
+            return result;
+        }
+        
+        public String getConstraintsDescription() {
+            List<String> parts = MutableList.of();
+            if (symbolicName!=null) parts.add("symbolicName="+symbolicName);
+            if (version!=null) parts.add("version="+version);
+            if (url!=null)
+                parts.add("url["+(urlMandatory ? "required" : 
"preferred")+"]="+url);
+            if (!predicates.isEmpty())
+                parts.add("predicates="+predicates);
+            return Joiner.on(";").join(parts);
+        }
+        
+        public String toString() {
+            return 
getClass().getCanonicalName()+"["+getConstraintsDescription()+"]";
+        }
+
+        public BundleFinder version(final Predicate<Version> versionPredicate) 
{
+            return satisfying(new Predicate<Bundle>() {
+                @Override
+                public boolean apply(Bundle input) {
+                    return versionPredicate.apply(input.getVersion());
+                }
+            });
+        }
+        
+        public BundleFinder satisfying(Predicate<? super Bundle> predicate) {
+            predicates.add(predicate);
+            return this;
+        }
+    }
+    
+    public static BundleFinder bundleFinder(Framework framework) {
+        return new BundleFinder(framework);
+    }
+
+    /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ 
@Deprecated
+    public static List<Bundle> getBundlesByName(Framework framework, String 
symbolicName, Predicate<Version> versionMatcher) {
+        return 
bundleFinder(framework).symbolicName(symbolicName).version(versionMatcher).findAll();
+    }
+
+    /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ 
@Deprecated
+    public static List<Bundle> getBundlesByName(Framework framework, String 
symbolicName) {
+        return bundleFinder(framework).symbolicName(symbolicName).findAll();
+    }
+
+    /**
+     * Tries to find a bundle in the given framework with name matching either 
`name' or `name:version'.
+     * @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ 
@Deprecated
+    public static Maybe<Bundle> getBundle(Framework framework, String 
symbolicNameOptionallyWithVersion) {
+        return 
bundleFinder(framework).id(symbolicNameOptionallyWithVersion).find();
+    }
+    
+    /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ 
@Deprecated
+    public static Maybe<Bundle> getBundle(Framework framework, String 
symbolicName, String version) {
+        return 
bundleFinder(framework).symbolicName(symbolicName).version(version).find();
+    }
+
+    /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ 
@Deprecated
+    public static Maybe<Bundle> getBundle(Framework framework, String 
symbolicName, Version version) {
+        return 
bundleFinder(framework).symbolicName(symbolicName).version(Predicates.equalTo(version)).findUnique();
+    }
+
+    // -------- creating
+    
+    /*
+     * loading framework factory and starting framework based on:
+     * 
http://felix.apache.org/documentation/subprojects/apache-felix-framework/apache-felix-framework-launching-and-embedding.html
+     */
+    
+    public static FrameworkFactory newFrameworkFactory() {
+        URL url = Osgis.class.getClassLoader().getResource(
+                
"META-INF/services/org.osgi.framework.launch.FrameworkFactory");
+        if (url != null) {
+            try {
+                BufferedReader br = new BufferedReader(new 
InputStreamReader(url.openStream()));
+                try {
+                    for (String s = br.readLine(); s != null; s = 
br.readLine()) {
+                        s = s.trim();
+                        // load the first non-empty, non-commented line
+                        if ((s.length() > 0) && (s.charAt(0) != '#')) {
+                            return (FrameworkFactory) 
Class.forName(s).newInstance();
+                        }
+                    }
+                } finally {
+                    if (br != null) br.close();
+                }
+            } catch (Exception e) {
+                // class creation exceptions are not interesting to caller...
+                throw Exceptions.propagate(e);
+            }
+        }
+        throw new IllegalStateException("Could not find framework factory.");
+    }
+    
+    public static Framework newFrameworkStarted(String felixCacheDir, boolean 
clean, Map<?,?> extraStartupConfig) {
+        Map<Object,Object> cfg = MutableMap.copyOf(extraStartupConfig);
+        if (clean) cfg.put(Constants.FRAMEWORK_STORAGE_CLEAN, "onFirstInit");
+        if (felixCacheDir!=null) cfg.put(Constants.FRAMEWORK_STORAGE, 
felixCacheDir);
+        cfg.put(Constants.FRAMEWORK_BSNVERSION, 
Constants.FRAMEWORK_BSNVERSION_MULTIPLE);
+        FrameworkFactory factory = newFrameworkFactory();
+
+        Stopwatch timer = Stopwatch.createStarted();
+        Framework framework = factory.newFramework(cfg);
+        try {
+            framework.init();
+            installBootBundles(framework);
+            framework.start();
+        } catch (Exception e) {
+            // framework bundle start exceptions are not interesting to 
caller...
+            throw Exceptions.propagate(e);
+        }
+        LOG.debug("System bundles are: "+SYSTEM_BUNDLES);
+        LOG.debug("OSGi framework started in " + Duration.of(timer));
+        return framework;
+    }
+
+    private static void installBootBundles(Framework framework) {
+        Stopwatch timer = Stopwatch.createStarted();
+        LOG.debug("Installing OSGi boot bundles from 
"+Osgis.class.getClassLoader()+"...");
+        Enumeration<URL> resources;
+        try {
+            resources = 
Osgis.class.getClassLoader().getResources(MANIFEST_PATH);
+        } catch (IOException e) {
+            throw Exceptions.propagate(e);
+        }
+        BundleContext bundleContext = framework.getBundleContext();
+        Map<String, Bundle> installedBundles = 
getInstalledBundlesById(bundleContext);
+        while(resources.hasMoreElements()) {
+            URL url = resources.nextElement();
+            ReferenceWithError<?> installResult = 
installExtensionBundle(bundleContext, url, installedBundles, 
getVersionedId(framework));
+            if (installResult.hasError() && 
!installResult.masksErrorIfPresent()) {
+                // it's reported as a critical error, so warn here
+                LOG.warn("Unable to install manifest from "+url+": 
"+installResult.getError(), installResult.getError());
+            } else {
+                Object result = installResult.getWithoutError();
+                if (result instanceof Bundle) {
+                    String v = getVersionedId( (Bundle)result );
+                    SYSTEM_BUNDLES.add(v);
+                    if (installResult.hasError()) {
+                        
LOG.debug(installResult.getError().getMessage()+(result!=null ? " 
("+result+"/"+v+")" : ""));
+                    } else {
+                        LOG.debug("Installed "+v+" from "+url);
+                    }
+                } else if (installResult.hasError()) {
+                    LOG.debug(installResult.getError().getMessage());
+                }
+            }
+        }
+        LOG.debug("Installed OSGi boot bundles in 
"+Time.makeTimeStringRounded(timer)+": 
"+Arrays.asList(framework.getBundleContext().getBundles()));
+    }
+
+    private static Map<String, Bundle> getInstalledBundlesById(BundleContext 
bundleContext) {
+        Map<String, Bundle> installedBundles = new HashMap<String, Bundle>();
+        Bundle[] bundles = bundleContext.getBundles();
+        for (Bundle b : bundles) {
+            installedBundles.put(getVersionedId(b), b);
+        }
+        return installedBundles;
+    }
+
+    /** Wraps the bundle if successful or already installed, wraps TRUE if 
it's the system entry,
+     * wraps null if the bundle is already installed from somewhere else;
+     * in all these cases <i>masking</i> an explanatory error if already 
installed or it's the system entry.
+     * <p>
+     * Returns an instance wrapping null and <i>throwing</i> an error if the 
bundle could not be installed.
+     */
+    private static ReferenceWithError<?> installExtensionBundle(BundleContext 
bundleContext, URL manifestUrl, Map<String, Bundle> installedBundles, String 
frameworkVersionedId) {
+        //ignore http://felix.extensions:9/ system entry
+        if("felix.extensions".equals(manifestUrl.getHost())) 
+            return ReferenceWithError.newInstanceMaskingError(null, new 
IllegalArgumentException("Skipping install of internal extension bundle from 
"+manifestUrl));
+
+        try {
+            Manifest manifest = readManifest(manifestUrl);
+            if (!isValidBundle(manifest)) 
+                return ReferenceWithError.newInstanceMaskingError(null, new 
IllegalArgumentException("Resource at "+manifestUrl+" is not an OSGi bundle: no 
valid manifest"));
+            
+            String versionedId = getVersionedId(manifest);
+            URL bundleUrl = ResourceUtils.getContainerUrl(manifestUrl, 
MANIFEST_PATH);
+
+            Bundle existingBundle = installedBundles.get(versionedId);
+            if (existingBundle != null) {
+                if (!bundleUrl.equals(existingBundle.getLocation()) &&
+                        //the framework bundle is always pre-installed, don't 
display duplicate info
+                        !versionedId.equals(frameworkVersionedId)) {
+                    return ReferenceWithError.newInstanceMaskingError(null, 
new IllegalArgumentException("Bundle "+versionedId+" (from manifest " + 
manifestUrl + ") is already installed, from " + existingBundle.getLocation()));
+                }
+                return 
ReferenceWithError.newInstanceMaskingError(existingBundle, new 
IllegalArgumentException("Bundle "+versionedId+" from manifest " + manifestUrl 
+ " is already installed"));
+            }
+            
+            byte[] jar = buildExtensionBundle(manifest);
+            LOG.debug("Installing boot bundle " + bundleUrl);
+            //mark the bundle as extension so we can detect it later using the 
"system:" protocol
+            //(since we cannot access BundleImpl.isExtension)
+            Bundle newBundle = bundleContext.installBundle(EXTENSION_PROTOCOL 
+ ":" + bundleUrl.toString(), new ByteArrayInputStream(jar));
+            installedBundles.put(versionedId, newBundle);
+            return ReferenceWithError.newInstanceWithoutError(newBundle);
+        } catch (Exception e) {
+            Exceptions.propagateIfFatal(e);
+            return ReferenceWithError.newInstanceThrowingError(null, 
+                new IllegalStateException("Problem installing extension bundle 
" + manifestUrl + ": "+e, e));
+        }
+    }
+
+    private static Manifest readManifest(URL manifestUrl) throws IOException {
+        Manifest manifest;
+        InputStream in = null;
+        try {
+            in = manifestUrl.openStream();
+            manifest = new Manifest(in);
+        } finally {
+            if (in != null) {
+                try {in.close();} 
+                catch (Exception e) {};
+            }
+        }
+        return manifest;
+    }
+
+    private static byte[] buildExtensionBundle(Manifest manifest) throws 
IOException {
+        Attributes atts = manifest.getMainAttributes();
+
+        //the following properties are invalid in extension bundles
+        atts.remove(new Attributes.Name(Constants.IMPORT_PACKAGE));
+        atts.remove(new Attributes.Name(Constants.REQUIRE_BUNDLE));
+        atts.remove(new Attributes.Name(Constants.BUNDLE_NATIVECODE));
+        atts.remove(new Attributes.Name(Constants.DYNAMICIMPORT_PACKAGE));
+        atts.remove(new Attributes.Name(Constants.BUNDLE_ACTIVATOR));
+        
+        //mark as extension bundle
+        atts.putValue(Constants.FRAGMENT_HOST, "system.bundle; 
extension:=framework");
+
+        //create the jar containing the manifest
+        ByteArrayOutputStream jar = new ByteArrayOutputStream();
+        JarOutputStream out = new JarOutputStream(jar, manifest);
+        out.close();
+        return jar.toByteArray();
+    }
+
+    private static boolean isValidBundle(Manifest manifest) {
+        Attributes atts = manifest.getMainAttributes();
+        return atts.containsKey(new 
Attributes.Name(Constants.BUNDLE_MANIFESTVERSION));
+    }
+
+    private static String getVersionedId(Bundle b) {
+        return b.getSymbolicName() + ":" + b.getVersion();
+    }
+
+    private static String getVersionedId(Manifest manifest) {
+        Attributes atts = manifest.getMainAttributes();
+        return atts.getValue(Constants.BUNDLE_SYMBOLICNAME) + ":" +
+            atts.getValue(Constants.BUNDLE_VERSION);
+    }
+
+    /**
+     * Installs a bundle from the given URL, doing a check if already 
installed, and
+     * using the {@link ResourceUtils} loader for this project (brooklyn core)
+     */
+    public static Bundle install(Framework framework, String url) throws 
BundleException {
+        boolean isLocal = isLocalUrl(url);
+        String localUrl = url;
+        if (!isLocal) {
+            localUrl = cacheFile(url);
+        }
+
+        try {
+            Bundle bundle = getInstalledBundle(framework, localUrl);
+            if (bundle != null) {
+                return bundle;
+            }
+    
+            // use our URL resolution so we get classpath items
+            LOG.debug("Installing bundle into {} from url: {}", framework, 
url);
+            InputStream stream = getUrlStream(localUrl);
+            Bundle installedBundle = 
framework.getBundleContext().installBundle(url, stream);
+            
+            return installedBundle;
+        } finally {
+            if (!isLocal) {
+                try {
+                    new File(new URI(localUrl)).delete();
+                } catch (URISyntaxException e) {
+                    throw Exceptions.propagate(e);
+                }
+            }
+        }
+    }
+
+    private static String cacheFile(String url) {
+        InputStream in = getUrlStream(url);
+        File cache = Os.writeToTempFile(in, "bundle-cache", "jar");
+        return cache.toURI().toString();
+    }
+
+    private static boolean isLocalUrl(String url) {
+        String protocol = Urls.getProtocol(url);
+        return "file".equals(protocol) ||
+                "classpath".equals(protocol) ||
+                "jar".equals(protocol);
+    }
+
+    private static Bundle getInstalledBundle(Framework framework, String url) {
+        Bundle bundle = framework.getBundleContext().getBundle(url);
+        if (bundle != null) {
+            return bundle;
+        }
+
+        // We now support same version installed multiple times (avail since 
OSGi 4.3+).
+        // However we do not support overriding *system* bundles, ie anything 
already on the classpath.
+        // If we wanted to disable multiple versions, see comments below, and 
reference to FRAMEWORK_BSNVERSION_MULTIPLE above.
+        
+        // Felix already assumes the stream is pointing to a JAR
+        JarInputStream stream;
+        try {
+            stream = new JarInputStream(getUrlStream(url));
+        } catch (IOException e) {
+            throw Exceptions.propagate(e);
+        }
+        Manifest manifest = stream.getManifest();
+        Streams.closeQuietly(stream);
+        if (manifest == null) {
+            throw new IllegalStateException("Missing manifest file in bundle 
or not a jar file.");
+        }
+        String versionedId = getVersionedId(manifest);
+        for (Bundle installedBundle : 
framework.getBundleContext().getBundles()) {
+            if (versionedId.equals(getVersionedId(installedBundle))) {
+                if (SYSTEM_BUNDLES.contains(versionedId)) {
+                    LOG.debug("Already have system bundle "+versionedId+" from 
"+installedBundle+"/"+installedBundle.getLocation()+" when requested "+url+"; 
not installing");
+                    // "System bundles" (ie things on the classpath) cannot be 
overridden
+                    return installedBundle;
+                } else {
+                    LOG.debug("Already have bundle "+versionedId+" from 
"+installedBundle+"/"+installedBundle.getLocation()+" when requested "+url+"; 
but it is not a system bundle so proceeding");
+                    // Other bundles can be installed multiple times. To 
ignore multiples and continue to use the old one, 
+                    // just return the installedBundle as done just above for 
system bundles.
+                }
+            }
+        }
+        return null;
+    }
+
+    private static InputStream getUrlStream(String url) {
+        return ResourceUtils.create(Osgis.class).getResourceFromUrl(url);
+    }
+    
+    public static boolean isExtensionBundle(Bundle bundle) {
+        String location = bundle.getLocation();
+        return location != null && 
+                EXTENSION_PROTOCOL.equals(Urls.getProtocol(location));
+    }
+
+    /** Takes a string which might be of the form "symbolic-name" or 
"symbolic-name:version" (or something else entirely)
+     * and returns a VersionedName. The versionedName.getVersion() will be 
null if if there was no version in the input
+     * (or returning {@link Maybe#absent()} if not valid, with a suitable 
error message). */
+    public static Maybe<VersionedName> parseOsgiIdentifier(String 
symbolicNameOptionalWithVersion) {
+        if (Strings.isBlank(symbolicNameOptionalWithVersion))
+            return Maybe.absent("OSGi identifier is blank");
+        
+        String[] parts = symbolicNameOptionalWithVersion.split(":");
+        if (parts.length>2)
+            return Maybe.absent("OSGi identifier has too many parts; max one 
':' symbol");
+        
+        Version v = null;
+        if (parts.length == 2) {
+            try {
+                v = Version.parseVersion(parts[1]);
+            } catch (IllegalArgumentException e) {
+                return Maybe.absent("OSGi identifier has invalid version 
string ("+e.getMessage()+")");
+            }
+        }
+        
+        return Maybe.of(new VersionedName(parts[0], v));
+    }
+
+    /**
+     * The class is not used, staying for future reference.
+     * Remove after OSGi transition is completed.
+     */
+    public static class ManifestHelper {
+        
+        private static ManifestParser parse;
+        private Manifest manifest;
+        private String source;
+
+        private static final String WIRING_PACKAGE = 
PackageNamespace.PACKAGE_NAMESPACE;
+        
+        public static ManifestHelper forManifestContents(String contents) 
throws IOException, BundleException {
+            ManifestHelper result = 
forManifest(Streams.newInputStreamWithContents(contents));
+            result.source = contents;
+            return result;
+        }
+        
+        public static ManifestHelper forManifest(URL url) throws IOException, 
BundleException {
+            InputStream in = null;
+            try {
+                in = url.openStream();
+                return forManifest(in);
+            } finally {
+                if (in != null) in.close();
+            }
+        }
+        
+        public static ManifestHelper forManifest(InputStream in) throws 
IOException, BundleException {
+            return forManifest(new Manifest(in));
+        }
+
+        public static ManifestHelper forManifest(Manifest manifest) throws 
BundleException {
+            ManifestHelper result = new ManifestHelper();
+            result.manifest = manifest;
+            parse = new ManifestParser(null, null, null, new 
StringMap(manifest.getMainAttributes()));
+            return result;
+        }
+        
+        public String getSymbolicName() {
+            return parse.getSymbolicName();
+        }
+
+        public Version getVersion() {
+            return parse.getBundleVersion();
+        }
+
+        public String getSymbolicNameVersion() {
+            return getSymbolicName()+":"+getVersion();
+        }
+
+        public List<String> getExportedPackages() {
+            MutableList<String> result = MutableList.of();
+            for (BundleCapability c: parse.getCapabilities()) {
+                if (WIRING_PACKAGE.equals(c.getNamespace())) {
+                    result.add((String)c.getAttributes().get(WIRING_PACKAGE));
+                }
+            }
+            return result;
+        }
+        
+        @Nullable public String getSource() {
+            return source;
+        }
+        
+        public Manifest getManifest() {
+            return manifest;
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/AbstractExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/util/task/AbstractExecutionContext.java
 
b/core/src/main/java/org/apache/brooklyn/core/util/task/AbstractExecutionContext.java
new file mode 100644
index 0000000..3eea5d9
--- /dev/null
+++ 
b/core/src/main/java/org/apache/brooklyn/core/util/task/AbstractExecutionContext.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.management.ExecutionContext;
+import org.apache.brooklyn.api.management.ExecutionManager;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+
+import com.google.common.collect.Maps;
+
+public abstract class AbstractExecutionContext implements ExecutionContext {
+
+    /**
+     * Submits the given runnable/callable/task for execution (in a separate 
thread);
+     * supported keys in the map include: tags (add'l tags to put on the 
resulting task), 
+     * description (string), and others as described in the reference below
+     *   
+     * @see ExecutionManager#submit(Map, Task) 
+     */
+    @Override
+    public Task<?> submit(Map<?,?> properties, Runnable runnable) { return 
submitInternal(properties, runnable); }
+    
+    /** @see #submit(Map, Runnable) */
+    @Override
+    public Task<?> submit(Runnable runnable) { return 
submitInternal(Maps.newLinkedHashMap(), runnable); }
+ 
+    /** @see #submit(Map, Runnable) */
+    @Override
+    public <T> Task<T> submit(Callable<T> callable) { return 
submitInternal(Maps.newLinkedHashMap(), callable); }
+    
+    /** @see #submit(Map, Runnable) */
+    @Override
+    public <T> Task<T> submit(Map<?,?> properties, Callable<T> callable) { 
return submitInternal(properties, callable); }
+ 
+    /** @see #submit(Map, Runnable) */
+    @Override
+    public <T> Task<T> submit(TaskAdaptable<T> task) { return 
submitInternal(Maps.newLinkedHashMap(), task.asTask()); }
+
+    /** @see #submit(Map, Runnable) */
+    @Override
+    public <T> Task<T> submit(Map<?,?> properties, TaskAdaptable<T> task) { 
return submitInternal(properties, task.asTask()); }
+
+    /**
+     * Provided for compatibility
+     * 
+     * Submit is preferred if a handle on the resulting Task is desired 
(although a task can be passed in so this is not always necessary) 
+     *
+     * @see #submit(Map, Runnable) 
+     */
+    public void execute(Runnable r) { submit(r); }
+
+    /** does the work internally of submitting the task; note that the return 
value may be a wrapper task even if a task is passed in,
+     * if the execution context where the target should run is different (e.g. 
submitting an effector task cross-context) */
+    protected abstract <T> Task<T> submitInternal(Map<?,?> properties, Object 
task);
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionContext.java
 
b/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionContext.java
new file mode 100644
index 0000000..1d28b4e
--- /dev/null
+++ 
b/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionContext.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.management.ExecutionContext;
+import org.apache.brooklyn.api.management.ExecutionManager;
+import org.apache.brooklyn.api.management.HasTaskChildren;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+import org.apache.brooklyn.api.management.entitlement.EntitlementContext;
+import org.apache.brooklyn.core.management.entitlement.Entitlements;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.BrooklynTaskTags.WrappedEntity;
+import brooklyn.entity.basic.EntityInternal;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+/**
+ * A means of executing tasks against an ExecutionManager with a given 
bucket/set of tags pre-defined
+ * (so that it can look like an {@link Executor} and also supply {@link 
ExecutorService#submit(Callable)}
+ */
+public class BasicExecutionContext extends AbstractExecutionContext {
+    
+    private static final Logger log = 
LoggerFactory.getLogger(BasicExecutionContext.class);
+    
+    static final ThreadLocal<BasicExecutionContext> perThreadExecutionContext 
= new ThreadLocal<BasicExecutionContext>();
+    
+    public static BasicExecutionContext getCurrentExecutionContext() { return 
perThreadExecutionContext.get(); }
+
+    final ExecutionManager executionManager;
+    final Set<Object> tags = new LinkedHashSet<Object>();
+
+    public BasicExecutionContext(ExecutionManager executionManager) {
+        this(Collections.emptyMap(), executionManager);
+    }
+    
+    /**
+     * Supported flags are {@code tag} and {@code tags}
+     * 
+     * @see ExecutionManager#submit(Map, Task)
+     */
+    public BasicExecutionContext(Map<?, ?> flags, ExecutionManager 
executionManager) {
+        this.executionManager = executionManager;
+
+        if (flags.get("tag") != null) tags.add(flags.remove("tag"));
+        if (flags.containsKey("tags")) 
tags.addAll((Collection<?>)flags.remove("tags"));
+
+        // FIXME brooklyn-specific check, just for sanity
+        // the context tag should always be a non-proxy entity, because that 
is what is passed to effector tasks
+        // which may require access to internal methods
+        for (Object tag: tags) {
+            if (tag instanceof BrooklynTaskTags.WrappedEntity) {
+                if 
(Proxy.isProxyClass(((WrappedEntity)tag).entity.getClass())) {
+                    log.warn(""+this+" has entity proxy in "+tag);
+                }
+            }
+        }
+    }
+
+    public ExecutionManager getExecutionManager() {
+        return executionManager;
+    }
+    
+    /** returns tasks started by this context (or tasks which have all the 
tags on this object) */
+    public Set<Task<?>> getTasks() { return 
executionManager.getTasksWithAllTags((Set<?>)tags); }
+     
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    protected <T> Task<T> submitInternal(Map<?,?> propertiesQ, final Object 
task) {
+        if (task instanceof TaskAdaptable<?> && !(task instanceof Task<?>)) 
+            return submitInternal(propertiesQ, 
((TaskAdaptable<?>)task).asTask());
+        
+        Map properties = propertiesQ;
+        if (properties.get("tags")==null) properties.put("tags", new 
ArrayList()); 
+        Collection taskTags = (Collection)properties.get("tags");
+        
+        // FIXME some of this is brooklyn-specific logic, should be moved to a 
BrooklynExecContext subclass;
+        // the issue is that we want to ensure that cross-entity calls switch 
execution contexts;
+        // previously it was all very messy how that was handled (and it 
didn't really handle it in many cases)
+        if (task instanceof Task<?>) taskTags.addAll( 
((Task<?>)task).getTags() ); 
+        Entity target = BrooklynTaskTags.getWrappedEntityOfType(taskTags, 
BrooklynTaskTags.TARGET_ENTITY);
+        
+        if (target!=null && 
!tags.contains(BrooklynTaskTags.tagForContextEntity(target))) {
+            // task is switching execution context boundaries
+            /* 
+             * longer notes:
+             * you fall in to this block if the caller requests a target 
entity different to the current context 
+             * (e.g. where entity X is invoking an effector on Y, it will 
start in X's context, 
+             * but the effector should run in Y's context).
+             * 
+             * if X is invoking an effector on himself in his own context, or 
a sensor or other task, it will not come in to this block.
+             */
+            final ExecutionContext tc = 
((EntityInternal)target).getExecutionContext();
+            if (log.isDebugEnabled())
+                log.debug("Switching task context on execution of "+task+": 
from "+this+" to "+target+" (in "+Tasks.current()+")");
+            
+            if (task instanceof Task<?>) {
+                final Task<T> t = (Task<T>)task;
+                if (!Tasks.isQueuedOrSubmitted(t) && (!(Tasks.current() 
instanceof HasTaskChildren) || 
+                        !Iterables.contains( 
((HasTaskChildren)Tasks.current()).getChildren(), t ))) {
+                    // this task is switching execution context boundaries 
_and_ it is not a child and not yet queued,
+                    // so wrap it in a task running in this context to keep a 
reference to the child
+                    // (this matters when we are navigating in the GUI; 
without it we lose the reference to the child 
+                    // when browsing in the context of the parent)
+                    return submit(Tasks.<T>builder().name("Cross-context 
execution: "+t.getDescription()).dynamic(true).body(new Callable<T>() {
+                        public T call() { 
+                            return DynamicTasks.get(t); 
+                        }
+                    }).build());
+                } else {
+                    // if we are already tracked by parent, just submit it 
+                    return tc.submit(t);
+                }
+            } else {
+                // as above, but here we are definitely not a child (what we 
are submitting isn't even a task)
+                // (will only come here if properties defines tags including a 
target entity, which probably never happens) 
+                submit(Tasks.<T>builder().name("Cross-context 
execution").dynamic(true).body(new Callable<T>() {
+                    public T call() {
+                        if (task instanceof Callable) {
+                            return DynamicTasks.queue( 
Tasks.<T>builder().dynamic(false).body((Callable<T>)task).build() 
).getUnchecked();
+                        } else if (task instanceof Runnable) {
+                            return DynamicTasks.queue( 
Tasks.<T>builder().dynamic(false).body((Runnable)task).build() ).getUnchecked();
+                        } else {
+                            throw new IllegalArgumentException("Unhandled task 
type: "+task+"; type="+(task!=null ? task.getClass() : "null"));
+                        }
+                    }
+                }).build());
+            }
+        }
+        
+        EntitlementContext entitlementContext = 
BrooklynTaskTags.getEntitlement(taskTags);
+        if (entitlementContext==null)
+        entitlementContext = Entitlements.getEntitlementContext();
+        if (entitlementContext!=null) {
+            
taskTags.add(BrooklynTaskTags.tagForEntitlement(entitlementContext));
+        }
+
+        taskTags.addAll(tags);
+        
+        if (Tasks.current()!=null && 
BrooklynTaskTags.isTransient(Tasks.current()) 
+                && !taskTags.contains(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) 
&& !taskTags.contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) {
+            // tag as transient if submitter is transient, unless explicitly 
tagged as non-transient
+            taskTags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);
+        }
+        
+        final Object startCallback = properties.get("newTaskStartCallback");
+        properties.put("newTaskStartCallback", new Function<Object,Void>() {
+            public Void apply(Object it) {
+                registerPerThreadExecutionContext();
+                if (startCallback!=null) ExecutionUtils.invoke(startCallback, 
it);
+                return null;
+            }});
+        
+        final Object endCallback = properties.get("newTaskEndCallback");
+        properties.put("newTaskEndCallback", new Function<Object,Void>() {
+            public Void apply(Object it) {
+                try {
+                    if (endCallback!=null) ExecutionUtils.invoke(endCallback, 
it);
+                } finally {
+                    clearPerThreadExecutionContext();
+                }
+                return null;
+            }});
+        
+        if (task instanceof Task) {
+            return executionManager.submit(properties, (Task)task);
+        } else if (task instanceof Callable) {
+            return executionManager.submit(properties, (Callable)task);
+        } else if (task instanceof Runnable) {
+            return (Task<T>) executionManager.submit(properties, 
(Runnable)task);
+        } else {
+            throw new IllegalArgumentException("Unhandled task type: 
task="+task+"; type="+(task!=null ? task.getClass() : "null"));
+        }
+    }
+    
+    private void registerPerThreadExecutionContext() { 
perThreadExecutionContext.set(this); }
+
+    private void clearPerThreadExecutionContext() { 
perThreadExecutionContext.remove(); }
+
+    @Override
+    public boolean isShutdown() {
+        return getExecutionManager().isShutdown();
+    }
+
+    @Override
+    public String toString() {
+        return super.toString()+"("+tags+")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionManager.java
 
b/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionManager.java
new file mode 100644
index 0000000..f93abe1
--- /dev/null
+++ 
b/core/src/main/java/org/apache/brooklyn/core/util/task/BasicExecutionManager.java
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.brooklyn.api.management.ExecutionManager;
+import org.apache.brooklyn.api.management.HasTaskChildren;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+import org.apache.brooklyn.core.internal.BrooklynFeatureEnablement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.text.Identifiers;
+
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.CaseFormat;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Manages the execution of atomic tasks and scheduled (recurring) tasks,
+ * including setting tags and invoking callbacks.
+ */
+public class BasicExecutionManager implements ExecutionManager {
+    private static final Logger log = 
LoggerFactory.getLogger(BasicExecutionManager.class);
+
+    private static final boolean RENAME_THREADS = 
BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_RENAME_THREADS);
+    
+    private static class PerThreadCurrentTaskHolder {
+        public static final ThreadLocal<Task<?>> perThreadCurrentTask = new 
ThreadLocal<Task<?>>();
+    }
+
+    public static ThreadLocal<Task<?>> getPerThreadCurrentTask() {
+        return PerThreadCurrentTaskHolder.perThreadCurrentTask;
+    }
+
+    private final ThreadFactory threadFactory;
+    
+    private final ThreadFactory daemonThreadFactory;
+    
+    private final ExecutorService runner;
+        
+    private final ScheduledExecutorService delayedRunner;
+    
+    // TODO Could have a set of all knownTasks; but instead we're having a 
separate set per tag,
+    // so the same task could be listed multiple times if it has multiple 
tags...
+
+    //access to this field AND to members in this field is synchronized, 
+    //to allow us to preserve order while guaranteeing thread-safe
+    //(but more testing is needed before we are completely sure it is 
thread-safe!)
+    //synch blocks are as finely grained as possible for efficiency;
+    //NB CopyOnWriteArraySet is a perf bottleneck, and the simple map makes it 
easier to remove when a tag is empty
+    private Map<Object,Set<Task<?>>> tasksByTag = new 
HashMap<Object,Set<Task<?>>>();
+    
+    private ConcurrentMap<String,Task<?>> tasksById = new 
ConcurrentHashMap<String,Task<?>>();
+
+    private ConcurrentMap<Object, TaskScheduler> schedulerByTag = new 
ConcurrentHashMap<Object, TaskScheduler>();
+
+    /** count of all tasks submitted, including finished */
+    private final AtomicLong totalTaskCount = new AtomicLong();
+    
+    /** tasks submitted but not yet done (or in cases of 
interruption/cancelled not yet GC'd) */
+    private Map<String,String> incompleteTaskIds = new 
ConcurrentHashMap<String,String>();
+    
+    /** tasks started but not yet finished */
+    private final AtomicInteger activeTaskCount = new AtomicInteger();
+    
+    private final List<ExecutionListener> listeners = new 
CopyOnWriteArrayList<ExecutionListener>();
+    
+    private final static ThreadLocal<String> threadOriginalName = new 
ThreadLocal<String>() {
+        protected String initialValue() {
+            // should not happen, as only access is in _afterEnd with a check 
that _beforeStart was invoked 
+            log.warn("No original name recorded for thread 
"+Thread.currentThread().getName()+"; task "+Tasks.current());
+            return "brooklyn-thread-pool-"+Identifiers.makeRandomId(8);
+        }
+    };
+    
+    public BasicExecutionManager(String contextid) {
+        threadFactory = newThreadFactory(contextid);
+        daemonThreadFactory = new ThreadFactoryBuilder()
+                .setThreadFactory(threadFactory)
+                .setDaemon(true)
+                .build();
+                
+        // use Executors.newCachedThreadPool(daemonThreadFactory), but timeout 
of 1s rather than 60s for better shutdown!
+        runner = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10L, 
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 
+                daemonThreadFactory);
+            
+        delayedRunner = new ScheduledThreadPoolExecutor(1, 
daemonThreadFactory);
+    }
+    
+    private final static class UncaughtExceptionHandlerImplementation 
implements Thread.UncaughtExceptionHandler {
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            log.error("Uncaught exception in thread "+t.getName(), e);
+        }
+    }
+    
+    /** 
+     * For use by overriders to use custom thread factory.
+     * But be extremely careful: called by constructor, so before sub-class' 
constructor will
+     * have been invoked!
+     */
+    protected ThreadFactory newThreadFactory(String contextid) {
+        return new ThreadFactoryBuilder()
+                .setNameFormat("brooklyn-execmanager-"+contextid+"-%d")
+                .setUncaughtExceptionHandler(new 
UncaughtExceptionHandlerImplementation())
+                .build();
+    }
+    
+    public void shutdownNow() {
+        runner.shutdownNow();
+        delayedRunner.shutdownNow();
+    }
+    
+    public void addListener(ExecutionListener listener) {
+        listeners.add(listener);
+    }
+    
+    public void removeListener(ExecutionListener listener) {
+        listeners.remove(listener);
+    }
+    
+    /**
+     * Deletes the given tag, including all tasks using this tag.
+     * 
+     * Useful, for example, if an entity is being expunged so that we don't 
keep holding
+     * a reference to it as a tag.
+     */
+    public void deleteTag(Object tag) {
+        Set<Task<?>> tasks;
+        synchronized (tasksByTag) {
+            tasks = tasksByTag.remove(tag);
+        }
+        if (tasks != null) {
+            for (Task<?> task : tasks) {
+                deleteTask(task);
+            }
+        }
+    }
+
+    public void deleteTask(Task<?> task) {
+        boolean removed = deleteTaskNonRecursive(task);
+        if (!removed) return;
+        
+        if (task instanceof HasTaskChildren) {
+            List<Task<?>> children = 
ImmutableList.copyOf(((HasTaskChildren)task).getChildren());
+            for (Task<?> child : children) {
+                deleteTask(child);
+            }
+        }
+    }
+
+    protected boolean deleteTaskNonRecursive(Task<?> task) {
+        Set<?> tags = checkNotNull(task, "task").getTags();
+        for (Object tag : tags) {
+            synchronized (tasksByTag) {
+                Set<Task<?>> tasks = tasksWithTagLiveOrNull(tag);
+                if (tasks != null) {
+                    tasks.remove(task);
+                    if (tasks.isEmpty()) {
+                        tasksByTag.remove(tag);
+                    }
+                }
+            }
+        }
+        Task<?> removed = tasksById.remove(task.getId());
+        incompleteTaskIds.remove(task.getId());
+        if (removed!=null && removed.isSubmitted() && !removed.isDone()) {
+            log.warn("Deleting submitted task before completion: "+removed+"; 
this task will continue to run in the background outwith "+this+", but perhaps 
it should have been cancelled?");
+        }
+        return removed != null;
+    }
+
+    public boolean isShutdown() {
+        return runner.isShutdown();
+    }
+    
+    /** count of all tasks submitted */
+    public long getTotalTasksSubmitted() {
+        return totalTaskCount.get();
+    }
+    
+    /** count of tasks submitted but not ended */
+    public long getNumIncompleteTasks() {
+        return incompleteTaskIds.size();
+    }
+    
+    /** count of tasks started but not ended */
+    public long getNumActiveTasks() {
+        return activeTaskCount.get();
+    }
+
+    /** count of tasks kept in memory, often including ended tasks */
+    public long getNumInMemoryTasks() {
+        return tasksById.size();
+    }
+
+    private Set<Task<?>> tasksWithTagCreating(Object tag) {
+        Preconditions.checkNotNull(tag);
+        synchronized (tasksByTag) {
+            Set<Task<?>> result = tasksWithTagLiveOrNull(tag);
+            if (result==null) {
+                result = Collections.synchronizedSet(new 
LinkedHashSet<Task<?>>());
+                tasksByTag.put(tag, result);
+            }
+            return result;
+        }
+    }
+
+    /** exposes live view, for internal use only */
+    @Beta
+    public Set<Task<?>> tasksWithTagLiveOrNull(Object tag) {
+        synchronized (tasksByTag) {
+            return tasksByTag.get(tag);
+        }
+    }
+
+    @Override
+    public Task<?> getTask(String id) {
+        return tasksById.get(id);
+    }
+    
+    /** not on interface because potentially expensive */
+    public List<Task<?>> getAllTasks() {
+        // not sure if synching makes any difference; have not observed CME's 
yet
+        // (and so far this is only called when a CME was caught on a previous 
operation)
+        synchronized (tasksById) {
+            return MutableList.copyOf(tasksById.values());
+        }
+    }
+    
+    @Override
+    public Set<Task<?>> getTasksWithTag(Object tag) {
+        Set<Task<?>> result = tasksWithTagLiveOrNull(tag);
+        if (result==null) return Collections.emptySet();
+        synchronized (result) {
+            return (Set<Task<?>>)Collections.unmodifiableSet(new 
LinkedHashSet<Task<?>>(result));
+        }
+    }
+    
+    @Override
+    public Set<Task<?>> getTasksWithAnyTag(Iterable<?> tags) {
+        Set<Task<?>> result = new LinkedHashSet<Task<?>>();
+        Iterator<?> ti = tags.iterator();
+        while (ti.hasNext()) {
+            Set<Task<?>> tasksForTag = tasksWithTagLiveOrNull(ti.next());
+            if (tasksForTag!=null) {
+                synchronized (tasksForTag) {
+                    result.addAll(tasksForTag);
+                }
+            }
+        }
+        return Collections.unmodifiableSet(result);
+    }
+
+    /** only works with at least one tag; returns empty if no tags */
+    @Override
+    public Set<Task<?>> getTasksWithAllTags(Iterable<?> tags) {
+        //NB: for this method retrieval for multiple tags could be made (much) 
more efficient (if/when it is used with multiple tags!)
+        //by first looking for the least-used tag, getting those tasks, and 
then for each of those tasks
+        //checking whether it contains the other tags (looking for 
second-least used, then third-least used, etc)
+        Set<Task<?>> result = new LinkedHashSet<Task<?>>();
+        boolean first = true;
+        Iterator<?> ti = tags.iterator();
+        while (ti.hasNext()) {
+            Object tag = ti.next();
+            if (first) { 
+                first = false;
+                result.addAll(getTasksWithTag(tag));
+            } else {
+                result.retainAll(getTasksWithTag(tag));
+            }
+        }
+        return Collections.unmodifiableSet(result);
+    }
+
+    /** live view of all tasks, for internal use only */
+    @Beta
+    public Collection<Task<?>> allTasksLive() { return tasksById.values(); }
+    
+    public Set<Object> getTaskTags() { 
+        synchronized (tasksByTag) {
+            return 
Collections.unmodifiableSet(Sets.newLinkedHashSet(tasksByTag.keySet())); 
+        }
+    }
+
+    public Task<?> submit(Runnable r) { return submit(new 
LinkedHashMap<Object,Object>(1), r); }
+    public Task<?> submit(Map<?,?> flags, Runnable r) { return submit(flags, 
new BasicTask<Void>(flags, r)); }
+
+    public <T> Task<T> submit(Callable<T> c) { return submit(new 
LinkedHashMap<Object,Object>(1), c); }
+    public <T> Task<T> submit(Map<?,?> flags, Callable<T> c) { return 
submit(flags, new BasicTask<T>(flags, c)); }
+
+    public <T> Task<T> submit(TaskAdaptable<T> t) { return submit(new 
LinkedHashMap<Object,Object>(1), t); }
+    public <T> Task<T> submit(Map<?,?> flags, TaskAdaptable<T> task) {
+        if (!(task instanceof Task))
+            task = task.asTask();
+        synchronized (task) {
+            if (((TaskInternal<?>)task).getInternalFuture()!=null) return 
(Task<T>)task;
+            return submitNewTask(flags, (Task<T>) task);
+        }
+    }
+
+    public <T> Task<T> scheduleWith(Task<T> task) { return 
scheduleWith(Collections.emptyMap(), task); }
+    public <T> Task<T> scheduleWith(Map<?,?> flags, Task<T> task) {
+        synchronized (task) {
+            if (((TaskInternal<?>)task).getInternalFuture()!=null) return task;
+            return submitNewTask(flags, task);
+        }
+    }
+
+    protected Task<?> submitNewScheduledTask(final Map<?,?> flags, final 
ScheduledTask task) {
+        tasksById.put(task.getId(), task);
+        totalTaskCount.incrementAndGet();
+        
+        beforeSubmitScheduledTaskAllIterations(flags, task);
+        
+        return submitSubsequentScheduledTask(flags, task);
+    }
+    
+    @SuppressWarnings("unchecked")
+    protected Task<?> submitSubsequentScheduledTask(final Map<?,?> flags, 
final ScheduledTask task) {
+        if (!task.isDone()) {
+            task.internalFuture = delayedRunner.schedule(new 
ScheduledTaskCallable(task, flags),
+                task.delay.toNanoseconds(), TimeUnit.NANOSECONDS);
+        } else {
+            afterEndScheduledTaskAllIterations(flags, task);
+        }
+        return task;
+    }
+
+    protected class ScheduledTaskCallable implements Callable<Object> {
+        public ScheduledTask task;
+        public Map<?,?> flags;
+
+        public ScheduledTaskCallable(ScheduledTask task, Map<?, ?> flags) {
+            this.task = task;
+            this.flags = flags;
+        }
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        public Object call() {
+            if (task.startTimeUtc==-1) task.startTimeUtc = 
System.currentTimeMillis();
+            TaskInternal<?> taskScheduled = null;
+            try {
+                beforeStartScheduledTaskSubmissionIteration(flags, task);
+                taskScheduled = (TaskInternal<?>) task.newTask();
+                taskScheduled.setSubmittedByTask(task);
+                final Callable<?> oldJob = taskScheduled.getJob();
+                final TaskInternal<?> taskScheduledF = taskScheduled;
+                taskScheduled.setJob(new Callable() { public Object call() {
+                    boolean resubmitted = false;
+                    task.recentRun = taskScheduledF;
+                    try {
+                        synchronized (task) {
+                            task.notifyAll();
+                        }
+                        Object result;
+                        try {
+                            result = oldJob.call();
+                        } catch (Exception e) {
+                            if (!Tasks.isInterrupted()) {
+                                log.warn("Error executing "+oldJob+" 
(scheduled job of "+task+" - "+task.getDescription()+"); cancelling scheduled 
execution", e);
+                            } else {
+                                log.debug("Interrupted executing "+oldJob+" 
(scheduled job of "+task+" - "+task.getDescription()+"); cancelling scheduled 
execution: "+e);
+                            }
+                            throw Exceptions.propagate(e);
+                        }
+                        task.runCount++;
+                        if (task.period!=null && !task.isCancelled()) {
+                            task.delay = task.period;
+                            submitSubsequentScheduledTask(flags, task);
+                            resubmitted = true;
+                        }
+                        return result;
+                    } finally {
+                        // do in finally block in case we were interrupted
+                        if (!resubmitted)
+                            afterEndScheduledTaskAllIterations(flags, task);
+                    }
+                }});
+                task.nextRun = taskScheduled;
+                BasicExecutionContext ec = 
BasicExecutionContext.getCurrentExecutionContext();
+                if (ec!=null) return ec.submit(taskScheduled);
+                else return submit(taskScheduled);
+            } finally {
+                afterEndScheduledTaskSubmissionIteration(flags, task, 
taskScheduled);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "ScheduledTaskCallable["+task+","+flags+"]";
+        }
+    }
+
+    private final class SubmissionCallable<T> implements Callable<T> {
+        private final Map<?, ?> flags;
+        private final Task<T> task;
+
+        private SubmissionCallable(Map<?, ?> flags, Task<T> task) {
+            this.flags = flags;
+            this.task = task;
+        }
+
+        public T call() {
+            try {
+                T result = null;
+                Throwable error = null;
+                String oldThreadName = Thread.currentThread().getName();
+                try {
+                    if (RENAME_THREADS) {
+                        String newThreadName = 
oldThreadName+"-"+task.getDisplayName()+
+                            "["+task.getId().substring(0, 8)+"]";
+                        Thread.currentThread().setName(newThreadName);
+                    }
+                    beforeStartAtomicTask(flags, task);
+                    if (!task.isCancelled()) {
+                        result = ((TaskInternal<T>)task).getJob().call();
+                    } else throw new CancellationException();
+                } catch(Throwable e) {
+                    error = e;
+                } finally {
+                    if (RENAME_THREADS) {
+                        Thread.currentThread().setName(oldThreadName);
+                    }
+                    afterEndAtomicTask(flags, task);
+                }
+                if (error!=null) {
+                    /* we throw, after logging debug.
+                     * the throw means the error is available for task 
submitters to monitor.
+                     * however it is possible no one is monitoring it, in 
which case we will have debug logging only for errors.
+                     * (the alternative, of warn-level logging in lots of 
places where we don't want it, seems worse!) 
+                     */
+                    if (log.isDebugEnabled()) {
+                        // debug only here, because most submitters will 
handle failures
+                        log.debug("Exception running task "+task+" 
(rethrowing): "+error.getMessage(), error);
+                        if (log.isTraceEnabled())
+                            log.trace("Trace for exception running task 
"+task+" (rethrowing): "+error.getMessage(), error);
+                    }
+                    throw Exceptions.propagate(error);
+                }
+                return result;
+            } finally {
+                ((TaskInternal<?>)task).runListeners();
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "BEM.call("+task+","+flags+")";
+        }
+    }
+
+    private final static class ListenableForwardingFutureForTask<T> extends 
ListenableForwardingFuture<T> {
+        private final Task<T> task;
+
+        private ListenableForwardingFutureForTask(Future<T> delegate, 
ExecutionList list, Task<T> task) {
+            super(delegate, list);
+            this.task = task;
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            boolean result = false;
+            if (!task.isCancelled()) result |= 
task.cancel(mayInterruptIfRunning);
+            result |= super.cancel(mayInterruptIfRunning);
+            ((TaskInternal<?>)task).runListeners();
+            return result;
+        }
+    }
+
+    private final class SubmissionListenerToCallOtherListeners<T> implements 
Runnable {
+        private final Task<T> task;
+
+        private SubmissionListenerToCallOtherListeners(Task<T> task) {
+            this.task = task;
+        }
+
+        @Override
+        public void run() {
+            try {
+                ((TaskInternal<?>)task).runListeners();
+            } catch (Exception e) {
+                log.warn("Error running task listeners for task "+task+" 
done", e);
+            }
+            
+            for (ExecutionListener listener : listeners) {
+                try {
+                    listener.onTaskDone(task);
+                } catch (Exception e) {
+                    log.warn("Error running execution listener "+listener+" of 
task "+task+" done", e);
+                }
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> 
task) {
+        if (task instanceof ScheduledTask)
+            return (Task<T>) submitNewScheduledTask(flags, 
(ScheduledTask)task);
+        
+        tasksById.put(task.getId(), task);
+        totalTaskCount.incrementAndGet();
+        
+        beforeSubmitAtomicTask(flags, task);
+        
+        if (((TaskInternal<T>)task).getJob() == null) 
+            throw new NullPointerException("Task "+task+" submitted with with 
null job: job must be supplied.");
+        
+        Callable<T> job = new SubmissionCallable<T>(flags, task);
+        
+        // If there's a scheduler then use that; otherwise execute it directly
+        Set<TaskScheduler> schedulers = null;
+        for (Object tago: task.getTags()) {
+            TaskScheduler scheduler = getTaskSchedulerForTag(tago);
+            if (scheduler!=null) {
+                if (schedulers==null) schedulers = new 
LinkedHashSet<TaskScheduler>(2);
+                schedulers.add(scheduler);
+            }
+        }
+        Future<T> future;
+        if (schedulers!=null && !schedulers.isEmpty()) {
+            if (schedulers.size()>1) log.warn("multiple schedulers detected, 
using only the first, for "+task+": "+schedulers);
+            future = schedulers.iterator().next().submit(job);
+        } else {
+            future = runner.submit(job);
+        }
+        // on completion, listeners get triggered above; here, below we ensure 
they get triggered on cancel
+        // (and we make sure the same ExecutionList is used in the future as 
in the task)
+        ListenableFuture<T> listenableFuture = new 
ListenableForwardingFutureForTask<T>(future, 
((TaskInternal<T>)task).getListeners(), task);
+        // doesn't matter whether the listener is added to the 
listenableFuture or the task,
+        // except that for the task we can more easily wrap it so that it only 
logs debug if the executor is shutdown
+        // (avoid a bunch of ugly warnings in tests which start and stop 
things a lot!)
+        // [probably even nicer to run this in the same thread, it doesn't do 
much; but that is messier to implement]
+        ((TaskInternal<T>)task).addListener(new 
SubmissionListenerToCallOtherListeners<T>(task), runner);
+        
+        ((TaskInternal<T>)task).initInternalFuture(listenableFuture);
+        
+        return task;
+    }
+    
+    protected void beforeSubmitScheduledTaskAllIterations(Map<?,?> flags, 
Task<?> task) {
+        internalBeforeSubmit(flags, task);
+    }
+    protected void beforeSubmitAtomicTask(Map<?,?> flags, Task<?> task) {
+        internalBeforeSubmit(flags, task);
+    }
+    /** invoked when a task is submitted */
+    protected void internalBeforeSubmit(Map<?,?> flags, Task<?> task) {
+        incompleteTaskIds.put(task.getId(), task.getId());
+        
+        Task<?> currentTask = Tasks.current();
+        if (currentTask!=null) 
((TaskInternal<?>)task).setSubmittedByTask(currentTask);
+        ((TaskInternal<?>)task).setSubmitTimeUtc(System.currentTimeMillis());
+        
+        if (flags.get("tag")!=null) 
((TaskInternal<?>)task).getMutableTags().add(flags.remove("tag"));
+        if (flags.get("tags")!=null) 
((TaskInternal<?>)task).getMutableTags().addAll((Collection<?>)flags.remove("tags"));
+
+        for (Object tag: ((TaskInternal<?>)task).getTags()) {
+            tasksWithTagCreating(tag).add(task);
+        }
+    }
+
+    protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, 
Task<?> task) {
+        internalBeforeStart(flags, task);
+    }
+    protected void beforeStartAtomicTask(Map<?,?> flags, Task<?> task) {
+        internalBeforeStart(flags, task);
+    }
+    
+    /** invoked in a task's thread when a task is starting to run (may be some 
time after submitted), 
+     * but before doing any of the task's work, so that we can update 
bookkeeping and notify callbacks */
+    protected void internalBeforeStart(Map<?,?> flags, Task<?> task) {
+        activeTaskCount.incrementAndGet();
+        
+        //set thread _before_ start time, so we won't get a null thread when 
there is a start-time
+        if (log.isTraceEnabled()) log.trace(""+this+" beforeStart, task: 
"+task);
+        if (!task.isCancelled()) {
+            Thread thread = Thread.currentThread();
+            ((TaskInternal<?>)task).setThread(thread);
+            if (RENAME_THREADS) {
+                threadOriginalName.set(thread.getName());
+                String newThreadName = "brooklyn-" + 
CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, 
task.getDisplayName().replace(" ", "")) + "-" + task.getId().substring(0, 8);
+                thread.setName(newThreadName);
+            }
+            PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
+            
((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis());
+        }
+        ExecutionUtils.invoke(flags.get("newTaskStartCallback"), task);
+    }
+
+    /** normally (if not interrupted) called once for each call to {@link 
#beforeSubmitScheduledTaskAllIterations(Map, Task)} */
+    protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> 
task) {
+        internalAfterEnd(flags, task, false, true);
+    }
+    /** called once for each call to {@link 
#beforeStartScheduledTaskSubmissionIteration(Map, Task)},
+     * with a per-iteration task generated by the surrounding scheduled task */
+    protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, 
Task<?> scheduledTask, Task<?> taskIteration) {
+        internalAfterEnd(flags, scheduledTask, true, false);
+    }
+    /** called once for each task on which {@link #beforeStartAtomicTask(Map, 
Task)} is invoked,
+     * and normally (if not interrupted prior to start) 
+     * called once for each task on which {@link #beforeSubmitAtomicTask(Map, 
Task)} */
+    protected void afterEndAtomicTask(Map<?,?> flags, Task<?> task) {
+        internalAfterEnd(flags, task, true, true);
+    }
+    /** normally (if not interrupted) called once for each call to {@link 
#internalBeforeSubmit(Map, Task)},
+     * and, for atomic tasks and scheduled-task submission iterations where 
+     * always called once if {@link #internalBeforeStart(Map, Task)} is 
invoked and in the same thread as that method */
+    protected void internalAfterEnd(Map<?,?> flags, Task<?> task, boolean 
startedInThisThread, boolean isEndingAllIterations) {
+        if (log.isTraceEnabled()) log.trace(this+" afterEnd, task: "+task);
+        if (startedInThisThread) {
+            activeTaskCount.decrementAndGet();
+        }
+        if (isEndingAllIterations) {
+            incompleteTaskIds.remove(task.getId());
+            ExecutionUtils.invoke(flags.get("newTaskEndCallback"), task);
+            ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis());
+        }
+
+        if (startedInThisThread) {
+            PerThreadCurrentTaskHolder.perThreadCurrentTask.remove();
+            //clear thread _after_ endTime set, so we won't get a null thread 
when there is no end-time
+            if (RENAME_THREADS && startedInThisThread) {
+                Thread thread = task.getThread();
+                if (thread==null) {
+                    log.warn("BasicTask.afterEnd invoked without corresponding 
beforeStart");
+                } else {
+                    thread.setName(threadOriginalName.get());
+                    threadOriginalName.remove();
+                }
+            }
+            ((TaskInternal<?>)task).setThread(null);
+        }
+        synchronized (task) { task.notifyAll(); }
+    }
+
+    public TaskScheduler getTaskSchedulerForTag(Object tag) {
+        return schedulerByTag.get(tag);
+    }
+    
+    public void setTaskSchedulerForTag(Object tag, Class<? extends 
TaskScheduler> scheduler) {
+        synchronized (schedulerByTag) {
+            TaskScheduler old = getTaskSchedulerForTag(tag);
+            if (old!=null) {
+                if (scheduler.isAssignableFrom(old.getClass())) {
+                    /* already have such an instance */
+                    return;
+                }
+                //might support multiple in future...
+                throw new IllegalStateException("Not allowed to set multiple 
TaskSchedulers on ExecutionManager tag (tag "+tag+", has "+old+", setting new 
"+scheduler+")");
+            }
+            try {
+                TaskScheduler schedulerI = scheduler.newInstance();
+                // allow scheduler to have a nice name, for logging etc
+                if (schedulerI instanceof CanSetName) 
((CanSetName)schedulerI).setName(""+tag);
+                setTaskSchedulerForTag(tag, schedulerI);
+            } catch (InstantiationException e) {
+                throw Exceptions.propagate(e);
+            } catch (IllegalAccessException e) {
+                throw Exceptions.propagate(e);
+            }
+        }
+    }
+    
+    /**
+     * Defines a {@link TaskScheduler} to run on all subsequently submitted 
jobs with the given tag.
+     *
+     * Maximum of one allowed currently. Resubmissions of the same scheduler 
(or scheduler class)
+     * allowed. If changing, you must call {@link 
#clearTaskSchedulerForTag(Object)} between the two.
+     *
+     * @see #setTaskSchedulerForTag(Object, Class)
+     */
+    public void setTaskSchedulerForTag(Object tag, TaskScheduler scheduler) {
+        synchronized (schedulerByTag) {
+            scheduler.injectExecutor(runner);
+
+            Object old = schedulerByTag.put(tag, scheduler);
+            if (old!=null && old!=scheduler) {
+                //might support multiple in future...
+                throw new IllegalStateException("Not allowed to set multiple 
TaskSchedulers on ExecutionManager tag (tag "+tag+")");
+            }
+        }
+    }
+
+    /**
+     * Forgets that any scheduler was associated with a tag.
+     *
+     * @see #setTaskSchedulerForTag(Object, TaskScheduler)
+     * @see #setTaskSchedulerForTag(Object, Class)
+     */
+    public boolean clearTaskSchedulerForTag(Object tag) {
+        synchronized (schedulerByTag) {
+            Object old = schedulerByTag.remove(tag);
+            return (old!=null);
+        }
+    }
+    
+    @VisibleForTesting
+    public ConcurrentMap<Object, TaskScheduler> getSchedulerByTag() {
+        return schedulerByTag;
+    }
+
+}

Reply via email to