http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java new file mode 100644 index 0000000..d53a830 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java @@ -0,0 +1,158 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
*/
package org.apache.cassandra.db.lifecycle;

import java.util.Map;
import java.util.Set;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;

import org.junit.Assert;
import org.junit.Test;

import org.apache.cassandra.MockSchema;
import org.apache.cassandra.Util;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.big.BigTableReader;
import org.apache.cassandra.utils.concurrent.Refs;

/**
 * Unit tests for {@link Helpers}: the set filter/concat utilities, identity maps,
 * the precondition checks of {@code Helpers.replace}, and the sstable lifecycle
 * helpers {@code setReplaced} / {@code markObsolete}.
 */
public class HelpersTest
{

    static Set<Integer> a = set(1, 2, 3);
    static Set<Integer> b = set(4, 5, 6);
    static Set<Integer> c = set(7, 8, 9);
    static Set<Integer> abc = set(1, 2, 3, 4, 5, 6, 7, 8, 9);

    // this also tests orIn
    @Test
    public void testFilterIn()
    {
        check(Helpers.filterIn(abc, a), a);
        check(Helpers.filterIn(abc, a, c), set(1, 2, 3, 7, 8, 9));
        check(Helpers.filterIn(a, c), set());
    }

    // this also tests notIn
    @Test
    public void testFilterOut()
    {
        check(Helpers.filterOut(abc, a), set(4, 5, 6, 7, 8, 9));
        check(Helpers.filterOut(abc, b), set(1, 2, 3, 7, 8, 9));
        check(Helpers.filterOut(a, a), set());
    }

    @Test
    public void testConcatUniq()
    {
        // duplicates across the inputs must be collapsed
        check(Helpers.concatUniq(a, b, a, c, b, a), abc);
    }

    @Test
    public void testIdentityMap()
    {
        // deliberately allocate fresh boxed instances (bypassing the Integer cache) so
        // that assertSame below proves the map stores the identical input instances,
        // not merely equal values
        Integer one = new Integer(1);
        Integer two = new Integer(2);
        Integer three = new Integer(3);
        Map<Integer, Integer> identity = Helpers.identityMap(set(one, two, three));
        Assert.assertEquals(3, identity.size());
        Assert.assertSame(one, identity.get(1));
        Assert.assertSame(two, identity.get(2));
        Assert.assertSame(three, identity.get(3));
    }

    @Test
    public void testReplace()
    {
        // each of these argument combinations violates replace()'s preconditions and
        // must be rejected with an AssertionError
        assertReplaceFails(abc, a, c);
        assertReplaceFails(a, abc, c);
        assertReplaceFails(ImmutableMap.of(1, new Integer(1), 2, 2, 3, 3), a, b);

        // check it actually works when correct values provided
        check(Helpers.replace(a, a, b), b);
    }

    /**
     * Asserts that {@code Helpers.replace(original, remove, add)} throws an
     * AssertionError for the given set arguments.
     *
     * Uses a flag rather than Assert.fail() inside the try block: fail() itself
     * throws AssertionError, which the catch clause would swallow.
     */
    private static void assertReplaceFails(Set<Integer> original, Set<Integer> remove, Set<Integer> add)
    {
        boolean failed = false;
        try
        {
            Helpers.replace(original, remove, add);
        }
        catch (AssertionError e)
        {
            failed = true;
        }
        Assert.assertTrue(failed);
    }

    /**
     * Asserts that {@code Helpers.replace(original, remove, add)} throws an
     * AssertionError for the given map argument (e.g. a map that is not an
     * identity map).
     */
    private static void assertReplaceFails(Map<Integer, Integer> original, Set<Integer> remove, Set<Integer> add)
    {
        boolean failed = false;
        try
        {
            Helpers.replace(original, remove, add);
        }
        catch (AssertionError e)
        {
            failed = true;
        }
        Assert.assertTrue(failed);
    }

    /** Convenience factory for an immutable set of the given values. */
    private static Set<Integer> set(Integer ... contents)
    {
        return ImmutableSet.copyOf(contents);
    }

    /** Asserts that the iterable contains exactly the expected elements (as a set). */
    private static void check(Iterable<Integer> check, Set<Integer> expected)
    {
        Assert.assertEquals(expected, ImmutableSet.copyOf(check));
    }

    @Test
    public void testSetupDeletionNotification()
    {
        Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1), MockSchema.sstable(2));
        // first call succeeds and marks every reader replaced...
        Throwable accumulate = Helpers.setReplaced(readers, null);
        Assert.assertNull(accumulate);
        for (SSTableReader reader : readers)
            Assert.assertTrue(reader.isReplaced());
        // ...a second call must accumulate a failure rather than silently repeat
        accumulate = Helpers.setReplaced(readers, null);
        Assert.assertNotNull(accumulate);
    }

    @Test
    public void testMarkObsolete()
    {
        Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1), MockSchema.sstable(2));
        // first call succeeds and marks every reader compacted...
        Throwable accumulate = Helpers.markObsolete(readers, null);
        Assert.assertNull(accumulate);
        for (SSTableReader reader : readers)
            Assert.assertTrue(reader.isMarkedCompacted());
        // ...marking the same readers obsolete twice must be reported as an error
        accumulate = Helpers.markObsolete(readers, null);
        Assert.assertNotNull(accumulate);
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java new file mode 100644 index 0000000..3153ef1 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java @@ -0,0 +1,412 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ +package org.apache.cassandra.db.lifecycle; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import junit.framework.Assert; +import org.apache.cassandra.MockSchema; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState.Action; +import org.apache.cassandra.io.sstable.SSTableDeletingTask; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest; +import org.apache.cassandra.utils.concurrent.Transactional.AbstractTransactional.State; + +import static com.google.common.base.Predicates.in; +import static com.google.common.collect.ImmutableList.copyOf; +import static com.google.common.collect.ImmutableList.of; +import static com.google.common.collect.Iterables.all; +import static com.google.common.collect.Iterables.concat; +import static com.google.common.collect.Iterables.size; +import static org.apache.cassandra.db.lifecycle.Helpers.idIn; +import static org.apache.cassandra.db.lifecycle.Helpers.orIn; +import static org.apache.cassandra.db.lifecycle.Helpers.select; + +public class LifecycleTransactionTest extends AbstractTransactionalTest +{ + private boolean incrementalBackups; + + @Before + public void disableIncrementalBackup() + { + incrementalBackups = DatabaseDescriptor.isIncrementalBackupsEnabled(); + DatabaseDescriptor.setIncrementalBackupsEnabled(false); + } + @After + public void restoreIncrementalBackup() + { + DatabaseDescriptor.setIncrementalBackupsEnabled(incrementalBackups); + } + + @Test + public void testUpdates() // (including obsoletion) + { + Tracker tracker = new Tracker(null, false); + SSTableReader[] readers = readersArray(0, 3); + 
SSTableReader[] readers2 = readersArray(0, 4); + SSTableReader[] readers3 = readersArray(0, 4); + tracker.addInitialSSTables(copyOf(readers)); + LifecycleTransaction txn = tracker.tryModify(copyOf(readers), OperationType.UNKNOWN); + + txn.update(readers2[0], true); + txn.obsolete(readers[1]); + + Assert.assertTrue(txn.isObsolete(readers[1])); + Assert.assertFalse(txn.isObsolete(readers[0])); + + testBadUpdate(txn, readers2[0], true); // same reader && instances + testBadUpdate(txn, readers2[1], true); // staged obsolete; cannot update + testBadUpdate(txn, readers3[0], true); // same reader, diff instances + testBadUpdate(txn, readers2[2], false); // incorrectly declared original status + testBadUpdate(txn, readers2[3], true); // incorrectly declared original status + + testBadObsolete(txn, readers[1]); // staged obsolete; cannot obsolete again + testBadObsolete(txn, readers2[0]); // staged update; cannot obsolete + + txn.update(readers2[3], false); + + Assert.assertEquals(3, tracker.getView().compacting.size()); + txn.checkpoint(); + Assert.assertTrue(txn.isObsolete(readers[1])); + Assert.assertFalse(txn.isObsolete(readers[0])); + Assert.assertEquals(4, tracker.getView().compacting.size()); + Assert.assertEquals(3, tracker.getView().sstables.size()); + Assert.assertEquals(3, size(txn.current())); + Assert.assertTrue(all(of(readers2[0], readers[2], readers2[3]), idIn(tracker.getView().sstablesMap))); + Assert.assertTrue(all(txn.current(), idIn(tracker.getView().sstablesMap))); + + testBadObsolete(txn, readers[1]); // logged obsolete; cannot obsolete again + testBadObsolete(txn, readers2[2]); // never seen instance; cannot obsolete + testBadObsolete(txn, readers2[3]); // non-original; cannot obsolete + testBadUpdate(txn, readers3[1], true); // logged obsolete; cannot update + testBadUpdate(txn, readers2[0], true); // same instance as logged update + + txn.update(readers3[0], true); // same reader as logged update, different instance + txn.checkpoint(); + + 
Assert.assertEquals(4, tracker.getView().compacting.size()); + Assert.assertEquals(3, tracker.getView().sstables.size()); + Assert.assertEquals(3, size(txn.current())); + Assert.assertTrue(all(of(readers3[0], readers[2], readers2[3]), idIn(tracker.getView().sstablesMap))); + Assert.assertTrue(all(txn.current(), idIn(tracker.getView().sstablesMap))); + + testBadObsolete(txn, readers2[0]); // not current version of sstable + + txn.obsoleteOriginals(); + txn.checkpoint(); + Assert.assertEquals(1, tracker.getView().sstables.size()); + txn.obsoleteOriginals(); // should be no-op + txn.checkpoint(); + Assert.assertEquals(1, tracker.getView().sstables.size()); + Assert.assertEquals(4, tracker.getView().compacting.size()); + } + + @Test + public void testCancellation() + { + Tracker tracker = new Tracker(null, false); + List<SSTableReader> readers = readers(0, 3); + tracker.addInitialSSTables(readers); + LifecycleTransaction txn = tracker.tryModify(readers, OperationType.UNKNOWN); + + SSTableReader cancel = readers.get(0); + SSTableReader update = readers(1, 2).get(0); + SSTableReader fresh = readers(3, 4).get(0); + SSTableReader notPresent = readers(4, 5).get(0); + + txn.cancel(cancel); + txn.update(update, true); + txn.update(fresh, false); + + testBadCancel(txn, cancel); + testBadCancel(txn, update); + testBadCancel(txn, fresh); + testBadCancel(txn, notPresent); + Assert.assertEquals(2, txn.originals().size()); + Assert.assertEquals(2, tracker.getView().compacting.size()); + Assert.assertTrue(all(readers.subList(1, 3), idIn(tracker.getView().compacting))); + + txn.checkpoint(); + + testBadCancel(txn, cancel); + testBadCancel(txn, update); + testBadCancel(txn, fresh); + testBadCancel(txn, notPresent); + Assert.assertEquals(2, txn.originals().size()); + Assert.assertEquals(3, tracker.getView().compacting.size()); + Assert.assertEquals(3, size(txn.current())); + Assert.assertTrue(all(concat(readers.subList(1, 3), of(fresh)), idIn(tracker.getView().compacting))); + + 
txn.cancel(readers.get(2)); + Assert.assertEquals(1, txn.originals().size()); + Assert.assertEquals(2, tracker.getView().compacting.size()); + Assert.assertEquals(2, size(txn.current())); + Assert.assertTrue(all(of(readers.get(1), fresh), idIn(tracker.getView().compacting))); + } + + @Test + public void testSplit() + { + Tracker tracker = new Tracker(null, false); + List<SSTableReader> readers = readers(0, 3); + tracker.addInitialSSTables(readers); + LifecycleTransaction txn = tracker.tryModify(readers, OperationType.UNKNOWN); + LifecycleTransaction txn2 = txn.split(readers.subList(0, 1)); + Assert.assertEquals(2, txn.originals().size()); + Assert.assertTrue(all(readers.subList(1, 3), in(txn.originals()))); + Assert.assertEquals(1, txn2.originals().size()); + Assert.assertTrue(all(readers.subList(0, 1), in(txn2.originals()))); + txn.update(readers(1, 2).get(0), true); + boolean failed = false; + try + { + txn.split(readers.subList(2, 3)); + } + catch (Throwable t) + { + failed = true; + } + Assert.assertTrue(failed); + } + + private static void testBadUpdate(LifecycleTransaction txn, SSTableReader update, boolean original) + { + boolean failed = false; + try + { + txn.update(update, original); + } + catch (Throwable t) + { + failed = true; + } + Assert.assertTrue(failed); + } + + private static void testBadObsolete(LifecycleTransaction txn, SSTableReader update) + { + boolean failed = false; + try + { + txn.obsolete(update); + } + catch (Throwable t) + { + failed = true; + } + Assert.assertTrue(failed); + } + + private static void testBadCancel(LifecycleTransaction txn, SSTableReader cancel) + { + boolean failed = false; + try + { + txn.cancel(cancel); + } + catch (Throwable t) + { + failed = true; + } + Assert.assertTrue(failed); + } + + protected TestableTransaction newTest() + { + SSTableDeletingTask.waitForDeletions(); + SSTableReader.resetTidying(); + return new TxnTest(); + } + + private static final class TxnTest extends TestableTransaction + { + final 
List<SSTableReader> originals; + final List<SSTableReader> untouchedOriginals; + final List<SSTableReader> loggedUpdate; + final List<SSTableReader> loggedObsolete; + final List<SSTableReader> stagedObsolete; + final List<SSTableReader> loggedNew; + final List<SSTableReader> stagedNew; + final Tracker tracker; + final LifecycleTransaction txn; + + private static Tracker tracker(List<SSTableReader> readers) + { + Tracker tracker = new Tracker(MockSchema.cfs, false); + tracker.addInitialSSTables(readers); + return tracker; + } + + private TxnTest() + { + this(readers(0, 8)); + } + + private TxnTest(List<SSTableReader> readers) + { + this(tracker(readers), readers); + } + + private TxnTest(Tracker tracker, List<SSTableReader> readers) + { + this(tracker, readers, tracker.tryModify(readers, OperationType.UNKNOWN)); + } + + private TxnTest(Tracker tracker, List<SSTableReader> readers, LifecycleTransaction txn) + { + super(txn); + this.tracker = tracker; + this.originals = readers; + this.txn = txn; + update(txn, loggedUpdate = readers(0, 2), true); + obsolete(txn, loggedObsolete = readers.subList(2, 4)); + update(txn, loggedNew = readers(8, 10), false); + txn.checkpoint(); + update(txn, stagedNew = readers(10, 12), false); + obsolete(txn, stagedObsolete = copyOf(concat(loggedUpdate, originals.subList(4, 6)))); + untouchedOriginals = originals.subList(6, 8); + } + + private ReaderState state(SSTableReader reader, State state) + { + SSTableReader original = select(reader, originals); + boolean isOriginal = original != null; + + switch (state) + { + case ABORTED: + { + return new ReaderState(Action.NONE, Action.NONE, original, original, isOriginal); + } + + case READY_TO_COMMIT: + { + ReaderState prev = state(reader, State.IN_PROGRESS); + Action logged; + SSTableReader visible; + if (prev.staged == Action.NONE) + { + logged = prev.logged; + visible = prev.currentlyVisible; + } + else + { + logged = prev.staged; + visible = prev.nextVisible; + } + return new 
ReaderState(logged, Action.NONE, visible, visible, isOriginal); + } + + case IN_PROGRESS: + { + Action logged = Action.get(loggedUpdate.contains(reader) || loggedNew.contains(reader), loggedObsolete.contains(reader)); + Action staged = Action.get(stagedNew.contains(reader), stagedObsolete.contains(reader)); + SSTableReader currentlyVisible = ReaderState.visible(reader, in(loggedObsolete), loggedNew, loggedUpdate, originals); + SSTableReader nextVisible = ReaderState.visible(reader, orIn(stagedObsolete, loggedObsolete), stagedNew, loggedNew, loggedUpdate, originals); + return new ReaderState(logged, staged, currentlyVisible, nextVisible, isOriginal); + } + } + throw new IllegalStateException(); + } + + private List<Pair<SSTableReader, ReaderState>> states(State state) + { + List<Pair<SSTableReader, ReaderState>> result = new ArrayList<>(); + for (SSTableReader reader : concat(originals, loggedNew, stagedNew)) + result.add(Pair.create(reader, state(reader, state))); + return result; + } + + protected void doAssert(State state) + { + for (Pair<SSTableReader, ReaderState> pair : states(state)) + { + SSTableReader reader = pair.left; + ReaderState readerState = pair.right; + + Assert.assertEquals(readerState, txn.state(reader)); + Assert.assertEquals(readerState.currentlyVisible, tracker.getView().sstablesMap.get(reader)); + if (readerState.currentlyVisible == null && readerState.nextVisible == null && !readerState.original) + Assert.assertTrue(reader.selfRef().globalCount() == 0); + } + } + + protected void assertInProgress() throws Exception + { + doAssert(State.IN_PROGRESS); + } + + protected void assertPrepared() throws Exception + { + doAssert(State.READY_TO_COMMIT); + } + + protected void assertAborted() throws Exception + { + doAssert(State.ABORTED); + Assert.assertEquals(0, tracker.getView().compacting.size()); + Assert.assertEquals(8, tracker.getView().sstables.size()); + for (SSTableReader reader : concat(loggedNew, stagedNew)) + 
Assert.assertTrue(reader.selfRef().globalCount() == 0); + } + + protected void assertCommitted() throws Exception + { + doAssert(State.READY_TO_COMMIT); + Assert.assertEquals(0, tracker.getView().compacting.size()); + Assert.assertEquals(6, tracker.getView().sstables.size()); + for (SSTableReader reader : concat(loggedObsolete, stagedObsolete)) + Assert.assertTrue(reader.selfRef().globalCount() == 0); + } + } + + private static SSTableReader[] readersArray(int lb, int ub) + { + return readers(lb, ub).toArray(new SSTableReader[0]); + } + + private static List<SSTableReader> readers(int lb, int ub) + { + List<SSTableReader> readers = new ArrayList<>(); + for (int i = lb ; i < ub ; i++) + readers.add(MockSchema.sstable(i, i, true)); + return copyOf(readers); + } + + private static void update(LifecycleTransaction txn, Iterable<SSTableReader> readers, boolean originals) + { + for (SSTableReader reader : readers) + txn.update(reader, originals); + } + + private static void obsolete(LifecycleTransaction txn, Iterable<SSTableReader> readers) + { + for (SSTableReader reader : readers) + txn.obsolete(reader); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java new file mode 100644 index 0000000..1eef7b0 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java @@ -0,0 +1,342 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.db.lifecycle; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.Nullable; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.junit.Test; + +import junit.framework.Assert; +import org.apache.cassandra.MockSchema; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.io.sstable.SSTableDeletingTask; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.notifications.*; +import org.apache.cassandra.utils.concurrent.OpOrder; + +import static com.google.common.collect.ImmutableSet.copyOf; +import static java.util.Collections.singleton; + +public class TrackerTest +{ + + private static final class MockListener implements INotificationConsumer + { + final boolean throwException; + final List<INotification> received = new ArrayList<>(); + final List<Object> senders = new ArrayList<>(); + + private MockListener(boolean throwException) + { + this.throwException = throwException; + } + + public void handleNotification(INotification notification, Object sender) + 
{ + if (throwException) + throw new RuntimeException(); + received.add(notification); + senders.add(sender); + } + } + + @Test + public void testTryModify() + { + Tracker tracker = new Tracker(MockSchema.cfs, false); + List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0), MockSchema.sstable(1), MockSchema.sstable(2)); + tracker.addInitialSSTables(copyOf(readers)); + try (LifecycleTransaction txn = tracker.tryModify(readers.get(0), OperationType.COMPACTION);) + { + Assert.assertNotNull(txn); + Assert.assertNull(tracker.tryModify(readers.get(0), OperationType.COMPACTION)); + Assert.assertEquals(1, txn.originals().size()); + Assert.assertTrue(txn.originals().contains(readers.get(0))); + } + try (LifecycleTransaction txn = tracker.tryModify(Collections.<SSTableReader>emptyList(), OperationType.COMPACTION);) + { + Assert.assertNotNull(txn); + Assert.assertEquals(0, txn.originals().size()); + } + } + + @Test + public void testApply() + { + final Tracker tracker = new Tracker(null, false); + final View resultView = ViewTest.fakeView(0, 0); + final AtomicInteger count = new AtomicInteger(); + tracker.apply(new Predicate<View>() + { + public boolean apply(View view) + { + // confound the CAS by swapping the view, and check we retry + if (count.incrementAndGet() < 3) + tracker.view.set(ViewTest.fakeView(0, 0)); + return true; + } + }, new Function<View, View>() + { + @Nullable + public View apply(View view) + { + return resultView; + } + }); + Assert.assertEquals(3, count.get()); + Assert.assertEquals(resultView, tracker.getView()); + + count.set(0); + // check that if the predicate returns false, we stop immediately and return null + Assert.assertNull(tracker.apply(new Predicate<View>() + { + public boolean apply(View view) + { + count.incrementAndGet(); + return false; + } + }, null)); + Assert.assertEquals(1, count.get()); + Assert.assertEquals(resultView, tracker.getView()); + } + + @Test + public void testAddInitialSSTables() + { + ColumnFamilyStore cfs 
= MockSchema.newCFS(); + Tracker tracker = new Tracker(cfs, false); + List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 17), MockSchema.sstable(1, 121), MockSchema.sstable(2, 9)); + tracker.addInitialSSTables(copyOf(readers)); + + Assert.assertEquals(3, tracker.view.get().sstables.size()); + + for (SSTableReader reader : readers) + Assert.assertTrue(reader.isDeleteNotificationSetup()); + + Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount()); + } + + @Test + public void testAddSSTables() + { + boolean backups = DatabaseDescriptor.isIncrementalBackupsEnabled(); + DatabaseDescriptor.setIncrementalBackupsEnabled(false); + Tracker tracker = new Tracker(MockSchema.cfs, false); + MockListener listener = new MockListener(false); + tracker.subscribe(listener); + List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 17), MockSchema.sstable(1, 121), MockSchema.sstable(2, 9)); + tracker.addSSTables(copyOf(readers)); + + Assert.assertEquals(3, tracker.view.get().sstables.size()); + + for (SSTableReader reader : readers) + Assert.assertTrue(reader.isDeleteNotificationSetup()); + + Assert.assertEquals(17 + 121 + 9, MockSchema.cfs.metric.liveDiskSpaceUsed.getCount()); + Assert.assertEquals(3, listener.senders.size()); + Assert.assertEquals(tracker, listener.senders.get(0)); + Assert.assertTrue(listener.received.get(0) instanceof SSTableAddedNotification); + DatabaseDescriptor.setIncrementalBackupsEnabled(backups); + } + + @Test + public void testDropSSTables() + { + testDropSSTables(false); + SSTableDeletingTask.waitForDeletions(); + testDropSSTables(true); + SSTableDeletingTask.waitForDeletions(); + } + + private void testDropSSTables(boolean invalidate) + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + Tracker tracker = cfs.getTracker(); + MockListener listener = new MockListener(false); + tracker.subscribe(listener); + final List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 9, true), 
MockSchema.sstable(1, 15, true), MockSchema.sstable(2, 71, true)); + tracker.addInitialSSTables(copyOf(readers)); + try (LifecycleTransaction txn = tracker.tryModify(readers.get(0), OperationType.COMPACTION);) + { + SSTableDeletingTask.pauseDeletions(true); + if (invalidate) + cfs.invalidate(false); + else + tracker.dropSSTables(); + Assert.assertEquals(95, cfs.metric.totalDiskSpaceUsed.getCount()); + Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount()); + Assert.assertEquals(1, tracker.getView().sstables.size()); + SSTableDeletingTask.pauseDeletions(false); + } + if (!invalidate) + { + Assert.assertEquals(1, tracker.getView().sstables.size()); + Assert.assertEquals(readers.get(0), Iterables.getFirst(tracker.getView().sstables, null)); + Assert.assertEquals(1, readers.get(0).selfRef().globalCount()); + Assert.assertFalse(readers.get(0).isMarkedCompacted()); + for (SSTableReader reader : readers.subList(1, 3)) + { + Assert.assertEquals(0, reader.selfRef().globalCount()); + Assert.assertTrue(reader.isMarkedCompacted()); + } + Assert.assertNull(tracker.dropSSTables(new Predicate<SSTableReader>() + { + public boolean apply(SSTableReader reader) + { + return reader != readers.get(0); + } + }, OperationType.UNKNOWN, null)); + + Assert.assertEquals(1, tracker.getView().sstables.size()); + Assert.assertEquals(1, listener.received.size()); + Assert.assertEquals(tracker, listener.senders.get(0)); + Assert.assertEquals(2, ((SSTableListChangedNotification) listener.received.get(0)).removed.size()); + Assert.assertEquals(0, ((SSTableListChangedNotification) listener.received.get(0)).added.size()); + Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount()); + readers.get(0).selfRef().release(); + } + else + { + Assert.assertEquals(0, tracker.getView().sstables.size()); + Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); + for (SSTableReader reader : readers) + Assert.assertTrue(reader.isMarkedCompacted()); + } + } + + @Test + public void 
testMemtableReplacement() + { + boolean backups = DatabaseDescriptor.isIncrementalBackupsEnabled(); + DatabaseDescriptor.setIncrementalBackupsEnabled(false); + ColumnFamilyStore cfs = MockSchema.newCFS(); + MockListener listener = new MockListener(false); + Tracker tracker = cfs.getTracker(); + tracker.subscribe(listener); + + Memtable prev1 = tracker.switchMemtable(true); + OpOrder.Group write1 = cfs.keyspace.writeOrder.getCurrent(); + OpOrder.Barrier barrier1 = cfs.keyspace.writeOrder.newBarrier(); + prev1.setDiscarding(barrier1, new AtomicReference<ReplayPosition>()); + barrier1.issue(); + Memtable prev2 = tracker.switchMemtable(false); + OpOrder.Group write2 = cfs.keyspace.writeOrder.getCurrent(); + OpOrder.Barrier barrier2 = cfs.keyspace.writeOrder.newBarrier(); + prev2.setDiscarding(barrier2, new AtomicReference<ReplayPosition>()); + barrier2.issue(); + Memtable cur = tracker.getView().getCurrentMemtable(); + OpOrder.Group writecur = cfs.keyspace.writeOrder.getCurrent(); + Assert.assertEquals(prev1, tracker.getMemtableFor(write1, ReplayPosition.NONE)); + Assert.assertEquals(prev2, tracker.getMemtableFor(write2, ReplayPosition.NONE)); + Assert.assertEquals(cur, tracker.getMemtableFor(writecur, ReplayPosition.NONE)); + Assert.assertEquals(1, listener.received.size()); + Assert.assertTrue(listener.received.get(0) instanceof MemtableRenewedNotification); + listener.received.clear(); + + tracker.markFlushing(prev2); + Assert.assertEquals(1, tracker.getView().flushingMemtables.size()); + Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2)); + + tracker.markFlushing(prev1); + Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev1)); + Assert.assertEquals(2, tracker.getView().flushingMemtables.size()); + + tracker.replaceFlushed(prev1, null); + Assert.assertEquals(1, tracker.getView().flushingMemtables.size()); + Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2)); + + SSTableReader reader = MockSchema.sstable(0, 10, 
false, cfs); + tracker.replaceFlushed(prev2, reader); + Assert.assertEquals(1, tracker.getView().sstables.size()); + Assert.assertEquals(1, listener.received.size()); + Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added); + listener.received.clear(); + Assert.assertTrue(reader.isDeleteNotificationSetup()); + Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount()); + + // test invalidated CFS + cfs = MockSchema.newCFS(); + tracker = cfs.getTracker(); + listener = new MockListener(false); + tracker.subscribe(listener); + prev1 = tracker.switchMemtable(false); + tracker.markFlushing(prev1); + reader = MockSchema.sstable(0, 10, true, cfs); + cfs.invalidate(false); + tracker.replaceFlushed(prev1, reader); + Assert.assertEquals(0, tracker.getView().sstables.size()); + Assert.assertEquals(0, tracker.getView().flushingMemtables.size()); + Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); + Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added); + Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(1)).removed.size()); + DatabaseDescriptor.setIncrementalBackupsEnabled(backups); + } + + @Test + public void testNotifications() + { + SSTableReader r1 = MockSchema.sstable(0), r2 = MockSchema.sstable(1); + Tracker tracker = new Tracker(null, false); + MockListener listener = new MockListener(false); + tracker.subscribe(listener); + tracker.notifyAdded(r1); + Assert.assertEquals(r1, ((SSTableAddedNotification) listener.received.get(0)).added); + listener.received.clear(); + tracker.notifyDeleting(r1); + Assert.assertEquals(r1, ((SSTableDeletingNotification) listener.received.get(0)).deleting); + listener.received.clear(); + Assert.assertNull(tracker.notifySSTablesChanged(singleton(r1), singleton(r2), OperationType.COMPACTION, null)); + Assert.assertEquals(singleton(r1), ((SSTableListChangedNotification) listener.received.get(0)).removed); + 
Assert.assertEquals(singleton(r2), ((SSTableListChangedNotification) listener.received.get(0)).added); + listener.received.clear(); + tracker.notifySSTableRepairedStatusChanged(singleton(r1)); + Assert.assertEquals(singleton(r1), ((SSTableRepairStatusChanged) listener.received.get(0)).sstable); + listener.received.clear(); + Memtable memtable = MockSchema.memtable(); + tracker.notifyRenewed(memtable); + Assert.assertEquals(memtable, ((MemtableRenewedNotification) listener.received.get(0)).renewed); + listener.received.clear(); + tracker.unsubscribe(listener); + MockListener failListener = new MockListener(true); + tracker.subscribe(failListener); + tracker.subscribe(listener); + Assert.assertNotNull(tracker.notifyAdded(r1, null)); + Assert.assertEquals(r1, ((SSTableAddedNotification) listener.received.get(0)).added); + listener.received.clear(); + Assert.assertNotNull(tracker.notifySSTablesChanged(singleton(r1), singleton(r2), OperationType.COMPACTION, null)); + Assert.assertEquals(singleton(r1), ((SSTableListChangedNotification) listener.received.get(0)).removed); + Assert.assertEquals(singleton(r2), ((SSTableListChangedNotification) listener.received.get(0)).added); + listener.received.clear(); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java new file mode 100644 index 0000000..811e025 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java @@ -0,0 +1,202 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.db.lifecycle; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import com.google.common.base.Function; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.junit.Test; + +import junit.framework.Assert; +import org.apache.cassandra.MockSchema; +import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.io.sstable.format.SSTableReader; + +import static com.google.common.collect.ImmutableSet.copyOf; +import static com.google.common.collect.ImmutableSet.of; +import static com.google.common.collect.Iterables.concat; +import static org.apache.cassandra.db.lifecycle.Helpers.emptySet; + +public class ViewTest +{ + + @Test + public void testSSTablesInBounds() + { + View initialView = fakeView(0, 5); + for (int i = 0 ; i < 5 ; i++) + { + for (int j = i ; j < 5 ; j++) + { + RowPosition min = MockSchema.readerBounds(i); + RowPosition max = MockSchema.readerBounds(j); + for (boolean minInc : new boolean[] { true, false} ) + { + for (boolean maxInc : new boolean[] { true, false} ) + { + if (i == j && !(minInc && maxInc)) + continue; + List<SSTableReader> r = initialView.sstablesInBounds(AbstractBounds.bounds(min, minInc, max, maxInc)); + 
Assert.assertEquals(String.format("%d(%s) %d(%s)", i, minInc, j, maxInc), j - i + (minInc ? 0 : -1) + (maxInc ? 1 : 0), r.size()); + } + } + } + } + } + + @Test + public void testCompaction() + { + View initialView = fakeView(0, 5); + View cur = initialView; + List<SSTableReader> readers = ImmutableList.copyOf(initialView.sstables); + Assert.assertTrue(View.permitCompacting(readers).apply(cur)); + // check we permit compacting duplicates in the predicate, so we don't spin infinitely if there is a screw up + Assert.assertTrue(View.permitCompacting(ImmutableList.copyOf(concat(readers, readers))).apply(cur)); + // check we fail in the application in the presence of duplicates + testFailure(View.updateCompacting(emptySet(), concat(readers.subList(0, 1), readers.subList(0, 1))), cur); + + // do lots of trivial checks that the compacting set and related methods behave properly for a simple update + cur = View.updateCompacting(emptySet(), readers.subList(0, 2)).apply(cur); + Assert.assertTrue(View.permitCompacting(readers.subList(2, 5)).apply(cur)); + Assert.assertFalse(View.permitCompacting(readers.subList(0, 2)).apply(cur)); + Assert.assertFalse(View.permitCompacting(readers.subList(0, 1)).apply(cur)); + Assert.assertFalse(View.permitCompacting(readers.subList(1, 2)).apply(cur)); + Assert.assertTrue(readers.subList(2, 5).containsAll(copyOf(cur.getUncompacting(readers)))); + Assert.assertEquals(3, copyOf(cur.getUncompacting(readers)).size()); + Assert.assertTrue(cur.nonCompactingSStables().containsAll(readers.subList(2, 5))); + Assert.assertEquals(3, cur.nonCompactingSStables().size()); + + // check marking already compacting readers fails with an exception + testFailure(View.updateCompacting(emptySet(), readers.subList(0, 1)), cur); + testFailure(View.updateCompacting(emptySet(), readers.subList(1, 2)), cur); + testFailure(View.updateCompacting(copyOf(readers.subList(0, 1)), readers.subList(1, 2)), cur); + + // make equivalents of readers.subList(0, 3) that are 
different instances + SSTableReader r0 = MockSchema.sstable(0), r1 = MockSchema.sstable(1), r2 = MockSchema.sstable(2); + // attempt to mark compacting a version not in the live set + testFailure(View.updateCompacting(emptySet(), of(r2)), cur); + // update one compacting, one non-compacting, of the liveset to another instance of the same readers; + // confirm liveset changes but compacting does not + cur = View.updateLiveSet(copyOf(readers.subList(1, 3)), of(r1, r2)).apply(cur); + Assert.assertSame(readers.get(0), cur.sstablesMap.get(r0)); + Assert.assertSame(r1, cur.sstablesMap.get(r1)); + Assert.assertSame(r2, cur.sstablesMap.get(r2)); + testFailure(View.updateCompacting(emptySet(), readers.subList(2, 3)), cur); + Assert.assertSame(readers.get(1), Iterables.getFirst(Iterables.filter(cur.compacting, Predicates.equalTo(r1)), null)); + + // unmark compacting, and check our methods are all correctly updated + cur = View.updateCompacting(copyOf(readers.subList(0, 1)), emptySet()).apply(cur); + Assert.assertTrue(View.permitCompacting(concat(readers.subList(0, 1), of(r2), readers.subList(3, 5))).apply(cur)); + Assert.assertFalse(View.permitCompacting(readers.subList(1, 2)).apply(cur)); + testFailure(View.updateCompacting(emptySet(), readers.subList(1, 2)), cur); + testFailure(View.updateCompacting(copyOf(readers.subList(0, 2)), emptySet()), cur); + Assert.assertTrue(copyOf(concat(readers.subList(0, 1), readers.subList(2, 5))).containsAll(copyOf(cur.getUncompacting(readers)))); + Assert.assertEquals(4, copyOf(cur.getUncompacting(readers)).size()); + Assert.assertTrue(cur.nonCompactingSStables().containsAll(readers.subList(2, 5))); + Assert.assertTrue(cur.nonCompactingSStables().containsAll(readers.subList(0, 1))); + Assert.assertEquals(4, cur.nonCompactingSStables().size()); + } + + private static void testFailure(Function<View, ?> function, View view) + { + boolean failed = true; + try + { + function.apply(view); + failed = false; + } + catch (Throwable t) + { + } + 
Assert.assertTrue(failed); + } + + @Test + public void testFlushing() + { + View initialView = fakeView(1, 0); + View cur = initialView; + Memtable memtable1 = initialView.getCurrentMemtable(); + Memtable memtable2 = MockSchema.memtable(); + + cur = View.switchMemtable(memtable2).apply(cur); + Assert.assertEquals(2, cur.liveMemtables.size()); + Assert.assertEquals(memtable1, cur.liveMemtables.get(0)); + Assert.assertEquals(memtable2, cur.getCurrentMemtable()); + + Memtable memtable3 = MockSchema.memtable(); + cur = View.switchMemtable(memtable3).apply(cur); + Assert.assertEquals(3, cur.liveMemtables.size()); + Assert.assertEquals(0, cur.flushingMemtables.size()); + Assert.assertEquals(memtable1, cur.liveMemtables.get(0)); + Assert.assertEquals(memtable2, cur.liveMemtables.get(1)); + Assert.assertEquals(memtable3, cur.getCurrentMemtable()); + + testFailure(View.replaceFlushed(memtable2, null), cur); + + cur = View.markFlushing(memtable2).apply(cur); + Assert.assertTrue(cur.flushingMemtables.contains(memtable2)); + Assert.assertEquals(2, cur.liveMemtables.size()); + Assert.assertEquals(1, cur.flushingMemtables.size()); + Assert.assertEquals(memtable2, cur.flushingMemtables.get(0)); + Assert.assertEquals(memtable1, cur.liveMemtables.get(0)); + Assert.assertEquals(memtable3, cur.getCurrentMemtable()); + + cur = View.markFlushing(memtable1).apply(cur); + Assert.assertEquals(1, cur.liveMemtables.size()); + Assert.assertEquals(2, cur.flushingMemtables.size()); + Assert.assertEquals(memtable1, cur.flushingMemtables.get(0)); + Assert.assertEquals(memtable2, cur.flushingMemtables.get(1)); + Assert.assertEquals(memtable3, cur.getCurrentMemtable()); + + cur = View.replaceFlushed(memtable2, null).apply(cur); + Assert.assertEquals(1, cur.liveMemtables.size()); + Assert.assertEquals(1, cur.flushingMemtables.size()); + Assert.assertEquals(memtable1, cur.flushingMemtables.get(0)); + Assert.assertEquals(memtable3, cur.getCurrentMemtable()); + + SSTableReader sstable = 
MockSchema.sstable(1); + cur = View.replaceFlushed(memtable1, sstable).apply(cur); + Assert.assertEquals(0, cur.flushingMemtables.size()); + Assert.assertEquals(1, cur.liveMemtables.size()); + Assert.assertEquals(memtable3, cur.getCurrentMemtable()); + Assert.assertEquals(1, cur.sstables.size()); + Assert.assertEquals(sstable, cur.sstablesMap.get(sstable)); + } + + static View fakeView(int memtableCount, int sstableCount) + { + List<Memtable> memtables = new ArrayList<>(); + List<SSTableReader> sstables = new ArrayList<>(); + for (int i = 0 ; i < memtableCount ; i++) + memtables.add(MockSchema.memtable()); + for (int i = 0 ; i < sstableCount ; i++) + sstables.add(MockSchema.sstable(i)); + return new View(ImmutableList.copyOf(memtables), Collections.<Memtable>emptyList(), Helpers.identityMap(sstables), + Collections.<SSTableReader>emptySet(), SSTableIntervalTree.build(sstables)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java index eac6094..518d80e 100644 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java @@ -41,13 +41,15 @@ import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.metrics.RestorableMeter; 
-import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.concurrent.OpOrder; +import static com.google.common.collect.ImmutableMap.of; +import static java.util.Arrays.asList; import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL; import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD; import static org.apache.cassandra.io.sstable.IndexSummaryManager.UPSAMPLE_THRESHOLD; @@ -121,12 +123,15 @@ public class IndexSummaryManagerTest return total; } - private static List<SSTableReader> resetSummaries(List<SSTableReader> sstables, long originalOffHeapSize) throws IOException + private static List<SSTableReader> resetSummaries(ColumnFamilyStore cfs, List<SSTableReader> sstables, long originalOffHeapSize) throws IOException { for (SSTableReader sstable : sstables) sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0)); - sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, originalOffHeapSize * sstables.size()); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), originalOffHeapSize * sstables.size()); + } for (SSTableReader sstable : sstables) assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel()); @@ -230,7 +235,11 @@ public class IndexSummaryManagerTest cfs.metadata.minIndexInterval(originalMinIndexInterval / 2); SSTableReader sstable = cfs.getSSTables().iterator().next(); long summarySpace = sstable.getIndexSummaryOffHeapSize(); - IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, Arrays.asList(sstable), summarySpace); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN)) + { + redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), summarySpace); + } + sstable = cfs.getSSTables().iterator().next(); assertEquals(originalMinIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.001); assertEquals(numRows / originalMinIndexInterval, sstable.getIndexSummarySize()); @@ -238,7 +247,10 @@ public class IndexSummaryManagerTest // keep the min_index_interval the same, but now give the summary enough space to grow by 50% double previousInterval = sstable.getEffectiveIndexInterval(); int previousSize = sstable.getIndexSummarySize(); - IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, Arrays.asList(sstable), (long) Math.ceil(summarySpace * 1.5)); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN)) + { + redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace * 1.5)); + } sstable = cfs.getSSTables().iterator().next(); assertEquals(previousSize * 1.5, (double) sstable.getIndexSummarySize(), 1); assertEquals(previousInterval * (1.0 / 1.5), sstable.getEffectiveIndexInterval(), 0.001); @@ -246,7 +258,10 @@ public class IndexSummaryManagerTest // return min_index_interval to it's original value (double it), but only give the summary enough space // to have an effective index interval of twice the new min cfs.metadata.minIndexInterval(originalMinIndexInterval); - IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, Arrays.asList(sstable), (long) Math.ceil(summarySpace / 2.0)); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN)) + { + redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace / 2.0)); + } sstable = cfs.getSSTables().iterator().next(); assertEquals(originalMinIndexInterval * 2, sstable.getEffectiveIndexInterval(), 0.001); assertEquals(numRows / (originalMinIndexInterval * 2), sstable.getIndexSummarySize()); @@ -256,7 +271,10 @@ public class IndexSummaryManagerTest // result in an effective interval above the new max) cfs.metadata.minIndexInterval(originalMinIndexInterval * 4); 
cfs.metadata.maxIndexInterval(originalMinIndexInterval * 4); - IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, Arrays.asList(sstable), 10); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN)) + { + redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10); + } sstable = cfs.getSSTables().iterator().next(); assertEquals(cfs.metadata.getMinIndexInterval(), sstable.getEffectiveIndexInterval(), 0.001); } @@ -276,14 +294,20 @@ public class IndexSummaryManagerTest for (SSTableReader sstable : sstables) sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0)); - IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, sstables, 1); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10); + } sstables = new ArrayList<>(cfs.getSSTables()); for (SSTableReader sstable : sstables) assertEquals(cfs.metadata.getMaxIndexInterval(), sstable.getEffectiveIndexInterval(), 0.01); // halve the max_index_interval cfs.metadata.maxIndexInterval(cfs.metadata.getMaxIndexInterval() / 2); - IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, sstables, 1); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1); + } sstables = new ArrayList<>(cfs.getSSTables()); for (SSTableReader sstable : sstables) { @@ -293,7 +317,10 @@ public class IndexSummaryManagerTest // return max_index_interval to its original value cfs.metadata.maxIndexInterval(cfs.metadata.getMaxIndexInterval() * 2); - IndexSummaryManager.redistributeSummaries(Collections.EMPTY_LIST, sstables, 1); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1); + } 
for (SSTableReader sstable : cfs.getSSTables()) { assertEquals(cfs.metadata.getMaxIndexInterval(), sstable.getEffectiveIndexInterval(), 0.01); @@ -321,7 +348,10 @@ public class IndexSummaryManagerTest long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize(); // there should be enough space to not downsample anything - sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * numSSTables)); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables)); + } for (SSTableReader sstable : sstables) assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel()); assertEquals(singleSummaryOffHeapSpace * numSSTables, totalOffHeapSize(sstables)); @@ -329,26 +359,38 @@ public class IndexSummaryManagerTest // everything should get cut in half assert sstables.size() == 4; - sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * (numSSTables / 2))); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2))); + } for (SSTableReader sstable : sstables) assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel()); validateData(cfs, numRows); // everything should get cut to a quarter - sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * (numSSTables / 4))); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 4))); + } for (SSTableReader sstable : sstables) assertEquals(BASE_SAMPLING_LEVEL / 4, 
sstable.getIndexSummarySamplingLevel()); validateData(cfs, numRows); // upsample back up to half - sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables,(singleSummaryOffHeapSpace * (numSSTables / 2) + 4)); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2) + 4)); + } assert sstables.size() == 4; for (SSTableReader sstable : sstables) assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel()); validateData(cfs, numRows); // upsample back up to the original index summary - sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * numSSTables)); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables)); + } for (SSTableReader sstable : sstables) assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel()); validateData(cfs, numRows); @@ -357,7 +399,10 @@ public class IndexSummaryManagerTest // so the two cold sstables should get downsampled to be half of their original size sstables.get(0).overrideReadMeter(new RestorableMeter(50.0, 50.0)); sstables.get(1).overrideReadMeter(new RestorableMeter(50.0, 50.0)); - sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3)); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3)); + } Collections.sort(sstables, hotnessComparator); assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel()); assertEquals(BASE_SAMPLING_LEVEL / 2, 
sstables.get(1).getIndexSummarySamplingLevel()); @@ -370,7 +415,10 @@ public class IndexSummaryManagerTest double higherRate = 50.0 * (UPSAMPLE_THRESHOLD - (UPSAMPLE_THRESHOLD * 0.10)); sstables.get(0).overrideReadMeter(new RestorableMeter(lowerRate, lowerRate)); sstables.get(1).overrideReadMeter(new RestorableMeter(higherRate, higherRate)); - sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3)); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3)); + } Collections.sort(sstables, hotnessComparator); assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel()); assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(1).getIndexSummarySamplingLevel()); @@ -379,13 +427,16 @@ public class IndexSummaryManagerTest validateData(cfs, numRows); // reset, and then this time, leave enough space for one of the cold sstables to not get downsampled - sstables = resetSummaries(sstables, singleSummaryOffHeapSpace); + sstables = resetSummaries(cfs, sstables, singleSummaryOffHeapSpace); sstables.get(0).overrideReadMeter(new RestorableMeter(1.0, 1.0)); sstables.get(1).overrideReadMeter(new RestorableMeter(2.0, 2.0)); sstables.get(2).overrideReadMeter(new RestorableMeter(1000.0, 1000.0)); sstables.get(3).overrideReadMeter(new RestorableMeter(1000.0, 1000.0)); - sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3) + 50); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3) + 50); + } Collections.sort(sstables, hotnessComparator); if (sstables.get(0).getIndexSummarySamplingLevel() == minSamplingLevel) @@ -406,7 +457,10 @@ public class 
IndexSummaryManagerTest sstables.get(1).overrideReadMeter(new RestorableMeter(0.0, 0.0)); sstables.get(2).overrideReadMeter(new RestorableMeter(92, 92)); sstables.get(3).overrideReadMeter(new RestorableMeter(128.0, 128.0)); - sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL)))); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL)))); + } Collections.sort(sstables, hotnessComparator); assertEquals(1, sstables.get(0).getIndexSummarySize()); // at the min sampling level assertEquals(1, sstables.get(0).getIndexSummarySize()); // at the min sampling level @@ -416,7 +470,10 @@ public class IndexSummaryManagerTest validateData(cfs, numRows); // Don't leave enough space for even the minimal index summaries - sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 10); + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN)) + { + sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10); + } for (SSTableReader sstable : sstables) assertEquals(1, sstable.getIndexSummarySize()); // at the min sampling level validateData(cfs, numRows); @@ -449,19 +506,19 @@ public class IndexSummaryManagerTest SSTableReader original = sstables.get(0); SSTableReader sstable = original; - for (int samplingLevel = 1; samplingLevel < BASE_SAMPLING_LEVEL; samplingLevel++) + try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN)) { - SSTableReader prev = sstable; - sstable = sstable.cloneWithNewSummarySamplingLevel(cfs, samplingLevel); - assertEquals(samplingLevel, sstable.getIndexSummarySamplingLevel()); - int expectedSize = (numRows * 
samplingLevel) / (sstable.metadata.getMinIndexInterval() * BASE_SAMPLING_LEVEL); - assertEquals(expectedSize, sstable.getIndexSummarySize(), 1); - if (prev != original) - prev.selfRef().release(); + for (int samplingLevel = 1; samplingLevel < BASE_SAMPLING_LEVEL; samplingLevel++) + { + sstable = sstable.cloneWithNewSummarySamplingLevel(cfs, samplingLevel); + assertEquals(samplingLevel, sstable.getIndexSummarySamplingLevel()); + int expectedSize = (numRows * samplingLevel) / (sstable.metadata.getMinIndexInterval() * BASE_SAMPLING_LEVEL); + assertEquals(expectedSize, sstable.getIndexSummarySize(), 1); + txn.update(sstable, true); + txn.checkpoint(); + } + txn.finish(); } - - // don't leave replaced SSTRs around to break other tests - cfs.getDataTracker().replaceWithNewInstances(Collections.singleton(original), Collections.singleton(sstable)); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index 82cb8d5..682d999 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -59,7 +59,9 @@ import org.apache.cassandra.db.Row; import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.composites.Composites; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.dht.LocalPartitioner.LocalToken; import org.apache.cassandra.dht.Range; @@ -498,8 +500,13 @@ public class SSTableReaderTest })); } - 
SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1); - store.getDataTracker().replaceWithNewInstances(Arrays.asList(sstable), Arrays.asList(replacement)); + SSTableReader replacement; + try (LifecycleTransaction txn = store.getTracker().tryModify(Arrays.asList(sstable), OperationType.UNKNOWN)) + { + replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1); + txn.update(replacement, true); + txn.finish(); + } for (Future future : futures) future.get();