This is an automated email from the ASF dual-hosted git repository. jwest pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 92f7c8d Ignore COMPACT STORAGE flag for tables for which its safe to do so 92f7c8d is described below commit 92f7c8db1444bf5d757cd50dba2211a446f3b22c Author: Jordan West <jw...@apache.org> AuthorDate: Mon Aug 17 14:20:32 2020 -0700 Ignore COMPACT STORAGE flag for tables for which its safe to do so patch by Jordan West; Reviewed by Marcus Eriksson and Caleb Rackliffe for CASSANDRA-16048 --- CHANGES.txt | 1 + .../apache/cassandra/schema/SchemaKeyspace.java | 60 ++++++- .../cassandra/distributed/impl/Instance.java | 10 ++ .../upgrade/CompactStorage3to4UpgradeTest.java | 190 +++++++++++++++++++++ 4 files changed, 255 insertions(+), 6 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 70bbd4f..04af373 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -35,6 +35,7 @@ Merged from 3.0: * Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160) Merged from 2.2: * Fixed a NullPointerException when calling nodetool enablethrift (CASSANDRA-16127) + * Automatically drop compact storage on tables for which it is safe (CASSANDRA-16048) 4.0-beta2 * Add addition incremental repair visibility to nodetool repair_admin (CASSANDRA-14939) diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 0333ee6..f6dbe03 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -866,15 +866,34 @@ public final class SchemaKeyspace StringBuilder messages = new StringBuilder(); for (UntypedResultSet.Row row : query(query)) { - if (SchemaConstants.isLocalSystemKeyspace(row.getString("keyspace_name"))) + String keyspaceName = row.getString("keyspace_name"); + if (SchemaConstants.isLocalSystemKeyspace(keyspaceName)) continue; - Set<String> flags = row.getFrozenSet("flags", UTF8Type.instance); - if 
(TableMetadata.Flag.isLegacyCompactTable(TableMetadata.Flag.fromStringSet(flags))) + Set<TableMetadata.Flag> flags = TableMetadata.Flag.fromStringSet(row.getFrozenSet("flags", UTF8Type.instance)); + if (TableMetadata.Flag.isLegacyCompactTable(flags)) { - messages.append(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE;\n", - maybeQuote(row.getString("keyspace_name")), - maybeQuote(row.getString("table_name")))); + String tableName = row.getString("table_name"); + if (isSafeToDropCompactStorage(keyspaceName, tableName)) + { + flags.remove(TableMetadata.Flag.DENSE); + flags.add(TableMetadata.Flag.COMPOUND); + String update = String.format("UPDATE %s.%s SET flags={%s} WHERE keyspace_name='%s' AND table_name='%s'", + SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES, + TableMetadata.Flag.toStringSet(flags).stream() + .map(f -> "'" + f + "'") + .collect(Collectors.joining(", ")), + keyspaceName, tableName); + + logger.info("Safely dropping COMPACT STORAGE on {}.{}", keyspaceName, tableName); + executeInternal(update); + } + else + { + messages.append(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE;\n", + maybeQuote(row.getString("keyspace_name")), + maybeQuote(tableName))); + } } } @@ -889,6 +908,35 @@ public final class SchemaKeyspace } } + private static boolean isSafeToDropCompactStorage(String keyspaceName, String tableName) + { + if (!Boolean.parseBoolean(System.getProperty("cassandra.auto_drop_compact_storage", "false"))) + return false; + + String columnQuery = String.format("SELECT kind, type FROM %s.%s WHERE keyspace_name='%s' and table_name='%s'", + SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS, keyspaceName, tableName); + + String simpleType = "empty"; + int simpleCount = 0; + for (UntypedResultSet.Row row : query(columnQuery)) + { + String kind = row.getString("kind"); + if (kind.equalsIgnoreCase("partition_key") || kind.equalsIgnoreCase("clustering")) + continue; + + if (kind.equalsIgnoreCase("static")) + return false; + + simpleCount++; // if not 
partition, clustering, or static column then it's a regular column + simpleType = row.getString("type"); // only save one type because if there is > 1 simple column then false is returned + } + + if (simpleCount == 1 && !simpleType.equalsIgnoreCase("empty")) + return true; + + return false; + } + private static Keyspaces fetchKeyspacesWithout(Set<String> excludedKeyspaceNames) { String query = format("SELECT keyspace_name FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES); diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 9a3fd08..edd525d 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -79,6 +79,7 @@ import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe; import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbeFactory; import org.apache.cassandra.distributed.shared.InstanceClassLoader; +import org.apache.cassandra.exceptions.StartupException; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; @@ -103,6 +104,7 @@ import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.DefaultFSErrorHandler; import org.apache.cassandra.service.PendingRangeCalculatorService; import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.service.StartupChecks; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.StorageServiceMBean; import org.apache.cassandra.streaming.StreamReceiveTask; @@ -415,6 +417,14 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance DatabaseDescriptor.createAllDirectories(); CommitLog.instance.start(); + try + { + new 
StartupChecks().withDefaultTests().verify(); + } catch (StartupException e) + { + throw e; + } + // We need to persist this as soon as possible after startup checks. // This should be the first write to SystemKeyspace (CASSANDRA-11742) SystemKeyspace.persistLocalMetadata(); diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java new file mode 100644 index 0000000..5317517 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.upgrade; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICoordinator; +import org.apache.cassandra.distributed.shared.Versions; +import org.apache.cassandra.exceptions.StartupException; +import org.apache.cassandra.schema.TableMetadata; + +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; + +public class CompactStorage3to4UpgradeTest extends UpgradeTestBase +{ + + public static final String TABLE_NAME = "cs_tbl"; + public static final String CREATE_TABLE_C1_R1 = String.format( + "CREATE TABLE %s.%s (key int, c1 int, v int, PRIMARY KEY (key, c1)) WITH COMPACT STORAGE", + KEYSPACE, TABLE_NAME); + public static final String CREATE_TABLE_C1_ONLY = String.format( + "CREATE TABLE %s.%s (key int, c1 int, PRIMARY KEY (key, c1)) WITH COMPACT STORAGE", + KEYSPACE, TABLE_NAME); + public static final String CREATE_TABLE_R_ONLY = String.format( + "CREATE TABLE %s.%s (key int, c1 int, c2 int, PRIMARY KEY (key)) WITH COMPACT STORAGE", + KEYSPACE, TABLE_NAME); + + public static final String INSERT_C1_R1 = String.format( + "INSERT INTO %s.%s (key, c1, v) VALUES (?, ?, ?)", + KEYSPACE, TABLE_NAME); + + @Test + public void ignoreDenseCompoundTablesWithValueColumn() throws Throwable + { + System.setProperty("cassandra.auto_drop_compact_storage", "true"); + final int partitions = 10; + final int rowsPerPartition = 10; + + DropCompactTestHelper helper = new DropCompactTestHelper(); + new TestCase() + .nodes(2) + .upgrade(Versions.Major.v30, Versions.Major.v4) + .setup(cluster -> { + cluster.schemaChange(CREATE_TABLE_C1_R1); + + ICoordinator coordinator = cluster.coordinator(1); + for (int i = 1; i <= partitions; i++) + for (int j = 1; j <= rowsPerPartition; j++) + 
coordinator.execute(INSERT_C1_R1, ConsistencyLevel.ALL, i, j, i + j); + + + runQueries(coordinator, helper, new String[]{ + String.format("SELECT * FROM %s.%s", KEYSPACE, TABLE_NAME), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", + KEYSPACE, TABLE_NAME, partitions - 3, rowsPerPartition - 2), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", + KEYSPACE, TABLE_NAME, partitions - 1, rowsPerPartition - 5), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d", + KEYSPACE, TABLE_NAME, partitions - 8, rowsPerPartition - 3), + }); + }) + .runAfterNodeUpgrade((cluster, node) -> { + validateResults(helper, cluster, 1); + validateResults(helper, cluster, 2); + + String flagQuery = String.format("SELECT flags FROM system_schema.tables WHERE keyspace_name='%s' and table_name='%s'", KEYSPACE, TABLE_NAME); + Object[][] results = cluster.get(node).executeInternal(flagQuery); + if (results.length != 1) + Assert.fail("failed to find table flags with query: " + flagQuery); + + Set<String> flags = (Set) results[0][0]; + Assert.assertTrue("missing compound flag", flags.contains("compound")); + Assert.assertFalse("found dense flag", flags.contains("dense")); + }) + .run(); + } + + @Test + public void failOnCompactClusteredTablesWithValueOutColumn() throws Throwable + { + try + { + new TestCase() + .nodes(2) + .upgrade(Versions.Major.v30, Versions.Major.v4) + .setup(cluster -> cluster.schemaChange(CREATE_TABLE_C1_ONLY)) + .runAfterNodeUpgrade((cluster, node) -> { + Assert.fail("should never run because we don't expect the node to start"); + }) + .run(); + } catch (RuntimeException e) + { + validateError(e); + } + } + + @Test + public void failOnCompactTablesWithNoClustering() throws Throwable + { + try + { + new TestCase() + .nodes(2) + .upgrade(Versions.Major.v30, Versions.Major.v4) + .setup(cluster -> cluster.schemaChange(CREATE_TABLE_R_ONLY)) + .runAfterNodeUpgrade((cluster, node) -> { + Assert.fail("should never run because we 
don't expect the node to start"); + }) + .run(); + } catch (RuntimeException e) + { + validateError(e); + } + } + + + public void validateResults(DropCompactTestHelper helper, UpgradeableCluster cluster, int node) + { + validateResults(helper, cluster, node, ConsistencyLevel.ALL); + } + + public void validateResults(DropCompactTestHelper helper, UpgradeableCluster cluster, int node, ConsistencyLevel cl) + { + for (Map.Entry<String, Object[][]> entry : helper.queriesAndResults().entrySet()) + { + Object[][] postUpgradeResult = cluster.coordinator(node).execute(entry.getKey(), cl); + assertRows(postUpgradeResult, entry.getValue()); + } + + } + + private void runQueries(ICoordinator coordinator, DropCompactTestHelper helper, String[] queries) + { + for (String query : queries) + helper.addResult(query, coordinator.execute(query, ConsistencyLevel.ALL)); + } + + private void validateError(Throwable t) + { + Throwable cause = t.getCause(); + if (cause instanceof StartupException) + { + Assert.assertTrue("Message was: " + cause.getMessage(), + cause.getMessage().contains(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, TABLE_NAME))); + } + + } + + public static class DropCompactTestHelper + { + final private Map<String, Object[][]> preUpgradeResults = new HashMap<>(); + + public void addResult(String query, Object[][] results) + { + preUpgradeResults.put(query, results); + } + + public Map<String, Object[][]> queriesAndResults() + { + return preUpgradeResults; + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org