http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/ha/OsgiManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/ha/OsgiManager.java b/core/src/main/java/brooklyn/management/ha/OsgiManager.java deleted file mode 100644 index fd98ed2..0000000 --- a/core/src/main/java/brooklyn/management/ha/OsgiManager.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
package brooklyn.management.ha;

import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

import org.osgi.framework.Bundle;
import org.osgi.framework.BundleException;
import org.osgi.framework.launch.Framework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.BrooklynVersion;

import org.apache.brooklyn.api.catalog.CatalogItem.CatalogBundle;
import org.apache.brooklyn.api.management.ManagementContext;

import brooklyn.config.BrooklynServerConfig;
import brooklyn.config.BrooklynServerPaths;
import brooklyn.config.ConfigKey;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.collections.MutableSet;
import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.guava.Maybe;
import brooklyn.util.os.Os;
import brooklyn.util.os.Os.DeletionResult;
import brooklyn.util.osgi.Osgis;
import brooklyn.util.osgi.Osgis.BundleFinder;
import brooklyn.util.repeat.Repeater;
import brooklyn.util.text.Strings;
import brooklyn.util.time.Duration;

import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

/**
 * Owns the OSGi {@link Framework} used by a {@link ManagementContext}: starts and stops it,
 * installs catalog bundles into it, and resolves classes and resources from those bundles.
 * <p>
 * {@link #start()} must be called before any bundle can be registered or looked up, since the
 * other methods hand {@link #framework} straight to {@link Osgis}.
 */
public class OsgiManager {

    private static final Logger log = LoggerFactory.getLogger(OsgiManager.class);

    public static final ConfigKey<Boolean> USE_OSGI = BrooklynServerConfig.USE_OSGI;

    /* see Osgis for info on starting framework etc */

    protected ManagementContext mgmt;
    protected Framework framework;
    protected File osgiCacheDir;

    /**
     * @param mgmt the management context this manager belongs to; it is used to locate the
     *             OSGi cache directory
     */
    public OsgiManager(ManagementContext mgmt) {
        this.mgmt = mgmt;
    }

    /**
     * Obtains the OSGi cache directory for the management context and starts a new framework
     * rooted in it. Any failure is rethrown through {@link Exceptions#propagate(Throwable)}.
     */
    public void start() {
        try {
            osgiCacheDir = BrooklynServerPaths.getOsgiCacheDirCleanedIfNeeded(mgmt);

            // any extra OSGi startup args could go here
            framework = Osgis.newFrameworkStarted(osgiCacheDir.getAbsolutePath(), false, MutableMap.of());

        } catch (Exception e) {
            throw Exceptions.propagate(e);
        }
    }

    /**
     * Stops the framework (if one was started), waits for it to finish stopping, then makes a
     * best-effort attempt to delete the cache directory when
     * {@link BrooklynServerPaths#isOsgiCacheForCleaning} says it should be cleaned.
     * Finally clears {@link #framework} and {@link #osgiCacheDir}.
     */
    public void stop() {
        try {
            if (framework != null) {
                framework.stop();
                framework.waitForStop(0); // 0 means indefinite
            }
        } catch (BundleException e) {
            throw Exceptions.propagate(e);
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }
        if (BrooklynServerPaths.isOsgiCacheForCleaning(mgmt, osgiCacheDir)) {
            // See exception reported in https://issues.apache.org/jira/browse/BROOKLYN-72
            // We almost always fail to delete the OSGi temp directory due to a concurrent modification.
            // Therefore keep trying.
            final AtomicReference<DeletionResult> deletionResult = new AtomicReference<DeletionResult>();
            Repeater.create("Delete OSGi cache dir")
                    .until(new Callable<Boolean>() {
                        @Override
                        public Boolean call() {
                            deletionResult.set(Os.deleteRecursively(osgiCacheDir));
                            return deletionResult.get().wasSuccessful();
                        }})
                    .limitTimeTo(Duration.ONE_SECOND)
                    .backoffTo(Duration.millis(50))
                    .run();
            // Guard against the repeater never having invoked the callable.
            DeletionResult lastAttempt = deletionResult.get();
            if (lastAttempt != null && lastAttempt.getThrowable() != null) {
                log.debug("Unable to delete "+osgiCacheDir+" (possibly being modified concurrently?): "+lastAttempt.getThrowable());
            }
        }
        osgiCacheDir = null;
        framework = null;
    }

    /**
     * Installs the given catalog bundle into the framework unless it is already installed.
     *
     * @throws IllegalStateException if the installation fails, or if the bundle is already
     *         installed under a different name/version than the one requested; the original
     *         failure is attached as the cause
     */
    public synchronized void registerBundle(CatalogBundle bundle) {
        try {
            if (checkBundleInstalledThrowIfInconsistent(bundle)) {
                return;
            }

            Bundle b = Osgis.install(framework, bundle.getUrl());

            checkCorrectlyInstalled(bundle, b);
        } catch (Exception e) {
            Exceptions.propagateIfFatal(e);
            throw new IllegalStateException("Bundle from "+bundle.getUrl()+" failed to install: " + e.getMessage(), e);
        }
    }

    /**
     * Verifies a freshly installed bundle: throws if its name/version contradicts what the user
     * requested, then logs according to how many installed bundles share that name and version
     * (none: error; exactly one: debug; several: warning).
     */
    private void checkCorrectlyInstalled(CatalogBundle bundle, Bundle b) {
        String nv = b.getSymbolicName()+":"+b.getVersion().toString();

        if (!isBundleNameEqualOrAbsent(bundle, b)) {
            throw new IllegalStateException("Bundle already installed as "+nv+" but user explicitly requested "+bundle);
        }

        List<Bundle> matches = Osgis.bundleFinder(framework)
                .symbolicName(b.getSymbolicName())
                .version(b.getVersion().toString())
                .findAll();
        if (matches.isEmpty()) {
            log.error("OSGi could not find bundle "+nv+" in search after installing it from "+bundle.getUrl());
        } else if (matches.size()==1) {
            log.debug("Bundle from "+bundle.getUrl()+" successfully installed as " + nv + " ("+b+")");
        } else {
            log.warn("OSGi has multiple bundles matching "+nv+", when just installed from "+bundle.getUrl()+": "+matches+"; "
                + "brooklyn will prefer the URL-based bundle for top-level references but any dependencies or "
                + "import-packages will be at the mercy of OSGi. "
                + "It is recommended to use distinct versions for different bundles, and the same URL for the same bundles.");
        }
    }

    /**
     * Checks whether the bundle is already present in the framework.
     *
     * @return {@code true} if the bundle is already installed (found by URL when a URL is given,
     *         otherwise by symbolic name and version); {@code false} only when a URL is given
     *         and nothing has been installed from it yet
     * @throws IllegalStateException if a bundle installed from the same URL has a different
     *         name/version than requested, or if no URL is given and no matching bundle exists
     */
    private boolean checkBundleInstalledThrowIfInconsistent(CatalogBundle bundle) {
        String bundleUrl = bundle.getUrl();
        if (bundleUrl != null) {
            Maybe<Bundle> installedBundle = Osgis.bundleFinder(framework).requiringFromUrl(bundleUrl).find();
            if (installedBundle.isPresent()) {
                Bundle b = installedBundle.get();
                String nv = b.getSymbolicName()+":"+b.getVersion().toString();
                if (!isBundleNameEqualOrAbsent(bundle, b)) {
                    throw new IllegalStateException("User requested bundle " + bundle + " but already installed as "+nv);
                } else {
                    log.trace("Bundle from "+bundleUrl+" already installed as "+nv+"; not re-registering");
                }
                return true;
            }
        } else {
            Maybe<Bundle> installedBundle = Osgis.bundleFinder(framework).symbolicName(bundle.getSymbolicName()).version(bundle.getVersion()).find();
            if (installedBundle.isPresent()) {
                log.trace("Bundle "+bundle+" installed from "+installedBundle.get().getLocation());
            } else {
                throw new IllegalStateException("Bundle "+bundle+" not previously registered, but URL is empty.");
            }
            return true;
        }
        return false;
    }

    /**
     * @return {@code true} if the catalog bundle carries no name, or if its symbolic name and
     *         version both equal those reported by the installed bundle
     */
    public static boolean isBundleNameEqualOrAbsent(CatalogBundle bundle, Bundle b) {
        return !bundle.isNamed() ||
            (bundle.getSymbolicName().equals(b.getSymbolicName()) &&
            bundle.getVersion().equals(b.getVersion().toString()));
    }

    /** Varargs convenience for {@link #tryResolveClass(String, Iterable)}. */
    public <T> Maybe<Class<T>> tryResolveClass(String type, CatalogBundle... catalogBundles) {
        return tryResolveClass(type, Arrays.asList(catalogBundles));
    }

    /**
     * Tries each catalog bundle in turn and returns the class from the first one able to load it.
     * Extension bundles are loaded through {@link Class#forName(String)} instead of the bundle.
     *
     * @param type fully qualified name of the class to load
     * @return the loaded class, or an absent {@link Maybe} describing the problem recorded for
     *         each bundle when none of them could supply the class
     */
    public <T> Maybe<Class<T>> tryResolveClass(String type, Iterable<CatalogBundle> catalogBundles) {
        Map<CatalogBundle,Throwable> bundleProblems = MutableMap.of();
        Set<String> extraMessages = MutableSet.of();
        for (CatalogBundle catalogBundle: catalogBundles) {
            try {
                Maybe<Bundle> bundle = findBundle(catalogBundle);
                if (bundle.isPresent()) {
                    Bundle b = bundle.get();
                    Class<T> clazz;
                    //Extension bundles don't support loadClass.
                    //Instead load from the app classpath.
                    if (Osgis.isExtensionBundle(b)) {
                        @SuppressWarnings("unchecked")
                        Class<T> c = (Class<T>)Class.forName(type);
                        clazz = c;
                    } else {
                        @SuppressWarnings("unchecked")
                        Class<T> c = (Class<T>)b.loadClass(type);
                        clazz = c;
                    }
                    return Maybe.of(clazz);
                } else {
                    bundleProblems.put(catalogBundle, ((Maybe.Absent<?>)bundle).getException());
                }

            } catch (Exception e) {
                // should come from classloading now; name formatting or missing bundle errors will be caught above
                Exceptions.propagateIfFatal(e);
                bundleProblems.put(catalogBundle, e);

                // Throwable.getMessage() may legitimately be null; checking it here keeps a
                // message-less cause from escaping this handler as a NullPointerException.
                Throwable cause = e.getCause();
                String causeMessage = (cause == null) ? null : cause.getMessage();
                if (causeMessage != null && causeMessage.contains("Unresolved constraint in bundle")) {
                    if (BrooklynVersion.INSTANCE.getVersionFromOsgiManifest()==null) {
                        extraMessages.add("No brooklyn-core OSGi manifest available. OSGi will not work.");
                    }
                    if (BrooklynVersion.isDevelopmentEnvironment()) {
                        extraMessages.add("Your development environment may not have created necessary files. Doing a maven build then retrying may fix the issue.");
                    }
                    if (!extraMessages.isEmpty()) log.warn(Strings.join(extraMessages, " "));
                    log.warn("Unresolved constraint resolving OSGi bundle "+catalogBundle+" to load "+type+": "+causeMessage);
                    if (log.isDebugEnabled()) log.debug("Trace for OSGi resolution failure", e);
                }
            }
        }
        if (bundleProblems.size()==1) {
            Throwable error = Iterables.getOnlyElement(bundleProblems.values());
            if (error instanceof ClassNotFoundException && error.getCause()!=null && error.getCause().getMessage()!=null) {
                error = Exceptions.collapseIncludingAllCausalMessages(error);
            }
            return Maybe.absent("Unable to resolve class "+type+" in "+Iterables.getOnlyElement(bundleProblems.keySet())
                + (extraMessages.isEmpty() ? "" : " ("+Strings.join(extraMessages, " ")+")"), error);
        } else {
            return Maybe.absent(Exceptions.create("Unable to resolve class "+type+": "+bundleProblems
                + (extraMessages.isEmpty() ? "" : " ("+Strings.join(extraMessages, " ")+")"), bundleProblems.values()));
        }
    }

    /**
     * Looks up the installed bundle corresponding to a catalog bundle, by URL when one is
     * supplied and otherwise by symbolic name and version.
     */
    public Maybe<Bundle> findBundle(CatalogBundle catalogBundle) {
        //Either fail at install time when the user supplied name:version is different
        //from the one reported from the bundle
        //or
        //Ignore user supplied name:version when URL is supplied to be able to find the
        //bundle even if it's with a different version.
        //
        //For now we just log a warning if there's a version discrepancy at install time,
        //so prefer URL if supplied.
        BundleFinder bundleFinder = Osgis.bundleFinder(framework);
        if (catalogBundle.getUrl() != null) {
            bundleFinder.requiringFromUrl(catalogBundle.getUrl());
        } else {
            bundleFinder.symbolicName(catalogBundle.getSymbolicName()).version(catalogBundle.getVersion());
        }
        return bundleFinder.find();
    }

    /**
     * Iterates through catalogBundles until one contains a resource with the given name.
     * Non-fatal failures on an individual bundle are logged at debug and the next bundle is tried.
     *
     * @return the URL of the first matching resource, or {@code null} if no bundle has one
     */
    public URL getResource(String name, Iterable<CatalogBundle> catalogBundles) {
        for (CatalogBundle catalogBundle: catalogBundles) {
            try {
                Maybe<Bundle> bundle = findBundle(catalogBundle);
                if (bundle.isPresent()) {
                    URL result = bundle.get().getResource(name);
                    if (result!=null) return result;
                }
            } catch (Exception e) {
                Exceptions.propagateIfFatal(e);
                // best-effort lookup: record the problem and move on to the next bundle
                if (log.isDebugEnabled()) log.debug("Error looking up resource "+name+" in "+catalogBundle+"; ignoring", e);
            }
        }
        return null;
    }

    /**
     * Collects every resource with the given name across all catalogBundles. Non-fatal failures
     * on an individual bundle are logged at debug and that bundle is skipped.
     *
     * @return An iterable of all resources matching name in catalogBundles.
     */
    public Iterable<URL> getResources(String name, Iterable<CatalogBundle> catalogBundles) {
        List<URL> resources = Lists.newArrayList();
        for (CatalogBundle catalogBundle : catalogBundles) {
            try {
                Maybe<Bundle> bundle = findBundle(catalogBundle);
                if (bundle.isPresent()) {
                    // Bundle.getResources is documented to return null when nothing matches
                    Enumeration<URL> result = bundle.get().getResources(name);
                    if (result != null) {
                        resources.addAll(Collections.list(result));
                    }
                }
            } catch (Exception e) {
                Exceptions.propagateIfFatal(e);
                // best-effort lookup: record the problem and move on to the next bundle
                if (log.isDebugEnabled()) log.debug("Error looking up resources "+name+" in "+catalogBundle+"; ignoring", e);
            }
        }
        return resources;
    }

    /** @return the framework created by {@link #start()}, or {@code null} before start / after stop */
    public Framework getFramework() {
        return framework;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/internal/AbstractManagementContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/internal/AbstractManagementContext.java b/core/src/main/java/brooklyn/management/internal/AbstractManagementContext.java deleted file mode 100644 index 5e7f4a6..0000000 --- a/core/src/main/java/brooklyn/management/internal/AbstractManagementContext.java +++ /dev/null @@ -1,485 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
package brooklyn.management.internal;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;

import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.brooklyn.api.basic.BrooklynObject;
import org.apache.brooklyn.api.catalog.BrooklynCatalog;
import org.apache.brooklyn.api.catalog.CatalogItem;
import org.apache.brooklyn.api.entity.Effector;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.drivers.EntityDriverManager;
import org.apache.brooklyn.api.entity.drivers.downloads.DownloadResolverManager;
import org.apache.brooklyn.api.entity.rebind.RebindManager;
import org.apache.brooklyn.api.location.LocationRegistry;
import org.apache.brooklyn.api.management.ExecutionContext;
import org.apache.brooklyn.api.management.ManagementContext;
import org.apache.brooklyn.api.management.SubscriptionContext;
import org.apache.brooklyn.api.management.Task;
import org.apache.brooklyn.api.management.classloading.BrooklynClassLoadingContext;
import org.apache.brooklyn.api.management.entitlement.EntitlementManager;
import org.apache.brooklyn.api.management.ha.HighAvailabilityManager;

import brooklyn.catalog.internal.BasicBrooklynCatalog;
import brooklyn.catalog.internal.CatalogInitialization;
import brooklyn.catalog.internal.CatalogUtils;
import brooklyn.config.BrooklynProperties;
import brooklyn.config.StringConfigMap;
import brooklyn.entity.basic.AbstractEntity;
import brooklyn.entity.basic.BrooklynTaskTags;
import brooklyn.entity.basic.EntityInternal;
import brooklyn.entity.drivers.BasicEntityDriverManager;
import brooklyn.entity.drivers.downloads.BasicDownloadsManager;
import brooklyn.entity.rebind.RebindManagerImpl;
import brooklyn.internal.storage.BrooklynStorage;
import brooklyn.internal.storage.DataGrid;
import brooklyn.internal.storage.DataGridFactory;
import brooklyn.internal.storage.impl.BrooklynStorageImpl;
import brooklyn.internal.storage.impl.inmemory.InMemoryDataGridFactory;

import org.apache.brooklyn.location.basic.BasicLocationRegistry;

import brooklyn.management.classloading.JavaBrooklynClassLoadingContext;
import brooklyn.management.entitlement.Entitlements;
import brooklyn.management.ha.HighAvailabilityManagerImpl;
import brooklyn.util.GroovyJavaMethods;
import brooklyn.util.ResourceUtils;
import brooklyn.util.collections.MutableList;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.config.ConfigBag;
import brooklyn.util.guava.Maybe;
import brooklyn.util.task.BasicExecutionContext;
import brooklyn.util.task.Tasks;

import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;

/**
 * Base implementation of {@link ManagementContextInternal}. It wires together the catalog,
 * storage, rebind manager, high-availability manager, entitlement manager and the entity
 * driver/download managers, and provides the common effector-invocation plumbing. Subclasses
 * supply entity management ({@link #manageIfNecessary}, {@link #isManagedLocally}) and task
 * placement ({@code runAtEntity}).
 */
public abstract class AbstractManagementContext implements ManagementContextInternal {
    private static final Logger log = LoggerFactory.getLogger(AbstractManagementContext.class);

    /**
     * Instantiates the {@link DataGridFactory} whose class name is stored in the properties
     * under the key {@code DataGridFactory.class.getName()}, defaulting to
     * {@link InMemoryDataGridFactory} when no such property is set.
     *
     * @throws IllegalStateException if the class cannot be loaded, cannot be instantiated, or
     *         is not a {@link DataGridFactory}
     */
    private static DataGridFactory loadDataGridFactory(BrooklynProperties properties) {
        String clazzName = properties.getFirst(DataGridFactory.class.getName());
        if (clazzName == null) {
            clazzName = InMemoryDataGridFactory.class.getName();
        }

        Class<?> clazz;
        try {
            //todo: which classloader should we use?
            clazz = LocalManagementContext.class.getClassLoader().loadClass(clazzName);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(format("Could not load class [%s]",clazzName),e);
        }

        Object instance;
        try {
            instance = clazz.newInstance();
        } catch (InstantiationException e) {
            throw new IllegalStateException(format("Could not instantiate class [%s]",clazzName),e);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(format("Could not instantiate class [%s]",clazzName),e);
        }

        if (!(instance instanceof DataGridFactory)) {
            throw new IllegalStateException(format("Class [%s] is not an instance of class [%s]",clazzName, DataGridFactory.class.getName()));
        }

        return (DataGridFactory)instance;
    }

    static {
        // Registers a provider that maps an entity, its management support, or a management
        // context to the class-loading context ResourceUtils should use for it.
        ResourceUtils.addClassLoaderProvider(new Function<Object, BrooklynClassLoadingContext>() {
            @Override
            public BrooklynClassLoadingContext apply(@Nullable Object input) {
                if (input instanceof EntityInternal) {
                    EntityInternal internal = (EntityInternal)input;
                    if (internal.getCatalogItemId() != null) {
                        CatalogItem<?, ?> item = CatalogUtils.getCatalogItemOptionalVersion(internal.getManagementContext(), internal.getCatalogItemId());
                        if (item != null) {
                            return CatalogUtils.newClassLoadingContext(internal.getManagementContext(), item);
                        } else {
                            log.error("Can't find catalog item " + internal.getCatalogItemId() +
                                    " used for instantiating entity " + internal +
                                    ". Falling back to application classpath.");
                        }
                    }
                    return apply(internal.getManagementSupport());
                }

                if (input instanceof EntityManagementSupport)
                    return apply(((EntityManagementSupport)input).getManagementContext());
                if (input instanceof ManagementContext)
                    return JavaBrooklynClassLoadingContext.create((ManagementContext) input);
                return null;
            }
        });
    }

    private final AtomicLong totalEffectorInvocationCount = new AtomicLong();

    protected BrooklynProperties configMap;
    protected BasicLocationRegistry locationRegistry;
    protected final BasicBrooklynCatalog catalog;
    protected ClassLoader baseClassLoader;
    protected Iterable<URL> baseClassPathForScanning;

    private final RebindManager rebindManager;
    private final HighAvailabilityManager highAvailabilityManager;

    protected volatile BrooklynGarbageCollector gc;

    private final EntityDriverManager entityDriverManager;
    protected DownloadResolverManager downloadsManager;

    protected EntitlementManager entitlementManager;

    private final BrooklynStorage storage;

    private volatile boolean running = true;
    protected boolean startupComplete = false;
    protected final List<Throwable> errors = Collections.synchronizedList(MutableList.<Throwable>of());

    protected Maybe<URI> uri = Maybe.absent();
    protected CatalogInitialization catalogInitialization;

    /** Creates a context whose data grid factory is resolved from the given properties. */
    public AbstractManagementContext(BrooklynProperties brooklynProperties){
        this(brooklynProperties, null);
    }

    /**
     * @param brooklynProperties configuration for this context
     * @param datagridFactory factory for the backing data grid; when {@code null} one is
     *        loaded via {@link #loadDataGridFactory(BrooklynProperties)}
     */
    public AbstractManagementContext(BrooklynProperties brooklynProperties, DataGridFactory datagridFactory) {
        this.configMap = brooklynProperties;
        this.entityDriverManager = new BasicEntityDriverManager();
        this.downloadsManager = BasicDownloadsManager.newDefault(configMap);
        if (datagridFactory == null) {
            datagridFactory = loadDataGridFactory(brooklynProperties);
        }
        DataGrid datagrid = datagridFactory.newDataGrid(this);

        this.catalog = new BasicBrooklynCatalog(this);

        this.storage = new BrooklynStorageImpl(datagrid);
        this.rebindManager = new RebindManagerImpl(this); // TODO leaking "this" reference; yuck
        this.highAvailabilityManager = new HighAvailabilityManagerImpl(this); // TODO leaking "this" reference; yuck

        this.entitlementManager = Entitlements.newManager(this, brooklynProperties);
    }

    /**
     * Stops the high-availability manager, marks this context as no longer running, then stops
     * the rebind manager and terminates storage.
     */
    @Override
    public void terminate() {
        highAvailabilityManager.stop();
        running = false;
        rebindManager.stop();
        storage.terminate();
        // Don't unmanage everything; different entities get given their events at different times
        // so can cause problems (e.g. a group finds out that a member is unmanaged, before the
        // group itself has been told that it is unmanaged).
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public boolean isStartupComplete() {
        return startupComplete;
    }

    @Override
    public BrooklynStorage getStorage() {
        return storage;
    }

    @Override
    public RebindManager getRebindManager() {
        return rebindManager;
    }

    @Override
    public HighAvailabilityManager getHighAvailabilityManager() {
        return highAvailabilityManager;
    }

    /** @return the number of effector invocations executed locally through this context */
    @Override
    public long getTotalEffectorInvocations() {
        return totalEffectorInvocationCount.get();
    }

    /**
     * Returns an execution context for the entity: a new one tagged with the entity when it is
     * an {@link AbstractEntity}, otherwise the one held by the entity's management support.
     */
    @Override
    public ExecutionContext getExecutionContext(Entity e) {
        // BEC is a thin wrapper around EM so fine to create a new one here; but make sure it gets the real entity
        if (e instanceof AbstractEntity) {
            return new BasicExecutionContext(MutableMap.of("tag", BrooklynTaskTags.tagForContextEntity(e)), getExecutionManager());
        } else {
            return ((EntityInternal)e).getManagementSupport().getExecutionContext();
        }
    }

    @Override
    public ExecutionContext getServerExecutionContext() {
        // BEC is a thin wrapper around EM so fine to create a new one here
        return new BasicExecutionContext(MutableMap.of("tag", BrooklynTaskTags.BROOKLYN_SERVER_TASK_TAG), getExecutionManager());
    }

    @Override
    public SubscriptionContext getSubscriptionContext(Entity e) {
        // BSC is a thin wrapper around SM so fine to create a new one here
        return new BasicSubscriptionContext(getSubscriptionManager(), e);
    }

    @Override
    public EntityDriverManager getEntityDriverManager() {
        return entityDriverManager;
    }

    @Override
    public DownloadResolverManager getEntityDownloadsManager() {
        return downloadsManager;
    }

    @Override
    public EntitlementManager getEntitlementManager() {
        return entitlementManager;
    }

    protected abstract void manageIfNecessary(Entity entity, Object context);

    @Override
    public <T> Task<T> invokeEffector(final Entity entity, final Effector<T> eff, @SuppressWarnings("rawtypes") final Map parameters) {
        return runAtEntity(entity, eff, parameters);
    }

    /**
     * Invokes the effector's method directly on the entity in the calling thread, after
     * incrementing the invocation counter and transforming the arguments for the effector.
     */
    protected <T> T invokeEffectorMethodLocal(Entity entity, Effector<T> eff, Object args) {
        assert isManagedLocally(entity) : "cannot invoke effector method at "+this+" because it is not managed here";
        totalEffectorInvocationCount.incrementAndGet();
        Object[] transformedArgs = EffectorUtils.prepareArgsForEffector(eff, args);
        return GroovyJavaMethods.invokeMethodOnMetaClass(entity, eff.getName(), transformedArgs);
    }

    /**
     * Method for entity to make effector happen with correct semantics (right place, right task context),
     * when a method is called on that entity.
     * <p>
     * If the current thread is interrupted while waiting for the task, the interrupt flag is
     * restored before the failure is reported.
     *
     * @throws ExecutionException wrapping any exception raised while invoking the effector
     */
    @Override
    public <T> T invokeEffectorMethodSync(final Entity entity, final Effector<T> eff, final Object args) throws ExecutionException {
        try {
            Task<?> current = Tasks.current();
            if (current == null || !entity.equals(BrooklynTaskTags.getContextEntity(current)) || !isManagedLocally(entity)) {
                manageIfNecessary(entity, eff.getName());
                // Wrap in a task if we aren't already in a task that is tagged with this entity
                Task<T> task = runAtEntity( EffectorUtils.getTaskFlagsForEffectorInvocation(entity, eff,
                            ConfigBag.newInstance().configureStringKey("args", args)),
                        entity,
                        new Callable<T>() {
                            @Override
                            public T call() {
                                return invokeEffectorMethodLocal(entity, eff, args);
                            }});
                return task.get();
            } else {
                return invokeEffectorMethodLocal(entity, eff, args);
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // the checked exception is about to be hidden inside an ExecutionException, so
                // keep the interrupt visible to callers further up the stack
                Thread.currentThread().interrupt();
            }
            // don't need to attach any message or warning because the Effector impl hierarchy does that (see calls to EffectorUtils.handleException)
            throw new ExecutionException(e);
        }
    }

    /**
     * Whether the master entity record is local, and sensors and effectors can be properly accessed locally.
     */
    public abstract boolean isManagedLocally(Entity e);

    /**
     * Causes the indicated runnable to be run at the right location for the given entity.
     *
     * Returns the actual task (if it is local) or a proxy task (if it is remote);
     * if management for the entity has not yet started this may start it.
     *
     * @deprecated since 0.6.0 use effectors (or support {@code runAtEntity(Entity, Effector, Map)} if something else is needed);
     * (Callable with Map flags is too open-ended, bothersome to support, and not used much)
     */
    @Deprecated
    public abstract <T> Task<T> runAtEntity(@SuppressWarnings("rawtypes") Map flags, Entity entity, Callable<T> c);

    /** Runs the given effector in the right place for the given entity.
     * The task is immediately submitted in the background, but also recorded in the queueing context (if present)
     * so it appears as a child, but marked inessential so it does not fail the parent task, who will ordinarily
     * call {@link Task#get()} on the object and may do their own failure handling.
     */
    protected abstract <T> Task<T> runAtEntity(final Entity entity, final Effector<T> eff, @SuppressWarnings("rawtypes") final Map parameters);

    @Override
    public StringConfigMap getConfig() {
        return configMap;
    }

    @Override
    public BrooklynProperties getBrooklynProperties() {
        return configMap;
    }

    /** Lazily creates the location registry on first access. */
    @Override
    public synchronized LocationRegistry getLocationRegistry() {
        if (locationRegistry==null) locationRegistry = new BasicLocationRegistry(this);
        return locationRegistry;
    }

    /**
     * Returns the catalog, first running the "unofficial" catalog population if no catalog
     * initialization has been run yet.
     */
    @Override
    public BrooklynCatalog getCatalog() {
        if (!getCatalogInitialization().hasRunAnyInitialization()) {
            // catalog init is needed; normally this will be done from start sequence,
            // but if accessed early -- and in tests -- we will load it here
            getCatalogInitialization().injectManagementContext(this);
            getCatalogInitialization().populateUnofficial(catalog);
        }
        return catalog;
    }

    @Override
    public ClassLoader getCatalogClassLoader() {
        // catalog does not have to be initialized
        return catalog.getRootClassLoader();
    }

    /**
     * Optional class-loader that this management context should use as its base,
     * as the first-resort in the catalog, and for scanning (if scanning the default in the catalog).
     * In most instances the default classloader (ManagementContext.class.getClassLoader(), assuming
     * this was in the JARs used at boot time) is fine, and in those cases this method normally returns null.
     * (Surefire does some weird stuff, but the default classloader is fine for loading;
     * however it requires a custom base classpath to be set for scanning.)
     */
    @Override
    public ClassLoader getBaseClassLoader() {
        return baseClassLoader;
    }

    /** See {@link #getBaseClassLoader()}. Only settable once and must be invoked before catalog is loaded. */
    public void setBaseClassLoader(ClassLoader cl) {
        if (baseClassLoader==cl) return;
        if (baseClassLoader!=null) throw new IllegalStateException("Cannot change base class loader (in "+this+")");
        // NOTE(review): 'catalog' is final and assigned in the constructor, so this check fires
        // for every call that reaches it; confirm which "catalog has been loaded" condition was intended.
        if (catalog!=null) throw new IllegalStateException("Cannot set base class after catalog has been loaded (in "+this+")");
        this.baseClassLoader = cl;
    }

    /** Optional mechanism for setting the classpath which should be scanned by the catalog, if the catalog
     * is scanning the default classpath. Usually it infers the right thing, but some classloaders
     * (e.g. surefire) do funny things which the underlying org.reflections.Reflections library can't see in to.
     * <p>
     * This should normally be invoked early in the server startup. Setting it after the catalog is loaded will not
     * take effect without an explicit internal call to do so. Once set, it can be changed prior to catalog loading
     * but it cannot be <i>changed</i> once the catalog is loaded.
     * <p>
     * ClasspathHelper.forJavaClassPath() is often a good argument to pass, and is used internally in some places
     * when no items are found on the catalog. */
    @Override
    public void setBaseClassPathForScanning(Iterable<URL> urls) {
        if (Objects.equal(baseClassPathForScanning, urls)) return;
        if (baseClassPathForScanning != null) {
            // NOTE(review): 'catalog' is final and assigned in the constructor, so the warn-only
            // branch below is never taken; confirm the intended "catalog loaded" condition.
            if (catalog==null)
                log.warn("Changing scan classpath to "+urls+" from "+baseClassPathForScanning);
            else
                throw new IllegalStateException("Cannot change base class path for scanning (in "+this+")");
        }
        this.baseClassPathForScanning = urls;
    }

    /**
     * @see #setBaseClassPathForScanning(Iterable)
     */
    @Override
    public Iterable<URL> getBaseClassPathForScanning() {
        return baseClassPathForScanning;
    }

    public BrooklynGarbageCollector getGarbageCollector() {
        return gc;
    }

    /** @param uri the URI of this management node; must not be {@code null} */
    @Override
    public void setManagementNodeUri(URI uri) {
        this.uri = Maybe.of(checkNotNull(uri, "uri"));
    }

    @Override
    public Maybe<URI> getManagementNodeUri() {
        return uri;
    }

    /** Guards reads and writes of {@link #catalogInitialization}; final so the lock object can never be swapped. */
    private final Object catalogInitMutex = new Object();

    /** Returns the catalog initialization, creating and registering a default one on first access. */
    @Override
    public CatalogInitialization getCatalogInitialization() {
        synchronized (catalogInitMutex) {
            if (catalogInitialization!=null) return catalogInitialization;
            CatalogInitialization ci = new CatalogInitialization();
            setCatalogInitialization(ci);
            return ci;
        }
    }

    /**
     * Sets the catalog initialization and injects this management context into it.
     *
     * @throws NullPointerException if the argument is {@code null}
     * @throws IllegalStateException if a different initialization has already been set
     */
    @Override
    public void setCatalogInitialization(CatalogInitialization catalogInitialization) {
        synchronized (catalogInitMutex) {
            Preconditions.checkNotNull(catalogInitialization, "initialization must not be null");
            if (this.catalogInitialization!=null && this.catalogInitialization != catalogInitialization)
                throw new IllegalStateException("Changing catalog init from "+this.catalogInitialization+" to "+catalogInitialization+"; changes not permitted");
            catalogInitialization.injectManagementContext(this);
            this.catalogInitialization = catalogInitialization;
        }
    }

    /** Looks up an entity or location by id; see {@link #lookup(String, Class)}. */
    public BrooklynObject lookup(String id) {
        return lookup(id, BrooklynObject.class);
    }

    /**
     * Looks up the object with the given id, checking entities first and then locations.
     *
     * @return the first match that is an instance of {@code type}, or {@code null} if none
     */
    @SuppressWarnings("unchecked")
    public <T extends BrooklynObject> T lookup(String id, Class<T> type) {
        Object result;
        result = getEntityManager().getEntity(id);
        if (result!=null && type.isInstance(result)) return (T)result;

        result = getLocationManager().getLocation(id);
        if (result!=null && type.isInstance(result)) return (T)result;

        // TODO policies, enrichers, feeds
        return null;
    }

    /** @return the live, synchronized list of errors recorded on this context */
    @Override
    public List<Throwable> errors() {
        return errors;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/internal/AbstractSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/internal/AbstractSubscriptionManager.java b/core/src/main/java/brooklyn/management/internal/AbstractSubscriptionManager.java deleted file mode 100644 index 7e8effc..0000000 --- a/core/src/main/java/brooklyn/management/internal/AbstractSubscriptionManager.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.management.internal; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Collections; -import java.util.Map; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.Group; -import org.apache.brooklyn.api.event.Sensor; -import org.apache.brooklyn.api.event.SensorEvent; -import org.apache.brooklyn.api.event.SensorEventListener; -import org.apache.brooklyn.api.management.SubscriptionHandle; -import org.apache.brooklyn.api.management.SubscriptionManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Objects; -import com.google.common.base.Predicate; - -public abstract class AbstractSubscriptionManager implements SubscriptionManager { - - // TODO Perhaps could use guava's SynchronizedSetMultimap? But need to check its synchronization guarantees. - // That would replace the utils used for subscriptionsBySubscriber etc. - - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscriptionManager.class); - - /** performs the actual subscription; should return the subscription parameter as the handle */ - protected abstract <T> SubscriptionHandle subscribe(Map<String, Object> flags, Subscription<T> s); - /** performs the actual publishing -- ie distribution to subscriptions */ - public abstract <T> void publish(final SensorEvent<T> event); - - public static class EntitySensorToken { - Entity e; - Sensor<?> s; - String sName; - public EntitySensorToken(Entity e, Sensor<?> s) { - this.e = e; - this.s = s; - this.sName = (s == null) ? 
null : checkNotNull(s.getName(), "sensor must have non-null name: %s", s); - } - @Override - public int hashCode() { - return Objects.hashCode(e, sName); - } - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (!(obj instanceof EntitySensorToken)) return false; - if (!Objects.equal(e, ((EntitySensorToken)obj).e)) return false; - if (!Objects.equal(sName, ((EntitySensorToken)obj).sName)) return false; - return true; - } - @Override - public String toString() { - return (e != null ? e.getId() : "*")+":"+(s != null ? sName : "*"); - } - } - static Object makeEntitySensorToken(Entity e, Sensor<?> s) { - return new EntitySensorToken(e, s); - } - static Object makeEntitySensorToken(SensorEvent<?> se) { - return makeEntitySensorToken(se.getSource(), se.getSensor()); - } - - /** @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) */ - public final <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) { - return subscribe(Collections.<String,Object>emptyMap(), producer, sensor, listener); - } - - /** - * This implementation handles the following flags, in addition to those described in the {@link SubscriptionManager} - * interface: - * <ul> - * <li>subscriberExecutionManagerTag - a tag to pass to execution manager (without setting any execution semantics / TaskPreprocessor); - * if not supplied and there is a subscriber, this will be inferred from the subscriber and set up with SingleThreadedScheduler - * <li>eventFilter - a Predicate<SensorEvent> instance to filter what events are delivered - * </ul> - * - * @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) - */ - public final <T> SubscriptionHandle subscribe(Map<String, Object> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? 
super T> listener) { - return subscribe(flags, new Subscription<T>(producer, sensor, listener)); - } - - /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */ - public final <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) { - return subscribeToChildren(Collections.<String,Object>emptyMap(), parent, sensor, listener); - } - - /** @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) */ - public final <T> SubscriptionHandle subscribeToChildren(Map<String, Object> flags, final Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) { - Predicate<SensorEvent<T>> eventFilter = new Predicate<SensorEvent<T>>() { - public boolean apply(SensorEvent<T> input) { - return parent != null && input.getSource() != null && parent.equals(input.getSource().getParent()); - } - }; - flags.put("eventFilter", eventFilter); - return subscribe(flags, null, sensor, listener); - } - - /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */ - public final <T> SubscriptionHandle subscribeToMembers(Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) { - return subscribeToMembers(Collections.<String,Object>emptyMap(), parent, sensor, listener); - } - - /** @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) */ - public final <T> SubscriptionHandle subscribeToMembers(Map<String, Object> flags, final Group parent, Sensor<T> sensor, SensorEventListener<? 
super T> listener) { - Predicate<SensorEvent<T>> eventFilter = new Predicate<SensorEvent<T>>() { - public boolean apply(SensorEvent<T> input) { - return parent.getMembers().contains(input.getSource()); - } - }; - flags.put("eventFilter", eventFilter); - return subscribe(flags, null, sensor, listener); - } - - protected <T> Object getSubscriber(Map<String, Object> flags, Subscription<T> s) { - return s.subscriber!=null ? s.subscriber : flags.containsKey("subscriber") ? flags.remove("subscriber") : s.listener; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/internal/AccessManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/internal/AccessManager.java b/core/src/main/java/brooklyn/management/internal/AccessManager.java deleted file mode 100644 index baa805a..0000000 --- a/core/src/main/java/brooklyn/management/internal/AccessManager.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.management.internal; - -import org.apache.brooklyn.api.management.AccessController; - -import com.google.common.annotations.Beta; - -@Beta -public interface AccessManager { - - AccessController getAccessController(); - - boolean isLocationProvisioningAllowed(); - - boolean isLocationManagementAllowed(); - - boolean isEntityManagementAllowed(); - - void setLocationProvisioningAllowed(boolean allowed); - - void setLocationManagementAllowed(boolean allowed); - - void setEntityManagementAllowed(boolean allowed); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/internal/AsyncCollectionChangeAdapter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/internal/AsyncCollectionChangeAdapter.java b/core/src/main/java/brooklyn/management/internal/AsyncCollectionChangeAdapter.java deleted file mode 100644 index e3e5193..0000000 --- a/core/src/main/java/brooklyn/management/internal/AsyncCollectionChangeAdapter.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.management.internal; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.brooklyn.api.management.ExecutionManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.util.collections.MutableMap; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.task.BasicExecutionManager; -import brooklyn.util.task.SingleThreadedScheduler; - -public class AsyncCollectionChangeAdapter<Item> implements CollectionChangeListener<Item> { - - private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectionChangeAdapter.class); - - private final ExecutionManager executor; - private final CollectionChangeListener<Item> delegate; - - public AsyncCollectionChangeAdapter(ExecutionManager executor, CollectionChangeListener<Item> delegate) { - this.executor = checkNotNull(executor, "executor"); - this.delegate = checkNotNull(delegate, "delegate"); - ((BasicExecutionManager) executor).setTaskSchedulerForTag(delegate, SingleThreadedScheduler.class); - } - - @Override - public void onItemAdded(final Item item) { - executor.submit(MutableMap.of("tag", delegate), new Runnable() { - public void run() { - try { - delegate.onItemAdded(item); - } catch (Throwable t) { - LOG.warn("Error notifying listener of itemAdded("+item+")", t); - Exceptions.propagate(t); - } - } - }); - } - - @Override - public void onItemRemoved(final Item item) { - executor.submit(MutableMap.of("tag", delegate), new Runnable() { - public void run() { - try { - delegate.onItemRemoved(item); - } catch (Throwable t) { - LOG.warn("Error notifying listener of itemAdded("+item+")", t); - Exceptions.propagate(t); - } - } - }); - } - - @Override - public int hashCode() { - return delegate.hashCode(); - } - - @Override - public boolean equals(Object other) { - return (other instanceof AsyncCollectionChangeAdapter) && - delegate.equals(((AsyncCollectionChangeAdapter<?>) other).delegate); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/internal/BasicSubscriptionContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/internal/BasicSubscriptionContext.java b/core/src/main/java/brooklyn/management/internal/BasicSubscriptionContext.java deleted file mode 100644 index cb60545..0000000 --- a/core/src/main/java/brooklyn/management/internal/BasicSubscriptionContext.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.management.internal; - -import static brooklyn.util.JavaGroovyEquivalents.mapOf; -import groovy.lang.Closure; - -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Set; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.Group; -import org.apache.brooklyn.api.event.Sensor; -import org.apache.brooklyn.api.event.SensorEvent; -import org.apache.brooklyn.api.event.SensorEventListener; -import org.apache.brooklyn.api.management.SubscriptionContext; -import org.apache.brooklyn.api.management.SubscriptionHandle; -import org.apache.brooklyn.api.management.SubscriptionManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; - -/** - * A {@link SubscriptionContext} for an entity or other user of a {@link SubscriptionManager}. 
- */ -public class BasicSubscriptionContext implements SubscriptionContext { - - private static final Logger LOG = LoggerFactory.getLogger(BasicSubscriptionContext.class); - - private final SubscriptionManager manager; - private final Object subscriber; - private final Map<String,Object> flags; - - public BasicSubscriptionContext(SubscriptionManager manager, Object subscriber) { - this(Collections.<String,Object>emptyMap(), manager, subscriber); - } - - public BasicSubscriptionContext(Map<String, ?> flags, SubscriptionManager manager, Object subscriber) { - this.manager = manager; - this.subscriber = subscriber; - this.flags = mapOf("subscriber", subscriber); - if (flags!=null) this.flags.putAll(flags); - } - - @SuppressWarnings("rawtypes") - public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, Closure c) { - return subscribe(Collections.<String,Object>emptyMap(), producer, sensor, c); - } - - @SuppressWarnings("rawtypes") - public <T> SubscriptionHandle subscribe(Map<String, Object> newFlags, Entity producer, Sensor<T> sensor, Closure c) { - return subscribe(newFlags, producer, sensor, toSensorEventListener(c)); - } - - @Override - public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) { - return subscribe(Collections.<String,Object>emptyMap(), producer, sensor, listener); - } - - @Override - public <T> SubscriptionHandle subscribe(Map<String, Object> newFlags, Entity producer, Sensor<T> sensor, SensorEventListener<? 
super T> listener) { - Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags); - if (newFlags != null) subscriptionFlags.putAll(newFlags); - return manager.subscribe(subscriptionFlags, producer, sensor, listener); - } - - @SuppressWarnings("rawtypes") - public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, Closure c) { - return subscribeToChildren(Collections.<String,Object>emptyMap(), parent, sensor, c); - } - - @SuppressWarnings("rawtypes") - public <T> SubscriptionHandle subscribeToChildren(Map<String, Object> newFlags, Entity parent, Sensor<T> sensor, Closure c) { - return subscribeToChildren(newFlags, parent, sensor, toSensorEventListener(c)); - } - - @Override - public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) { - return subscribeToChildren(Collections.<String,Object>emptyMap(), parent, sensor, listener); - } - - @Override - public <T> SubscriptionHandle subscribeToChildren(Map<String, Object> newFlags, Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) { - Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags); - if (newFlags != null) subscriptionFlags.putAll(newFlags); - return manager.subscribeToChildren(subscriptionFlags, parent, sensor, listener); - } - - @SuppressWarnings("rawtypes") - public <T> SubscriptionHandle subscribeToMembers(Group parent, Sensor<T> sensor, Closure c) { - return subscribeToMembers(Collections.<String,Object>emptyMap(), parent, sensor, c); - } - - @SuppressWarnings("rawtypes") - public <T> SubscriptionHandle subscribeToMembers(Map<String, Object> newFlags, Group parent, Sensor<T> sensor, Closure c) { - return subscribeToMembers(newFlags, parent, sensor, toSensorEventListener(c)); - } - - @Override - public <T> SubscriptionHandle subscribeToMembers(Group parent, Sensor<T> sensor, SensorEventListener<? 
super T> listener) { - return subscribeToMembers(Collections.<String,Object>emptyMap(), parent, sensor, listener); - } - - @Override - public <T> SubscriptionHandle subscribeToMembers(Map<String, Object> newFlags, Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) { - Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags); - if (newFlags != null) subscriptionFlags.putAll(newFlags); - return manager.subscribeToMembers(subscriptionFlags, parent, sensor, listener); - } - - @SuppressWarnings("rawtypes") - @Override - public boolean unsubscribe(SubscriptionHandle subscriptionId) { - Preconditions.checkNotNull(subscriptionId, "subscriptionId must not be null"); - Preconditions.checkArgument(Objects.equal(subscriber, ((Subscription) subscriptionId).subscriber), "The subscriptionId is for a different "+subscriber+"; expected "+((Subscription) subscriptionId).subscriber); - return manager.unsubscribe(subscriptionId); - } - - /** @see SubscriptionManager#publish(SensorEvent) */ - @Override - public <T> void publish(SensorEvent<T> event) { - manager.publish(event); - } - - /** Return the subscriptions associated with this context */ - @Override - public Set<SubscriptionHandle> getSubscriptions() { - return manager.getSubscriptionsForSubscriber(subscriber); - } - - @Override - public int unsubscribeAll() { - int count = 0; - - // To avoid ConcurrentModificationException when copying subscriptions, need to synchronize on it - Set<SubscriptionHandle> subscriptions = getSubscriptions(); - Collection<SubscriptionHandle> subscriptionsCopy; - synchronized (subscriptions) { - subscriptionsCopy = ImmutableList.copyOf(subscriptions); - } - - for (SubscriptionHandle s : subscriptionsCopy) { - count++; - boolean result = unsubscribe(s); - if (!result) LOG.warn("When unsubscribing from all of {}, unsubscribe of {} return false", subscriber, s); - } - return count; - } - - @SuppressWarnings("rawtypes") - private <T> SensorEventListener<T> 
toSensorEventListener(final Closure c) { - return new SensorEventListener<T>() { - @Override public void onEvent(SensorEvent<T> event) { - c.call(event); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java b/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java deleted file mode 100644 index 0c54909..0000000 --- a/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java +++ /dev/null @@ -1,626 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.management.internal; - -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.ConcurrentModificationException; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.management.HasTaskChildren; -import org.apache.brooklyn.api.management.Task; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.config.BrooklynProperties; -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.BrooklynTaskTags; -import brooklyn.entity.basic.BrooklynTaskTags.WrappedEntity; -import brooklyn.entity.basic.BrooklynTaskTags.WrappedStream; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.Entities; -import brooklyn.internal.storage.BrooklynStorage; -import brooklyn.util.collections.MutableList; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.collections.MutableSet; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.javalang.MemoryUsageTracker; -import brooklyn.util.task.BasicExecutionManager; -import brooklyn.util.task.ExecutionListener; -import brooklyn.util.task.Tasks; -import brooklyn.util.text.Strings; -import brooklyn.util.time.Duration; - -import com.google.common.base.Objects; -import com.google.common.annotations.Beta; -import com.google.common.collect.Iterables; - -/** - * Deletes record of old tasks, to prevent space leaks and the eating up of more and more memory. 
- * - * The deletion policy is configurable: - * <ul> - * <li>Period - how frequently to look at the existing tasks to delete some, if required - * <li>Max task age - the time after which a completed task will be automatically deleted - * (i.e. any root task completed more than maxTaskAge ago will be deleted) - * <li>Max tasks per <various categories> - the maximum number of tasks to be kept for a given tag, - * split into categories based on what is seeming to be useful - * </ul> - * - * The default is to check with a period of one minute, deleting tasks after 30 days, - * and keeping at most 100000 tasks in the system, - * max 1000 tasks per entity, 50 per effector within that entity, and 50 per other non-effector tag - * within that entity (or global if not attached to an entity). - * - * @author aled - */ -public class BrooklynGarbageCollector { - - private static final Logger LOG = LoggerFactory.getLogger(BrooklynGarbageCollector.class); - - public static final ConfigKey<Duration> GC_PERIOD = ConfigKeys.newDurationConfigKey( - "brooklyn.gc.period", "the period for checking if any tasks need to be deleted", - Duration.minutes(1)); - - public static final ConfigKey<Boolean> DO_SYSTEM_GC = ConfigKeys.newBooleanConfigKey( - "brooklyn.gc.doSystemGc", "whether to periodically call System.gc()", false); - - /** - * should we check for tasks which are submitted by another but backgrounded, i.e. not a child of that task? - * default to yes, despite it can be some extra loops, to make sure we GC them promptly. - * @since 0.7.0 */ - // work offender is {@link DynamicSequentialTask} internal job tracker, but it is marked - // transient so it is destroyed prompty; there may be others, however; - // but OTOH it might be expensive to check for these all the time! 
- // TODO probably we can set this false (remove this and related code), - // and just rely on usual GC to pick up background tasks; the lifecycle of background task - // should normally be independent of the submitter. (DST was the exception, and marking - // transient there fixes the main problem, which is when the submitter is GC'd but the submitted is not, - // and we don't want the submitted to show up at the root in the GUI, which it will if its - // submitter has been GC'd) - @Beta - public static final ConfigKey<Boolean> CHECK_SUBTASK_SUBMITTERS = ConfigKeys.newBooleanConfigKey( - "brooklyn.gc.checkSubtaskSubmitters", "whether for subtasks to check the submitters", true); - - public static final ConfigKey<Integer> MAX_TASKS_PER_TAG = ConfigKeys.newIntegerConfigKey( - "brooklyn.gc.maxTasksPerTag", - "the maximum number of tasks to be kept for a given tag " - + "within an execution context (e.g. entity); " - + "some broad-brush tags are excluded, and if an entity has multiple tags all tag counts must be full", - 50); - - public static final ConfigKey<Integer> MAX_TASKS_PER_ENTITY = ConfigKeys.newIntegerConfigKey( - "brooklyn.gc.maxTasksPerEntity", - "the maximum number of tasks to be kept for a given entity", - 1000); - - public static final ConfigKey<Integer> MAX_TASKS_GLOBAL = ConfigKeys.newIntegerConfigKey( - "brooklyn.gc.maxTasksGlobal", - "the maximum number of tasks to be kept across the entire system", - 100000); - - public static final ConfigKey<Duration> MAX_TASK_AGE = ConfigKeys.newDurationConfigKey( - "brooklyn.gc.maxTaskAge", - "the duration after which a completed task will be automatically deleted", - Duration.days(30)); - - protected final static Comparator<Task<?>> TASKS_OLDEST_FIRST_COMPARATOR = new Comparator<Task<?>>() { - @Override public int compare(Task<?> t1, Task<?> t2) { - long end1 = t1.getEndTimeUtc(); - long end2 = t2.getEndTimeUtc(); - return (end1 < end2) ? -1 : ((end1 == end2) ? 
0 : 1); - } - }; - - private final BasicExecutionManager executionManager; - private final BrooklynStorage storage; - private final BrooklynProperties brooklynProperties; - private final ScheduledExecutorService executor; - private ScheduledFuture<?> activeCollector; - private Map<Entity,Task<?>> unmanagedEntitiesNeedingGc = new LinkedHashMap<Entity, Task<?>>(); - - private Duration gcPeriod; - private final boolean doSystemGc; - private volatile boolean running = true; - - public BrooklynGarbageCollector(BrooklynProperties brooklynProperties, BasicExecutionManager executionManager, BrooklynStorage storage) { - this.executionManager = executionManager; - this.storage = storage; - this.brooklynProperties = brooklynProperties; - - doSystemGc = brooklynProperties.getConfig(DO_SYSTEM_GC); - - executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override public Thread newThread(Runnable r) { - return new Thread(r, "brooklyn-gc"); - }}); - - executionManager.addListener(new ExecutionListener() { - @Override public void onTaskDone(Task<?> task) { - BrooklynGarbageCollector.this.onTaskDone(task); - }}); - - scheduleCollector(true); - } - - protected synchronized void scheduleCollector(boolean canInterruptCurrent) { - if (activeCollector != null) activeCollector.cancel(canInterruptCurrent); - - gcPeriod = brooklynProperties.getConfig(GC_PERIOD); - if (gcPeriod!=null) { - activeCollector = executor.scheduleWithFixedDelay( - new Runnable() { - @Override public void run() { - gcIteration(); - } - }, - gcPeriod.toMillisecondsRoundingUp(), - gcPeriod.toMillisecondsRoundingUp(), - TimeUnit.MILLISECONDS); - } - } - - /** force a round of Brooklyn garbage collection */ - public void gcIteration() { - try { - logUsage("brooklyn gc (before)"); - gcTasks(); - logUsage("brooklyn gc (after)"); - - if (doSystemGc) { - // Can be very useful when tracking down OOMEs etc, where a lot of tasks are executing - // Empirically observed that (on OS X jvm at least) 
calling twice blocks - logs a significant - // amount of memory having been released, as though a full-gc had been run. But this is highly - // dependent on the JVM implementation. - System.gc(); System.gc(); - logUsage("brooklyn gc (after system gc)"); - } - } catch (Throwable t) { - Exceptions.propagateIfFatal(t); - LOG.warn("Error during management-context GC: "+t, t); - // previously we bailed on all errors, but I don't think we should do that -Alex - } - } - - public void logUsage(String prefix) { - if (LOG.isDebugEnabled()) - LOG.debug(prefix+" - using "+getUsageString()); - } - - public static String makeBasicUsageString() { - return Strings.makeSizeString(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())+" / "+ - Strings.makeSizeString(Runtime.getRuntime().totalMemory()) + " memory" + - " ("+Strings.makeSizeString(MemoryUsageTracker.SOFT_REFERENCES.getBytesUsed()) + " soft); "+ - Thread.activeCount()+" threads"; - } - - public String getUsageString() { - return makeBasicUsageString()+"; "+ - "storage: " + storage.getStorageMetrics() + "; " + - "tasks: " + - executionManager.getNumActiveTasks()+" active, "+ - executionManager.getNumIncompleteTasks()+" unfinished; "+ - executionManager.getNumInMemoryTasks()+" remembered, "+ - executionManager.getTotalTasksSubmitted()+" total submitted)"; - } - - public void shutdownNow() { - running = false; - if (activeCollector != null) activeCollector.cancel(true); - if (executor != null) executor.shutdownNow(); - } - - public void onUnmanaged(Entity entity) { - // defer task deletions until the entity is completely unmanaged - // (this is usually invoked during the stop sequence) - synchronized (unmanagedEntitiesNeedingGc) { - unmanagedEntitiesNeedingGc.put(entity, Tasks.current()); - } - } - - public void deleteTasksForEntity(Entity entity) { - // remove all references to this entity from tasks - executionManager.deleteTag(entity); - 
executionManager.deleteTag(BrooklynTaskTags.tagForContextEntity(entity)); - executionManager.deleteTag(BrooklynTaskTags.tagForCallerEntity(entity)); - executionManager.deleteTag(BrooklynTaskTags.tagForTargetEntity(entity)); - } - - public void onUnmanaged(Location loc) { - // No-op currently; no tasks are tracked through their location - } - - public void onTaskDone(Task<?> task) { - if (shouldDeleteTaskImmediately(task)) { - executionManager.deleteTask(task); - } - } - - /** @deprecated since 0.7.0, method moved internal until semantics are clarified; see also {@link #shouldDeleteTaskImmediately(Task)} */ - @Deprecated - public boolean shouldDeleteTask(Task<?> task) { - return shouldDeleteTaskImmediately(task); - } - /** whether this task should be deleted on completion, - * because it is transient, or because it is submitted background without much context information */ - protected boolean shouldDeleteTaskImmediately(Task<?> task) { - if (!task.isDone()) return false; - - Set<Object> tags = task.getTags(); - if (tags.contains(ManagementContextInternal.TRANSIENT_TASK_TAG)) - return true; - if (tags.contains(ManagementContextInternal.EFFECTOR_TAG) || tags.contains(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) - return false; - - if (task.getSubmittedByTask()!=null) { - Task<?> parent = task.getSubmittedByTask(); - if (executionManager.getTask(parent.getId())==null) { - // parent is already cleaned up - return true; - } - if (parent instanceof HasTaskChildren && Iterables.contains(((HasTaskChildren)parent).getChildren(), task)) { - // it is a child, let the parent manage this task's death - return false; - } - Entity associatedEntity = BrooklynTaskTags.getTargetOrContextEntity(task); - if (associatedEntity!=null) { - // this is associated to an entity; destroy only if the entity is unmanaged - return !Entities.isManaged(associatedEntity); - } - // if not associated to an entity, then delete immediately - return true; - } - - // e.g. 
scheduled tasks, sensor events, etc - // TODO (in future may keep some of these with another limit, based on a new TagCategory) - // there may also be a server association for server-side tasks which should be kept - // (but be careful not to keep too many subscriptions!) - - return true; - } - - /** - * Deletes old tasks. The age/number of tasks to keep is controlled by fields like - * {@link #maxTasksPerTag} and {@link #maxTaskAge}. - */ - protected synchronized int gcTasks() { - // TODO Must be careful with memory usage here: have seen OOME if we get crazy lots of tasks. - // hopefully the use new limits, filters, and use of live lists in some places (added Sep 2014) will help. - // - // An option is for getTasksWithTag(tag) to return an ArrayList rather than a LinkedHashSet. That - // is a far more memory efficient data structure (e.g. 4 bytes overhead per object rather than - // 32 bytes overhead per object for HashSet). - // - // More notes on optimization is in the history of this file. 
- - if (!running) return 0; - - Duration newPeriod = brooklynProperties.getConfig(GC_PERIOD); - if (!Objects.equal(gcPeriod, newPeriod)) { - // caller has changed period, reschedule on next run - scheduleCollector(false); - } - - expireUnmanagedEntityTasks(); - expireAgedTasks(); - expireTransientTasks(); - - // now look at overcapacity tags, non-entity tags first - - Set<Object> taskTags = executionManager.getTaskTags(); - - int maxTasksPerEntity = brooklynProperties.getConfig(MAX_TASKS_PER_ENTITY); - int maxTasksPerTag = brooklynProperties.getConfig(MAX_TASKS_PER_TAG); - - Map<Object,AtomicInteger> taskNonEntityTagsOverCapacity = MutableMap.of(); - Map<Object,AtomicInteger> taskEntityTagsOverCapacity = MutableMap.of(); - - Map<Object,AtomicInteger> taskAllTagsOverCapacity = MutableMap.of(); - - for (Object tag : taskTags) { - if (isTagIgnoredForGc(tag)) continue; - - Set<Task<?>> tasksWithTag = executionManager.tasksWithTagLiveOrNull(tag); - if (tasksWithTag==null) continue; - AtomicInteger overA = null; - if (tag instanceof WrappedEntity) { - int over = tasksWithTag.size() - maxTasksPerEntity; - if (over>0) { - overA = new AtomicInteger(over); - taskEntityTagsOverCapacity.put(tag, overA); - } - } else { - int over = tasksWithTag.size() - maxTasksPerTag; - if (over>0) { - overA = new AtomicInteger(over); - taskNonEntityTagsOverCapacity.put(tag, overA); - } - } - if (overA!=null) { - taskAllTagsOverCapacity.put(tag, overA); - } - } - - int deletedCount = 0; - deletedCount += expireOverCapacityTagsInCategory(taskNonEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.NON_ENTITY_NORMAL, false); - deletedCount += expireOverCapacityTagsInCategory(taskEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.ENTITY, true); - deletedCount += expireSubTasksWhoseSubmitterIsExpired(); - - int deletedGlobally = expireIfOverCapacityGlobally(); - deletedCount += deletedGlobally; - if (deletedGlobally>0) deletedCount += expireSubTasksWhoseSubmitterIsExpired(); - - 
return deletedCount; - } - - protected static boolean isTagIgnoredForGc(Object tag) { - if (tag == null) return true; - if (tag.equals(ManagementContextInternal.EFFECTOR_TAG)) return true; - if (tag.equals(ManagementContextInternal.SUB_TASK_TAG)) return true; - if (tag.equals(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) return true; - if (tag.equals(ManagementContextInternal.TRANSIENT_TASK_TAG)) return true; - if (tag instanceof WrappedStream) { - return true; - } - - return false; - } - - protected void expireUnmanagedEntityTasks() { - Iterator<Entry<Entity, Task<?>>> ei; - synchronized (unmanagedEntitiesNeedingGc) { - ei = MutableSet.copyOf(unmanagedEntitiesNeedingGc.entrySet()).iterator(); - } - while (ei.hasNext()) { - Entry<Entity, Task<?>> ee = ei.next(); - if (Entities.isManaged(ee.getKey())) continue; - if (ee.getValue()!=null && !ee.getValue().isDone()) continue; - deleteTasksForEntity(ee.getKey()); - synchronized (unmanagedEntitiesNeedingGc) { - unmanagedEntitiesNeedingGc.remove(ee.getKey()); - } - } - } - - protected void expireAgedTasks() { - Duration maxTaskAge = brooklynProperties.getConfig(MAX_TASK_AGE); - - Collection<Task<?>> allTasks = executionManager.allTasksLive(); - Collection<Task<?>> tasksToDelete = MutableList.of(); - - try { - for (Task<?> task: allTasks) { - if (!task.isDone()) continue; - if (BrooklynTaskTags.isSubTask(task)) continue; - - if (maxTaskAge.isShorterThan(Duration.sinceUtc(task.getEndTimeUtc()))) - tasksToDelete.add(task); - } - - } catch (ConcurrentModificationException e) { - // delete what we've found so far - LOG.debug("Got CME inspecting aged tasks, with "+tasksToDelete.size()+" found for deletion: "+e); - } - - for (Task<?> task: tasksToDelete) { - executionManager.deleteTask(task); - } - } - - protected void expireTransientTasks() { - Set<Task<?>> transientTasks = executionManager.getTasksWithTag(BrooklynTaskTags.TRANSIENT_TASK_TAG); - for (Task<?> t: transientTasks) { - if (!t.isDone()) continue; - 
executionManager.deleteTask(t); - } - } - - protected int expireSubTasksWhoseSubmitterIsExpired() { - // ideally we wouldn't have this; see comments on CHECK_SUBTASK_SUBMITTERS - if (!brooklynProperties.getConfig(CHECK_SUBTASK_SUBMITTERS)) - return 0; - - Collection<Task<?>> allTasks = executionManager.allTasksLive(); - Collection<Task<?>> tasksToDelete = MutableList.of(); - try { - for (Task<?> task: allTasks) { - if (!task.isDone()) continue; - Task<?> submitter = task.getSubmittedByTask(); - // if we've leaked, ie a subtask which is not a child task, - // and the submitter is GC'd, then delete this also - if (submitter!=null && submitter.isDone() && executionManager.getTask(submitter.getId())==null) { - tasksToDelete.add(task); - } - } - - } catch (ConcurrentModificationException e) { - // delete what we've found so far - LOG.debug("Got CME inspecting aged tasks, with "+tasksToDelete.size()+" found for deletion: "+e); - } - - for (Task<?> task: tasksToDelete) { - executionManager.deleteTask(task); - } - return tasksToDelete.size(); - } - - protected enum TagCategory { - ENTITY, NON_ENTITY_NORMAL; - - public boolean acceptsTag(Object tag) { - if (isTagIgnoredForGc(tag)) return false; - if (tag instanceof WrappedEntity) return this==ENTITY; - if (this==ENTITY) return false; - return true; - } - } - - - /** expires tasks which are over-capacity in all their non-entity tag categories, returned count */ - protected int expireOverCapacityTagsInCategory(Map<Object, AtomicInteger> taskTagsInCategoryOverCapacity, Map<Object, AtomicInteger> taskAllTagsOverCapacity, TagCategory category, boolean emptyFilterNeeded) { - if (emptyFilterNeeded) { - // previous run may have decremented counts - MutableList<Object> nowOkayTags = MutableList.of(); - for (Map.Entry<Object,AtomicInteger> entry: taskTagsInCategoryOverCapacity.entrySet()) { - if (entry.getValue().get()<=0) nowOkayTags.add(entry.getKey()); - } - for (Object tag: nowOkayTags) taskTagsInCategoryOverCapacity.remove(tag); 
- } - - if (taskTagsInCategoryOverCapacity.isEmpty()) - return 0; - - Collection<Task<?>> tasks = executionManager.allTasksLive(); - List<Task<?>> tasksToConsiderDeleting = MutableList.of(); - try { - for (Task<?> task: tasks) { - if (!task.isDone()) continue; - - Set<Object> tags = task.getTags(); - - int categoryTags = 0, tooFullCategoryTags = 0; - for (Object tag: tags) { - if (category.acceptsTag(tag)) { - categoryTags++; - if (taskTagsInCategoryOverCapacity.containsKey(tag)) - tooFullCategoryTags++; - } - } - if (tooFullCategoryTags>0) { - if (categoryTags==tooFullCategoryTags) { - // all buckets are full, delete this one - tasksToConsiderDeleting.add(task); - } else { - // if any bucket is under capacity, then give grace to the other buckets in this category - for (Object tag: tags) { - if (category.acceptsTag(tag)) { - AtomicInteger over = taskTagsInCategoryOverCapacity.get(tag); - if (over!=null) { - if (over.decrementAndGet()<=0) { - // and remove it from over-capacity if so - taskTagsInCategoryOverCapacity.remove(tag); - if (taskTagsInCategoryOverCapacity.isEmpty()) - return 0; - } - } - } - } - } - } - } - - } catch (ConcurrentModificationException e) { - // do CME's happen with these data structures? 
- // if so, let's just delete what we've found so far - LOG.debug("Got CME inspecting tasks, with "+tasksToConsiderDeleting.size()+" found for deletion: "+e); - } - - if (LOG.isDebugEnabled()) - LOG.debug("brooklyn-gc detected "+taskTagsInCategoryOverCapacity.size()+" "+category+" " - + "tags over capacity, expiring old tasks; " - + tasksToConsiderDeleting.size()+" tasks under consideration; categories are: " - + taskTagsInCategoryOverCapacity); - - Collections.sort(tasksToConsiderDeleting, TASKS_OLDEST_FIRST_COMPARATOR); - // now try deleting tasks which are overcapacity for each (non-entity) tag - int deleted = 0; - for (Task<?> task: tasksToConsiderDeleting) { - boolean delete = true; - for (Object tag: task.getTags()) { - if (!category.acceptsTag(tag)) - continue; - if (taskTagsInCategoryOverCapacity.get(tag)==null) { - // no longer over capacity in this tag - delete = false; - break; - } - } - if (delete) { - // delete this and update overcapacity info - deleted++; - executionManager.deleteTask(task); - for (Object tag: task.getTags()) { - AtomicInteger counter = taskAllTagsOverCapacity.get(tag); - if (counter!=null && counter.decrementAndGet()<=0) - taskTagsInCategoryOverCapacity.remove(tag); - } - if (LOG.isTraceEnabled()) - LOG.trace("brooklyn-gc deleted "+task+", buckets now "+taskTagsInCategoryOverCapacity); - if (taskTagsInCategoryOverCapacity.isEmpty()) - break; - } - } - - if (LOG.isDebugEnabled()) - LOG.debug("brooklyn-gc deleted "+deleted+" tasks in over-capacity " + category+" tag categories; " - + "capacities now: " + taskTagsInCategoryOverCapacity); - return deleted; - } - - protected int expireIfOverCapacityGlobally() { - Collection<Task<?>> tasksLive = executionManager.allTasksLive(); - if (tasksLive.size() <= brooklynProperties.getConfig(MAX_TASKS_GLOBAL)) - return 0; - LOG.debug("brooklyn-gc detected "+tasksLive.size()+" tasks in memory, over global limit, looking at deleting some"); - - try { - tasksLive = MutableList.copyOf(tasksLive); - } 
catch (ConcurrentModificationException e) { - tasksLive = executionManager.getTasksWithAllTags(MutableList.of()); - } - - MutableList<Task<?>> tasks = MutableList.of(); - for (Task<?> task: tasksLive) { - if (task.isDone()) { - tasks.add(task); - } - } - - int numToDelete = tasks.size() - brooklynProperties.getConfig(MAX_TASKS_GLOBAL); - if (numToDelete <= 0) { - LOG.debug("brooklyn-gc detected only "+tasks.size()+" completed tasks in memory, not over global limit, so not deleting any"); - return 0; - } - - Collections.sort(tasks, TASKS_OLDEST_FIRST_COMPARATOR); - - int numDeleted = 0; - while (numDeleted < numToDelete && tasks.size()>numDeleted) { - executionManager.deleteTask( tasks.get(numDeleted++) ); - } - if (LOG.isDebugEnabled()) - LOG.debug("brooklyn-gc deleted "+numDeleted+" tasks as was over global limit, now have "+executionManager.allTasksLive().size()); - return numDeleted; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/internal/BrooklynObjectManagementMode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/internal/BrooklynObjectManagementMode.java b/core/src/main/java/brooklyn/management/internal/BrooklynObjectManagementMode.java deleted file mode 100644 index dd9d8bc..0000000 --- a/core/src/main/java/brooklyn/management/internal/BrooklynObjectManagementMode.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.management.internal; - -/** Indicates how an entity/location/adjunct is treated at a given {@link ManagementContext} */ -public enum BrooklynObjectManagementMode { - /** item does not exist, not in memory, nor persisted (e.g. creating for first time, or finally destroying) */ - NONEXISTENT, - /** item exists or existed elsewhere, i.e. there is persisted state, but is not loaded here */ - UNMANAGED_PERSISTED, - /** item is loaded but read-only (ie not actively managed here) */ - LOADED_READ_ONLY, - /** item is actively managed here */ - MANAGED_PRIMARY -} \ No newline at end of file
