This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 87562f9b934fab79f50870609b4fdda71b0e57c9 Author: Alex Heneveld <alex.henev...@cloudsoftcorp.com> AuthorDate: Tue Nov 24 10:35:33 2020 +0000 add settings to require catalog rebind/init to wait until certain services are available with optional timeout and optional post-init start level change, which can be used e.g. to trigger fileinstall hot deployment --- .../apache/brooklyn/camp/BasicCampPlatform.java | 2 +- .../brooklyn/launcher/osgi/OsgiLauncherImpl.java | 199 ++++++++++++++++++++- .../apache/brooklyn/util/osgi/OsgiActivator.java | 11 ++ 3 files changed, 204 insertions(+), 8 deletions(-) diff --git a/camp/camp-base/src/main/java/org/apache/brooklyn/camp/BasicCampPlatform.java b/camp/camp-base/src/main/java/org/apache/brooklyn/camp/BasicCampPlatform.java index e0cbf6d..e0a9e29 100644 --- a/camp/camp-base/src/main/java/org/apache/brooklyn/camp/BasicCampPlatform.java +++ b/camp/camp-base/src/main/java/org/apache/brooklyn/camp/BasicCampPlatform.java @@ -137,7 +137,7 @@ public class BasicCampPlatform extends CampPlatform { if (!committed.get()) { // normal, in the case of errors (which might occur when catalog tries to figure out the right plan format); shouldn't happen otherwise // if we want log.warn visibility of these, then we will have to supply an abandon() method on this interface and ensure that is invoked on errors - log.debug("transaction "+this+" was never applied"); + log.debug("Transaction "+this+" was never applied (normal when attempting to auto-detect plan formats)"); } super.finalize(); } diff --git a/karaf/init/src/main/java/org/apache/brooklyn/launcher/osgi/OsgiLauncherImpl.java b/karaf/init/src/main/java/org/apache/brooklyn/launcher/osgi/OsgiLauncherImpl.java index 7ff388d..6e23e7a 100644 --- a/karaf/init/src/main/java/org/apache/brooklyn/launcher/osgi/OsgiLauncherImpl.java +++ b/karaf/init/src/main/java/org/apache/brooklyn/launcher/osgi/OsgiLauncherImpl.java @@ -16,6 +16,11 @@ package org.apache.brooklyn.launcher.osgi; import com.google.common.base.Stopwatch; +import com.sun.xml.bind.v2.TODO; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.apache.brooklyn.api.mgmt.ManagementContext; import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode; import org.apache.brooklyn.core.BrooklynVersionService; @@ -23,18 +28,26 @@ import org.apache.brooklyn.core.catalog.internal.CatalogInitialization; import org.apache.brooklyn.core.internal.BrooklynProperties; import org.apache.brooklyn.core.mgmt.internal.BrooklynShutdownHooks; import org.apache.brooklyn.core.mgmt.persist.PersistMode; +import org.apache.brooklyn.core.typereg.BrooklynCatalogBundleResolver; import org.apache.brooklyn.launcher.common.BasicLauncher; import org.apache.brooklyn.launcher.common.BrooklynPropertiesFactoryHelper; import org.apache.brooklyn.rest.BrooklynWebConfig; import org.apache.brooklyn.rest.security.provider.BrooklynUserWithRandomPasswordSecurityProvider; +import org.apache.brooklyn.util.core.flags.TypeCoercions; import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; import org.apache.brooklyn.util.javalang.Threads; +import org.apache.brooklyn.util.text.StringEscapes.JavaStringEscapes; import org.apache.brooklyn.util.text.Strings; +import org.apache.brooklyn.util.time.CountdownTimer; import org.apache.brooklyn.util.time.Duration; -import org.osgi.framework.Constants; -import org.osgi.framework.InvalidSyntaxException; +import org.apache.brooklyn.util.time.Time; +import org.osgi.framework.*; +import org.osgi.framework.launch.Framework; +import org.osgi.framework.startlevel.FrameworkStartLevel; import org.osgi.service.cm.Configuration; import org.osgi.service.cm.ConfigurationAdmin; +import org.osgi.util.tracker.ServiceTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +63,21 @@ public class OsgiLauncherImpl extends BasicLauncher<OsgiLauncherImpl> implements private static final Logger LOG = LoggerFactory.getLogger(OsgiLauncherImpl.class); public static final String BROOKLYN_CONFIG_PID = "brooklyn"; + /** Takes a JSON representation or comma separated list of OSGi filter expressions (cf {@link BundleContext#getServiceReferences(String, String)}, + * where each filter must be satisfied before the catalog will be initialized. + * <p> + * For example: + * <code>(&(osgi.service.blueprint.compname=customBundleResolver))</code> to wait for a service registered via a blueprint with component name 'customBundleResolver', or + * or + * <code>["(&(osgi.service.blueprint.compname=customBundleResolver))","(&(osgi.service.blueprint.compname=customTypePlanTransformer))"] + * to wait for two services registered, 'customBundleResolver' and 'customTypePlanTransformer'</code>. + * <p> + * This can be required where some catalog or rebind items depend on services installed during startup. + * */ + public static final String BROOKLYN_OSGI_DEPENDENCIES_SERVICES_FILTERS = "brooklyn.osgi.dependencies.services.filters"; + public static final String BROOKLYN_OSGI_DEPENDENCIES_SERVICES_TIMEOUT = "brooklyn.osgi.dependencies.services.timeout"; + public static final String BROOKLYN_OSGI_STARTLEVEL_POSTINIT = "brooklyn.osgi.startlevel.postinit"; + private Object reloadLock = new Object(); private BrooklynVersionService brooklynVersion; @@ -67,7 +95,7 @@ public class OsgiLauncherImpl extends BasicLauncher<OsgiLauncherImpl> implements // make sure brooklyn-core bundle is started brooklynVersion.getVersion(); - Configuration brooklynConfig = getConfiguration(configAdmin, BROOKLYN_CONFIG_PID); + Configuration brooklynConfig = getConfiguration(BROOKLYN_CONFIG_PID); // Note that this doesn't check whether the files exist, just that there are potential alternative sources for configuration. if (brooklynConfig == null && Strings.isEmpty(globalBrooklynProperties) && Strings.isEmpty(localBrooklynProperties)) { LOG.warn("Config Admin PID '" + BROOKLYN_CONFIG_PID + "' not found, not using external configuration. Create a brooklyn.cfg file in etc folder."); @@ -79,8 +107,8 @@ public class OsgiLauncherImpl extends BasicLauncher<OsgiLauncherImpl> implements return super.startPartOne(); } - private Configuration getConfiguration(ConfigurationAdmin configAdmin, String brooklynConfigPid) { - String filter = '(' + Constants.SERVICE_PID + '=' + brooklynConfigPid + ')'; + private Configuration getConfiguration(String configPid) { + String filter = '(' + Constants.SERVICE_PID + '=' + configPid + ')'; Configuration[] configs; try { configs = configAdmin.listConfigurations(filter); @@ -108,14 +136,165 @@ public class OsgiLauncherImpl extends BasicLauncher<OsgiLauncherImpl> implements } } + Thread fallbackThread = null; @Override public void startOsgi() { + final Bundle bundle = FrameworkUtil.getBundle(this.getClass()); + + Framework f = (Framework) bundle.getBundleContext().getBundle(0); + int startLevel = f.adapt(FrameworkStartLevel.class).getStartLevel(); + + if (areServiceDependenciesReady(bundle, bundle + " bundle activation")) { + LOG.debug("Starting OSGi catalog/rebind (no service dependencies or all already satisfied, on bundle activation)"); + doStartOsgiAfterBundlesRefreshed(); + + } else { + ServiceListener sl[] = { null }; + + + sl[0] = new ServiceListener() { + @Override + public void serviceChanged(ServiceEvent event) { + if (areServiceDependenciesReady(bundle, "ServiceEvent[" + event.getServiceReference() + " / " + event.getType() + " - " + event.getSource() + "]")) { + LOG.debug("Starting OSGi catalog/rebind - all service dependencies satisfied"); + bundle.getBundleContext().removeServiceListener(sl[0]); + if (fallbackThread!=null) { + fallbackThread.interrupt(); + } + new Thread(() -> { + // do this in a new thread as it takes a while, shouldn't run in service listener thread + doStartOsgiAfterBundlesRefreshed(); + }).start(); + } + } + }; + + Duration timeout; + try { + timeout = Duration.parse((String) getBrooklynProperties().getConfig(BROOKLYN_OSGI_DEPENDENCIES_SERVICES_TIMEOUT)); + } catch (Exception e) { + throw Exceptions.propagateAnnotated("Invalid duration specified for '"+BROOKLYN_OSGI_DEPENDENCIES_SERVICES_TIMEOUT+"'", e); + } + fallbackThread = new Thread(() -> { + if (timeout==null) { + LOG.debug("No timeout specified for '"+BROOKLYN_OSGI_DEPENDENCIES_SERVICES_TIMEOUT+"'; will wait indefinitely for service dependencies"); + return; + } + + CountdownTimer timer = timeout==null ? null : CountdownTimer.newInstanceStarted(timeout); + try { + if (timeout!=null) { + LOG.debug("Service dependencies timeout detected as " + timeout + "; will start catalog/rebind after that delay if service dependencies not fulfilled sooner"); + } + int iteration = 0; + do { + if (iteration>0) { + LOG.debug("Still waiting on service dependencies, " + iteration+"m so far" + + (timer!=null ? ", "+timer.getDurationRemaining()+" remaining until timeout" : " (will wait indefinitely)")); + } + Duration wait = Duration.ONE_MINUTE; + if (timer != null && timer.getDurationRemaining().isShorterThan(wait)) { + wait = timer.getDurationRemaining(); + } + Time.sleep(wait); + iteration++; + } while (timer==null || timer.isNotExpired()); + } catch (RuntimeInterruptedException e) { + // normal - thread aborted probably because service dependencies fulfilled + LOG.debug("OsgiLauncher fallback thread interrupted, probably because service dependencies fulfilled in another thread where OSGi catalog/rebind start should be occurring (or due to shutdown)"); + Thread.interrupted(); + return; + } finally { + fallbackThread = null; + } + + LOG.warn("Starting OSGi catalog/rebind due to timeout waiting for service dependencies"); + bundle.getBundleContext().removeServiceListener(sl[0]); + new Thread(() -> { + // do this in a new thread so it isn't interrupted if fallback[0] is interrupted + if (!doStartOsgiAfterBundlesRefreshed()) { + LOG.debug("Did not start OSGi after timeout; already started"); + } + }).start(); + }); + + bundle.getBundleContext().addServiceListener(sl[0]); + fallbackThread.start(); + } + } + + private boolean areServiceDependenciesReady(Bundle bundle, String context) { + Object deps = getBrooklynProperties().getConfig(BROOKLYN_OSGI_DEPENDENCIES_SERVICES_FILTERS); + if (deps!=null) { + List<String> items = JavaStringEscapes.unwrapJsonishListStringIfPossible(deps.toString()); + LOG.debug("OSGi catalog/rebind service dependency check, on " + context + ": " + items); + + for (String item: items) { + ServiceReference r1[]; + try { + r1 = bundle.getBundleContext().getServiceReferences((String)null, item); + } catch (Exception e) { + throw Exceptions.propagateAnnotated("Error getting service references satisfying '"+item+"'", e); + } + + if (r1 == null || r1.length == 0) { + LOG.debug("OSGi catalog/rebind blocked, service dependency not yet fulfilled (will keep listening for services): '" + item + "'"); + return false; + } + } + + return true; + + + } else { + LOG.debug("No service dependencies specified, on " + context); + return true; + + } + } + + AtomicBoolean startedOsgiAfterBundlesRefreshed = new AtomicBoolean(); + private boolean doStartOsgiAfterBundlesRefreshed() { + if (startedOsgiAfterBundlesRefreshed.getAndSet(true)) { + LOG.debug("OSGi catalog/rebind already started when invoked a second time", new Throwable("Trace for unexpected redundant OSGi catalog/rebind start")); + return false; + } + + doStartOsgi(); + + Object newStartLevelS = null; + try { + newStartLevelS = getBrooklynProperties().getConfig(BROOKLYN_OSGI_STARTLEVEL_POSTINIT); + if (newStartLevelS==null || Strings.isBlank(""+newStartLevelS)) { + LOG.debug("No change required to OSGi start-level after OSGi catalog/rebind ("+BROOKLYN_OSGI_STARTLEVEL_POSTINIT+" unset)"); + + } else { + FrameworkStartLevel fsl = FrameworkUtil.getBundle(this.getClass()).getBundleContext().getBundle(0).adapt(FrameworkStartLevel.class); + + int newStartLevel = TypeCoercions.coerce(newStartLevelS, Integer.class); + if (fsl.getStartLevel()<newStartLevel) { + LOG.debug("Changing OSGi start-level to "+newStartLevelS+" (from "+fsl+") after OSGi catalog/rebind"); + fsl.setStartLevel(newStartLevel); + + } else { + LOG.debug("No change required to OSGi start-level after OSGi catalog/rebind (currently "+fsl.getStartLevel()+", "+BROOKLYN_OSGI_STARTLEVEL_POSTINIT+"="+newStartLevelS+" required)"); + + } + } + } catch (Exception e) { + LOG.error("Error handling post-init start level: "+e, e); + } + + return true; + } + + private void doStartOsgi() { synchronized (reloadLock) { final Stopwatch startupTimer = Stopwatch.createStarted(); - LOG.debug("OsgiLauncher start"); + LOG.debug("OsgiLauncher catalog/rebind running initialization (part two)"); startPartTwo(); startupTimer.stop(); - LOG.info("Brooklyn initialisation (part two) complete after {}", startupTimer.toString()); + LOG.info("Brooklyn initialization (part two) complete after {}", startupTimer.toString()); } } @@ -123,6 +302,12 @@ public class OsgiLauncherImpl extends BasicLauncher<OsgiLauncherImpl> implements public void destroyOsgi() { LOG.debug("Notified of system shutdown, calling shutdown hooks"); Threads.runShutdownHooks(); + + Thread t = fallbackThread; + if (t!=null) { + LOG.debug("Notified of system shutdown, cancelling service dependencies fallback thread"); + t.interrupt(); + } } @Override diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/osgi/OsgiActivator.java b/utils/common/src/main/java/org/apache/brooklyn/util/osgi/OsgiActivator.java index 9881bb0..7649b3f 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/util/osgi/OsgiActivator.java +++ b/utils/common/src/main/java/org/apache/brooklyn/util/osgi/OsgiActivator.java @@ -17,18 +17,29 @@ package org.apache.brooklyn.util.osgi; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; +import org.osgi.framework.launch.Framework; +import org.osgi.framework.startlevel.FrameworkStartLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * handle bundle activation/deactivation */ public class OsgiActivator implements BundleActivator { + private static Logger LOG = LoggerFactory.getLogger(OsgiActivator.class); + @Override public void start(BundleContext context) throws Exception { + Framework f = (Framework) context.getBundle(0); + // https://issues.apache.org/jira/browse/KARAF-6920: framework start level should report 80 but instead says 100 on clean start + // when this is resolved we will no longer need the `startlevel.postinit` property in OsgiLauncherImpl + LOG.debug("Starting "+context.getBundle()+", at OSGi start-level "+f.adapt(FrameworkStartLevel.class).getStartLevel()+", bundle state "+context.getBundle().getState()); } @Override public void stop(BundleContext context) throws Exception { + LOG.debug("Stopping "+context.getBundle()); OsgiUtil.shutdown(); }