This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.14.x by this push:
new 3a7bd3a3fdd [4.14.x] CAMEL-22430: Make FileClusterService more
resilient to split-brain scenarios (#19281)
3a7bd3a3fdd is described below
commit 3a7bd3a3fdd9bc5ac93ff37c4972990ede834b18
Author: James Netherton <[email protected]>
AuthorDate: Mon Sep 22 19:57:02 2025 +0100
[4.14.x] CAMEL-22430: Make FileClusterService more resilient to split-brain
scenarios (#19281)
* CAMEL-22430: Make FileClusterService more resilient to split-brain
scenarios
* Add docs for cluster service configuration
---
components/camel-file/pom.xml | 6 +
.../file/cluster/FileLockClusterLeaderInfo.java | 71 +++++
.../file/cluster/FileLockClusterService.java | 22 +-
.../file/cluster/FileLockClusterUtils.java | 172 +++++++++++
.../file/cluster/FileLockClusterView.java | 246 +++++++++++----
.../file/cluster/FileLockClusterUtilsTest.java | 196 ++++++++++++
...FileLockClusterServiceAdvancedFailoverTest.java | 333 +++++++++++++++++++++
.../FileLockClusterServiceBasicFailoverTest.java | 248 +++++++++++++++
.../cluster/FileLockClusterServiceTestBase.java | 115 +++++++
.../user-manual/modules/ROOT/pages/clustering.adoc | 85 +++++-
10 files changed, 1440 insertions(+), 54 deletions(-)
diff --git a/components/camel-file/pom.xml b/components/camel-file/pom.xml
index 4763c525153..4b7556f1c7d 100644
--- a/components/camel-file/pom.xml
+++ b/components/camel-file/pom.xml
@@ -48,5 +48,11 @@
<artifactId>commons-codec</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java
new file mode 100644
index 00000000000..6d83d335718
--- /dev/null
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.cluster;
+
+import java.util.Objects;
+
+/**
+ * Holds information about a file lock cluster leader.
+ */
+final class FileLockClusterLeaderInfo {
+ private final String id;
+ private final long heartbeatUpdateIntervalNanoseconds;
+ private final long heartbeatNanoseconds;
+
+ /**
+ * Constructs a {@link FileLockClusterLeaderInfo}.
+ *
+ * @param id The unique UUID assigned to
the cluster leader
+ * @param heartbeatUpdateIntervalNanoseconds The cluster leader heartbeat
update interval value in nanoseconds
+ * @param heartbeatNanoseconds The cluster leader heartbeat
value in nanoseconds
+ */
+ FileLockClusterLeaderInfo(String id, long
heartbeatUpdateIntervalNanoseconds, long heartbeatNanoseconds) {
+ Objects.requireNonNull(id);
+ this.id = id;
+ this.heartbeatUpdateIntervalNanoseconds =
heartbeatUpdateIntervalNanoseconds;
+ this.heartbeatNanoseconds = heartbeatNanoseconds;
+ }
+
+ String getId() {
+ return id;
+ }
+
+ long getHeartbeatNanoseconds() {
+ return heartbeatNanoseconds;
+ }
+
+ long getHeartbeatUpdateIntervalNanoseconds() {
+ return heartbeatUpdateIntervalNanoseconds;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FileLockClusterLeaderInfo that = (FileLockClusterLeaderInfo) o;
+ return Objects.equals(id, that.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(id);
+ }
+}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
index e5522efdf33..e21568925f0 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
@@ -31,12 +31,14 @@ public class FileLockClusterService extends
AbstractCamelClusterService<FileLock
private long acquireLockInterval;
private TimeUnit acquireLockIntervalUnit;
private ScheduledExecutorService executor;
+ private int heartbeatTimeoutMultiplier;
public FileLockClusterService() {
this.acquireLockDelay = 1;
this.acquireLockDelayUnit = TimeUnit.SECONDS;
this.acquireLockInterval = 10;
this.acquireLockIntervalUnit = TimeUnit.SECONDS;
+ this.heartbeatTimeoutMultiplier = 5;
}
@Override
@@ -76,7 +78,7 @@ public class FileLockClusterService extends
AbstractCamelClusterService<FileLock
}
/**
- * The time unit fo the acquireLockDelay, default to TimeUnit.SECONDS.
+ * The time unit for the acquireLockDelay, default to TimeUnit.SECONDS.
*/
public void setAcquireLockDelayUnit(TimeUnit acquireLockDelayUnit) {
this.acquireLockDelayUnit = acquireLockDelayUnit;
@@ -103,12 +105,28 @@ public class FileLockClusterService extends
AbstractCamelClusterService<FileLock
}
/**
- * The time unit fo the acquireLockInterva, default to TimeUnit.SECONDS.
+ * The time unit for the acquireLockInterval, default to TimeUnit.SECONDS.
*/
public void setAcquireLockIntervalUnit(TimeUnit acquireLockIntervalUnit) {
this.acquireLockIntervalUnit = acquireLockIntervalUnit;
}
+ /**
+ * Multiplier applied to the cluster leader {@code acquireLockInterval} to
determine how long followers should wait
+ * before considering the leader "stale".
+ * <p>
+ * For example, if the leader updates its heartbeat every 2 seconds and
the {@code heartbeatTimeoutMultiplier} is 3,
+ * followers will tolerate up to {@code 2s * 3 = 6s} of silence before
declaring the leader unavailable.
+ * <p>
+ * Defaults to 5. The value must be greater than 0.
+ */
+ public void setHeartbeatTimeoutMultiplier(int heartbeatTimeoutMultiplier) {
+ this.heartbeatTimeoutMultiplier = heartbeatTimeoutMultiplier;
+ }
+
+ public int getHeartbeatTimeoutMultiplier() {
+ return heartbeatTimeoutMultiplier;
+ }
+
@Override
protected void doStop() throws Exception {
super.doStop();
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java
new file mode 100644
index 00000000000..ebda14fe104
--- /dev/null
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.cluster;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Objects;
+
+/**
+ * Miscellaneous utility methods for managing the file lock cluster state.
+ */
+final class FileLockClusterUtils {
+ /**
+ * Length in bytes of the UTF-8 encoded string form of a java.util.UUID (36 characters).
+ */
+ static final int UUID_BYTE_LENGTH = 36;
+ /**
+ * The lock file buffer capacity when writing data for the cluster leader. Fields are laid out in this order:
+ * <ul>
+ * <li>Cluster leader ID (UUID String) - 36 bytes</li>
+ * <li>Cluster leader heartbeat update interval in nanoseconds (long) - 8 bytes</li>
+ * <li>Cluster leader heartbeat timestamp in nanoseconds (long) - 8 bytes</li>
+ * </ul>
+ */
+ static final int LOCKFILE_BUFFER_SIZE = UUID_BYTE_LENGTH + 2 * Long.BYTES;
+
+ private FileLockClusterUtils() {
+ // Utility class
+ }
+
+ /**
+ * Writes information about the state of the cluster leader to the lock
file.
+ *
+ * @param leaderDataPath The path to the lock file
+ * @param channel The file channel to write to
+ * @param clusterLeaderInfo The {@link FileLockClusterLeaderInfo}
instance where the cluster leader state is held
+ * @param forceMetaData Whether to truncate the file before writing and force changes to both the file
+ * content and metadata
+ * @throws IOException If the lock file is missing or writing data
failed
+ */
+ static void writeClusterLeaderInfo(
+ Path leaderDataPath,
+ FileChannel channel,
+ FileLockClusterLeaderInfo clusterLeaderInfo,
+ boolean forceMetaData)
+ throws IOException {
+
+ Objects.requireNonNull(channel, "channel cannot be null");
+ Objects.requireNonNull(clusterLeaderInfo, "clusterLeaderInfo cannot be
null");
+
+ if (!Files.exists(leaderDataPath)) {
+ throw new FileNotFoundException("Cluster leader data file " +
leaderDataPath + " not found");
+ }
+
+ String uuidStr = clusterLeaderInfo.getId();
+ byte[] uuidBytes = uuidStr.getBytes(StandardCharsets.UTF_8);
+
+ ByteBuffer buf = ByteBuffer.allocate(LOCKFILE_BUFFER_SIZE);
+ buf.put(uuidBytes);
+ buf.putLong(clusterLeaderInfo.getHeartbeatUpdateIntervalNanoseconds());
+ buf.putLong(clusterLeaderInfo.getHeartbeatNanoseconds());
+ buf.flip();
+
+ if (forceMetaData) {
+ channel.truncate(0);
+ }
+
+ channel.position(0);
+ while (buf.hasRemaining()) {
+ channel.write(buf);
+ }
+ channel.force(forceMetaData);
+ }
+
+ /**
+ * Reads information about the state of the cluster leader from the lock
file.
+ *
+ * @param leaderDataPath The path to the lock file
+ * @return {@link FileLockClusterLeaderInfo} instance representing the state of the cluster leader, or
+ * {@code null} if the lock file does not exist or its content is incomplete (shorter than
+ * {@code LOCKFILE_BUFFER_SIZE} bytes)
+ * @throws IOException If reading the lock file failed
+ */
+ static FileLockClusterLeaderInfo readClusterLeaderInfo(Path
leaderDataPath) throws IOException {
+ if (!Files.exists(leaderDataPath)) {
+ return null;
+ }
+
+ byte[] bytes = Files.readAllBytes(leaderDataPath);
+ if (bytes.length < LOCKFILE_BUFFER_SIZE) {
+ // Data is incomplete or in a transient / corrupt state
+ return null;
+ }
+
+ // Parse the cluster leader data
+ ByteBuffer buf = ByteBuffer.wrap(bytes);
+ byte[] uuidBytes = new byte[UUID_BYTE_LENGTH];
+ buf.get(uuidBytes);
+
+ String uuidStr = new String(uuidBytes, StandardCharsets.UTF_8);
+ long intervalNanos = buf.getLong();
+ long lastHeartbeat = buf.getLong();
+
+ return new FileLockClusterLeaderInfo(uuidStr, intervalNanos,
lastHeartbeat);
+ }
+
+ /**
+ * Determines whether the current cluster leader is stale, typically because the leader has not updated the
+ * cluster lock file within acceptable bounds.
+ *
+ * @param latestClusterLeaderInfo The {@link FileLockClusterLeaderInfo}
instance representing the latest cluster
+ * leader state
+ * @param previousClusterLeaderInfo The {@link FileLockClusterLeaderInfo}
instance representing the previously
+ * recorded cluster leader state
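+ * @param currentNanoTime The current time in nanoseconds, as returned by {@link System#nanoTime()}
+ * @param heartbeatTimeoutMultiplier Multiplier applied to the leader heartbeat update interval to obtain the
+ * timeout after which an unchanged heartbeat is considered stale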
+ * @return {@code true} if the leader is
considered stale. {@code false} if the leader is
+ * still active
+ */
+ static boolean isLeaderStale(
+ FileLockClusterLeaderInfo latestClusterLeaderInfo,
+ FileLockClusterLeaderInfo previousClusterLeaderInfo,
+ long currentNanoTime,
+ int heartbeatTimeoutMultiplier) {
+
+ if (latestClusterLeaderInfo == null) {
+ return true;
+ }
+
+ // Cluster leader changed since last observation so assume not stale
+ if (!latestClusterLeaderInfo.equals(previousClusterLeaderInfo)) {
+ return false;
+ }
+
+ final long latestHeartbeat =
latestClusterLeaderInfo.getHeartbeatNanoseconds();
+ final long previousObservedHeartbeat =
previousClusterLeaderInfo.getHeartbeatNanoseconds();
+
+ if (latestHeartbeat > previousObservedHeartbeat) {
+ // Not stale. Cluster leader is alive and updating the lock file
+ return false;
+ }
+
+ if (latestHeartbeat < previousObservedHeartbeat) {
+ // Heartbeat somehow went backwards, maybe due to stale data
+ return true;
+ }
+
+ // Check if cluster leader has updated the lock file within acceptable
limits
+ final long elapsed = currentNanoTime - previousObservedHeartbeat;
+ final long heartbeatUpdateIntervalNanoseconds =
latestClusterLeaderInfo.getHeartbeatUpdateIntervalNanoseconds();
+ final long timeout = heartbeatUpdateIntervalNanoseconds * (long)
heartbeatTimeoutMultiplier;
+ return elapsed > timeout;
+ }
+}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
index a2e9ad9dfd7..0fe775ae582 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.file.cluster;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
@@ -25,12 +26,13 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.camel.cluster.CamelClusterMember;
import org.apache.camel.support.cluster.AbstractCamelClusterView;
@@ -39,20 +41,24 @@ import org.slf4j.LoggerFactory;
public class FileLockClusterView extends AbstractCamelClusterView {
private static final Logger LOGGER =
LoggerFactory.getLogger(FileLockClusterView.class);
-
- private static final Lock LOCK = new ReentrantLock();
private final ClusterMember localMember;
- private final Path path;
- private RandomAccessFile lockFile;
+ private final Path leaderLockPath;
+ private final Path leaderDataPath;
+ private final AtomicReference<FileLockClusterLeaderInfo>
clusterLeaderInfoRef = new AtomicReference<>();
+ private RandomAccessFile leaderLockFile;
+ private RandomAccessFile leaderDataFile;
private FileLock lock;
private ScheduledFuture<?> task;
+ private int heartbeatTimeoutMultiplier;
+ private long acquireLockIntervalDelayNanoseconds;
FileLockClusterView(FileLockClusterService cluster, String namespace) {
super(cluster, namespace);
+ Objects.requireNonNull(cluster.getRoot(), "FileLockClusterService root
directory must be specified");
this.localMember = new ClusterMember();
- this.path = Paths.get(cluster.getRoot(), namespace);
-
+ this.leaderLockPath = Paths.get(cluster.getRoot(), namespace);
+ this.leaderDataPath = Paths.get(cluster.getRoot(), namespace + ".dat");
}
@Override
@@ -67,63 +73,105 @@ public class FileLockClusterView extends
AbstractCamelClusterView {
@Override
public List<CamelClusterMember> getMembers() {
- // It may be useful to lock only a region of the file an then have
views
+ // It may be useful to lock only a region of the file and then have
views
// appending their id to the file on different regions so we can
// have a list of members. Root/Header region that is used for locking
- // purpose may also contains the lock holder.
+ // purpose may also contain the lock holder.
return Collections.emptyList();
}
@Override
protected void doStart() throws Exception {
- if (lockFile != null) {
+ if (leaderLockFile != null) {
closeInternal();
-
fireLeadershipChangedEvent((CamelClusterMember) null);
}
- if (!Files.exists(path.getParent())) {
- Files.createDirectories(path.getParent());
+ if (!Files.exists(leaderLockPath.getParent())) {
+ Files.createDirectories(leaderLockPath.getParent());
+ }
+
+ if (!Files.exists(leaderLockPath)) {
+ Files.createFile(leaderLockPath);
+ }
+
+ if (!Files.exists(leaderDataPath)) {
+ Files.createFile(leaderDataPath);
}
FileLockClusterService service =
getClusterService().unwrap(FileLockClusterService.class);
- ScheduledExecutorService executor = service.getExecutor();
+ acquireLockIntervalDelayNanoseconds = TimeUnit.NANOSECONDS.convert(
+ service.getAcquireLockInterval(),
+ service.getAcquireLockIntervalUnit());
- task = executor.scheduleAtFixedRate(this::tryLock,
+ heartbeatTimeoutMultiplier = service.getHeartbeatTimeoutMultiplier();
+ if (heartbeatTimeoutMultiplier <= 0) {
+ throw new IllegalArgumentException("HeartbeatTimeoutMultiplier
must be greater than 0");
+ }
+
+ ScheduledExecutorService executor = service.getExecutor();
+ task = executor.scheduleWithFixedDelay(this::tryLock,
TimeUnit.MILLISECONDS.convert(service.getAcquireLockDelay(),
service.getAcquireLockDelayUnit()),
TimeUnit.MILLISECONDS.convert(service.getAcquireLockInterval(),
service.getAcquireLockIntervalUnit()),
TimeUnit.MILLISECONDS);
+
+ localMember.setStatus(ClusterMemberStatus.STARTED);
}
@Override
protected void doStop() throws Exception {
+ if (localMember.isLeader() && leaderDataFile != null) {
+ try {
+ FileChannel channel = leaderDataFile.getChannel();
+ channel.truncate(0);
+ channel.force(true);
+ } catch (Exception e) {
+ // Log and ignore since we need to release the file lock and
do cleanup
+ LOGGER.debug("Failed to truncate {} on {} stop",
leaderDataPath, getClass().getSimpleName(), e);
+ }
+ }
+
closeInternal();
+ localMember.setStatus(ClusterMemberStatus.STOPPED);
+ clusterLeaderInfoRef.set(null);
}
- // *********************************
- //
- // *********************************
-
- private void closeInternal() throws Exception {
+ private void closeInternal() {
if (task != null) {
task.cancel(true);
}
- if (lock != null) {
- lock.release();
+ releaseFileLock();
+ closeLockFiles();
+ }
+
+ private void closeLockFiles() {
+ if (leaderLockFile != null) {
+ try {
+ leaderLockFile.close();
+ } catch (Exception ignore) {
+ // Ignore
+ }
+ leaderLockFile = null;
}
- closeLockFile();
+ if (leaderDataFile != null) {
+ try {
+ leaderDataFile.close();
+ } catch (Exception ignore) {
+ // Ignore
+ }
+ leaderDataFile = null;
+ }
}
- private void closeLockFile() {
- if (lockFile != null) {
+ private void releaseFileLock() {
+ if (lock != null) {
try {
- lockFile.close();
+ lock.release();
} catch (Exception ignore) {
// Ignore
}
- lockFile = null;
}
}
@@ -132,55 +180,132 @@ public class FileLockClusterView extends
AbstractCamelClusterView {
Exception reason = null;
try {
- if (localMember.isLeader()) {
- LOGGER.trace("Holding the lock on file {} (lock={})",
path, lock);
+ if (isLeaderInternal()) {
+ LOGGER.debug("Holding the lock on file {} (lock={},
cluster-member-id={})", leaderLockPath, lock,
+ localMember.getUuid());
+ try {
+ // Update the cluster data file with the leader state
so that other cluster members can interrogate it
+ writeClusterLeaderInfo(false);
+ return;
+ } catch (IOException e) {
+ LOGGER.debug("Failed writing cluster leader data to
{}", leaderDataPath, e);
+ }
+ }
+
+ // Non-null lock at this point signifies leadership has been
lost or relinquished
+ if (lock != null) {
+ LOGGER.info("Lock on file {} lost (lock={},
cluster-member-id={})", leaderLockPath, lock,
+ localMember.getUuid());
+ localMember.setStatus(ClusterMemberStatus.FOLLOWER);
+ releaseFileLock();
+ closeLockFiles();
+ lock = null;
+ fireLeadershipChangedEvent((CamelClusterMember) null);
return;
}
- LOCK.lock();
- try {
- if (lock != null) {
- LOGGER.info("Lock on file {} lost (lock={})", path,
lock);
- fireLeadershipChangedEvent((CamelClusterMember) null);
+ // Must be follower to reach here
+ localMember.setStatus(ClusterMemberStatus.FOLLOWER);
+
+ // Get & update cluster leader state
+ LOGGER.debug("Reading cluster leader state from {}",
leaderDataPath);
+ FileLockClusterLeaderInfo latestClusterLeaderInfo =
FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath);
+ FileLockClusterLeaderInfo previousClusterLeaderInfo =
clusterLeaderInfoRef.getAndSet(latestClusterLeaderInfo);
+
+ // Check if we can attempt to take cluster leadership
+ if (isLeaderStale(latestClusterLeaderInfo,
previousClusterLeaderInfo)
+ || canReclaimLeadership(latestClusterLeaderInfo)) {
+ if (previousClusterLeaderInfo != null &&
canReclaimLeadership(previousClusterLeaderInfo)) {
+ // Backoff so the current cluster leader can notice
leadership is relinquished
+ return;
}
- LOGGER.debug("Try to acquire a lock on {}", path);
- lockFile = new RandomAccessFile(path.toFile(), "rw");
+ // Attempt to obtain cluster leadership
+ LOGGER.debug("Try to acquire a lock on {}
(cluster-member-id={})", leaderLockPath, localMember.getUuid());
+ leaderLockFile = new
RandomAccessFile(leaderLockPath.toFile(), "rw");
+ leaderDataFile = new
RandomAccessFile(leaderDataPath.toFile(), "rw");
lock = null;
- lock = lockFile.getChannel().tryLock(0, Math.max(1,
lockFile.getChannel().size()), false);
+ if (Files.isReadable(leaderLockPath)) {
+ lock = leaderLockFile.getChannel().tryLock(0,
Math.max(1, leaderLockFile.getChannel().size()), false);
+ }
if (lock != null) {
- LOGGER.info("Lock on file {} acquired (lock={})",
path, lock);
+ LOGGER.info("Lock on file {} acquired (lock={},
cluster-member-id={})", leaderLockPath, lock,
+ localMember.getUuid());
+ localMember.setStatus(ClusterMemberStatus.LEADER);
+ clusterLeaderInfoRef.set(null);
fireLeadershipChangedEvent(localMember);
+ writeClusterLeaderInfo(true);
} else {
- LOGGER.debug("Lock on file {} not acquired ", path);
+ LOGGER.debug("Lock on file {} not acquired",
leaderLockPath);
}
- } finally {
- LOCK.unlock();
+ } else {
+ LOGGER.debug("Existing cluster leader is valid. Retrying
leadership acquisition on next interval");
}
} catch (OverlappingFileLockException e) {
reason = new IOException(e);
} catch (Exception e) {
reason = e;
+ } finally {
+ if (lock == null) {
+ LOGGER.debug("Lock on file {} not acquired
(cluster-member-id={})", leaderLockPath, localMember.getUuid(),
+ reason);
+ closeLockFiles();
+ }
}
+ }
+ }
+
+ boolean isLeaderStale(FileLockClusterLeaderInfo clusterLeaderInfo,
FileLockClusterLeaderInfo previousClusterLeaderInfo) {
+ return FileLockClusterUtils.isLeaderStale(
+ clusterLeaderInfo,
+ previousClusterLeaderInfo,
+ System.nanoTime(),
+ heartbeatTimeoutMultiplier);
+ }
+
+ boolean canReclaimLeadership(FileLockClusterLeaderInfo leaderInfo) {
+ return leaderInfo != null &&
localMember.getUuid().equals(leaderInfo.getId());
+ }
+
+ void writeClusterLeaderInfo(boolean forceMetaData) throws IOException {
+ FileLockClusterLeaderInfo latestClusterLeaderInfo = new
FileLockClusterLeaderInfo(
+ localMember.getUuid(),
+ acquireLockIntervalDelayNanoseconds,
+ System.nanoTime());
- if (lock == null) {
- LOGGER.debug("Lock on file {} not acquired ", path, reason);
- closeLockFile();
+ FileLockClusterUtils.writeClusterLeaderInfo(
+ leaderDataPath,
+ leaderDataFile.getChannel(),
+ latestClusterLeaderInfo,
+ forceMetaData);
+ }
+
+ boolean isLeaderInternal() {
+ if (localMember.isLeader()) {
+ try {
+ FileLockClusterLeaderInfo leaderInfo =
FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath);
+ return lock != null
+ && lock.isValid()
+ && Files.exists(leaderLockPath)
+ && leaderInfo != null
+ && localMember.getUuid().equals(leaderInfo.getId());
+ } catch (Exception e) {
+ LOGGER.debug("Failed to read {} (cluster-member-id={})",
leaderLockPath, localMember.getUuid(), e);
+ return false;
}
}
+ return false;
}
private final class ClusterMember implements CamelClusterMember {
+ private final AtomicReference<ClusterMemberStatus> status = new
AtomicReference<>(ClusterMemberStatus.STOPPED);
+ private final String uuid = UUID.randomUUID().toString();
+
@Override
public boolean isLeader() {
- LOCK.lock();
- try {
- return lock != null && lock.isValid();
- } finally {
- LOCK.unlock();
- }
+ return getStatus().equals(ClusterMemberStatus.LEADER);
}
@Override
@@ -192,5 +317,24 @@ public class FileLockClusterView extends
AbstractCamelClusterView {
public String getId() {
return getClusterService().getId();
}
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public ClusterMemberStatus getStatus() {
+ return status.get();
+ }
+
+ private void setStatus(ClusterMemberStatus status) {
+ this.status.set(status);
+ }
+ }
+
+ private enum ClusterMemberStatus {
+ FOLLOWER,
+ LEADER,
+ STARTED,
+ STOPPED
}
}
diff --git
a/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java
b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java
new file mode 100644
index 00000000000..072ce576cd6
--- /dev/null
+++
b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.cluster;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static
org.apache.camel.component.file.cluster.FileLockClusterUtils.LOCKFILE_BUFFER_SIZE;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class FileLockClusterUtilsTest {
+ @Test
+ void nullLeaderInfoIsStale() {
+ assertTrue(FileLockClusterUtils.isLeaderStale(null, null,
System.nanoTime(), 5));
+ }
+
+ @Test
+ void newHeartbeatNotStale() {
+ String clusterMemberId = UUID.randomUUID().toString();
+ FileLockClusterLeaderInfo previousClusterLeaderInfo = new
FileLockClusterLeaderInfo(
+ clusterMemberId,
+ TimeUnit.SECONDS.toNanos(1),
+ System.nanoTime());
+
+ FileLockClusterLeaderInfo latestClusterLeaderInfo = new
FileLockClusterLeaderInfo(
+ clusterMemberId,
+ TimeUnit.SECONDS.toNanos(1),
+ System.nanoTime());
+
+ assertFalse(
+ FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo,
previousClusterLeaderInfo, System.nanoTime(), 5));
+ }
+
+ @Test
+ void sameHeartbeatIsStale() {
+ String clusterMemberId = UUID.randomUUID().toString();
+ long heartbeatNanoseconds = System.nanoTime() -
TimeUnit.SECONDS.toNanos(10);
+ FileLockClusterLeaderInfo previousClusterLeaderInfo = new
FileLockClusterLeaderInfo(
+ clusterMemberId,
+ TimeUnit.SECONDS.toNanos(1),
+ heartbeatNanoseconds);
+
+ FileLockClusterLeaderInfo latestClusterLeaderInfo = new
FileLockClusterLeaderInfo(
+ clusterMemberId,
+ TimeUnit.SECONDS.toNanos(1),
+ heartbeatNanoseconds);
+
+ assertTrue(
+ FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo,
previousClusterLeaderInfo, System.nanoTime(), 3));
+ }
+
+ @Test
+ void oldHeartbeatStale() {
+ String clusterMemberId = UUID.randomUUID().toString();
+ FileLockClusterLeaderInfo previousClusterLeaderInfo = new
FileLockClusterLeaderInfo(
+ clusterMemberId,
+ TimeUnit.SECONDS.toNanos(1),
+ System.nanoTime() - TimeUnit.SECONDS.toNanos(5));
+
+ FileLockClusterLeaderInfo latestClusterLeaderInfo = new
FileLockClusterLeaderInfo(
+ clusterMemberId,
+ TimeUnit.SECONDS.toNanos(1),
+ System.nanoTime() - TimeUnit.SECONDS.toNanos(10));
+
+ assertTrue(
+ FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo,
previousClusterLeaderInfo, System.nanoTime(), 3));
+ }
+
+ @Test
+ void heartbeatExactlyAtThreshold() {
+ int heartbeatMultiplier = 3;
+ long now = System.nanoTime();
+ long updateInterval = TimeUnit.SECONDS.toNanos(1);
+ long heartbeat = now - (updateInterval * heartbeatMultiplier);
+
+ String clusterMemberId = UUID.randomUUID().toString();
+ FileLockClusterLeaderInfo previousClusterLeaderInfo = new
FileLockClusterLeaderInfo(
+ clusterMemberId,
+ TimeUnit.SECONDS.toNanos(1),
+ heartbeat);
+
+ FileLockClusterLeaderInfo latestClusterLeaderInfo = new
FileLockClusterLeaderInfo(
+ clusterMemberId,
+ TimeUnit.SECONDS.toNanos(1),
+ heartbeat);
+
+
assertFalse(FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo,
previousClusterLeaderInfo, now,
+ heartbeatMultiplier));
+ }
+
+ @Test
+ void leaderChangedNotStale() {
+ FileLockClusterLeaderInfo previousClusterLeaderInfo = new
FileLockClusterLeaderInfo(
+ UUID.randomUUID().toString(),
+ TimeUnit.SECONDS.toNanos(1),
+ System.nanoTime());
+
+ FileLockClusterLeaderInfo latestClusterLeaderInfo = new
FileLockClusterLeaderInfo(
+ UUID.randomUUID().toString(),
+ TimeUnit.SECONDS.toNanos(1),
+ System.nanoTime());
+
+ assertFalse(
+ FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo,
previousClusterLeaderInfo, System.nanoTime(), 3));
+ }
+
+ @Test
+ void expectedFileLockBufferSize() {
+ // To catch cases where the lock file format is modified but the
buffer size was not updated
+ assertEquals(52, LOCKFILE_BUFFER_SIZE);
+ }
+
+ @Test
+ void writeClusterLeaderInfoLockNullChannel() {
+ assertThrows(NullPointerException.class, () -> {
+ FileLockClusterUtils.writeClusterLeaderInfo(Paths.get("."), null,
new FileLockClusterLeaderInfo("", 1L, 1L), true);
+ });
+ }
+
+ @Test
+ void writeClusterLeaderInfoWithNullData(@TempDir Path tempDir) {
+ assertThrows(NullPointerException.class, () -> {
+ try (RandomAccessFile raf = new
RandomAccessFile(tempDir.resolve("lock").toFile(), "rw")) {
+ FileLockClusterUtils.writeClusterLeaderInfo(Paths.get("."),
raf.getChannel(), null, true);
+ }
+ });
+ }
+
+ @Test
+ void writeClusterLeaderInfoClusterDataFileNotFound(@TempDir Path tempDir) {
+ assertThrows(FileNotFoundException.class, () -> {
+ try (RandomAccessFile raf = new
RandomAccessFile(tempDir.resolve("leader.dat").toFile(), "rw")) {
+ FileLockClusterLeaderInfo leaderInfo = new
FileLockClusterLeaderInfo(UUID.randomUUID().toString(), 1L, 1L);
+
FileLockClusterUtils.writeClusterLeaderInfo(Paths.get("/invalid/data/file"),
raf.getChannel(), leaderInfo,
+ true);
+ }
+ });
+ }
+
+ @Test
+ void writeClusterLeaderInfoData(@TempDir Path tempDir) throws IOException {
+ Path clusterData = tempDir.resolve("leader.dat");
+ try (RandomAccessFile raf = new RandomAccessFile(clusterData.toFile(),
"rw")) {
+ FileLockClusterLeaderInfo leaderInfo = new
FileLockClusterLeaderInfo(UUID.randomUUID().toString(), 1L, 2L);
+ FileLockClusterUtils.writeClusterLeaderInfo(clusterData,
raf.getChannel(), leaderInfo, true);
+ assertEquals(LOCKFILE_BUFFER_SIZE, Files.size(clusterData));
+ }
+ }
+
+ @Test
+ void readClusterLeaderInfoLockFileNotFound() throws Exception {
+
assertNull(FileLockClusterUtils.readClusterLeaderInfo(Paths.get("/invalid/data/file")));
+ }
+
+ @Test
+ void readClusterLeaderInfoLock(@TempDir Path tempDir) throws Exception {
+ writeClusterLeaderInfoData(tempDir);
+
+ Path lockFile = tempDir.resolve("leader.dat");
+ FileLockClusterLeaderInfo clusterLeaderInfo =
FileLockClusterUtils.readClusterLeaderInfo(lockFile);
+ assertNotNull(clusterLeaderInfo);
+
+ assertEquals(1L,
clusterLeaderInfo.getHeartbeatUpdateIntervalNanoseconds());
+ assertEquals(2L, clusterLeaderInfo.getHeartbeatNanoseconds());
+ assertDoesNotThrow(() -> UUID.fromString(clusterLeaderInfo.getId()));
+ }
+}
diff --git
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
new file mode 100644
index 00000000000..e7746679d04
--- /dev/null
+++
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.cluster;
+
+import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Platform file locking impl
prevents cluster data move / deletion")
+class FileLockClusterServiceAdvancedFailoverTest extends
FileLockClusterServiceTestBase {
+    /**
+     * Verifies that a single cluster member relinquishes leadership when its lock file is deleted and subsequently
+     * regains it, writing the same leader id back to the cluster data file.
+     */
+    @Test
+    void singleClusterMemberRecoversLeadershipIfLockFileDeleted() throws Exception {
+        // Same repeat count as the other failover tests, rather than the ClusterConfig default
+        ClusterConfig config = new ClusterConfig();
+        config.setTimerRepeatCount(-1);
+
+        // The config must be passed explicitly; createCamelContext() without arguments uses a default ClusterConfig
+        try (CamelContext clusterLeader = createCamelContext(config)) {
+            MockEndpoint mockEndpoint = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            mockEndpoint.expectedMinimumMessageCount(1);
+
+            clusterLeader.start();
+
+            mockEndpoint.assertIsSatisfied();
+
+            // Capture the id the leader published so it can be compared after recovery
+            AtomicReference<String> leaderId = new AtomicReference<>();
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(clusterLeaderInfo);
+
+                assertNotNull(clusterLeaderInfo.getId());
+                assertDoesNotThrow(() -> UUID.fromString(clusterLeaderInfo.getId()));
+                leaderId.set(clusterLeaderInfo.getId());
+            });
+
+            // Delete the lock file
+            Files.deleteIfExists(lockFile);
+
+            mockEndpoint.reset();
+            mockEndpoint.expectedMinimumMessageCount(1);
+
+            // Wait for leadership to be relinquished
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertFalse(getClusterMember(clusterLeader).isLeader());
+            });
+
+            // Leadership should be regained by the same member, using the same leader id
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo recoveredClusterLeaderInfo
+                        = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(recoveredClusterLeaderInfo);
+
+                String recoveredLeaderId = recoveredClusterLeaderInfo.getId();
+                assertNotNull(recoveredLeaderId);
+                assertDoesNotThrow(() -> UUID.fromString(recoveredLeaderId));
+                assertEquals(leaderId.get(), recoveredLeaderId);
+
+                mockEndpoint.assertIsSatisfied();
+            });
+        }
+
+        // After the context has been closed the cluster data file is expected to be empty
+        String leaderId = Files.readString(dataFile);
+        assertTrue(leaderId.isEmpty());
+    }
+
+    /**
+     * Starts a leader and a follower, deletes the lock file and verifies that the leader relinquishes leadership and
+     * that one of the two members (either one) is subsequently elected leader and produces messages.
+     */
+    @Test
+    void multipleClusterMembersReelectLeaderIfLockFileDeleted() throws Exception {
+        ClusterConfig leaderConfig = new ClusterConfig();
+        leaderConfig.setTimerRepeatCount(-1);
+
+        CamelContext clusterLeader = createCamelContext(leaderConfig);
+
+        // The follower starts its lock acquisition later than the leader so the leader wins the initial election
+        ClusterConfig followerConfig = new ClusterConfig();
+        followerConfig.setTimerRepeatCount(-1);
+        followerConfig.setAcquireLockDelay(2);
+
+        CamelContext clusterFollower = createCamelContext(followerConfig);
+
+        try {
+            MockEndpoint mockEndpointLeader = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            mockEndpointLeader.expectedMinimumMessageCount(1);
+
+            clusterLeader.start();
+            clusterFollower.start();
+
+            mockEndpointLeader.assertIsSatisfied();
+
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(clusterLeaderInfo);
+
+                String leaderId = clusterLeaderInfo.getId();
+                assertNotNull(leaderId);
+                assertDoesNotThrow(() -> UUID.fromString(leaderId));
+            });
+
+            // Wait enough time for the follower to have run its lock acquisition scheduled task
+            Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis());
+
+            // The follower should not have produced any messages
+            MockEndpoint mockEndpointFollower = clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
+            assertTrue(mockEndpointFollower.getExchanges().isEmpty());
+
+            mockEndpointLeader.reset();
+            mockEndpointLeader.expectedMinimumMessageCount(1);
+
+            // Give the follower a real expectation; without one its assertIsSatisfied() below checks nothing
+            mockEndpointFollower.expectedMinimumMessageCount(1);
+
+            // Delete the lock file
+            Files.deleteIfExists(lockFile);
+
+            // Wait for leadership to be relinquished
+            Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+                assertFalse(getClusterMember(clusterLeader).isLeader());
+            });
+
+            // Wait for leadership to be gained by one of the members
+            Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+                boolean newLeaderElected = false;
+
+                // Original cluster leader regained leadership
+                if (getClusterMember(clusterLeader).isLeader()) {
+                    newLeaderElected = true;
+                    mockEndpointLeader.assertIsSatisfied();
+                }
+
+                // A different cluster member gained leadership
+                if (getClusterMember(clusterFollower).isLeader()) {
+                    newLeaderElected = true;
+                    mockEndpointFollower.assertIsSatisfied();
+                }
+
+                assertTrue(newLeaderElected);
+            });
+        } finally {
+            // Always stop the follower, even if stopping the leader fails
+            try {
+                clusterLeader.stop();
+            } finally {
+                clusterFollower.stop();
+            }
+        }
+    }
+
+    /**
+     * Simulates the cluster file system being detached and quickly reattached (by moving the cluster directory away
+     * and back). The leader must relinquish leadership while the directory is gone and resume it once the directory
+     * is restored; the follower must never produce messages.
+     */
+    @Test
+    void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir Path clusterMovedLocation)
+            throws Exception {
+        ClusterConfig leaderConfig = new ClusterConfig();
+        leaderConfig.setTimerRepeatCount(-1);
+
+        CamelContext clusterLeader = createCamelContext(leaderConfig);
+
+        ClusterConfig followerConfig = new ClusterConfig();
+        followerConfig.setTimerRepeatCount(-1);
+        followerConfig.setAcquireLockDelay(2);
+
+        CamelContext clusterFollower = createCamelContext(followerConfig);
+
+        try {
+            MockEndpoint mockEndpointLeader = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            // The timer repeat count is not limited to 5 here, so assert a minimum rather than an exact count
+            mockEndpointLeader.expectedMinimumMessageCount(5);
+
+            clusterLeader.start();
+            clusterFollower.start();
+
+            mockEndpointLeader.assertIsSatisfied();
+
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(clusterLeaderInfo);
+
+                String leaderId = clusterLeaderInfo.getId();
+                assertNotNull(leaderId);
+                assertDoesNotThrow(() -> UUID.fromString(leaderId));
+            });
+
+            // Wait enough time for the follower to have run its lock acquisition scheduled task
+            Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis());
+
+            // The follower should not have produced any messages
+            MockEndpoint mockEndpointFollower = clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
+            assertTrue(mockEndpointFollower.getExchanges().isEmpty());
+
+            mockEndpointLeader.reset();
+            mockEndpointLeader.expectedMinimumMessageCount(1);
+
+            // Simulate the file system becoming detached by moving the cluster data directory
+            Files.move(clusterDir, clusterMovedLocation, StandardCopyOption.REPLACE_EXISTING);
+
+            // Wait for leadership to be relinquished
+            Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+                assertFalse(getClusterMember(clusterLeader).isLeader());
+            });
+
+            // Simulate reattaching the file system by moving the cluster directory back to the original location
+            Files.move(clusterMovedLocation, clusterDir, StandardCopyOption.REPLACE_EXISTING);
+
+            // Since the lock file is not considered 'stale', the original leader should resume leadership
+            Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+                mockEndpointLeader.assertIsSatisfied();
+            });
+
+            assertTrue(mockEndpointFollower.getExchanges().isEmpty());
+        } finally {
+            // Always stop the follower, even if stopping the leader fails
+            try {
+                clusterLeader.stop();
+            } finally {
+                clusterFollower.stop();
+            }
+        }
+    }
+
+    /**
+     * Simulates the cluster file system being detached while the leader goes offline, then reattached with a cluster
+     * data file whose heartbeat is stale, and verifies that the follower is elected as the new leader.
+     */
+    @Test
+    void staleLockFileForRestoredFileSystemElectsNewLeader(@TempDir Path clusterMovedLocation) throws Exception {
+        ClusterConfig leaderConfig = new ClusterConfig();
+        leaderConfig.setTimerRepeatCount(-1);
+
+        CamelContext clusterLeader = createCamelContext(leaderConfig);
+
+        ClusterConfig followerConfig = new ClusterConfig();
+        followerConfig.setTimerRepeatCount(-1);
+        followerConfig.setAcquireLockDelay(2);
+
+        CamelContext clusterFollower = createCamelContext(followerConfig);
+
+        try {
+            MockEndpoint mockEndpointLeader = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            // The timer repeat count is not limited to 5 here, so assert a minimum rather than an exact count
+            mockEndpointLeader.expectedMinimumMessageCount(5);
+
+            clusterLeader.start();
+            clusterFollower.start();
+
+            mockEndpointLeader.assertIsSatisfied();
+
+            // Capture the leader info so a stale copy of it can be written later
+            AtomicReference<FileLockClusterLeaderInfo> leaderInfo = new AtomicReference<>();
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(clusterLeaderInfo);
+                leaderInfo.set(clusterLeaderInfo);
+
+                String leaderId = clusterLeaderInfo.getId();
+                assertNotNull(leaderId);
+                assertDoesNotThrow(() -> UUID.fromString(leaderId));
+            });
+
+            // Wait enough time for the follower to have run its lock acquisition scheduled task
+            Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis());
+
+            // The follower should not have produced any messages
+            MockEndpoint mockEndpointFollower = clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
+            assertTrue(mockEndpointFollower.getExchanges().isEmpty());
+
+            mockEndpointLeader.reset();
+            mockEndpointLeader.expectedMinimumMessageCount(1);
+
+            // Simulate the file system becoming detached by moving the lock directory
+            Files.move(clusterDir, clusterMovedLocation, StandardCopyOption.REPLACE_EXISTING);
+
+            // Wait for leadership to be relinquished
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertFalse(getClusterMember(clusterLeader).isLeader());
+            });
+
+            // Stop the cluster leader to simulate it going 'offline' while the lock file system is detached
+            clusterLeader.stop();
+
+            // Make the cluster data file appear stale (i.e. not updated within acceptable bounds)
+            long staleHeartbeatTimestamp = leaderInfo.get().getHeartbeatNanoseconds() - TimeUnit.SECONDS.toNanos(100);
+
+            // NOTE(review): the heartbeat update interval written here is 2 nanoseconds; confirm whether
+            // TimeUnit.SECONDS.toNanos(2) was intended
+            FileLockClusterLeaderInfo updatedInfo = new FileLockClusterLeaderInfo(
+                    leaderInfo.get().getId(), TimeUnit.NANOSECONDS.toNanos(2), staleHeartbeatTimestamp);
+
+            // Target the moved copy of the real cluster data file, i.e. the same file name that dataFile uses
+            Path data = clusterMovedLocation.resolve(dataFile.getFileName());
+            try (RandomAccessFile file = new RandomAccessFile(data.toFile(), "rw")) {
+                FileLockClusterUtils.writeClusterLeaderInfo(data, file.getChannel(), updatedInfo, true);
+            }
+
+            // Simulate reattaching the file system by moving the cluster directory back to the original location
+            Files.move(clusterMovedLocation, clusterDir, StandardCopyOption.REPLACE_EXISTING);
+
+            mockEndpointFollower.expectedMinimumMessageCount(1);
+
+            // Since the lock file is considered 'stale', the follower should be elected the leader
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(getClusterMember(clusterFollower).isLeader());
+                mockEndpointFollower.assertIsSatisfied();
+            });
+        } finally {
+            // Also stop the leader in case the test failed before the explicit stop above
+            try {
+                clusterFollower.stop();
+            } finally {
+                clusterLeader.stop();
+            }
+        }
+    }
+}
diff --git
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
new file mode 100644
index 00000000000..72b49bc1b48
--- /dev/null
+++
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.cluster;
+
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class FileLockClusterServiceBasicFailoverTest extends
FileLockClusterServiceTestBase {
+    /**
+     * Starts a single cluster member, waits for its 5 timer messages and verifies that the lock and data files exist
+     * and that the data file holds a UUID leader id. After the context is closed the data file must be empty.
+     */
+    @Test
+    void singleClusterMemberLeaderElection() throws Exception {
+        Duration timeout = Duration.ofSeconds(30);
+
+        try (CamelContext member = createCamelContext()) {
+            MockEndpoint result = member.getEndpoint("mock:result", MockEndpoint.class);
+            result.expectedMessageCount(5);
+
+            member.start();
+            result.assertIsSatisfied();
+
+            Awaitility.await().atMost(timeout).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+
+                FileLockClusterLeaderInfo info = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(info);
+
+                String id = info.getId();
+                assertNotNull(id);
+                assertDoesNotThrow(() -> UUID.fromString(id));
+            });
+        }
+
+        long dataFileSize = Files.size(dataFile);
+        assertEquals(0, dataFileSize);
+    }
+
+    /**
+     * Starts two cluster members and verifies that the first becomes leader and publishes a UUID leader id, while the
+     * second (started with a longer lock acquisition delay) produces no messages. The data file must be empty once
+     * both contexts are stopped.
+     */
+    @Test
+    void multiClusterMemberLeaderElection() throws Exception {
+        ClusterConfig followerConfig = new ClusterConfig();
+        followerConfig.setAcquireLockDelay(2);
+
+        CamelContext clusterLeader = createCamelContext();
+        CamelContext clusterFollower = createCamelContext(followerConfig);
+
+        try {
+            MockEndpoint leaderResult = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            leaderResult.expectedMessageCount(5);
+
+            clusterLeader.start();
+            clusterFollower.start();
+
+            leaderResult.assertIsSatisfied();
+
+            Duration timeout = Duration.ofSeconds(30);
+            Awaitility.await().atMost(timeout).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo info = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(info);
+
+                String id = info.getId();
+                assertNotNull(id);
+                assertDoesNotThrow(() -> UUID.fromString(id));
+            });
+
+            // Give the follower enough time to have run its scheduled lock acquisition task
+            long followerStartupMillis = followerConfig.getStartupDelayWithOffsetMillis();
+            Thread.sleep(followerStartupMillis);
+
+            // No exchanges may have reached the follower's mock endpoint
+            MockEndpoint followerResult = clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
+            assertTrue(followerResult.getExchanges().isEmpty());
+        } finally {
+            clusterFollower.stop();
+            clusterLeader.stop();
+        }
+
+        long dataFileSize = Files.size(dataFile);
+        assertEquals(0, dataFileSize);
+    }
+
+    /**
+     * Verifies failover: once the leader's CamelContext is stopped, the follower must be elected leader, publish a
+     * different leader id and produce its 5 timer messages. The data file must be empty once both contexts are
+     * stopped.
+     */
+    @Test
+    void clusterFailoverWhenLeaderCamelContextStopped() throws Exception {
+        CamelContext clusterLeader = createCamelContext();
+
+        ClusterConfig followerConfig = new ClusterConfig();
+        followerConfig.setAcquireLockDelay(2);
+        CamelContext clusterFollower = createCamelContext(followerConfig);
+
+        try {
+            MockEndpoint mockEndpointClustered = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            mockEndpointClustered.expectedMessageCount(5);
+
+            clusterLeader.start();
+            clusterFollower.start();
+
+            mockEndpointClustered.assertIsSatisfied();
+
+            // Capture the original leader id so it can be compared with the id published after failover
+            AtomicReference<String> leaderId = new AtomicReference<>();
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(clusterLeaderInfo);
+
+                assertNotNull(clusterLeaderInfo.getId());
+                assertDoesNotThrow(() -> UUID.fromString(clusterLeaderInfo.getId()));
+                leaderId.set(clusterLeaderInfo.getId());
+            });
+
+            // Wait enough time for the follower to have run its lock acquisition scheduled task
+            Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis());
+
+            // The follower should not have produced any messages
+            MockEndpoint mockEndpointFollower = clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
+            assertTrue(mockEndpointFollower.getExchanges().isEmpty());
+
+            // Stop the cluster leader
+            clusterLeader.stop();
+
+            // Verify the follower was elected as the new cluster leader
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(getClusterMember(clusterFollower).isLeader());
+
+                FileLockClusterLeaderInfo updatedClusterLeaderInfo
+                        = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(updatedClusterLeaderInfo);
+
+                String newLeaderId = updatedClusterLeaderInfo.getId();
+                assertNotNull(newLeaderId);
+                assertDoesNotThrow(() -> UUID.fromString(newLeaderId));
+                assertNotEquals(leaderId.get(), newLeaderId);
+                assertEquals(5, mockEndpointFollower.getExchanges().size());
+            });
+        } finally {
+            // Also stop the leader in case the test failed before the explicit stop above
+            try {
+                clusterFollower.stop();
+            } finally {
+                clusterLeader.stop();
+            }
+        }
+
+        assertEquals(0, Files.size(dataFile));
+    }
+
+ @Test
+ void singleClusterMemberRecoversLeadershipIfUUIDRemovedFromLockFile()
throws Exception {
+ try (CamelContext clusterLeader = createCamelContext()) {
+ MockEndpoint mockEndpoint =
clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+ mockEndpoint.expectedMinimumMessageCount(1);
+
+ clusterLeader.start();
+
+ mockEndpoint.assertIsSatisfied();
+
+ AtomicReference<String> leaderId = new AtomicReference<>();
+ Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(()
-> {
+ assertTrue(Files.exists(lockFile));
+ assertTrue(Files.exists(dataFile));
+ assertTrue(getClusterMember(clusterLeader).isLeader());
+
+ FileLockClusterLeaderInfo clusterLeaderInfo =
FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+ assertNotNull(clusterLeaderInfo);
+
+ assertNotNull(clusterLeaderInfo.getId());
+ assertDoesNotThrow(() ->
UUID.fromString(clusterLeaderInfo.getId()));
+ leaderId.set(clusterLeaderInfo.getId());
+ });
+
+ // Truncate the cluster data file (not the lock file) so the leader id is removed
+ Files.write(dataFile, new byte[0]);
+ Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(()
-> {
+ // Leadership should be lost once the leader id is no longer in the data file
+ assertFalse(getClusterMember(clusterLeader).isLeader());
+ });
+
+ mockEndpoint.reset();
+ mockEndpoint.expectedMinimumMessageCount(1);
+
+ // Await recovery: the same member should regain leadership and publish its original leader id again
+ Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(()
-> {
+ assertTrue(getClusterMember(clusterLeader).isLeader());
+
+ FileLockClusterLeaderInfo recoveredClusterLeaderInfo =
FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+ assertNotNull(recoveredClusterLeaderInfo);
+
+ String recoveredLeaderId = recoveredClusterLeaderInfo.getId();
+ assertNotNull(recoveredLeaderId);
+ assertDoesNotThrow(() -> UUID.fromString(recoveredLeaderId));
+ assertEquals(leaderId.get(), recoveredLeaderId);
+
+ mockEndpoint.assertIsSatisfied();
+ });
+ }
+
+ assertEquals(0, Files.size(dataFile));
+ }
+
+    /**
+     * Starting a context whose cluster service has a heartbeat timeout multiplier of -1 must fail with an exception
+     * caused by an {@link IllegalArgumentException}.
+     */
+    @Test
+    void negativeHeartbeatTimeoutMultiplierThrowsException() throws Exception {
+        ClusterConfig invalidConfig = new ClusterConfig();
+        invalidConfig.setHeartbeatTimeoutMultiplier(-1);
+
+        Exception thrown = assertThrows(Exception.class, () -> {
+            try (CamelContext context = createCamelContext(invalidConfig)) {
+                context.start();
+            }
+        });
+
+        Throwable cause = thrown.getCause();
+        assertIsInstanceOf(IllegalArgumentException.class, cause);
+    }
+
+    /**
+     * Starting a context whose cluster service has a heartbeat timeout multiplier of 0 must fail with an exception
+     * caused by an {@link IllegalArgumentException}.
+     */
+    @Test
+    void zeroHeartbeatTimeoutMultiplierThrowsException() throws Exception {
+        ClusterConfig invalidConfig = new ClusterConfig();
+        invalidConfig.setHeartbeatTimeoutMultiplier(0);
+
+        Exception thrown = assertThrows(Exception.class, () -> {
+            try (CamelContext context = createCamelContext(invalidConfig)) {
+                context.start();
+            }
+        });
+
+        Throwable cause = thrown.getCause();
+        assertIsInstanceOf(IllegalArgumentException.class, cause);
+    }
+}
diff --git
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
new file mode 100644
index 00000000000..72290aaacbb
--- /dev/null
+++
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.cluster;
+
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Shared fixture for the file lock cluster service tests. Provides a per-test temporary cluster directory, the
+ * derived lock and data file paths, and factories for Camel contexts running a clustered timer route.
+ */
+abstract class FileLockClusterServiceTestBase {
+    protected static final String NAMESPACE = "test-ns";
+
+    @TempDir
+    protected Path clusterDir;
+    protected Path lockFile;
+    protected Path dataFile;
+
+    /**
+     * Resolves the lock file and data file paths inside the temporary cluster directory.
+     */
+    @BeforeEach
+    public void beforeEach() {
+        dataFile = clusterDir.resolve(NAMESPACE + ".dat");
+        lockFile = clusterDir.resolve(NAMESPACE);
+    }
+
+    /**
+     * Creates a (not yet started) context using a default {@link ClusterConfig}.
+     */
+    protected CamelContext createCamelContext() throws Exception {
+        ClusterConfig defaults = new ClusterConfig();
+        return createCamelContext(defaults);
+    }
+
+    /**
+     * Creates a (not yet started) context with a file lock cluster service and a {@code master:} timer route that
+     * sends to {@code mock:result}.
+     *
+     * @param config settings for the cluster service and the timer repeat count
+     */
+    protected CamelContext createCamelContext(ClusterConfig config) throws Exception {
+        CamelContext camelContext = new DefaultCamelContext();
+        camelContext.addService(createFileLockClusterService(config));
+
+        RouteBuilder clusteredRoute = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                fromF("master:%s:timer:clustered?delay=1&period=100&repeatCount=%d", NAMESPACE,
+                        config.getTimerRepeatCount())
+                        .routeId("clustered")
+                        .log(LoggingLevel.DEBUG, "Timer fired for ${camelId}")
+                        .to("mock:result");
+            }
+        };
+        camelContext.addRoutes(clusteredRoute);
+
+        return camelContext;
+    }
+
+    /**
+     * Builds a cluster service rooted at the temporary cluster directory, with a lock acquisition interval of 1 and
+     * the delay and heartbeat timeout multiplier taken from the given config.
+     */
+    protected FileLockClusterService createFileLockClusterService(ClusterConfig config) {
+        FileLockClusterService clusterService = new FileLockClusterService();
+        clusterService.setAcquireLockDelay(config.getAcquireLockDelay());
+        clusterService.setAcquireLockInterval(1);
+        clusterService.setRoot(clusterDir.toString());
+        clusterService.setHeartbeatTimeoutMultiplier(config.getHeartbeatTimeoutMultiplier());
+        return clusterService;
+    }
+
+    /**
+     * Returns the local cluster member of the given context's view for {@link #NAMESPACE}.
+     */
+    protected CamelClusterMember getClusterMember(CamelContext camelContext) throws Exception {
+        CamelClusterView view = getClusterView(camelContext);
+        return view.getLocalMember();
+    }
+
+    /**
+     * Looks up the {@link FileLockClusterService} registered on the context and returns its view for
+     * {@link #NAMESPACE}.
+     */
+    protected CamelClusterView getClusterView(CamelContext camelContext) throws Exception {
+        return camelContext.hasService(FileLockClusterService.class).getView(NAMESPACE);
+    }
+
+    /**
+     * Mutable settings used when creating a test context and its cluster service.
+     */
+    static final class ClusterConfig {
+        private long acquireLockDelay = 1;
+        private long timerRepeatCount = 5;
+        private int heartbeatTimeoutMultiplier = 5;
+
+        long getAcquireLockDelay() {
+            return this.acquireLockDelay;
+        }
+
+        void setAcquireLockDelay(long acquireLockDelay) {
+            this.acquireLockDelay = acquireLockDelay;
+        }
+
+        long getTimerRepeatCount() {
+            return this.timerRepeatCount;
+        }
+
+        void setTimerRepeatCount(long timerRepeatCount) {
+            this.timerRepeatCount = timerRepeatCount;
+        }
+
+        /**
+         * The lock acquisition delay, treated as seconds and converted to milliseconds, plus a 500 ms offset.
+         */
+        long getStartupDelayWithOffsetMillis() {
+            long delayMillis = TimeUnit.SECONDS.toMillis(getAcquireLockDelay());
+            return delayMillis + 500;
+        }
+
+        public int getHeartbeatTimeoutMultiplier() {
+            return this.heartbeatTimeoutMultiplier;
+        }
+
+        public void setHeartbeatTimeoutMultiplier(int heartbeatTimeoutMultiplier) {
+            this.heartbeatTimeoutMultiplier = heartbeatTimeoutMultiplier;
+        }
+    }
+}
diff --git a/docs/user-manual/modules/ROOT/pages/clustering.adoc
b/docs/user-manual/modules/ROOT/pages/clustering.adoc
index a1065347d0a..8410159acf6 100644
--- a/docs/user-manual/modules/ROOT/pages/clustering.adoc
+++ b/docs/user-manual/modules/ROOT/pages/clustering.adoc
@@ -38,12 +38,95 @@ Out of the box camel provides the following implementations:
|consul |camel-consul |
org.apache.camel.component.consul.cluster.ConsulClusterService
|file |camel-file |
org.apache.camel.component.file.cluster.FileLockClusterService
|infinispan |camel-infinispan |
org.apache.camel.component.infinispan.cluster.InfinispanClusterService
-|jgroups |camel-jgroups |
org.apache.camel.component.jgroups.cluster.JGroupsLockClusterService
|jgroups-raft |camel-jgroups-raft |
org.apache.camel.component.jgroups.raft.cluster.JGroupsRaftClusterService
|kubernetes |camel-kubernetes |
org.apache.camel.component.kubernetes.cluster.KubernetesClusterService
|zookeeper |camel-zookeeper |
org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService
|====
+Configuration options:
+
+*ConsulClusterService*
+
+[options="header", cols="15,55,15,15"]
+|===
+| Name | Description | Default | Type
+| sessionTtl | The Consul session TTL in seconds | 60 | int
+| sessionLockDelay | The Consul session lock delay in seconds | 5 | int
+| sessionRefreshInterval | The Consul session refresh interval in seconds | 5
| int
+| rootPath | The Consul cluster root directory path | /camel | String
+|===
+
+*FileLockClusterService*
+
+[options="header", cols="15,55,15,15"]
+|===
+| Name | Description | Default | Type
+| acquireLockDelay | The time to wait before starting to try to acquire the
cluster lock | 1 | long
+| acquireLockDelayUnit | The time unit for acquireLockDelay | SECONDS |
TimeUnit
+| acquireLockInterval | The time to wait between attempts to try to acquire
the cluster lock | 10 | long
+| acquireLockIntervalUnit | The time unit for acquireLockInterval | SECONDS |
TimeUnit
+| heartbeatTimeoutMultiplier | Multiplier applied to the cluster leader
acquireLockInterval to determine how long followers should wait before
considering the leader "stale". For example, if the leader updates its
heartbeat every 2 seconds and the heartbeatTimeoutMultiplier is 3, followers
will tolerate up to `2s * 3 = 6s` of silence before declaring the leader
unavailable | 5 | int
+| root | The file cluster root directory path | | String
+|===
+
+*InfinispanClusterService*
+
+[options="header", cols="15,55,15,15"]
+|===
+| Name | Description | Default | Type
+| lifespan | The lifespan of the cache entry for the local cluster member
registered to the inventory | 30 | long
+| lifespanTimeUnit | The TimeUnit of the lifespan | SECONDS | TimeUnit
+|===
+
+*JGroupsRaftClusterService*
+
+[options="header", cols="15,55,15,15"]
+|===
+| Name | Description | Default | Type
+| jgroupsConfig | The path to the JGroups Raft configuration | raft.xml |
String
+| jgroupsClusterName | The name of the cluster | jgroupsraft-master | String
+| raftHandle | The RaftHandle | | org.jgroups.raft.RaftHandle
+| raftId | Unique Raft id | | String
+|===
+
+*KubernetesClusterService*
+
+[options="header", cols="15,55,15,15"]
+|===
+| Name | Description | Default | Type
+| leaseResourceType | Kubernetes resource type used to hold the leases |
LeaseResourceType.Lease | LeaseResourceType
+| kubernetesResourcesNamespace | Kubernetes namespace containing the pods and
the ConfigMap used for locking | | String
+| kubernetesResourceName | Name of the resource used for locking (or prefix,
in case multiple ones are used) | leaders | String
+| groupName | Name of the lock group (or namespace according to the Camel
cluster convention) within the chosen ConfigMap | | String
+| podName | Name of the current pod (defaults to host name) | | String
+| clusterLabels | Labels used to identify the members of the cluster | empty
map | Map
+| jitterFactor | A jitter factor to apply in order to prevent all pods from
calling the Kubernetes APIs at the same instant | 1.2 | double
+| leaseDurationMillis | The default duration of the lease for the
[.line-through]#current# leader | 15000 | long
+| renewDeadlineMillis | The deadline after which the leader must stop its
services because it may have lost the leadership | 10000 | long
+| retryPeriodMillis | The time between two subsequent attempts to check and
acquire the leadership. It is randomized using the jitter factor | 2000 | long
+|===
+
+*ZooKeeperClusterService*
+
+[options="header", cols="15,55,15,15"]
+|===
+| Name | Description | Default | Type
+| nodes | The ZooKeeper server hosts (multiple servers can be separated by
commas) | | List
+| namespace | ZooKeeper namespace. If a namespace is set here, all paths will
get pre-pended with the namespace | | String
+| reconnectBaseSleepTime | Initial amount of time to wait between retries | |
long
+| reconnectBaseSleepTimeUnit | The TimeUnit for reconnectBaseSleepTime |
MILLISECONDS | TimeUnit
+| reconnectMaxRetries | Max number of times to retry | 3 | int
+| sessionTimeout | The session timeout in milliseconds | 60000 | long
+| sessionTimeoutUnit | The session timeout TimeUnit | MILLISECONDS | TimeUnit
+| connectionTimeout | The connection timeout in milliseconds | 15000 | long
+| connectionTimeoutUnit | The connection timeout TimeUnit |
MILLISECONDS | TimeUnit
+| authInfoList | List of AuthInfo objects with scheme and auth | | List
+| maxCloseWait | Time to wait during close to join background threads | 1000 |
long
+| maxCloseWaitUnit | MaxCloseWait TimeUnit | MILLISECONDS | TimeUnit
+| retryPolicy | The retry policy to use. | | RetryPolicy
+| basePath | The base path to store in ZooKeeper | | String
+|===
+
Configuration examples:
- *Spring Boot*