This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 292d999766c CAMEL-22154: camel-core - BackOffTask make it possible to manage and observe (#18430) 292d999766c is described below commit 292d999766c9dfe53cfbed762f31886f19647490 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Jun 23 10:00:13 2025 +0200 CAMEL-22154: camel-core - BackOffTask make it possible to manage and observe (#18430) * CAMEL-22154: camel-core - BackOffTask make it possible to manage and observe --- .../apache/camel/catalog/dev-consoles.properties | 1 + .../apache/camel/catalog/dev-consoles/backoff.json | 15 ++ .../camel/component/master/MasterConsumer.java | 16 +- .../camel/component/pgevent/PgEventConsumer.java | 18 +- .../consumer/SimpleMessageListenerContainer.java | 13 +- .../org/apache/camel/spi/BackOffTimerFactory.java | 53 ++++++ .../camel/support/service/ServiceHelper.java | 1 + .../camel/impl/engine/AbstractCamelContext.java | 4 + .../camel/impl/engine/DefaultBackOffTimer.java | 94 ++++++++++ .../impl/engine/DefaultBackOffTimerFactory.java | 47 +++++ .../engine/DefaultSupervisingRouteController.java | 27 +-- .../camel/impl/engine/SimpleCamelContext.java | 6 + .../org/apache/camel/dev-console/backoff.json | 15 ++ .../services/org/apache/camel/dev-console/backoff | 2 + .../org/apache/camel/dev-consoles.properties | 2 +- .../camel/impl/console/BackOffDevConsole.java | 93 ++++++++++ .../camel/impl/console/RouteControllerConsole.java | 2 +- .../api/management/mbean/CamelOpenMBeanTypes.java | 19 ++ .../management/mbean/ManagedBackoffTimerMBean.java | 35 ++++ .../management/JmxManagementLifecycleStrategy.java | 4 + .../management/mbean/ManagedBackoffTimer.java | 85 +++++++++ .../camel/management/ManagedBackOffTimerTest.java | 113 ++++++++++++ .../org/apache/camel/support/PluginHelper.java | 9 + .../main/java/org/apache/camel/util/TimeUtils.java | 31 +++- .../org/apache/camel/util/backoff/BackOff.java | 29 ++- .../apache/camel/util/backoff/BackOffTimer.java | 61 
++++--- .../camel/util/backoff/BackOffTimerTask.java | 40 ++++- .../camel/util/backoff/SimpleBackOffTimer.java | 96 ++++++++++ .../org/apache/camel/util/backoff/BackOffTest.java | 8 +- ...fTimerTest.java => SimpleBackOffTimerTest.java} | 74 +++++++- .../ROOT/pages/camel-4x-upgrade-guide-4_13.adoc | 15 ++ .../camel/cli/connector/LocalCliConnector.java | 7 + .../dsl/jbang/core/commands/CamelJBangMain.java | 1 + .../jbang/core/commands/process/ListBackOff.java | 195 +++++++++++++++++++++ .../DependencyDownloaderPropertiesComponent.java | 3 +- 35 files changed, 1161 insertions(+), 73 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties index e33c068a992..37744abd89c 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties @@ -1,6 +1,7 @@ aws-secrets aws2-s3 azure-secrets +backoff bean bean-model blocked diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/backoff.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/backoff.json new file mode 100644 index 00000000000..8cae12a4e92 --- /dev/null +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/backoff.json @@ -0,0 +1,15 @@ +{ + "console": { + "kind": "console", + "group": "camel", + "name": "backoff", + "title": "BackOff", + "description": "Display information about BackOff tasks", + "deprecated": false, + "javaType": "org.apache.camel.impl.console.BackOffDevConsole", + "groupId": "org.apache.camel", + "artifactId": "camel-console", + "version": "4.13.0-SNAPSHOT" + } +} + diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java 
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java index ded0576fe2b..652c82e2865 100644 --- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java +++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java @@ -31,6 +31,7 @@ import org.apache.camel.resume.ResumeAdapter; import org.apache.camel.resume.ResumeAware; import org.apache.camel.resume.ResumeStrategy; import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.support.PluginHelper; import org.apache.camel.support.resume.AdapterHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.backoff.BackOff; @@ -53,6 +54,7 @@ public class MasterConsumer extends DefaultConsumer implements ResumeAware<Resum private volatile Consumer delegatedConsumer; private volatile CamelClusterView view; private ResumeStrategy resumeStrategy; + private BackOffTimer timer; public MasterConsumer(MasterEndpoint masterEndpoint, Processor processor, CamelClusterService clusterService) { super(masterEndpoint, processor); @@ -74,10 +76,19 @@ public class MasterConsumer extends DefaultConsumer implements ResumeAware<Resum this.resumeStrategy = resumeStrategy; } + @Override + protected void doInit() throws Exception { + super.doInit(); + this.timer = PluginHelper.getBackOffTimerFactory(masterEndpoint.getCamelContext().getCamelContextExtension()) + .newBackOffTimer("MasterConsumer", masterEndpoint.getComponent().getBackOffThreadPool()); + } + @Override protected void doStart() throws Exception { super.doStart(); + ServiceHelper.startService(timer); + LOG.debug("Using ClusterService instance {} (id={}, type={})", clusterService, clusterService.getId(), clusterService.getClass().getName()); @@ -92,12 +103,10 @@ public class MasterConsumer extends DefaultConsumer implements ResumeAware<Resum if (view != null) { view.removeEventListener(leadershipListener); 
clusterService.releaseView(view); - view = null; } - ServiceHelper.stopAndShutdownServices(delegatedConsumer, delegatedEndpoint); - + ServiceHelper.stopAndShutdownServices(delegatedConsumer, delegatedEndpoint, timer); delegatedConsumer = null; } @@ -141,7 +150,6 @@ public class MasterConsumer extends DefaultConsumer implements ResumeAware<Resum long delay = masterEndpoint.getComponent().getBackOffDelay(); long max = masterEndpoint.getComponent().getBackOffMaxAttempts(); - BackOffTimer timer = new BackOffTimer(masterEndpoint.getComponent().getBackOffThreadPool()); timer.schedule(BackOff.builder().delay(delay).maxAttempts(max).build(), task -> { LOG.info("Leadership taken. Attempt #{} to start consumer: {}", task.getCurrentAttempts(), delegatedEndpoint); diff --git a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java index 2b0509fb0b4..e5d52ef9db1 100644 --- a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java +++ b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java @@ -26,6 +26,8 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.support.PluginHelper; +import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.backoff.BackOff; import org.apache.camel.util.backoff.BackOffTimer; import org.slf4j.Logger; @@ -56,8 +58,7 @@ public class PgEventConsumer extends DefaultConsumer { } @Override - protected void doStart() throws Exception { - super.doStart(); + protected void doInit() throws Exception { if (endpoint.getWorkerPool() != null) { workerPool = endpoint.getWorkerPool(); } else { @@ -67,8 +68,15 @@ public class PgEventConsumer extends DefaultConsumer { // used for re-connecting to the 
database reconnectPool = getEndpoint().getCamelContext().getExecutorServiceManager() .newSingleThreadScheduledExecutor(this, "Reconnector"); - timer = new BackOffTimer(reconnectPool); + timer = PluginHelper.getBackOffTimerFactory(endpoint.getCamelContext().getCamelContextExtension()) + .newBackOffTimer("PgEventConsumer", reconnectPool); + } + + @Override + protected void doStart() throws Exception { + ServiceHelper.startService(timer); listener.initConnection(); + super.doStart(); } @Override @@ -76,12 +84,12 @@ public class PgEventConsumer extends DefaultConsumer { super.doStop(); listener.closeConnection(); getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(reconnectPool); - timer = null; if (shutdownWorkerPool && workerPool != null) { LOG.debug("Shutting down PgEventConsumer worker threads with timeout {} millis", 10000); endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(workerPool, 10000); workerPool = null; } + ServiceHelper.stopService(timer); } public class PgEventListener implements PGNotificationListener { @@ -89,7 +97,7 @@ public class PgEventConsumer extends DefaultConsumer { public void reconnect() { BackOff bo = BackOff.builder().delay(endpoint.getReconnectDelay()).build(); timer.schedule(bo, t -> { - LOG.debug("Connecting attempt #" + t.getCurrentAttempts()); + LOG.debug("Connecting attempt #{}", t.getCurrentAttempts()); try { initConnection(); } catch (Exception e) { diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java index cdc902c8998..bcf9c56fc91 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java @@ -35,6 +35,8 @@ import 
org.apache.camel.CamelContext; import org.apache.camel.component.sjms.SessionMessageListener; import org.apache.camel.component.sjms.SjmsEndpoint; import org.apache.camel.component.sjms.jms.DestinationCreationStrategy; +import org.apache.camel.support.PluginHelper; +import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.backoff.BackOff; import org.apache.camel.util.backoff.BackOffTimer; @@ -63,6 +65,7 @@ public class SimpleMessageListenerContainer extends ServiceSupport private Set<MessageConsumer> consumers; private Set<Session> sessions; private BackOffTimer.Task recoverTask; + private BackOffTimer timer; private ScheduledExecutorService scheduler; public SimpleMessageListenerContainer(SjmsEndpoint endpoint) { @@ -219,7 +222,13 @@ public class SimpleMessageListenerContainer extends ServiceSupport // we need to recover using a background task if (recoverTask == null || recoverTask.getStatus() != BackOffTimer.Task.Status.Active) { BackOff backOff = BackOff.builder().delay(endpoint.getRecoveryInterval()).build(); - recoverTask = new BackOffTimer(scheduler).schedule(backOff, this::recoverConnection); + if (timer == null) { + timer = PluginHelper.getBackOffTimerFactory(endpoint.getCamelContext().getCamelContextExtension()) + .newBackOffTimer("SjmsConnectionRecovery", + scheduler); + ServiceHelper.startService(timer); + } + recoverTask = timer.schedule(backOff, this::recoverConnection); } } @@ -242,6 +251,8 @@ public class SimpleMessageListenerContainer extends ServiceSupport endpoint.getCamelContext().getExecutorServiceManager().shutdown(scheduler); scheduler = null; } + ServiceHelper.stopService(timer); + timer = null; } @Override diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/BackOffTimerFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/BackOffTimerFactory.java new file mode 100644 index 00000000000..497f084582a --- /dev/null +++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/BackOffTimerFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.camel.util.backoff.BackOffTimer; + +/** + * Factory for creating {@link BackOffTimer}. + * + * @see org.apache.camel.util.backoff.BackOff + */ +public interface BackOffTimerFactory { + + /** + * Creates a new {@link BackOffTimer}. + * + * Important: The timer should be started and stopped to control its lifecycle by using + * {@link org.apache.camel.support.service.ServiceHelper}. + * + * @param name logical name of the timer + * @return new empty backoff timer + */ + BackOffTimer newBackOffTimer(String name); + + /** + * Creates a new {@link BackOffTimer} using the given executor service. + * + * Important: The timer should be started and stopped to control its lifecycle by using + * {@link org.apache.camel.support.service.ServiceHelper}. 
+ * + * @param name logical name of the timer + * @param scheduler thread pool to use for running tasks + * @return new empty backoff timer + */ + BackOffTimer newBackOffTimer(String name, ScheduledExecutorService scheduler); + +} diff --git a/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java b/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java index 1c464db6af9..2c613f4a882 100644 --- a/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java +++ b/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; * A collection of helper methods for working with {@link Service} objects. */ public final class ServiceHelper { + private static final Logger LOG = LoggerFactory.getLogger(ServiceHelper.class); /** diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index 0fddc3ac0ac..e1b7d794251 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -87,6 +87,7 @@ import org.apache.camel.impl.debugger.DefaultBacklogDebugger; import org.apache.camel.spi.AnnotationBasedProcessorFactory; import org.apache.camel.spi.AnnotationScanTypeConverters; import org.apache.camel.spi.AsyncProcessorAwaitManager; +import org.apache.camel.spi.BackOffTimerFactory; import org.apache.camel.spi.BacklogDebugger; import org.apache.camel.spi.BeanIntrospection; import org.apache.camel.spi.BeanProcessorFactory; @@ -384,6 +385,7 @@ public abstract class AbstractCamelContext extends BaseService camelContextExtension.lazyAddContextPlugin(AnnotationBasedProcessorFactory.class, this::createAnnotationBasedProcessorFactory); 
camelContextExtension.lazyAddContextPlugin(DumpRoutesStrategy.class, this::createDumpRoutesStrategy); + camelContextExtension.lazyAddContextPlugin(BackOffTimerFactory.class, this::createBackOffTimerFactory); } protected static <T> T lookup(CamelContext context, String ref, Class<T> type) { @@ -4342,6 +4344,8 @@ public abstract class AbstractCamelContext extends BaseService protected abstract StartupConditionStrategy createStartupConditionStrategy(); + protected abstract BackOffTimerFactory createBackOffTimerFactory(); + protected RestConfiguration createRestConfiguration() { // lookup a global which may have been on a container such spring-boot / CDI / etc. RestConfiguration conf diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultBackOffTimer.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultBackOffTimer.java new file mode 100644 index 00000000000..4b43296c65e --- /dev/null +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultBackOffTimer.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl.engine; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.util.backoff.BackOff; +import org.apache.camel.util.backoff.BackOffTimer; +import org.apache.camel.util.backoff.BackOffTimerTask; +import org.apache.camel.util.function.ThrowingFunction; + +/** + * Default {@link BackOffTimer}. + */ +public class DefaultBackOffTimer extends ServiceSupport implements BackOffTimer { + + private final CamelContext camelContext; + private final ScheduledExecutorService scheduler; + private final String name; + private final Set<Task> tasks = new CopyOnWriteArraySet<>(); + + public DefaultBackOffTimer(CamelContext camelContext, String name, ScheduledExecutorService scheduler) { + this.camelContext = camelContext; + this.scheduler = scheduler; + this.name = name; + } + + @Override + public Task schedule(BackOff backOff, ThrowingFunction<Task, Boolean, Exception> function) { + final BackOffTimerTask task = new BackOffTimerTask(this, backOff, scheduler, function); + + long delay = task.next(); + if (delay != BackOff.NEVER) { + tasks.add(task); + scheduler.schedule(task, delay, TimeUnit.MILLISECONDS); + } else { + task.cancel(); + } + + return task; + } + + @Override + public String getName() { + return name; + } + + @Override + public void remove(Task task) { + tasks.remove(task); + } + + @Override + public Set<Task> getTasks() { + return Collections.unmodifiableSet(tasks); + } + + @Override + public int size() { + return tasks.size(); + } + + @Override + protected void doStart() throws Exception { + camelContext.addService(this); + } + + @Override + protected void doStop() throws Exception { + tasks.clear(); + camelContext.removeService(this); + } + +} diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultBackOffTimerFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultBackOffTimerFactory.java new file mode 100644 index 00000000000..005bd6dbaa1 --- /dev/null +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultBackOffTimerFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.engine; + +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.BackOffTimerFactory; +import org.apache.camel.util.backoff.BackOffTimer; + +/** + * Default {@link BackOffTimerFactory}. 
+ */ +public class DefaultBackOffTimerFactory implements BackOffTimerFactory { + + private final CamelContext camelContext; + + public DefaultBackOffTimerFactory(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public BackOffTimer newBackOffTimer(String name) { + var scheduler = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "BackOffTimer"); + return newBackOffTimer(name, scheduler); + } + + @Override + public BackOffTimer newBackOffTimer(String name, ScheduledExecutorService scheduler) { + return new DefaultBackOffTimer(camelContext, name, scheduler); + } + +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java index bcaaa898d1b..0b7a8c8df59 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java @@ -52,7 +52,9 @@ import org.apache.camel.spi.RoutePolicyFactory; import org.apache.camel.spi.SupervisingRouteController; import org.apache.camel.support.EventHelper; import org.apache.camel.support.PatternHelper; +import org.apache.camel.support.PluginHelper; import org.apache.camel.support.RoutePolicySupport; +import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; import org.apache.camel.util.backoff.BackOff; @@ -231,14 +233,8 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im @Override protected void doStart() throws Exception { - this.backOff = new BackOff( - Duration.ofMillis(backOffDelay), - backOffMaxDelay > 0 ? Duration.ofMillis(backOffMaxDelay) : null, - backOffMaxElapsedTime > 0 ? 
Duration.ofMillis(backOffMaxElapsedTime) : null, - backOffMaxAttempts > 0 ? backOffMaxAttempts : Long.MAX_VALUE, - backOffMultiplier); - CamelContext context = getCamelContext(); + if (threadPoolSize == 1) { executorService = context.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "SupervisingRouteController"); @@ -246,16 +242,25 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im executorService = context.getExecutorServiceManager().newScheduledThreadPool(this, "SupervisingRouteController", threadPoolSize); } - timer = new BackOffTimer(executorService); + backOff = new BackOff( + Duration.ofMillis(backOffDelay), + backOffMaxDelay > 0 ? Duration.ofMillis(backOffMaxDelay) : null, + backOffMaxElapsedTime > 0 ? Duration.ofMillis(backOffMaxElapsedTime) : null, + backOffMaxAttempts > 0 ? backOffMaxAttempts : Long.MAX_VALUE, + backOffMultiplier, false); + + timer = PluginHelper.getBackOffTimerFactory(context.getCamelContextExtension()) + .newBackOffTimer("SupervisingRouteController", executorService); + ServiceHelper.startService(timer); } @Override protected void doStop() throws Exception { - if (getCamelContext() != null && executorService != null) { + if (getCamelContext() != null) { getCamelContext().getExecutorServiceManager().shutdown(executorService); - executorService = null; - timer = null; } + executorService = null; + ServiceHelper.stopService(timer); } // ********************************* diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java index e964214d1fb..715557ebbc9 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java @@ -34,6 +34,7 @@ import org.apache.camel.health.HealthCheckResolver; import 
org.apache.camel.impl.converter.DefaultTypeConverter; import org.apache.camel.spi.AnnotationBasedProcessorFactory; import org.apache.camel.spi.AsyncProcessorAwaitManager; +import org.apache.camel.spi.BackOffTimerFactory; import org.apache.camel.spi.BeanIntrospection; import org.apache.camel.spi.BeanProcessorFactory; import org.apache.camel.spi.BeanProxyFactory; @@ -740,6 +741,11 @@ public class SimpleCamelContext extends AbstractCamelContext { return new DefaultStartupConditionStrategy(); } + @Override + protected BackOffTimerFactory createBackOffTimerFactory() { + return new DefaultBackOffTimerFactory(this); + } + @Override protected TransformerRegistry createTransformerRegistry() { return new DefaultTransformerRegistry(getCamelContextReference()); diff --git a/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/backoff.json b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/backoff.json new file mode 100644 index 00000000000..8cae12a4e92 --- /dev/null +++ b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/backoff.json @@ -0,0 +1,15 @@ +{ + "console": { + "kind": "console", + "group": "camel", + "name": "backoff", + "title": "BackOff", + "description": "Display information about BackOff tasks", + "deprecated": false, + "javaType": "org.apache.camel.impl.console.BackOffDevConsole", + "groupId": "org.apache.camel", + "artifactId": "camel-console", + "version": "4.13.0-SNAPSHOT" + } +} + diff --git a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/backoff b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/backoff new file mode 100644 index 00000000000..88b4c5ca27b --- /dev/null +++ b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/backoff @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! 
+class=org.apache.camel.impl.console.BackOffDevConsole diff --git a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties index cef4ebaffbb..1ff8974c70e 100644 --- a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties +++ b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties @@ -1,5 +1,5 @@ # Generated by camel build tools - do NOT edit this file! -dev-consoles=bean blocked browse circuit-breaker consumer context debug endpoint event gc health inflight java-security jvm log memory properties receive reload rest route route-controller route-dump send service source startup-recorder system-properties thread top trace transformers type-converters variables +dev-consoles=backoff bean blocked browse circuit-breaker consumer context debug endpoint event gc health inflight java-security jvm log memory properties receive reload rest route route-controller route-dump send service source startup-recorder system-properties thread top trace transformers type-converters variables groupId=org.apache.camel artifactId=camel-console version=4.13.0-SNAPSHOT diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/BackOffDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/BackOffDevConsole.java new file mode 100644 index 00000000000..4b8f03f3e26 --- /dev/null +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/BackOffDevConsole.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.console; + +import java.util.Map; +import java.util.Set; + +import org.apache.camel.spi.annotations.DevConsole; +import org.apache.camel.support.console.AbstractDevConsole; +import org.apache.camel.util.backoff.BackOffTimer; +import org.apache.camel.util.json.JsonArray; +import org.apache.camel.util.json.JsonObject; + +@DevConsole(name = "backoff", displayName = "BackOff", description = "Display information about BackOff tasks") +public class BackOffDevConsole extends AbstractDevConsole { + + public BackOffDevConsole() { + super("camel", "backoff", "BackOff", "Display information about BackOff tasks"); + } + + @Override + protected String doCallText(Map<String, Object> options) { + StringBuilder sb = new StringBuilder(); + + Set<BackOffTimer> timers = getCamelContext().hasServices(BackOffTimer.class); + for (BackOffTimer timer : timers) { + sb.append(String.format("\nTimer: %s", timer.getName())); + sb.append(String.format("\nTasks: %s", timer.size())); + int id = 0; + for (BackOffTimer.Task task : timer.getTasks()) { + String failure = task.getException() != null ? 
task.getException().getMessage() : ""; + sb.append(String.format( + "\n #%d (name=%s status=%s attempts=%d delay=%d elapsed=%d first=%d last=%d next=%d failure=%s", + id, task.getName(), task.getStatus().name(), task.getCurrentAttempts(), task.getCurrentDelay(), + task.getCurrentElapsedTime(), task.getFirstAttemptTime(), task.getLastAttemptTime(), + task.getNextAttemptTime(), failure)); + id++; + } + } + return sb.toString(); + } + + @Override + protected JsonObject doCallJson(Map<String, Object> options) { + JsonObject root = new JsonObject(); + JsonArray arr = new JsonArray(); + root.put("timers", arr); + + Set<BackOffTimer> timers = getCamelContext().hasServices(BackOffTimer.class); + for (BackOffTimer timer : timers) { + JsonObject jo = new JsonObject(); + jo.put("name", timer.getName()); + jo.put("size", timer.size()); + arr.add(jo); + if (timer.size() > 0) { + JsonArray arr2 = new JsonArray(); + jo.put("tasks", arr2); + for (BackOffTimer.Task task : timer.getTasks()) { + String failure = task.getException() != null ? 
task.getException().getMessage() : ""; + JsonObject jo2 = new JsonObject(); + jo2.put("name", task.getName()); + jo2.put("status", task.getStatus().name()); + jo2.put("attempts", task.getCurrentAttempts()); + jo2.put("delay", task.getCurrentDelay()); + jo2.put("elapsed", task.getCurrentElapsedTime()); + jo2.put("firstTime", task.getFirstAttemptTime()); + jo2.put("lastTime", task.getLastAttemptTime()); + jo2.put("nextTime", task.getNextAttemptTime()); + if (failure != null) { + jo2.put("error", failure); + } + arr2.add(jo2); + } + } + } + return root; + } + +} diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteControllerConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteControllerConsole.java index 6555d865ca4..2dc6167ee8c 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteControllerConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteControllerConsole.java @@ -119,7 +119,7 @@ public class RouteControllerConsole extends AbstractDevConsole { sb.append(String.format("\n %s %s (%s) ", status, routeId, uri)); sb.append(String.format("\n Supervising: %s", supervising)); sb.append(String.format("\n Attempts: %s", attempts)); - sb.append(String.format("\n Last Ago: %s", last)); + sb.append(String.format("\n Last: %s", last)); sb.append(String.format("\n Next Attempt: %s", next)); sb.append(String.format("\n Elapsed: %s", elapsed)); if (error != null) { diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java index 6a9fde81336..bdc658ca333 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java @@ -327,4 +327,23 @@ public 
final class CamelOpenMBeanTypes { return new TabularType("variables", "Variables", ct, new String[] { "id", "key" }); } + public static TabularType listBackoffTTaskTabularType() throws OpenDataException { + CompositeType ct = listBackoffTaskCompositeType(); + return new TabularType( + "listBackoff", "Lists all the backoff tasks", ct, + new String[] { "name" }); + } + + public static CompositeType listBackoffTaskCompositeType() throws OpenDataException { + return new CompositeType( + "tasks", "Tasks", + new String[] { + "name", "status", "attempts", "delay", "elapsed", "firstTime", "lastTime", "nextTime", "failure" }, + new String[] { + "Name", "Status", "Attempts", "Delay", "Elapsed", "FirstTime", "LastTime", "NextTime", "Failure" }, + new OpenType[] { + SimpleType.STRING, SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, + SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING }); + } + } diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBackoffTimerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBackoffTimerMBean.java new file mode 100644 index 00000000000..0d0812a08ba --- /dev/null +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBackoffTimerMBean.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.api.management.mbean; + +import javax.management.openmbean.TabularData; + +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; + +public interface ManagedBackoffTimerMBean { + + @ManagedAttribute(description = "Name of the backoff timer") + String getName(); + + @ManagedAttribute(description = "Number of total tasks") + Integer getSize(); + + @ManagedOperation(description = "Lists all the tasks") + TabularData listTasks(); + +} diff --git a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java index 0f058a2e009..9661bb1dfd6 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java @@ -54,6 +54,7 @@ import org.apache.camel.impl.debugger.DefaultBacklogDebugger; import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager; import org.apache.camel.management.mbean.ManagedBacklogDebugger; import org.apache.camel.management.mbean.ManagedBacklogTracer; +import org.apache.camel.management.mbean.ManagedBackoffTimer; import org.apache.camel.management.mbean.ManagedBeanIntrospection; import org.apache.camel.management.mbean.ManagedCamelContext; import org.apache.camel.management.mbean.ManagedConsumerCache; @@ -118,6 +119,7 @@ import 
org.apache.camel.throttling.ThrottlingExceptionRoutePolicy; import org.apache.camel.throttling.ThrottlingInflightRoutePolicy; import org.apache.camel.util.KeyValueHolder; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.backoff.BackOffTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -577,6 +579,8 @@ public class JmxManagementLifecycleStrategy extends ServiceSupport implements Li answer = new ManagedValidatorRegistry(context, validatorRegistry); } else if (service instanceof BrowsableVariableRepository variableRepository) { answer = new ManagedVariableRepository(context, variableRepository); + } else if (service instanceof BackOffTimer timer) { + answer = new ManagedBackoffTimer(camelContext, timer); } else if (service instanceof CamelClusterService camelClusterService) { answer = getManagementObjectStrategy().getManagedObjectForClusterService(context, camelClusterService); } else if (service != null) { diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBackoffTimer.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBackoffTimer.java new file mode 100644 index 00000000000..8e2f35753f9 --- /dev/null +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBackoffTimer.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management.mbean; + +import java.util.Set; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; + +import org.apache.camel.CamelContext; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.Service; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes; +import org.apache.camel.api.management.mbean.ManagedBackoffTimerMBean; +import org.apache.camel.util.backoff.BackOffTimer; + +@ManagedResource(description = "Managed BackoffTimer") +public class ManagedBackoffTimer extends ManagedService implements ManagedBackoffTimerMBean { + + private final BackOffTimer timer; + + public ManagedBackoffTimer(CamelContext context, BackOffTimer timer) { + super(context, (Service) timer); + this.timer = timer; + } + + @Override + public String getName() { + return timer.getName(); + } + + @Override + public Integer getSize() { + return timer.size(); + } + + @Override + public TabularData listTasks() { + try { + TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listBackoffTTaskTabularType()); + Set<BackOffTimer.Task> tasks = timer.getTasks(); + for (BackOffTimer.Task task : tasks) { + String name = task.getName(); + String status = task.getStatus().name(); + long attempts = task.getCurrentAttempts(); + long delay = 
task.getCurrentDelay(); + long elapsed = task.getCurrentElapsedTime(); + long firstTime = task.getFirstAttemptTime(); + long lastTime = task.getLastAttemptTime(); + long nextTime = task.getNextAttemptTime(); + String failure = task.getException() != null ? task.getException().getMessage() : null; + CompositeType ct = CamelOpenMBeanTypes.listBackoffTaskCompositeType(); + CompositeData data = new CompositeDataSupport( + ct, + new String[] { + "name", "status", "attempts", "delay", "elapsed", "firstTime", "lastTime", "nextTime", + "failure" }, + new Object[] { name, status, attempts, delay, elapsed, firstTime, lastTime, nextTime, failure }); + answer.put(data); + } + return answer; + } catch (Exception e) { + throw RuntimeCamelException.wrapRuntimeCamelException(e); + } + } + +} diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedBackOffTimerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedBackOffTimerTest.java new file mode 100644 index 00000000000..0aae5ca2d9c --- /dev/null +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedBackOffTimerTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.management; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.openmbean.TabularData; + +import org.apache.camel.support.PluginHelper; +import org.apache.camel.support.service.ServiceHelper; +import org.apache.camel.util.backoff.BackOff; +import org.apache.camel.util.backoff.BackOffTimer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@DisabledOnOs(OS.AIX) +public class ManagedBackOffTimerTest extends ManagementTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testManageBackOffTimer() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger counter = new AtomicInteger(); + + BackOffTimer timer = PluginHelper.getBackOffTimerFactory(context.getCamelContextExtension()) + .newBackOffTimer("Cheese"); + ServiceHelper.startService(timer); + + context.start(); + + // find the MBean registered for the DefaultBackOffTimer service + MBeanServer mbeanServer = getMBeanServer(); + Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=services,*"), null); + List<ObjectName> list = new ArrayList<>(set); + ObjectName on = null; + for (ObjectName name : list) { + if (name.getCanonicalName().contains("DefaultBackOffTimer")) { + on = name; + break; + } + } + + assertNotNull(on, "Should have found DefaultBackOffTimer"); + + String name = (String) mbeanServer.getAttribute(on, "Name"); + assertEquals("Cheese", name); + + final BackOff backOff = 
BackOff.builder().delay(100).removeOnComplete(false).build(); + final AtomicLong first = new AtomicLong(); + + BackOffTimer.Task task = timer.schedule( + backOff, + context -> { + assertEquals(counter.incrementAndGet(), context.getCurrentAttempts()); + assertEquals(100, context.getCurrentDelay()); + assertEquals(100L * counter.get(), context.getCurrentElapsedTime()); + if (first.get() == 0) { + first.set(context.getFirstAttemptTime()); + } else { + assertEquals(first.get(), context.getFirstAttemptTime()); + } + + return counter.get() < 5; + }); + + task.whenComplete( + (context, throwable) -> { + assertEquals(5, counter.get()); + latch.countDown(); + }); + + latch.await(5, TimeUnit.SECONDS); + + Integer size = (Integer) mbeanServer.getAttribute(on, "Size"); + assertEquals(1, size); + + TabularData data = (TabularData) mbeanServer.invoke(on, "listTasks", null, null); + assertEquals(1, data.size()); + + ServiceHelper.stopService(timer); + } + +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java index 0abc1187795..2c2c776cc78 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java @@ -25,6 +25,7 @@ import org.apache.camel.console.DevConsoleResolver; import org.apache.camel.health.HealthCheckResolver; import org.apache.camel.spi.AnnotationBasedProcessorFactory; import org.apache.camel.spi.AsyncProcessorAwaitManager; +import org.apache.camel.spi.BackOffTimerFactory; import org.apache.camel.spi.BeanIntrospection; import org.apache.camel.spi.BeanProcessorFactory; import org.apache.camel.spi.BeanProxyFactory; @@ -575,4 +576,12 @@ public final class PluginHelper { ExtendedCamelContext extendedCamelContext) { return extendedCamelContext.getContextPlugin(AnnotationBasedProcessorFactory.class); } + + /** + * Gets the {@link BackOffTimerFactory} to 
use. + */ + public static BackOffTimerFactory getBackOffTimerFactory( + ExtendedCamelContext extendedCamelContext) { + return extendedCamelContext.getContextPlugin(BackOffTimerFactory.class); + } } diff --git a/core/camel-util/src/main/java/org/apache/camel/util/TimeUtils.java b/core/camel-util/src/main/java/org/apache/camel/util/TimeUtils.java index d689b8c5434..df1d3c4dcfa 100644 --- a/core/camel-util/src/main/java/org/apache/camel/util/TimeUtils.java +++ b/core/camel-util/src/main/java/org/apache/camel/util/TimeUtils.java @@ -36,10 +36,10 @@ public final class TimeUtils { } /** - * Prints the since ago in a human-readable format as 9s, 27m44s, 3h12m, 3d8h, as seen on Kubernetes etc. + * Prints since age in a human-readable format as 9s, 27m44s, 3h12m, 3d8h, as seen on Kubernetes etc. * * @param time time of the event (millis since epoch) - * @return ago in human-readable since the given time. + * @return age in human-readable since the given time. */ public static String printSince(long time) { long age = System.currentTimeMillis() - time; @@ -47,15 +47,38 @@ public final class TimeUtils { } /** - * Prints the ago in a human-readable format as 9s, 27m44s, 3h12m, 3d8h, as seen on Kubernetes etc. + * Prints since age in a human-readable format as 9s, 27m44s, 3h12m, 3d8h, as seen on Kubernetes etc. + * + * @param time time of the event (millis since epoch) + * @param precise whether to be precise and include more details + * @return age in human-readable since the given time. + */ + public static String printSince(long time, boolean precise) { + long age = System.currentTimeMillis() - time; + return printDuration(age, precise); + } + + /** + * Prints the age in a human-readable format as 9s, 27m44s, 3h12m, 3d8h, as seen on Kubernetes etc. * * @param age age in millis - * @return ago in human-readable. + * @return age in human-readable. 
*/ public static String printAge(long age) { return printDuration(age, false); } + /** + * Prints the age in a human-readable format as 9s, 27m44s, 3h12m, 3d8h, as seen on Kubernetes etc. + * + * @param age age in millis + * @param precise whether to be precise and include more details + * @return age in human-readable. + */ + public static String printAge(long age, boolean precise) { + return printDuration(age, precise); + } + /** * Prints the duration in a human-readable format as 9s, 27m44s, 3h12m, 3d8h, etc. * diff --git a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOff.java b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOff.java index edbe4667908..fef421b678f 100644 --- a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOff.java +++ b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOff.java @@ -35,17 +35,20 @@ public final class BackOff { private Duration maxElapsedTime; private Long maxAttempts; private Double multiplier; + private boolean removeOnComplete; public BackOff() { - this(DEFAULT_DELAY, MAX_DURATION, MAX_DURATION, Long.MAX_VALUE, DEFAULT_MULTIPLIER); + this(DEFAULT_DELAY, MAX_DURATION, MAX_DURATION, Long.MAX_VALUE, DEFAULT_MULTIPLIER, true); } - public BackOff(Duration delay, Duration maxDelay, Duration maxElapsedTime, Long maxAttempts, Double multiplier) { + public BackOff(Duration delay, Duration maxDelay, Duration maxElapsedTime, Long maxAttempts, Double multiplier, + boolean removeOnComplete) { this.delay = ObjectHelper.supplyIfEmpty(delay, () -> DEFAULT_DELAY); this.maxDelay = ObjectHelper.supplyIfEmpty(maxDelay, () -> MAX_DURATION); this.maxElapsedTime = ObjectHelper.supplyIfEmpty(maxElapsedTime, () -> MAX_DURATION); this.maxAttempts = ObjectHelper.supplyIfEmpty(maxAttempts, () -> Long.MAX_VALUE); this.multiplier = ObjectHelper.supplyIfEmpty(multiplier, () -> DEFAULT_MULTIPLIER); + this.removeOnComplete = removeOnComplete; } // ************************************* @@ 
-109,6 +112,17 @@ public final class BackOff { this.multiplier = multiplier; } + public boolean isRemoveOnComplete() { + return removeOnComplete; + } + + /** + * Should the task be removed from the timer when it completes successfully. + */ + public void setRemoveOnComplete(boolean removeOnComplete) { + this.removeOnComplete = removeOnComplete; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(256); @@ -126,6 +140,7 @@ public final class BackOff { if (multiplier != DEFAULT_MULTIPLIER) { sb.append(", multiplier=").append(multiplier); } + sb.append(", remove=").append(removeOnComplete); sb.append("]"); return sb.toString(); } @@ -151,6 +166,7 @@ public final class BackOff { private Duration maxElapsedTime = BackOff.MAX_DURATION; private Long maxAttempts = Long.MAX_VALUE; private Double multiplier = BackOff.DEFAULT_MULTIPLIER; + private boolean removeOnComplete = true; /** * Read values from the given {@link BackOff} @@ -161,7 +177,7 @@ public final class BackOff { maxElapsedTime = template.maxElapsedTime; maxAttempts = template.maxAttempts; multiplier = template.multiplier; - + removeOnComplete = template.removeOnComplete; return this; } @@ -214,11 +230,16 @@ public final class BackOff { return this; } + public Builder removeOnComplete(boolean removeOnComplete) { + this.removeOnComplete = removeOnComplete; + return this; + } + /** * Build a new instance of {@link BackOff} */ public BackOff build() { - return new BackOff(delay, maxDelay, maxElapsedTime, maxAttempts, multiplier); + return new BackOff(delay, maxDelay, maxElapsedTime, maxAttempts, multiplier, removeOnComplete); } } } diff --git a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java index 6dc1138cdbd..de789ca63f9 100644 --- a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java +++ 
b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java @@ -16,8 +16,7 @@ */ package org.apache.camel.util.backoff; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.Set; import java.util.function.BiConsumer; import org.apache.camel.util.function.ThrowingFunction; @@ -25,40 +24,55 @@ import org.apache.camel.util.function.ThrowingFunction; /** * A simple timer utility that use a linked {@link BackOff} to determine when a task should be executed. */ -public class BackOffTimer { - private final ScheduledExecutorService scheduler; +public interface BackOffTimer { - public BackOffTimer(ScheduledExecutorService scheduler) { - this.scheduler = scheduler; - } + /** + * Schedules a task to run according to the backoff settings + * + * @param backOff the settings for how often to run the task + * @param function the function to call for each run + * @return the task + */ + Task schedule(BackOff backOff, ThrowingFunction<Task, Boolean, Exception> function); /** - * Schedule the given function/task to be executed some time in the future according to the given backOff. + * Gets the name of this timer. 
*/ - public Task schedule(BackOff backOff, ThrowingFunction<Task, Boolean, Exception> function) { - final BackOffTimerTask task = new BackOffTimerTask(backOff, scheduler, function); - - long delay = task.next(); - if (delay != BackOff.NEVER) { - scheduler.schedule(task, delay, TimeUnit.MILLISECONDS); - } else { - task.cancel(); - } + String getName(); - return task; - } + /** + * Removes the task + */ + void remove(Task task); + + /** + * Access to unmodifiable set of all the tasks + */ + Set<Task> getTasks(); + + /** + * Number of tasks + */ + int size(); // **************************************** // TimerTask // **************************************** - public interface Task { + interface Task { enum Status { Active, Inactive, - Exhausted + Exhausted, + Completed, + Failed } + /** + * Name of this task + */ + String getName(); + /** * The back-off associated with this task. */ @@ -99,6 +113,11 @@ public class BackOffTimer { */ long getNextAttemptTime(); + /** + * The unexpected exception that caused the task to fail (if any) + */ + Throwable getException(); + /** * Reset the task. 
*/ diff --git a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimerTask.java b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimerTask.java index 0d6e6bc693c..85f30d1f0fc 100644 --- a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimerTask.java +++ b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimerTask.java @@ -28,8 +28,9 @@ import java.util.function.BiConsumer; import org.apache.camel.util.function.ThrowingFunction; -final class BackOffTimerTask implements BackOffTimer.Task, Runnable { +public final class BackOffTimerTask implements BackOffTimer.Task, Runnable { private final Lock lock = new ReentrantLock(); + private final BackOffTimer timer; private final BackOff backOff; private final ScheduledExecutorService scheduler; private final ThrowingFunction<BackOffTimer.Task, Boolean, Exception> function; @@ -43,9 +44,11 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable { private long currentElapsedTime; private long lastAttemptTime; private long nextAttemptTime; + private Throwable cause; - BackOffTimerTask(BackOff backOff, ScheduledExecutorService scheduler, - ThrowingFunction<BackOffTimer.Task, Boolean, Exception> function) { + public BackOffTimerTask(BackOffTimer timer, BackOff backOff, ScheduledExecutorService scheduler, + ThrowingFunction<BackOffTimer.Task, Boolean, Exception> function) { + this.timer = timer; this.backOff = backOff; this.scheduler = scheduler; this.status = Status.Active; @@ -66,6 +69,11 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable { // Properties // ***************************** + @Override + public String getName() { + return timer.getName(); + } + @Override public BackOff getBackOff() { return backOff; @@ -106,15 +114,21 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable { return nextAttemptTime; } + @Override + public Throwable getException() { + return cause; + } + @Override public void 
reset() { this.currentAttempts = 0; this.currentDelay = 0; this.currentElapsedTime = 0; - this.firstAttemptTime = 0; + this.firstAttemptTime = BackOff.NEVER; this.lastAttemptTime = BackOff.NEVER; this.nextAttemptTime = BackOff.NEVER; this.status = Status.Active; + this.cause = null; } @Override @@ -128,12 +142,20 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable { // signal task completion on cancel. complete(null); + + // the task is cancelled and should not be restarted so remove from timer + if (timer != null) { + timer.remove(this); + } } @Override public void whenComplete(BiConsumer<BackOffTimer.Task, Throwable> whenCompleted) { lock.lock(); try { + if (backOff.isRemoveOnComplete()) { + timer.remove(this); + } consumers.add(whenCompleted); } finally { lock.unlock(); @@ -148,7 +170,7 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable { public void run() { if (status == Status.Active) { try { - lastAttemptTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + lastAttemptTime = System.currentTimeMillis(); if (firstAttemptTime < 0) { firstAttemptTime = lastAttemptTime; } @@ -169,6 +191,7 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable { } else { stop(); + status = Status.Completed; // if the function return false no more attempts should // be made so stop the context. 
complete(null); @@ -176,6 +199,7 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable { } catch (Exception e) { stop(); + status = Status.Failed; complete(e); } } @@ -192,6 +216,7 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable { } void complete(Throwable throwable) { + this.cause = throwable; lock.lock(); try { consumers.forEach(c -> c.accept(this, throwable)); @@ -208,7 +233,7 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable { * Return the number of milliseconds to wait before retrying the operation or ${@link BackOff#NEVER} to indicate * that no further attempt should be made. */ - long next() { + public long next() { // A call to next when currentDelay is set to NEVER has no effects // as this means that either the timer is exhausted or it has explicit // stopped @@ -237,7 +262,8 @@ final class BackOffTimerTask implements BackOffTimer.Task, Runnable { @Override public String toString() { return "BackOffTimerTask[" - + "status=" + status + + "name=" + timer.getName() + + ", status=" + status + ", currentAttempts=" + currentAttempts + ", currentDelay=" + currentDelay + ", currentElapsedTime=" + currentElapsedTime diff --git a/core/camel-util/src/main/java/org/apache/camel/util/backoff/SimpleBackOffTimer.java b/core/camel-util/src/main/java/org/apache/camel/util/backoff/SimpleBackOffTimer.java new file mode 100644 index 00000000000..b98c8e5a26f --- /dev/null +++ b/core/camel-util/src/main/java/org/apache/camel/util/backoff/SimpleBackOffTimer.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.backoff; + +import java.io.Closeable; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.util.function.ThrowingFunction; + +/** + * A simple timer utility that uses a linked {@link BackOff} to determine when a task should be executed. + */ +public class SimpleBackOffTimer implements BackOffTimer, Closeable { + private final ScheduledExecutorService scheduler; + private final String name; + private final Set<BackOffTimerTask> tasks = new CopyOnWriteArraySet<>(); + + public SimpleBackOffTimer(ScheduledExecutorService scheduler) { + this("SimpleBackOffTimer", scheduler); + } + + public SimpleBackOffTimer(String name, ScheduledExecutorService scheduler) { + this.name = name; + this.scheduler = scheduler; + } + + /** + * Schedule the given function/task to be executed some time in the future according to the given backOff. + */ + public Task schedule(BackOff backOff, ThrowingFunction<Task, Boolean, Exception> function) { + final BackOffTimerTask task = new BackOffTimerTask(this, backOff, scheduler, function); + + long delay = task.next(); + if (delay != BackOff.NEVER) { + tasks.add(task); + scheduler.schedule(task, delay, TimeUnit.MILLISECONDS); + } else { + task.cancel(); + } + + return task; + } + + /** + * Gets the name of this timer. 
+ */ + public String getName() { + return name; + } + + /** + * Removes the task + */ + public void remove(Task task) { + tasks.remove(task); + } + + /** + * Access to unmodifiable set of all the tasks + */ + public Set<Task> getTasks() { + return Collections.unmodifiableSet(tasks); + } + + /** + * Number of tasks + */ + public int size() { + return tasks.size(); + } + + /** + * Stops and closes this timer. + */ + public void close() { + tasks.clear(); + } +} diff --git a/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTest.java b/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTest.java index 1f8edf635df..4e9105d681c 100644 --- a/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTest.java +++ b/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTest.java @@ -27,7 +27,7 @@ public class BackOffTest { @Test public void testSimpleBackOff() { final BackOff backOff = BackOff.builder().build(); - final BackOffTimerTask context = new BackOffTimerTask(backOff, null, t -> true); + final BackOffTimerTask context = new BackOffTimerTask(null, backOff, null, t -> true); long delay; @@ -43,7 +43,7 @@ public class BackOffTest { @Test public void testBackOffWithMultiplier() { final BackOff backOff = BackOff.builder().multiplier(1.5).build(); - final BackOffTimerTask context = new BackOffTimerTask(backOff, null, t -> true); + final BackOffTimerTask context = new BackOffTimerTask(null, backOff, null, t -> true); long delay = BackOff.DEFAULT_DELAY.toMillis(); long oldDelay; @@ -64,7 +64,7 @@ public class BackOffTest { @Test public void testBackOffWithMaxAttempts() { final BackOff backOff = BackOff.builder().maxAttempts(5L).build(); - final BackOffTimerTask context = new BackOffTimerTask(backOff, null, t -> true); + final BackOffTimerTask context = new BackOffTimerTask(null, backOff, null, t -> true); long delay; @@ -84,7 +84,7 @@ public class BackOffTest { @Test public void testBackOffWithMaxTime() { final BackOff 
backOff = BackOff.builder().maxElapsedTime(9, TimeUnit.SECONDS).build(); - final BackOffTimerTask context = new BackOffTimerTask(backOff, null, t -> true); + final BackOffTimerTask context = new BackOffTimerTask(null, backOff, null, t -> true); long delay; diff --git a/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java b/core/camel-util/src/test/java/org/apache/camel/util/backoff/SimpleBackOffTimerTest.java similarity index 66% rename from core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java rename to core/camel-util/src/test/java/org/apache/camel/util/backoff/SimpleBackOffTimerTest.java index 810a3d6e662..634578742de 100644 --- a/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java +++ b/core/camel-util/src/test/java/org/apache/camel/util/backoff/SimpleBackOffTimerTest.java @@ -29,15 +29,15 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -public class BackOffTimerTest { +public class SimpleBackOffTimerTest { @Test public void testBackOffTimer() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger counter = new AtomicInteger(); final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3); - final BackOff backOff = BackOff.builder().delay(100).build(); - final BackOffTimer timer = new BackOffTimer(executor); + final BackOff backOff = BackOff.builder().delay(100).removeOnComplete(false).build(); + final SimpleBackOffTimer timer = new SimpleBackOffTimer(executor); final AtomicLong first = new AtomicLong(); BackOffTimer.Task task = timer.schedule( @@ -63,6 +63,10 @@ public class BackOffTimerTest { latch.await(5, TimeUnit.SECONDS); executor.shutdownNow(); + + assertEquals(1, timer.size()); + assertEquals(BackOffTimer.Task.Status.Completed, timer.getTasks().iterator().next().getStatus()); + timer.close(); } @Test @@ -70,8 
+74,8 @@ public class BackOffTimerTest { final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger counter = new AtomicInteger(); final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3); - final BackOff backOff = BackOff.builder().delay(100).maxAttempts(5L).build(); - final BackOffTimer timer = new BackOffTimer(executor); + final BackOff backOff = BackOff.builder().delay(100).maxAttempts(5L).removeOnComplete(false).build(); + final SimpleBackOffTimer timer = new SimpleBackOffTimer(executor); BackOffTimer.Task task = timer.schedule( backOff, @@ -92,6 +96,10 @@ public class BackOffTimerTest { latch.await(5, TimeUnit.SECONDS); executor.shutdownNow(); + + assertEquals(1, timer.size()); + assertEquals(BackOffTimer.Task.Status.Exhausted, timer.getTasks().iterator().next().getStatus()); + timer.close(); } @Test @@ -99,8 +107,8 @@ public class BackOffTimerTest { final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger counter = new AtomicInteger(); final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3); - final BackOff backOff = BackOff.builder().delay(100).maxElapsedTime(400).build(); - final BackOffTimer timer = new BackOffTimer(executor); + final BackOff backOff = BackOff.builder().delay(100).maxElapsedTime(400).removeOnComplete(false).build(); + final SimpleBackOffTimer timer = new SimpleBackOffTimer(executor); BackOffTimer.Task task = timer.schedule( backOff, @@ -121,6 +129,10 @@ public class BackOffTimerTest { latch.await(5, TimeUnit.SECONDS); executor.shutdownNow(); + + assertEquals(1, timer.size()); + assertEquals(BackOffTimer.Task.Status.Exhausted, timer.getTasks().iterator().next().getStatus()); + timer.close(); } @Test @@ -128,8 +140,8 @@ public class BackOffTimerTest { final CountDownLatch latch = new CountDownLatch(5); final AtomicBoolean done = new AtomicBoolean(); final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3); - final BackOff backOff = 
BackOff.builder().delay(100).build(); - final BackOffTimer timer = new BackOffTimer(executor); + final BackOff backOff = BackOff.builder().delay(100).removeOnComplete(false).build(); + final SimpleBackOffTimer timer = new SimpleBackOffTimer(executor); BackOffTimer.Task task = timer.schedule( backOff, @@ -148,9 +160,53 @@ public class BackOffTimerTest { }); latch.await(2, TimeUnit.SECONDS); + assertEquals(1, timer.size()); + assertEquals(BackOffTimer.Task.Status.Completed, timer.getTasks().iterator().next().getStatus()); task.cancel(); + assertEquals(0, timer.size()); assertTrue(done.get()); executor.shutdownNow(); + + timer.close(); } + + @Test + public void testBackOffTimerRemoveOnComplete() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger counter = new AtomicInteger(); + final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3); + final BackOff backOff = BackOff.builder().delay(100).removeOnComplete(true).build(); + final SimpleBackOffTimer timer = new SimpleBackOffTimer(executor); + final AtomicLong first = new AtomicLong(); + + BackOffTimer.Task task = timer.schedule( + backOff, + context -> { + assertEquals(counter.incrementAndGet(), context.getCurrentAttempts()); + assertEquals(100, context.getCurrentDelay()); + assertEquals(100L * counter.get(), context.getCurrentElapsedTime()); + if (first.get() == 0) { + first.set(context.getFirstAttemptTime()); + } else { + assertEquals(first.get(), context.getFirstAttemptTime()); + } + + return counter.get() < 5; + }); + + task.whenComplete( + (context, throwable) -> { + assertEquals(5, counter.get()); + latch.countDown(); + }); + + latch.await(5, TimeUnit.SECONDS); + executor.shutdownNow(); + + // task is removed + assertEquals(0, timer.size()); + timer.close(); + } + } diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_13.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_13.adoc index a78051592d6..bdf9eb9ad88 
100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_13.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_13.adoc @@ -11,6 +11,21 @@ from both 4.0 to 4.1 and 4.1 to 4.2. Added a 2nd `lookup` method to `org.apache.camel.spi.TypeConverterRegistry` and changed the `addConverter` to no longer have an empty default noop implementation in the interface. +The class `org.apache.camel.util.backoff.BackOffTimer` has been refactored into an interface, +and the basic implementation is `org.apache.camel.util.backoff.SimpleBackOffTimer` in the `camel-util` JAR. + +To get hold of a `BackOffTimer`, use the new factory as shown below: + +[source,java] +---- +BackOffTimer timer = PluginHelper.getBackOffTimerFactory(camelContext.getCamelContextExtension()) + .newBackOffTimer("NameOfTimer", executorService); +---- + +The `BackOffTimer` is mostly used internally in Camel components to conduct tasks that should +be repeated until completed, such as recovery tasks. As such, this refactoring is not +expected to impact Camel end users. 
+ === camel-file / camel-ftp / camel-smb / camel-azure-files When using `poll` or `pollEnrich` with the file based components, then the `eagerLimitMaxMessagesPerPoll` option diff --git a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java index eb2434e0c1a..7ee908eabb6 100644 --- a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java +++ b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java @@ -1058,6 +1058,13 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C root.put("receive", json); } } + DevConsole dc24 = dcr.resolveById("backoff"); + if (dc24 != null) { + JsonObject json = (JsonObject) dc24.call(DevConsole.MediaType.JSON); + if (json != null && !json.isEmpty()) { + root.put("backoff", json); + } + } } // various details JsonObject mem = collectMemory(); diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java index b18d6cd299d..3d628dfa5f4 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java @@ -108,6 +108,7 @@ public class CamelJBangMain implements Callable<Integer> { .addSubcommand("event", new CommandLine(new ListEvent(main))) .addSubcommand("inflight", new CommandLine(new ListInflight(main))) .addSubcommand("blocked", new CommandLine(new ListBlocked(main))) + .addSubcommand("backoff", new CommandLine(new ListBackOff(main))) .addSubcommand("bean", new CommandLine(new CamelBeanDump(main))) .addSubcommand("route-controller", new CommandLine(new 
RouteControllerAction(main))) .addSubcommand("transformer", new CommandLine(new ListTransformer(main))) diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListBackOff.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListBackOff.java new file mode 100644 index 00000000000..3e17c8a378e --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListBackOff.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.dsl.jbang.core.commands.process; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import com.github.freva.asciitable.AsciiTable; +import com.github.freva.asciitable.Column; +import com.github.freva.asciitable.HorizontalAlign; +import com.github.freva.asciitable.OverflowBehaviour; +import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; +import org.apache.camel.dsl.jbang.core.common.PidNameAgeCompletionCandidates; +import org.apache.camel.dsl.jbang.core.common.ProcessHelper; +import org.apache.camel.util.TimeUtils; +import org.apache.camel.util.json.JsonArray; +import org.apache.camel.util.json.JsonObject; +import picocli.CommandLine; +import picocli.CommandLine.Command; + +@Command(name = "backoff", + description = "Get back-off tasks of Camel integrations", sortOptions = false, showDefaultValues = true) +public class ListBackOff extends ProcessWatchCommand { + + @CommandLine.Parameters(description = "Name or pid of running Camel integration", arity = "0..1") + String name = "*"; + + @CommandLine.Option(names = { "--sort" }, completionCandidates = PidNameAgeCompletionCandidates.class, + description = "Sort by pid, name or age", defaultValue = "pid") + String sort; + + public ListBackOff(CamelJBangMain main) { + super(main); + } + + @Override + public Integer doProcessWatchCall() throws Exception { + List<Row> rows = new ArrayList<>(); + + List<Long> pids = findPids(name); + ProcessHandle.allProcesses() + .filter(ph -> pids.contains(ph.pid())) + .forEach(ph -> { + JsonObject root = loadStatus(ph.pid()); + // there must be a status file for the running Camel integration + if (root != null) { + Row row = new Row(); + JsonObject context = (JsonObject) root.get("context"); + if (context == null) { + return; + } + row.name = context.getString("name"); + if ("CamelJBang".equals(row.name)) { + row.name = ProcessHelper.extractName(root, ph); + } + row.pid = Long.toString(ph.pid()); + row.uptime = 
extractSince(ph); + row.age = TimeUtils.printSince(row.uptime); + + JsonObject jo = (JsonObject) root.get("backoff"); + if (jo != null) { + JsonArray arr = (JsonArray) jo.get("timers"); + if (arr != null) { + for (int i = 0; i < arr.size(); i++) { + jo = (JsonObject) arr.get(i); + JsonArray arr2 = (JsonArray) jo.get("tasks"); + for (int j = 0; j < arr2.size(); j++) { + jo = (JsonObject) arr2.get(j); + row = row.copy(); + row.task = jo.getString("name"); + row.status = jo.getString("status"); + row.attempts = jo.getLong("attempts"); + row.delay = jo.getLong("delay"); + row.elapsed = jo.getLong("elapsed"); + row.firstTime = jo.getLong("firstTime"); + row.lastTime = jo.getLong("lastTime"); + row.nextTime = jo.getLong("nextTime"); + row.error = jo.getString("error"); + rows.add(row); + } + } + } + } + } + }); + + // sort rows + rows.sort(this::sortRow); + + if (!rows.isEmpty()) { + printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, Arrays.asList( + new Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid), + new Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, OverflowBehaviour.ELLIPSIS_RIGHT) + .with(r -> r.name), + new Column().header("TASK").dataAlign(HorizontalAlign.LEFT).with(r -> r.task), + new Column().header("STATUS").dataAlign(HorizontalAlign.LEFT).with(r -> r.status), + new Column().header("ATTEMPT").dataAlign(HorizontalAlign.LEFT).with(r -> "" + r.attempts), + new Column().header("DELAY").dataAlign(HorizontalAlign.LEFT).with(r -> "" + r.delay), + new Column().header("ELAPSED").dataAlign(HorizontalAlign.LEFT).with(this::getElapsed), + new Column().header("FIRST").dataAlign(HorizontalAlign.LEFT).with(this::getFirst), + new Column().header("LAST").dataAlign(HorizontalAlign.LEFT).with(this::getLast), + new Column().header("NEXT").dataAlign(HorizontalAlign.LEFT).with(this::getNext), + new Column().header("FAILURE").dataAlign(HorizontalAlign.LEFT) + .maxWidth(140, OverflowBehaviour.NEWLINE) + .with(r -> 
r.error)))); + } + + return 0; + } + + private String getElapsed(Row r) { + return TimeUtils.printAge(r.elapsed); + } + + private String getFirst(Row r) { + if (r.firstTime > 0) { + return TimeUtils.printSince(r.firstTime); + } + return ""; + } + + private String getLast(Row r) { + if (r.lastTime > 0) { + return TimeUtils.printSince(r.lastTime, true); + } + return ""; + } + + private String getNext(Row r) { + if (r.nextTime > 0) { + long age = r.nextTime - System.currentTimeMillis(); + return TimeUtils.printDuration(age, true); + } + return ""; + } + + protected int sortRow(Row o1, Row o2) { + String s = sort; + int negate = 1; + if (s.startsWith("-")) { + s = s.substring(1); + negate = -1; + } + switch (s) { + case "pid": + return Long.compare(Long.parseLong(o1.pid), Long.parseLong(o2.pid)) * negate; + case "name": + return o1.name.compareToIgnoreCase(o2.name) * negate; + case "age": + return Long.compare(o1.uptime, o2.uptime) * negate; + default: + return 0; + } + } + + private static class Row implements Cloneable { + String pid; + String name; + long uptime; + String age; + String task; + String status; + long attempts; + long delay; + long elapsed; + long firstTime; + long lastTime; + long nextTime; + String error; + + Row copy() { + try { + return (Row) clone(); + } catch (CloneNotSupportedException e) { + return null; + } + } + } + +} diff --git a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderPropertiesComponent.java b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderPropertiesComponent.java index b348bb97013..56dec5ef545 100644 --- a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderPropertiesComponent.java +++ b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderPropertiesComponent.java @@ -22,6 +22,7 @@ import java.util.Properties; import org.apache.camel.CamelContext; import 
org.apache.camel.Expression; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.StaticService; import org.apache.camel.spi.CamelEvent; import org.apache.camel.spi.Language; import org.apache.camel.support.DefaultExchange; @@ -33,7 +34,7 @@ import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DependencyDownloaderPropertiesComponent extends ServiceSupport { +public class DependencyDownloaderPropertiesComponent extends ServiceSupport implements StaticService { private static final Logger LOG = LoggerFactory.getLogger(DependencyDownloaderPropertiesComponent.class);