Author: jbellis Date: Sun Jun 5 14:45:47 2011 New Revision: 1132428 URL: http://svn.apache.org/viewvc?rev=1132428&view=rev Log: add test compaction package
Added: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java Added: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java?rev=1132428&view=auto ============================================================================== --- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java (added) +++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java Sun Jun 5 14:45:47 2011 @@ -0,0 +1,236 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.db.compaction; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ExecutionException; + +import org.junit.Test; + +import org.apache.cassandra.CleanupHelper; +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.db.Table; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowMutation; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.Util; + +import static junit.framework.Assert.assertEquals; +import static org.apache.cassandra.db.TableTest.assertColumns; +import org.apache.cassandra.utils.ByteBufferUtil; + + +public class CompactionsPurgeTest extends CleanupHelper +{ + public static final String TABLE1 = "Keyspace1"; + public static final String TABLE2 = "Keyspace2"; + + @Test + public void testMajorCompactionPurge() throws IOException, ExecutionException, InterruptedException + { + CompactionManager.instance.disableAutoCompaction(); + + Table table = Table.open(TABLE1); + String cfName = "Standard1"; + ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName); + + DecoratedKey key = Util.dk("key1"); + RowMutation rm; + + // inserts + rm = new RowMutation(TABLE1, key.key); + for (int i = 0; i < 10; i++) + { + rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0); + } + rm.apply(); + cfs.forceBlockingFlush(); + + // deletes + for (int i = 0; i < 10; i++) + { + rm = new RowMutation(TABLE1, key.key); + rm.delete(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), 1); + rm.apply(); + } + cfs.forceBlockingFlush(); + + // resurrect one column + rm = new RowMutation(TABLE1, key.key); + rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(5))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 2); + rm.apply(); + cfs.forceBlockingFlush(); + + // major compact and test that all columns but the resurrected one is completely gone + CompactionManager.instance.submitMajor(cfs, 0, Integer.MAX_VALUE).get(); + cfs.invalidateCachedRow(key); + ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName))); + assertColumns(cf, "5"); + assert cf.getColumn(ByteBufferUtil.bytes(String.valueOf(5))) != null; + } + + @Test + public void testMinorCompactionPurge() throws IOException, ExecutionException, InterruptedException + { + CompactionManager.instance.disableAutoCompaction(); + + Table table = Table.open(TABLE2); + String cfName = "Standard1"; + ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName); + + RowMutation rm; + for (int k = 1; k <= 2; ++k) { + DecoratedKey key = Util.dk("key" + k); + + // inserts + rm = new RowMutation(TABLE2, key.key); + for (int i = 0; i < 10; i++) + { + rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0); + } + rm.apply(); + cfs.forceBlockingFlush(); + + // deletes + for (int i = 0; i < 10; i++) + { + rm = new RowMutation(TABLE2, key.key); + rm.delete(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), 1); + rm.apply(); + } + cfs.forceBlockingFlush(); + } + + DecoratedKey key1 = Util.dk("key1"); + DecoratedKey key2 = Util.dk("key2"); + + // flush, remember the current sstable and then resurrect one column + // for first key. Then submit minor compaction on remembered sstables. + cfs.forceBlockingFlush(); + Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables(); + rm = new RowMutation(TABLE2, key1.key); + rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(5))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 2); + rm.apply(); + cfs.forceBlockingFlush(); + CompactionManager.instance.doCompaction(cfs, sstablesIncomplete, Integer.MAX_VALUE); + + // verify that minor compaction does not GC when key is present + // in a non-compacted sstable + ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, new QueryPath(cfName))); + assert cf.getColumnCount() == 10; + + // verify that minor compaction does GC when key is provably not + // present in a non-compacted sstable + cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key2, new QueryPath(cfName))); + assert cf == null; + } + + @Test + public void testCompactionPurgeOneFile() throws IOException, ExecutionException, InterruptedException + { + CompactionManager.instance.disableAutoCompaction(); + + Table table = Table.open(TABLE1); + String cfName = "Standard2"; + ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName); + + DecoratedKey key = Util.dk("key1"); + RowMutation rm; + + // inserts + rm = new RowMutation(TABLE1, key.key); + for (int i = 0; i < 5; i++) + { + rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0); + } + rm.apply(); + + // deletes + for (int i = 0; i < 5; i++) + { + rm = new RowMutation(TABLE1, key.key); + rm.delete(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), 1); + rm.apply(); + } + cfs.forceBlockingFlush(); + assert cfs.getSSTables().size() == 1 : cfs.getSSTables(); // inserts & deletes were in the same memtable -> only deletes in sstable + + // compact and test that the row is completely gone + Util.compactAll(cfs).get(); + assert cfs.getSSTables().isEmpty(); + ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName))); + assert cf == null : cf; + } + + @Test + public void testCompactionPurgeTombstonedRow() throws IOException, ExecutionException, InterruptedException + { + CompactionManager.instance.disableAutoCompaction(); + + String tableName = "RowCacheSpace"; + String cfName = "CachedCF"; + Table table = Table.open(tableName); + ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName); + + DecoratedKey key = Util.dk("key3"); + RowMutation rm; + + // inserts + rm = new RowMutation(tableName, key.key); + for (int i = 0; i < 10; i++) + { + rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0); + } + rm.apply(); + + // move the key up in row cache + cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName))); + + // deletes row + rm = new RowMutation(tableName, key.key); + rm.delete(new QueryPath(cfName, null, null), 1); + rm.apply(); + + // flush and major compact + cfs.forceBlockingFlush(); + Util.compactAll(cfs).get(); + + // re-inserts with timestamp lower than delete + rm = new RowMutation(tableName, key.key); + for (int i = 0; i < 10; i++) + { + rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0); + } + rm.apply(); + + // Check that the second insert did went in + ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName))); + assertEquals(10, cf.getColumnCount()); + for (IColumn c : cf) + assert !c.isMarkedForDelete(); + } +} Added: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1132428&view=auto ============================================================================== --- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java (added) +++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Sun Jun 5 14:45:47 2011 @@ -0,0 +1,190 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.db.compaction; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; + +import org.apache.cassandra.Util; + +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.CleanupHelper; +import org.apache.cassandra.db.Table; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowMutation; +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; +import static junit.framework.Assert.assertEquals; + +public class CompactionsTest extends CleanupHelper +{ + public static final String TABLE1 = "Keyspace1"; + public static final String TABLE2 = "Keyspace2"; + public static final InetAddress LOCAL = FBUtilities.getLocalAddress(); + + public static final int MIN_COMPACTION_THRESHOLD = 2; + + @Test + public void testCompactions() throws IOException, ExecutionException, InterruptedException + { + // this test does enough rows to force multiple block indexes to be used + Table table = Table.open(TABLE1); + ColumnFamilyStore store = table.getColumnFamilyStore("Standard1"); + + final int ROWS_PER_SSTABLE = 10; + final int SSTABLES = (DatabaseDescriptor.getIndexInterval() * 3 / ROWS_PER_SSTABLE); + + // disable compaction while flushing + store.disableAutoCompaction(); + + Set<DecoratedKey> inserted = new HashSet<DecoratedKey>(); + for (int j = 0; j < SSTABLES; j++) { + for (int i = 0; i < ROWS_PER_SSTABLE; i++) { + DecoratedKey key = Util.dk(String.valueOf(i % 2)); + RowMutation rm = new RowMutation(TABLE1, key.key); + rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(String.valueOf(i / 2))), ByteBufferUtil.EMPTY_BYTE_BUFFER, j * ROWS_PER_SSTABLE + i); + rm.apply(); + inserted.add(key); + } + store.forceBlockingFlush(); + assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(store).size()); + } + // re-enable compaction with thresholds low enough to force a few rounds + store.setMinimumCompactionThreshold(2); + store.setMaximumCompactionThreshold(4); + // loop submitting parallel compactions until they all return 0 + while (true) + { + ArrayList<Future<Integer>> compactions = new ArrayList<Future<Integer>>(); + for (int i = 0; i < 10; i++) + compactions.add(CompactionManager.instance.submitMinorIfNeeded(store)); + // another compaction attempt will be launched in the background by + // each completing compaction: not much we can do to control them here + boolean progress = false; + for (Future<Integer> compaction : compactions) + if (compaction.get() > 0) + progress = true; + if (!progress) + break; + } + if (store.getSSTables().size() > 1) + { + CompactionManager.instance.performMajor(store); + } + assertEquals(inserted.size(), Util.getRangeSlice(store).size()); + } + + @Test + public void testGetBuckets() + { + List<Pair<String, Long>> pairs = new ArrayList<Pair<String, Long>>(); + String[] strings = { "a", "bbbb", "cccccccc", "cccccccc", "bbbb", "a" }; + for (String st : strings) + { + Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length())); + pairs.add(pair); + } + + Set<List<String>> buckets = CompactionManager.getBuckets(pairs, 2); + assertEquals(3, buckets.size()); + + for (List<String> bucket : buckets) + { + assertEquals(2, bucket.size()); + assertEquals(bucket.get(0).length(), bucket.get(1).length()); + assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0)); + } + + pairs.clear(); + buckets.clear(); + + String[] strings2 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" }; + for (String st : strings2) + { + Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length())); + pairs.add(pair); + } + + buckets = CompactionManager.getBuckets(pairs, 2); + assertEquals(2, buckets.size()); + + for (List<String> bucket : buckets) + { + assertEquals(3, bucket.size()); + assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0)); + assertEquals(bucket.get(1).charAt(0), bucket.get(2).charAt(0)); + } + + // Test the "min" functionality + pairs.clear(); + buckets.clear(); + + String[] strings3 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" }; + for (String st : strings3) + { + Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length())); + pairs.add(pair); + } + + buckets = CompactionManager.getBuckets(pairs, 10); // notice the min is 10 + assertEquals(1, buckets.size()); + } + @Test + public void testEchoedRow() throws IOException, ExecutionException, InterruptedException + { + // This test check that EchoedRow doesn't skipp rows: see CASSANDRA-2653 + + Table table = Table.open(TABLE1); + ColumnFamilyStore store = table.getColumnFamilyStore("Standard2"); + + // disable compaction while flushing + store.disableAutoCompaction(); + + // Insert 4 keys in two sstables. We need the sstables to have 2 rows + // at least to trigger what was causing CASSANDRA-2653 + for (int i=1; i < 5; i++) + { + DecoratedKey key = Util.dk(String.valueOf(i)); + RowMutation rm = new RowMutation(TABLE1, key.key); + rm.add(new QueryPath("Standard2", null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, i); + rm.apply(); + + if (i % 2 == 0) + store.forceBlockingFlush(); + } + + // Force compaction. Since each row is in only one sstable, we will be using EchoedRow. + CompactionManager.instance.performMajor(store); + + // Now assert we do have the two keys + assertEquals(4, Util.getRangeSlice(store).size()); + } +} Added: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java?rev=1132428&view=auto ============================================================================== --- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java (added) +++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java Sun Jun 5 14:45:47 2011 @@ -0,0 +1,75 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.db.compaction; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.Set; +import java.util.HashSet; + +import org.apache.cassandra.Util; + +import org.junit.Test; + +import static junit.framework.Assert.assertEquals; +import org.apache.cassandra.db.Table; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowMutation; +import org.apache.cassandra.db.ColumnFamilyStore; + +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.CleanupHelper; +import org.apache.cassandra.utils.ByteBufferUtil; + + +public class OneCompactionTest extends CleanupHelper +{ + private void testCompaction(String columnFamilyName, int insertsPerTable) throws IOException, ExecutionException, InterruptedException + { + CompactionManager.instance.disableAutoCompaction(); + + Table table = Table.open("Keyspace1"); + ColumnFamilyStore store = table.getColumnFamilyStore(columnFamilyName); + + Set<DecoratedKey> inserted = new HashSet<DecoratedKey>(); + for (int j = 0; j < insertsPerTable; j++) { + DecoratedKey key = Util.dk(String.valueOf(j)); + RowMutation rm = new RowMutation("Keyspace1", key.key); + rm.add(new QueryPath(columnFamilyName, null, ByteBufferUtil.bytes("0")), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); + rm.apply(); + inserted.add(key); + store.forceBlockingFlush(); + assertEquals(inserted.size(), Util.getRangeSlice(store).size()); + } + CompactionManager.instance.performMajor(store); + assertEquals(1, store.getSSTables().size()); + } + + @Test + public void testCompaction1() throws IOException, ExecutionException, InterruptedException + { + testCompaction("Standard1", 1); + } + + @Test + public void testCompaction2() throws IOException, ExecutionException, InterruptedException + { + testCompaction("Standard2", 2); + } +}