http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java new file mode 100644 index 0000000..8f53781 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import com.google.common.collect.Iterables; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.notifications.SSTableAddedNotification; +import org.apache.cassandra.notifications.SSTableDeletingNotification; +import org.apache.cassandra.notifications.SSTableListChangedNotification; +import org.apache.cassandra.notifications.SSTableRepairStatusChanged; +import org.apache.cassandra.repair.consistent.LocalSessionAccessor; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Tests CompactionStrategyManager's handling of pending repair sstables + */ +public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingRepairTest +{ + + private static boolean strategiesContain(Collection<AbstractCompactionStrategy> strategies, SSTableReader sstable) + { + return Iterables.any(strategies, strategy -> strategy.getSSTables().contains(sstable)); + } + + private boolean pendingContains(UUID id, SSTableReader sstable) + { + return Iterables.any(csm.getPendingRepairManagers(), p -> p.get(id) != null && p.get(id).getSSTables().contains(sstable)); + } + + private boolean pendingContains(SSTableReader sstable) + { + return Iterables.any(csm.getPendingRepairManagers(), p -> strategiesContain(p.getStrategies(), sstable)); + } + + private boolean repairedContains(SSTableReader sstable) + { + return strategiesContain(csm.getRepaired(), sstable); + } + + private boolean unrepairedContains(SSTableReader sstable) + { + return strategiesContain(csm.getUnrepaired(), sstable); + } + + /** + * Pending repair strategy should be created when we encounter a new pending id + */ + @Test + public void sstableAdded() + { + UUID repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + Assert.assertTrue(csm.pendingRepairs().isEmpty()); + + SSTableReader sstable = makeSSTable(true); + Assert.assertFalse(sstable.isRepaired()); + Assert.assertFalse(sstable.isPendingRepair()); + + mutateRepaired(sstable, repairID); + Assert.assertFalse(sstable.isRepaired()); + Assert.assertTrue(sstable.isPendingRepair()); + csm.getForPendingRepair(repairID).forEach(Assert::assertNull); + + // add the sstable + csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker()); + Assert.assertFalse(repairedContains(sstable)); + Assert.assertFalse(unrepairedContains(sstable)); + csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull); + Assert.assertTrue(pendingContains(repairID, sstable)); + } + + @Test + public void sstableListChangedAddAndRemove() + { + UUID repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + + SSTableReader sstable1 = makeSSTable(true); + mutateRepaired(sstable1, repairID); + + SSTableReader sstable2 = makeSSTable(true); + mutateRepaired(sstable2, repairID); + + Assert.assertFalse(repairedContains(sstable1)); + Assert.assertFalse(unrepairedContains(sstable1)); + Assert.assertFalse(repairedContains(sstable2)); + Assert.assertFalse(unrepairedContains(sstable2)); + csm.getForPendingRepair(repairID).forEach(Assert::assertNull); + + // add only + SSTableListChangedNotification notification; + notification = new SSTableListChangedNotification(Collections.singleton(sstable1), + Collections.emptyList(), + OperationType.COMPACTION); + csm.handleNotification(notification, cfs.getTracker()); + + csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull); + Assert.assertFalse(repairedContains(sstable1)); + Assert.assertFalse(unrepairedContains(sstable1)); + Assert.assertTrue(pendingContains(repairID, sstable1)); + Assert.assertFalse(repairedContains(sstable2)); + Assert.assertFalse(unrepairedContains(sstable2)); + Assert.assertFalse(pendingContains(repairID, sstable2)); + + // remove and add + notification = new SSTableListChangedNotification(Collections.singleton(sstable2), + Collections.singleton(sstable1), + OperationType.COMPACTION); + csm.handleNotification(notification, cfs.getTracker()); + + Assert.assertFalse(repairedContains(sstable1)); + Assert.assertFalse(unrepairedContains(sstable1)); + Assert.assertFalse(pendingContains(repairID, sstable1)); + Assert.assertFalse(repairedContains(sstable2)); + Assert.assertFalse(unrepairedContains(sstable2)); + Assert.assertTrue(pendingContains(repairID, sstable2)); + } + + @Test + public void sstableRepairStatusChanged() + { + UUID repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + + // add as unrepaired + SSTableReader sstable = makeSSTable(false); + Assert.assertTrue(unrepairedContains(sstable)); + Assert.assertFalse(repairedContains(sstable)); + csm.getForPendingRepair(repairID).forEach(Assert::assertNull); + + SSTableRepairStatusChanged notification; + + // change to pending repaired + mutateRepaired(sstable, repairID); + notification = new SSTableRepairStatusChanged(Collections.singleton(sstable)); + csm.handleNotification(notification, cfs.getTracker()); + Assert.assertFalse(unrepairedContains(sstable)); + Assert.assertFalse(repairedContains(sstable)); + csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull); + Assert.assertTrue(pendingContains(repairID, sstable)); + + // change to repaired + mutateRepaired(sstable, System.currentTimeMillis()); + notification = new SSTableRepairStatusChanged(Collections.singleton(sstable)); + csm.handleNotification(notification, cfs.getTracker()); + Assert.assertFalse(unrepairedContains(sstable)); + Assert.assertTrue(repairedContains(sstable)); + Assert.assertFalse(pendingContains(repairID, sstable)); + } + + @Test + public void sstableDeleted() + { + UUID repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID); + csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker()); + Assert.assertTrue(pendingContains(repairID, sstable)); + + // delete sstable + SSTableDeletingNotification notification = new SSTableDeletingNotification(sstable); + csm.handleNotification(notification, cfs.getTracker()); + Assert.assertFalse(pendingContains(repairID, sstable)); + Assert.assertFalse(unrepairedContains(sstable)); + Assert.assertFalse(repairedContains(sstable)); + } + + /** + * CompactionStrategyManager.getStrategies should include + * pending repair strategies when appropriate + */ + @Test + public void getStrategies() + { + UUID repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + + List<List<AbstractCompactionStrategy>> strategies; + + strategies = csm.getStrategies(); + Assert.assertEquals(3, strategies.size()); + Assert.assertTrue(strategies.get(2).isEmpty()); + + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID); + csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker()); + + strategies = csm.getStrategies(); + Assert.assertEquals(3, strategies.size()); + Assert.assertFalse(strategies.get(2).isEmpty()); + } + + /** + * Tests that finalized repairs result in cleanup compaction tasks + * which reclassify the sstables as repaired + */ + @Test + public void cleanupCompactionFinalized() + { + UUID repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID); + csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker()); + LocalSessionAccessor.finalizeUnsafe(repairID); + csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull); + Assert.assertNotNull(pendingContains(repairID, sstable)); + Assert.assertTrue(sstable.isPendingRepair()); + Assert.assertFalse(sstable.isRepaired()); + + AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds()); + Assert.assertNotNull(compactionTask); + Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass()); + + // run the compaction + compactionTask.execute(null); + + Assert.assertTrue(repairedContains(sstable)); + Assert.assertFalse(unrepairedContains(sstable)); + csm.getForPendingRepair(repairID).forEach(Assert::assertNull); + + // sstable should have pendingRepair cleared, and repairedAt set correctly + long expectedRepairedAt = ActiveRepairService.instance.getParentRepairSession(repairID).getRepairedAt(); + Assert.assertFalse(sstable.isPendingRepair()); + Assert.assertTrue(sstable.isRepaired()); + Assert.assertEquals(expectedRepairedAt, sstable.getSSTableMetadata().repairedAt); + } + + /** + * Tests that failed repairs result in cleanup compaction tasks + * which reclassify the sstables as unrepaired + */ + @Test + public void cleanupCompactionFailed() + { + UUID repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID); + csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker()); + LocalSessionAccessor.failUnsafe(repairID); + + csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull); + Assert.assertNotNull(pendingContains(repairID, sstable)); + Assert.assertTrue(sstable.isPendingRepair()); + Assert.assertFalse(sstable.isRepaired()); + + AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds()); + Assert.assertNotNull(compactionTask); + Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass()); + + // run the compaction + compactionTask.execute(null); + + Assert.assertFalse(repairedContains(sstable)); + Assert.assertTrue(unrepairedContains(sstable)); + csm.getForPendingRepair(repairID).forEach(Assert::assertNull); + + // sstable should have pendingRepair cleared, and repairedAt set correctly + Assert.assertFalse(sstable.isPendingRepair()); + Assert.assertFalse(sstable.isRepaired()); + Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt); + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 9a8371e..ce65f4e 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -189,7 +189,7 @@ public class LeveledCompactionStrategyTest Range<Token> range = new Range<>(Util.token(""), Util.token("")); int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds()); UUID parentRepSession = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis(), true); + ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, ActiveRepairService.UNREPAIRED_SSTABLE, true); RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range)); Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore); CompactionManager.instance.submitValidation(cfs, validator).get(); @@ -350,7 +350,7 @@ public class LeveledCompactionStrategyTest SSTableReader sstable1 = unrepaired.manifest.generations[2].get(0); SSTableReader sstable2 = unrepaired.manifest.generations[1].get(0); - sstable1.descriptor.getMetadataSerializer().mutateRepairedAt(sstable1.descriptor, System.currentTimeMillis()); + sstable1.descriptor.getMetadataSerializer().mutateRepaired(sstable1.descriptor, System.currentTimeMillis(), null); sstable1.reloadSSTableMetadata(); assertTrue(sstable1.isRepaired()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java new file mode 100644 index 0000000..1c2f02e --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.repair.consistent.LocalSessionAccessor; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; + +public class PendingRepairManagerTest extends AbstractPendingRepairTest +{ + /** + * If a local session is ongoing, it should not be cleaned up + */ + @Test + public void needsCleanupInProgress() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID); + prm.addSSTable(sstable); + Assert.assertNotNull(prm.get(repairID)); + + Assert.assertFalse(prm.canCleanup(repairID)); + } + + /** + * If a local session is finalized, it should be cleaned up + */ + @Test + public void needsCleanupFinalized() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID); + prm.addSSTable(sstable); + Assert.assertNotNull(prm.get(repairID)); + LocalSessionAccessor.finalizeUnsafe(repairID); + + Assert.assertTrue(prm.canCleanup(repairID)); + } + + /** + * If a local session has failed, it should be cleaned up + */ + @Test + public void needsCleanupFailed() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID); + prm.addSSTable(sstable); + Assert.assertNotNull(prm.get(repairID)); + LocalSessionAccessor.failUnsafe(repairID); + + Assert.assertTrue(prm.canCleanup(repairID)); + } + + @Test + public void needsCleanupNoSession() + { + UUID fakeID = UUIDGen.getTimeUUID(); + PendingRepairManager prm = new PendingRepairManager(cfs, null); + Assert.assertTrue(prm.canCleanup(fakeID)); + } + + @Test + public void estimateRemainingTasksInProgress() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID); + prm.addSSTable(sstable); + Assert.assertNotNull(prm.get(repairID)); + + Assert.assertEquals(0, prm.getEstimatedRemainingTasks()); + Assert.assertEquals(0, prm.getNumPendingRepairFinishedTasks()); + } + + @Test + public void estimateRemainingFinishedRepairTasks() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID); + prm.addSSTable(sstable); + Assert.assertNotNull(prm.get(repairID)); + Assert.assertNotNull(prm.get(repairID)); + LocalSessionAccessor.finalizeUnsafe(repairID); + + Assert.assertEquals(0, prm.getEstimatedRemainingTasks()); + Assert.assertEquals(1, prm.getNumPendingRepairFinishedTasks()); + } + + @Test + public void getNextBackgroundTask() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID); + prm.addSSTable(sstable); + + repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + sstable = makeSSTable(true); + mutateRepaired(sstable, repairID); + prm.addSSTable(sstable); + LocalSessionAccessor.finalizeUnsafe(repairID); + + Assert.assertEquals(2, prm.getSessions().size()); + Assert.assertNull(prm.getNextBackgroundTask(FBUtilities.nowInSeconds())); + AbstractCompactionTask compactionTask = prm.getNextRepairFinishedTask(); + Assert.assertNotNull(compactionTask); + Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass()); + PendingRepairManager.RepairFinishedCompactionTask cleanupTask = (PendingRepairManager.RepairFinishedCompactionTask) compactionTask; + Assert.assertEquals(repairID, cleanupTask.getSessionID()); + } + + @Test + public void getNextBackgroundTaskNoSessions() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + Assert.assertNull(prm.getNextBackgroundTask(FBUtilities.nowInSeconds())); + } + + @Test + public void maximalTaskNeedsCleanup() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = registerSession(cfs); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID); + prm.addSSTable(sstable); + Assert.assertNotNull(prm.get(repairID)); + Assert.assertNotNull(prm.get(repairID)); + LocalSessionAccessor.finalizeUnsafe(repairID); + + Assert.assertEquals(1, prm.getMaximalTasks(FBUtilities.nowInSeconds(), false).size()); + } + + @Test + public void userDefinedTaskTest() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + UUID repairId = registerSession(cfs); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairId); + prm.addSSTable(sstable); + List<AbstractCompactionTask> tasks = csm.getUserDefinedTasks(Collections.singleton(sstable), 100); + Assert.assertEquals(1, tasks.size()); + } + + @Test + public void mixedPendingSessionsTest() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + UUID repairId = registerSession(cfs); + UUID repairId2 = registerSession(cfs); + SSTableReader sstable = makeSSTable(true); + SSTableReader sstable2 = makeSSTable(true); + + mutateRepaired(sstable, repairId); + mutateRepaired(sstable2, repairId2); + prm.addSSTable(sstable); + prm.addSSTable(sstable2); + List<AbstractCompactionTask> tasks = csm.getUserDefinedTasks(Lists.newArrayList(sstable, sstable2), 100); + Assert.assertEquals(2, tasks.size()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java index 2021538..536bdca 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java @@ -1183,7 +1183,7 @@ public class LogTransactionTest extends AbstractTransactionalTest SerializationHeader header = SerializationHeader.make(cfs.metadata(), Collections.emptyList()); StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata().comparator) - .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, header) + .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, header) .get(MetadataType.STATS); SSTableReader reader = SSTableReader.internalOpen(descriptor, components, http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java index f4443f1..bb8cc68 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java @@ -163,6 +163,7 @@ public class RealTransactionsTest extends SchemaLoader desc, 0, 0, + null, 0, SerializationHeader.make(cfs.metadata(), txn.originals()), cfs.indexManager.listIndexes(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java index fd9b03e..cebceca 100644 --- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java +++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java @@ -50,7 +50,7 @@ public class StreamStateStoreTest Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100")); InetAddress local = FBUtilities.getBroadcastAddress(); - StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, false); + StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, false, null); session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf"), 0); StreamStateStore store = new StreamStateStore(); @@ -71,7 +71,7 @@ public class StreamStateStoreTest // add different range within the same keyspace Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200")); - session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, false); + session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, false, null); session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf"), 0); session.state(StreamSession.State.COMPLETE); store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java index 7e8c1fb..666c2e9 100644 --- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java @@ -69,7 +69,7 @@ public class BigTableWriterTest extends AbstractTransactionalTest private TestableBTW(Descriptor desc) { - this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS))); + this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS))); } private TestableBTW(Descriptor desc, SSTableTxnWriter sw) http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index e3afaeb..41a6828 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -936,7 +936,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase File dir = cfs.getDirectories().getDirectoryForNewSSTables(); Descriptor desc = cfs.newSSTableDescriptor(dir); - try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS))) + try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, null, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS))) { int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount; for ( ; i < end ; i++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java index 189782c..d3eef01 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java @@ -30,9 +30,10 @@ import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.Util; + +import static org.apache.cassandra.service.ActiveRepairService.*; import static org.junit.Assert.assertEquals; public class SSTableUtils @@ -217,7 +218,7 @@ public class SSTableUtils TableMetadata metadata = Schema.instance.getTableMetadata(ksname, cfname); ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id); SerializationHeader header = appender.header(); - SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, Descriptor.fromFilename(datafile.getAbsolutePath()), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header); + SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, Descriptor.fromFilename(datafile.getAbsolutePath()), expectedSize, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, 0, header); while (appender.append(writer)) { /* pass */ } Collection<SSTableReader> readers = writer.finish(true); http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java index cc92b2c..d42c49b 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java @@ -164,7 +164,7 @@ public class SSTableWriterTestBase extends SchemaLoader public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn) { Descriptor desc = cfs.newSSTableDescriptor(directory); - return SSTableWriter.create(desc, 0, 0, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS), cfs.indexManager.listIndexes(), txn); + return SSTableWriter.create(desc, 0, 0, null, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS), cfs.indexManager.listIndexes(), txn); } public static ByteBuffer random(int i, int size) http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java index 18defdf..18bb6b1 100644 --- a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java @@ -96,7 +96,7 @@ public class SSTableFlushObserverTest KS_NAME, CF_NAME, 0, sstableFormat), - 10L, 0L, TableMetadataRef.forOfflineTools(cfm), + 10L, 0L, null, TableMetadataRef.forOfflineTools(cfm), new MetadataCollector(cfm.comparator).sstableLevel(0), new SerializationHeader(true, cfm, cfm.regularAndStaticColumns(), EncodingStats.NO_STATS), Collections.singletonList(observer), http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java index b03f275..b918dfd 100644 --- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java @@ -44,6 +44,8 @@ import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.RandomAccessReader; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class MetadataSerializerTest { @@ -96,7 +98,7 @@ public class MetadataSerializerTest String partitioner = RandomPartitioner.class.getCanonicalName(); double bfFpChance = 0.1; - Map<MetadataType, MetadataComponent> originalMetadata = collector.finalizeMetadata(partitioner, bfFpChance, 0, SerializationHeader.make(cfm, Collections.emptyList())); + Map<MetadataType, MetadataComponent> originalMetadata = collector.finalizeMetadata(partitioner, bfFpChance, 0, null, SerializationHeader.make(cfm, Collections.emptyList())); return originalMetadata; } @@ -118,6 +120,12 @@ public class MetadataSerializerTest testOldReadsNew("mb", "mc"); } + @Test + public void testMdReadMc() throws IOException + { + testOldReadsNew("mc", "md"); + } + public void testOldReadsNew(String oldV, String newV) throws IOException { Map<MetadataType, MetadataComponent> originalMetadata = constructMetadata(); @@ -146,4 +154,13 @@ public class MetadataSerializerTest } } } + + @Test + public void pendingRepairCompatibility() + { + Version mc = BigFormat.instance.getVersion("mc"); + assertFalse(mc.hasPendingRepair()); + Version md = BigFormat.instance.getVersion("md"); + assertTrue(md.hasPendingRepair()); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index 0fceaf4..b799d66 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -76,7 +76,7 @@ public class LocalSyncTaskTest extends SchemaLoader // note: we reuse the same endpoint which is bogus in theory but fine here TreeResponse r1 = new TreeResponse(ep1, tree1); TreeResponse r2 = new TreeResponse(ep2, tree2); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE, false); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE, null, false); task.run(); assertEquals(0, task.get().numberOfDifferences); @@ -90,7 +90,9 @@ public class LocalSyncTaskTest extends SchemaLoader Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); - ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis(), false); + ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(), + Arrays.asList(cfs), Arrays.asList(range), false, + ActiveRepairService.UNREPAIRED_SSTABLE, false); RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); @@ -111,7 +113,7 @@ public class LocalSyncTaskTest extends SchemaLoader // note: we reuse the same endpoint which is bogus in theory but fine here TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1); TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE, false); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE, null, false); task.run(); // ensure that the changed range was recorded http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/RepairSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java index f65bedb..7be8cb5 100644 --- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java @@ -62,7 +62,7 @@ public class RepairSessionTest IPartitioner p = Murmur3Partitioner.instance; Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100))); Set<InetAddress> endpoints = Sets.newHashSet(remote); - RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, false, "Standard1"); + RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, false, false, "Standard1"); // perform convict session.convict(remote, Double.MAX_VALUE); http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/ValidatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index e2fb2c4..bbcdbb8 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -193,7 +193,7 @@ public class ValidatorTest false); final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); - Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true); + Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true, false); CompactionManager.instance.submitValidation(cfs, validator); MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java new file mode 100644 index 0000000..26168ad --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Set; +import java.util.UUID; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.junit.Ignore; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDGen; + +@Ignore +public abstract class AbstractConsistentSessionTest +{ + protected static final InetAddress COORDINATOR; + protected static final InetAddress PARTICIPANT1; + protected static final InetAddress PARTICIPANT2; + protected static final InetAddress PARTICIPANT3; + + static + { + try + { + COORDINATOR = InetAddress.getByName("10.0.0.1"); + PARTICIPANT1 = InetAddress.getByName("10.0.0.1"); + PARTICIPANT2 = InetAddress.getByName("10.0.0.2"); + PARTICIPANT3 = InetAddress.getByName("10.0.0.3"); + } + catch (UnknownHostException e) + { + + throw new AssertionError(e); + } + + DatabaseDescriptor.daemonInitialization(); + } + + protected static final Set<InetAddress> PARTICIPANTS = ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3); + + protected static Token t(int v) + { + return DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(v)); + } + + protected static final Range<Token> RANGE1 = new Range<>(t(1), t(2)); + protected static final Range<Token> RANGE2 = new Range<>(t(2), t(3)); + protected static final Range<Token> RANGE3 = new Range<>(t(4), t(5)); + + + protected static UUID registerSession(ColumnFamilyStore cfs) + { + UUID sessionId = UUIDGen.getTimeUUID(); + + ActiveRepairService.instance.registerParentRepairSession(sessionId, + COORDINATOR, + Lists.newArrayList(cfs), + Sets.newHashSet(RANGE1, RANGE2, RANGE3), + true, + System.currentTimeMillis(), + true); + return sessionId; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java new file mode 100644 index 0000000..b570920 --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java @@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.repair.RepairSessionResult; +import org.apache.cassandra.repair.messages.FailSession; +import org.apache.cassandra.repair.messages.FinalizeCommit; +import org.apache.cassandra.repair.messages.FinalizePropose; +import org.apache.cassandra.repair.messages.PrepareConsistentRequest; +import org.apache.cassandra.repair.messages.RepairMessage; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*; + +public class CoordinatorSessionTest extends AbstractConsistentSessionTest +{ + + static CoordinatorSession.Builder createBuilder() + { + CoordinatorSession.Builder builder = CoordinatorSession.builder(); + builder.withState(PREPARING); + builder.withSessionID(UUIDGen.getTimeUUID()); + builder.withCoordinator(COORDINATOR); + builder.withUUIDTableIds(Sets.newHashSet(UUIDGen.getTimeUUID(), UUIDGen.getTimeUUID())); + builder.withRepairedAt(System.currentTimeMillis()); + builder.withRanges(Sets.newHashSet(RANGE1, RANGE2, RANGE3)); + builder.withParticipants(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3)); + return builder; + } + + static CoordinatorSession createSession() + { + return createBuilder().build(); + } + + static InstrumentedCoordinatorSession createInstrumentedSession() + { + return new InstrumentedCoordinatorSession(createBuilder()); + } + + private static RepairSessionResult createResult(CoordinatorSession coordinator) + { + return new RepairSessionResult(coordinator.sessionID, "ks", coordinator.ranges, null); + } + + private static void assertMessageSent(InstrumentedCoordinatorSession coordinator, InetAddress participant, RepairMessage expected) + { + Assert.assertTrue(coordinator.sentMessages.containsKey(participant)); + Assert.assertEquals(1, coordinator.sentMessages.get(participant).size()); + Assert.assertEquals(expected, coordinator.sentMessages.get(participant).get(0)); + } + + private static class InstrumentedCoordinatorSession extends CoordinatorSession + { + public InstrumentedCoordinatorSession(Builder builder) + { + super(builder); + } + + Map<InetAddress, List<RepairMessage>> sentMessages = new HashMap<>(); + + protected void sendMessage(InetAddress destination, RepairMessage message) + { + if (!sentMessages.containsKey(destination)) + { + sentMessages.put(destination, new ArrayList<>()); + } + sentMessages.get(destination).add(message); + } + + Runnable onSetRepairing = null; + boolean setRepairingCalled = false; + public synchronized void setRepairing() + { + setRepairingCalled = true; + if (onSetRepairing != null) + { + onSetRepairing.run(); + } + super.setRepairing(); + } + + Runnable onFinalizeCommit = null; + boolean finalizeCommitCalled = false; + public synchronized void finalizeCommit(Executor executor) + { + finalizeCommitCalled = true; + if (onFinalizeCommit != null) + { + onFinalizeCommit.run(); + } + super.finalizeCommit(executor); + } + + Runnable onFail = null; + boolean failCalled = false; + public synchronized void fail(Executor executor) + { + failCalled = true; + if (onFail != null) + { + onFail.run(); + } + super.fail(executor); + } + } + + /** + * Coordinator state should only switch after all participants are set + */ + @Test + public void setPeerState() + { + CoordinatorSession session = createSession(); + Assert.assertEquals(PREPARING, session.getState()); + + session.setParticipantState(PARTICIPANT1, PREPARED); + Assert.assertEquals(PREPARING, session.getState()); + + session.setParticipantState(PARTICIPANT2, PREPARED); + Assert.assertEquals(PREPARING, session.getState()); + + session.setParticipantState(PARTICIPANT3, PREPARED); + Assert.assertEquals(PREPARED, session.getState()); + } + + @Test + public void hasFailed() + { + CoordinatorSession session; + + // participant failure + session = createSession(); + Assert.assertFalse(session.hasFailed()); + session.setParticipantState(PARTICIPANT1, FAILED); + Assert.assertTrue(session.hasFailed()); + + // coordinator failure + session = createSession(); + Assert.assertFalse(session.hasFailed()); + session.setState(FAILED); + Assert.assertTrue(session.hasFailed()); + } + + /** + * Coordinator should only send out failures messages once + */ + @Test + public void multipleFailures() + { + InstrumentedCoordinatorSession coordinator = createInstrumentedSession(); + + Assert.assertEquals(PREPARING, coordinator.getState()); + Assert.assertTrue(coordinator.sentMessages.isEmpty()); + + coordinator.fail(); + Assert.assertEquals(FAILED, coordinator.getState()); + for (InetAddress participant : PARTICIPANTS) + { + assertMessageSent(coordinator, participant, new FailSession(coordinator.sessionID)); + } + + coordinator.sentMessages.clear(); + coordinator.fail(); + Assert.assertEquals(FAILED, coordinator.getState()); + Assert.assertTrue(coordinator.sentMessages.isEmpty()); + } + + /** + * Tests the complete coordinator side consistent repair cycle + */ + @Test + public void successCase() + { + InstrumentedCoordinatorSession coordinator = createInstrumentedSession(); + Executor executor = MoreExecutors.directExecutor(); + AtomicBoolean repairSubmitted = new AtomicBoolean(false); + SettableFuture<List<RepairSessionResult>> repairFuture = SettableFuture.create(); + Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier = () -> + { + repairSubmitted.set(true); + return repairFuture; + }; + + // coordinator sends prepare requests to create local session and perform anticompaction + AtomicBoolean hasFailures = new AtomicBoolean(false); + Assert.assertFalse(repairSubmitted.get()); + Assert.assertTrue(coordinator.sentMessages.isEmpty()); + ListenableFuture sessionResult = coordinator.execute(executor, sessionSupplier, hasFailures); + + for (InetAddress participant : PARTICIPANTS) + { + + RepairMessage expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS)); + assertMessageSent(coordinator, participant, expected); + } + + // participants respond to coordinator, and repair begins once all participants have responded with success + Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState()); + + coordinator.handlePrepareResponse(PARTICIPANT1, true); + Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState()); + + coordinator.handlePrepareResponse(PARTICIPANT2, true); + Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState()); + + // set the setRepairing callback to verify the correct state when it's called + Assert.assertFalse(coordinator.setRepairingCalled); + coordinator.onSetRepairing = () -> Assert.assertEquals(PREPARED, coordinator.getState()); + coordinator.handlePrepareResponse(PARTICIPANT3, true); + Assert.assertTrue(coordinator.setRepairingCalled); + Assert.assertTrue(repairSubmitted.get()); + + Assert.assertEquals(ConsistentSession.State.REPAIRING, coordinator.getState()); + + ArrayList<RepairSessionResult> results = Lists.newArrayList(createResult(coordinator), + createResult(coordinator), + createResult(coordinator)); + + coordinator.sentMessages.clear(); + repairFuture.set(results); + + // propose messages should have been sent once all repair sessions completed successfully + for (InetAddress participant : PARTICIPANTS) + { + RepairMessage expected = new FinalizePropose(coordinator.sessionID); + assertMessageSent(coordinator, participant, expected); + } + + // finalize commit messages will be sent once all participants respond with a promize to finalize + coordinator.sentMessages.clear(); + Assert.assertEquals(ConsistentSession.State.REPAIRING, coordinator.getState()); + + coordinator.handleFinalizePromise(PARTICIPANT1, true); + Assert.assertEquals(ConsistentSession.State.REPAIRING, coordinator.getState()); + + coordinator.handleFinalizePromise(PARTICIPANT2, true); + Assert.assertEquals(ConsistentSession.State.REPAIRING, coordinator.getState()); + + // set the finalizeCommit callback so we can verify the state when it's called + Assert.assertFalse(coordinator.finalizeCommitCalled); + coordinator.onFinalizeCommit = () -> Assert.assertEquals(FINALIZE_PROMISED, coordinator.getState()); + coordinator.handleFinalizePromise(PARTICIPANT3, true); + Assert.assertTrue(coordinator.finalizeCommitCalled); + + Assert.assertEquals(ConsistentSession.State.FINALIZED, coordinator.getState()); + for (InetAddress participant : PARTICIPANTS) + { + RepairMessage expected = new FinalizeCommit(coordinator.sessionID); + assertMessageSent(coordinator, participant, expected); + } + + Assert.assertTrue(sessionResult.isDone()); + Assert.assertFalse(hasFailures.get()); + } + + @Test + public void failedRepairs() + { + InstrumentedCoordinatorSession coordinator = createInstrumentedSession(); + Executor executor = MoreExecutors.directExecutor(); + AtomicBoolean repairSubmitted = new AtomicBoolean(false); + SettableFuture<List<RepairSessionResult>> repairFuture = SettableFuture.create(); + Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier = () -> + { + repairSubmitted.set(true); + return repairFuture; + }; + + // coordinator sends prepare requests to create local session and perform anticompaction + AtomicBoolean hasFailures = new AtomicBoolean(false); + Assert.assertFalse(repairSubmitted.get()); + Assert.assertTrue(coordinator.sentMessages.isEmpty()); + ListenableFuture sessionResult = coordinator.execute(executor, sessionSupplier, hasFailures); + for (InetAddress participant : PARTICIPANTS) + { + PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS)); + assertMessageSent(coordinator, participant, expected); + } + + // participants respond to coordinator, and repair begins once all participants have responded with success + Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState()); + + coordinator.handlePrepareResponse(PARTICIPANT1, true); + Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState()); + + coordinator.handlePrepareResponse(PARTICIPANT2, true); + Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState()); + + // set the setRepairing callback to verify the correct state when it's called + Assert.assertFalse(coordinator.setRepairingCalled); + coordinator.onSetRepairing = () -> Assert.assertEquals(PREPARED, coordinator.getState()); + coordinator.handlePrepareResponse(PARTICIPANT3, true); + Assert.assertTrue(coordinator.setRepairingCalled); + Assert.assertTrue(repairSubmitted.get()); + + Assert.assertEquals(ConsistentSession.State.REPAIRING, coordinator.getState()); + + ArrayList<RepairSessionResult> results = Lists.newArrayList(createResult(coordinator), + null, + createResult(coordinator)); + + coordinator.sentMessages.clear(); + Assert.assertFalse(coordinator.failCalled); + coordinator.onFail = () -> Assert.assertEquals(REPAIRING, coordinator.getState()); + repairFuture.set(results); + Assert.assertTrue(coordinator.failCalled); + + // all participants should have been notified of session failure + for (InetAddress participant : PARTICIPANTS) + { + RepairMessage expected = new FailSession(coordinator.sessionID); + assertMessageSent(coordinator, participant, expected); + } + + Assert.assertTrue(sessionResult.isDone()); + Assert.assertTrue(hasFailures.get()); + } + + @Test + public void failedPrepare() + { + InstrumentedCoordinatorSession coordinator = createInstrumentedSession(); + Executor executor = MoreExecutors.directExecutor(); + AtomicBoolean repairSubmitted = new AtomicBoolean(false); + SettableFuture<List<RepairSessionResult>> repairFuture = SettableFuture.create(); + Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier = () -> + { + repairSubmitted.set(true); + return repairFuture; + }; + + // coordinator sends prepare requests to create local session and perform anticompaction + AtomicBoolean hasFailures = new AtomicBoolean(false); + Assert.assertFalse(repairSubmitted.get()); + Assert.assertTrue(coordinator.sentMessages.isEmpty()); + ListenableFuture sessionResult = coordinator.execute(executor, sessionSupplier, hasFailures); + for (InetAddress participant : PARTICIPANTS) + { + PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS)); + assertMessageSent(coordinator, participant, expected); + } + + coordinator.sentMessages.clear(); + + // participants respond to coordinator, and repair begins once all participants have responded with success + Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState()); + + coordinator.handlePrepareResponse(PARTICIPANT1, true); + Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState()); + + // participant 2 fails to prepare for consistent repair + Assert.assertFalse(coordinator.failCalled); + coordinator.handlePrepareResponse(PARTICIPANT2, false); + Assert.assertEquals(ConsistentSession.State.FAILED, coordinator.getState()); + Assert.assertTrue(coordinator.failCalled); + + // additional success messages should be ignored + Assert.assertFalse(coordinator.setRepairingCalled); + coordinator.onSetRepairing = Assert::fail; + coordinator.handlePrepareResponse(PARTICIPANT3, true); + Assert.assertFalse(coordinator.setRepairingCalled); + Assert.assertFalse(repairSubmitted.get()); + + // all participants should have been notified of session failure + for (InetAddress participant : PARTICIPANTS) + { + RepairMessage expected = new FailSession(coordinator.sessionID); + assertMessageSent(coordinator, participant, expected); + } + + Assert.assertTrue(sessionResult.isDone()); + Assert.assertTrue(hasFailures.get()); + } + + @Test + public void failedPropose() + { + InstrumentedCoordinatorSession coordinator = createInstrumentedSession(); + Executor executor = MoreExecutors.directExecutor(); + AtomicBoolean repairSubmitted = new AtomicBoolean(false); + SettableFuture<List<RepairSessionResult>> repairFuture = SettableFuture.create(); + Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier = () -> + { + repairSubmitted.set(true); + return repairFuture; + }; + + // coordinator sends prepare requests to create local session and perform anticompaction + AtomicBoolean hasFailures = new AtomicBoolean(false); + Assert.assertFalse(repairSubmitted.get()); + Assert.assertTrue(coordinator.sentMessages.isEmpty()); + ListenableFuture sessionResult = coordinator.execute(executor, sessionSupplier, hasFailures); + + for (InetAddress participant : PARTICIPANTS) + { + + RepairMessage expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS)); + assertMessageSent(coordinator, participant, expected); + } + + // participants respond to coordinator, and repair begins once all participants have responded with success + Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState()); + + coordinator.handlePrepareResponse(PARTICIPANT1, true); + Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState()); + + coordinator.handlePrepareResponse(PARTICIPANT2, true); + Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState()); + + // set the setRepairing callback to verify the correct state when it's called + Assert.assertFalse(coordinator.setRepairingCalled); + coordinator.onSetRepairing = () -> Assert.assertEquals(PREPARED, coordinator.getState()); + coordinator.handlePrepareResponse(PARTICIPANT3, true); + Assert.assertTrue(coordinator.setRepairingCalled); + Assert.assertTrue(repairSubmitted.get()); + + Assert.assertEquals(ConsistentSession.State.REPAIRING, coordinator.getState()); + + ArrayList<RepairSessionResult> results = Lists.newArrayList(createResult(coordinator), + createResult(coordinator), + createResult(coordinator)); + + coordinator.sentMessages.clear(); + repairFuture.set(results); + + // propose messages should have been sent once all repair sessions completed successfully + for (InetAddress participant : PARTICIPANTS) + { + RepairMessage expected = new FinalizePropose(coordinator.sessionID); + assertMessageSent(coordinator, participant, expected); + } + + // finalize commit messages will be sent once all participants respond with a promize to finalize + coordinator.sentMessages.clear(); + Assert.assertEquals(ConsistentSession.State.REPAIRING, coordinator.getState()); + + coordinator.handleFinalizePromise(PARTICIPANT1, true); + Assert.assertEquals(ConsistentSession.State.REPAIRING, coordinator.getState()); + + Assert.assertFalse(coordinator.failCalled); + coordinator.handleFinalizePromise(PARTICIPANT2, false); + Assert.assertEquals(ConsistentSession.State.FAILED, coordinator.getState()); + Assert.assertTrue(coordinator.failCalled); + + // additional success messages should be ignored + Assert.assertFalse(coordinator.finalizeCommitCalled); + coordinator.onFinalizeCommit = Assert::fail; + coordinator.handleFinalizePromise(PARTICIPANT3, true); + Assert.assertFalse(coordinator.finalizeCommitCalled); + Assert.assertEquals(ConsistentSession.State.FAILED, coordinator.getState()); + + // failure messages should have been sent to all participants + for (InetAddress participant : PARTICIPANTS) + { + RepairMessage expected = new FailSession(coordinator.sessionID); + assertMessageSent(coordinator, participant, expected); + } + + Assert.assertTrue(sessionResult.isDone()); + Assert.assertTrue(hasFailures.get()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java new file mode 100644 index 0000000..dfa9bc5 --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent; + +import java.net.InetAddress; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Executor; + +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.repair.messages.FailSession; +import org.apache.cassandra.repair.messages.FinalizePromise; +import org.apache.cassandra.repair.messages.PrepareConsistentResponse; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.UUIDGen; + +public class CoordinatorSessionsTest extends AbstractConsistentSessionTest +{ + private static TableMetadata cfm; + private static ColumnFamilyStore cfs; + + // to check CoordinatorSessions is passing the messages to the coordinator session correctly + private static class InstrumentedCoordinatorSession extends CoordinatorSession + { + + public InstrumentedCoordinatorSession(Builder builder) + { + super(builder); + } + + int prepareResponseCalls = 0; + InetAddress preparePeer = null; + boolean prepareSuccess = false; + public synchronized void handlePrepareResponse(InetAddress participant, boolean success) + { + prepareResponseCalls++; + preparePeer = participant; + prepareSuccess = success; + } + + int finalizePromiseCalls = 0; + InetAddress promisePeer = null; + boolean promiseSuccess = false; + public synchronized void handleFinalizePromise(InetAddress participant, boolean success) + { + finalizePromiseCalls++; + promisePeer = participant; + promiseSuccess = success; + } + + int failCalls = 0; + public synchronized void fail(Executor executor) + { + failCalls++; + } + } + + private static class InstrumentedCoordinatorSessions extends CoordinatorSessions + { + protected CoordinatorSession buildSession(CoordinatorSession.Builder builder) + { + return new InstrumentedCoordinatorSession(builder); + } + + public InstrumentedCoordinatorSession getSession(UUID sessionId) + { + return (InstrumentedCoordinatorSession) super.getSession(sessionId); + } + + public InstrumentedCoordinatorSession registerSession(UUID sessionId, Set<InetAddress> peers) + { + return (InstrumentedCoordinatorSession) super.registerSession(sessionId, peers); + } + } + + @BeforeClass + public static void setupClass() + { + SchemaLoader.prepareServer(); + cfm = CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", "coordinatorsessiontest").build(); + SchemaLoader.createKeyspace("coordinatorsessiontest", KeyspaceParams.simple(1), cfm); + cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); + } + + private static UUID registerSession() + { + return registerSession(cfs); + } + + @Test + public void registerSessionTest() + { + CoordinatorSessions sessions = new CoordinatorSessions(); + UUID sessionID = registerSession(); + CoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS); + + Assert.assertEquals(ConsistentSession.State.PREPARING, session.getState()); + Assert.assertEquals(sessionID, session.sessionID); + Assert.assertEquals(COORDINATOR, session.coordinator); + Assert.assertEquals(Sets.newHashSet(cfm.id), session.tableIds); + + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); + Assert.assertEquals(prs.repairedAt, session.repairedAt); + Assert.assertEquals(prs.getRanges(), session.ranges); + Assert.assertEquals(PARTICIPANTS, session.participants); + + Assert.assertSame(session, sessions.getSession(sessionID)); + } + + @Test + public void handlePrepareResponse() + { + InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions(); + UUID sessionID = registerSession(); + + InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS); + Assert.assertEquals(0, session.prepareResponseCalls); + + sessions.handlePrepareResponse(new PrepareConsistentResponse(sessionID, PARTICIPANT1, true)); + Assert.assertEquals(1, session.prepareResponseCalls); + Assert.assertEquals(PARTICIPANT1, session.preparePeer); + Assert.assertEquals(true, session.prepareSuccess); + } + + @Test + public void handlePrepareResponseNoSession() + { + InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions(); + UUID fakeID = UUIDGen.getTimeUUID(); + + sessions.handlePrepareResponse(new PrepareConsistentResponse(fakeID, PARTICIPANT1, true)); + Assert.assertNull(sessions.getSession(fakeID)); + } + + @Test + public void handlePromiseResponse() + { + InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions(); + UUID sessionID = registerSession(); + + InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS); + Assert.assertEquals(0, session.finalizePromiseCalls); + + sessions.handleFinalizePromiseMessage(new FinalizePromise(sessionID, PARTICIPANT1, true)); + Assert.assertEquals(1, session.finalizePromiseCalls); + Assert.assertEquals(PARTICIPANT1, session.promisePeer); + Assert.assertEquals(true, session.promiseSuccess); + } + + @Test + public void handlePromiseResponseNoSession() + { + InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions(); + UUID fakeID = UUIDGen.getTimeUUID(); + + sessions.handleFinalizePromiseMessage(new FinalizePromise(fakeID, PARTICIPANT1, true)); + Assert.assertNull(sessions.getSession(fakeID)); + } + + @Test + public void handleFailureMessage() + { + InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions(); + UUID sessionID = registerSession(); + + InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS); + Assert.assertEquals(0, session.failCalls); + + sessions.handleFailSessionMessage(new FailSession(sessionID)); + Assert.assertEquals(1, session.failCalls); + } + + @Test + public void handleFailureMessageNoSession() + { + InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions(); + UUID fakeID = UUIDGen.getTimeUUID(); + + sessions.handleFailSessionMessage(new FailSession(fakeID)); + Assert.assertNull(sessions.getSession(fakeID)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java new file mode 100644 index 0000000..6808efe --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent; + +import java.net.InetAddress; +import java.util.Set; +import java.util.UUID; + +import org.apache.cassandra.service.ActiveRepairService; + +/** + * makes package private hacks available to compaction tests + */ +public class LocalSessionAccessor +{ + private static final ActiveRepairService ARS = ActiveRepairService.instance; + + public static void startup() + { + ARS.consistent.local.start(); + } + + public static void prepareUnsafe(UUID sessionID, InetAddress coordinator, Set<InetAddress> peers) + { + ActiveRepairService.ParentRepairSession prs = ARS.getParentRepairSession(sessionID); + assert prs != null; + LocalSession session = ARS.consistent.local.createSessionUnsafe(sessionID, prs, peers); + ARS.consistent.local.putSessionUnsafe(session); + } + + public static void finalizeUnsafe(UUID sessionID) + { + LocalSession session = ARS.consistent.local.getSession(sessionID); + assert session != null; + session.setState(ConsistentSession.State.FINALIZED); + ARS.consistent.local.save(session); + } + + public static void failUnsafe(UUID sessionID) + { + LocalSession session = ARS.consistent.local.getSession(sessionID); + assert session != null; + session.setState(ConsistentSession.State.FAILED); + ARS.consistent.local.save(session); + } +}