This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new be574fc34b Fix default file system error handler for disk_failure_policy die
be574fc34b is described below

commit be574fc34ba9834929f1618ab63dd74446cd2683
Author: Brandon Williams <brandonwilli...@apache.org>
AuthorDate: Thu Mar 9 09:53:58 2023 -0600

    Fix default file system error handler for disk_failure_policy die
    
    Patch by Runtian Liu; reviewed by brandonwilliams and smiklosovic for
    CASSANDRA-18294
---
 CHANGES.txt                                        |   1 +
 .../cassandra/service/DefaultFSErrorHandler.java   |   2 +
 ...ava => JVMStabilityInspectorThrowableTest.java} |  72 ++++++++---
 .../service/DefaultFSErrorHandlerTest.java         | 121 ++++++++++++++++++
 .../cassandra/service/DiskFailurePolicyTest.java   | 135 +++++++++++++++++++++
 .../org/apache/cassandra/utils/KillerForTests.java |   5 +
 6 files changed, 321 insertions(+), 15 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 7994824ad3..aa1e60427a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.29
+ * Fix default file system error handler for disk_failure_policy die (CASSANDRA-18294)
  * Introduce check for names of test classes (CASSANDRA-17964)
  * Suppress CVE-2022-41915 (CASSANDRA-18147)
  * Suppress CVE-2021-1471, CVE-2021-3064, CVE-2021-4235 (CASSANDRA-18149)
diff --git a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
index 1c81f65e44..00029e3f40 100644
--- a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
+++ b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
@@ -44,6 +44,7 @@ public class DefaultFSErrorHandler implements FSErrorHandler
 
         switch (DatabaseDescriptor.getDiskFailurePolicy())
         {
+            case die:
             case stop_paranoid:
                 // exception not logged here on purpose as it is already logged
                 logger.error("Stopping transports as disk_failure_policy is " 
+ DatabaseDescriptor.getDiskFailurePolicy());
@@ -60,6 +61,7 @@ public class DefaultFSErrorHandler implements FSErrorHandler
 
         switch (DatabaseDescriptor.getDiskFailurePolicy())
         {
+            case die:
             case stop_paranoid:
             case stop:
                 // exception not logged here on purpose as it is already logged
diff --git a/test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorCorruptSSTableExceptionTest.java b/test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorThrowableTest.java
similarity index 70%
rename from test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorCorruptSSTableExceptionTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorThrowableTest.java
index 98ca496ffc..d7aeccaf6f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorCorruptSSTableExceptionTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/JVMStabilityInspectorThrowableTest.java
@@ -19,11 +19,15 @@
 package org.apache.cassandra.distributed.test;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.cassandra.config.Config.DiskFailurePolicy;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -39,6 +43,8 @@ import 
org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableCallab
 import org.apache.cassandra.distributed.shared.AbstractBuilder;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.format.ForwardingSSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -51,21 +57,46 @@ import static 
org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 
-public class JVMStabilityInspectorCorruptSSTableExceptionTest extends 
TestBaseImpl
+@RunWith(Parameterized.class)
+public class JVMStabilityInspectorThrowableTest extends TestBaseImpl
 {
-    @Test
-    public void 
testAbstractLocalAwareExecutorServiceOnIgnoredDiskFailurePolicy() throws 
Exception
+    private DiskFailurePolicy testPolicy;
+    private boolean testCorrupted;
+    private boolean expectNativeTransportRunning;;
+    private boolean expectGossiperEnabled;
+
+    public JVMStabilityInspectorThrowableTest(DiskFailurePolicy policy, 
boolean testCorrupted,
+                                              boolean 
expectNativeTransportRunning, boolean expectGossiperEnabled)
+    {
+        this.testPolicy = policy;
+        this.testCorrupted = testCorrupted;
+        this.expectNativeTransportRunning = expectNativeTransportRunning;
+        this.expectGossiperEnabled = expectGossiperEnabled;
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> generateData()
     {
-        test(DiskFailurePolicy.ignore, true, true);
+        return Arrays.asList(new Object[][]{
+                             { DiskFailurePolicy.ignore, true, true, true},
+                             { DiskFailurePolicy.stop, true, true,  true},
+                             { DiskFailurePolicy.stop_paranoid, true, false, 
false},
+                             { DiskFailurePolicy.best_effort, true, true, 
true},
+                             { DiskFailurePolicy.ignore, false, true, true},
+                             { DiskFailurePolicy.stop, false, false, false},
+                             { DiskFailurePolicy.stop_paranoid, false, false, 
false},
+                             { DiskFailurePolicy.best_effort, false, true, 
true}
+                             }
+        );
     }
 
     @Test
-    public void 
testAbstractLocalAwareExecutorServiceOnStopParanoidDiskFailurePolicy() throws 
Exception
+    public void testAbstractLocalAwareExecutorServiceOnPolicies() throws 
Exception
     {
-        test(DiskFailurePolicy.stop_paranoid, false, false);
+        test(testPolicy, testCorrupted, expectNativeTransportRunning, 
expectGossiperEnabled);
     }
 
-    private static void test(DiskFailurePolicy policy, boolean 
expectNativeTransportRunning, boolean expectGossiperEnabled) throws Exception
+    private static void test(DiskFailurePolicy policy, boolean 
shouldTestCorrupted, boolean expectNativeTransportRunning, boolean 
expectGossiperEnabled) throws Exception
     {
         String table = policy.name();
         try (final Cluster cluster = init(getCluster(policy).start()))
@@ -84,16 +115,16 @@ public class 
JVMStabilityInspectorCorruptSSTableExceptionTest extends TestBaseIm
 
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + table + " 
(id bigint PRIMARY KEY)");
             node.executeInternal("INSERT INTO " + KEYSPACE + "." + table + " 
(id) VALUES (?)", 0L);
-            corruptTable(node, KEYSPACE, table);
+            throwThrowable(node, KEYSPACE, table, shouldTestCorrupted);
 
             try
             {
                 cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + 
'.' + table + " WHERE id=?", ConsistencyLevel.ONE, 0L);
-                Assert.fail("Select should fail as we corrupted SSTable on 
purpose.");
+                Assert.fail("Select should fail as we expect corrupted sstable 
or FS error.");
             }
             catch (final Exception ex)
             {
-                // we expect that above query fails as we corrupted an sstable
+                // we expect that above query fails as we corrupted an sstable 
or throw FS error when read
             }
 
             waitForStop(!expectGossiperEnabled, node, new 
SerializableCallable<Boolean>()
@@ -154,7 +185,7 @@ public class 
JVMStabilityInspectorCorruptSSTableExceptionTest extends TestBaseIm
         }
     }
 
-    private static void corruptTable(IInvokableInstance node, String keyspace, 
String table)
+    private static void throwThrowable(IInvokableInstance node, String 
keyspace, String table, boolean shouldTestCorrupted)
     {
         node.runOnInstance(() -> {
             ColumnFamilyStore cf = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
@@ -163,7 +194,7 @@ public class 
JVMStabilityInspectorCorruptSSTableExceptionTest extends TestBaseIm
             Set<SSTableReader> remove = cf.getLiveSSTables();
             Set<SSTableReader> replace = new HashSet<>();
             for (SSTableReader r : remove)
-                replace.add(new CorruptedSSTableReader(r));
+                replace.add(new CorruptedSSTableReader(r, 
shouldTestCorrupted));
 
             cf.getTracker().removeUnsafe(remove);
             cf.addSSTables(replace);
@@ -180,26 +211,37 @@ public class 
JVMStabilityInspectorCorruptSSTableExceptionTest extends TestBaseIm
 
     private static final class CorruptedSSTableReader extends 
ForwardingSSTableReader
     {
-        public CorruptedSSTableReader(SSTableReader delegate)
+        private boolean shouldThrowCorrupted;
+        public CorruptedSSTableReader(SSTableReader delegate, boolean 
shouldThrowCorrupted)
         {
             super(delegate);
+            this.shouldThrowCorrupted = shouldThrowCorrupted;
         }
 
         @Override
         public SliceableUnfilteredRowIterator iterator(DecoratedKey key, 
ColumnFilter selectedColumns, boolean reversed, boolean isForThrift, 
SSTableReadsListener listener)
         {
-            throw throwCorrupted();
+            if (shouldThrowCorrupted)
+                throw throwCorrupted();
+            throw throwFSError();
         }
 
         @Override
         public SliceableUnfilteredRowIterator iterator(FileDataInput file, 
DecoratedKey key, RowIndexEntry indexEntry, ColumnFilter selectedColumns, 
boolean reversed, boolean isForThrift)
         {
-            throw throwCorrupted();
+            if (shouldThrowCorrupted)
+                throw throwCorrupted();
+            throw throwFSError();
         }
 
         private CorruptSSTableException throwCorrupted()
         {
             throw new CorruptSSTableException(new IOException("failed to get 
position"), descriptor.baseFilename());
         }
+
+        private FSError throwFSError()
+        {
+            throw new FSReadError(new IOException("failed to get position"), 
descriptor.baseFilename());
+        }
     }
 }
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java b/test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java
new file mode 100644
index 0000000000..f6ed9da7ed
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/DefaultFSErrorHandlerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.FSErrorHandler;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class DefaultFSErrorHandlerTest
+{
+    private FSErrorHandler handler = new DefaultFSErrorHandler();
+    Config.DiskFailurePolicy oldDiskPolicy;
+    Config.DiskFailurePolicy testDiskPolicy;
+    private boolean gossipRunningFSError;
+    private boolean gossipRunningCorruptedSStableException;
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        CassandraDaemon daemon = new CassandraDaemon();
+        daemon.completeSetup(); //startup must be completed, otherwise FS 
error will kill JVM regardless of failure policy
+        StorageService.instance.registerDaemon(daemon);
+        StorageService.instance.initServer();
+    }
+
+    @AfterClass
+    public static void shutdown()
+    {
+        StorageService.instance.stopClient();
+    }
+
+    @Before
+    public void setup()
+    {
+        StorageService.instance.startGossiping();
+        assertTrue(Gossiper.instance.isEnabled());
+        oldDiskPolicy = DatabaseDescriptor.getDiskFailurePolicy();
+    }
+
+    public DefaultFSErrorHandlerTest(Config.DiskFailurePolicy policy,
+                                     boolean gossipRunningFSError,
+                                     boolean 
gossipRunningCorruptedSStableException)
+    {
+        this.testDiskPolicy = policy;
+        this.gossipRunningFSError = gossipRunningFSError;
+        this.gossipRunningCorruptedSStableException = 
gossipRunningCorruptedSStableException;
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> generateData()
+    {
+        return Arrays.asList(new Object[][]{
+                             { Config.DiskFailurePolicy.die, false, false},
+                             { Config.DiskFailurePolicy.ignore, true, true},
+                             { Config.DiskFailurePolicy.stop, false,  true},
+                             { Config.DiskFailurePolicy.stop_paranoid, false, 
false},
+                             { Config.DiskFailurePolicy.best_effort, true, 
true}
+                             }
+        );
+    }
+
+    @After
+    public void teardown()
+    {
+        DatabaseDescriptor.setDiskFailurePolicy(oldDiskPolicy);
+    }
+
+    @Test
+    public void testFSErrors()
+    {
+        DatabaseDescriptor.setDiskFailurePolicy(testDiskPolicy);
+        handler.handleFSError(new FSReadError(new IOException(), "blah"));
+        assertEquals(gossipRunningFSError, Gossiper.instance.isEnabled());
+    }
+
+    @Test
+    public void testCorruptSSTableException()
+    {
+        DatabaseDescriptor.setDiskFailurePolicy(testDiskPolicy);
+        handler.handleCorruptSSTable(new CorruptSSTableException(new 
IOException(), "blah"));
+        assertEquals(gossipRunningCorruptedSStableException, 
Gossiper.instance.isEnabled());
+    }
+}
diff --git a/test/unit/org/apache/cassandra/service/DiskFailurePolicyTest.java b/test/unit/org/apache/cassandra/service/DiskFailurePolicyTest.java
new file mode 100644
index 0000000000..90e85e9cc0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/DiskFailurePolicyTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.Config.DiskFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+
+@RunWith(Parameterized.class)
+public class DiskFailurePolicyTest
+{
+    DiskFailurePolicy originalDiskFailurePolicy;
+    JVMStabilityInspector.Killer originalKiller;
+    KillerForTests killerForTests;
+    DiskFailurePolicy testPolicy;
+    boolean isStartUpInProgress;
+    Throwable t;
+    boolean expectGossipRunning;
+    boolean expectJVMKilled;
+    boolean expectJVMKilledQuiet;
+
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        StorageService.instance.initServer();
+        FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
+    }
+
+    public DiskFailurePolicyTest(DiskFailurePolicy testPolicy, boolean 
isStartUpInProgress, Throwable t,
+                                 boolean expectGossipRunning, boolean 
jvmKilled, boolean jvmKilledQuiet)
+    {
+        this.testPolicy = testPolicy;
+        this.isStartUpInProgress = isStartUpInProgress;
+        this.t = t;
+        this.expectGossipRunning = expectGossipRunning;
+        this.expectJVMKilled = jvmKilled;
+        this.expectJVMKilledQuiet = jvmKilledQuiet;
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> generateData()
+    {
+        return Arrays.asList(new Object[][]{
+                             { Config.DiskFailurePolicy.die, true, new 
FSReadError(new IOException(), "blah"), false, true, true},
+                             { DiskFailurePolicy.ignore, true, new 
FSReadError(new IOException(), "blah"), true, false, false},
+                             { DiskFailurePolicy.stop, true, new 
FSReadError(new IOException(), "blah"), false, true, true},
+                             { DiskFailurePolicy.stop_paranoid, true, new 
FSReadError(new IOException(), "blah"), false, true, true},
+                             { Config.DiskFailurePolicy.die, true, new 
CorruptSSTableException(new IOException(), "blah"), false, true, true},
+                             { DiskFailurePolicy.ignore, true, new 
CorruptSSTableException(new IOException(), "blah"), true, false, false},
+                             { DiskFailurePolicy.stop, true, new 
CorruptSSTableException(new IOException(), "blah"), false, true, true},
+                             { DiskFailurePolicy.stop_paranoid, true, new 
CorruptSSTableException(new IOException(), "blah"), false, true, true},
+                             { Config.DiskFailurePolicy.die, false, new 
FSReadError(new IOException(), "blah"), false, true, false},
+                             { DiskFailurePolicy.ignore, false, new 
FSReadError(new IOException(), "blah"), true, false, false},
+                             { DiskFailurePolicy.stop, false, new 
FSReadError(new IOException(), "blah"), false, false, false},
+                             { DiskFailurePolicy.stop_paranoid, false, new 
FSReadError(new IOException(), "blah"), false, false, false},
+                             { Config.DiskFailurePolicy.die, false, new 
CorruptSSTableException(new IOException(), "blah"), false, true, false},
+                             { DiskFailurePolicy.ignore, false, new 
CorruptSSTableException(new IOException(), "blah"), true, false, false},
+                             { DiskFailurePolicy.stop, false, new 
CorruptSSTableException(new IOException(), "blah"), true, false, false},
+                             { DiskFailurePolicy.stop_paranoid, false, new 
CorruptSSTableException(new IOException(), "blah"), false, false, false}
+                             }
+        );
+    }
+
+    @Before
+    public void setup()
+    {
+        CassandraDaemon daemon = new CassandraDaemon();
+        if (!isStartUpInProgress)
+            daemon.completeSetup(); //mark startup completed
+        StorageService.instance.registerDaemon(daemon);
+        killerForTests = new KillerForTests();
+        originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+        originalDiskFailurePolicy = DatabaseDescriptor.getDiskFailurePolicy();
+        StorageService.instance.startGossiping();
+        Assert.assertTrue(Gossiper.instance.isEnabled());
+    }
+
+    @After
+    public void teardown()
+    {
+        JVMStabilityInspector.replaceKiller(originalKiller);
+        DatabaseDescriptor.setDiskFailurePolicy(originalDiskFailurePolicy);
+    }
+
+    @Test
+    public void testPolicies()
+    {
+        DatabaseDescriptor.setDiskFailurePolicy(testPolicy);
+        JVMStabilityInspector.inspectThrowable(t);
+        Assert.assertEquals(expectJVMKilled, killerForTests.wasKilled());
+        Assert.assertEquals(expectJVMKilledQuiet, 
killerForTests.wasKilledQuietly());
+        if (!expectJVMKilled) {
+            // only verify gossip if JVM is not killed
+            Assert.assertEquals(expectGossipRunning, 
Gossiper.instance.isEnabled());
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/utils/KillerForTests.java b/test/unit/org/apache/cassandra/utils/KillerForTests.java
index abc7952322..ad3a27436e 100644
--- a/test/unit/org/apache/cassandra/utils/KillerForTests.java
+++ b/test/unit/org/apache/cassandra/utils/KillerForTests.java
@@ -29,6 +29,11 @@ public class KillerForTests extends 
JVMStabilityInspector.Killer
     @Override
     protected void killCurrentJVM(Throwable t, boolean quiet)
     {
+        if (killed)
+        {
+            // Can only be killed once
+            return;
+        }
         this.killed = true;
         this.quiet = quiet;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to