This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch tasks2
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e4bfae2643010081b28ff1bb47bdf7e3f47ac606
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Jun 27 16:55:02 2025 +0200

    CAMEL-22206: camel-master should not use BackOff but BackgroundTask for reconnector
---
 .../master/MasterComponentConfigurer.java          |  4 +-
 .../org/apache/camel/component/master/master.json  |  2 +-
 .../camel/component/master/MasterComponent.java    | 31 ++-----------
 .../camel/component/master/MasterConsumer.java     | 52 ++++++++++++++--------
 .../camel/component/master/MasterEndpoint.java     | 11 ++---
 .../component/master/MasterComponentTest.java      |  8 +++-
 6 files changed, 48 insertions(+), 60 deletions(-)

diff --git a/components/camel-master/src/generated/java/org/apache/camel/component/master/MasterComponentConfigurer.java b/components/camel-master/src/generated/java/org/apache/camel/component/master/MasterComponentConfigurer.java
index 81a7ecfe5e7..2d0413d4bcd 100644
--- a/components/camel-master/src/generated/java/org/apache/camel/component/master/MasterComponentConfigurer.java
+++ b/components/camel-master/src/generated/java/org/apache/camel/component/master/MasterComponentConfigurer.java
@@ -28,7 +28,7 @@ public class MasterComponentConfigurer extends 
PropertyConfigurerSupport impleme
         case "backoffdelay":
         case "backOffDelay": target.setBackOffDelay(property(camelContext, 
long.class, value)); return true;
         case "backoffmaxattempts":
-        case "backOffMaxAttempts": 
target.setBackOffMaxAttempts(property(camelContext, long.class, value)); return 
true;
+        case "backOffMaxAttempts": 
target.setBackOffMaxAttempts(property(camelContext, int.class, value)); return 
true;
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": 
target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); 
return true;
         case "service": target.setService(property(camelContext, 
org.apache.camel.cluster.CamelClusterService.class, value)); return true;
@@ -46,7 +46,7 @@ public class MasterComponentConfigurer extends 
PropertyConfigurerSupport impleme
         case "backoffdelay":
         case "backOffDelay": return long.class;
         case "backoffmaxattempts":
-        case "backOffMaxAttempts": return long.class;
+        case "backOffMaxAttempts": return int.class;
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": return boolean.class;
         case "service": return 
org.apache.camel.cluster.CamelClusterService.class;
diff --git a/components/camel-master/src/generated/resources/META-INF/org/apache/camel/component/master/master.json b/components/camel-master/src/generated/resources/META-INF/org/apache/camel/component/master/master.json
index 840a8e99738..0d27e7fa776 100644
--- a/components/camel-master/src/generated/resources/META-INF/org/apache/camel/component/master/master.json
+++ b/components/camel-master/src/generated/resources/META-INF/org/apache/camel/component/master/master.json
@@ -27,7 +27,7 @@
     "bridgeErrorHandler": { "index": 0, "kind": "property", "displayName": 
"Bridge Error Handler", "group": "consumer", "label": "consumer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Allows for bridging the consumer to the Camel routing Error Handler, which 
mean any exceptions (if possible) occurred while the Camel consumer is trying 
to pickup incoming messages, or the like [...]
     "autowiredEnabled": { "index": 1, "kind": "property", "displayName": 
"Autowired Enabled", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": true, "description": 
"Whether autowiring is enabled. This is used for automatic autowiring options 
(the option must be marked as autowired) by looking up in the registry to find 
if there is a single instance of matching t [...]
     "backOffDelay": { "index": 2, "kind": "property", "displayName": "Back Off 
Delay", "group": "advanced", "label": "advanced", "required": false, "type": 
"integer", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "description": "When the master becomes leader then backoff is 
in use to repeat starting the consumer until the consumer is successfully 
started or max attempts reached. This option is the delay in millis between 
start attempts." },
-    "backOffMaxAttempts": { "index": 3, "kind": "property", "displayName": 
"Back Off Max Attempts", "group": "advanced", "label": "advanced", "required": 
false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": 
false, "secret": false, "description": "When the master becomes leader then 
backoff is in use to repeat starting the consumer until the consumer is 
successfully started or max attempts reached. This option is the maximum number 
of attempts to try." },
+    "backOffMaxAttempts": { "index": 3, "kind": "property", "displayName": 
"Back Off Max Attempts", "group": "advanced", "label": "advanced", "required": 
false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": 
false, "secret": false, "description": "When the master becomes leader then 
backoff is in use to repeat starting the consumer until the consumer is 
successfully started or max attempts reached. This option is the maximum number 
of attempts to try." },
     "service": { "index": 4, "kind": "property", "displayName": "Service", 
"group": "advanced", "label": "advanced", "required": false, "type": "object", 
"javaType": "org.apache.camel.cluster.CamelClusterService", "deprecated": 
false, "autowired": false, "secret": false, "description": "Inject the service 
to use." },
     "serviceSelector": { "index": 5, "kind": "property", "displayName": 
"Service Selector", "group": "advanced", "label": "advanced", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.cluster.CamelClusterService.Selector", "deprecated": false, 
"autowired": false, "secret": false, "description": "Inject the service 
selector used to lookup the CamelClusterService to use." }
   },
diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java
index b4d8bc4e59f..95ddbd92434 100644
--- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java
+++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java
@@ -17,7 +17,6 @@
 package org.apache.camel.component.master;
 
 import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
@@ -46,9 +45,7 @@ public class MasterComponent extends DefaultComponent {
     @Metadata(label = "advanced")
     private long backOffDelay = 5000;
     @Metadata(label = "advanced")
-    private long backOffMaxAttempts = 10;
-
-    private ScheduledExecutorService backOffThreadPool;
+    private int backOffMaxAttempts = 10;
 
     public MasterComponent() {
         this(null);
@@ -111,10 +108,6 @@ public class MasterComponent extends DefaultComponent {
         this.serviceSelector = serviceSelector;
     }
 
-    public ScheduledExecutorService getBackOffThreadPool() {
-        return backOffThreadPool;
-    }
-
     public long getBackOffDelay() {
         return backOffDelay;
     }
@@ -129,7 +122,7 @@ public class MasterComponent extends DefaultComponent {
         this.backOffDelay = backOffDelay;
     }
 
-    public long getBackOffMaxAttempts() {
+    public int getBackOffMaxAttempts() {
         return backOffMaxAttempts;
     }
 
@@ -139,7 +132,7 @@ public class MasterComponent extends DefaultComponent {
      *
      * This option is the maximum number of attempts to try.
      */
-    public void setBackOffMaxAttempts(long backOffMaxAttempts) {
+    public void setBackOffMaxAttempts(int backOffMaxAttempts) {
         this.backOffMaxAttempts = backOffMaxAttempts;
     }
 
@@ -153,22 +146,4 @@ public class MasterComponent extends DefaultComponent {
                     () -> new IllegalStateException("No cluster service 
found"));
         }
     }
-
-    @Override
-    protected void doStart() throws Exception {
-        if (backOffThreadPool == null) {
-            backOffThreadPool
-                    = 
getCamelContext().getExecutorServiceManager().newDefaultScheduledThreadPool(this,
 "MasterLeaderTask");
-        }
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        super.doStop();
-
-        if (backOffThreadPool != null) {
-            
getCamelContext().getExecutorServiceManager().shutdown(backOffThreadPool);
-            backOffThreadPool = null;
-        }
-    }
 }
diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
index 652c82e2865..603fa9d5073 100644
--- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
+++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.master;
 
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
@@ -31,11 +34,11 @@ import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.DefaultConsumer;
-import org.apache.camel.support.PluginHelper;
 import org.apache.camel.support.resume.AdapterHelper;
 import org.apache.camel.support.service.ServiceHelper;
-import org.apache.camel.util.backoff.BackOff;
-import org.apache.camel.util.backoff.BackOffTimer;
+import org.apache.camel.support.task.BackgroundTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,11 +57,10 @@ public class MasterConsumer extends DefaultConsumer 
implements ResumeAware<Resum
     private volatile Consumer delegatedConsumer;
     private volatile CamelClusterView view;
     private ResumeStrategy resumeStrategy;
-    private BackOffTimer timer;
+    private ScheduledExecutorService leaderPool;
 
     public MasterConsumer(MasterEndpoint masterEndpoint, Processor processor, 
CamelClusterService clusterService) {
         super(masterEndpoint, processor);
-
         this.clusterService = clusterService;
         this.masterEndpoint = masterEndpoint;
         this.delegatedEndpoint = masterEndpoint.getEndpoint();
@@ -79,16 +81,16 @@ public class MasterConsumer extends DefaultConsumer 
implements ResumeAware<Resum
     @Override
     protected void doInit() throws Exception {
         super.doInit();
-        this.timer = 
PluginHelper.getBackOffTimerFactory(masterEndpoint.getCamelContext().getCamelContextExtension())
-                .newBackOffTimer("MasterConsumer", 
masterEndpoint.getComponent().getBackOffThreadPool());
+
+        // used by the background task that retries starting the delegate consumer once leadership is taken
+        leaderPool = 
getEndpoint().getCamelContext().getExecutorServiceManager()
+                .newSingleThreadScheduledExecutor(this, "Leadership");
     }
 
     @Override
     protected void doStart() throws Exception {
         super.doStart();
 
-        ServiceHelper.startService(timer);
-
         LOG.debug("Using ClusterService instance {} (id={}, type={})", 
clusterService, clusterService.getId(),
                 clusterService.getClass().getName());
 
@@ -106,7 +108,8 @@ public class MasterConsumer extends DefaultConsumer 
implements ResumeAware<Resum
             view = null;
         }
 
-        ServiceHelper.stopAndShutdownServices(delegatedConsumer, 
delegatedEndpoint, timer);
+        
getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(leaderPool);
+        ServiceHelper.stopAndShutdownServices(delegatedConsumer, 
delegatedEndpoint);
         delegatedConsumer = null;
     }
 
@@ -135,6 +138,18 @@ public class MasterConsumer extends DefaultConsumer 
implements ResumeAware<Resum
     // Helpers
     // **************************************
 
+    private BackgroundTask createTask() {
+        return Tasks.backgroundTask()
+                .withScheduledExecutor(leaderPool)
+                .withBudget(Budgets.iterationTimeBudget()
+                        
.withInterval(Duration.ofMillis(masterEndpoint.getComponent().getBackOffDelay()))
+                        .withInitialDelay(Duration.ofSeconds(1))
+                        
.withMaxIterations(masterEndpoint.getComponent().getBackOffMaxAttempts())
+                        .build())
+                .withName("Leadership")
+                .build();
+    }
+
     private void onLeadershipTaken() throws Exception {
         lock.lock();
         try {
@@ -146,13 +161,12 @@ public class MasterConsumer extends DefaultConsumer 
implements ResumeAware<Resum
                 return;
             }
 
-            // start consumer using background task up till X attempts
-            long delay = masterEndpoint.getComponent().getBackOffDelay();
-            long max = masterEndpoint.getComponent().getBackOffMaxAttempts();
-
-            
timer.schedule(BackOff.builder().delay(delay).maxAttempts(max).build(), task -> 
{
-                LOG.info("Leadership taken. Attempt #{} to start consumer: 
{}", task.getCurrentAttempts(),
-                        delegatedEndpoint);
+            final BackgroundTask leaderTask = createTask();
+            leaderTask.run(() -> {
+                if (!isRunAllowed()) {
+                    return false;
+                }
+                LOG.info("Leadership taken. Attempt #{} to start consumer: 
{}", leaderTask.iteration(), delegatedEndpoint);
 
                 Exception cause = null;
                 try {
@@ -179,13 +193,13 @@ public class MasterConsumer extends DefaultConsumer 
implements ResumeAware<Resum
                 }
 
                 if (cause != null) {
-                    String message = "Leadership taken. Attempt #" + 
task.getCurrentAttempts()
+                    String message = "Leadership taken. Attempt #" + 
leaderTask.iteration()
                                      + " failed to start consumer due to: " + 
cause.getMessage();
                     getExceptionHandler().handleException(message, cause);
                     return true; // retry
                 }
 
-                LOG.info("Leadership taken. Attempt #" + 
task.getCurrentAttempts() + " success. Consumer started: {}",
+                LOG.info("Leadership taken. Attempt #{} success. Consumer 
started: {}", leaderTask.iteration(),
                         delegatedEndpoint);
                 return false; // no more attempts
             });
diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterEndpoint.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterEndpoint.java
index ff52293fe4b..64e22c5ae75 100644
--- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterEndpoint.java
+++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterEndpoint.java
@@ -34,14 +34,10 @@ import org.apache.camel.support.DefaultEndpoint;
  * Have only a single consumer in a cluster consuming from a given endpoint; 
with automatic failover if the JVM dies.
  */
 @ManagedResource(description = "Managed Master Endpoint")
-@UriEndpoint(firstVersion = "2.20.0",
-             scheme = "master",
-             syntax = "master:namespace:delegateUri",
-             consumerOnly = true,
-             title = "Master",
-             lenientProperties = true,
-             category = { Category.CLUSTERING })
+@UriEndpoint(firstVersion = "2.20.0", scheme = "master", syntax = 
"master:namespace:delegateUri", title = "Master",
+             consumerOnly = true, lenientProperties = true, category = { 
Category.CLUSTERING })
 public class MasterEndpoint extends DefaultEndpoint implements 
DelegateEndpoint {
+
     private final Endpoint delegateEndpoint;
     private final CamelClusterService clusterService;
 
@@ -56,7 +52,6 @@ public class MasterEndpoint extends DefaultEndpoint 
implements DelegateEndpoint
     public MasterEndpoint(String uri, MasterComponent component, 
CamelClusterService clusterService, String namespace,
                           String delegateUri) {
         super(uri, component);
-
         this.clusterService = clusterService;
         this.namespace = namespace;
         this.delegateUri = delegateUri;
diff --git a/components/camel-master/src/test/java/org/apache/camel/component/master/MasterComponentTest.java b/components/camel-master/src/test/java/org/apache/camel/component/master/MasterComponentTest.java
index 5fa7240cf08..6e069c1e644 100644
--- a/components/camel-master/src/test/java/org/apache/camel/component/master/MasterComponentTest.java
+++ b/components/camel-master/src/test/java/org/apache/camel/component/master/MasterComponentTest.java
@@ -90,11 +90,15 @@ public class MasterComponentTest {
             // Start the context after some random time so the startup order
             // changes for each test.
             
Awaitility.await().pollDelay(ThreadLocalRandom.current().nextInt(500), 
TimeUnit.MILLISECONDS)
-                    .untilAsserted(() -> 
Assertions.assertDoesNotThrow(context::start));
+                    .untilAsserted(() -> Assertions.assertDoesNotThrow(() -> {
+                        LOGGER.info("Starting node {}", id);
+                        context.start();
+                    }));
 
+            LOGGER.info("Waiting for {} events on node {}", events, id);
             contextLatch.await();
 
-            LOGGER.debug("Shutting down node {}", id);
+            LOGGER.info("Shutting down node {}", id);
             RESULTS.add(id);
 
             context.stop();

Reply via email to