This is an automated email from the ASF dual-hosted git repository. smiklosovic pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 5c845f3b09 Move all disk error logic to DiskErrorsHandler to enable pluggability 5c845f3b09 is described below commit 5c845f3b090d3a53fa8837a1d9e2c874abad1b4a Author: Stefan Miklosovic <smikloso...@apache.org> AuthorDate: Fri Feb 28 11:10:14 2025 +0100 Move all disk error logic to DiskErrorsHandler to enable pluggability patch by Stefan Miklosovic; reviewed by Tommy Stendahl, Brandon Williams for CASSANDRA-20363 --- CHANGES.txt | 1 + .../config/CassandraRelevantProperties.java | 1 + .../apache/cassandra/db/commitlog/CommitLog.java | 21 +- .../cassandra/db/commitlog/CommitLogSegment.java | 2 +- .../org/apache/cassandra/io/FSErrorHandler.java | 31 --- .../org/apache/cassandra/io/util/FileUtils.java | 25 --- .../apache/cassandra/service/CassandraDaemon.java | 2 +- ...rHandler.java => DefaultDiskErrorsHandler.java} | 122 ++++++++++-- .../cassandra/service/DiskErrorsHandler.java | 77 +++++++ .../service/DiskErrorsHandlerService.java | 83 ++++++++ .../apache/cassandra/service/StorageService.java | 3 + .../cassandra/utils/JVMStabilityInspector.java | 31 +-- .../cassandra/distributed/impl/Instance.java | 8 +- .../cassandra/distributed/impl/InstanceKiller.java | 2 +- .../unit/org/apache/cassandra/ServerTestUtils.java | 3 + .../org/apache/cassandra/cql3/OutOfSpaceTest.java | 4 + .../org/apache/cassandra/db/DirectoriesTest.java | 4 +- .../commitlog/CommitLogInitWithExceptionTest.java | 3 + .../service/DefaultFSErrorHandlerTest.java | 3 +- .../cassandra/service/DiskErrorsHandlerTest.java | 221 +++++++++++++++++++++ .../cassandra/service/DiskFailurePolicyTest.java | 3 +- .../service/snapshot/MetadataSnapshotsTest.java | 5 +- .../cassandra/utils/JVMStabilityInspectorTest.java | 8 +- .../org/apache/cassandra/utils/KillerForTests.java | 2 +- 24 files changed, 525 insertions(+), 140 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index c70b5e8c8b..a44294d606 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Move all disk error logic to DiskErrorsHandler to enable pluggability (CASSANDRA-20363) * Fix marking an SSTable as suspected and BufferPool leakage in case of a corrupted SSTable read during a compaction (CASSANDRA-20396) * Add missed documentation for CREATE TABLE LIKE (CASSANDRA-20401) * Add OCTET_LENGTH constraint (CASSANDRA-20340) diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index e212a9f72c..515bee33bb 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -179,6 +179,7 @@ public enum CassandraRelevantProperties CONSISTENT_RANGE_MOVEMENT("cassandra.consistent.rangemovement", "true"), CONSISTENT_SIMULTANEOUS_MOVES_ALLOW("cassandra.consistent.simultaneousmoves.allow"), CRYPTO_PROVIDER_CLASS_NAME("cassandra.crypto_provider_class_name"), + CUSTOM_DISK_ERROR_HANDLER("cassandra.custom_disk_error_handler"), CUSTOM_GUARDRAILS_CONFIG_PROVIDER_CLASS("cassandra.custom_guardrails_config_provider_class"), CUSTOM_QUERY_HANDLER_CLASS("cassandra.custom_query_handler_class"), CUSTOM_TRACING_CLASS("cassandra.custom_tracing_class"), diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 30c8e08b87..76cf38a804 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -57,8 +57,8 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.service.DiskErrorsHandlerService; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; @@ -576,24 +576,7 @@ public class CommitLog implements CommitLogMBean @VisibleForTesting public static boolean handleCommitError(String message, Throwable t) { - JVMStabilityInspector.inspectCommitLogThrowable(t); - switch (DatabaseDescriptor.getCommitFailurePolicy()) - { - // Needed here for unit tests to not fail on default assertion - case die: - case stop: - StorageService.instance.stopTransports(); - //$FALL-THROUGH$ - case stop_commit: - String errorMsg = String.format("%s. Commit disk failure policy is %s; terminating thread.", message, DatabaseDescriptor.getCommitFailurePolicy()); - logger.error(addAdditionalInformationIfPossible(errorMsg), t); - return false; - case ignore: - logger.error(addAdditionalInformationIfPossible(message), t); - return true; - default: - throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy()); - } + return DiskErrorsHandlerService.get().handleCommitError(message, t); } /** diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index e2aeef1dd2..d3c8e40de3 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -387,7 +387,7 @@ public abstract class CommitLogSegment } catch (IOException e) { - if (!CommitLog.instance.handleCommitError("Failed to sync CDC Index: " + desc.cdcIndexFileName(), e)) + if (!CommitLog.handleCommitError("Failed to sync CDC Index: " + desc.cdcIndexFileName(), e)) throw new RuntimeException(e); } } diff --git a/src/java/org/apache/cassandra/io/FSErrorHandler.java b/src/java/org/apache/cassandra/io/FSErrorHandler.java deleted file mode 100644 index b7d283640b..0000000000 --- a/src/java/org/apache/cassandra/io/FSErrorHandler.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.io; - -import org.apache.cassandra.io.sstable.CorruptSSTableException; - -/** - * Interface for handling file system errors. - */ -public interface FSErrorHandler -{ - void handleCorruptSSTable(CorruptSSTableException e); - void handleFSError(FSError e); - default void handleStartupFSError(Throwable t) {} -} diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index cf6ea52be1..d5ea8dcde1 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -44,10 +44,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -57,9 +55,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.io.FSError; -import org.apache.cassandra.io.FSErrorHandler; import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.SyncUtil; @@ -78,7 +74,6 @@ public final class FileUtils public static final long ONE_TIB = 1024 * ONE_GIB; private static final DecimalFormat df = new DecimalFormat("#.##"); - private static final AtomicReference<Optional<FSErrorHandler>> fsErrorHandler = new AtomicReference<>(Optional.empty()); private static final Class clsDirectBuffer; private static final MethodHandle mhDirectBufferCleaner; @@ -468,21 +463,6 @@ public final class FileUtils } } - public static void handleCorruptSSTable(CorruptSSTableException e) - { - fsErrorHandler.get().ifPresent(handler -> handler.handleCorruptSSTable(e)); - } - - public static void handleFSError(FSError e) - { - fsErrorHandler.get().ifPresent(handler -> handler.handleFSError(e)); - } - - public static void handleStartupFSError(Throwable t) - { - fsErrorHandler.get().ifPresent(handler -> handler.handleStartupFSError(t)); - } - /** * handleFSErrorAndPropagate will invoke the disk failure policy error handler, * which may or may not stop the daemon or transports. However, if we don't exit, @@ -626,11 +606,6 @@ public final class FileUtils } } - public static void setFSErrorHandler(FSErrorHandler handler) - { - fsErrorHandler.getAndSet(Optional.ofNullable(handler)); - } - /** @deprecated See CASSANDRA-16926 */ @Deprecated(since = "4.1") public static void createDirectory(String directory) diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 8c44c4286d..57f38e93e9 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -224,7 +224,7 @@ public class CassandraDaemon */ protected void setup() { - FileUtils.setFSErrorHandler(new DefaultFSErrorHandler()); + DiskErrorsHandlerService.configure(); // Since CASSANDRA-14793 the local system keyspaces data are not dispatched across the data directories // anymore to reduce the risks in case of disk failures. By consequence, the system need to ensure in case of diff --git a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java b/src/java/org/apache/cassandra/service/DefaultDiskErrorsHandler.java similarity index 60% rename from src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java rename to src/java/org/apache/cassandra/service/DefaultDiskErrorsHandler.java index 8b182942b2..cc350861f4 100644 --- a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java +++ b/src/java/org/apache/cassandra/service/DefaultDiskErrorsHandler.java @@ -18,28 +18,44 @@ package org.apache.cassandra.service; - +import java.nio.file.FileStore; import java.util.Set; import com.google.common.collect.ImmutableSet; - -import org.apache.cassandra.io.util.File; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.DisallowedDirectories; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.io.*; +import org.apache.cassandra.io.FSDiskFullWriteError; +import org.apache.cassandra.io.FSError; +import org.apache.cassandra.io.FSNoDiskAvailableForWriteError; +import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.PathUtils; import org.apache.cassandra.utils.JVMStabilityInspector; -public class DefaultFSErrorHandler implements FSErrorHandler +public class DefaultDiskErrorsHandler implements DiskErrorsHandler { - private static final Logger logger = LoggerFactory.getLogger(DefaultFSErrorHandler.class); + private static final Logger logger = LoggerFactory.getLogger(DefaultDiskErrorsHandler.class); private static final Set<Class<?>> exceptionsSkippingDataRemoval = ImmutableSet.of(OutOfMemoryError.class); + @Override + public void init() + { + // intentionally empty + } + + @Override + public void close() throws Exception + { + // intentionally empty + } + @Override public void handleCorruptSSTable(CorruptSSTableException e) { @@ -102,6 +118,68 @@ public class DefaultFSErrorHandler implements FSErrorHandler } } + @Override + public void handleStartupFSError(Throwable t) + { + switch (DatabaseDescriptor.getDiskFailurePolicy()) + { + case stop_paranoid: + case stop: + case die: + logger.error("Exiting forcefully due to file system exception on startup, disk failure policy \"{}\"", + DatabaseDescriptor.getDiskFailurePolicy(), + t); + JVMStabilityInspector.killCurrentJVM(t, true); + break; + default: + break; + } + } + + @Override + public boolean handleCommitError(String message, Throwable t) + { + JVMStabilityInspector.inspectCommitLogThrowable(t); + switch (DatabaseDescriptor.getCommitFailurePolicy()) + { + // Needed here for unit tests to not fail on default assertion + case die: + case stop: + StorageService.instance.stopTransports(); + //$FALL-THROUGH$ + case stop_commit: + String errorMsg = String.format("%s. Commit disk failure policy is %s; terminating thread.", message, DatabaseDescriptor.getCommitFailurePolicy()); + logger.error(addAdditionalInformationIfPossible(errorMsg), t); + return false; + case ignore: + logger.error(addAdditionalInformationIfPossible(message), t); + return true; + default: + throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy()); + } + } + + @Override + public void inspectDiskError(Throwable t) + { + if (t instanceof CorruptSSTableException) + handleCorruptSSTable((CorruptSSTableException) t); + else if (t instanceof FSError) + handleFSError((FSError) t); + } + + @Override + public void inspectCommitLogError(Throwable t) + { + if (!StorageService.instance.isDaemonSetupCompleted()) + { + logger.error("Exiting due to error while processing commit log during initialization.", t); + JVMStabilityInspector.killCurrentJVM(t, true); + } + else if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.die) + JVMStabilityInspector.killCurrentJVM(t, false); + } + private boolean shouldMaybeRemoveData(Throwable error) { for (Throwable t = error; t != null; t = t.getCause()) @@ -118,21 +196,27 @@ public class DefaultFSErrorHandler implements FSErrorHandler return true; } - @Override - public void handleStartupFSError(Throwable t) + /** + * Add additional information to the error message if the commit directory does not have enough free space. + * + * @param msg the original error message + * @return the message with additional information if possible + */ + private static String addAdditionalInformationIfPossible(String msg) { - switch (DatabaseDescriptor.getDiskFailurePolicy()) + long unallocatedSpace = freeDiskSpace(); + int segmentSize = DatabaseDescriptor.getCommitLogSegmentSize(); + + if (unallocatedSpace < segmentSize) { - case stop_paranoid: - case stop: - case die: - logger.error("Exiting forcefully due to file system exception on startup, disk failure policy \"{}\"", - DatabaseDescriptor.getDiskFailurePolicy(), - t); - JVMStabilityInspector.killCurrentJVM(t, true); - break; - default: - break; + return String.format("%s. %d bytes required for next commitlog segment but only %d bytes available. Check %s to see if not enough free space is the reason for this error.", + msg, segmentSize, unallocatedSpace, DatabaseDescriptor.getCommitLogLocation()); } + return msg; + } + + private static long freeDiskSpace() + { + return PathUtils.tryGetSpace(new File(DatabaseDescriptor.getCommitLogLocation()).toPath(), FileStore::getTotalSpace); } } diff --git a/src/java/org/apache/cassandra/service/DiskErrorsHandler.java b/src/java/org/apache/cassandra/service/DiskErrorsHandler.java new file mode 100644 index 0000000000..b4fe9d67db --- /dev/null +++ b/src/java/org/apache/cassandra/service/DiskErrorsHandler.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.io.FSError; +import org.apache.cassandra.io.sstable.CorruptSSTableException; + +public interface DiskErrorsHandler extends AutoCloseable +{ + void init(); + + void handleCorruptSSTable(CorruptSSTableException e); + + void handleFSError(FSError e); + + boolean handleCommitError(String message, Throwable t); + + void handleStartupFSError(Throwable t); + + void inspectDiskError(Throwable t); + + void inspectCommitLogError(Throwable t); + + class NoOpDiskErrorHandler implements DiskErrorsHandler + { + public static final DiskErrorsHandler NO_OP = new NoOpDiskErrorHandler(); + + @VisibleForTesting + NoOpDiskErrorHandler() {} + + @Override + public void inspectCommitLogError(Throwable t) {} + + @Override + public boolean handleCommitError(String message, Throwable t) + { + // tracks what DefaultDiskErrorsHandler does when commit_failure_policy = ignore + return true; + } + + @Override + public void handleCorruptSSTable(CorruptSSTableException e) {} + + @Override + public void handleFSError(FSError e) {} + + @Override + public void handleStartupFSError(Throwable t) {} + + @Override + public void inspectDiskError(Throwable t) {} + + @Override + public void init() {} + + @Override + public void close() throws Exception {} + } +} diff --git a/src/java/org/apache/cassandra/service/DiskErrorsHandlerService.java b/src/java/org/apache/cassandra/service/DiskErrorsHandlerService.java new file mode 100644 index 0000000000..97e7ecde5f --- /dev/null +++ b/src/java/org/apache/cassandra/service/DiskErrorsHandlerService.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.service.DiskErrorsHandler.NoOpDiskErrorHandler.NO_OP; + +public class DiskErrorsHandlerService +{ + private static final Logger logger = LoggerFactory.getLogger(DiskErrorsHandlerService.class); + + private static volatile DiskErrorsHandler instance = NO_OP; + + @VisibleForTesting + public static synchronized void set(DiskErrorsHandler newInstance) + { + if (newInstance == null) + return; + + DiskErrorsHandler oldInstance = DiskErrorsHandlerService.instance; + + try + { + newInstance.init(); + instance = newInstance; + + try + { + oldInstance.close(); + } + catch (Throwable t) + { + logger.warn("Exception occured while closing disk error handler of class " + oldInstance.getClass().getName(), t); + } + } + catch (Throwable t) + { + logger.warn("Exception occured while initializing disk error handler of class " + newInstance.getClass().getName(), t); + } + } + + public static DiskErrorsHandler get() + { + return instance; + } + + public static void close() throws Throwable + { + get().close(); + } + + public static void configure() throws ConfigurationException + { + String fsErrorHandlerClass = CassandraRelevantProperties.CUSTOM_DISK_ERROR_HANDLER.getString(); + DiskErrorsHandler fsErrorHandler = fsErrorHandlerClass == null + ? new DefaultDiskErrorsHandler() + : FBUtilities.construct(fsErrorHandlerClass, "disk error handler"); + DiskErrorsHandlerService.set(fsErrorHandler); + } +} diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index ab0cb41ee3..1aae64970e 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3838,6 +3838,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // wait for miscellaneous tasks like sstable and commitlog segment deletion ColumnFamilyStore.shutdownPostFlushExecutor(); + if (isFinalShutdown) + DiskErrorsHandlerService.get().close(); + try { // we are not shutting down ScheduledExecutors#scheduledFastTasks to be still able to progress time diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java index 983e75f252..fdd678efd0 100644 --- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java +++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java @@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.cassandra.exceptions.UnrecoverableIllegalStateException; import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.service.DiskErrorsHandlerService; import org.apache.cassandra.tracing.Tracing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +43,6 @@ import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; @@ -86,20 +86,12 @@ public final class JVMStabilityInspector */ public static void inspectThrowable(Throwable t) throws OutOfMemoryError { - inspectThrowable(t, JVMStabilityInspector::inspectDiskError); + inspectThrowable(t, DiskErrorsHandlerService.get()::inspectDiskError); } public static void inspectCommitLogThrowable(Throwable t) { - inspectThrowable(t, JVMStabilityInspector::inspectCommitLogError); - } - - private static void inspectDiskError(Throwable t) - { - if (t instanceof CorruptSSTableException) - FileUtils.handleCorruptSSTable((CorruptSSTableException) t); - else if (t instanceof FSError) - FileUtils.handleFSError((FSError) t); + inspectThrowable(t, ex -> DiskErrorsHandlerService.get().inspectCommitLogError(ex)); } public static void inspectThrowable(Throwable t, Consumer<Throwable> fn) throws OutOfMemoryError @@ -156,7 +148,7 @@ public final class JVMStabilityInspector if (isUnstable) { if (!StorageService.instance.isDaemonSetupCompleted()) - FileUtils.handleStartupFSError(t); + DiskErrorsHandlerService.get().handleStartupFSError(t); killer.killCurrentJVM(t); } @@ -197,17 +189,6 @@ public final class JVMStabilityInspector } } - private static void inspectCommitLogError(Throwable t) - { - if (!StorageService.instance.isDaemonSetupCompleted()) - { - logger.error("Exiting due to error while processing commit log during initialization.", t); - killer.killCurrentJVM(t, true); - } - else if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.die) - killer.killCurrentJVM(t); - } - public static void killCurrentJVM(Throwable t, boolean quiet) { killer.killCurrentJVM(t, quiet); @@ -249,12 +230,12 @@ public final class JVMStabilityInspector * @param t * The Throwable to log before killing the current JVM */ - protected void killCurrentJVM(Throwable t) + public void killCurrentJVM(Throwable t) { killCurrentJVM(t, false); } - protected void killCurrentJVM(Throwable t, boolean quiet) + public void killCurrentJVM(Throwable t, boolean quiet) { if (!quiet) { diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index b98b36c47c..a7cb238119 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -109,7 +109,6 @@ import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.File; -import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.PathUtils; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.CassandraMetricsRegistry; @@ -124,7 +123,7 @@ import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.CassandraDaemon; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ClientWarn; -import org.apache.cassandra.service.DefaultFSErrorHandler; +import org.apache.cassandra.service.DiskErrorsHandlerService; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.StorageServiceMBean; @@ -715,7 +714,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance logSystemInfo(inInstancelogger); Config.log(DatabaseDescriptor.getRawConfig()); - FileUtils.setFSErrorHandler(new DefaultFSErrorHandler()); + DiskErrorsHandlerService.configure(); DatabaseDescriptor.createAllDirectories(); CassandraDaemon.getInstanceForTesting().migrateSystemDataIfNeeded(); @@ -956,7 +955,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())), () -> ActiveRepairService.instance().shutdownNowAndWait(1L, MINUTES), () -> EpochAwareDebounce.instance.close(), - SnapshotManager.instance::close + SnapshotManager.instance::close, + DiskErrorsHandlerService::close ); internodeMessagingStarted = false; diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java index 38b045b381..6f2576cebc 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java @@ -45,7 +45,7 @@ public class InstanceKiller extends JVMStabilityInspector.Killer } @Override - protected void killCurrentJVM(Throwable t, boolean quiet) + public void killCurrentJVM(Throwable t, boolean quiet) { KILL_ATTEMPTS.incrementAndGet(); onKill.accept(quiet); diff --git a/test/unit/org/apache/cassandra/ServerTestUtils.java b/test/unit/org/apache/cassandra/ServerTestUtils.java index 596312ff0b..ead4a1a558 100644 --- a/test/unit/org/apache/cassandra/ServerTestUtils.java +++ b/test/unit/org/apache/cassandra/ServerTestUtils.java @@ -48,6 +48,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.BaseProximity; import org.apache.cassandra.security.ThreadAwareSecurityManager; +import org.apache.cassandra.service.DiskErrorsHandlerService; import org.apache.cassandra.service.EmbeddedCassandraService; import org.apache.cassandra.tcm.AtomicLongBackedProcessor; import org.apache.cassandra.tcm.ClusterMetadata; @@ -178,6 +179,8 @@ public final class ServerTestUtils SystemKeyspace.persistLocalMetadata(); AuditLogManager.instance.initialize(); + DiskErrorsHandlerService.configure(); + isServerPrepared = true; } diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java index 30e12ea173..072ee08720 100644 --- a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java +++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java @@ -36,6 +36,8 @@ import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.CassandraDaemon; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.KillerForTests; @@ -59,6 +61,8 @@ public class OutOfSpaceTest extends CQLTester DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); ServerTestUtils.prepareServerNoRegister(); ServerTestUtils.markCMS(); + StorageService.instance.registerDaemon(CassandraDaemon.getInstanceForTesting()); + CassandraDaemon.getInstanceForTesting().completeSetup(); } @Test diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java index 06db7e4a02..4963125034 100644 --- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java +++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java @@ -95,7 +95,7 @@ import org.apache.cassandra.schema.MockSchema; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.SchemaKeyspaceTables; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.service.DefaultFSErrorHandler; +import org.apache.cassandra.service.DiskErrorsHandlerService; import org.apache.cassandra.service.snapshot.SnapshotLoader; import org.apache.cassandra.service.snapshot.SnapshotManifest; import org.apache.cassandra.service.snapshot.TableSnapshot; @@ -151,7 +151,7 @@ public class DirectoriesTest public static void beforeClass() { DatabaseDescriptor.daemonInitialization(); - FileUtils.setFSErrorHandler(new DefaultFSErrorHandler()); + DiskErrorsHandlerService.configure(); } @Before diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogInitWithExceptionTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogInitWithExceptionTest.java index b3cff94c66..05acf17f8d 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogInitWithExceptionTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogInitWithExceptionTest.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.commitlog; import org.apache.cassandra.Util; +import org.apache.cassandra.service.DiskErrorsHandlerService; import org.apache.cassandra.utils.concurrent.Condition; import org.junit.Assert; import org.junit.BeforeClass; @@ -62,6 +63,8 @@ public class CommitLogInitWithExceptionTest killed.signal(); } }; + + DiskErrorsHandlerService.configure(); } @Test diff --git a/test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java b/test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java index b1da62fcdc..652af81916 100644 --- a/test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java +++ b/test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java @@ -35,7 +35,6 @@ import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.io.FSErrorHandler; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.sstable.CorruptSSTableException; @@ -46,7 +45,7 @@ import static org.junit.Assert.assertTrue; @RunWith(Parameterized.class) public class DefaultFSErrorHandlerTest { - private FSErrorHandler handler = new DefaultFSErrorHandler(); + private DiskErrorsHandler handler = new DefaultDiskErrorsHandler(); Config.DiskFailurePolicy oldDiskPolicy; Config.DiskFailurePolicy testDiskPolicy; private boolean gossipRunningFSError; diff --git a/test/unit/org/apache/cassandra/service/DiskErrorsHandlerTest.java b/test/unit/org/apache/cassandra/service/DiskErrorsHandlerTest.java new file mode 100644 index 0000000000..6465164fe0 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/DiskErrorsHandlerTest.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import org.junit.Test; + +import org.apache.cassandra.distributed.shared.WithProperties; +import org.apache.cassandra.io.FSError; +import org.apache.cassandra.io.sstable.CorruptSSTableException; + +import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_DISK_ERROR_HANDLER; +import static org.apache.cassandra.service.DiskErrorsHandlerService.get; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class DiskErrorsHandlerTest +{ + @Test + public void testSetting() throws Throwable + { + try (WithProperties ignore = new WithProperties().set(CUSTOM_DISK_ERROR_HANDLER, + HandlerA.class.getName())) + { + DiskErrorsHandlerService.configure(); + + assertSame(HandlerA.class, get().getClass()); + + assertTrue(HandlerA.initialized); + assertFalse(HandlerA.closed); + assertFalse(HandlerB.initialized); + assertFalse(HandlerB.closed); + } + + try (WithProperties ignore = new WithProperties().set(CUSTOM_DISK_ERROR_HANDLER, + HandlerB.class.getName())) + { + DiskErrorsHandlerService.configure(); + + assertTrue(HandlerA.initialized); + assertTrue(HandlerA.closed); + + assertTrue(HandlerB.initialized); + assertFalse(HandlerB.closed); + + assertSame(HandlerB.class, get().getClass()); + + get().close(); + + assertTrue(HandlerB.closed); + } + } + + @Test + public void testFailures() + { + // failed closing + try (WithProperties ignore = new WithProperties().set(CUSTOM_DISK_ERROR_HANDLER, + HandlerC.class.getName())) + { + DiskErrorsHandlerService.configure(); + assertTrue(HandlerC.initialized); + assertSame(HandlerC.class, get().getClass()); + } + + // this will call close() on C handler + try (WithProperties ignore = new WithProperties().set(CUSTOM_DISK_ERROR_HANDLER, + HandlerE.class.getName())) + { + DiskErrorsHandlerService.configure(); + assertTrue(HandlerE.initialized); + assertSame(HandlerE.class, get().getClass()); + } + + try (WithProperties ignore = new WithProperties().set(CUSTOM_DISK_ERROR_HANDLER, + HandlerD.class.getName())) + { + DiskErrorsHandlerService.configure(); + // still handler E as handler D failed to init + assertSame(HandlerE.class, get().getClass()); + } + } + + public static class HandlerA extends DummyErrorHandler + { + public static boolean initialized = false; + public static boolean closed = false; + + @Override + public void init() + { + initialized = true; + } + + @Override + public void close() throws Exception + { + closed = true; + } + } + + public static class HandlerB extends DummyErrorHandler + { + public static boolean initialized = false; + public static boolean closed = false; + + @Override + public void init() + { + initialized = true; + } + + @Override + public void close() throws Exception + { + closed = true; + } + } + + public static class HandlerC extends DummyErrorHandler + { + public static boolean initialized = false; + + @Override + public void init() + { + initialized = true; + } + + @Override + public void close() throws Exception + { + throw new RuntimeException("failed to close"); + } + } + + public static class HandlerD extends DummyErrorHandler + { + public static boolean closed = false; + + @Override + public void init() + { + throw new RuntimeException("failed to init"); + } + + @Override + public void close() throws Exception + { + closed = true; + } + } + + public static class HandlerE extends DummyErrorHandler + { + public static boolean initialized = false; + public static boolean closed = false; + + @Override + public void init() + { + initialized = true; + } + + @Override + public void close() throws Exception + { + closed = true; + } + } + + private static abstract class DummyErrorHandler implements DiskErrorsHandler + { + @Override + public void handleCorruptSSTable(CorruptSSTableException e) + { + } + + @Override + public void handleFSError(FSError e) + { + } + + @Override + public void handleStartupFSError(Throwable t) + { + } + + @Override + public void inspectDiskError(Throwable t) + { + } + + @Override + public void inspectCommitLogError(Throwable t) + { + } + + @Override + public boolean handleCommitError(String message, Throwable t) + { + return true; + } + } +} diff --git a/test/unit/org/apache/cassandra/service/DiskFailurePolicyTest.java b/test/unit/org/apache/cassandra/service/DiskFailurePolicyTest.java index 832c631eed..f1485d0ac6 100644 --- a/test/unit/org/apache/cassandra/service/DiskFailurePolicyTest.java +++ b/test/unit/org/apache/cassandra/service/DiskFailurePolicyTest.java @@ -38,7 +38,6 @@ import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.util.File; -import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.KillerForTests; @@ -72,7 +71,7 @@ public class DiskFailurePolicyTest JOIN_RING.setBoolean(false); // required to start gossiper without setting tokens SchemaLoader.prepareServer(); StorageService.instance.initServer(); - FileUtils.setFSErrorHandler(new DefaultFSErrorHandler()); + DiskErrorsHandlerService.configure(); } public DiskFailurePolicyTest(DiskFailurePolicy testPolicy, boolean isStartUpInProgress, Throwable t, diff --git a/test/unit/org/apache/cassandra/service/snapshot/MetadataSnapshotsTest.java b/test/unit/org/apache/cassandra/service/snapshot/MetadataSnapshotsTest.java index ee996e3a33..ab2ae50bf5 100644 --- a/test/unit/org/apache/cassandra/service/snapshot/MetadataSnapshotsTest.java +++ b/test/unit/org/apache/cassandra/service/snapshot/MetadataSnapshotsTest.java @@ -32,8 +32,7 @@ import org.junit.rules.TemporaryFolder; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.service.DefaultFSErrorHandler; +import org.apache.cassandra.service.DiskErrorsHandlerService; import static org.apache.cassandra.service.snapshot.TableSnapshotTest.createFolders; import static org.apache.cassandra.utils.FBUtilities.now; @@ -50,7 +49,7 @@ public class MetadataSnapshotsTest CassandraRelevantProperties.SNAPSHOT_CLEANUP_PERIOD_SECONDS.setInt(3); DatabaseDescriptor.daemonInitialization(); - FileUtils.setFSErrorHandler(new DefaultFSErrorHandler()); + DiskErrorsHandlerService.configure(); } @ClassRule diff --git a/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java index 3a3415e4d4..172e5eb584 100644 --- a/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java +++ b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java @@ -30,10 +30,10 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.service.DiskErrorsHandler; +import org.apache.cassandra.service.DiskErrorsHandlerService; import org.assertj.core.api.Assertions; -import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.service.CassandraDaemon; -import org.apache.cassandra.service.DefaultFSErrorHandler; import org.apache.cassandra.service.StorageService; import static java.util.Arrays.asList; @@ -59,7 +59,7 @@ public class JVMStabilityInspectorTest Config.DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy(); Config.CommitFailurePolicy oldCommitPolicy = DatabaseDescriptor.getCommitFailurePolicy(); - FileUtils.setFSErrorHandler(new DefaultFSErrorHandler()); + DiskErrorsHandlerService.configure(); try { CassandraDaemon daemon = new CassandraDaemon(); @@ -114,7 +114,7 @@ public class JVMStabilityInspectorTest DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); DatabaseDescriptor.setCommitFailurePolicy(oldCommitPolicy); StorageService.instance.registerDaemon(null); - FileUtils.setFSErrorHandler(null); + DiskErrorsHandlerService.set(DiskErrorsHandler.NoOpDiskErrorHandler.NO_OP); } } diff --git a/test/unit/org/apache/cassandra/utils/KillerForTests.java b/test/unit/org/apache/cassandra/utils/KillerForTests.java index b6c48d52e8..6f7f3c7b10 100644 --- a/test/unit/org/apache/cassandra/utils/KillerForTests.java +++ b/test/unit/org/apache/cassandra/utils/KillerForTests.java @@ -40,7 +40,7 @@ public class KillerForTests extends JVMStabilityInspector.Killer } @Override - protected void killCurrentJVM(Throwable t, boolean quiet) + public void killCurrentJVM(Throwable t, boolean quiet) { if (!expected) Assert.fail("Saw JVM Kill but did not expect it."); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org