Author: bdelacretaz Date: Wed Jul 30 13:23:12 2014 New Revision: 1614655 URL: http://svn.apache.org/r1614655 Log: SLING-3744 - optionally schedule execution of health checks via cron-like service property. Contributed by Georg Henzler, thanks!
Added: sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/AsyncHealthCheckExecutor.java Modified: sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/api/HealthCheck.java sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckFuture.java sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/util/HealthCheckMetadata.java sling/trunk/bundles/extensions/healthcheck/samples/pom.xml sling/trunk/bundles/extensions/healthcheck/samples/src/main/java/org/apache/sling/hc/samples/impl/AsyncHealthCheckSample.java sling/trunk/bundles/extensions/healthcheck/samples/src/main/resources/SLING-CONTENT/apps/hc/demo/install/org.apache.sling.hc.samples.impl.AsyncHealthCheckSample-1.json Modified: sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/api/HealthCheck.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/api/HealthCheck.java?rev=1614655&r1=1614654&r2=1614655&view=diff ============================================================================== --- sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/api/HealthCheck.java (original) +++ sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/api/HealthCheck.java Wed Jul 30 13:23:12 2014 @@ -60,7 +60,13 @@ public interface HealthCheck { * The value of this property must be of type String or String array. */ String TAGS = "hc.tags"; - + + /** + * Optional service property: If this property is set the health check + * will be executed asynchronously using the cron expression provided. 
+ */ + String ASYNC_CRON_EXPRESSION = "hc.async.cronExpression"; + /** * Execute this health check and return a {@link Result} * This is meant to execute quickly, access to external Added: sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/AsyncHealthCheckExecutor.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/AsyncHealthCheckExecutor.java?rev=1614655&view=auto ============================================================================== --- sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/AsyncHealthCheckExecutor.java (added) +++ sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/AsyncHealthCheckExecutor.java Wed Jul 30 13:23:12 2014 @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.sling.hc.core.impl.executor; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.lang.StringUtils; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.Service; +import org.apache.sling.commons.scheduler.Scheduler; +import org.apache.sling.hc.api.HealthCheck; +import org.apache.sling.hc.api.Result; +import org.apache.sling.hc.api.execution.HealthCheckExecutionResult; +import org.apache.sling.hc.core.impl.executor.HealthCheckFuture.Callback; +import org.apache.sling.hc.util.HealthCheckFilter; +import org.apache.sling.hc.util.HealthCheckMetadata; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceEvent; +import org.osgi.framework.ServiceListener; +import org.osgi.framework.ServiceReference; +import org.osgi.service.component.ComponentContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Runs health checks that are configured with a cron expression for asynchronous + * execution. Used by HealthCheckExecutor. 
+ * + */ +@Service({ AsyncHealthCheckExecutor.class }) +@Component(label = "Sling Async Health Check Executor", + description = "Runs async health checks", + metatype = true, immediate = true) +public class AsyncHealthCheckExecutor implements ServiceListener { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncHealthCheckExecutor.class); + + @Reference + private Scheduler scheduler; + + private Map<HealthCheckMetadata, ExecutionResult> asyncResultsByDescriptor = new ConcurrentHashMap<HealthCheckMetadata, ExecutionResult>(); + + private Map<HealthCheckMetadata, HealthCheckAsyncJob> registeredJobs = new HashMap<HealthCheckMetadata, HealthCheckAsyncJob>(); + + private BundleContext bundleContext; + + + + @Activate + protected final void activate(final ComponentContext componentContext) { + this.bundleContext = componentContext.getBundleContext(); + this.bundleContext.addServiceListener(this); + + int count = 0; + HealthCheckFilter healthCheckFilter = new HealthCheckFilter(bundleContext); + final ServiceReference[] healthCheckReferences = healthCheckFilter.getTaggedHealthCheckServiceReferences(new String[0]); + for (ServiceReference serviceReference : healthCheckReferences) { + HealthCheckMetadata healthCheckMetadata = new HealthCheckMetadata(serviceReference); + if (isAsync(healthCheckMetadata)) { + if (scheduleHealthCheck(healthCheckMetadata)) { + count++; + } + } + } + LOG.debug("Scheduled {} jobs for asynchronous health checks", count); + } + + @Deactivate + protected final void deactivate(final ComponentContext componentContext) { + this.bundleContext.removeServiceListener(this); + this.bundleContext = null; + + LOG.debug("Unscheduling {} jobs for asynchronous health checks", registeredJobs.size()); + for (HealthCheckMetadata healthCheckDescriptor : new LinkedList<HealthCheckMetadata>(registeredJobs.keySet())) { + unscheduleHealthCheck(healthCheckDescriptor); + } + + } + + @Override + public void serviceChanged(ServiceEvent event) { + 
if(bundleContext == null) { + // already deactivated? + return; + } + ServiceReference serviceReference = event.getServiceReference(); + final boolean isHealthCheck = serviceReference.isAssignableTo(bundleContext.getBundle(), HealthCheck.class.getName()); + + if (isHealthCheck) { + HealthCheckMetadata healthCheckMetadata = new HealthCheckMetadata(serviceReference); + int eventType = event.getType(); + LOG.debug("Received service event of type {} for health check {}", eventType, healthCheckMetadata); + if (eventType == ServiceEvent.REGISTERED) { + scheduleHealthCheck(healthCheckMetadata); + } else if (eventType == ServiceEvent.UNREGISTERING) { + unscheduleHealthCheck(healthCheckMetadata); + } else if (eventType == ServiceEvent.MODIFIED) { + unscheduleHealthCheck(healthCheckMetadata); + scheduleHealthCheck(healthCheckMetadata); + } + + } + } + + private boolean scheduleHealthCheck(HealthCheckMetadata descriptor) { + + if(!isAsync(descriptor)) { + return false; + } + + try { + HealthCheckAsyncJob healthCheckAsyncJob = new HealthCheckAsyncJob(descriptor); + LOG.debug("Scheduling job {} with cron expression {}", healthCheckAsyncJob, descriptor.getAsyncCronExpression()); + this.scheduler.addJob(healthCheckAsyncJob.getJobId(), healthCheckAsyncJob, null, descriptor.getAsyncCronExpression(), true); + registeredJobs.put(descriptor, healthCheckAsyncJob); + return true; + } catch (Exception e) { + LOG.warn("Could not schedule job for " + descriptor + ". Exeception: " + e, e); + return false; + } + + } + + private boolean unscheduleHealthCheck(HealthCheckMetadata descriptor) { + + // here no check for isAsync must be used to ensure previously + // scheduled async checks are correctly unscheduled if they have + // changed from async to sync. 
+ + HealthCheckAsyncJob job = registeredJobs.remove(descriptor); + try { + if (job != null) { + LOG.debug("Unscheduling job {} with cron expression '{}'", job, descriptor.getAsyncCronExpression()); + this.scheduler.removeJob(job.getJobId()); + return true; + } + } catch (Exception e) { + LOG.warn("Could not unschedule job " + job + ". Exeception: " + e, e); + } + return false; + + } + + void collectAsyncResults(List<HealthCheckMetadata> healthCheckDescriptors, Collection<HealthCheckExecutionResult> results) { + Iterator<HealthCheckMetadata> checksIt = healthCheckDescriptors.iterator(); + + Set<ExecutionResult> asyncResults = new TreeSet<ExecutionResult>(); + while (checksIt.hasNext()) { + HealthCheckMetadata healthCheckMetadata = checksIt.next(); + if (isAsync(healthCheckMetadata)) { + ExecutionResult result = asyncResultsByDescriptor.get(healthCheckMetadata); + if (result == null) { + + result = new ExecutionResult(healthCheckMetadata, new Result(Result.Status.INFO, "Async Health Check with cron expression '" + + healthCheckMetadata.getAsyncCronExpression() + "' has not yet been executed."), 0L); + + asyncResults.add(result); + } + asyncResults.add(result); + // remove from HC collection to not execute the check in HealthCheckExecutorImpl + checksIt.remove(); + } + } + LOG.debug("Adding {} results from async results", asyncResults.size()); + results.addAll(asyncResults); + + } + + private boolean isAsync(HealthCheckMetadata healthCheckMetadata) { + return StringUtils.isNotBlank(healthCheckMetadata.getAsyncCronExpression()); + } + + + private class HealthCheckAsyncJob implements Runnable { + + private final HealthCheckMetadata healthCheckDescriptor; + + public HealthCheckAsyncJob(HealthCheckMetadata healthCheckDescriptor) { + super(); + this.healthCheckDescriptor = healthCheckDescriptor; + } + + public String getJobId() { + String jobId = "job-hc-" + healthCheckDescriptor.getServiceId(); + return jobId; + } + + @Override + public void run() { + + LOG.debug("Running 
job {}", this); + HealthCheckFuture healthCheckFuture = new HealthCheckFuture(healthCheckDescriptor, bundleContext, new Callback() { + + @Override + public void finished(HealthCheckExecutionResult result) { + // no action needed here + + }}); + + // run future in same thread (as we are already async via scheduler) + healthCheckFuture.run(); + + try { + ExecutionResult result = healthCheckFuture.get(); + LOG.debug("Aync execution of {} returned {}", healthCheckDescriptor, result); + asyncResultsByDescriptor.put(healthCheckDescriptor, result); + } catch (Exception e) { + LOG.warn("Could not upated async execution result for " + healthCheckDescriptor + ". Exception: " + e, e); + } + + } + + @Override + public String toString() { + return "[Async job for " + this.healthCheckDescriptor + "]"; + } + + } + +} Modified: sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java?rev=1614655&r1=1614654&r2=1614655&view=diff ============================================================================== --- sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java (original) +++ sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java Wed Jul 30 13:23:12 2014 @@ -103,6 +103,9 @@ public class HealthCheckExecutorImpl imp private final Map<HealthCheckMetadata, HealthCheckFuture> stillRunningFutures = new HashMap<HealthCheckMetadata, HealthCheckFuture>(); @Reference + private AsyncHealthCheckExecutor asyncHealthCheckExecutor; + + @Reference private ThreadPoolManager threadPoolManager; private ThreadPool hcThreadPool; @@ -199,6 +202,7 @@ public class HealthCheckExecutorImpl imp final 
List<HealthCheckExecutionResult> results = new ArrayList<HealthCheckExecutionResult>(); final List<HealthCheckMetadata> healthCheckDescriptors = getHealthCheckMetadata(healthCheckReferences); + createResultsForDescriptors(healthCheckDescriptors, results); stopWatch.stop(); @@ -223,6 +227,9 @@ public class HealthCheckExecutorImpl imp // -- All methods below check if they can transform a healthCheckDescriptor into a result // -- if yes the descriptor is removed from the list and the result added + // get async results + asyncHealthCheckExecutor.collectAsyncResults(healthCheckDescriptors, results); + // reuse cached results where possible healthCheckResultCache.useValidCacheResults(healthCheckDescriptors, results, resultCacheTtlInMs); Modified: sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckFuture.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckFuture.java?rev=1614655&r1=1614654&r2=1614655&view=diff ============================================================================== --- sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckFuture.java (original) +++ sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckFuture.java Wed Jul 30 13:23:12 2014 @@ -21,7 +21,6 @@ import java.util.Date; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; -import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.StopWatch; import org.apache.sling.hc.api.HealthCheck; import org.apache.sling.hc.api.Result; @@ -50,8 +49,7 @@ class HealthCheckFuture extends FutureTa super(new Callable<ExecutionResult>() { @Override public ExecutionResult call() throws Exception { - Thread.currentThread().setName( - "Health-Check-" + 
StringUtils.substringAfterLast(metadata.getTitle(), ".")); + Thread.currentThread().setName("HealthCheck " + metadata.getTitle()); LOG.debug("Starting check {}", metadata); final StopWatch stopWatch = new StopWatch(); @@ -84,8 +82,8 @@ class HealthCheckFuture extends FutureTa LOG.debug("Time consumed for {}: {}", metadata, HealthCheckExecutorImpl.msHumanReadable(elapsedTime)); } - Thread.currentThread().setName("Health-Check-idle"); callback.finished(executionResult); + Thread.currentThread().setName("HealthCheck idle"); return executionResult; } }); Modified: sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/util/HealthCheckMetadata.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/util/HealthCheckMetadata.java?rev=1614655&r1=1614654&r2=1614655&view=diff ============================================================================== --- sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/util/HealthCheckMetadata.java (original) +++ sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/util/HealthCheckMetadata.java Wed Jul 30 13:23:12 2014 @@ -41,6 +41,8 @@ public class HealthCheckMetadata { private final long serviceId; private final List<String> tags; + + private final String asyncCronExpression; private final transient ServiceReference serviceReference; @@ -50,6 +52,7 @@ public class HealthCheckMetadata { this.mbeanName = (String) ref.getProperty(HealthCheck.MBEAN_NAME); this.title = getHealthCheckTitle(ref); this.tags = arrayPropertyToListOfStr(ref.getProperty(HealthCheck.TAGS)); + this.asyncCronExpression = (String) ref.getProperty(HealthCheck.ASYNC_CRON_EXPRESSION); this.serviceReference = ref; } @@ -89,6 +92,14 @@ public class HealthCheckMetadata { public List<String> getTags() { return tags; } + + + /** + * Return the cron expression used for asynchronous execution. 
+ */ + public String getAsyncCronExpression() { + return asyncCronExpression; + } /** * Return the service id. Modified: sling/trunk/bundles/extensions/healthcheck/samples/pom.xml URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/healthcheck/samples/pom.xml?rev=1614655&r1=1614654&r2=1614655&view=diff ============================================================================== --- sling/trunk/bundles/extensions/healthcheck/samples/pom.xml (original) +++ sling/trunk/bundles/extensions/healthcheck/samples/pom.xml Wed Jul 30 13:23:12 2014 @@ -85,7 +85,7 @@ <dependency> <groupId>org.apache.sling</groupId> <artifactId>org.apache.sling.hc.core</artifactId> - <version>1.0.4</version> + <version>1.1.1-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> Modified: sling/trunk/bundles/extensions/healthcheck/samples/src/main/java/org/apache/sling/hc/samples/impl/AsyncHealthCheckSample.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/healthcheck/samples/src/main/java/org/apache/sling/hc/samples/impl/AsyncHealthCheckSample.java?rev=1614655&r1=1614654&r2=1614655&view=diff ============================================================================== --- sling/trunk/bundles/extensions/healthcheck/samples/src/main/java/org/apache/sling/hc/samples/impl/AsyncHealthCheckSample.java (original) +++ sling/trunk/bundles/extensions/healthcheck/samples/src/main/java/org/apache/sling/hc/samples/impl/AsyncHealthCheckSample.java Wed Jul 30 13:23:12 2014 @@ -43,39 +43,23 @@ import org.slf4j.LoggerFactory; @Property(name=HealthCheck.NAME), @Property(name=HealthCheck.TAGS, unbounded=PropertyUnbounded.ARRAY), @Property(name=HealthCheck.MBEAN_NAME), - - // Period *must* be a Long - @Property(name="scheduler.period", longValue=AsyncHealthCheckSample.PERIOD_SECONDS, propertyPrivate=true), - // Concurrent=false avoids reentrant calls to run() - @Property(name="scheduler.concurrent", boolValue=false) + 
@Property(name=HealthCheck.ASYNC_CRON_EXPRESSION) + }) -@Service(value={HealthCheck.class,Runnable.class}) -public class AsyncHealthCheckSample implements HealthCheck, Runnable { +@Service +public class AsyncHealthCheckSample implements HealthCheck { - private final Logger log = LoggerFactory.getLogger(getClass()); - private final AtomicInteger counter = new AtomicInteger(); + private final Logger log = LoggerFactory.getLogger(AsyncHealthCheckSample.class); + + // static because for factories, not always the same instance is returned for + // the same service reference + private static final AtomicInteger counter = new AtomicInteger(); public static final int PERIOD_SECONDS = 5; @Override public Result execute() { - final FormattingResultLog resultLog = new FormattingResultLog(); - final int value = counter.get(); - resultLog.debug("{} - counter value is {}", this, value); - if(value % 2 != 0) { - resultLog.warn("Counter value ({}) is not even", value); - } - return new Result(resultLog); - } - - /** Called by the Sling scheduler, every {@link #SCHEDULER_PERIOD} seconds, without - * reentrant calls, as configured by our scheduler.* service properties. - * - * Simulates an expensive operation by waiting a random time up to twice that period - * before incrementing our counter. 
- */ - @Override - public void run() { + final long toWait = (long)(Math.random() * 2 * PERIOD_SECONDS); log.info("{} - Waiting {} seconds to simulate an expensive operation...", this, toWait); try { @@ -83,7 +67,17 @@ public class AsyncHealthCheckSample impl } catch(InterruptedException iex) { log.warn("Sleep interrupted", iex); } - counter.incrementAndGet(); - log.info("{} - counter set to {}", this, counter.get()); + + final int value = counter.incrementAndGet(); + log.info("{} - counter set to {}", this, value); + + final FormattingResultLog resultLog = new FormattingResultLog(); + + resultLog.debug("{} - counter value is {}", this, value); + if(value % 2 != 0) { + resultLog.warn("Counter value ({}) is not even", value); + } + return new Result(resultLog); } + } \ No newline at end of file Modified: sling/trunk/bundles/extensions/healthcheck/samples/src/main/resources/SLING-CONTENT/apps/hc/demo/install/org.apache.sling.hc.samples.impl.AsyncHealthCheckSample-1.json URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/healthcheck/samples/src/main/resources/SLING-CONTENT/apps/hc/demo/install/org.apache.sling.hc.samples.impl.AsyncHealthCheckSample-1.json?rev=1614655&r1=1614654&r2=1614655&view=diff ============================================================================== --- sling/trunk/bundles/extensions/healthcheck/samples/src/main/resources/SLING-CONTENT/apps/hc/demo/install/org.apache.sling.hc.samples.impl.AsyncHealthCheckSample-1.json (original) +++ sling/trunk/bundles/extensions/healthcheck/samples/src/main/resources/SLING-CONTENT/apps/hc/demo/install/org.apache.sling.hc.samples.impl.AsyncHealthCheckSample-1.json Wed Jul 30 13:23:12 2014 @@ -2,5 +2,6 @@ "jcr:primaryType" : "sling:OsgiConfig", "hc.name" : "Asynchronous Health Check sample", "hc.tags" : [async], - "hc.mbean.name" : "AsyncHealthCheckSample" + "hc.mbean.name" : "AsyncHealthCheckSample", + "hc.async.cronExpression" : "*/20 * * * * ?" }