This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch camel-4.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 47c169e248248a9a2d50e826c147f9ea7bb171a2
Author: James Netherton <[email protected]>
AuthorDate: Mon Jan 5 17:31:55 2026 +0000

    CAMEL-22784: Use predictable scheduling for FileLockClusterService lock 
acquisition
---
 .../file/cluster/FileLockClusterService.java       | 13 +++-
 .../file/cluster/FileLockClusterView.java          | 71 +++++++++++++++-------
 ...FileLockClusterServiceAdvancedFailoverTest.java | 15 +++++
 .../FileLockClusterServiceBasicFailoverTest.java   | 65 +++++++++++++++++++-
 .../cluster/FileLockClusterServiceTestBase.java    | 13 +++-
 .../user-manual/modules/ROOT/pages/clustering.adoc |  4 +-
 6 files changed, 152 insertions(+), 29 deletions(-)

diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
index 3851aea2cdd6..fd7c4e7b2992 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
@@ -70,9 +70,14 @@ public class FileLockClusterService extends 
AbstractCamelClusterService<FileLock
     }
 
     /**
-     * The time to wait before starting to try to acquire lock, default 1.
+     * The time to wait before starting to try to acquire the cluster lock, 
default 1. Note that if
+     * FileLockClusterService determines that no cluster members are running, 
or cannot reliably determine the
+     * cluster state, the initial delay is instead computed from the 
acquireLockInterval.
      */
     public void setAcquireLockDelay(long acquireLockDelay) {
+        if (acquireLockDelay <= 0) {
+            throw new IllegalArgumentException("acquireLockDelay must be 
greater than 0");
+        }
         this.acquireLockDelay = acquireLockDelay;
     }
 
@@ -97,9 +102,13 @@ public class FileLockClusterService extends 
AbstractCamelClusterService<FileLock
     }
 
     /**
-     * The time to wait between attempts to try to acquire lock, default 10.
+     * The time to wait between attempts to try to acquire the cluster lock, 
default 10. Attempts are aligned to wall-clock
+     * time, so all cluster members must use the same value to keep 
leadership checks and leader liveness detection consistent.
      */
     public void setAcquireLockInterval(long acquireLockInterval) {
+        if (acquireLockInterval <= 0) {
+            throw new IllegalArgumentException("acquireLockInterval must be 
greater than 0");
+        }
         this.acquireLockInterval = acquireLockInterval;
     }
 
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
index c7b5971728a3..4c24bb6ff2ac 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
@@ -30,7 +30,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -104,7 +103,7 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
                 fireLeadershipChangedEvent((CamelClusterMember) null);
             }
 
-            // Attempt to pre-create cluster data files & directories. On 
failure, it will either be attempted by another cluster member or run again 
within the tryLock task loop
+            // Attempt to pre-create cluster data directories. On failure, it 
will either be attempted by another cluster member or run again within the 
tryLock task loop
             try {
                 if (!Files.exists(leaderLockPath.getParent())) {
                     Files.createDirectories(leaderLockPath.getParent());
@@ -112,22 +111,6 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
             } catch (IOException e) {
                 LOGGER.debug("Error creating directory {}", 
leaderLockPath.getParent(), e);
             }
-
-            try {
-                if (!Files.exists(leaderLockPath)) {
-                    Files.createFile(leaderLockPath);
-                }
-            } catch (IOException e) {
-                LOGGER.debug("Error creating cluster leader lock file {}", 
leaderLockPath, e);
-            }
-
-            try {
-                if (!Files.exists(leaderDataPath)) {
-                    Files.createFile(leaderDataPath);
-                }
-            } catch (IOException e) {
-                LOGGER.debug("Error creating cluster leader data file {}", 
leaderDataPath, e);
-            }
         } finally {
             // End critical section
             contextStartLock.unlock();
@@ -141,11 +124,7 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
 
         heartbeatTimeoutMultiplier = service.getHeartbeatTimeoutMultiplier();
 
-        ScheduledExecutorService executor = service.getExecutor();
-        task = executor.scheduleWithFixedDelay(this::tryLock,
-                TimeUnit.MILLISECONDS.convert(service.getAcquireLockDelay(), 
service.getAcquireLockDelayUnit()),
-                
TimeUnit.MILLISECONDS.convert(service.getAcquireLockInterval(), 
service.getAcquireLockIntervalUnit()),
-                TimeUnit.MILLISECONDS);
+        scheduleTryLock(true);
 
         localMember.setStatus(ClusterMemberStatus.STARTED);
     }
@@ -250,6 +229,10 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
                 LOGGER.debug("Reading cluster leader state from {}", 
leaderDataPath);
                 FileLockClusterLeaderInfo latestClusterLeaderInfo = 
readClusterLeaderInfo();
                 FileLockClusterLeaderInfo previousClusterLeaderInfo = 
clusterLeaderInfoRef.getAndSet(latestClusterLeaderInfo);
+
+                // Compare the cluster leader lock interval to our own and 
warn if not in sync
+                validateAcquireLockInterval(latestClusterLeaderInfo);
+
                 // Check if we can attempt to take cluster leadership
                 if (isLeaderStale(latestClusterLeaderInfo, 
previousClusterLeaderInfo)
                         || canReclaimLeadership(latestClusterLeaderInfo)) {
@@ -294,8 +277,50 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
                             reason);
                     closeLockFiles();
                 }
+                scheduleTryLock(false);
+            }
+        }
+    }
+
+    void validateAcquireLockInterval(FileLockClusterLeaderInfo 
clusterLeaderInfo) {
+        if (clusterLeaderInfo != null
+                && clusterLeaderInfo.getHeartbeatUpdateIntervalMilliseconds() 
!= acquireLockIntervalMilliseconds) {
+            LOGGER.warn(
+                    "This cluster member (cluster-member-id={}) 
acquireLockIntervalMilliseconds configuration {}ms does not match {}ms set on 
the cluster leader (cluster-member-id={}). This can lead to unpredictable 
behavior. Please ensure the configuration is set consistently for all cluster 
members.",
+                    localMember.getUuid(), acquireLockIntervalMilliseconds,
+                    clusterLeaderInfo.getHeartbeatUpdateIntervalMilliseconds(),
+                    clusterLeaderInfo.getId());
+        }
+    }
+
+    void scheduleTryLock(boolean isFirstRun) {
+        long offset = System.currentTimeMillis() % 
acquireLockIntervalMilliseconds;
+        long delay = acquireLockIntervalMilliseconds - offset;
+        if (delay <= 0) {
+            delay = acquireLockIntervalMilliseconds;
+        }
+
+        if (isFirstRun) {
+            // If it seems that other members are running, apply the user 
provided initial delay
+            if (Files.exists(leaderLockPath.getParent()) && 
Files.exists(leaderLockPath) && Files.exists(leaderDataPath)) {
+                FileLockClusterService service = 
getClusterService().unwrap(FileLockClusterService.class);
+                delay = 
TimeUnit.MILLISECONDS.convert(service.getAcquireLockDelay(), 
service.getAcquireLockDelayUnit());
+            }
+
+            if (delay > 30000) {
+                LOGGER.warn(
+                        "Initial acquire lock delay is high ({} ms). Consider 
reducing acquireLockIntervalMilliseconds or acquireLockDelay for faster leader 
acquisition.",
+                        delay);
             }
+
+            LOGGER.info("Waiting {}ms to attempt initial cluster leadership 
acquisition", delay);
         }
+
+        LOGGER.debug("Scheduling tryLock with delay {}ms", delay);
+
+        getClusterService().unwrap(FileLockClusterService.class)
+                .getExecutor()
+                .schedule(this::tryLock, delay, TimeUnit.MILLISECONDS);
     }
 
     boolean isLeaderStale(FileLockClusterLeaderInfo clusterLeaderInfo, 
FileLockClusterLeaderInfo previousClusterLeaderInfo) {
diff --git 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
index 29c9a60536bf..11bae8215caf 100644
--- 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
+++ 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
@@ -55,6 +55,9 @@ class FileLockClusterServiceAdvancedFailoverTest extends 
FileLockClusterServiceT
 
             clusterLeader.start();
 
+            Awaitility.await().atMost(Duration.ofSeconds(30))
+                    .until(() -> 
getClusterView(clusterLeader).getLocalMember().isLeader());
+
             mockEndpoint.assertIsSatisfied();
 
             AtomicReference<String> leaderId = new AtomicReference<>();
@@ -120,6 +123,10 @@ class FileLockClusterServiceAdvancedFailoverTest extends 
FileLockClusterServiceT
             mockEndpointLeader.expectedMinimumMessageCount(1);
 
             clusterLeader.start();
+
+            Awaitility.await().atMost(Duration.ofSeconds(30))
+                    .until(() -> 
getClusterView(clusterLeader).getLocalMember().isLeader());
+
             clusterFollower.start();
 
             mockEndpointLeader.assertIsSatisfied();
@@ -198,6 +205,10 @@ class FileLockClusterServiceAdvancedFailoverTest extends 
FileLockClusterServiceT
             mockEndpointLeader.expectedMinimumMessageCount(1);
 
             clusterLeader.start();
+
+            Awaitility.await().atMost(Duration.ofSeconds(30))
+                    .until(() -> 
getClusterView(clusterLeader).getLocalMember().isLeader());
+
             clusterFollower.start();
 
             mockEndpointLeader.assertIsSatisfied();
@@ -276,6 +287,10 @@ class FileLockClusterServiceAdvancedFailoverTest extends 
FileLockClusterServiceT
             mockEndpointLeader.expectedMessageCount(5);
 
             clusterLeader.start();
+
+            Awaitility.await().atMost(Duration.ofSeconds(30))
+                    .until(() -> 
getClusterView(clusterLeader).getLocalMember().isLeader());
+
             clusterFollower.start();
 
             mockEndpointLeader.assertIsSatisfied();
diff --git 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
index 5bb4da929715..8bce74e2c5c5 100644
--- 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
+++ 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
@@ -37,12 +37,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 class FileLockClusterServiceBasicFailoverTest extends 
FileLockClusterServiceTestBase {
     @Test
     void singleClusterMemberLeaderElection() throws Exception {
+        ClusterConfig leaderConfig = new ClusterConfig();
+        leaderConfig.setAcquireLockInterval(10);
         try (CamelContext clusterLeader = createCamelContext()) {
             MockEndpoint mockEndpoint = 
clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
             mockEndpoint.expectedMessageCount(5);
 
             clusterLeader.start();
 
+            Awaitility.await().atMost(Duration.ofSeconds(30))
+                    .until(() -> 
getClusterView(clusterLeader).getLocalMember().isLeader());
+
             mockEndpoint.assertIsSatisfied();
 
             Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() 
-> {
@@ -63,10 +68,13 @@ class FileLockClusterServiceBasicFailoverTest extends 
FileLockClusterServiceTest
 
     @Test
     void multiClusterMemberLeaderElection() throws Exception {
-        CamelContext clusterLeader = createCamelContext();
+        ClusterConfig leaderConfig = new ClusterConfig();
+        leaderConfig.setAcquireLockInterval(10);
+        CamelContext clusterLeader = createCamelContext(leaderConfig);
 
         ClusterConfig followerConfig = new ClusterConfig();
         followerConfig.setAcquireLockDelay(2);
+        followerConfig.setAcquireLockInterval(10);
         CamelContext clusterFollower = createCamelContext(followerConfig);
 
         try {
@@ -74,6 +82,10 @@ class FileLockClusterServiceBasicFailoverTest extends 
FileLockClusterServiceTest
             mockEndpointClustered.expectedMessageCount(5);
 
             clusterLeader.start();
+
+            Awaitility.await().atMost(Duration.ofSeconds(30))
+                    .until(() -> 
getClusterView(clusterLeader).getLocalMember().isLeader());
+
             clusterFollower.start();
 
             mockEndpointClustered.assertIsSatisfied();
@@ -118,6 +130,10 @@ class FileLockClusterServiceBasicFailoverTest extends 
FileLockClusterServiceTest
             mockEndpointClustered.expectedMessageCount(5);
 
             clusterLeader.start();
+
+            Awaitility.await().atMost(Duration.ofSeconds(30))
+                    .until(() -> 
getClusterView(clusterLeader).getLocalMember().isLeader());
+
             clusterFollower.start();
 
             mockEndpointClustered.assertIsSatisfied();
@@ -174,6 +190,9 @@ class FileLockClusterServiceBasicFailoverTest extends 
FileLockClusterServiceTest
 
             clusterLeader.start();
 
+            Awaitility.await().atMost(Duration.ofSeconds(30))
+                    .until(() -> 
getClusterView(clusterLeader).getLocalMember().isLeader());
+
             mockEndpoint.assertIsSatisfied();
 
             AtomicReference<String> leaderId = new AtomicReference<>();
@@ -219,6 +238,50 @@ class FileLockClusterServiceBasicFailoverTest extends 
FileLockClusterServiceTest
         assertEquals(0, Files.size(dataFile));
     }
 
+    @Test
+    void negativeAcquireLockDelayThrowsException() {
+        ClusterConfig config = new ClusterConfig();
+        config.setAcquireLockDelay(-1);
+        assertThrows(IllegalArgumentException.class, () -> {
+            try (CamelContext camelContext = createCamelContext(config)) {
+                camelContext.start();
+            }
+        });
+    }
+
+    @Test
+    void zeroAcquireLockDelayThrowsException() {
+        ClusterConfig config = new ClusterConfig();
+        config.setAcquireLockDelay(0);
+        assertThrows(IllegalArgumentException.class, () -> {
+            try (CamelContext camelContext = createCamelContext(config)) {
+                camelContext.start();
+            }
+        });
+    }
+
+    @Test
+    void negativeAcquireLockIntervalThrowsException() {
+        ClusterConfig config = new ClusterConfig();
+        config.setAcquireLockInterval(-1);
+        assertThrows(IllegalArgumentException.class, () -> {
+            try (CamelContext camelContext = createCamelContext(config)) {
+                camelContext.start();
+            }
+        });
+    }
+
+    @Test
+    void zeroAcquireLockIntervalThrowsException() {
+        ClusterConfig config = new ClusterConfig();
+        config.setAcquireLockInterval(0);
+        assertThrows(IllegalArgumentException.class, () -> {
+            try (CamelContext camelContext = createCamelContext(config)) {
+                camelContext.start();
+            }
+        });
+    }
+
     @Test
     void negativeHeartbeatTimeoutMultiplierThrowsException() {
         ClusterConfig config = new ClusterConfig();
diff --git 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
index b7f7bfbbde55..526ae16af716 100644
--- 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
+++ 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
@@ -24,6 +24,7 @@ import org.apache.camel.LoggingLevel;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.cluster.CamelClusterMember;
 import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.component.master.MasterComponent;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;
@@ -48,6 +49,7 @@ abstract class FileLockClusterServiceTestBase {
 
     protected CamelContext createCamelContext(ClusterConfig config) throws 
Exception {
         CamelContext context = new DefaultCamelContext();
+        context.getComponent("master", 
MasterComponent.class).setBackOffDelay(1000);
         context.addService(createFileLockClusterService(config));
         context.addRoutes(new RouteBuilder() {
             @Override
@@ -64,7 +66,7 @@ abstract class FileLockClusterServiceTestBase {
     protected FileLockClusterService 
createFileLockClusterService(ClusterConfig config) {
         FileLockClusterService service = new FileLockClusterService();
         service.setAcquireLockDelay(config.getAcquireLockDelay());
-        service.setAcquireLockInterval(1);
+        service.setAcquireLockInterval(config.getAcquireLockInterval());
         service.setRoot(clusterDir.toString());
         
service.setHeartbeatTimeoutMultiplier(config.getHeartbeatTimeoutMultiplier());
         
service.setClusterDataTaskMaxAttempts(config.getClusterDataTaskMaxAttempts());
@@ -83,6 +85,7 @@ abstract class FileLockClusterServiceTestBase {
 
     static final class ClusterConfig {
         private long acquireLockDelay = 1;
+        private long acquireLockInterval = 2;
         private long timerRepeatCount = 5;
         private int heartbeatTimeoutMultiplier = 5;
         private int clusterDataTaskMaxAttempts = 5;
@@ -96,6 +99,14 @@ abstract class FileLockClusterServiceTestBase {
             this.acquireLockDelay = acquireLockDelay;
         }
 
+        public long getAcquireLockInterval() {
+            return acquireLockInterval;
+        }
+
+        public void setAcquireLockInterval(long acquireLockInterval) {
+            this.acquireLockInterval = acquireLockInterval;
+        }
+
         long getTimerRepeatCount() {
             return timerRepeatCount;
         }
diff --git a/docs/user-manual/modules/ROOT/pages/clustering.adoc 
b/docs/user-manual/modules/ROOT/pages/clustering.adoc
index 322443e064c1..29d70b10ba76 100644
--- a/docs/user-manual/modules/ROOT/pages/clustering.adoc
+++ b/docs/user-manual/modules/ROOT/pages/clustering.adoc
@@ -61,9 +61,9 @@ Configuration options:
 [options="header", cols="15,55,15,15"]
 |===
 | Name | Description | Default | Type
-| acquireLockDelay | The time to wait before starting to try to acquire the 
cluster lock | 1 | long
+| acquireLockDelay | The time to wait before starting to try to acquire the 
cluster lock. Note that if FileLockClusterService determines that no cluster 
members are running, or cannot reliably determine the cluster state, the 
initial delay is instead computed from the acquireLockInterval | 1 | long
 | acquireLockDelayUnit | The time unit for acquireLockDelay | SECONDS | 
TimeUnit
-| acquireLockInterval | The time to wait between attempts to try to acquire 
the cluster lock | 10 | long
+| acquireLockInterval | The time to wait between attempts to try to acquire 
the cluster lock. Attempts are aligned to wall-clock time, so all cluster 
members must use the same value to keep leadership checks and leader liveness 
detection consistent | 10 | long
 | acquireLockIntervalUnit | The time unit for acquireLockInterval | SECONDS | 
TimeUnit
 | clusterDataTaskMaxAttempts | Sets how many times a cluster data task will 
run, counting both the first execution and subsequent retries in case of 
failure or timeout. This can be useful when the cluster data root is on network 
based file storage, where I/O operations may occasionally block for long or 
unpredictable periods | 5 | int
 | clusterDataTaskTimeout | Sets the timeout for a cluster data task (reading 
or writing cluster data). Timeouts are useful when the cluster data root is on 
network storage, where I/O operations may occasionally block for long or 
unpredictable periods | 10 | long

Reply via email to