This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch camel-4.14.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit f5793761549eb57cfb5bc9bad991140cc75c771b Author: James Netherton <[email protected]> AuthorDate: Mon Dec 22 13:31:27 2025 +0000 CAMEL-22784 - Improve FileLockClusterService resilience to long blocking network based file I/O --- components/camel-file/pom.xml | 5 + .../file/cluster/FileLockClusterLeaderInfo.java | 9 ++ .../file/cluster/FileLockClusterService.java | 92 +++++++++++++ .../file/cluster/FileLockClusterTaskExecutor.java | 75 +++++++++++ .../file/cluster/FileLockClusterUtils.java | 6 +- .../file/cluster/FileLockClusterView.java | 147 +++++++++++++++------ .../cluster/FileLockClusterTaskExecutorTest.java | 113 ++++++++++++++++ .../file/cluster/FileLockClusterUtilsTest.java | 3 +- ...FileLockClusterServiceAdvancedFailoverTest.java | 88 ------------ .../FileLockClusterServiceBasicFailoverTest.java | 55 ++++++-- .../cluster/FileLockClusterServiceTestBase.java | 24 +++- .../user-manual/modules/ROOT/pages/clustering.adoc | 3 + 12 files changed, 480 insertions(+), 140 deletions(-) diff --git a/components/camel-file/pom.xml b/components/camel-file/pom.xml index e50a83ac5bff..e3fedecde39d 100644 --- a/components/camel-file/pom.xml +++ b/components/camel-file/pom.xml @@ -53,6 +53,11 @@ <artifactId>junit-jupiter</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core-engine</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java index 77f0ba55fe01..1a2c70387143 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java @@ -68,4 +68,13 @@ final class FileLockClusterLeaderInfo { 
public int hashCode() { return Objects.hashCode(id); } + + @Override + public String toString() { + return "FileLockClusterLeaderInfo{" + + "id='" + id + '\'' + + ", heartbeatUpdateIntervalMilliseconds=" + heartbeatUpdateIntervalMilliseconds + + ", heartbeatMilliseconds=" + heartbeatMilliseconds + + '}'; + } } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java index e21568925f0a..3851aea2cdd6 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.file.cluster; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -32,6 +33,10 @@ public class FileLockClusterService extends AbstractCamelClusterService<FileLock private TimeUnit acquireLockIntervalUnit; private ScheduledExecutorService executor; private int heartbeatTimeoutMultiplier; + private int clusterDataTaskMaxAttempts; + private long clusterDataTaskTimeout; + private TimeUnit clusterDataTaskTimeoutUnit; + private ExecutorService clusterDataTaskExecutor; public FileLockClusterService() { this.acquireLockDelay = 1; @@ -39,6 +44,9 @@ public class FileLockClusterService extends AbstractCamelClusterService<FileLock this.acquireLockInterval = 10; this.acquireLockIntervalUnit = TimeUnit.SECONDS; this.heartbeatTimeoutMultiplier = 5; + this.clusterDataTaskMaxAttempts = 5; + this.clusterDataTaskTimeout = 10; + this.clusterDataTaskTimeoutUnit = TimeUnit.SECONDS; } @Override @@ -120,6 +128,9 @@ public class FileLockClusterService extends AbstractCamelClusterService<FileLock * <p> */ public void 
setHeartbeatTimeoutMultiplier(int heartbeatTimeoutMultiplier) { + if (heartbeatTimeoutMultiplier <= 0) { + throw new IllegalArgumentException("HeartbeatTimeoutMultiplier must be greater than 0"); + } this.heartbeatTimeoutMultiplier = heartbeatTimeoutMultiplier; } @@ -127,6 +138,64 @@ public class FileLockClusterService extends AbstractCamelClusterService<FileLock return heartbeatTimeoutMultiplier; } + /** + * Sets how many times a cluster data task will run, counting both the first execution and subsequent retries in + * case of failure or timeout. The default is 5 attempts. + * <p> + * This can be useful when the cluster data root is on network based file storage, where I/O operations may + * occasionally block for long or unpredictable periods. + */ + public void setClusterDataTaskMaxAttempts(int clusterDataTaskMaxAttempts) { + if (clusterDataTaskMaxAttempts <= 0) { + throw new IllegalArgumentException("clusterDataTaskMaxRetries must be greater than 0"); + } + this.clusterDataTaskMaxAttempts = clusterDataTaskMaxAttempts; + } + + public int getClusterDataTaskMaxAttempts() { + return clusterDataTaskMaxAttempts; + } + + /** + * Sets the timeout for a cluster data task (reading or writing cluster data). The default is 10 seconds. + * <p> + * Timeouts are useful when the cluster data root is on network storage, where I/O operations may occasionally block + * for long or unpredictable periods. + */ + public void setClusterDataTaskTimeout(long clusterDataTaskTimeout) { + if (clusterDataTaskTimeout <= 0) { + throw new IllegalArgumentException("clusterDataTaskMaxRetries must be greater than 0"); + } + this.clusterDataTaskTimeout = clusterDataTaskTimeout; + } + + public long getClusterDataTaskTimeout() { + return clusterDataTaskTimeout; + } + + /** + * The time unit for the clusterDataTaskTimeoutUnit, default to TimeUnit.SECONDS. 
+ */ + public void setClusterDataTaskTimeoutUnit(TimeUnit clusterDataTaskTimeoutUnit) { + this.clusterDataTaskTimeoutUnit = clusterDataTaskTimeoutUnit; + } + + public TimeUnit getClusterDataTaskTimeoutUnit() { + return clusterDataTaskTimeoutUnit; + } + + /** + * Sets the timeout for a cluster data task (reading or writing cluster data). The default is 10 seconds. + * <p> + * Timeouts are useful when the cluster data root is on network storage, where I/O operations may occasionally block + * for long or unpredictable periods. + * <p> + */ + public void setClusterDataTaskTimeout(long clusterDataTaskTimeout, TimeUnit clusterDataTaskTimeoutUnit) { + setClusterDataTaskTimeout(clusterDataTaskTimeout); + setClusterDataTaskTimeoutUnit(clusterDataTaskTimeoutUnit); + } + @Override protected void doStop() throws Exception { super.doStop(); @@ -142,6 +211,14 @@ public class FileLockClusterService extends AbstractCamelClusterService<FileLock executor = null; } + + if (clusterDataTaskExecutor != null) { + if (context != null) { + context.getExecutorServiceManager().shutdown(clusterDataTaskExecutor); + } else { + clusterDataTaskExecutor.shutdown(); + } + } } ScheduledExecutorService getExecutor() { @@ -161,4 +238,19 @@ public class FileLockClusterService extends AbstractCamelClusterService<FileLock internalLock.unlock(); } } + + ExecutorService getClusterDataTaskExecutor() { + Lock internalLock = getInternalLock(); + internalLock.lock(); + try { + if (clusterDataTaskExecutor == null) { + final CamelContext context = ObjectHelper.notNull(getCamelContext(), "CamelContext"); + clusterDataTaskExecutor = context.getExecutorServiceManager().newFixedThreadPool(this, + "FileLockClusterDataTask-" + getId(), 5); + } + return clusterDataTaskExecutor; + } finally { + internalLock.unlock(); + } + } } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutor.java 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutor.java new file mode 100644 index 000000000000..cc7932989793 --- /dev/null +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutor.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.cluster; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Executes cluster data read / write tasks asynchronously, with timeouts to guard against potential unpredictable + * blocking I/O periods. 
+ */ +class FileLockClusterTaskExecutor { + private static final Logger LOGGER = LoggerFactory.getLogger(FileLockClusterTaskExecutor.class); + private final FileLockClusterService service; + + FileLockClusterTaskExecutor(FileLockClusterService service) { + Objects.requireNonNull(service, "FileLockClusterService cannot be null"); + this.service = service; + } + + /** + * If the cluster data root is network based, like an NFS mount, avoid potential long blocking I/O to fail fast and + * reliably reason about the cluster state. + * + * @param task Supplier representing a task to run + */ + <T> T run(Supplier<T> task) throws ExecutionException, TimeoutException { + Objects.requireNonNull(task, "Task cannot be null"); + + int maxAttempts = service.getClusterDataTaskMaxAttempts(); + for (int attempt = 1; attempt <= maxAttempts; attempt++) { + LOGGER.debug("Running cluster task attempt {} of {}", attempt, maxAttempts); + + CompletableFuture<T> future = CompletableFuture.supplyAsync(task, service.getClusterDataTaskExecutor()); + try { + return future.get(service.getClusterDataTaskTimeout(), service.getClusterDataTaskTimeoutUnit()); + } catch (InterruptedException e) { + LOGGER.trace("Cluster task interrupted on attempt {} of {}", attempt, maxAttempts); + future.cancel(true); + Thread.currentThread().interrupt(); + return null; + } catch (ExecutionException | TimeoutException e) { + LOGGER.debug("Cluster task encountered an exception on attempt {} of {}", attempt, maxAttempts, e); + future.cancel(true); + if (attempt == maxAttempts) { + LOGGER.debug("Cluster task retry limit ({}) reached", maxAttempts, e); + throw e; + } + } finally { + LOGGER.debug("Cluster task attempt {} ended", attempt); + } + } + return null; + } +} diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java index fd3a7982cf9f..5a7466ceb622 
100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java @@ -62,7 +62,7 @@ final class FileLockClusterUtils { FileChannel channel, FileLockClusterLeaderInfo clusterLeaderInfo, boolean forceMetaData) - throws IOException { + throws Exception { Objects.requireNonNull(channel, "channel cannot be null"); Objects.requireNonNull(clusterLeaderInfo, "clusterLeaderInfo cannot be null"); @@ -100,7 +100,7 @@ final class FileLockClusterUtils { * inconsistent state * @throws IOException If reading the lock file failed */ - static FileLockClusterLeaderInfo readClusterLeaderInfo(Path leaderDataPath) throws IOException { + static FileLockClusterLeaderInfo readClusterLeaderInfo(Path leaderDataPath) throws Exception { try { byte[] bytes = Files.readAllBytes(leaderDataPath); @@ -119,7 +119,7 @@ final class FileLockClusterUtils { long lastHeartbeat = buf.getLong(); return new FileLockClusterLeaderInfo(uuidStr, intervalMillis, lastHeartbeat); - } catch (NoSuchFileException e) { + } catch (FileNotFoundException | NoSuchFileException e) { // Handle NoSuchFileException to give the ClusterView a chance to recreate the leadership data return null; } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java index 8156f00cfaff..80896e1f1c32 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java @@ -29,14 +29,18 @@ import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutionException; import 
java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.cluster.CamelClusterMember; import org.apache.camel.support.cluster.AbstractCamelClusterView; +import org.apache.camel.util.function.ThrowingHelper; +import org.apache.camel.util.function.ThrowingSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +61,7 @@ public class FileLockClusterView extends AbstractCamelClusterView { private ScheduledFuture<?> task; private int heartbeatTimeoutMultiplier; private long acquireLockIntervalMilliseconds; + private FileLockClusterTaskExecutor clusterTaskExecutor; FileLockClusterView(FileLockClusterService cluster, String namespace) { super(cluster, namespace); @@ -88,6 +93,8 @@ public class FileLockClusterView extends AbstractCamelClusterView { @Override protected void doStart() throws Exception { + FileLockClusterService service = getClusterService().unwrap(FileLockClusterService.class); + // Start critical section try { contextStartLock.lock(); @@ -97,31 +104,42 @@ public class FileLockClusterView extends AbstractCamelClusterView { fireLeadershipChangedEvent((CamelClusterMember) null); } - if (!Files.exists(leaderLockPath.getParent())) { - Files.createDirectories(leaderLockPath.getParent()); + // Attempt to pre-create cluster data files & directories. 
On failure, it will either be attempted by another cluster member or run again within the tryLock task loop + try { + if (!Files.exists(leaderLockPath.getParent())) { + Files.createDirectories(leaderLockPath.getParent()); + } + } catch (IOException e) { + LOGGER.debug("Error creating directory {}", leaderLockPath.getParent(), e); } - if (!Files.exists(leaderLockPath)) { - Files.createFile(leaderLockPath); + try { + if (!Files.exists(leaderLockPath)) { + Files.createFile(leaderLockPath); + } + } catch (IOException e) { + LOGGER.debug("Error creating cluster leader lock file {}", leaderLockPath, e); } - if (!Files.exists(leaderDataPath)) { - Files.createFile(leaderDataPath); + try { + if (!Files.exists(leaderDataPath)) { + Files.createFile(leaderDataPath); + } + } catch (IOException e) { + LOGGER.debug("Error creating cluster leader data file {}", leaderDataPath, e); } } finally { // End critical section contextStartLock.unlock(); } - FileLockClusterService service = getClusterService().unwrap(FileLockClusterService.class); + clusterTaskExecutor = new FileLockClusterTaskExecutor(service); + acquireLockIntervalMilliseconds = TimeUnit.MILLISECONDS.convert( service.getAcquireLockInterval(), service.getAcquireLockIntervalUnit()); heartbeatTimeoutMultiplier = service.getHeartbeatTimeoutMultiplier(); - if (heartbeatTimeoutMultiplier <= 0) { - throw new IllegalArgumentException("HeartbeatTimeoutMultiplier must be greater than 0"); - } ScheduledExecutorService executor = service.getExecutor(); task = executor.scheduleWithFixedDelay(this::tryLock, @@ -135,14 +153,20 @@ public class FileLockClusterView extends AbstractCamelClusterView { @Override protected void doStop() throws Exception { if (localMember.isLeader() && leaderDataFile != null) { - try { - FileChannel channel = leaderDataFile.getChannel(); - channel.truncate(0); - channel.force(true); - } catch (Exception e) { - // Log and ignore since we need to release the file lock and do cleanup - LOGGER.debug("Failed to 
truncate {} on {} stop", leaderDataPath, getClass().getSimpleName(), e); - } + clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new ThrowingSupplier<Void, Throwable>() { + @Override + public Void get() throws Throwable { + try { + FileChannel channel = leaderDataFile.getChannel(); + channel.truncate(0); + channel.force(true); + } catch (Exception e) { + // Log and ignore since we need to release the file lock and do cleanup + LOGGER.debug("Failed to truncate {} on {} stop", leaderDataPath, getClass().getSimpleName(), e); + } + return null; + } + })); } closeInternal(); @@ -201,7 +225,7 @@ public class FileLockClusterView extends AbstractCamelClusterView { // Update the cluster data file with the leader state so that other cluster members can interrogate it writeClusterLeaderInfo(false); return; - } catch (IOException e) { + } catch (Exception e) { LOGGER.debug("Failed writing cluster leader data to {}", leaderDataPath, e); } } @@ -212,6 +236,7 @@ public class FileLockClusterView extends AbstractCamelClusterView { localMember.getUuid()); localMember.setStatus(ClusterMemberStatus.FOLLOWER); fireLeadershipChangedEvent((CamelClusterMember) null); + clusterLeaderInfoRef.set(null); releaseFileLock(); closeLockFiles(); lock = null; @@ -223,9 +248,8 @@ public class FileLockClusterView extends AbstractCamelClusterView { // Get & update cluster leader state LOGGER.debug("Reading cluster leader state from {}", leaderDataPath); - FileLockClusterLeaderInfo latestClusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath); + FileLockClusterLeaderInfo latestClusterLeaderInfo = readClusterLeaderInfo(); FileLockClusterLeaderInfo previousClusterLeaderInfo = clusterLeaderInfoRef.getAndSet(latestClusterLeaderInfo); - // Check if we can attempt to take cluster leadership if (isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo) || canReclaimLeadership(latestClusterLeaderInfo)) { @@ -235,19 +259,17 @@ public class FileLockClusterView extends 
AbstractCamelClusterView { } // Try to recreate the cluster data directory in case it got removed - if (!Files.exists(leaderLockPath.getParent())) { - Files.createDirectories(leaderLockPath.getParent()); - } + createClusterRootDirectoryIfRequired(); // Attempt to obtain cluster leadership LOGGER.debug("Try to acquire a lock on {} (cluster-member-id={})", leaderLockPath, localMember.getUuid()); - leaderLockFile = new RandomAccessFile(leaderLockPath.toFile(), "rw"); - leaderDataFile = new RandomAccessFile(leaderDataPath.toFile(), "rw"); + leaderLockFile = createRandomAccessFile(leaderLockPath); + leaderDataFile = createRandomAccessFile(leaderDataPath); lock = null; lock = leaderLockFile.getChannel().tryLock(0, Math.max(1, leaderLockFile.getChannel().size()), false); - if (lock != null) { + if (lockIsValid()) { LOGGER.info("Lock on file {} acquired (lock={}, cluster-member-id={})", leaderLockPath, lock, localMember.getUuid()); localMember.setStatus(ClusterMemberStatus.LEADER); @@ -286,28 +308,67 @@ public class FileLockClusterView extends AbstractCamelClusterView { return leaderInfo != null && localMember.getUuid().equals(leaderInfo.getId()); } - void writeClusterLeaderInfo(boolean forceMetaData) throws IOException { + void createClusterRootDirectoryIfRequired() throws ExecutionException, TimeoutException { + clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new ThrowingSupplier<Void, Throwable>() { + @Override + public Void get() throws Throwable { + if (!Files.exists(leaderLockPath.getParent())) { + Files.createDirectories(leaderLockPath.getParent()); + } + return null; + } + })); + } + + RandomAccessFile createRandomAccessFile(Path path) throws ExecutionException, TimeoutException { + return clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new ThrowingSupplier<RandomAccessFile, Throwable>() { + @Override + public RandomAccessFile get() throws Throwable { + return new RandomAccessFile(path.toFile(), "rw"); + } + })); + } + + FileLockClusterLeaderInfo 
readClusterLeaderInfo() throws Exception { + return clusterTaskExecutor + .run(ThrowingHelper.wrapAsSupplier(new ThrowingSupplier<FileLockClusterLeaderInfo, Throwable>() { + @Override + public FileLockClusterLeaderInfo get() throws Throwable { + return FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath); + } + })); + } + + void writeClusterLeaderInfo(boolean forceMetaData) throws Exception { FileLockClusterLeaderInfo latestClusterLeaderInfo = new FileLockClusterLeaderInfo( localMember.getUuid(), acquireLockIntervalMilliseconds, System.currentTimeMillis()); - FileLockClusterUtils.writeClusterLeaderInfo( - leaderDataPath, - leaderDataFile.getChannel(), - latestClusterLeaderInfo, - forceMetaData); + clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new ThrowingSupplier<Void, Throwable>() { + @Override + public Void get() throws Throwable { + FileLockClusterUtils.writeClusterLeaderInfo( + leaderDataPath, + leaderDataFile.getChannel(), + latestClusterLeaderInfo, + forceMetaData); + return null; + } + })); } boolean isLeaderInternal() { if (localMember.isLeader()) { try { - FileLockClusterLeaderInfo leaderInfo = FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath); + FileLockClusterLeaderInfo leaderInfo = readClusterLeaderInfo(); + boolean leaderStale = isLeaderStale(leaderInfo, clusterLeaderInfoRef.getAndSet(leaderInfo)); + LOGGER.debug("Leader read cluster data {}, isStale={}", leaderInfo, leaderStale); + return leaderInfo != null - && lock != null - && lock.isValid() + && !leaderStale && localMember.getUuid().equals(leaderInfo.getId()) - && Files.exists(leaderLockPath); + && lockIsValid(); } catch (Exception e) { LOGGER.debug("Failed to read {} (cluster-member-id={})", leaderLockPath, localMember.getUuid(), e); return false; @@ -316,6 +377,18 @@ public class FileLockClusterView extends AbstractCamelClusterView { return false; } + boolean lockIsValid() throws ExecutionException, TimeoutException { + if (lock != null && lock.isValid()) { + return 
clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new ThrowingSupplier<Boolean, Throwable>() { + @Override + public Boolean get() throws Throwable { + return Files.exists(leaderLockPath); + } + })); + } + return false; + } + private final class ClusterMember implements CamelClusterMember { private final AtomicReference<ClusterMemberStatus> status = new AtomicReference<>(ClusterMemberStatus.STOPPED); private final String uuid = UUID.randomUUID().toString(); diff --git a/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutorTest.java b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutorTest.java new file mode 100644 index 000000000000..0b086b3ee1a3 --- /dev/null +++ b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutorTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.file.cluster; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +import org.apache.camel.impl.DefaultCamelContext; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class FileLockClusterTaskExecutorTest { + @Test + void runTaskWithDefaultMaxAttemptsAndTimeout() throws ExecutionException, TimeoutException { + FileLockClusterService service = new FileLockClusterService(); + service.setCamelContext(new DefaultCamelContext()); + + FileLockClusterTaskExecutor executor = new FileLockClusterTaskExecutor(service); + + String message = "Hello World"; + String result = executor.run(new Supplier<String>() { + @Override + public String get() { + return message; + } + }); + + Assertions.assertEquals(message, result); + } + + @Test + void runTaskWithMaxAttemptsExceeded() { + int maxAttempts = 3; + int timeoutMs = 100; + + FileLockClusterService service = new FileLockClusterService(); + service.setCamelContext(new DefaultCamelContext()); + service.setClusterDataTaskMaxAttempts(maxAttempts); + service.setClusterDataTaskTimeout(timeoutMs, TimeUnit.MILLISECONDS); + + FileLockClusterTaskExecutor executor = new FileLockClusterTaskExecutor(service); + + AtomicInteger count = new AtomicInteger(); + String message = "Hello World"; + + Assertions.assertThrows(TimeoutException.class, () -> { + executor.run(new Supplier<String>() { + @Override + public String get() { + count.incrementAndGet(); + try { + Thread.sleep(timeoutMs + 50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return message; + } + }); + }); + + Assertions.assertEquals(3, count.get()); + } + + @Test + void runTaskWithMaxAttemptsNotExceeded() throws ExecutionException, TimeoutException { + int maxAttempts = 3; + int timeoutMs = 100; + + 
FileLockClusterService service = new FileLockClusterService(); + service.setCamelContext(new DefaultCamelContext()); + service.setClusterDataTaskMaxAttempts(maxAttempts); + service.setClusterDataTaskTimeout(timeoutMs, TimeUnit.MILLISECONDS); + + FileLockClusterTaskExecutor executor = new FileLockClusterTaskExecutor(service); + + AtomicInteger count = new AtomicInteger(); + String message = "Hello World"; + + String result = executor.run(new Supplier<String>() { + @Override + public String get() { + if (count.incrementAndGet() < 3) { + try { + Thread.sleep(timeoutMs + 50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + return message; + } + }); + + Assertions.assertEquals(3, count.get()); + Assertions.assertEquals(message, result); + } +} diff --git a/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java index 3e6f9db8a578..28fb79fc2923 100644 --- a/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java +++ b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java @@ -17,7 +17,6 @@ package org.apache.camel.component.file.cluster; import java.io.FileNotFoundException; -import java.io.IOException; import java.io.RandomAccessFile; import java.nio.file.Files; import java.nio.file.Path; @@ -171,7 +170,7 @@ class FileLockClusterUtilsTest { } @Test - void writeClusterLeaderInfoData(@TempDir Path tempDir) throws IOException { + void writeClusterLeaderInfoData(@TempDir Path tempDir) throws Exception { Path clusterData = tempDir.resolve("leader.dat"); try (RandomAccessFile raf = new RandomAccessFile(clusterData.toFile(), "rw")) { FileLockClusterLeaderInfo leaderInfo = new FileLockClusterLeaderInfo(UUID.randomUUID().toString(), 1L, 2L); diff --git 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java index c335681eca85..29c9a60536bf 100644 --- a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java +++ b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java @@ -258,94 +258,6 @@ class FileLockClusterServiceAdvancedFailoverTest extends FileLockClusterServiceT } } - @Test - void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir Path clusterMovedLocation) throws Exception { - ClusterConfig leaderConfig = new ClusterConfig(); - leaderConfig.setTimerRepeatCount(-1); - - CamelContext clusterLeader = createCamelContext(leaderConfig); - - ClusterConfig followerConfig = new ClusterConfig(); - followerConfig.setTimerRepeatCount(-1); - followerConfig.setAcquireLockDelay(2); - - CamelContext clusterFollower = createCamelContext(followerConfig); - - try { - MockEndpoint mockEndpointLeader = clusterLeader.getEndpoint("mock:result", MockEndpoint.class); - mockEndpointLeader.expectedMessageCount(5); - - clusterLeader.start(); - clusterFollower.start(); - - mockEndpointLeader.assertIsSatisfied(); - - AtomicReference<FileLockClusterLeaderInfo> leaderInfo = new AtomicReference<>(); - Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { - assertTrue(Files.exists(lockFile)); - assertTrue(Files.exists(dataFile)); - assertTrue(getClusterMember(clusterLeader).isLeader()); - - FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile); - assertNotNull(clusterLeaderInfo); - leaderInfo.set(clusterLeaderInfo); - - String leaderId = clusterLeaderInfo.getId(); - assertNotNull(leaderId); - assertDoesNotThrow(() -> 
UUID.fromString(leaderId)); - }); - - // Wait enough time for the follower to have run its lock acquisition scheduled task - Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis()); - - // The follower should not have produced any messages - MockEndpoint mockEndpointFollower = clusterFollower.getEndpoint("mock:result", MockEndpoint.class); - assertTrue(mockEndpointFollower.getExchanges().isEmpty()); - - mockEndpointLeader.reset(); - mockEndpointLeader.expectedMinimumMessageCount(1); - - // Simulate the file system becoming detached by moving the cluster data directory - Files.move(clusterDir, clusterMovedLocation, StandardCopyOption.REPLACE_EXISTING); - - // Simulate reattaching the file system by moving the cluster directory back to the original location - try (Stream<Path> stream = Files.walk(clusterMovedLocation)) { - stream.forEach(path -> { - try { - Path destination = clusterDir.resolve(clusterMovedLocation.relativize(path)); - if (Files.isDirectory(path)) { - Files.createDirectories(destination); - } else { - Files.copy(path, destination, StandardCopyOption.REPLACE_EXISTING); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - } - - FileLockClusterLeaderInfo updatedInfo - = new FileLockClusterLeaderInfo( - leaderInfo.get().getId(), TimeUnit.MILLISECONDS.toMillis(2), System.currentTimeMillis()); - Path data = clusterMovedLocation.resolve(NAMESPACE + ".data"); - try (RandomAccessFile file = new RandomAccessFile(data.toFile(), "rw")) { - FileLockClusterUtils.writeClusterLeaderInfo(data, file.getChannel(), updatedInfo, - true); - } - - // Since the lock file is not considered 'stale', the original leader should resume leadership - Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { - assertTrue(getClusterMember(clusterLeader).isLeader()); - mockEndpointLeader.assertIsSatisfied(); - }); - - assertTrue(mockEndpointFollower.getExchanges().isEmpty()); - } finally { - clusterLeader.stop(); - clusterFollower.stop(); 
- } - } - @Test void staleLockFileForRestoredFileSystemElectsNewLeader(@TempDir Path clusterMovedLocation) throws Exception { ClusterConfig leaderConfig = new ClusterConfig(); diff --git a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java index 72b49bc1b486..5bb4da929715 100644 --- a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java +++ b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java @@ -26,7 +26,6 @@ import org.apache.camel.component.mock.MockEndpoint; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; -import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -221,28 +220,68 @@ class FileLockClusterServiceBasicFailoverTest extends FileLockClusterServiceTest } @Test - void negativeHeartbeatTimeoutMultiplierThrowsException() throws Exception { + void negativeHeartbeatTimeoutMultiplierThrowsException() { ClusterConfig config = new ClusterConfig(); config.setHeartbeatTimeoutMultiplier(-1); - - Exception exception = assertThrows(Exception.class, () -> { + assertThrows(IllegalArgumentException.class, () -> { try (CamelContext camelContext = createCamelContext(config)) { camelContext.start(); } }); - assertIsInstanceOf(IllegalArgumentException.class, exception.getCause()); } @Test - void zeroHeartbeatTimeoutMultiplierThrowsException() throws Exception { + void zeroHeartbeatTimeoutMultiplierThrowsException() { ClusterConfig config = new ClusterConfig(); config.setHeartbeatTimeoutMultiplier(0); + 
assertThrows(IllegalArgumentException.class, () -> { + try (CamelContext camelContext = createCamelContext(config)) { + camelContext.start(); + } + }); + } + + @Test + void negativeClusterDataTaskMaxAttemptsThrowsException() { + ClusterConfig config = new ClusterConfig(); + config.setClusterDataTaskMaxAttempts(-1); + assertThrows(IllegalArgumentException.class, () -> { + try (CamelContext camelContext = createCamelContext(config)) { + camelContext.start(); + } + }); + } + + @Test + void zeroClusterDataTaskMaxAttemptsThrowsException() { + ClusterConfig config = new ClusterConfig(); + config.setClusterDataTaskMaxAttempts(0); + assertThrows(IllegalArgumentException.class, () -> { + try (CamelContext camelContext = createCamelContext(config)) { + camelContext.start(); + } + }); + } + + @Test + void negativeClusterDataTaskTimeoutThrowsException() { + ClusterConfig config = new ClusterConfig(); + config.setClusterDataTaskTimeout(-1); + assertThrows(IllegalArgumentException.class, () -> { + try (CamelContext camelContext = createCamelContext(config)) { + camelContext.start(); + } + }); + } - Exception exception = assertThrows(Exception.class, () -> { + @Test + void zeroClusterDataTaskTimeoutThrowsException() { + ClusterConfig config = new ClusterConfig(); + config.setClusterDataTaskTimeout(0); + assertThrows(IllegalArgumentException.class, () -> { try (CamelContext camelContext = createCamelContext(config)) { camelContext.start(); } }); - assertIsInstanceOf(IllegalArgumentException.class, exception.getCause()); } } diff --git a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java index 72290aaacbbb..b7f7bfbbde55 100644 --- a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java +++ 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java @@ -67,6 +67,8 @@ abstract class FileLockClusterServiceTestBase { service.setAcquireLockInterval(1); service.setRoot(clusterDir.toString()); service.setHeartbeatTimeoutMultiplier(config.getHeartbeatTimeoutMultiplier()); + service.setClusterDataTaskMaxAttempts(config.getClusterDataTaskMaxAttempts()); + service.setClusterDataTaskTimeout(config.getClusterDataTaskTimeout()); return service; } @@ -83,6 +85,8 @@ abstract class FileLockClusterServiceTestBase { private long acquireLockDelay = 1; private long timerRepeatCount = 5; private int heartbeatTimeoutMultiplier = 5; + private int clusterDataTaskMaxAttempts = 5; + private long clusterDataTaskTimeout = 5; long getAcquireLockDelay() { return acquireLockDelay; @@ -104,12 +108,28 @@ abstract class FileLockClusterServiceTestBase { return TimeUnit.SECONDS.toMillis(getAcquireLockDelay()) + 500; } - public int getHeartbeatTimeoutMultiplier() { + int getHeartbeatTimeoutMultiplier() { return heartbeatTimeoutMultiplier; } - public void setHeartbeatTimeoutMultiplier(int heartbeatTimeoutMultiplier) { + void setHeartbeatTimeoutMultiplier(int heartbeatTimeoutMultiplier) { this.heartbeatTimeoutMultiplier = heartbeatTimeoutMultiplier; } + + int getClusterDataTaskMaxAttempts() { + return this.clusterDataTaskMaxAttempts; + } + + void setClusterDataTaskMaxAttempts(int clusterDataTaskMaxAttempts) { + this.clusterDataTaskMaxAttempts = clusterDataTaskMaxAttempts; + } + + long getClusterDataTaskTimeout() { + return clusterDataTaskTimeout; + } + + void setClusterDataTaskTimeout(long clusterDataTaskTimeout) { + this.clusterDataTaskTimeout = clusterDataTaskTimeout; + } } } diff --git a/docs/user-manual/modules/ROOT/pages/clustering.adoc b/docs/user-manual/modules/ROOT/pages/clustering.adoc index 669a56d13216..322443e064c1 100644 --- a/docs/user-manual/modules/ROOT/pages/clustering.adoc +++ 
b/docs/user-manual/modules/ROOT/pages/clustering.adoc @@ -65,6 +65,9 @@ Configuration options: | acquireLockDelayUnit | The time unit for acquireLockDelay | SECONDS | TimeUnit | acquireLockInterval | The time to wait between attempts to try to acquire the cluster lock | 10 | long | acquireLockIntervalUnit | The time unit for acquireLockInterval | SECONDS | TimeUnit +| clusterDataTaskMaxAttempts | Sets how many times a cluster data task will run, counting both the first execution and subsequent retries in case of failure or timeout. This can be useful when the cluster data root is on network-based file storage, where I/O operations may occasionally block for long or unpredictable periods | 5 | int +| clusterDataTaskTimeout | Sets the timeout for a cluster data task (reading or writing cluster data). Timeouts are useful when the cluster data root is on network storage, where I/O operations may occasionally block for long or unpredictable periods | 10 | long +| clusterDataTaskTimeoutUnit | The time unit for clusterDataTaskTimeout | SECONDS | TimeUnit | heartbeatTimeoutMultiplier | Multiplier applied to the cluster leader acquireLockInterval to determine how long followers should wait before considering the leader "stale". For example, if the leader updates its heartbeat every 2 seconds and the heartbeatTimeoutMultiplier is 3, followers will tolerate up to code 2s * 3 = 6s of silence before declaring the leader unavailable | 5 | int | rootPath | The file cluster root directory path | | String |===
