This is an automated email from the ASF dual-hosted git repository.

jwest pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 92f7c8d  Ignore COMPACT STORAGE flag for tables for which its safe to 
do so
92f7c8d is described below

commit 92f7c8db1444bf5d757cd50dba2211a446f3b22c
Author: Jordan West <jw...@apache.org>
AuthorDate: Mon Aug 17 14:20:32 2020 -0700

    Ignore COMPACT STORAGE flag for tables for which its safe to do so
    
    patch by Jordan West; Reviewed by Marcus Eriksson and Caleb Rackliffe for 
CASSANDRA-16048
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/schema/SchemaKeyspace.java    |  60 ++++++-
 .../cassandra/distributed/impl/Instance.java       |  10 ++
 .../upgrade/CompactStorage3to4UpgradeTest.java     | 190 +++++++++++++++++++++
 4 files changed, 255 insertions(+), 6 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 70bbd4f..04af373 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -35,6 +35,7 @@ Merged from 3.0:
  * Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160)
 Merged from 2.2:
  * Fixed a NullPointerException when calling nodetool enablethrift 
(CASSANDRA-16127)
+ * Automatically drop compact storage on tables for which it is safe 
(CASSANDRA-16048)
 
 4.0-beta2
  * Add addition incremental repair visibility to nodetool repair_admin 
(CASSANDRA-14939)
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java 
b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 0333ee6..f6dbe03 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -866,15 +866,34 @@ public final class SchemaKeyspace
         StringBuilder messages = new StringBuilder();
         for (UntypedResultSet.Row row : query(query))
         {
-            if 
(SchemaConstants.isLocalSystemKeyspace(row.getString("keyspace_name")))
+            String keyspaceName = row.getString("keyspace_name");
+            if (SchemaConstants.isLocalSystemKeyspace(keyspaceName))
                 continue;
 
-            Set<String> flags = row.getFrozenSet("flags", UTF8Type.instance);
-            if 
(TableMetadata.Flag.isLegacyCompactTable(TableMetadata.Flag.fromStringSet(flags)))
+            Set<TableMetadata.Flag> flags = 
TableMetadata.Flag.fromStringSet(row.getFrozenSet("flags", UTF8Type.instance));
+            if (TableMetadata.Flag.isLegacyCompactTable(flags))
             {
-                messages.append(String.format("ALTER TABLE %s.%s DROP COMPACT 
STORAGE;\n",
-                                              
maybeQuote(row.getString("keyspace_name")),
-                                              
maybeQuote(row.getString("table_name"))));
+                String tableName = row.getString("table_name");
+                if (isSafeToDropCompactStorage(keyspaceName, tableName))
+                {
+                    flags.remove(TableMetadata.Flag.DENSE);
+                    flags.add(TableMetadata.Flag.COMPOUND);
+                    String update = String.format("UPDATE %s.%s SET flags={%s} 
WHERE keyspace_name='%s' AND table_name='%s'",
+                                                  
SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES,
+                                                  
TableMetadata.Flag.toStringSet(flags).stream()
+                                                                               
        .map(f -> "'" + f + "'")
+                                                                               
        .collect(Collectors.joining(", ")),
+                                                  keyspaceName, tableName);
+
+                    logger.info("Safely dropping COMPACT STORAGE on {}.{}", 
keyspaceName, tableName);
+                    executeInternal(update);
+                }
+                else
+                {
+                    messages.append(String.format("ALTER TABLE %s.%s DROP 
COMPACT STORAGE;\n",
+                                                  
maybeQuote(row.getString("keyspace_name")),
+                                                  maybeQuote(tableName)));
+                }
             }
         }
 
@@ -889,6 +908,35 @@ public final class SchemaKeyspace
         }
     }
 
+    private static boolean isSafeToDropCompactStorage(String keyspaceName, 
String tableName)
+    {
+        if 
(!Boolean.parseBoolean(System.getProperty("cassandra.auto_drop_compact_storage",
 "false")))
+            return false;
+
+        String columnQuery = String.format("SELECT kind, type FROM %s.%s WHERE 
keyspace_name='%s' and table_name='%s'",
+                                           
SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS, keyspaceName, tableName);
+
+        String simpleType = "empty";
+        int simpleCount = 0;
+        for (UntypedResultSet.Row row : query(columnQuery))
+        {
+            String kind = row.getString("kind");
+            if (kind.equalsIgnoreCase("partition_key") || 
kind.equalsIgnoreCase("clustering"))
+                continue;
+
+            if (kind.equalsIgnoreCase("static"))
+                return false;
+
+            simpleCount++; // if not partition, clustering, or static column 
then it's a regular column
+            simpleType = row.getString("type"); // only save one type because 
if there is > 1 simple column then false is returned
+        }
+
+        if (simpleCount == 1 && !simpleType.equalsIgnoreCase("empty"))
+            return true;
+
+        return false;
+    }
+
     private static Keyspaces fetchKeyspacesWithout(Set<String> 
excludedKeyspaceNames)
     {
         String query = format("SELECT keyspace_name FROM %s.%s", 
SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 9a3fd08..edd525d 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -79,6 +79,7 @@ import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe;
 import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbeFactory;
 import org.apache.cassandra.distributed.shared.InstanceClassLoader;
+import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
@@ -103,6 +104,7 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.DefaultFSErrorHandler;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StartupChecks;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.cassandra.streaming.StreamReceiveTask;
@@ -415,6 +417,14 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                 DatabaseDescriptor.createAllDirectories();
                 CommitLog.instance.start();
 
+                try
+                {
+                    new StartupChecks().withDefaultTests().verify();
+                } catch (StartupException e)
+                {
+                    throw e;
+                }
+
                 // We need to persist this as soon as possible after startup 
checks.
                 // This should be the first write to SystemKeyspace 
(CASSANDRA-11742)
                 SystemKeyspace.persistLocalMetadata();
diff --git 
a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java
 
b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java
new file mode 100644
index 0000000..5317517
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.exceptions.StartupException;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+
+public class CompactStorage3to4UpgradeTest extends UpgradeTestBase
+{
+
+    public static final String TABLE_NAME = "cs_tbl";
+    public static final String CREATE_TABLE_C1_R1 = String.format(
+         "CREATE TABLE %s.%s (key int, c1 int, v int, PRIMARY KEY (key, c1)) 
WITH COMPACT STORAGE",
+         KEYSPACE, TABLE_NAME);
+    public static final String CREATE_TABLE_C1_ONLY = String.format(
+         "CREATE TABLE %s.%s (key int, c1 int, PRIMARY KEY (key, c1)) WITH 
COMPACT STORAGE",
+         KEYSPACE, TABLE_NAME);
+    public static final String CREATE_TABLE_R_ONLY = String.format(
+    "CREATE TABLE %s.%s (key int, c1 int, c2 int, PRIMARY KEY (key)) WITH 
COMPACT STORAGE",
+    KEYSPACE, TABLE_NAME);
+
+    public static final String INSERT_C1_R1 = String.format(
+         "INSERT INTO %s.%s (key, c1, v) VALUES (?, ?, ?)",
+         KEYSPACE, TABLE_NAME);
+
+    @Test
+    public void ignoreDenseCompoundTablesWithValueColumn() throws Throwable
+    {
+        System.setProperty("cassandra.auto_drop_compact_storage", "true");
+        final int partitions = 10;
+        final int rowsPerPartition = 10;
+
+        DropCompactTestHelper helper = new DropCompactTestHelper();
+        new TestCase()
+        .nodes(2)
+        .upgrade(Versions.Major.v30, Versions.Major.v4)
+        .setup(cluster -> {
+            cluster.schemaChange(CREATE_TABLE_C1_R1);
+
+            ICoordinator coordinator = cluster.coordinator(1);
+            for (int i = 1; i <= partitions; i++)
+                for (int j = 1; j <= rowsPerPartition; j++)
+                    coordinator.execute(INSERT_C1_R1, ConsistencyLevel.ALL, i, 
j, i + j);
+
+
+            runQueries(coordinator, helper, new String[]{
+                String.format("SELECT * FROM %s.%s", KEYSPACE, TABLE_NAME),
+
+                String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+                              KEYSPACE, TABLE_NAME, partitions - 3, 
rowsPerPartition - 2),
+
+                String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+                              KEYSPACE, TABLE_NAME, partitions - 1, 
rowsPerPartition - 5),
+
+                String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
+                              KEYSPACE, TABLE_NAME, partitions - 8, 
rowsPerPartition - 3),
+            });
+        })
+        .runAfterNodeUpgrade((cluster, node) -> {
+            validateResults(helper, cluster, 1);
+            validateResults(helper, cluster, 2);
+
+            String flagQuery = String.format("SELECT flags FROM 
system_schema.tables WHERE keyspace_name='%s' and table_name='%s'", KEYSPACE, 
TABLE_NAME);
+            Object[][] results = cluster.get(node).executeInternal(flagQuery);
+            if (results.length != 1)
+                Assert.fail("failed to find table flags with query: " + 
flagQuery);
+
+            Set<String> flags = (Set) results[0][0];
+            Assert.assertTrue("missing compound flag", 
flags.contains("compound"));
+            Assert.assertFalse("found dense flag", flags.contains("dense"));
+        })
+        .run();
+    }
+
+    @Test
+    public void failOnCompactClusteredTablesWithValueOutColumn() throws 
Throwable
+    {
+        try
+        {
+            new TestCase()
+            .nodes(2)
+            .upgrade(Versions.Major.v30, Versions.Major.v4)
+            .setup(cluster -> cluster.schemaChange(CREATE_TABLE_C1_ONLY))
+            .runAfterNodeUpgrade((cluster, node) -> {
+                Assert.fail("should never run because we don't expect the node 
to start");
+            })
+            .run();
+        } catch (RuntimeException e)
+        {
+            validateError(e);
+        }
+    }
+
+    @Test
+    public void failOnCompactTablesWithNoClustering() throws Throwable
+    {
+        try
+        {
+            new TestCase()
+            .nodes(2)
+            .upgrade(Versions.Major.v30, Versions.Major.v4)
+            .setup(cluster -> cluster.schemaChange(CREATE_TABLE_R_ONLY))
+            .runAfterNodeUpgrade((cluster, node) -> {
+                Assert.fail("should never run because we don't expect the node 
to start");
+            })
+            .run();
+        } catch (RuntimeException e)
+        {
+            validateError(e);
+        }
+    }
+
+
+    public void validateResults(DropCompactTestHelper helper, 
UpgradeableCluster cluster, int node)
+    {
+        validateResults(helper, cluster, node, ConsistencyLevel.ALL);
+    }
+
+    public void validateResults(DropCompactTestHelper helper, 
UpgradeableCluster cluster, int node, ConsistencyLevel cl)
+    {
+        for (Map.Entry<String, Object[][]> entry : 
helper.queriesAndResults().entrySet())
+        {
+            Object[][] postUpgradeResult = 
cluster.coordinator(node).execute(entry.getKey(), cl);
+            assertRows(postUpgradeResult, entry.getValue());
+        }
+
+    }
+
+    private void runQueries(ICoordinator coordinator, DropCompactTestHelper 
helper, String[] queries)
+    {
+        for (String query : queries)
+            helper.addResult(query, coordinator.execute(query, 
ConsistencyLevel.ALL));
+    }
+
+    private void validateError(Throwable t)
+    {
+        Throwable cause = t.getCause();
+        if (cause instanceof StartupException)
+        {
+            Assert.assertTrue("Message was: " + cause.getMessage(),
+                              cause.getMessage().contains(String.format("ALTER 
TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, TABLE_NAME)));
+        }
+
+    }
+
+    public static class DropCompactTestHelper
+    {
+        final private Map<String, Object[][]> preUpgradeResults = new 
HashMap<>();
+
+        public void addResult(String query, Object[][] results)
+        {
+            preUpgradeResults.put(query, results);
+        }
+
+        public Map<String, Object[][]> queriesAndResults()
+        {
+            return preUpgradeResults;
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to