Repository: cassandra Updated Branches: refs/heads/trunk d52bdaefd -> 53c0ef171
Bump SSTable level instead of rewriting SSTable completely during single-sstable compactions Patch by Marcus Eriksson, reviewed by Alex Petrov for CASSANDRA-12526 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53c0ef17 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53c0ef17 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53c0ef17 Branch: refs/heads/trunk Commit: 53c0ef171424454c47d64a9326b0ba83cd743a50 Parents: d52bdae Author: Marcus Eriksson <marc...@apache.org> Authored: Mon Apr 16 15:55:11 2018 +0200 Committer: Alex Petrov <oleksandr.pet...@gmail.com> Committed: Tue Jun 19 18:25:43 2018 +0200 ---------------------------------------------------------------------- .../compaction/AbstractCompactionStrategy.java | 14 ++ .../compaction/CompactionStrategyManager.java | 12 ++ .../compaction/LeveledCompactionStrategy.java | 28 +++- .../db/compaction/LeveledManifest.java | 17 +- .../db/compaction/SingleSSTableLCSTask.java | 101 ++++++++++++ .../apache/cassandra/db/lifecycle/Tracker.java | 9 ++ .../notifications/SSTableMetadataChanged.java | 33 ++++ .../db/compaction/SingleSSTableLCSTaskTest.java | 155 +++++++++++++++++++ .../cassandra/db/lifecycle/TrackerTest.java | 4 + .../cassandra/io/sstable/LegacySSTableTest.java | 29 +++- 10 files changed, 393 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index a43761f..3410f13 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -42,6 +42,7 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.schema.CompactionParams; /** @@ -286,6 +287,19 @@ public abstract class AbstractCompactionStrategy @VisibleForTesting protected abstract Set<SSTableReader> getSSTables(); + /** + * Called when the metadata has changed for an sstable - for example if the level changed + * + * Not called when repair status changes (which is also metadata), because this results in the + * sstable getting removed from the compaction strategy instance. + * + * @param oldMetadata + * @param sstable + */ + public void metadataChanged(StatsMetadata oldMetadata, SSTableReader sstable) + { + } + public static class ScannerList implements AutoCloseable { public final List<ISSTableScanner> scanners; http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index b954d5e..81b7c7e 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -52,6 +52,7 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.notifications.*; import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.schema.TableMetadata; @@ -770,6 +771,12 @@ public class CompactionStrategyManager implements INotificationConsumer } } + private void handleMetadataChangedNotification(SSTableReader sstable, StatsMetadata oldMetadata) + { + AbstractCompactionStrategy acs = getCompactionStrategyFor(sstable); + acs.metadataChanged(oldMetadata, sstable); + } + private void handleDeletingNotification(SSTableReader deleted) { // If reloaded, SSTables will be placed in their correct locations @@ -806,6 +813,11 @@ public class CompactionStrategyManager implements INotificationConsumer { handleDeletingNotification(((SSTableDeletingNotification) notification).deleting); } + else if (notification instanceof SSTableMetadataChanged) + { + SSTableMetadataChanged lcNotification = (SSTableMetadataChanged) notification; + handleMetadataChangedNotification(lcNotification.sstable, lcNotification.oldMetadata); + } } public void enable() http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index b1091ce..a65a20e 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.Config; @@ -49,18 +50,21 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy private static final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb"; private static final boolean tolerateSstableSize = Boolean.getBoolean(Config.PROPERTY_PREFIX + "tolerate_sstable_size"); private static final String LEVEL_FANOUT_SIZE_OPTION = "fanout_size"; + private static final String SINGLE_SSTABLE_UPLEVEL_OPTION = "single_sstable_uplevel"; public static final int DEFAULT_LEVEL_FANOUT_SIZE = 10; @VisibleForTesting final LeveledManifest manifest; private final int maxSSTableSizeInMB; private final int levelFanoutSize; + private final boolean singleSSTableUplevel; public LeveledCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options) { super(cfs, options); int configuredMaxSSTableSize = 160; int configuredLevelFanoutSize = DEFAULT_LEVEL_FANOUT_SIZE; + boolean configuredSingleSSTableUplevel = false; SizeTieredCompactionStrategyOptions localOptions = new SizeTieredCompactionStrategyOptions(options); if (options != null) { @@ -82,9 +86,15 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy { configuredLevelFanoutSize = Integer.parseInt(options.get(LEVEL_FANOUT_SIZE_OPTION)); } + + if (options.containsKey(SINGLE_SSTABLE_UPLEVEL_OPTION)) + { + configuredSingleSSTableUplevel = Boolean.parseBoolean(options.get(SINGLE_SSTABLE_UPLEVEL_OPTION)); + } } maxSSTableSizeInMB = configuredMaxSSTableSize; levelFanoutSize = configuredLevelFanoutSize; + singleSSTableUplevel = configuredSingleSSTableUplevel; manifest = new LeveledManifest(cfs, this.maxSSTableSizeInMB, this.levelFanoutSize, localOptions); logger.trace("Created {}", manifest); @@ -151,7 +161,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy LifecycleTransaction txn = cfs.getTracker().tryModify(candidate.sstables, OperationType.COMPACTION); if (txn != null) { - LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, txn, candidate.level, gcBefore, candidate.maxSSTableBytes, false); + AbstractCompactionTask newTask; + if (!singleSSTableUplevel || op == OperationType.TOMBSTONE_COMPACTION || txn.originals().size() > 1) + newTask = new LeveledCompactionTask(cfs, txn, candidate.level, gcBefore, candidate.maxSSTableBytes, false); + else + newTask = new SingleSSTableLCSTask(cfs, txn, candidate.level); + newTask.setCompactionType(op); return newTask; } @@ -333,6 +348,13 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy } @Override + public void metadataChanged(StatsMetadata oldMetadata, SSTableReader sstable) + { + if (sstable.getSSTableLevel() != oldMetadata.sstableLevel) + manifest.newLevel(sstable, oldMetadata.sstableLevel); + } + + @Override public void addSSTable(SSTableReader added) { manifest.add(added); @@ -569,6 +591,10 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy } uncheckedOptions.remove(LEVEL_FANOUT_SIZE_OPTION); + uncheckedOptions.remove(SINGLE_SSTABLE_UPLEVEL_OPTION); + + uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString()); + uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString()); uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString()); uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 291973f..c01dd69 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -49,11 +49,6 @@ public class LeveledManifest private static final Logger logger = LoggerFactory.getLogger(LeveledManifest.class); /** - * if we have more than MAX_COMPACTING_L0 sstables in L0, we will run a round of STCS with at most - * cfs.getMaxCompactionThreshold() sstables. - */ - private static final int MAX_COMPACTING_L0 = 32; - /** * If we go this many rounds without compacting * in the highest level, we start bringing in sstables from * that level into lower level compactions @@ -420,7 +415,7 @@ public class LeveledManifest private CompactionCandidate getSTCSInL0CompactionCandidate() { - if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > MAX_COMPACTING_L0) + if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > cfs.getMaximumCompactionThreshold()) { List<SSTableReader> mostInteresting = getSSTablesForSTCS(getLevel(0)); if (!mostInteresting.isEmpty()) @@ -818,7 +813,7 @@ public class LeveledManifest tasks += estimated[i]; } - if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > MAX_COMPACTING_L0) + if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > cfs.getMaximumCompactionThreshold()) { int l0compactions = getLevel(0).size() / cfs.getMaximumCompactionThreshold(); tasks += l0compactions; @@ -864,6 +859,14 @@ public class LeveledManifest return sstables; } + public synchronized void newLevel(SSTableReader sstable, int oldLevel) + { + boolean removed = generations[oldLevel].remove(sstable); + assert removed : "Could not remove " + sstable +" from " + oldLevel; + add(sstable); + lastCompactedKeys[oldLevel] = sstable.last; + } + public static class CompactionCandidate { public final Collection<SSTableReader> sstables; http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java b/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java new file mode 100644 index 0000000..3522d61 --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; + +/** + * Special compaction task that does not do any compaction, instead it + * just mutates the level metadata on the sstable and notifies the compaction + * strategy. + */ +public class SingleSSTableLCSTask extends AbstractCompactionTask +{ + private static final Logger logger = LoggerFactory.getLogger(SingleSSTableLCSTask.class); + + private final int level; + + public SingleSSTableLCSTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int level) + { + super(cfs, txn); + assert txn.originals().size() == 1; + this.level = level; + } + + @Override + public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) + { + throw new UnsupportedOperationException("This method should never be called on SingleSSTableLCSTask"); + } + + @Override + protected int executeInternal(CompactionManager.CompactionExecutorStatsCollector collector) + { + run(); + return 1; + } + + @Override + protected void runMayThrow() + { + SSTableReader sstable = transaction.onlyOne(); + StatsMetadata metadataBefore = sstable.getSSTableMetadata(); + if (level == metadataBefore.sstableLevel) + { + logger.info("Not compacting {}, level is already {}", sstable, level); + } + else + { + try + { + logger.info("Changing level on {} from {} to {}", sstable, metadataBefore.sstableLevel, level); + sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, level); + sstable.reloadSSTableMetadata(); + } + catch (Throwable t) + { + transaction.abort(); + throw new CorruptSSTableException(t, sstable.descriptor.filenameFor(Component.DATA)); + } + cfs.getTracker().notifySSTableMetadataChanged(sstable, metadataBefore); + } + finishTransaction(sstable); + } + + private void finishTransaction(SSTableReader sstable) + { + // we simply cancel the transaction since no sstables are added or removed - we just + // write a new sstable metadata above and then atomically move the new file on top of the old + transaction.cancel(sstable); + transaction.prepareToCommit(); + transaction.commit(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/lifecycle/Tracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index c578da2..3ae6eaf 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.notifications.*; @@ -446,6 +447,14 @@ public class Tracker subscriber.handleNotification(notification, this); } + public void notifySSTableMetadataChanged(SSTableReader levelChanged, StatsMetadata oldMetadata) + { + INotification notification = new SSTableMetadataChanged(levelChanged, oldMetadata); + for (INotificationConsumer subscriber : subscribers) + subscriber.handleNotification(notification, this); + + } + public void notifyDeleting(SSTableReader deleting) { INotification notification = new SSTableDeletingNotification(deleting); http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/notifications/SSTableMetadataChanged.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/SSTableMetadataChanged.java b/src/java/org/apache/cassandra/notifications/SSTableMetadataChanged.java new file mode 100644 index 0000000..83cfe60 --- /dev/null +++ b/src/java/org/apache/cassandra/notifications/SSTableMetadataChanged.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.notifications; + +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; + +public class SSTableMetadataChanged implements INotification +{ + public final SSTableReader sstable; + public final StatsMetadata oldMetadata; + + public SSTableMetadataChanged(SSTableReader levelChanged, StatsMetadata oldMetadata) + { + this.sstable = levelChanged; + this.oldMetadata = oldMetadata; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java new file mode 100644 index 0000000..6428ab7 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.util.Random; + +import org.apache.commons.lang3.StringUtils; +import org.junit.Test; + +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SingleSSTableLCSTaskTest extends CQLTester +{ + @Test + public void basicTest() throws Throwable + { + createTable("create table %s (id int primary key, t text) with compaction = {'class':'LeveledCompactionStrategy','single_sstable_uplevel':true}"); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + execute("insert into %s (id, t) values (1, 'meep')"); + cfs.forceBlockingFlush(); + SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); + + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.COMPACTION)) + { + if (txn != null) + { + SingleSSTableLCSTask task = new SingleSSTableLCSTask(cfs, txn, 2); + task.executeInternal(null); + } + } + assertEquals(1, cfs.getLiveSSTables().size()); + cfs.getLiveSSTables().forEach(s -> assertEquals(2, s.getSSTableLevel())); + // make sure compaction strategy is notified: + LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().iterator().next(); + for (int i = 0; i < lcs.manifest.getLevelCount(); i++) + { + if (i == 2) + assertEquals(1, lcs.getLevelSize(i)); + else + assertEquals(0, lcs.getLevelSize(i)); + } + assertTrue(cfs.getTracker().getCompacting().isEmpty()); + } + + @Test + public void compactionTest() throws Throwable + { + compactionTestHelper(true); + } + + @Test + public void uplevelDisabledTest() throws Throwable + { + compactionTestHelper(false); + } + + private void compactionTestHelper(boolean singleSSTUplevel) throws Throwable + { + createTable("create table %s (id int, id2 int, t blob, primary key (id, id2))" + + "with compaction = {'class':'LeveledCompactionStrategy', 'single_sstable_uplevel':" + singleSSTUplevel + ", 'sstable_size_in_mb':'1', 'max_threshold':'1000'}"); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + cfs.disableAutoCompaction(); + byte[] b = new byte[10 * 1024]; + new Random().nextBytes(b); + ByteBuffer value = ByteBuffer.wrap(b); + for (int i = 0; i < 5000; i++) + { + for (int j = 0; j < 10; j++) + { + execute("insert into %s (id, id2, t) values (?, ?, ?)", i, j, value); + } + if (i % 100 == 0) + cfs.forceBlockingFlush(); + } + // now we have a bunch of data in L0, first compaction will be a normal one, containing all sstables: + LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0); + AbstractCompactionTask act = lcs.getNextBackgroundTask(0); + act.execute(null); + + // now all sstables are laid out non-overlapping in L1, this means that the rest of the compactions + // will be single sstable ones, make sure that we use SingleSSTableLCSTask if singleSSTUplevel is true: + while (lcs.getEstimatedRemainingTasks() > 0) + { + act = lcs.getNextBackgroundTask(0); + assertEquals(singleSSTUplevel, act instanceof SingleSSTableLCSTask); + act.execute(null); + } + assertEquals(0, lcs.getLevelSize(0)); + int l1size = lcs.getLevelSize(1); + // this should be 10, but it might vary a bit depending on partition sizes etc + assertTrue(l1size >= 8 && l1size <= 12); + assertTrue(lcs.getLevelSize(2) > 0); + } + + @Test + public void corruptMetadataTest() throws Throwable + { + createTable("create table %s (id int primary key, t text) with compaction = {'class':'LeveledCompactionStrategy','single_sstable_uplevel':true}"); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + execute("insert into %s (id, t) values (1, 'meep')"); + cfs.forceBlockingFlush(); + SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); + + String filenameToCorrupt = sstable.descriptor.filenameFor(Component.STATS); + RandomAccessFile file = new RandomAccessFile(filenameToCorrupt, "rw"); + file.seek(0); + file.writeBytes(StringUtils.repeat('z', 2)); + file.close(); + boolean gotException = false; + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.COMPACTION)) + { + if (txn != null) + { + SingleSSTableLCSTask task = new SingleSSTableLCSTask(cfs, txn, 2); + task.executeInternal(null); + } + } + catch (Throwable t) + { + gotException = true; + } + assertTrue(gotException); + assertEquals(1, cfs.getLiveSSTables().size()); + for (SSTableReader sst : cfs.getLiveSSTables()) + assertEquals(0, sst.getSSTableMetadata().sstableLevel); + LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().iterator().next(); + assertEquals(1, lcs.getLevelSize(0)); + assertTrue(cfs.getTracker().getCompacting().isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java index bee322c..2891126 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java @@ -359,6 +359,10 @@ public class TrackerTest tracker.notifyRenewed(memtable); Assert.assertEquals(memtable, ((MemtableRenewedNotification) listener.received.get(0)).renewed); listener.received.clear(); + tracker.notifySSTableMetadataChanged(r1, r1.getSSTableMetadata()); + Assert.assertEquals(((SSTableMetadataChanged)listener.received.get(0)).sstable, r1); + Assert.assertEquals(r1.getSSTableMetadata(), ((SSTableMetadataChanged)listener.received.get(0)).oldMetadata); + listener.received.clear(); tracker.unsubscribe(listener); MockListener failListener = new MockListener(true); tracker.subscribe(failListener); http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java index 13d3eac..fcc9191 100644 --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java @@ -172,8 +172,35 @@ public class LegacySSTableTest { for (SSTableReader sstable : cfs.getLiveSSTables()) { - sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1234, UUID.randomUUID()); + UUID random = UUID.randomUUID(); + sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1234, random); sstable.reloadSSTableMetadata(); + assertEquals(1234, sstable.getRepairedAt()); + if (sstable.descriptor.version.hasPendingRepair()) + assertEquals(random, sstable.getPendingRepair()); + } + } + } + } + + @Test + public void testMutateLevel() throws Exception + { + // we need to make sure we write old version metadata in the format for that version + for (String legacyVersion : legacyVersions) + { + logger.info("Loading legacy version: {}", legacyVersion); + truncateLegacyTables(legacyVersion); + loadLegacyTables(legacyVersion); + CacheService.instance.invalidateKeyCache(); + + for (ColumnFamilyStore cfs : Keyspace.open("legacy_tables").getColumnFamilyStores()) + { + for (SSTableReader sstable : cfs.getLiveSSTables()) + { + sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 1234); + sstable.reloadSSTableMetadata(); + assertEquals(1234, sstable.getSSTableLevel()); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org