CAMEL-11443: Add a RouteController SPI to allow customizing the route life-cycle
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e68111ec Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e68111ec Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e68111ec Branch: refs/heads/master Commit: e68111ec13c85b5a2596797f404c1f4e433631e9 Parents: 45616e1 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Mon Jul 10 18:17:22 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Thu Jul 20 16:20:11 2017 +0200 ---------------------------------------------------------------------- .../java/org/apache/camel/CamelContext.java | 15 + .../java/org/apache/camel/Experimental.java | 32 ++ .../api/management/mbean/ManagedRouteMBean.java | 8 + .../apache/camel/impl/DefaultCamelContext.java | 331 +++++++---- .../apache/camel/impl/DefaultRouteContext.java | 23 + .../camel/impl/DefaultRouteController.java | 107 ++++ .../apache/camel/impl/DefaultRouteError.java | 67 +++ .../org/apache/camel/impl/RouteService.java | 1 + .../camel/impl/SupervisingRouteController.java | 556 +++++++++++++++++++ .../camel/management/mbean/ManagedRoute.java | 19 +- .../java/org/apache/camel/spi/RouteContext.java | 36 ++ .../org/apache/camel/spi/RouteController.java | 41 ++ .../java/org/apache/camel/spi/RouteError.java | 42 ++ .../org/apache/camel/util/ObjectHelper.java | 15 + .../org/apache/camel/util/backoff/BackOff.java | 181 ++++++ .../camel/util/backoff/BackOffContext.java | 101 ++++ .../apache/camel/util/backoff/BackOffTimer.java | 97 ++++ .../org/apache/camel/util/backoff/package.html | 25 + .../apache/camel/util/backoff/BackOffTest.java | 102 ++++ .../camel/util/backoff/BackOffTimerTest.java | 105 ++++ .../xml/AbstractCamelContextFactoryBean.java | 8 + .../spring/boot/CamelAutoConfiguration.java | 8 + .../actuate/endpoint/CamelRoutesEndpoint.java | 53 +- .../endpoint/CamelRoutesMvcEndpoint.java | 159 +++++- .../endpoint/CamelRoutesMvcEndpointTest.java | 2 +- .../pom.xml | 154 +++++ 
.../readme.adoc | 44 ++ .../src/main/java/sample/camel/Application.java | 37 ++ .../java/sample/camel/ApplicationRoutes.java | 43 ++ .../src/main/resources/application.properties | 40 ++ examples/pom.xml | 1 + ...rvisingRouteControllerAutoConfiguration.java | 79 +++ ...SupervisingRouteControllerConfiguration.java | 109 ++++ .../main/resources/META-INF/spring.factories | 1 + 34 files changed, 2512 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/CamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java index 6568121..98d0d6c 100644 --- a/camel-core/src/main/java/org/apache/camel/CamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java @@ -72,6 +72,7 @@ import org.apache.camel.spi.Registry; import org.apache.camel.spi.ReloadStrategy; import org.apache.camel.spi.RestConfiguration; import org.apache.camel.spi.RestRegistry; +import org.apache.camel.spi.RouteController; import org.apache.camel.spi.RoutePolicyFactory; import org.apache.camel.spi.RouteStartupOrder; import org.apache.camel.spi.RuntimeEndpointRegistry; @@ -506,6 +507,20 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration { //----------------------------------------------------------------------- /** + * NOTE: experimental api + * + * @param routeController the route controller + */ + void setRouteController(RouteController routeController); + + /** + * NOTE: experimental api + * + * @return the route controller or null if not set. + */ + RouteController getRouteController(); + + /** * Method to signal to {@link CamelContext} that the process to initialize setup routes is in progress. 
* * @param done <tt>false</tt> to start the process, call again with <tt>true</tt> to signal its done. http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/Experimental.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/Experimental.java b/camel-core/src/main/java/org/apache/camel/Experimental.java new file mode 100644 index 0000000..6ea6c1f --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/Experimental.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * An experimental user-facing API. Experimental API's might change or be removed + * in minor versions. 
+ */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD}) +public @interface Experimental { + int revision() default 1; +} http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java index 010a6dd..409a654 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java @@ -16,8 +16,10 @@ */ package org.apache.camel.api.management.mbean; +import org.apache.camel.Experimental; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedOperation; +import org.apache.camel.spi.RouteError; public interface ManagedRouteMBean extends ManagedPerformanceCounterMBean { @@ -129,5 +131,11 @@ public interface ManagedRouteMBean extends ManagedPerformanceCounterMBean { @ManagedAttribute(description = "Oldest inflight exchange id") String getOldestInflightExchangeId(); + @Experimental + @ManagedAttribute(description = "Route controller") + Boolean getHasRouteController(); + @Experimental + @ManagedAttribute(description = "Last error") + RouteError getLastError(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index 3ebad6d..bf7cb59 100644 --- 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -32,6 +32,7 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.TreeMap; @@ -157,6 +158,8 @@ import org.apache.camel.spi.ReloadStrategy; import org.apache.camel.spi.RestConfiguration; import org.apache.camel.spi.RestRegistry; import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.RouteController; +import org.apache.camel.spi.RouteError; import org.apache.camel.spi.RoutePolicyFactory; import org.apache.camel.spi.RouteStartupOrder; import org.apache.camel.spi.RuntimeEndpointRegistry; @@ -309,6 +312,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon private final RuntimeCamelCatalog runtimeCamelCatalog = new DefaultRuntimeCamelCatalog(this, true); private SSLContextParameters sslContextParameters; private final ThreadLocal<Set<String>> componentsInCreation = ThreadLocal.withInitial(HashSet::new); + private RouteController routeController; /** * Creates the {@link CamelContext} using {@link JndiRegistry} as registry, @@ -346,6 +350,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon this.managementStrategy = createManagementStrategy(); this.managementMBeanAssembler = createManagementMBeanAssembler(); + // Route controller + this.routeController = new DefaultRouteController(this); + // Call all registered trackers with this context // Note, this may use a partially constructed object CamelContextTrackerRegistry.INSTANCE.contextCreated(this); @@ -852,6 +859,17 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon // Route Management Methods // ----------------------------------------------------------------------- + @Override + public void 
setRouteController(RouteController routeController) { + this.routeController = routeController; + this.routeController.setCamelContext(this); + } + + @Override + public RouteController getRouteController() { + return routeController; + } + public List<RouteStartupOrder> getRouteStartupOrder() { return routeStartupOrder; } @@ -1137,99 +1155,151 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon } public synchronized void startRoute(String routeId) throws Exception { + DefaultRouteError.reset(this, routeId); + RouteService routeService = routeServices.get(routeId); if (routeService != null) { - startRouteService(routeService, false); + try { + startRouteService(routeService, false); + } catch (Exception e) { + DefaultRouteError.set(this, routeId, RouteError.Phase.START, e); + throw e; + } } } public synchronized void resumeRoute(String routeId) throws Exception { - if (!routeSupportsSuspension(routeId)) { - // start route if suspension is not supported - startRoute(routeId); - return; - } + DefaultRouteError.reset(this, routeId); - RouteService routeService = routeServices.get(routeId); - if (routeService != null) { - resumeRouteService(routeService); - // must resume the route as well - Route route = getRoute(routeId); - ServiceHelper.resumeService(route); + try { + if (!routeSupportsSuspension(routeId)) { + // start route if suspension is not supported + startRoute(routeId); + return; + } + + RouteService routeService = routeServices.get(routeId); + if (routeService != null) { + resumeRouteService(routeService); + // must resume the route as well + Route route = getRoute(routeId); + ServiceHelper.resumeService(route); + } + } catch (Exception e) { + DefaultRouteError.set(this, routeId, RouteError.Phase.RESUME, e); + throw e; } } public synchronized boolean stopRoute(String routeId, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception { + DefaultRouteError.reset(this, routeId); + RouteService routeService = 
routeServices.get(routeId); if (routeService != null) { - RouteStartupOrder route = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); + try { + RouteStartupOrder route = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); - boolean completed = getShutdownStrategy().shutdown(this, route, timeout, timeUnit, abortAfterTimeout); - if (completed) { - // must stop route service as well - stopRouteService(routeService, false); - } else { - // shutdown was aborted, make sure route is re-started properly - startRouteService(routeService, false); + boolean completed = getShutdownStrategy().shutdown(this, route, timeout, timeUnit, abortAfterTimeout); + if (completed) { + // must stop route service as well + stopRouteService(routeService, false); + } else { + // shutdown was aborted, make sure route is re-started properly + startRouteService(routeService, false); + } + return completed; + } catch (Exception e) { + DefaultRouteError.set(this, routeId, RouteError.Phase.STOP, e); + throw e; } - return completed; } + return false; } public synchronized void stopRoute(String routeId) throws Exception { + DefaultRouteError.reset(this, routeId); + RouteService routeService = routeServices.get(routeId); if (routeService != null) { - List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); - RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); - routes.add(order); + try { + List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); + RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); + routes.add(order); - getShutdownStrategy().shutdown(this, routes); - // must stop route service as well - stopRouteService(routeService, false); + getShutdownStrategy().shutdown(this, routes); + // must stop route service as well + stopRouteService(routeService, false); + } 
catch (Exception e) { + DefaultRouteError.set(this, routeId, RouteError.Phase.STOP, e); + throw e; + } } } public synchronized void stopRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception { + DefaultRouteError.reset(this, routeId); + RouteService routeService = routeServices.get(routeId); if (routeService != null) { - List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); - RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); - routes.add(order); + try { + List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); + RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); + routes.add(order); - getShutdownStrategy().shutdown(this, routes, timeout, timeUnit); - // must stop route service as well - stopRouteService(routeService, false); + getShutdownStrategy().shutdown(this, routes, timeout, timeUnit); + // must stop route service as well + stopRouteService(routeService, false); + } catch (Exception e) { + DefaultRouteError.set(this, routeId, RouteError.Phase.STOP, e); + throw e; + } } } public synchronized void shutdownRoute(String routeId) throws Exception { + DefaultRouteError.reset(this, routeId); + RouteService routeService = routeServices.get(routeId); if (routeService != null) { - List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); - RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); - routes.add(order); + try { + List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); + RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); + routes.add(order); - getShutdownStrategy().shutdown(this, routes); - // must stop route service as well (and remove the routes from management) - stopRouteService(routeService, true); + 
getShutdownStrategy().shutdown(this, routes); + // must stop route service as well (and remove the routes from management) + stopRouteService(routeService, true); + } catch (Exception e) { + DefaultRouteError.set(this, routeId, RouteError.Phase.SHUTDOWN, e); + throw e; + } } } public synchronized void shutdownRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception { + DefaultRouteError.reset(this, routeId); + RouteService routeService = routeServices.get(routeId); if (routeService != null) { - List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); - RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); - routes.add(order); + try { + List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); + RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); + routes.add(order); - getShutdownStrategy().shutdown(this, routes, timeout, timeUnit); - // must stop route service as well (and remove the routes from management) - stopRouteService(routeService, true); + getShutdownStrategy().shutdown(this, routes, timeout, timeUnit); + // must stop route service as well (and remove the routes from management) + stopRouteService(routeService, true); + } catch (Exception e) { + DefaultRouteError.set(this, routeId, RouteError.Phase.SHUTDOWN, e); + throw e; + } } } public synchronized boolean removeRoute(String routeId) throws Exception { + DefaultRouteError.reset(this, routeId); + // remove the route from ErrorHandlerBuilder if possible if (getErrorHandlerBuilder() instanceof ErrorHandlerBuilderSupport) { ErrorHandlerBuilderSupport builder = (ErrorHandlerBuilderSupport)getErrorHandlerBuilder(); @@ -1246,39 +1316,45 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon RouteService routeService = routeServices.get(routeId); if (routeService != null) { if (getRouteStatus(routeId).isStopped()) { - 
routeService.setRemovingRoutes(true); - shutdownRouteService(routeService); - removeRouteDefinition(routeId); - routeServices.remove(routeId); - // remove route from startup order as well, as it was removed - Iterator<RouteStartupOrder> it = routeStartupOrder.iterator(); - while (it.hasNext()) { - RouteStartupOrder order = it.next(); - if (order.getRoute().getId().equals(routeId)) { - it.remove(); + try { + routeService.setRemovingRoutes(true); + shutdownRouteService(routeService); + removeRouteDefinition(routeId); + routeServices.remove(routeId); + // remove route from startup order as well, as it was removed + Iterator<RouteStartupOrder> it = routeStartupOrder.iterator(); + while (it.hasNext()) { + RouteStartupOrder order = it.next(); + if (order.getRoute().getId().equals(routeId)) { + it.remove(); + } } - } - // from the route which we have removed, then remove all its private endpoints - // (eg the endpoints which are not in use by other routes) - Set<Endpoint> toRemove = new LinkedHashSet<Endpoint>(); - for (Endpoint endpoint : endpointsInUse.get(routeId)) { - // how many times is the endpoint in use - int count = 0; - for (Set<Endpoint> endpoints : endpointsInUse.values()) { - if (endpoints.contains(endpoint)) { - count++; + // from the route which we have removed, then remove all its private endpoints + // (eg the endpoints which are not in use by other routes) + Set<Endpoint> toRemove = new LinkedHashSet<Endpoint>(); + for (Endpoint endpoint : endpointsInUse.get(routeId)) { + // how many times is the endpoint in use + int count = 0; + for (Set<Endpoint> endpoints : endpointsInUse.values()) { + if (endpoints.contains(endpoint)) { + count++; + } + } + // notice we will count ourselves so if there is only 1 then its safe to remove + if (count <= 1) { + toRemove.add(endpoint); } } - // notice we will count ourselves so if there is only 1 then its safe to remove - if (count <= 1) { - toRemove.add(endpoint); + for (Endpoint endpoint : toRemove) { + 
log.debug("Removing: {} which was only in use by route: {}", endpoint, routeId); + removeEndpoint(endpoint); } + } catch (Exception e) { + DefaultRouteError.set(this, routeId, RouteError.Phase.REMOVE, e); + throw e; } - for (Endpoint endpoint : toRemove) { - log.debug("Removing: {} which was only in use by route: {}", endpoint, routeId); - removeEndpoint(endpoint); - } + return true; } else { return false; @@ -1288,49 +1364,63 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon } public synchronized void suspendRoute(String routeId) throws Exception { - if (!routeSupportsSuspension(routeId)) { - // stop if we suspend is not supported - stopRoute(routeId); - return; - } + try { + DefaultRouteError.reset(this, routeId); - RouteService routeService = routeServices.get(routeId); - if (routeService != null) { - List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); - Route route = routeService.getRoutes().iterator().next(); - RouteStartupOrder order = new DefaultRouteStartupOrder(1, route, routeService); - routes.add(order); - - getShutdownStrategy().suspend(this, routes); - // must suspend route service as well - suspendRouteService(routeService); - // must suspend the route as well - if (route instanceof SuspendableService) { - ((SuspendableService) route).suspend(); + if (!routeSupportsSuspension(routeId)) { + // stop if we suspend is not supported + stopRoute(routeId); + return; } + + RouteService routeService = routeServices.get(routeId); + if (routeService != null) { + List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); + Route route = routeService.getRoutes().iterator().next(); + RouteStartupOrder order = new DefaultRouteStartupOrder(1, route, routeService); + routes.add(order); + + getShutdownStrategy().suspend(this, routes); + // must suspend route service as well + suspendRouteService(routeService); + // must suspend the route as well + if (route instanceof SuspendableService) { + 
((SuspendableService) route).suspend(); + } + } + } catch (Exception e) { + DefaultRouteError.set(this, routeId, RouteError.Phase.SUSPEND, e); + throw e; } } public synchronized void suspendRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception { - if (!routeSupportsSuspension(routeId)) { - stopRoute(routeId, timeout, timeUnit); - return; - } + DefaultRouteError.reset(this, routeId); - RouteService routeService = routeServices.get(routeId); - if (routeService != null) { - List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); - Route route = routeService.getRoutes().iterator().next(); - RouteStartupOrder order = new DefaultRouteStartupOrder(1, route, routeService); - routes.add(order); - - getShutdownStrategy().suspend(this, routes, timeout, timeUnit); - // must suspend route service as well - suspendRouteService(routeService); - // must suspend the route as well - if (route instanceof SuspendableService) { - ((SuspendableService) route).suspend(); + try { + if (!routeSupportsSuspension(routeId)) { + stopRoute(routeId, timeout, timeUnit); + return; + } + + RouteService routeService = routeServices.get(routeId); + if (routeService != null) { + List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); + Route route = routeService.getRoutes().iterator().next(); + RouteStartupOrder order = new DefaultRouteStartupOrder(1, route, routeService); + routes.add(order); + + getShutdownStrategy().suspend(this, routes, timeout, timeUnit); + // must suspend route service as well + suspendRouteService(routeService); + // must suspend the route as well + if (route instanceof SuspendableService) { + ((SuspendableService) route).suspend(); + } } + } catch (Exception e) { + DefaultRouteError.set(this, routeId, RouteError.Phase.SUSPEND, e); + throw e; } } @@ -3035,6 +3125,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon // [TODO] Remove in 3.0 Container.Instance.manage(this); + // Start the route 
controller + ServiceHelper.startServices(this.routeController); + doNotStartRoutesOnFirstStart = !firstStartDone && !isAutoStartup(); // if the context was configured with auto startup = false, and we are already started, @@ -3162,8 +3255,8 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon } if (log.isDebugEnabled()) { - log.debug("Using ClassResolver={}, PackageScanClassResolver={}, ApplicationContextClassLoader={}", - new Object[]{getClassResolver(), getPackageScanClassResolver(), getApplicationContextClassLoader()}); + log.debug("Using ClassResolver={}, PackageScanClassResolver={}, ApplicationContextClassLoader={}, RouteController={}", + getClassResolver(), getPackageScanClassResolver(), getApplicationContextClassLoader(), getRouteController()); } if (isStreamCaching()) { @@ -3363,6 +3456,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon stopWatch.restart(); log.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") is shutting down"); EventHelper.notifyCamelContextStopping(this); + + // Stop the route controller + ServiceHelper.stopAndShutdownService(this.routeController); // stop route inputs in the same order as they was started so we stop the very first inputs first try { @@ -3879,7 +3975,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon for (LifecycleStrategy strategy : lifecycleStrategies) { strategy.onServiceAdd(this, consumer, route); } - startService(consumer); + try { + startService(consumer); + route.getProperties().remove("route.start.exception"); + } catch (Exception e) { + route.getProperties().put("route.start.exception", e); + throw e; + } + log.info("Route: " + route.getId() + " started and consuming from: " + endpoint); } @@ -3903,7 +4006,13 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon routeService.resume(); } else { // and start the route service (no need to start children as they 
are already warmed up) - routeService.start(false); + try { + routeService.start(false); + route.getProperties().remove("route.start.exception"); + } catch (Exception e) { + route.getProperties().put("route.start.exception", e); + throw e; + } } } } http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java index 89342b8..869b660 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java @@ -40,6 +40,8 @@ import org.apache.camel.processor.Pipeline; import org.apache.camel.spi.Contract; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.RouteController; +import org.apache.camel.spi.RouteError; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.ObjectHelper; @@ -71,6 +73,8 @@ public class DefaultRouteContext implements RouteContext { private List<RoutePolicy> routePolicyList = new ArrayList<RoutePolicy>(); private ShutdownRoute shutdownRoute; private ShutdownRunningTask shutdownRunningTask; + private RouteError routeError; + private RouteController routeController; public DefaultRouteContext(CamelContext camelContext, RouteDefinition route, FromDefinition from, Collection<Route> routes) { this.camelContext = camelContext; @@ -442,4 +446,23 @@ public class DefaultRouteContext implements RouteContext { return routePolicyList; } + @Override + public RouteError getLastError() { + return routeError; + } + + @Override + public void setLastError(RouteError routeError) { + this.routeError = routeError; + } + + @Override + public RouteController 
getRouteController() { + return routeController; + } + + @Override + public void setRouteController(RouteController routeController) { + this.routeController = routeController; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteController.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteController.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteController.java new file mode 100644 index 0000000..ff51cf3 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteController.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.Experimental; +import org.apache.camel.Route; +import org.apache.camel.spi.RouteController; + +@Experimental +public class DefaultRouteController extends org.apache.camel.support.ServiceSupport implements RouteController { + private final List<Route> routes; + private CamelContext camelContext; + + public DefaultRouteController() { + this(null); + } + + public DefaultRouteController(CamelContext camelContext) { + this.camelContext = camelContext; + this.routes = new ArrayList<>(); + } + + // *************************************************** + // Properties + // *************************************************** + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + // *************************************************** + // Life cycle + // *************************************************** + + @Override + protected void doStart() throws Exception { + } + + @Override + protected void doStop() throws Exception { + } + + // *************************************************** + // Route management + // *************************************************** + + @Override + public void startRoute(String routeId) throws Exception { + camelContext.startRoute(routeId); + } + + @Override + public void stopRoute(String routeId) throws Exception { + camelContext.stopRoute(routeId); + } + + @Override + public void stopRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception { + camelContext.stopRoute(routeId, timeout, timeUnit); + } + + @Override + public boolean stopRoute(String routeId, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception { + return camelContext.stopRoute(routeId, timeout, timeUnit, 
abortAfterTimeout); + } + + @Override + public void suspendRoute(String routeId) throws Exception { + camelContext.suspendRoute(routeId); + } + + @Override + public void suspendRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception { + camelContext.suspendRoute(routeId, timeout, timeUnit); + } + + @Override + public void resumeRoute(String routeId) throws Exception { + camelContext.resumeRoute(routeId); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteError.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteError.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteError.java new file mode 100644 index 0000000..0f80594 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteError.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl; + +import org.apache.camel.CamelContext; +import org.apache.camel.Route; +import org.apache.camel.spi.RouteError; + +public class DefaultRouteError implements RouteError { + private final RouteError.Phase phase; + private final Throwable throwable; + + public DefaultRouteError(Phase phase, Throwable throwable) { + this.phase = phase; + this.throwable = throwable; + } + + @Override + public Phase getPhase() { + return phase; + } + + @Override + public Throwable getException() { + return throwable; + } + + @Override + public String toString() { + return "DefaultRouteError{" + + "phase=" + phase + + ", throwable=" + throwable + + '}'; + } + + // *********************************** + // Helpers + // *********************************** + + public static void set(CamelContext context, String routeId, RouteError.Phase phase, Throwable throwable) { + Route route = context.getRoute(routeId); + if (route != null) { + route.getRouteContext().setLastError(new DefaultRouteError(phase, throwable)); + } + } + + public static void reset(CamelContext context, String routeId) { + Route route = context.getRoute(routeId); + if (route != null) { + route.getRouteContext().setLastError(null); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/impl/RouteService.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/RouteService.java b/camel-core/src/main/java/org/apache/camel/impl/RouteService.java index 634f4cd..eb731da 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/RouteService.java +++ b/camel-core/src/main/java/org/apache/camel/impl/RouteService.java @@ -306,6 +306,7 @@ public class RouteService extends ChildServiceSupport { routePolicy.onRemove(route); } } + // fire event EventHelper.notifyRouteRemoved(camelContext, route); } 
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java b/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java new file mode 100644 index 0000000..1f92653 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java @@ -0,0 +1,556 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl; + +import java.util.Collection; +import java.util.Comparator; +import java.util.EventObject; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Experimental; +import org.apache.camel.Route; +import org.apache.camel.ServiceStatus; +import org.apache.camel.StartupListener; +import org.apache.camel.management.event.CamelContextStartedEvent; +import org.apache.camel.model.RouteDefinition; +import org.apache.camel.spi.RouteController; +import org.apache.camel.spi.RoutePolicy; +import org.apache.camel.spi.RoutePolicyFactory; +import org.apache.camel.support.EventNotifierSupport; +import org.apache.camel.util.backoff.BackOff; +import org.apache.camel.util.backoff.BackOffContext; +import org.apache.camel.util.backoff.BackOffTimer; +import org.apache.camel.util.function.ThrowingConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A simple implementation of the {@link RouteController} that delays the startup + * of the routes after the camel context startup and retries to start failing routes. + * + * NOTE: this is experimental/unstable. 
+ */ +@Experimental +public class SupervisingRouteController extends DefaultRouteController { + private static final Logger LOGGER = LoggerFactory.getLogger(SupervisingRouteController.class); + private final Object lock; + private final AtomicBoolean contextStarted; + private final Set<Route> startedRoutes; + private final Set<Route> stoppedRoutes; + private final CamelContextStartupListener listener; + private final RouteManager routeManager; + private BackOffTimer timer; + private ScheduledExecutorService executorService; + private BackOff defaultBackOff; + private Map<String, BackOff> backOffConfigurations; + + public SupervisingRouteController() { + final Comparator<Route> comparator = Comparator.comparing( + route -> Optional.ofNullable(route.getRouteContext().getRoute().getStartupOrder()).orElse(Integer.MIN_VALUE) + ); + + this.lock = new Object(); + this.contextStarted = new AtomicBoolean(false); + this.stoppedRoutes = new TreeSet<>(comparator); + this.startedRoutes = new TreeSet<>(comparator.reversed()); + this.routeManager = new RouteManager(); + this.defaultBackOff = BackOff.builder().build(); + this.backOffConfigurations = new HashMap<>(); + + try { + this.listener = new CamelContextStartupListener(); + this.listener.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // ********************************* + // Properties + // ********************************* + + public BackOff getDefaultBackOff() { + return defaultBackOff; + } + + public void setDefaultBackOff(BackOff defaultBackOff) { + this.defaultBackOff = defaultBackOff; + } + + public Map<String, BackOff> getBackOffConfigurations() { + return backOffConfigurations; + } + + public void setBackOffConfigurations(Map<String, BackOff> backOffConfigurations) { + this.backOffConfigurations = backOffConfigurations; + } + + public BackOff getBackOff(String id) { + return backOffConfigurations.getOrDefault(id, defaultBackOff); + } + + public void setBackOff(String id, BackOff 
backOff) { + backOffConfigurations.put(id, backOff); + } + + // ********************************* + // Lifecycle + // ********************************* + + @Override + protected void doStart() throws Exception { + final CamelContext context = getCamelContext(); + + context.setAutoStartup(false); + context.addRoutePolicyFactory(new ManagedRoutePolicyFactory()); + context.addStartupListener(this.listener); + context.getManagementStrategy().addEventNotifier(this.listener); + + executorService = context.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "SupervisingRouteController"); + timer = new BackOffTimer(executorService); + } + + @Override + protected void doStop() throws Exception { + if (getCamelContext() != null && executorService != null) { + getCamelContext().getExecutorServiceManager().shutdown(executorService); + executorService = null; + timer = null; + } + } + + @Override + protected void doShutdown() throws Exception { + if (getCamelContext() != null) { + getCamelContext().getManagementStrategy().removeEventNotifier(listener); + } + } + + // ********************************* + // Route management + // ********************************* + + @Override + public void startRoute(String routeId) throws Exception { + final CamelContext context = getCamelContext(); + final Route route = context.getRoute(routeId); + + if (route == null) { + return; + } + + doStartRoute(context, route, true, r -> super.startRoute(routeId)); + } + + @Override + public void stopRoute(String routeId) throws Exception { + final CamelContext context = getCamelContext(); + final Route route = context.getRoute(routeId); + + if (route == null) { + return; + } + + doStopRoute(context, route, true, r -> super.stopRoute(routeId)); + } + + @Override + public void stopRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception { + final CamelContext context = getCamelContext(); + final Route route = context.getRoute(routeId); + + if (route == null) { + return; + } + 
+ doStopRoute(context, route, true, r -> super.stopRoute(r.getId(), timeout, timeUnit)); + } + + @Override + public boolean stopRoute(String routeId, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception { + final CamelContext context = getCamelContext(); + final Route route = context.getRoute(routeId); + final AtomicBoolean result = new AtomicBoolean(false); + + if (route == null) { + return false; + } + + doStopRoute(context, route, true, r -> result.set(super.stopRoute(r.getId(), timeout, timeUnit, abortAfterTimeout))); + + return result.get(); + } + + @Override + public void suspendRoute(String routeId) throws Exception { + final CamelContext context = getCamelContext(); + final Route route = context.getRoute(routeId); + + if (route == null) { + return; + } + + doStopRoute(context, route, true, r -> super.suspendRoute(r.getId())); + } + + @Override + public void suspendRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception { + final CamelContext context = getCamelContext(); + final Route route = context.getRoute(routeId); + + if (route == null) { + return; + } + + doStopRoute(context, route, true, r -> super.suspendRoute(r.getId(), timeout, timeUnit)); + } + + @Override + public void resumeRoute(String routeId) throws Exception { + final CamelContext context = getCamelContext(); + final Route route = context.getRoute(routeId); + + if (route == null) { + return; + } + + doStartRoute(context, route, true, r -> super.startRoute(routeId)); + } + + // ********************************* + // Helpers + // ********************************* + + private void doStopRoute(CamelContext context, Route route, boolean checker, ThrowingConsumer<Route, Exception> consumer) throws Exception { + synchronized (lock) { + if (checker) { + // remove them from checked routes so they don't get started by the + // routes check task as a manual operation on the routes indicates that + // the route is then managed manually + 
routeManager.release(route); + } + + ServiceStatus status = context.getRouteStatus(route.getId()); + if (!status.isStoppable()) { + LOGGER.debug("Route {} status is {}, skipping", route.getId(), status); + return; + } + + consumer.accept(route); + + startedRoutes.remove(route); + stoppedRoutes.add(route); + + // Mark the route as un-managed + route.getRouteContext().setRouteController(null); + } + } + + private void doStartRoute(CamelContext context, Route route, boolean checker, ThrowingConsumer<Route, Exception> consumer) throws Exception { + synchronized (lock) { + ServiceStatus status = context.getRouteStatus(route.getId()); + if (!status.isStartable()) { + LOGGER.debug("Route {} status is {}, skipping", route.getId(), status); + return; + } + + try { + // remove the route from any queue + stoppedRoutes.remove(route); + startedRoutes.remove(route); + + if (checker) { + routeManager.release(route); + } + + // Mark the route as managed + route.getRouteContext().setRouteController(this); + + consumer.accept(route); + + // route started successfully + startedRoutes.add(route); + } catch (Exception e) { + + if (checker) { + // if start fails the route is moved to controller supervision + // so its get (eventually) restarted + routeManager.start(route); + } + + throw e; + } + } + } + + private void startRoutes() { + if (!isRunAllowed()) { + return; + } + + List<String> routes; + + synchronized (lock) { + routes = stoppedRoutes.stream().map(Route::getId).collect(Collectors.toList()); + } + + for (String route: routes) { + try { + startRoute(route); + } catch (Exception e) { + // ignored, exception handled by startRoute + } + } + } + + private synchronized void stopRoutes() { + List<String> routes; + + synchronized (lock) { + routes = startedRoutes.stream().map(Route::getId).collect(Collectors.toList()); + } + + for (String route: routes) { + try { + stopRoute(route); + } catch (Exception e) { + // ignored, exception handled by stopRoute + } + } + } + + // 
********************************* + // RouteChecker + // ********************************* + + private class RouteManager { + private final Logger logger; + private final ConcurrentMap<Route, CompletableFuture<BackOffContext>> routes; + + RouteManager() { + this.logger = LoggerFactory.getLogger(RouteManager.class); + this.routes = new ConcurrentHashMap<>(); + } + + void start(Route route) { + route.getRouteContext().setRouteController(SupervisingRouteController.this); + + final CamelContext camelContext = getCamelContext(); + + routes.computeIfAbsent( + route, + r -> { + BackOff backOff = getBackOff(r.getId()); + + logger.info("Start supervising route: {} with back-off: {}", r.getId(), backOff); + + // Return this future as cancel does not have effect on the + // computation (future chain) + CompletableFuture<BackOffContext> future = timer.schedule(backOff, context -> { + try { + logger.info("Try to restart route: {}", r.getId()); + + doStartRoute(camelContext, r, false, rx -> SupervisingRouteController.super.startRoute(rx.getId())); + return false; + } catch (Exception e) { + return true; + } + }); + + future.whenComplete((context, throwable) -> { + if (context == null || context.isExhausted()) { + // This indicates that the future has been cancelled + // or that back-off retry is exhausted thus if the + // route is not started it is moved out of the supervisor. 
+ + if (context != null && context.isExhausted()) { + LOGGER.info("Back-off for route {} is exhausted, no more attempts will be made", route.getId()); + } + + synchronized (lock) { + ServiceStatus status = camelContext.getRouteStatus(route.getId()); + + if (status.isStopped() || status.isStopping()) { + LOGGER.info("Route {} has status {}, stop supervising it", route.getId(), status); + + r.getRouteContext().setRouteController(null); + stoppedRoutes.add(r); + } else if (status.isStarted() || status.isStarting()) { + synchronized (lock) { + startedRoutes.add(r); + } + } + } + } + + routes.remove(r); + }); + + return future; + } + ); + } + + boolean release(Route route) { + CompletableFuture<BackOffContext> future = routes.remove(route); + if (future != null) { + future.cancel(true); + } + + return future != null; + } + + void clear() { + routes.forEach((k, v) -> v.cancel(true)); + routes.clear(); + } + + boolean isSupervising(Route route) { + return routes.containsKey(route); + } + + Collection<Route> routes() { + return routes.keySet(); + } + } + + private boolean isSupervising(Route route) { + synchronized (lock) { + return stoppedRoutes.contains(route) || startedRoutes.contains(route) || routeManager.isSupervising(route); + } + } + + // ********************************* + // Policies + // ********************************* + + private class ManagedRoutePolicyFactory implements RoutePolicyFactory { + private final RoutePolicy policy = new ManagedRoutePolicy(); + + @Override + public RoutePolicy createRoutePolicy(CamelContext camelContext, String routeId, RouteDefinition route) { + return policy; + } + } + + private class ManagedRoutePolicy implements RoutePolicy { + @Override + public void onInit(Route route) { + route.getRouteContext().setRouteController(SupervisingRouteController.this); + route.getRouteContext().getRoute().setAutoStartup("false"); + + if (contextStarted.get()) { + LOGGER.debug("Context is started: add route {} to startable routes", 
route.getId()); + try { + SupervisingRouteController.this.doStartRoute( + getCamelContext(), + route, + true, + r -> SupervisingRouteController.super.startRoute(r.getId()) + ); + } catch (Exception e) { + e.printStackTrace(); + } + } else { + LOGGER.debug("Context is not started: add route {} to stopped routes", route.getId()); + stoppedRoutes.add(route); + } + } + + @Override + public void onRemove(Route route) { + } + + @Override + public void onStart(Route route) { + } + + @Override + public void onStop(Route route) { + } + + @Override + public void onSuspend(Route route) { + } + + @Override + public void onResume(Route route) { + } + + @Override + public void onExchangeBegin(Route route, Exchange exchange) { + // NO-OP + } + + @Override + public void onExchangeDone(Route route, Exchange exchange) { + // NO-OP + } + } + + private class CamelContextStartupListener extends EventNotifierSupport implements StartupListener { + @Override + public void notify(EventObject event) throws Exception { + onCamelContextStarted(); + } + + @Override + public boolean isEnabled(EventObject event) { + return event instanceof CamelContextStartedEvent; + } + + @Override + public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception { + if (alreadyStarted) { + // Invoke it only if the context was already started as this + // method is not invoked at last event as documented but after + // routes warm-up so this is useful for routes deployed after + // the camel context has been started-up. For standard routes + // configuration the notification of the camel context started + // is provided by EventNotifier. + // + // We should check why this callback is not invoked at latest + // stage, or maybe rename it as it is misleading and provide a + // better alternative for intercept camel events. 
+ onCamelContextStarted(); + } + } + + private void onCamelContextStarted() { + // Start managing the routes only when the camel context is started + // so start/stop of managed routes do not clash with CamelContext + // startup + if (contextStarted.compareAndSet(false, true)) { + startRoutes(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java index a977350..42e7fa6 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java @@ -49,6 +49,7 @@ import org.apache.camel.model.ModelHelper; import org.apache.camel.model.RouteDefinition; import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.spi.RouteError; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.XmlLineNumberParser; @@ -215,28 +216,28 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList if (!context.getStatus().isStarted()) { throw new IllegalArgumentException("CamelContext is not started"); } - context.startRoute(getRouteId()); + context.getRouteController().startRoute(getRouteId()); } public void stop() throws Exception { if (!context.getStatus().isStarted()) { throw new IllegalArgumentException("CamelContext is not started"); } - context.stopRoute(getRouteId()); + context.getRouteController().stopRoute(getRouteId()); } public void stop(long timeout) throws Exception { if (!context.getStatus().isStarted()) { throw new IllegalArgumentException("CamelContext is not started"); } - 
context.stopRoute(getRouteId(), timeout, TimeUnit.SECONDS); + context.getRouteController().stopRoute(getRouteId(), timeout, TimeUnit.SECONDS); } public boolean stop(Long timeout, Boolean abortAfterTimeout) throws Exception { if (!context.getStatus().isStarted()) { throw new IllegalArgumentException("CamelContext is not started"); } - return context.stopRoute(getRouteId(), timeout, TimeUnit.SECONDS, abortAfterTimeout); + return context.getRouteController().stopRoute(getRouteId(), timeout, TimeUnit.SECONDS, abortAfterTimeout); } public void shutdown() throws Exception { @@ -481,6 +482,16 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList } } + @Override + public Boolean getHasRouteController() { + return route.getRouteContext().getRouteController() != null; + } + + @Override + public RouteError getLastError() { + return route.getRouteContext().getLastError(); + } + /** * Used for sorting the processor mbeans accordingly to their index. */ http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java b/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java index 54e6ad0..a8cad9c 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java +++ b/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.EndpointAware; +import org.apache.camel.Experimental; import org.apache.camel.Processor; import org.apache.camel.RuntimeConfiguration; import org.apache.camel.model.FromDefinition; @@ -192,4 +193,39 @@ public interface RouteContext extends RuntimeConfiguration, EndpointAware { */ int getAndIncrement(ProcessorDefinition<?> node); + /** + * Gets the last error. 
+ * + * @return the error + */ + default RouteError getLastError() { + return null; + } + + /** + * Sets the last error. + * + * @param error the error + */ + default void setLastError(RouteError error) { + } + + /** + * Gets the {@link RouteController} for this route. + * + * @return the route controller, + */ + @Experimental + default RouteController getRouteController() { + return null; + } + + /** + * Sets the {@link RouteController} for this route. + * + * @param controller the RouteController + */ + @Experimental + default void setRouteController(RouteController controller) { + } } http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/spi/RouteController.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/RouteController.java b/camel-core/src/main/java/org/apache/camel/spi/RouteController.java new file mode 100644 index 0000000..0f25a2d --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/RouteController.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.spi; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContextAware; +import org.apache.camel.Experimental; +import org.apache.camel.Service; + +@Experimental +public interface RouteController extends CamelContextAware, Service { + + void startRoute(String routeId) throws Exception; + + void stopRoute(String routeId) throws Exception; + + void stopRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception; + + boolean stopRoute(String routeId, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception; + + void suspendRoute(String routeId) throws Exception; + + void suspendRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception; + + void resumeRoute(String routeId) throws Exception; +} http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/spi/RouteError.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/RouteError.java b/camel-core/src/main/java/org/apache/camel/spi/RouteError.java new file mode 100644 index 0000000..db420f4 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/RouteError.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Describes a failure that happened to a route, together with the
 * life-cycle phase the failure is associated with.
 */
public interface RouteError {

    /**
     * The route life-cycle phases an error can be associated with.
     */
    enum Phase {
        START, STOP, SUSPEND, RESUME, SHUTDOWN, REMOVE
    }

    /**
     * Gets the error.
     *
     * @return the error.
     */
    Throwable getException();

    /**
     * Gets the phase associated with the error.
     *
     * @return the phase.
     */
    Phase getPhase();
}
b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOff.java new file mode 100644 index 0000000..1e3d326 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOff.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.util.backoff; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.util.ObjectHelper; + +public final class BackOff { + public static final long NEVER = -1L; + public static final Duration MAX_DURATION = Duration.ofMillis(Long.MAX_VALUE); + public static final Duration DEFAULT_DELAY = Duration.ofSeconds(2); + public static final double DEFAULT_MULTIPLIER = 1f; + + private Duration delay; + private Duration maxDelay; + private Duration maxElapsedTime; + private Long maxAttempts; + private Double multiplier; + + public BackOff() { + this(DEFAULT_DELAY, MAX_DURATION, MAX_DURATION, Long.MAX_VALUE, DEFAULT_MULTIPLIER); + } + + public BackOff(Duration delay, Duration maxDelay, Duration maxElapsedTime, Long maxAttempts, Double multiplier) { + this.delay = ObjectHelper.supplyIfEmpty(delay, () -> DEFAULT_DELAY); + this.maxDelay = ObjectHelper.supplyIfEmpty(maxDelay, () -> MAX_DURATION); + this.maxElapsedTime = ObjectHelper.supplyIfEmpty(maxElapsedTime, () -> MAX_DURATION); + this.maxAttempts = ObjectHelper.supplyIfEmpty(maxAttempts, () -> Long.MAX_VALUE); + this.multiplier = ObjectHelper.supplyIfEmpty(multiplier, () -> DEFAULT_MULTIPLIER); + } + + // ************************************* + // Properties + // ************************************* + + public Duration getDelay() { + return delay; + } + + public void setDelay(Duration delay) { + this.delay = delay; + } + + public Duration getMaxDelay() { + return maxDelay; + } + + public void setMaxDelay(Duration maxDelay) { + this.maxDelay = maxDelay; + } + + public Duration getMaxElapsedTime() { + return maxElapsedTime; + } + + public void setMaxElapsedTime(Duration maxElapsedTime) { + this.maxElapsedTime = maxElapsedTime; + } + + public Long getMaxAttempts() { + return maxAttempts; + } + + public void setMaxAttempts(Long maxAttempts) { + this.maxAttempts = maxAttempts; + } + + public Double getMultiplier() { + return multiplier; + } + + public void 
setMultiplier(Double multiplier) { + this.multiplier = multiplier; + } + + @Override + public String toString() { + return "BackOff{" + + "delay=" + delay + + ", maxDelay=" + maxDelay + + ", maxElapsedTime=" + maxElapsedTime + + ", maxAttempts=" + maxAttempts + + ", multiplier=" + multiplier + + '}'; + } + + // ***************************************** + // Builder + // ***************************************** + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + private Duration delay = BackOff.DEFAULT_DELAY; + private Duration maxDelay = BackOff.MAX_DURATION; + private Duration maxElapsedTime = BackOff.MAX_DURATION; + private Long maxAttempts = Long.MAX_VALUE; + private Double multiplier = BackOff.DEFAULT_MULTIPLIER; + + public Builder read(BackOff template) { + delay = template.delay; + maxDelay = template.maxDelay; + maxElapsedTime = template.maxElapsedTime; + maxAttempts = template.maxAttempts; + multiplier = template.multiplier; + + return this; + } + + public Builder delay(Duration delay) { + this.delay = delay; + return this; + } + + public Builder delay(long delay, TimeUnit unit) { + return delay(Duration.ofMillis(unit.toMillis(delay))); + } + + public Builder delay(long delay) { + return delay(Duration.ofMillis(delay)); + } + + public Builder maxDelay(Duration maxDelay) { + this.maxDelay = maxDelay; + return this; + } + + public Builder maxDelay(long maxDelay, TimeUnit unit) { + return maxDelay(Duration.ofMillis(unit.toMillis(maxDelay))); + } + + public Builder maxDelay(long maxDelay) { + return maxDelay(Duration.ofMillis(maxDelay)); + } + + public Builder maxElapsedTime(Duration maxElapsedTime) { + this.maxElapsedTime = maxElapsedTime; + return this; + } + + public Builder maxElapsedTime(long maxElapsedTime, TimeUnit unit) { + return maxElapsedTime(Duration.ofMillis(unit.toMillis(maxElapsedTime))); + } + + public Builder maxElapsedTime(long maxElapsedTime) { + return 
maxElapsedTime(Duration.ofMillis(maxElapsedTime)); + } + + public Builder maxAttempts(Long attempts) { + this.maxAttempts = attempts; + return this; + } + + public Builder multiplier(Double multiplier) { + this.multiplier = multiplier; + return this; + } + + public BackOff build() { + return new BackOff(delay, maxDelay, maxElapsedTime, maxAttempts, multiplier); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java new file mode 100644 index 0000000..502f0ad --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.util.backoff; + + +public final class BackOffContext { + private final BackOff backOff; + + private long currentAttempts; + private long currentDelay; + private long currentElapsedTime; + + public BackOffContext(BackOff backOff) { + this.backOff = backOff; + this.currentAttempts = 0; + this.currentDelay = backOff.getDelay().toMillis(); + this.currentElapsedTime = 0; + } + + // ************************************* + // Properties + // ************************************* + + public BackOff backOff() { + return backOff; + } + + public long getCurrentAttempts() { + return currentAttempts; + } + + public long getCurrentDelay() { + return currentDelay; + } + + public long getCurrentElapsedTime() { + return currentElapsedTime; + } + + public boolean isExhausted() { + return currentDelay == BackOff.NEVER; + } + + // ************************************* + // Impl + // ************************************* + + public long next() { + // A call to next when currentDelay is set to NEVER has no effects + // as this means that either the timer is exhausted or it has explicit + // stopped + if (currentDelay != BackOff.NEVER) { + + currentAttempts++; + + if (currentAttempts > backOff.getMaxAttempts()) { + currentDelay = BackOff.NEVER; + } else if (currentElapsedTime > backOff.getMaxElapsedTime().toMillis()) { + currentDelay = BackOff.NEVER; + } else { + if (currentDelay <= backOff.getMaxDelay().toMillis()) { + currentDelay = (long) (currentDelay * backOff().getMultiplier()); + } + + currentElapsedTime += currentDelay; + } + } + + return currentDelay; + } + + public BackOffContext reset() { + this.currentAttempts = 0; + this.currentDelay = 0; + this.currentElapsedTime = 0; + + return this; + } + + public BackOffContext stop() { + this.currentAttempts = 0; + this.currentDelay = BackOff.NEVER; + this.currentElapsedTime = 0; + + return this; + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java new file mode 100644 index 0000000..d82c824 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.util.backoff; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.util.function.ThrowingFunction; + + +public class BackOffTimer { + private final ScheduledExecutorService scheduler; + + public BackOffTimer(ScheduledExecutorService scheduler) { + this.scheduler = scheduler; + } + + public CompletableFuture<BackOffContext> schedule(BackOff backOff, ThrowingFunction<BackOffContext, Boolean, Exception> function) { + final BackOffContext context = new BackOffContext(backOff); + final Task task = new Task(context, function); + + long delay = context.next(); + if (delay != BackOff.NEVER) { + scheduler.schedule(task, delay, TimeUnit.MILLISECONDS); + } else { + task.complete(); + } + + return task; + } + + // **************************************** + // TimerTask + // **************************************** + + private final class Task extends CompletableFuture<BackOffContext> implements Runnable { + private final BackOffContext context; + private final ThrowingFunction<BackOffContext, Boolean, Exception> function; + + Task(BackOffContext context, ThrowingFunction<BackOffContext, Boolean, Exception> function) { + this.context = context; + this.function = function; + } + + @Override + public void run() { + if (context.isExhausted() || isDone() || isCancelled()) { + if (!isDone()) { + complete(); + } + + return; + } + + try { + if (function.apply(context)) { + long delay = context.next(); + if (context.isExhausted()) { + complete(); + } else if (!context.isExhausted() && !isDone() && !isCancelled()) { + scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); + } + } else { + complete(); + } + } catch (Exception e) { + completeExceptionally(e); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + context.stop(); + + return super.cancel(mayInterruptIfRunning); + } + + boolean complete() { + return 
super.complete(context); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/util/backoff/package.html ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/backoff/package.html b/camel-core/src/main/java/org/apache/camel/util/backoff/package.html new file mode 100644 index 0000000..1323e4d --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/util/backoff/package.html @@ -0,0 +1,25 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<html> +<head> +</head> +<body> + +Utility classes for BackOff. + +</body> +</html>