This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch CASSANDRA-14793
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit ea3ee373f670b4eddcde0f94d4f5f6221166761b
Author: Benjamin Lerer <b.le...@gmail.com>
AuthorDate: Thu Mar 19 12:57:28 2020 +0100

    Allow to use a different directory for storing system tables.
---
 .circleci/config.yml                               |  97 +++++++++++
 .circleci/config.yml.HIGHRES                       |  98 +++++++++++
 .circleci/config.yml.LOWRES                        |  97 +++++++++++
 .circleci/config.yml.MIDRES                        |  97 +++++++++++
 NEWS.txt                                           |   9 ++
 build.xml                                          |  38 +++++
 conf/cassandra.yaml                                |   6 +
 src/java/org/apache/cassandra/config/Config.java   |   6 +
 .../cassandra/config/DatabaseDescriptor.java       |  97 +++++++++--
 .../org/apache/cassandra/db/ColumnFamilyStore.java | 101 ++++++++++--
 src/java/org/apache/cassandra/db/Directories.java  | 180 ++++++++++++++++-----
 .../apache/cassandra/db/DiskBoundaryManager.java   |   1 -
 .../org/apache/cassandra/db/SystemKeyspace.java    |   5 +
 .../apache/cassandra/io/FSDiskFullWriteError.java  |  12 +-
 ...or.java => FSNoDiskAvailableForWriteError.java} |  16 +-
 .../org/apache/cassandra/io/util/FileUtils.java    |  67 ++++++++
 .../apache/cassandra/service/CassandraDaemon.java  |  94 ++++++++++-
 .../cassandra/service/DefaultFSErrorHandler.java   |  17 +-
 .../apache/cassandra/service/StartupChecks.java    |   1 +
 .../apache/cassandra/service/StorageService.java   |  25 ++-
 .../cassandra/service/StorageServiceMBean.java     |  14 ++
 test/conf/system_keyspaces_directory.yaml          |   1 +
 .../cassandra/OffsetAwareConfigurationLoader.java  |   3 +
 .../org/apache/cassandra/db/DirectoriesTest.java   |  42 ++---
 .../apache/cassandra/io/util/FileUtilsTest.java    |  69 ++++++++
 .../apache/cassandra/tools/ClearSnapshotTest.java  |   2 +-
 26 files changed, 1091 insertions(+), 104 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 8ba8949..6c177a4 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -2151,6 +2151,97 @@ jobs:
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+  utests_system_keyspace_directory:
+    docker:
+    - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
+    resource_class: medium
+    working_directory: ~/
+    shell: /bin/bash -eo pipefail -l
+    parallelism: 4
+    steps:
+    - attach_workspace:
+        at: /home/cassandra
+    - run:
+        name: Determine unit Tests to Run
+        command: |
+          # reminder: this code (along with all the steps) is independently 
executed on every circle container
+          # so the goal here is to get the circleci script to return the tests 
*this* container will run
+          # which we do via the `circleci` cli tool.
+
+          rm -fr ~/cassandra-dtest/upgrade_tests
+          echo "***java tests***"
+
+          # get all of our unit test filenames
+          set -eo pipefail && circleci tests glob 
"$HOME/cassandra/test/unit/**/*.java" > /tmp/all_java_unit_tests.txt
+
+          # split up the unit tests into groups based on the number of 
containers we have
+          set -eo pipefail && circleci tests split --split-by=timings 
--timings-type=filename --index=${CIRCLE_NODE_INDEX} 
--total=${CIRCLE_NODE_TOTAL} /tmp/all_java_unit_tests.txt > 
/tmp/java_tests_${CIRCLE_NODE_INDEX}.txt
+          set -eo pipefail && cat /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt | 
sed "s;^/home/cassandra/cassandra/test/unit/;;g" | grep "Test\.java$"  > 
/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+          echo "** /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt"
+          cat /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+        no_output_timeout: 15m
+    - run:
+        name: Log Environment Information
+        command: |
+          echo '*** id ***'
+          id
+          echo '*** cat /proc/cpuinfo ***'
+          cat /proc/cpuinfo
+          echo '*** free -m ***'
+          free -m
+          echo '*** df -m ***'
+          df -m
+          echo '*** ifconfig -a ***'
+          ifconfig -a
+          echo '*** uname -a ***'
+          uname -a
+          echo '*** mount ***'
+          mount
+          echo '*** env ***'
+          env
+          echo '*** java ***'
+          which java
+          java -version
+    - run:
+        name: Run Unit Tests (testclasslist-system-keyspace-directory)
+        command: |
+          set -x
+          export PATH=$JAVA_HOME/bin:$PATH
+          time mv ~/cassandra /tmp
+          cd /tmp/cassandra
+          if [ -d ~/dtest_jars ]; then
+            cp ~/dtest_jars/dtest* /tmp/cassandra/build/
+          fi
+          test_timeout=$(grep 'name="test.unit.timeout"' build.xml | awk -F'"' 
'{print $4}' || true)
+          if [ -z "$test_timeout" ]; then
+            test_timeout=$(grep 'name="test.timeout"' build.xml | awk -F'"' 
'{print $4}')
+          fi
+          ant testclasslist-system-keyspace-directory 
-Dtest.timeout="$test_timeout" 
-Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt  
-Dtest.classlistprefix=unit
+        no_output_timeout: 15m
+    - store_test_results:
+        path: /tmp/cassandra/build/test/output/
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/output
+        destination: junitxml
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/logs
+        destination: logs
+    environment:
+    - ANT_HOME: /usr/share/ant
+    - LANG: en_US.UTF-8
+    - KEEP_TEST_DIR: true
+    - DEFAULT_DIR: /home/cassandra/cassandra-dtest
+    - PYTHONIOENCODING: utf-8
+    - PYTHONUNBUFFERED: true
+    - CASS_DRIVER_NO_EXTENSIONS: true
+    - CASS_DRIVER_NO_CYTHON: true
+    - CASSANDRA_SKIP_SYNC: true
+    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
+    - DTEST_BRANCH: master
+    - CCM_MAX_HEAP_SIZE: 1024M
+    - CCM_HEAP_NEWSIZE: 256M
+    - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+    - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   j8_dtest_jars_build:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
@@ -2254,6 +2345,12 @@ workflows:
         requires:
         - start_utests_compression
         - j8_build
+    - start_utests_system_keyspace_directory:
+        type: approval
+    - utests_system_keyspace_directory:
+        requires:
+        - start_utests_system_keyspace_directory
+        - j8_build
     - start_utests_stress:
         type: approval
     - utests_stress:
diff --git a/.circleci/config.yml.HIGHRES b/.circleci/config.yml.HIGHRES
index 45c8820..544ff95 100644
--- a/.circleci/config.yml.HIGHRES
+++ b/.circleci/config.yml.HIGHRES
@@ -2151,6 +2151,98 @@ jobs:
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+  utests_system_keyspace_directory:
+    docker:
+    - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
+    resource_class: xlarge
+    working_directory: ~/
+    shell: /bin/bash -eo pipefail -l
+    parallelism: 100
+    steps:
+    - attach_workspace:
+        at: /home/cassandra
+    - run:
+        name: Determine unit Tests to Run
+        command: |
+          # reminder: this code (along with all the steps) is independently 
executed on every circle container
+          # so the goal here is to get the circleci script to return the tests 
*this* container will run
+          # which we do via the `circleci` cli tool.
+
+          rm -fr ~/cassandra-dtest/upgrade_tests
+          echo "***java tests***"
+
+          # get all of our unit test filenames
+          set -eo pipefail && circleci tests glob 
"$HOME/cassandra/test/unit/**/*.java" > /tmp/all_java_unit_tests.txt
+
+          # split up the unit tests into groups based on the number of 
containers we have
+          set -eo pipefail && circleci tests split --split-by=timings 
--timings-type=filename --index=${CIRCLE_NODE_INDEX} 
--total=${CIRCLE_NODE_TOTAL} /tmp/all_java_unit_tests.txt > 
/tmp/java_tests_${CIRCLE_NODE_INDEX}.txt
+          set -eo pipefail && cat /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt | 
sed "s;^/home/cassandra/cassandra/test/unit/;;g" | grep "Test\.java$"  > 
/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+          echo "** /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt"
+          cat /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+        no_output_timeout: 15m
+    - run:
+        name: Log Environment Information
+        command: |
+          echo '*** id ***'
+          id
+          echo '*** cat /proc/cpuinfo ***'
+          cat /proc/cpuinfo
+          echo '*** free -m ***'
+          free -m
+          echo '*** df -m ***'
+          df -m
+          echo '*** ifconfig -a ***'
+          ifconfig -a
+          echo '*** uname -a ***'
+          uname -a
+          echo '*** mount ***'
+          mount
+          echo '*** env ***'
+          env
+          echo '*** java ***'
+          which java
+          java -version
+    - run:
+        name: Run Unit Tests (testclasslist-system-keyspace-directory)
+        command: |
+          set -x
+          export PATH=$JAVA_HOME/bin:$PATH
+          time mv ~/cassandra /tmp
+          cd /tmp/cassandra
+          if [ -d ~/dtest_jars ]; then
+            cp ~/dtest_jars/dtest* /tmp/cassandra/build/
+          fi
+          test_timeout=$(grep 'name="test.unit.timeout"' build.xml | awk -F'"' 
'{print $4}' || true)
+          if [ -z "$test_timeout" ]; then
+            test_timeout=$(grep 'name="test.timeout"' build.xml | awk -F'"' 
'{print $4}')
+          fi
+          ant testclasslist-system-keyspace-directory 
-Dtest.timeout="$test_timeout" 
-Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt  
-Dtest.classlistprefix=unit
+        no_output_timeout: 15m
+    - store_test_results:
+        path: /tmp/cassandra/build/test/output/
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/output
+        destination: junitxml
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/logs
+        destination: logs
+    environment:
+    - ANT_HOME: /usr/share/ant
+    - LANG: en_US.UTF-8
+    - KEEP_TEST_DIR: true
+    - DEFAULT_DIR: /home/cassandra/cassandra-dtest
+    - PYTHONIOENCODING: utf-8
+    - PYTHONUNBUFFERED: true
+    - CASS_DRIVER_NO_EXTENSIONS: true
+    - CASS_DRIVER_NO_CYTHON: true
+    - CASSANDRA_SKIP_SYNC: true
+    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
+    - DTEST_BRANCH: master
+    - CCM_MAX_HEAP_SIZE: 1024M
+    - CCM_HEAP_NEWSIZE: 256M
+    - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+    - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+
   j8_dtest_jars_build:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
@@ -2254,6 +2346,12 @@ workflows:
         requires:
         - start_utests_compression
         - j8_build
+    - start_utests_system_keyspace_directory:
+        type: approval
+    - utests_system_keyspace_directory:
+        requires:
+        - start_utests_system_keyspace_directory
+        - j8_build
     - start_utests_stress:
         type: approval
     - utests_stress:
diff --git a/.circleci/config.yml.LOWRES b/.circleci/config.yml.LOWRES
index 8ba8949..6c177a4 100644
--- a/.circleci/config.yml.LOWRES
+++ b/.circleci/config.yml.LOWRES
@@ -2151,6 +2151,97 @@ jobs:
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+  utests_system_keyspace_directory:
+    docker:
+    - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
+    resource_class: medium
+    working_directory: ~/
+    shell: /bin/bash -eo pipefail -l
+    parallelism: 4
+    steps:
+    - attach_workspace:
+        at: /home/cassandra
+    - run:
+        name: Determine unit Tests to Run
+        command: |
+          # reminder: this code (along with all the steps) is independently 
executed on every circle container
+          # so the goal here is to get the circleci script to return the tests 
*this* container will run
+          # which we do via the `circleci` cli tool.
+
+          rm -fr ~/cassandra-dtest/upgrade_tests
+          echo "***java tests***"
+
+          # get all of our unit test filenames
+          set -eo pipefail && circleci tests glob 
"$HOME/cassandra/test/unit/**/*.java" > /tmp/all_java_unit_tests.txt
+
+          # split up the unit tests into groups based on the number of 
containers we have
+          set -eo pipefail && circleci tests split --split-by=timings 
--timings-type=filename --index=${CIRCLE_NODE_INDEX} 
--total=${CIRCLE_NODE_TOTAL} /tmp/all_java_unit_tests.txt > 
/tmp/java_tests_${CIRCLE_NODE_INDEX}.txt
+          set -eo pipefail && cat /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt | 
sed "s;^/home/cassandra/cassandra/test/unit/;;g" | grep "Test\.java$"  > 
/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+          echo "** /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt"
+          cat /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+        no_output_timeout: 15m
+    - run:
+        name: Log Environment Information
+        command: |
+          echo '*** id ***'
+          id
+          echo '*** cat /proc/cpuinfo ***'
+          cat /proc/cpuinfo
+          echo '*** free -m ***'
+          free -m
+          echo '*** df -m ***'
+          df -m
+          echo '*** ifconfig -a ***'
+          ifconfig -a
+          echo '*** uname -a ***'
+          uname -a
+          echo '*** mount ***'
+          mount
+          echo '*** env ***'
+          env
+          echo '*** java ***'
+          which java
+          java -version
+    - run:
+        name: Run Unit Tests (testclasslist-system-keyspace-directory)
+        command: |
+          set -x
+          export PATH=$JAVA_HOME/bin:$PATH
+          time mv ~/cassandra /tmp
+          cd /tmp/cassandra
+          if [ -d ~/dtest_jars ]; then
+            cp ~/dtest_jars/dtest* /tmp/cassandra/build/
+          fi
+          test_timeout=$(grep 'name="test.unit.timeout"' build.xml | awk -F'"' 
'{print $4}' || true)
+          if [ -z "$test_timeout" ]; then
+            test_timeout=$(grep 'name="test.timeout"' build.xml | awk -F'"' 
'{print $4}')
+          fi
+          ant testclasslist-system-keyspace-directory 
-Dtest.timeout="$test_timeout" 
-Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt  
-Dtest.classlistprefix=unit
+        no_output_timeout: 15m
+    - store_test_results:
+        path: /tmp/cassandra/build/test/output/
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/output
+        destination: junitxml
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/logs
+        destination: logs
+    environment:
+    - ANT_HOME: /usr/share/ant
+    - LANG: en_US.UTF-8
+    - KEEP_TEST_DIR: true
+    - DEFAULT_DIR: /home/cassandra/cassandra-dtest
+    - PYTHONIOENCODING: utf-8
+    - PYTHONUNBUFFERED: true
+    - CASS_DRIVER_NO_EXTENSIONS: true
+    - CASS_DRIVER_NO_CYTHON: true
+    - CASSANDRA_SKIP_SYNC: true
+    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
+    - DTEST_BRANCH: master
+    - CCM_MAX_HEAP_SIZE: 1024M
+    - CCM_HEAP_NEWSIZE: 256M
+    - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+    - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   j8_dtest_jars_build:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
@@ -2254,6 +2345,12 @@ workflows:
         requires:
         - start_utests_compression
         - j8_build
+    - start_utests_system_keyspace_directory:
+        type: approval
+    - utests_system_keyspace_directory:
+        requires:
+        - start_utests_system_keyspace_directory
+        - j8_build
     - start_utests_stress:
         type: approval
     - utests_stress:
diff --git a/.circleci/config.yml.MIDRES b/.circleci/config.yml.MIDRES
index 979ee15..4dd6ab8 100644
--- a/.circleci/config.yml.MIDRES
+++ b/.circleci/config.yml.MIDRES
@@ -2151,6 +2151,97 @@ jobs:
     - CCM_HEAP_NEWSIZE: 256M
     - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
     - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+  utests_system_keyspace_directory:
+    docker:
+    - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
+    resource_class: medium
+    working_directory: ~/
+    shell: /bin/bash -eo pipefail -l
+    parallelism: 25
+    steps:
+    - attach_workspace:
+        at: /home/cassandra
+    - run:
+        name: Determine unit Tests to Run
+        command: |
+          # reminder: this code (along with all the steps) is independently 
executed on every circle container
+          # so the goal here is to get the circleci script to return the tests 
*this* container will run
+          # which we do via the `circleci` cli tool.
+
+          rm -fr ~/cassandra-dtest/upgrade_tests
+          echo "***java tests***"
+
+          # get all of our unit test filenames
+          set -eo pipefail && circleci tests glob 
"$HOME/cassandra/test/unit/**/*.java" > /tmp/all_java_unit_tests.txt
+
+          # split up the unit tests into groups based on the number of 
containers we have
+          set -eo pipefail && circleci tests split --split-by=timings 
--timings-type=filename --index=${CIRCLE_NODE_INDEX} 
--total=${CIRCLE_NODE_TOTAL} /tmp/all_java_unit_tests.txt > 
/tmp/java_tests_${CIRCLE_NODE_INDEX}.txt
+          set -eo pipefail && cat /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt | 
sed "s;^/home/cassandra/cassandra/test/unit/;;g" | grep "Test\.java$"  > 
/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+          echo "** /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt"
+          cat /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+        no_output_timeout: 15m
+    - run:
+        name: Log Environment Information
+        command: |
+          echo '*** id ***'
+          id
+          echo '*** cat /proc/cpuinfo ***'
+          cat /proc/cpuinfo
+          echo '*** free -m ***'
+          free -m
+          echo '*** df -m ***'
+          df -m
+          echo '*** ifconfig -a ***'
+          ifconfig -a
+          echo '*** uname -a ***'
+          uname -a
+          echo '*** mount ***'
+          mount
+          echo '*** env ***'
+          env
+          echo '*** java ***'
+          which java
+          java -version
+    - run:
+        name: Run Unit Tests (testclasslist-system-keyspace-directory)
+        command: |
+          set -x
+          export PATH=$JAVA_HOME/bin:$PATH
+          time mv ~/cassandra /tmp
+          cd /tmp/cassandra
+          if [ -d ~/dtest_jars ]; then
+            cp ~/dtest_jars/dtest* /tmp/cassandra/build/
+          fi
+          test_timeout=$(grep 'name="test.unit.timeout"' build.xml | awk -F'"' 
'{print $4}' || true)
+          if [ -z "$test_timeout" ]; then
+            test_timeout=$(grep 'name="test.timeout"' build.xml | awk -F'"' 
'{print $4}')
+          fi
+          ant testclasslist-system-keyspace-directory 
-Dtest.timeout="$test_timeout" 
-Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt  
-Dtest.classlistprefix=unit
+        no_output_timeout: 15m
+    - store_test_results:
+        path: /tmp/cassandra/build/test/output/
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/output
+        destination: junitxml
+    - store_artifacts:
+        path: /tmp/cassandra/build/test/logs
+        destination: logs
+    environment:
+    - ANT_HOME: /usr/share/ant
+    - LANG: en_US.UTF-8
+    - KEEP_TEST_DIR: true
+    - DEFAULT_DIR: /home/cassandra/cassandra-dtest
+    - PYTHONIOENCODING: utf-8
+    - PYTHONUNBUFFERED: true
+    - CASS_DRIVER_NO_EXTENSIONS: true
+    - CASS_DRIVER_NO_CYTHON: true
+    - CASSANDRA_SKIP_SYNC: true
+    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
+    - DTEST_BRANCH: master
+    - CCM_MAX_HEAP_SIZE: 1024M
+    - CCM_HEAP_NEWSIZE: 256M
+    - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
+    - JDK_HOME: /usr/lib/jvm/java-8-openjdk-amd64
   j8_dtest_jars_build:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
@@ -2254,6 +2345,12 @@ workflows:
         requires:
         - start_utests_compression
         - j8_build
+    - start_utests_system_keyspace_directory:
+        type: approval
+    - utests_system_keyspace_directory:
+        requires:
+        - start_utests_system_keyspace_directory
+        - j8_build
     - start_utests_stress:
         type: approval
     - utests_stress:
diff --git a/NEWS.txt b/NEWS.txt
index 4b5264f..a75a9f5 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,15 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+    - The data of the system keyspaces using a local strategy (with the exception of the system.paxos table)
+      is now stored by default in the first data directory. This approach will allow the server
+      to tolerate the failure of the other disks. To ensure that a disk failure will not bring
+      a node down, it is possible to use the system_data_file_directory yaml property to store
+      the system keyspaces data on a disk that provides redundancy.
+      On node startup the system keyspace data will be automatically migrated, if needed, to the
+      correct location. If a specific disk has been used for some time and the system keyspaces
+      data needs to be re-imported into the first data file location, the cassandra.importSystemDataFilesFrom
+      system property can be used to specify where the data should be imported from.
     - Nodes will now bootstrap all intra-cluster connections at startup by 
default and wait
       10 seconds for the all but one node in the local data center to be 
connected and marked
       UP in gossip. This prevents nodes from coordinating requests and failing 
because they
diff --git a/build.xml b/build.xml
index b3ba554..7f2950f 100644
--- a/build.xml
+++ b/build.xml
@@ -1493,6 +1493,7 @@
       <delete quiet="true" failonerror="false" 
dir="${build.test.dir}/cassandra/commitlog${fileSep}@{poffset}"/>
       <delete quiet="true" failonerror="false" 
dir="${build.test.dir}/cassandra/cdc_raw${fileSep}@{poffset}"/>
       <delete quiet="true" failonerror="false" 
dir="${build.test.dir}/cassandra/data${fileSep}@{poffset}"/>
+      <delete quiet="true" failonerror="false" 
dir="${build.test.dir}/cassandra/system_data${fileSep}@{poffset}"/>
       <delete quiet="true" failonerror="false" 
dir="${build.test.dir}/cassandra/saved_caches${fileSep}@{poffset}"/>
       <delete quiet="true" failonerror="false" 
dir="${build.test.dir}/cassandra/hints${fileSep}@{poffset}"/>
     </sequential>
@@ -1573,6 +1574,28 @@
     </sequential>
   </macrodef>
 
+  <macrodef name="testlist-system-keyspace-directory">
+    <attribute name="test.file.list" />
+    <attribute name="testlist.offset" />
+    <sequential>
+      <property name="system_keyspaces_directory_yaml" 
value="${build.test.dir}/cassandra.system.yaml"/>
+      <concat destfile="${system_keyspaces_directory_yaml}">
+        <fileset file="${test.conf}/cassandra.yaml"/>
+        <fileset file="${test.conf}/system_keyspaces_directory.yaml"/>
+      </concat>
+      <testmacrohelper inputdir="${test.unit.src}" 
filelist="@{test.file.list}" poffset="@{testlist.offset}"
+                       exclude="**/*.java" timeout="${test.timeout}" 
testtag="system_keyspace_directory">
+        <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
+        <jvmarg 
value="-Dinvalid-legacy-sstable-root=${test.data}/invalid-legacy-sstables"/>
+        <jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
+        <jvmarg value="-Dcassandra.tolerate_sstable_size=true"/>
+        <jvmarg 
value="-Dcassandra.config=file:///${system_keyspaces_directory_yaml}"/>
+        <jvmarg value="-Dcassandra.skip_sync=true" />
+        <jvmarg 
value="-Dcassandra.config.loader=org.apache.cassandra.OffsetAwareConfigurationLoader"/>
+      </testmacrohelper>
+    </sequential>
+  </macrodef>
+
   <!--
     Run named ant task with jacoco, such as "ant jacoco-run -Dtaskname=test"
     the target run must enable the jacoco agent if usejacoco is 'yes' -->
@@ -1637,6 +1660,14 @@
     <testparallel testdelegate="testlist-cdc" />
   </target>
 
+  <target name="test-system-keyspace-directory" depends="build-test" 
description="Execute unit tests with a system keyspaces directory configured">
+    <path id="all-test-classes-path">
+      <fileset dir="${test.unit.src}" includes="**/${test.name}.java" />
+    </path>
+    <property name="all-test-classes" refid="all-test-classes-path"/>
+    <testparallel testdelegate="testlist-system-keyspace-directory" />
+  </target>
+
   <target name="msg-ser-gen-test" depends="build-test" description="Generates 
message serializations">
     <testmacro inputdir="${test.unit.src}"
         timeout="${test.timeout}" filter="**/SerializationsTest.java">
@@ -2018,6 +2049,13 @@
       <property name="all-test-classes" refid="all-test-classes-path"/>
       <testparallel testdelegate="testlist-cdc"/>
   </target>
+  <target name="testclasslist-system-keyspace-directory" depends="build-test" 
description="Parallel-run tests given in file -Dtest.classlistfile 
(one-class-per-line, e.g. org/apache/cassandra/db/SomeTest.java)">
+      <path id="all-test-classes-path">
+          <fileset dir="${test.dir}/${test.classlistprefix}" 
includesfile="${test.classlistfile}"/>
+      </path>
+      <property name="all-test-classes" refid="all-test-classes-path"/>
+      <testparallel testdelegate="testlist-system-keyspace-directory"/>
+  </target>
 
   <!-- In-JVM dtest targets -->
   <target name="list-jvm-dtests" depends="build-test">
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index fcd2ffa..ce8871a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -203,6 +203,12 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 # data_file_directories:
 #     - /var/lib/cassandra/data
 
+# Directory where Cassandra should store the data of the local system keyspaces.
+# By default Cassandra will store the data of the local system keyspaces in the first of the data directories.
+# This approach ensures that if one of the other disks is lost Cassandra can continue to operate. For extra security,
+# this setting allows storing this data in a different directory that provides redundancy.
+# system_data_file_directory: 
+
 # commit log.  when running on magnetic HDD, this should be a
 # separate spindle than the data directories.
 # If not set, the default directory is $CASSANDRA_HOME/data/commitlog.
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 6abdfba..bab887c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -230,6 +230,12 @@ public class Config
 
     public String[] data_file_directories = new String[0];
 
+    /**
+     * The directory to use for storing the system keyspaces data.
+     * If unspecified, the data will be stored in the first of the data_file_directories.
+     */
+    public String system_data_file_directory;
+
     public String saved_caches_directory;
 
     // Commit Log
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3b5fdfb..5f7cb27 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -71,6 +71,7 @@ import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.CacheService.CacheType;
 import org.apache.cassandra.utils.FBUtilities;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -611,6 +612,8 @@ public class DatabaseDescriptor
         {
             if (datadir == null)
                 throw new ConfigurationException("data_file_directories must 
not contain empty entry", false);
+            if (datadir.equals(conf.system_data_file_directory))
+                throw new ConfigurationException("system_data_file_directory 
must not be the same as any data_file_directories", false);
             if (datadir.equals(conf.commitlog_directory))
                 throw new ConfigurationException("commitlog_directory must not 
be the same as any data_file_directories", false);
             if (datadir.equals(conf.hints_directory))
@@ -618,21 +621,28 @@ public class DatabaseDescriptor
             if (datadir.equals(conf.saved_caches_directory))
                 throw new ConfigurationException("saved_caches_directory must 
not be the same as any data_file_directories", false);
 
-            try
-            {
-                dataFreeBytes = saturatedSum(dataFreeBytes, 
guessFileStore(datadir).getUnallocatedSpace());
-            }
-            catch (IOException e)
-            {
-                logger.debug("Error checking disk space", e);
-                throw new ConfigurationException(String.format("Unable to 
check disk space available to %s. Perhaps the Cassandra user does not have the 
necessary permissions",
-                                                               datadir), e);
-            }
+            dataFreeBytes = saturatedSum(dataFreeBytes, 
getUnallocatedSpace(datadir));
         }
         if (dataFreeBytes < 64 * ONE_GB) // 64 GB
             logger.warn("Only {} free across all data volumes. Consider adding 
more capacity to your cluster or removing obsolete snapshots",
                         FBUtilities.prettyPrintMemory(dataFreeBytes));
 
+        if (conf.system_data_file_directory != null)
+        {
+            if 
(conf.system_data_file_directory.equals(conf.commitlog_directory))
+                throw new ConfigurationException("system_data_file_directory 
must not be the same as the commitlog_directory", false);
+            if 
(conf.system_data_file_directory.equals(conf.saved_caches_directory))
+                throw new ConfigurationException("system_data_file_directory 
must not be the same as the saved_caches_directory", false);
+            if (conf.system_data_file_directory.equals(conf.hints_directory))
+                throw new ConfigurationException("system_data_file_directory 
must not be the same as the hints_directory", false);
+
+            long freeBytes = 
getUnallocatedSpace(conf.system_data_file_directory);
+
+            if (freeBytes < ONE_GB)
+                logger.warn("Only {} free in the system data volume. Consider 
adding more capacity or removing obsolete snapshots",
+                            FBUtilities.prettyPrintMemory(freeBytes));
+        }
+
         if (conf.commitlog_directory.equals(conf.saved_caches_directory))
             throw new ConfigurationException("saved_caches_directory must not 
be the same as the commitlog_directory", false);
         if (conf.commitlog_directory.equals(conf.hints_directory))
@@ -1157,6 +1167,20 @@ public class DatabaseDescriptor
         }
     }
 
+    private static long getUnallocatedSpace(String directory)
+    {
+        try
+        {
+            return guessFileStore(directory).getUnallocatedSpace();
+        }
+        catch (IOException e)
+        {
+            logger.debug("Error checking disk space", e);
+            throw new ConfigurationException(String.format("Unable to check 
disk space available to %s. Perhaps the Cassandra user does not have the 
necessary permissions",
+                                                           directory), e);
+        }
+    }
+
     public static IEndpointSnitch createEndpointSnitch(boolean dynamic, String 
snitchClassName) throws ConfigurationException
     {
         if (!snitchClassName.contains("."))
@@ -1324,6 +1348,9 @@ public class DatabaseDescriptor
             for (String dataFileDirectory : conf.data_file_directories)
                 FileUtils.createDirectory(dataFileDirectory);
 
+            if (conf.system_data_file_directory != null)
+                FileUtils.createDirectory(conf.system_data_file_directory);
+
             if (conf.commitlog_directory == null)
                 throw new ConfigurationException("commitlog_directory must be 
specified", false);
             FileUtils.createDirectory(conf.commitlog_directory);
@@ -1706,7 +1733,7 @@ public class DatabaseDescriptor
 
     public static int getFlushWriters()
     {
-            return conf.memtable_flush_writers;
+        return conf.memtable_flush_writers;
     }
 
     public static int getConcurrentCompactors()
@@ -1787,11 +1814,57 @@ public class DatabaseDescriptor
         conf.inter_dc_stream_throughput_outbound_megabits_per_sec = value;
     }
 
-    public static String[] getAllDataFileLocations()
+    /**
+     * Checks if the local system data must be stored in a specific location which supports redundancy.
+     *
+     * @return {@code true} if the local system keyspaces data must be stored 
in a different location,
+     * {@code false} otherwise.
+     */
+    public static boolean useSpecificLocationForSystemData()
+    {
+        return conf.system_data_file_directory != null;
+    }
+
+    /**
+     * Returns the locations where the local system keyspaces data should be stored.
+     *
+     * <p>If the {@code system_data_file_directory} was unspecified, the local system keyspaces data should be stored
+     * in the first data directory. This approach guarantees that the server can tolerate the loss of n - 1 disks.</p>
+     *
+     * @return the locations where the local system keyspaces data should be stored
+     */
+    public static String[] getSystemKeyspacesDataFileLocations()
+    {
+        if (conf.system_data_file_directory != null)
+            return new String[] {conf.system_data_file_directory};
+
+        return conf.data_file_directories.length == 0  ? 
conf.data_file_directories
+                                                       : new String[] 
{conf.data_file_directories[0]};
+    }
+
+    /**
+     * Returns the locations where the non-local system keyspaces data should be stored.
+     *
+     * @return the locations where the non-local system keyspaces data should be stored
+     */
+    public static String[] getNonSystemKeyspacesDataFileLocations()
     {
         return conf.data_file_directories;
     }
 
+    /**
+     * Returns the list of all the directories where the data files can be 
stored (for system and non-system keyspaces).
+     *
+     * @return the list of all the directories where the data files can be 
stored.
+     */
+    public static String[] getAllDataFileLocations()
+    {
+        if (conf.system_data_file_directory == null)
+            return conf.data_file_directories;
+
+        return ArrayUtils.addFirst(conf.data_file_directories, 
conf.system_data_file_directory);
+    }
+
     public static String getCommitLogLocation()
     {
         return conf.commitlog_directory;
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 1fc3eba..0afe704 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -113,20 +113,9 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                                                                                
           new NamedThreadFactory("MemtableFlushWriter"),
                                                                                
           "internal");
 
-    private static final ExecutorService [] perDiskflushExecutors = new 
ExecutorService[DatabaseDescriptor.getAllDataFileLocations().length];
-
-    static
-    {
-        for (int i = 0; i < 
DatabaseDescriptor.getAllDataFileLocations().length; i++)
-        {
-            perDiskflushExecutors[i] = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
-                                                                        
Stage.KEEP_ALIVE_SECONDS,
-                                                                        
TimeUnit.SECONDS,
-                                                                        new 
LinkedBlockingQueue<Runnable>(),
-                                                                        new 
NamedThreadFactory("PerDiskMemtableFlushWriter_"+i),
-                                                                        
"internal");
-        }
-    }
+    private static final PerDiskFlushExecutors perDiskflushExecutors = new 
PerDiskFlushExecutors(DatabaseDescriptor.getFlushWriters(),
+                                                                               
                  DatabaseDescriptor.getNonSystemKeyspacesDataFileLocations(),
+                                                                               
                  DatabaseDescriptor.useSpecificLocationForSystemData());
 
     // post-flush executor is single threaded to provide guarantee that any 
flush Future on a CF will never return until prior flushes have completed
     private static final ExecutorService postFlushExecutor = new 
JMXEnabledThreadPoolExecutor(1,
@@ -230,9 +219,9 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public static void shutdownExecutorsAndWait(long timeout, TimeUnit unit) 
throws InterruptedException, TimeoutException
     {
-        List<ExecutorService> executors = new 
ArrayList<>(perDiskflushExecutors.length + 3);
+        List<ExecutorService> executors = new ArrayList<>();
         Collections.addAll(executors, reclaimExecutor, postFlushExecutor, 
flushExecutor);
-        Collections.addAll(executors, perDiskflushExecutors);
+        perDiskflushExecutors.appendAllExecutors(executors);
         ExecutorUtils.shutdownAndWait(timeout, unit, executors);
     }
 
@@ -1072,9 +1061,10 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                 {
                     // flush the memtable
                     flushRunnables = memtable.flushRunnables(txn);
+                    ExecutorService[] executors = 
perDiskflushExecutors.getExecutorsFor(keyspace.getName(), name);
 
                     for (int i = 0; i < flushRunnables.size(); i++)
-                        
futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
+                        
futures.add(executors[i].submit(flushRunnables.get(i)));
 
                     /**
                      * we can flush 2is as soon as the barrier completes, as 
they will be consistent with (or ahead of) the
@@ -2740,4 +2730,81 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     {
         return neverPurgeTombstones;
     }
+
+    /**
+     * The thread pools used to flush memtables.
+     *
+     * <p>Each disk has its own set of thread pools to perform memtable 
flushes.</p>
+     * <p>Based on the configuration, system keyspaces can have their own disk
+     * to allow for a special redundancy mechanism. If that is the case, the executor services returned for
+     * system keyspaces will be different from the ones for the other keyspaces.</p>
+     */
+    private static final class PerDiskFlushExecutors
+    {
+        /**
+         * The flush executors for non system keyspaces.
+         */
+        private final ExecutorService[] nonSystemflushExecutors;
+
+        /**
+         * The flush executors for system keyspaces.
+         */
+        private final ExecutorService[] systemDiskFlushExecutors;
+
+        public PerDiskFlushExecutors(int flushWriters,
+                                     String[] locationsForNonSystemKeyspaces,
+                                     boolean 
useSpecificLocationForSystemKeyspaces)
+        {
+            ExecutorService[] flushExecutors = 
createPerDiskFlushWriters(locationsForNonSystemKeyspaces.length, flushWriters);
+            nonSystemflushExecutors = flushExecutors;
+            systemDiskFlushExecutors = useSpecificLocationForSystemKeyspaces ? 
new ExecutorService[] {newThreadPool("SystemKeyspacesDiskMemtableFlushWriter", 
flushWriters)}
+                                                                             : 
new ExecutorService[] {flushExecutors[0]};
+        }
+
+        private ExecutorService[] createPerDiskFlushWriters(int 
numberOfExecutors, int flushWriters)
+        {
+            ExecutorService[] flushExecutors = new 
ExecutorService[numberOfExecutors];
+
+            for (int i = 0; i < numberOfExecutors; i++)
+            {
+                flushExecutors[i] = 
newThreadPool("PerDiskMemtableFlushWriter_" + i, flushWriters);
+            }
+            return flushExecutors;
+        }
+
+        private static JMXEnabledThreadPoolExecutor newThreadPool(String 
poolName, int size)
+        {
+            return new JMXEnabledThreadPoolExecutor(size,
+                                                    Stage.KEEP_ALIVE_SECONDS,
+                                                    TimeUnit.SECONDS,
+                                                    new 
LinkedBlockingQueue<Runnable>(),
+                                                    new 
NamedThreadFactory(poolName),
+                                                    "internal");
+        }
+
+        /**
+         * Returns the flush executors for the specified keyspace.
+         *
+         * @param keyspaceName the keyspace name
+         * @param tableName the table name
+         * @return the flush executors that should be used for flushing the 
memtables of the specified keyspace.
+         */
+        public ExecutorService[] getExecutorsFor(String keyspaceName, String 
tableName)
+        {
+            return 
Directories.isStoredInSystemKeyspacesDataLocation(keyspaceName, tableName) ? 
systemDiskFlushExecutors
+                                                                  : 
nonSystemflushExecutors;
+        }
+
+        /**
+         * Appends all the {@code ExecutorService} used for flushes to the collection.
+         *
+         * @param collection the collection to append to.
+         */
+        public void appendAllExecutors(Collection<ExecutorService> collection)
+        {
+            Collections.addAll(collection, nonSystemflushExecutors);
+            if (nonSystemflushExecutors != systemDiskFlushExecutors)
+                Collections.addAll(collection, systemDiskFlushExecutors);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/Directories.java 
b/src/java/org/apache/cassandra/db/Directories.java
index 889f3a5..d901dbb 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -17,22 +17,17 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOError;
-import java.io.IOException;
-import java.nio.file.FileStore;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.io.*;
+import java.nio.file.*;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiPredicate;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,9 +36,11 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.FSDiskFullWriteError;
 import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.FSNoDiskAvailableForWriteError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.DirectorySizeCalculator;
 import org.apache.cassandra.utils.FBUtilities;
@@ -92,15 +89,11 @@ public class Directories
     public static final String TMP_SUBDIR = "tmp";
     public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";
 
-    public static final DataDirectory[] dataDirectories;
-
-    static
-    {
-        String[] locations = DatabaseDescriptor.getAllDataFileLocations();
-        dataDirectories = new DataDirectory[locations.length];
-        for (int i = 0; i < locations.length; ++i)
-            dataDirectories[i] = new DataDirectory(new File(locations[i]));
-    }
+    /**
+     * The directories used to store keyspaces data.
+     */
+    public static final DataDirectories dataDirectories = new 
DataDirectories(DatabaseDescriptor.getNonSystemKeyspacesDataFileLocations(),
+                                                                              
DatabaseDescriptor.getSystemKeyspacesDataFileLocations());
 
     /**
      * Checks whether Cassandra has RWX permissions to the specified 
directory.  Logs an error with
@@ -183,7 +176,7 @@ public class Directories
 
     public Directories(final TableMetadata metadata)
     {
-        this(metadata, dataDirectories);
+        this(metadata, dataDirectories.getDataDirectoriesFor(metadata));
     }
 
     public Directories(final TableMetadata metadata, Collection<DataDirectory> 
paths)
@@ -444,10 +437,12 @@ public class Directories
         }
 
         if (candidates.isEmpty())
+        {
             if (tooBig)
-                throw new FSDiskFullWriteError(new IOException("Insufficient 
disk space to write " + writeSize + " bytes"), "");
-            else
-                throw new FSWriteError(new IOException("All configured data 
directories have been disallowed as unwritable for erroring out"), "");
+                throw new FSDiskFullWriteError(metadata.keyspace, writeSize);
+
+            throw new FSNoDiskAvailableForWriteError(metadata.keyspace);
+        }
 
         // shortcut for single data directory systems
         if (candidates.size() == 1)
@@ -512,6 +507,9 @@ public class Directories
                 allowedDirs.add(dir);
         }
 
+        if (allowedDirs.isEmpty())
+            throw new FSNoDiskAvailableForWriteError(metadata.keyspace);
+
         Collections.sort(allowedDirs, new Comparator<DataDirectory>()
         {
             @Override
@@ -591,10 +589,30 @@ public class Directories
         }
     }
 
+    /**
+     * Checks if the specified table should be stored with local system data.
+     *
+     * <p> To minimize the risk of failures, SSTables for local system 
keyspaces must be stored in a single data
+     * directory. The only exception to this is the system paxos table as it 
can be a high traffic table.</p>
+     *
+     * @param keyspace the keyspace name
+     * @param table the table name
+     * @return {@code true} if the specified table should be stored with local system data, {@code false} otherwise.
+     */
+    public static boolean isStoredInSystemKeyspacesDataLocation(String 
keyspace, String table)
+    {
+        return SchemaConstants.isLocalSystemKeyspace(keyspace) && 
!SystemKeyspace.isPaxosTable(keyspace, table);
+    }
+
     public static class DataDirectory
     {
         public final File location;
 
+        public DataDirectory(String location)
+        {
+            this(new File(location));
+        }
+
         public DataDirectory(File location)
         {
             this.location = location;
@@ -631,6 +649,106 @@ public class Directories
         }
     }
 
+    /**
+     * Data directories used to store keyspace data.
+     */
+    public static final class DataDirectories implements 
Iterable<DataDirectory>
+    {
+        /**
+         * The directories for storing the system keyspaces.
+         */
+        private final DataDirectory[] systemKeyspaceDataDirectories;
+
+        /**
+         * The directories where should be stored the data of the non system 
keyspaces.
+         */
+        private final DataDirectory[] nonSystemKeyspacesDirectories;
+
+
+        public DataDirectories(String[] locationsForNonSystemKeyspaces, 
String[] locationsForSystemKeyspace)
+        {
+            nonSystemKeyspacesDirectories = 
toDataDirectories(locationsForNonSystemKeyspaces);
+            systemKeyspaceDataDirectories = 
toDataDirectories(locationsForSystemKeyspace);
+        }
+
+        private static DataDirectory[] toDataDirectories(String... locations)
+        {
+            DataDirectory[] directories = new DataDirectory[locations.length];
+            for (int i = 0; i < locations.length; ++i)
+                directories[i] = new DataDirectory(new File(locations[i]));
+            return directories;
+        }
+
+        /**
+         * Returns the data directories used to store the data of the 
specified keyspace.
+         *
+         * @param keyspace the keyspace name
+         * @return the data directories used to store the data of the 
specified keyspace
+         */
+        public DataDirectory[] getDataDirectoriesUsedBy(String keyspace)
+        {
+            if (SchemaConstants.SYSTEM_KEYSPACE_NAME.equals(keyspace)
+                    && !ArrayUtils.isEmpty(systemKeyspaceDataDirectories)
+                    && !ArrayUtils.contains(nonSystemKeyspacesDirectories, 
systemKeyspaceDataDirectories[0]))
+            {
+                DataDirectory[] directories = 
Arrays.copyOf(nonSystemKeyspacesDirectories, 
nonSystemKeyspacesDirectories.length + 1);
+                directories[directories.length - 1] = 
systemKeyspaceDataDirectories[0];
+                return directories;
+            }
+            return SchemaConstants.isLocalSystemKeyspace(keyspace) ? 
systemKeyspaceDataDirectories
+                                                                   : 
nonSystemKeyspacesDirectories;
+        }
+
+        /**
+         * Returns the data directories for the specified keyspace.
+         *
+         * @param table the table metadata
+         * @return the data directories for the specified keyspace
+         */
+        public DataDirectory[] getDataDirectoriesFor(TableMetadata table)
+        {
+            return isStoredInSystemKeyspacesDataLocation(table.keyspace, 
table.name) ? systemKeyspaceDataDirectories
+                                                                               
      : nonSystemKeyspacesDirectories;
+        }
+
+        @Override
+        public Iterator<DataDirectory> iterator()
+        {
+            Iterator<DataDirectory> iter = 
Iterators.forArray(nonSystemKeyspacesDirectories);
+
+            if (nonSystemKeyspacesDirectories == systemKeyspaceDataDirectories)
+                return iter;
+
+            return Iterators.concat(iter, 
Iterators.forArray(systemKeyspaceDataDirectories));
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            DataDirectories that = (DataDirectories) o;
+
+            return Arrays.equals(this.systemKeyspaceDataDirectories, 
that.systemKeyspaceDataDirectories)
+                && Arrays.equals(this.nonSystemKeyspacesDirectories, 
that.nonSystemKeyspacesDirectories);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(systemKeyspaceDataDirectories, 
nonSystemKeyspacesDirectories);
+        }
+
+        public String toString()
+        {
+            return "DataDirectories {" +
+                   "systemKeyspaceDataDirectories=" + 
systemKeyspaceDataDirectories +
+                   ", nonSystemKeyspacesDirectories=" + 
nonSystemKeyspacesDirectories +
+                   '}';
+        }
+    }
+
     static final class DataDirectoryCandidate implements 
Comparable<DataDirectoryCandidate>
     {
         final DataDirectory dataDirectory;
@@ -1002,8 +1120,7 @@ public class Directories
 
     public static List<File> getKSChildDirectories(String ksName)
     {
-        return getKSChildDirectories(ksName, dataDirectories);
-
+        return getKSChildDirectories(ksName, 
dataDirectories.getDataDirectoriesUsedBy(ksName));
     }
 
     // Recursively finds all the sub directories in the KS directory.
@@ -1061,21 +1178,6 @@ public class Directories
         return StringUtils.join(s, File.separator);
     }
 
-    @VisibleForTesting
-    static void overrideDataDirectoriesForTest(String loc)
-    {
-        for (int i = 0; i < dataDirectories.length; ++i)
-            dataDirectories[i] = new DataDirectory(new File(loc));
-    }
-
-    @VisibleForTesting
-    static void resetDataDirectoriesAfterTest()
-    {
-        String[] locations = DatabaseDescriptor.getAllDataFileLocations();
-        for (int i = 0; i < locations.length; ++i)
-            dataDirectories[i] = new DataDirectory(new File(locations[i]));
-    }
-
     private class SSTableSizeSummer extends DirectorySizeCalculator
     {
         private final HashSet<File> toSkip;
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java 
b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index bbb6dbb..cc617da 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Splitter;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.StorageService;
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java 
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 655c7a0..4cee06e 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1561,4 +1561,9 @@ public final class SystemKeyspace
                               row.getString("query_string")));
         return r;
     }
+
+    public static boolean isPaxosTable(String keyspace, String table)
+    {
+        return SchemaConstants.SYSTEM_KEYSPACE_NAME.equals(keyspace) && 
PAXOS.equals(table);
+    }
 }
diff --git a/src/java/org/apache/cassandra/io/FSDiskFullWriteError.java 
b/src/java/org/apache/cassandra/io/FSDiskFullWriteError.java
index ca5d8da..09ca527 100644
--- a/src/java/org/apache/cassandra/io/FSDiskFullWriteError.java
+++ b/src/java/org/apache/cassandra/io/FSDiskFullWriteError.java
@@ -18,16 +18,22 @@
 
 package org.apache.cassandra.io;
 
+import java.io.File;
+import java.io.IOException;
+
 public class FSDiskFullWriteError extends FSWriteError
 {
-    public FSDiskFullWriteError(Throwable cause, String path)
+    public FSDiskFullWriteError(String keyspace, long mutationSize)
     {
-        super(cause, path);
+        super(new IOException(String.format("Insufficient disk space to write 
%s bytes into the %s keyspace",
+                                            mutationSize,
+                                            keyspace)),
+              new File(""));
     }
 
     @Override
     public String toString()
     {
-        return "FSDiskFullWriteError in " + path;
+        return "FSDiskFullWriteError";
     }
 }
diff --git a/src/java/org/apache/cassandra/io/FSDiskFullWriteError.java 
b/src/java/org/apache/cassandra/io/FSNoDiskAvailableForWriteError.java
similarity index 62%
copy from src/java/org/apache/cassandra/io/FSDiskFullWriteError.java
copy to src/java/org/apache/cassandra/io/FSNoDiskAvailableForWriteError.java
index ca5d8da..7e7141d 100644
--- a/src/java/org/apache/cassandra/io/FSDiskFullWriteError.java
+++ b/src/java/org/apache/cassandra/io/FSNoDiskAvailableForWriteError.java
@@ -18,16 +18,24 @@
 
 package org.apache.cassandra.io;
 
-public class FSDiskFullWriteError extends FSWriteError
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Thrown when all the disks used by a given keyspace have been marked as unwritable.
+ */
+public class FSNoDiskAvailableForWriteError extends FSWriteError
 {
-    public FSDiskFullWriteError(Throwable cause, String path)
+    public FSNoDiskAvailableForWriteError(String keyspace)
     {
-        super(cause, path);
+        super(new IOException(String.format("The data directories for the %s 
keyspace have been marked as unwritable",
+                                            keyspace)),
+                              new File(""));
     }
 
     @Override
     public String toString()
     {
-        return "FSDiskFullWriteError in " + path;
+        return "FSNoDiskAvailableForWriteError";
     }
 }
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java 
b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 67840c4..e9aa357 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -40,6 +40,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -920,4 +922,69 @@ public final class FileUtils
             return fileStore.getAttribute(attribute);
         }
     }
+
+
+    /**
+     * Moves the files from a directory to another directory.
+     * <p>Once a file has been copied to the target directory it will be 
deleted from the source directory.
+     * If a file already exists in the target directory, a warning will be logged and the file will not
+     * be deleted.</p>
+     *
+     * @param source the directory containing the files to move
+     * @param target the directory where the files must be moved
+     */
+    public static void moveRecursively(Path source, Path target) throws 
IOException
+    {
+        logger.info("Moving {} to {}" , source, target);
+
+        if (Files.isDirectory(source))
+        {
+            Files.createDirectories(target);
+
+            try (Stream<Path> paths = Files.list(source))
+            {
+                Path[] children = paths.toArray(Path[]::new);
+
+                for (Path child : children)
+                    moveRecursively(child, 
target.resolve(source.relativize(child)));
+            }
+
+            deleteDirectoryIfEmpty(source);
+        }
+        else
+        {
+            if (Files.exists(target))
+            {
+                logger.warn("Cannot move the file {} to {} as the target file 
already exists." , source, target);
+            }
+            else
+            {
+                Files.copy(source, target, StandardCopyOption.COPY_ATTRIBUTES);
+                Files.delete(source);
+            }
+        }
+    }
+
+    /**
+     * Deletes the specified directory if it is empty
+     *
+     * @param path the path to the directory
+     */
+    public static void deleteDirectoryIfEmpty(Path path) throws IOException
+    {
+        try
+        {
+            logger.info("Deleting directory {}", path);
+            Files.delete(path);
+        }
+        catch (DirectoryNotEmptyException e)
+        {
+            try (Stream<Path> paths = Files.list(path))
+            {
+                String content = paths.map(p -> 
p.getFileName().toString()).collect(Collectors.joining(", "));
+
+                logger.warn("Cannot delete the directory {} as it is not 
empty. (Content: {})", path, content);
+            }
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 4b92d69..2c43f47 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -24,8 +24,14 @@ import java.lang.management.MemoryPoolMXBean;
 import java.net.InetAddress;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 import javax.management.remote.JMXConnectorServer;
@@ -38,6 +44,7 @@ import com.codahale.metrics.jvm.BufferPoolMetricSet;
 import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
 import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
 import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -212,6 +219,19 @@ public class CassandraDaemon
     {
         FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
 
+        // Since CASSANDRA-14793 the local system file data are not dispatched across the data directories
+        // anymore to reduce the risks in case of disk failures. As a consequence, the system needs to ensure in case of
+        // upgrade that the old data files have been migrated to the new 
directories before we start deleting
+        // snapshot and upgrading system tables.
+        try
+        {
+            migrateSystemDataIfNeeded();
+        }
+        catch (IOException e)
+        {
+            exitOrFail(StartupException.ERR_WRONG_DISK_STATE, e.getMessage(), 
e);
+        }
+
         // Delete any failed snapshot deletions on Windows - see CASSANDRA-9658
         if (FBUtilities.isWindows)
             WindowsFailedSnapshotTracker.deleteOldSnapshots();
@@ -243,7 +263,7 @@ public class CassandraDaemon
         }
         catch (IOException e)
         {
-            exitOrFail(3, e.getMessage(), e.getCause());
+            exitOrFail(StartupException.ERR_WRONG_DISK_STATE, e.getMessage(), 
e.getCause());
         }
 
         // We need to persist this as soon as possible after startup checks.
@@ -456,6 +476,78 @@ public class CassandraDaemon
         completeSetup();
     }
 
+    /**
+     * Checks if the data of the local system keyspaces need to be migrated to 
a different location.
+     *
+     * @throws IOException if an I/O error occurs while migrating the data
+     */
+    private void migrateSystemDataIfNeeded() throws IOException
+    {
+        String importSystemDataFrom = 
System.getProperty("cassandra.importSystemDataFilesFrom");
+
+        // If there is only one directory and no system keyspace directory has 
been specified we do not need to do
+        // anything. If it is not the case we want to try to migrate the data.
+        if (DatabaseDescriptor.useSpecificLocationForSystemData()
+                || 
DatabaseDescriptor.getNonSystemKeyspacesDataFileLocations().length > 1
+                || importSystemDataFrom != null)
+        {
+            // We can face several cases:
+            //  1) The system data are spread across the data file locations 
and need to be moved to
+            //     the first data location (upgrade to 4.0)
+            //  2) The system data are spread across the data file locations 
and need to be moved to
+            //     the system keyspace location configured by the user 
(upgrade to 4.0)
+            //  3) The system data are stored in the first data location and 
need to be moved to
+            //     the system keyspace location configured by the user 
(system_data_file_directory has been configured)
+            //  4) The system data have been stored in the system keyspace 
location configured by the user
+            //     and need to be moved to the first data location (the import 
of the data has been requested)
+            Path target = 
Paths.get(DatabaseDescriptor.getSystemKeyspacesDataFileLocations()[0]);
+
+            String[] nonSystemKeyspacesFileLocations = 
DatabaseDescriptor.getNonSystemKeyspacesDataFileLocations();
+            String[] sources = importSystemDataFrom != null
+                    ? new String[] {importSystemDataFrom}
+                    : DatabaseDescriptor.useSpecificLocationForSystemData() ? 
nonSystemKeyspacesFileLocations
+                                                                            : 
Arrays.copyOfRange(nonSystemKeyspacesFileLocations, 1, 
nonSystemKeyspacesFileLocations.length);
+
+
+            for (String source : sources)
+            {
+                Path dataFileLocation = Paths.get(source);
+
+                if (!Files.exists(dataFileLocation))
+                    continue;
+
+                try (Stream<Path> locationChildren = 
Files.list(dataFileLocation))
+                {
+                    Path[] keyspaceDirectories = locationChildren.filter(p -> 
SchemaConstants.isLocalSystemKeyspace(p.getFileName().toString()))
+                                                                 
.toArray(Path[]::new);
+
+                    for (Path keyspaceDirectory : keyspaceDirectories)
+                    {
+                        try (Stream<Path> keyspaceChildren = 
Files.list(keyspaceDirectory))
+                        {
+                            Path[] tableDirectories = 
keyspaceChildren.filter(Files::isDirectory)
+                                                                      
.filter(p -> !p.getFileName()
+                                                                               
      .toString()
+                                                                               
      .startsWith(SystemKeyspace.PAXOS))
+                                                                      
.toArray(Path[]::new);
+
+                            for (Path tableDirectory : tableDirectories)
+                            {
+                                FileUtils.moveRecursively(tableDirectory,
+                                                          
target.resolve(dataFileLocation.relativize(tableDirectory)));
+                            }
+
+                            if 
(!SchemaConstants.SYSTEM_KEYSPACE_NAME.equals(keyspaceDirectory.getFileName().toString()))
+                            {
+                                
FileUtils.deleteDirectoryIfEmpty(keyspaceDirectory);
+                            }
+                        }
+                    }
+                }
+             }
+        }
+    }
+
     public void setupVirtualKeyspaces()
     {
         
VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
diff --git a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java 
b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
index d72b59a..592a5c2 100644
--- a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
+++ b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
@@ -26,10 +26,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DisallowedDirectories;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.FSError;
-import org.apache.cassandra.io.FSErrorHandler;
-import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.*;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 public class DefaultFSErrorHandler implements FSErrorHandler
@@ -67,6 +66,18 @@ public class DefaultFSErrorHandler implements FSErrorHandler
                 StorageService.instance.stopTransports();
                 break;
             case best_effort:
+
+                // There are a few scenarios where we know that the node will 
not be able to operate properly.
+                // For those scenarios we want to stop the transports and let 
the administrators handle the problem.
+                // Those scenarios are:
+                // * All the disks are full
+                // * All the disks for a given keyspace have been marked as 
unwritable
+                if (e instanceof FSDiskFullWriteError || e instanceof 
FSNoDiskAvailableForWriteError)
+                {
+                    logger.error("Stopping transports: " + 
e.getCause().getMessage());
+                    StorageService.instance.stopTransports();
+                }
+
                 // for both read and write errors mark the path as unwritable.
                 DisallowedDirectories.maybeMarkUnwritable(e.path);
                 if (e instanceof FSReadError)
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java 
b/src/java/org/apache/cassandra/service/StartupChecks.java
index ecf9549..200f098 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -339,6 +339,7 @@ public class StartupChecks
                                                  
Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
                                                                
DatabaseDescriptor.getSavedCachesLocation(),
                                                                
DatabaseDescriptor.getHintsDirectory().getAbsolutePath()));
+
         for (String dataDir : dirs)
         {
             logger.debug("Checking directory {}", dataDir);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index c06b819..22d06b3 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3291,12 +3291,29 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return stringify(Gossiper.instance.getUnreachableMembers(), true);
     }
 
+    @Override
     public String[] getAllDataFileLocations()
     {
-        String[] locations = DatabaseDescriptor.getAllDataFileLocations();
-        for (int i = 0; i < locations.length; i++)
-            locations[i] = FileUtils.getCanonicalPath(locations[i]);
-        return locations;
+        return getCanonicalPaths(DatabaseDescriptor.getAllDataFileLocations());
+    }
+
+    private String[] getCanonicalPaths(String[] paths)
+    {
+        for (int i = 0; i < paths.length; i++)
+            paths[i] = FileUtils.getCanonicalPath(paths[i]);
+        return paths;
+    }
+
+    @Override
+    public String[] getSystemKeyspacesDataFileLocations()
+    {
+        return 
getCanonicalPaths(DatabaseDescriptor.getSystemKeyspacesDataFileLocations());
+    }
+
+    @Override
+    public String[] getNonSystemKeyspacesDataFileLocations()
+    {
+        return 
getCanonicalPaths(DatabaseDescriptor.getNonSystemKeyspacesDataFileLocations());
     }
 
     public String getCommitLogLocation()
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 58402e1..dca7bd4 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -119,6 +119,20 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     public String[] getAllDataFileLocations();
 
     /**
+     * Returns the locations where the system keyspaces data should be stored.
+     *
+     * @return the locations where the system keyspaces data should be stored
+     */
+    public String[] getSystemKeyspacesDataFileLocations();
+
+    /**
+     * Returns the locations where the non-system keyspaces data should be 
stored.
+     *
+     * @return the locations where the non-system keyspaces data should be 
stored
+     */
+
+    /**
      * Get location of the commit log
      * @return a string path
      */
diff --git a/test/conf/system_keyspaces_directory.yaml 
b/test/conf/system_keyspaces_directory.yaml
new file mode 100644
index 0000000..3d968c3
--- /dev/null
+++ b/test/conf/system_keyspaces_directory.yaml
@@ -0,0 +1 @@
+system_data_file_directory: build/test/cassandra/system_data
diff --git a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java 
b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
index 23138b0..443fb8f 100644
--- a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
+++ b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
@@ -94,6 +94,9 @@ public class OffsetAwareConfigurationLoader extends 
YamlConfigurationLoader
         for (int i = 0; i < config.data_file_directories.length; i++)
             config.data_file_directories[i] += sep + offset;
 
+        if (config.system_data_file_directory != null)
+            config.system_data_file_directory += sep + offset;
+
         return config;
     }
 }
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java 
b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index eb2016f..019c6ab 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -22,7 +22,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
@@ -50,7 +49,6 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.DefaultFSErrorHandler;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 import static org.junit.Assert.assertEquals;
@@ -88,18 +86,21 @@ public class DirectoriesTest
         tempDataDir.delete(); // hack to create a temp dir
         tempDataDir.mkdir();
 
-        Directories.overrideDataDirectoriesForTest(tempDataDir.getPath());
-        // Create two fake data dir for tests, one using CF directories, one 
that do not.
+       // Create two fake data dir for tests, one using CF directories, one 
that do not.
         createTestFiles();
     }
 
     @AfterClass
     public static void afterClass()
     {
-        Directories.resetDataDirectoriesAfterTest();
         FileUtils.deleteRecursive(tempDataDir);
     }
 
+    private static DataDirectory[] toDataDirectories(File location) throws 
IOException
+    {
+        return new DataDirectory[] { new DataDirectory(location) };
+    }
+
     private static void createTestFiles() throws IOException
     {
         for (TableMetadata cfm : CFM)
@@ -156,7 +157,7 @@ public class DirectoriesTest
     {
         for (TableMetadata cfm : CFM)
         {
-            Directories directories = new Directories(cfm);
+            Directories directories = new Directories(cfm, 
toDataDirectories(tempDataDir));
             assertEquals(cfDir(cfm), directories.getDirectoryForNewSSTables());
 
             Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.name, 1, 
SSTableFormat.Type.BIG);
@@ -169,7 +170,7 @@ public class DirectoriesTest
     }
 
     @Test
-    public void testSecondaryIndexDirectories()
+    public void testSecondaryIndexDirectories() throws IOException
     {
         TableMetadata.Builder builder =
             TableMetadata.builder(KS, "cf")
@@ -187,8 +188,8 @@ public class DirectoriesTest
 
         TableMetadata PARENT_CFM = builder.build();
         TableMetadata INDEX_CFM = CassandraIndex.indexCfsMetadata(PARENT_CFM, 
indexDef);
-        Directories parentDirectories = new Directories(PARENT_CFM);
-        Directories indexDirectories = new Directories(INDEX_CFM);
+        Directories parentDirectories = new Directories(PARENT_CFM, 
toDataDirectories(tempDataDir));
+        Directories indexDirectories = new Directories(INDEX_CFM, 
toDataDirectories(tempDataDir));
         // secondary index has its own directory
         for (File dir : indexDirectories.getCFDirectories())
         {
@@ -248,11 +249,11 @@ public class DirectoriesTest
     }
 
     @Test
-    public void testSSTableLister()
+    public void testSSTableLister() throws IOException
     {
         for (TableMetadata cfm : CFM)
         {
-            Directories directories = new Directories(cfm);
+            Directories directories = new Directories(cfm, 
toDataDirectories(tempDataDir));
             checkFiles(cfm, directories);
         }
     }
@@ -301,7 +302,7 @@ public class DirectoriesTest
     {
         for (TableMetadata cfm : CFM)
         {
-            Directories directories = new Directories(cfm);
+            Directories directories = new Directories(cfm, 
toDataDirectories(tempDataDir));
 
             File tempDir = 
directories.getTemporaryWriteableDirectoryAsFile(10);
             tempDir.mkdir();
@@ -332,15 +333,18 @@ public class DirectoriesTest
         try
         {
             
DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.best_effort);
+
+            DataDirectory[] directories = 
Directories.dataDirectories.getDataDirectoriesUsedBy(KS);
+
             // Fake a Directory creation failure
-            if (Directories.dataDirectories.length > 0)
+            if (directories.length > 0)
             {
                 String[] path = new String[] {KS, "bad"};
-                File dir = new File(Directories.dataDirectories[0].location, 
StringUtils.join(path, File.separator));
+                File dir = new File(directories[0].location, 
StringUtils.join(path, File.separator));
                 JVMStabilityInspector.inspectThrowable(new FSWriteError(new 
IOException("Unable to create directory " + dir), dir));
             }
 
-            for (DataDirectory dd : Directories.dataDirectories)
+            for (DataDirectory dd : 
Directories.dataDirectories.getDataDirectoriesUsedBy(KS))
             {
                 File file = new File(dd.location, new File(KS, 
"bad").getPath());
                 assertTrue(DisallowedDirectories.isUnwritable(file));
@@ -357,7 +361,7 @@ public class DirectoriesTest
     {
         for (final TableMetadata cfm : CFM)
         {
-            final Directories directories = new Directories(cfm);
+            final Directories directories = new Directories(cfm, 
toDataDirectories(tempDataDir));
             assertEquals(cfDir(cfm), directories.getDirectoryForNewSSTables());
             final String n = Long.toString(System.nanoTime());
             Callable<File> directoryGetter = new Callable<File>() {
@@ -519,9 +523,9 @@ public class DirectoriesTest
     public void getDataDirectoryForFile()
     {
         Collection<DataDirectory> paths = new ArrayList<>();
-        paths.add(new DataDirectory(new File("/tmp/a")));
-        paths.add(new DataDirectory(new File("/tmp/aa")));
-        paths.add(new DataDirectory(new File("/tmp/aaa")));
+        paths.add(new DataDirectory("/tmp/a"));
+        paths.add(new DataDirectory("/tmp/aa"));
+        paths.add(new DataDirectory("/tmp/aaa"));
 
         for (TableMetadata cfm : CFM)
         {
diff --git a/test/unit/org/apache/cassandra/io/util/FileUtilsTest.java 
b/test/unit/org/apache/cassandra/io/util/FileUtilsTest.java
index 373232d..a357df5 100644
--- a/test/unit/org/apache/cassandra/io/util/FileUtilsTest.java
+++ b/test/unit/org/apache/cassandra/io/util/FileUtilsTest.java
@@ -128,6 +128,75 @@ public class FileUtilsTest
         assertFalse(FileUtils.isContained(new File("/tmp/abc/../abc"), new 
File("/tmp/abcc")));
     }
 
+    @Test
+    public void testMoveFiles() throws IOException
+    {
+        Path tmpDir = 
Files.createTempDirectory(this.getClass().getSimpleName());
+        Path sourceDir = Files.createDirectory(tmpDir.resolve("source"));
+        Path subDir_1 = Files.createDirectory(sourceDir.resolve("a"));
+        subDir_1.resolve("file_1.txt").toFile().createNewFile();
+        subDir_1.resolve("file_2.txt").toFile().createNewFile();
+        Path subDir_11 = Files.createDirectory(subDir_1.resolve("ab"));
+        subDir_11.resolve("file_1.txt").toFile().createNewFile();
+        subDir_11.resolve("file_2.txt").toFile().createNewFile();
+        subDir_11.resolve("file_3.txt").toFile().createNewFile();
+        Path subDir_12 = Files.createDirectory(subDir_1.resolve("ac"));
+        Path subDir_2 = Files.createDirectory(sourceDir.resolve("b"));
+        subDir_2.resolve("file_1.txt").toFile().createNewFile();
+        subDir_2.resolve("file_2.txt").toFile().createNewFile();
+
+        Path targetDir = Files.createDirectory(tmpDir.resolve("target"));
+
+        FileUtils.moveRecursively(sourceDir, targetDir);
+
+        assertFalse(Files.exists(sourceDir));
+        assertTrue(Files.exists(targetDir.resolve("a/file_1.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/file_2.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/ab/file_1.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/ab/file_2.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/ab/file_3.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/ab/file_1.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/ab/file_2.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/ac/")));
+        assertTrue(Files.exists(targetDir.resolve("b/file_1.txt")));
+        assertTrue(Files.exists(targetDir.resolve("b/file_2.txt")));
+
+        // Tests that files can be moved into existing directories
+
+        sourceDir = Files.createDirectory(tmpDir.resolve("source2"));
+        subDir_1 = Files.createDirectory(sourceDir.resolve("a"));
+        subDir_1.resolve("file_3.txt").toFile().createNewFile();
+        subDir_11 = Files.createDirectory(subDir_1.resolve("ab"));
+        subDir_11.resolve("file_4.txt").toFile().createNewFile();
+
+        FileUtils.moveRecursively(sourceDir, targetDir);
+
+        assertFalse(Files.exists(sourceDir));
+        assertTrue(Files.exists(targetDir.resolve("a/file_1.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/file_2.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/file_3.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/ab/file_1.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/ab/file_2.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/ab/file_3.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/ab/file_4.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/ab/file_1.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/ab/file_2.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/ac/")));
+        assertTrue(Files.exists(targetDir.resolve("b/file_1.txt")));
+        assertTrue(Files.exists(targetDir.resolve("b/file_2.txt")));
+
+        // Tests that existing files are not replaced but trigger an error.
+
+        sourceDir = Files.createDirectory(tmpDir.resolve("source3"));
+        subDir_1 = Files.createDirectory(sourceDir.resolve("a"));
+        subDir_1.resolve("file_3.txt").toFile().createNewFile();
+        FileUtils.moveRecursively(sourceDir, targetDir);
+
+        assertTrue(Files.exists(sourceDir));
+        assertTrue(Files.exists(sourceDir.resolve("a/file_3.txt")));
+        assertTrue(Files.exists(targetDir.resolve("a/file_3.txt")));
+    }
+
     private File createFolder(Path path)
     {
         File folder = path.toFile();
diff --git a/test/unit/org/apache/cassandra/tools/ClearSnapshotTest.java 
b/test/unit/org/apache/cassandra/tools/ClearSnapshotTest.java
index 7e70467..8f27f0b 100644
--- a/test/unit/org/apache/cassandra/tools/ClearSnapshotTest.java
+++ b/test/unit/org/apache/cassandra/tools/ClearSnapshotTest.java
@@ -92,7 +92,7 @@ public class ClearSnapshotTest extends CQLTester
         assertTrue(!tool.getStdout().isEmpty());
         tool = 
runner.invokeNodetool("snapshot","-t","some-other-name").waitAndAssertOnCleanExit();
         assertTrue(!tool.getStdout().isEmpty());
-        
+
         Map<String, TabularData> snapshots_before = probe.getSnapshotDetails();
         Assert.assertTrue(snapshots_before.size() == 2);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to