This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 87562f9b934fab79f50870609b4fdda71b0e57c9
Author: Alex Heneveld <alex.henev...@cloudsoftcorp.com>
AuthorDate: Tue Nov 24 10:35:33 2020 +0000

    add settings to require catalog rebind/init to wait until certain services 
are available
    
    with optional timeout and optional post-init start level change, which can 
be used e.g. to trigger fileinstall hot deployment
---
 .../apache/brooklyn/camp/BasicCampPlatform.java    |   2 +-
 .../brooklyn/launcher/osgi/OsgiLauncherImpl.java   | 199 ++++++++++++++++++++-
 .../apache/brooklyn/util/osgi/OsgiActivator.java   |  11 ++
 3 files changed, 204 insertions(+), 8 deletions(-)

diff --git 
a/camp/camp-base/src/main/java/org/apache/brooklyn/camp/BasicCampPlatform.java 
b/camp/camp-base/src/main/java/org/apache/brooklyn/camp/BasicCampPlatform.java
index e0cbf6d..e0a9e29 100644
--- 
a/camp/camp-base/src/main/java/org/apache/brooklyn/camp/BasicCampPlatform.java
+++ 
b/camp/camp-base/src/main/java/org/apache/brooklyn/camp/BasicCampPlatform.java
@@ -137,7 +137,7 @@ public class BasicCampPlatform extends CampPlatform {
             if (!committed.get()) {
                 // normal, in the case of errors (which might occur when 
catalog tries to figure out the right plan format); shouldn't happen otherwise
                 // if we want log.warn visibility of these, then we will have 
to supply an abandon() method on this interface and ensure that is invoked on 
errors
-                log.debug("transaction "+this+" was never applied");
+                log.debug("Transaction "+this+" was never applied (normal when 
attempting to auto-detect plan formats)");
             }
             super.finalize();
         }
diff --git 
a/karaf/init/src/main/java/org/apache/brooklyn/launcher/osgi/OsgiLauncherImpl.java
 
b/karaf/init/src/main/java/org/apache/brooklyn/launcher/osgi/OsgiLauncherImpl.java
index 7ff388d..6e23e7a 100644
--- 
a/karaf/init/src/main/java/org/apache/brooklyn/launcher/osgi/OsgiLauncherImpl.java
+++ 
b/karaf/init/src/main/java/org/apache/brooklyn/launcher/osgi/OsgiLauncherImpl.java
@@ -16,6 +16,11 @@
 package org.apache.brooklyn.launcher.osgi;
 
 import com.google.common.base.Stopwatch;
+import com.sun.xml.bind.v2.TODO;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.apache.brooklyn.api.mgmt.ManagementContext;
 import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode;
 import org.apache.brooklyn.core.BrooklynVersionService;
@@ -23,18 +28,26 @@ import 
org.apache.brooklyn.core.catalog.internal.CatalogInitialization;
 import org.apache.brooklyn.core.internal.BrooklynProperties;
 import org.apache.brooklyn.core.mgmt.internal.BrooklynShutdownHooks;
 import org.apache.brooklyn.core.mgmt.persist.PersistMode;
+import org.apache.brooklyn.core.typereg.BrooklynCatalogBundleResolver;
 import org.apache.brooklyn.launcher.common.BasicLauncher;
 import org.apache.brooklyn.launcher.common.BrooklynPropertiesFactoryHelper;
 import org.apache.brooklyn.rest.BrooklynWebConfig;
 import 
org.apache.brooklyn.rest.security.provider.BrooklynUserWithRandomPasswordSecurityProvider;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
 import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
 import org.apache.brooklyn.util.javalang.Threads;
+import org.apache.brooklyn.util.text.StringEscapes.JavaStringEscapes;
 import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.CountdownTimer;
 import org.apache.brooklyn.util.time.Duration;
-import org.osgi.framework.Constants;
-import org.osgi.framework.InvalidSyntaxException;
+import org.apache.brooklyn.util.time.Time;
+import org.osgi.framework.*;
+import org.osgi.framework.launch.Framework;
+import org.osgi.framework.startlevel.FrameworkStartLevel;
 import org.osgi.service.cm.Configuration;
 import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.util.tracker.ServiceTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +63,21 @@ public class OsgiLauncherImpl extends 
BasicLauncher<OsgiLauncherImpl> implements
     private static final Logger LOG = 
LoggerFactory.getLogger(OsgiLauncherImpl.class);
     public static final String BROOKLYN_CONFIG_PID = "brooklyn";
 
+    /** Takes a JSON representation or comma separated list of OSGi filter 
expressions (cf {@link BundleContext#getServiceReferences(String, String)},
+     * where each filter must be satisfied before the catalog will be 
initialized.
+     * <p>
+     * For example:
+     * <code>(&(osgi.service.blueprint.compname=customBundleResolver))</code> 
to wait for a service registered via a blueprint with component name 
'customBundleResolver', or
+     * or
+     * 
<code>["(&(osgi.service.blueprint.compname=customBundleResolver))","(&(osgi.service.blueprint.compname=customTypePlanTransformer))"]
+     * to wait for two services registered, 'customBundleResolver' and 
'customTypePlanTransformer'</code>.
+     * <p>
+     * This can be required where some catalog or rebind items depend on 
services installed during startup.
+     * */
+    public static final String BROOKLYN_OSGI_DEPENDENCIES_SERVICES_FILTERS = 
"brooklyn.osgi.dependencies.services.filters";
+    public static final String BROOKLYN_OSGI_DEPENDENCIES_SERVICES_TIMEOUT = 
"brooklyn.osgi.dependencies.services.timeout";
+    public static final String BROOKLYN_OSGI_STARTLEVEL_POSTINIT = 
"brooklyn.osgi.startlevel.postinit";
+
     private Object reloadLock = new Object();
 
     private BrooklynVersionService brooklynVersion;
@@ -67,7 +95,7 @@ public class OsgiLauncherImpl extends 
BasicLauncher<OsgiLauncherImpl> implements
         // make sure brooklyn-core bundle is started
         brooklynVersion.getVersion();
 
-        Configuration brooklynConfig = getConfiguration(configAdmin, 
BROOKLYN_CONFIG_PID);
+        Configuration brooklynConfig = getConfiguration(BROOKLYN_CONFIG_PID);
         // Note that this doesn't check whether the files exist, just that 
there are potential alternative sources for configuration.
         if (brooklynConfig == null && 
Strings.isEmpty(globalBrooklynProperties) && 
Strings.isEmpty(localBrooklynProperties)) {
             LOG.warn("Config Admin PID '" + BROOKLYN_CONFIG_PID + "' not 
found, not using external configuration. Create a brooklyn.cfg file in etc 
folder.");
@@ -79,8 +107,8 @@ public class OsgiLauncherImpl extends 
BasicLauncher<OsgiLauncherImpl> implements
         return super.startPartOne();
     }
 
-    private Configuration getConfiguration(ConfigurationAdmin configAdmin, 
String brooklynConfigPid) {
-        String filter = '(' + Constants.SERVICE_PID + '=' + brooklynConfigPid 
+ ')';
+    private Configuration getConfiguration(String configPid) {
+        String filter = '(' + Constants.SERVICE_PID + '=' + configPid + ')';
         Configuration[] configs;
         try {
             configs = configAdmin.listConfigurations(filter);
@@ -108,14 +136,165 @@ public class OsgiLauncherImpl extends 
BasicLauncher<OsgiLauncherImpl> implements
         }
     }
 
+    Thread fallbackThread = null;
     @Override
     public void startOsgi() {
+        final Bundle bundle = FrameworkUtil.getBundle(this.getClass());
+
+        Framework f = (Framework) bundle.getBundleContext().getBundle(0);
+        int startLevel = f.adapt(FrameworkStartLevel.class).getStartLevel();
+
+        if (areServiceDependenciesReady(bundle, bundle + " bundle 
activation")) {
+            LOG.debug("Starting OSGi catalog/rebind (no service dependencies 
or all already satisfied, on bundle activation)");
+            doStartOsgiAfterBundlesRefreshed();
+
+        } else {
+            ServiceListener sl[] = { null };
+
+
+            sl[0] = new ServiceListener() {
+                @Override
+                public void serviceChanged(ServiceEvent event) {
+                    if (areServiceDependenciesReady(bundle, "ServiceEvent[" + 
event.getServiceReference() + " / " + event.getType() + " - " + 
event.getSource() + "]")) {
+                        LOG.debug("Starting OSGi catalog/rebind - all service 
dependencies satisfied");
+                        bundle.getBundleContext().removeServiceListener(sl[0]);
+                        if (fallbackThread!=null) {
+                            fallbackThread.interrupt();
+                        }
+                        new Thread(() -> {
+                            // do this in a new thread as it takes a while, 
shouldn't run in service listener thread
+                            doStartOsgiAfterBundlesRefreshed();
+                        }).start();
+                    }
+                }
+            };
+
+            Duration timeout;
+            try {
+                timeout = Duration.parse((String) 
getBrooklynProperties().getConfig(BROOKLYN_OSGI_DEPENDENCIES_SERVICES_TIMEOUT));
+            } catch (Exception e) {
+                throw Exceptions.propagateAnnotated("Invalid duration 
specified for '"+BROOKLYN_OSGI_DEPENDENCIES_SERVICES_TIMEOUT+"'", e);
+            }
+            fallbackThread = new Thread(() -> {
+                if (timeout==null) {
+                    LOG.debug("No timeout specified for 
'"+BROOKLYN_OSGI_DEPENDENCIES_SERVICES_TIMEOUT+"'; will wait indefinitely for 
service dependencies");
+                    return;
+                }
+
+                CountdownTimer timer = timeout==null ? null : 
CountdownTimer.newInstanceStarted(timeout);
+                try {
+                    if (timeout!=null) {
+                        LOG.debug("Service dependencies timeout detected as " 
+ timeout + "; will start catalog/rebind after that delay if service 
dependencies not fulfilled sooner");
+                    }
+                    int iteration = 0;
+                    do {
+                        if (iteration>0) {
+                            LOG.debug("Still waiting on service dependencies, 
" + iteration+"m so far" +
+                                    (timer!=null ? ", 
"+timer.getDurationRemaining()+" remaining until timeout" :  " (will wait 
indefinitely)"));
+                        }
+                        Duration wait = Duration.ONE_MINUTE;
+                        if (timer != null && 
timer.getDurationRemaining().isShorterThan(wait)) {
+                            wait = timer.getDurationRemaining();
+                        }
+                        Time.sleep(wait);
+                        iteration++;
+                    } while (timer==null || timer.isNotExpired());
+                } catch (RuntimeInterruptedException e) {
+                    // normal - thread aborted probably because service 
dependencies fulfilled
+                    LOG.debug("OsgiLauncher fallback thread interrupted, 
probably because service dependencies fulfilled in another thread where OSGi 
catalog/rebind start should be occurring (or due to shutdown)");
+                    Thread.interrupted();
+                    return;
+                } finally {
+                    fallbackThread = null;
+                }
+
+                LOG.warn("Starting OSGi catalog/rebind due to timeout waiting 
for service dependencies");
+                bundle.getBundleContext().removeServiceListener(sl[0]);
+                new Thread(() -> {
+                    // do this in a new thread so it isn't interrupted if 
fallback[0] is interrupted
+                    if (!doStartOsgiAfterBundlesRefreshed()) {
+                        LOG.debug("Did not start OSGi after timeout; already 
started");
+                    }
+                }).start();
+            });
+
+            bundle.getBundleContext().addServiceListener(sl[0]);
+            fallbackThread.start();
+        }
+    }
+
+    private boolean areServiceDependenciesReady(Bundle bundle, String context) 
{
+        Object deps = 
getBrooklynProperties().getConfig(BROOKLYN_OSGI_DEPENDENCIES_SERVICES_FILTERS);
+        if (deps!=null) {
+            List<String> items = 
JavaStringEscapes.unwrapJsonishListStringIfPossible(deps.toString());
+            LOG.debug("OSGi catalog/rebind service dependency check, on " + 
context + ": " + items);
+
+            for (String item: items) {
+                ServiceReference r1[];
+                try {
+                    r1 = 
bundle.getBundleContext().getServiceReferences((String)null, item);
+                } catch (Exception e) {
+                    throw Exceptions.propagateAnnotated("Error getting service 
references satisfying '"+item+"'", e);
+                }
+
+                if (r1 == null || r1.length == 0) {
+                    LOG.debug("OSGi catalog/rebind blocked, service dependency 
not yet fulfilled (will keep listening for services): '" + item + "'");
+                    return false;
+                }
+            }
+
+            return true;
+
+
+        } else {
+            LOG.debug("No service dependencies specified, on " + context);
+            return true;
+
+        }
+    }
+
+    AtomicBoolean startedOsgiAfterBundlesRefreshed = new AtomicBoolean();
+    private boolean doStartOsgiAfterBundlesRefreshed() {
+        if (startedOsgiAfterBundlesRefreshed.getAndSet(true)) {
+            LOG.debug("OSGi catalog/rebind already started when invoked a 
second time", new Throwable("Trace for unexpected redundant OSGi catalog/rebind 
start"));
+            return false;
+        }
+
+        doStartOsgi();
+
+        Object newStartLevelS = null;
+        try {
+            newStartLevelS = 
getBrooklynProperties().getConfig(BROOKLYN_OSGI_STARTLEVEL_POSTINIT);
+            if (newStartLevelS==null || Strings.isBlank(""+newStartLevelS)) {
+                LOG.debug("No change required to OSGi start-level after OSGi 
catalog/rebind ("+BROOKLYN_OSGI_STARTLEVEL_POSTINIT+" unset)");
+
+            } else {
+                FrameworkStartLevel fsl = 
FrameworkUtil.getBundle(this.getClass()).getBundleContext().getBundle(0).adapt(FrameworkStartLevel.class);
+
+                int newStartLevel = TypeCoercions.coerce(newStartLevelS, 
Integer.class);
+                if (fsl.getStartLevel()<newStartLevel) {
+                    LOG.debug("Changing OSGi start-level to "+newStartLevelS+" 
(from "+fsl+") after OSGi catalog/rebind");
+                    fsl.setStartLevel(newStartLevel);
+
+                } else {
+                    LOG.debug("No change required to OSGi start-level after 
OSGi catalog/rebind (currently "+fsl.getStartLevel()+", 
"+BROOKLYN_OSGI_STARTLEVEL_POSTINIT+"="+newStartLevelS+" required)");
+
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Error handling post-init start level: "+e, e);
+        }
+
+        return true;
+    }
+
+    private void doStartOsgi() {
         synchronized (reloadLock) {
             final Stopwatch startupTimer = Stopwatch.createStarted();
-            LOG.debug("OsgiLauncher start");
+            LOG.debug("OsgiLauncher catalog/rebind running initialization 
(part two)");
             startPartTwo();
             startupTimer.stop();
-            LOG.info("Brooklyn initialisation (part two) complete after {}", 
startupTimer.toString());
+            LOG.info("Brooklyn initialization (part two) complete after {}", 
startupTimer.toString());
         }
     }
 
@@ -123,6 +302,12 @@ public class OsgiLauncherImpl extends 
BasicLauncher<OsgiLauncherImpl> implements
     public void destroyOsgi() {
         LOG.debug("Notified of system shutdown, calling shutdown hooks");
         Threads.runShutdownHooks();
+
+        Thread t = fallbackThread;
+        if (t!=null) {
+            LOG.debug("Notified of system shutdown, cancelling service 
dependencies fallback thread");
+            t.interrupt();
+        }
     }
 
     @Override
diff --git 
a/utils/common/src/main/java/org/apache/brooklyn/util/osgi/OsgiActivator.java 
b/utils/common/src/main/java/org/apache/brooklyn/util/osgi/OsgiActivator.java
index 9881bb0..7649b3f 100644
--- 
a/utils/common/src/main/java/org/apache/brooklyn/util/osgi/OsgiActivator.java
+++ 
b/utils/common/src/main/java/org/apache/brooklyn/util/osgi/OsgiActivator.java
@@ -17,18 +17,29 @@ package org.apache.brooklyn.util.osgi;
 
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.launch.Framework;
+import org.osgi.framework.startlevel.FrameworkStartLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * handle bundle activation/deactivation
  */
 public class OsgiActivator implements BundleActivator {
 
+  private static Logger LOG = LoggerFactory.getLogger(OsgiActivator.class);
+
   @Override
   public void start(BundleContext context) throws Exception {
+      Framework f = (Framework) context.getBundle(0);
+      // https://issues.apache.org/jira/browse/KARAF-6920: framework start 
level should report 80 but instead says 100 on clean start
+      // when this is resolved we will no longer need the 
`startlevel.postinit` property in OsgiLauncherImpl
+      LOG.debug("Starting "+context.getBundle()+", at OSGi start-level 
"+f.adapt(FrameworkStartLevel.class).getStartLevel()+", bundle state 
"+context.getBundle().getState());
   }
 
   @Override
   public void stop(BundleContext context) throws Exception {
+      LOG.debug("Stopping "+context.getBundle());
       OsgiUtil.shutdown();
   }
 

Reply via email to