Backport CASSANDRA-11578. Patch by yukim; reviewed by Paulo Motta for CASSANDRA-11750.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b851792c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b851792c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b851792c Branch: refs/heads/cassandra-2.2 Commit: b851792c4e3ae32b8d863d9079cca6d135f1cf23 Parents: 5dc7414 Author: Yuki Morishita <yu...@apache.org> Authored: Wed May 18 17:03:39 2016 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Thu May 26 09:40:40 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/io/FSErrorHandler.java | 30 ++++++ .../org/apache/cassandra/io/util/FileUtils.java | 75 +++----------- .../cassandra/service/CassandraDaemon.java | 2 + .../service/DefaultFSErrorHandler.java | 101 +++++++++++++++++++ .../apache/cassandra/db/DirectoriesTest.java | 2 + 6 files changed, 152 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b851792c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f73db6e..ad9d00c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.15 + * Backport CASSANDRA-11578 (CASSANDRA-11750) * Clear out parent repair session if repair coordinator dies (CASSANDRA-11824) * Set default streaming_socket_timeout_in_ms to 24 hours (CASSANDRA-11840) * Do not consider local node a valid source during replace (CASSANDRA-11848) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b851792c/src/java/org/apache/cassandra/io/FSErrorHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/FSErrorHandler.java b/src/java/org/apache/cassandra/io/FSErrorHandler.java new file mode 100644 index 0000000..081ec0b --- /dev/null +++ 
b/src/java/org/apache/cassandra/io/FSErrorHandler.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io; + +import org.apache.cassandra.io.sstable.CorruptSSTableException; + +/** + * Interface for handling file system errors. 
+ */ +public interface FSErrorHandler +{ + void handleCorruptSSTable(CorruptSSTableException e); + void handleFSError(FSError e); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b851792c/src/java/org/apache/cassandra/io/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index 3be7c99..f69ed01 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -32,6 +32,7 @@ import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.text.DecimalFormat; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicReference; import sun.nio.ch.DirectBuffer; @@ -39,17 +40,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.BlacklistedDirectories; -import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.io.FSError; +import org.apache.cassandra.io.FSErrorHandler; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.JVMStabilityInspector; -public class FileUtils +public final class FileUtils { private static final Logger logger = LoggerFactory.getLogger(FileUtils.class); private static final double KB = 1024d; @@ -59,6 +57,7 @@ public class FileUtils private static final DecimalFormat df = new DecimalFormat("#.##"); private static final boolean canCleanDirectBuffers; + private static final AtomicReference<FSErrorHandler> fsErrorHandler = new AtomicReference<>(); static { @@ -395,63 +394,16 @@ public class FileUtils public static void 
handleCorruptSSTable(CorruptSSTableException e) { - if (!StorageService.instance.isSetupCompleted()) - handleStartupFSError(e); - - JVMStabilityInspector.inspectThrowable(e); - switch (DatabaseDescriptor.getDiskFailurePolicy()) - { - case stop_paranoid: - StorageService.instance.stopTransports(); - break; - } - } - - public static void handleFSError(FSError e) - { - if (!StorageService.instance.isSetupCompleted()) - handleStartupFSError(e); - - JVMStabilityInspector.inspectThrowable(e); - switch (DatabaseDescriptor.getDiskFailurePolicy()) - { - case stop_paranoid: - case stop: - StorageService.instance.stopTransports(); - break; - case best_effort: - // for both read and write errors mark the path as unwritable. - BlacklistedDirectories.maybeMarkUnwritable(e.path); - if (e instanceof FSReadError) - { - File directory = BlacklistedDirectories.maybeMarkUnreadable(e.path); - if (directory != null) - Keyspace.removeUnreadableSSTables(directory); - } - break; - case ignore: - // already logged, so left nothing to do - break; - default: - throw new IllegalStateException(); - } + FSErrorHandler handler = fsErrorHandler.get(); + if (handler != null) + handler.handleCorruptSSTable(e); } - private static void handleStartupFSError(Throwable t) + public static void handleFSError(FSError e) { - switch (DatabaseDescriptor.getDiskFailurePolicy()) - { - case stop_paranoid: - case stop: - case die: - logger.error("Exiting forcefully due to file system exception on startup, disk failure policy \"{}\"", - DatabaseDescriptor.getDiskFailurePolicy(), - t); - JVMStabilityInspector.killCurrentJVM(t, true); - break; - default: - break; - } + FSErrorHandler handler = fsErrorHandler.get(); + if (handler != null) + handler.handleFSError(e); } /** * Get the size of a directory in bytes @@ -470,4 +422,9 @@ public class FileUtils } return length; } + + public static void setFSErrorHandler(FSErrorHandler handler) + { + fsErrorHandler.getAndSet(handler); + } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b851792c/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index a27fa20..5a1fb14 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -148,6 +148,8 @@ public class CassandraDaemon */ protected void setup() { + FileUtils.setFSErrorHandler(new DefaultFSErrorHandler()); + try { logger.info("Hostname: {}", InetAddress.getLocalHost().getHostName()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b851792c/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java new file mode 100644 index 0000000..88a1fce --- /dev/null +++ b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import java.io.File; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.BlacklistedDirectories; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.io.FSError; +import org.apache.cassandra.io.FSErrorHandler; +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.utils.JVMStabilityInspector; + +public class DefaultFSErrorHandler implements FSErrorHandler +{ + private static final Logger logger = LoggerFactory.getLogger(DefaultFSErrorHandler.class); + + @Override + public void handleCorruptSSTable(CorruptSSTableException e) + { + if (!StorageService.instance.isSetupCompleted()) + handleStartupFSError(e); + + JVMStabilityInspector.inspectThrowable(e); + switch (DatabaseDescriptor.getDiskFailurePolicy()) + { + case stop_paranoid: + StorageService.instance.stopTransports(); + break; + } + } + + @Override + public void handleFSError(FSError e) + { + if (!StorageService.instance.isSetupCompleted()) + handleStartupFSError(e); + + JVMStabilityInspector.inspectThrowable(e); + switch (DatabaseDescriptor.getDiskFailurePolicy()) + { + case stop_paranoid: + case stop: + StorageService.instance.stopTransports(); + break; + case best_effort: + // for both read and write errors mark the path as unwritable. 
+ BlacklistedDirectories.maybeMarkUnwritable(e.path); + if (e instanceof FSReadError) + { + File directory = BlacklistedDirectories.maybeMarkUnreadable(e.path); + if (directory != null) + Keyspace.removeUnreadableSSTables(directory); + } + break; + case ignore: + // already logged, so left nothing to do + break; + default: + throw new IllegalStateException(); + } + } + + private static void handleStartupFSError(Throwable t) + { + switch (DatabaseDescriptor.getDiskFailurePolicy()) + { + case stop_paranoid: + case stop: + case die: + logger.error("Exiting forcefully due to file system exception on startup, disk failure policy \"{}\"", + DatabaseDescriptor.getDiskFailurePolicy(), + t); + JVMStabilityInspector.killCurrentJVM(t, true); + break; + default: + break; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b851792c/test/unit/org/apache/cassandra/db/DirectoriesTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java index b1c51ee..4267c1f 100644 --- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java +++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java @@ -44,6 +44,7 @@ import org.apache.cassandra.db.Directories.DataDirectory; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.service.DefaultFSErrorHandler; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.io.FSWriteError; @@ -65,6 +66,7 @@ public class DirectoriesTest @BeforeClass public static void beforeClass() throws IOException { + FileUtils.setFSErrorHandler(new DefaultFSErrorHandler()); for (String cf : CFS) { CFM.add(new CFMetaData(KS, cf, ColumnFamilyType.Standard, null));