This is an automated email from the ASF dual-hosted git repository. adelapena pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push: new 8a9ba8866d Split compact storage upgrade tests to prevent OOM 8a9ba8866d is described below commit 8a9ba8866db6162a7b7352a260122d6e3c219567 Author: Andrés de la Peña <a.penya.gar...@gmail.com> AuthorDate: Mon May 16 17:29:32 2022 +0100 Split compact storage upgrade tests to prevent OOM patch by Andrés de la Peña; reviewed by Ekaterina Dimitrova for CASSANDRA-17213 --- .../upgrade/CompactStorage2to3UpgradeTest.java | 363 --------------------- .../upgrade/CompactStorageMultiColumnTest.java | 65 ++++ .../upgrade/CompactStorageSingleColumnTest.java | 66 ++++ ...opCompactStorageBeforeUpgradeSSTablesTest.java} | 3 +- .../upgrade/DropCompactStorageTester.java | 66 ++++ ...actStorageWithClusteringAndValueColumnTest.java | 120 +++++++ ...DropCompactStorageWithDeletesAndWritesTest.java | 159 +++++++++ 7 files changed, 477 insertions(+), 365 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java deleted file mode 100644 index 7235c728d1..0000000000 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java +++ /dev/null @@ -1,363 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.distributed.upgrade; - -import java.util.HashMap; -import java.util.Map; - -import org.junit.Test; - -import org.apache.cassandra.distributed.UpgradeableCluster; -import org.apache.cassandra.distributed.api.ConsistencyLevel; -import org.apache.cassandra.distributed.api.ICoordinator; -import org.apache.cassandra.distributed.api.IMessageFilters; -import org.apache.cassandra.distributed.api.NodeToolResult; -import org.apache.cassandra.distributed.shared.Versions; - -import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; -import static org.apache.cassandra.distributed.shared.AssertUtils.row; -import static org.junit.Assert.assertEquals; -import static org.apache.cassandra.distributed.api.Feature.GOSSIP; -import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; -import static org.apache.cassandra.distributed.api.Feature.NETWORK; - - -public class CompactStorage2to3UpgradeTest extends UpgradeTestBase -{ - @Test - public void multiColumn() throws Throwable - { - new TestCase() - .upgradesFrom(v22) - .setup(cluster -> { - assert cluster.size() == 3; - int rf = cluster.size() - 1; - assert rf == 2; - cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};"); - cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v1 int, v2 text, PRIMARY KEY (pk)) WITH COMPACT STORAGE"); - ICoordinator coordinator = cluster.coordinator(1); - // these shouldn't be replicated by the 3rd node - coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (3, 3, '3')", ConsistencyLevel.ALL); - coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (9, 9, '9')", ConsistencyLevel.ALL); - for (int i = 0; i < cluster.size(); i++) - { - int nodeNum = i + 1; - System.out.printf("****** node %s: %s%n", nodeNum, cluster.get(nodeNum).config()); - } - }) - .runAfterNodeUpgrade(((cluster, node) -> { - if (node != 2) - return; - - Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL); - Object[][] expected = { - row(9, 9, "9"), - row(3, 3, "3") - }; - assertRows(rows, expected); - })).run(); - } - - @Test - public void singleColumn() throws Throwable - { - new TestCase() - .upgradesFrom(v22) - .setup(cluster -> { - assert cluster.size() == 3; - int rf = cluster.size() - 1; - assert rf == 2; - cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};"); - cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v int, PRIMARY KEY (pk)) WITH COMPACT STORAGE"); - ICoordinator coordinator = cluster.coordinator(1); - // these shouldn't be replicated by the 3rd node - coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (3, 3)", ConsistencyLevel.ALL); - coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (9, 9)", ConsistencyLevel.ALL); - for (int i = 0; i < cluster.size(); i++) - { - int nodeNum = i + 1; - System.out.printf("****** node %s: %s%n", nodeNum, cluster.get(nodeNum).config()); - } - }) - .runAfterNodeUpgrade(((cluster, node) -> { - - if (node < 2) - return; - - Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL); - Object[][] expected = { - row(9, 9), - row(3, 3) - }; - assertRows(rows, expected); - })).run(); - } - - @Test - public void testDropCompactWithClusteringAndValueColumn() throws Throwable - { - final String table = "clustering_and_value"; - final int partitions = 10; - final int rowsPerPartition = 10; - - final ResultsRecorder recorder = new ResultsRecorder(); - new TestCase() - .nodes(2) - .upgradesFrom(v22) - .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)) - .setup(cluster -> { - cluster.schemaChange(String.format( - "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE", - KEYSPACE, table)); - ICoordinator coordinator = cluster.coordinator(1); - - for (int i = 1; i <= partitions; i++) - { - for (int j = 1; j <= rowsPerPartition; j++) - { - coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)", - KEYSPACE, table, i, j), ConsistencyLevel.ALL); - coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 2, 1)", - KEYSPACE, table, i, j), ConsistencyLevel.ALL); - } - } - - runQueries(cluster.coordinator(1), recorder, new String[] { - String.format("SELECT * FROM %s.%s", KEYSPACE, table), - - String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", - KEYSPACE, table, partitions - 3, rowsPerPartition - 2), - - String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", - KEYSPACE, table, partitions - 1, rowsPerPartition - 5), - - String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d", - KEYSPACE, table, partitions - 1, rowsPerPartition - 5, 1), - - String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d", - KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 1), - - String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 > %d", - KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 1), - - String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 > %d", - KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 2), - - String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d", - KEYSPACE, table, partitions - 8, rowsPerPartition - 3), - - }); - }).runBeforeNodeRestart((cluster, node) -> - { - cluster.get(node).config().set("enable_drop_compact_storage", true); - - - }).runAfterClusterUpgrade(cluster -> - { - for (int i = 1; i <= cluster.size(); i++) - { - NodeToolResult result = cluster.get(i).nodetoolResult("upgradesstables"); - assertEquals("upgrade sstables failed for node " + i, 0, result.getRc()); - } - Thread.sleep(1000); - - // make sure the results are the same after upgrade and upgrade sstables but before dropping compact storage - recorder.validateResults(cluster, 1); - recorder.validateResults(cluster, 2); - - // make sure the results are the same after dropping compact storage on only the first node - IMessageFilters.Filter filter = cluster.verbs().allVerbs().to(2).drop(); - cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 1); - - recorder.validateResults(cluster, 1, ConsistencyLevel.ONE); - - filter.off(); - recorder.validateResults(cluster, 1); - recorder.validateResults(cluster, 2); - - // make sure the results continue to be the same after dropping compact storage on the second node - cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 2); - recorder.validateResults(cluster, 1); - recorder.validateResults(cluster, 2); - }) - .run(); - } - - @Test - public void testDropCompactWithClusteringAndValueColumnWithDeletesAndWrites() throws Throwable - { - final String table = "clustering_and_value_with_deletes"; - final int partitions = 10; - final int rowsPerPartition = 10; - final int additionalParititons = 5; - - new TestCase() - .nodes(2) - .upgradesFrom(v22) - .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL).set("enable_drop_compact_storage", true)) - .setup(cluster -> { - cluster.schemaChange(String.format( - "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE", - KEYSPACE, table)); - ICoordinator coordinator = cluster.coordinator(1); - - for (int i = 1; i <= partitions; i++) - { - for (int j = 1; j <= rowsPerPartition; j++) - { - coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)", - KEYSPACE, table, i, j), ConsistencyLevel.ALL); - coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 2, 2)", - KEYSPACE, table, i, j), ConsistencyLevel.ALL); - coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 3, 3)", - KEYSPACE, table, i, j), ConsistencyLevel.ALL); - } - } - - }) - .runAfterClusterUpgrade(cluster -> { - cluster.forEach(n -> n.nodetoolResult("upgradesstables", KEYSPACE).asserts().success()); - Thread.sleep(1000); - - // drop compact storage on only one node before performing writes - IMessageFilters.Filter filter = cluster.verbs().allVerbs().to(2).drop(); - cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 1); - filter.off(); - - // add new partitions and delete some of the old ones - ICoordinator coordinator = cluster.coordinator(1); - for (int i = 0; i < additionalParititons; i++) - { - for (int j = 1; j <= rowsPerPartition; j++) - { - coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)", - KEYSPACE, table, i, j), ConsistencyLevel.ALL); - } - } - - coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d", - KEYSPACE, table, 0, 3), ConsistencyLevel.ALL); - - coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d", - KEYSPACE, table, 1), ConsistencyLevel.ALL); - - coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d", - KEYSPACE, table, 7, 2, 2), ConsistencyLevel.ALL); - - coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d", - KEYSPACE, table, 7, 6, 1), ConsistencyLevel.ALL); - - coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d", - KEYSPACE, table, 4, 1, 1), ConsistencyLevel.ALL); - - coordinator.execute(String.format("DELETE c3 FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d", - KEYSPACE, table, 8, 1, 3), ConsistencyLevel.ALL); - - coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 > 1", - KEYSPACE, table, 6, 2), ConsistencyLevel.ALL); - - ResultsRecorder recorder = new ResultsRecorder(); - runQueries(coordinator, recorder, new String[] { - String.format("SELECT * FROM %s.%s", KEYSPACE, table), - - String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", - KEYSPACE, table, partitions - 3, rowsPerPartition - 2), - - String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", - KEYSPACE, table, partitions - 1, rowsPerPartition - 5), - - - String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d", - KEYSPACE, table, partitions - 8, rowsPerPartition - 3), - - String.format("SELECT * FROM %s.%s WHERE key = %d", - KEYSPACE, table, 7), - - String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", - KEYSPACE, table, 7, 2), - - String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", - KEYSPACE, table, 8, 1), - - String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d", - KEYSPACE, table, 8, 1), - - String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d", - KEYSPACE, table, 8, 1), - - String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d", - KEYSPACE, table, 4, 1), - - String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d", - KEYSPACE, table, 6), - - String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d", - KEYSPACE, table, 0, 1), - - String.format("SELECT * FROM %s.%s WHERE key = %d", - KEYSPACE, table, partitions - (additionalParititons - 2)), - - String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d", - KEYSPACE, table, partitions - (additionalParititons - 3), 4) - - }); - - // drop compact storage on remaining node and check result - cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 2); - recorder.validateResults(cluster, 1); - recorder.validateResults(cluster, 2); - }).run(); - } - - private void runQueries(ICoordinator coordinator, ResultsRecorder helper, String[] queries) - { - for (String query : queries) - helper.addResult(query, coordinator.execute(query, ConsistencyLevel.ALL)); - } - - public static class ResultsRecorder - { - final private Map<String, Object[][]> preUpgradeResults = new HashMap<>(); - - public void addResult(String query, Object[][] results) - { - preUpgradeResults.put(query, results); - } - - public Map<String, Object[][]> queriesAndResults() - { - return preUpgradeResults; - } - - public void validateResults(UpgradeableCluster cluster, int node) - { - validateResults(cluster, node, ConsistencyLevel.ALL); - } - - public void validateResults(UpgradeableCluster cluster, int node, ConsistencyLevel cl) - { - for (Map.Entry<String, Object[][]> entry : queriesAndResults().entrySet()) - { - Object[][] postUpgradeResult = cluster.coordinator(node).execute(entry.getKey(), cl); - assertRows(postUpgradeResult, entry.getValue()); - } - - } - } -} \ No newline at end of file diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageMultiColumnTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageMultiColumnTest.java new file mode 100644 index 0000000000..086982ead2 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageMultiColumnTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import org.junit.Test; + +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICoordinator; + +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; +import static org.apache.cassandra.distributed.shared.AssertUtils.row; + + +public class CompactStorageMultiColumnTest extends UpgradeTestBase +{ + @Test + public void multiColumn() throws Throwable + { + new TestCase() + .upgradesFrom(v22) + .setup(cluster -> { + assert cluster.size() == 3; + int rf = cluster.size() - 1; + assert rf == 2; + cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};"); + cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v1 int, v2 text, PRIMARY KEY (pk)) WITH COMPACT STORAGE"); + ICoordinator coordinator = cluster.coordinator(1); + // these shouldn't be replicated by the 3rd node + coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (3, 3, '3')", ConsistencyLevel.ALL); + coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (9, 9, '9')", ConsistencyLevel.ALL); + for (int i = 0; i < cluster.size(); i++) + { + int nodeNum = i + 1; + System.out.printf("****** node %s: %s%n", nodeNum, cluster.get(nodeNum).config()); + } + }) + .runAfterNodeUpgrade(((cluster, node) -> { + if (node != 2) + return; + + Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL); + Object[][] expected = { + row(9, 9, "9"), + row(3, 3, "3") + }; + assertRows(rows, expected); + })).run(); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageSingleColumnTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageSingleColumnTest.java new file mode 100644 index 0000000000..8a7170f28d --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageSingleColumnTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import org.junit.Test; + +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICoordinator; + +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; +import static org.apache.cassandra.distributed.shared.AssertUtils.row; + + +public class CompactStorageSingleColumnTest extends UpgradeTestBase +{ + @Test + public void singleColumn() throws Throwable + { + new TestCase() + .upgradesFrom(v22) + .setup(cluster -> { + assert cluster.size() == 3; + int rf = cluster.size() - 1; + assert rf == 2; + cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};"); + cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v int, PRIMARY KEY (pk)) WITH COMPACT STORAGE"); + ICoordinator coordinator = cluster.coordinator(1); + // these shouldn't be replicated by the 3rd node + coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (3, 3)", ConsistencyLevel.ALL); + coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (9, 9)", ConsistencyLevel.ALL); + for (int i = 0; i < cluster.size(); i++) + { + int nodeNum = i + 1; + System.out.printf("****** node %s: %s%n", nodeNum, cluster.get(nodeNum).config()); + } + }) + .runAfterNodeUpgrade(((cluster, node) -> { + + if (node < 2) + return; + + Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL); + Object[][] expected = { + row(9, 9), + row(3, 3) + }; + assertRows(rows, expected); + })).run(); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageBeforeUpgradeSSTablesTest.java similarity index 96% rename from test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTest.java rename to test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageBeforeUpgradeSSTablesTest.java index 80ce02afe0..2fd94894ff 100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageBeforeUpgradeSSTablesTest.java @@ -22,7 +22,6 @@ import com.vdurmont.semver4j.Semver; import org.junit.Test; import org.apache.cassandra.distributed.api.ConsistencyLevel; -import org.apache.cassandra.distributed.shared.Versions; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; @@ -30,7 +29,7 @@ import static org.apache.cassandra.distributed.api.Feature.NETWORK; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.catchThrowable; -public class DropCompactStorageTest extends UpgradeTestBase +public class DropCompactStorageBeforeUpgradeSSTablesTest extends DropCompactStorageTester { @Test public void dropCompactStorageBeforeUpgradesstablesTo3X() throws Throwable diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTester.java b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTester.java new file mode 100644 index 0000000000..bd9f4d67ad --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTester.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICoordinator; + +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; + +public abstract class DropCompactStorageTester extends UpgradeTestBase +{ + protected void runQueries(ICoordinator coordinator, ResultsRecorder helper, String[] queries) + { + for (String query : queries) + helper.addResult(query, coordinator.execute(query, ConsistencyLevel.ALL)); + } + + public static class ResultsRecorder + { + final private Map<String, Object[][]> preUpgradeResults = new HashMap<>(); + + public void addResult(String query, Object[][] results) + { + preUpgradeResults.put(query, results); + } + + public Map<String, Object[][]> queriesAndResults() + { + return preUpgradeResults; + } + + public void validateResults(UpgradeableCluster cluster, int node) + { + validateResults(cluster, node, ConsistencyLevel.ALL); + } + + public void validateResults(UpgradeableCluster cluster, int node, ConsistencyLevel cl) + { + for (Map.Entry<String, Object[][]> entry : queriesAndResults().entrySet()) + { + Object[][] postUpgradeResult = cluster.coordinator(node).execute(entry.getKey(), cl); + assertRows(postUpgradeResult, entry.getValue()); + } + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithClusteringAndValueColumnTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithClusteringAndValueColumnTest.java new file mode 100644 index 0000000000..53042d0ea0 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithClusteringAndValueColumnTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import org.junit.Test; + +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICoordinator; +import org.apache.cassandra.distributed.api.IMessageFilters; +import org.apache.cassandra.distributed.api.NodeToolResult; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.junit.Assert.assertEquals; + + +public class DropCompactStorageWithClusteringAndValueColumnTest extends DropCompactStorageTester +{ + @Test + public void testDropCompactWithClusteringAndValueColumn() throws Throwable + { + final String table = "clustering_and_value"; + final int partitions = 10; + final int rowsPerPartition = 10; + + final ResultsRecorder recorder = new ResultsRecorder(); + new TestCase() + .nodes(2) + .upgradesFrom(v22) + .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)) + .setup(cluster -> { + cluster.schemaChange(String.format( + "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE", + KEYSPACE, table)); + ICoordinator coordinator = cluster.coordinator(1); + + for (int i = 1; i <= partitions; i++) + { + for (int j = 1; j <= rowsPerPartition; j++) + { + coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)", + KEYSPACE, table, i, j), ConsistencyLevel.ALL); + coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 2, 1)", + KEYSPACE, table, i, j), ConsistencyLevel.ALL); + } + } + + runQueries(cluster.coordinator(1), recorder, new String[]{ + String.format("SELECT * FROM %s.%s", KEYSPACE, table), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", + KEYSPACE, table, partitions - 3, rowsPerPartition - 2), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", + KEYSPACE, table, partitions - 1, rowsPerPartition - 5), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d", + KEYSPACE, table, partitions - 1, rowsPerPartition - 5, 1), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d", + KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 1), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 > %d", + KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 1), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 > %d", + KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 2), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d", + KEYSPACE, table, partitions - 8, rowsPerPartition - 3), + }); + }).runBeforeNodeRestart((cluster, node) -> { + cluster.get(node).config().set("enable_drop_compact_storage", true); + }).runAfterClusterUpgrade(cluster -> { + for (int i = 1; i <= cluster.size(); i++) + { + NodeToolResult result = cluster.get(i).nodetoolResult("upgradesstables"); + assertEquals("upgrade sstables failed for node " + i, 0, result.getRc()); + } + Thread.sleep(1000); + + // make sure the results are the same after upgrade and upgrade sstables but before dropping compact storage + recorder.validateResults(cluster, 1); + recorder.validateResults(cluster, 2); + + // make sure the results are the same after dropping compact storage on only the first node + IMessageFilters.Filter filter = cluster.verbs().allVerbs().to(2).drop(); + cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 1); + + recorder.validateResults(cluster, 1, ConsistencyLevel.ONE); + + filter.off(); + recorder.validateResults(cluster, 1); + recorder.validateResults(cluster, 2); + + // make sure the results continue to be the same after dropping compact storage on the second node + cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 2); + recorder.validateResults(cluster, 1); + recorder.validateResults(cluster, 2); + }) + .run(); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithDeletesAndWritesTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithDeletesAndWritesTest.java new file mode 100644 index 0000000000..9d9e278fde --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithDeletesAndWritesTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import org.junit.Test; + +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICoordinator; +import org.apache.cassandra.distributed.api.IMessageFilters; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + + +public class DropCompactStorageWithDeletesAndWritesTest extends DropCompactStorageTester +{ + @Test + public void testDropCompactWithClusteringAndValueColumnWithDeletesAndWrites() throws Throwable + { + final String table = "clustering_and_value_with_deletes"; + final int partitions = 10; + final int rowsPerPartition = 10; + final int additionalParititons = 5; + + new TestCase() + .nodes(2) + .upgradesFrom(v22) + .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL).set("enable_drop_compact_storage", true)) + .setup(cluster -> { + cluster.schemaChange(String.format( + "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE", + KEYSPACE, table)); + ICoordinator coordinator = cluster.coordinator(1); + + for (int i = 1; i <= partitions; i++) + { + for (int j = 1; j <= rowsPerPartition; j++) + { + coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)", + KEYSPACE, table, i, j), ConsistencyLevel.ALL); + coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 2, 2)", + KEYSPACE, table, i, j), ConsistencyLevel.ALL); + coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 3, 3)", + KEYSPACE, table, i, j), ConsistencyLevel.ALL); + } + } + + }) + .runAfterClusterUpgrade(cluster -> { + cluster.forEach(n -> n.nodetoolResult("upgradesstables", KEYSPACE).asserts().success()); + Thread.sleep(1000); + + // drop compact storage on only one node before performing writes + IMessageFilters.Filter filter = cluster.verbs().allVerbs().to(2).drop(); + cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 1); + filter.off(); + + // add new partitions and delete some of the old ones + ICoordinator coordinator = cluster.coordinator(1); + for (int i = 0; i < additionalParititons; i++) + { + for (int j = 1; j <= rowsPerPartition; j++) + { + coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)", + KEYSPACE, table, i, j), ConsistencyLevel.ALL); + } + } + + coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d", + KEYSPACE, table, 0, 3), ConsistencyLevel.ALL); + + coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d", + KEYSPACE, table, 1), ConsistencyLevel.ALL); + + coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d", + KEYSPACE, table, 7, 2, 2), ConsistencyLevel.ALL); + + coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d", + KEYSPACE, table, 7, 6, 1), ConsistencyLevel.ALL); + + coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d", + KEYSPACE, table, 4, 1, 1), ConsistencyLevel.ALL); + + coordinator.execute(String.format("DELETE c3 FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d", + KEYSPACE, table, 8, 1, 3), ConsistencyLevel.ALL); + + coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 > 1", + KEYSPACE, table, 6, 2), ConsistencyLevel.ALL); + + ResultsRecorder recorder = new ResultsRecorder(); + runQueries(coordinator, recorder, new String[] { + String.format("SELECT * FROM %s.%s", KEYSPACE, table), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", + KEYSPACE, table, partitions - 3, rowsPerPartition - 2), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", + KEYSPACE, table, partitions - 1, rowsPerPartition - 5), + + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d", + KEYSPACE, table, partitions - 8, rowsPerPartition - 3), + + String.format("SELECT * FROM %s.%s WHERE key = %d", + KEYSPACE, table, 7), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", + KEYSPACE, table, 7, 2), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", + KEYSPACE, table, 8, 1), + + String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d", + KEYSPACE, table, 8, 1), + + String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d", + KEYSPACE, table, 8, 1), + + String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d", + KEYSPACE, table, 4, 1), + + String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d", + KEYSPACE, table, 6), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d", + KEYSPACE, table, 0, 1), + + String.format("SELECT * FROM %s.%s WHERE key = %d", + KEYSPACE, table, partitions - (additionalParititons - 2)), + + String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d", + KEYSPACE, table, partitions - (additionalParititons - 3), 4) + + }); + + // drop compact storage on remaining node and check result + cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 2); + recorder.validateResults(cluster, 1); + recorder.validateResults(cluster, 2); + }).run(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org