Repository: cassandra Updated Branches: refs/heads/trunk 1757e1330 -> 98d74ed99
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java new file mode 100644 index 0000000..a85e01b --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java @@ -0,0 +1,885 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.consistent; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.repair.messages.FailSession; +import org.apache.cassandra.repair.messages.FinalizeCommit; +import org.apache.cassandra.repair.messages.FinalizePromise; +import org.apache.cassandra.repair.messages.FinalizePropose; +import org.apache.cassandra.repair.messages.PrepareConsistentRequest; +import org.apache.cassandra.repair.messages.PrepareConsistentResponse; +import org.apache.cassandra.repair.messages.RepairMessage; +import org.apache.cassandra.repair.messages.StatusRequest; +import org.apache.cassandra.repair.messages.StatusResponse; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*; + 
+public class LocalSessionTest extends AbstractConsistentSessionTest +{ + + static LocalSession.Builder createBuilder() + { + LocalSession.Builder builder = LocalSession.builder(); + builder.withState(PREPARING); + builder.withSessionID(UUIDGen.getTimeUUID()); + builder.withCoordinator(COORDINATOR); + builder.withUUIDTableIds(Sets.newHashSet(UUIDGen.getTimeUUID(), UUIDGen.getTimeUUID())); + builder.withRepairedAt(System.currentTimeMillis()); + builder.withRanges(Sets.newHashSet(RANGE1, RANGE2, RANGE3)); + builder.withParticipants(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3)); + + int now = FBUtilities.nowInSeconds(); + builder.withStartedAt(now); + builder.withLastUpdate(now); + + return builder; + } + + static LocalSession createSession() + { + return createBuilder().build(); + } + + private static void assertValidationFailure(Consumer<LocalSession.Builder> consumer) + { + try + { + LocalSession.Builder builder = createBuilder(); + consumer.accept(builder); + builder.build(); + Assert.fail("Expected assertion error"); + } + catch (IllegalArgumentException e) + { + // expected + } + } + + private static void assertNoMessagesSent(InstrumentedLocalSessions sessions, InetAddress to) + { + Assert.assertNull(sessions.sentMessages.get(to)); + } + + private static void assertMessagesSent(InstrumentedLocalSessions sessions, InetAddress to, RepairMessage... 
expected) + { + Assert.assertEquals(Lists.newArrayList(expected), sessions.sentMessages.get(to)); + } + + static class InstrumentedLocalSessions extends LocalSessions + { + Map<InetAddress, List<RepairMessage>> sentMessages = new HashMap<>(); + protected void sendMessage(InetAddress destination, RepairMessage message) + { + if (!sentMessages.containsKey(destination)) + { + sentMessages.put(destination, new ArrayList<>()); + } + sentMessages.get(destination).add(message); + } + + SettableFuture<Object> pendingAntiCompactionFuture = null; + boolean submitPendingAntiCompactionCalled = false; + ListenableFuture submitPendingAntiCompaction(LocalSession session, ExecutorService executor) + { + submitPendingAntiCompactionCalled = true; + if (pendingAntiCompactionFuture != null) + { + return pendingAntiCompactionFuture; + } + else + { + return super.submitPendingAntiCompaction(session, executor); + } + } + + boolean failSessionCalled = false; + public void failSession(UUID sessionID, boolean sendMessage) + { + failSessionCalled = true; + super.failSession(sessionID, sendMessage); + } + + public LocalSession prepareForTest(UUID sessionID) + { + pendingAntiCompactionFuture = SettableFuture.create(); + handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); + pendingAntiCompactionFuture.set(new Object()); + sentMessages.clear(); + return getSession(sessionID); + } + + protected InetAddress getBroadcastAddress() + { + return PARTICIPANT1; + } + + protected boolean isAlive(InetAddress address) + { + return true; + } + + protected boolean isNodeInitialized() + { + return true; + } + } + + private static TableMetadata cfm; + private static ColumnFamilyStore cfs; + + @BeforeClass + public static void setupClass() + { + SchemaLoader.prepareServer(); + cfm = CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", "localsessiontest").build(); + SchemaLoader.createKeyspace("localsessiontest", KeyspaceParams.simple(1), 
cfm); + cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); + } + + @Before + public void setup() + { + // clear out any data from previous test runs + ColumnFamilyStore repairCfs = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.REPAIRS); + repairCfs.truncateBlocking(); + } + + private static UUID registerSession() + { + return registerSession(cfs); + } + + @Test + public void validation() + { + assertValidationFailure(b -> b.withState(null)); + assertValidationFailure(b -> b.withSessionID(null)); + assertValidationFailure(b -> b.withCoordinator(null)); + assertValidationFailure(b -> b.withTableIds(null)); + assertValidationFailure(b -> b.withTableIds(new HashSet<>())); + assertValidationFailure(b -> b.withRepairedAt(0)); + assertValidationFailure(b -> b.withRepairedAt(-1)); + assertValidationFailure(b -> b.withRanges(null)); + assertValidationFailure(b -> b.withRanges(new HashSet<>())); + assertValidationFailure(b -> b.withParticipants(null)); + assertValidationFailure(b -> b.withParticipants(new HashSet<>())); + assertValidationFailure(b -> b.withStartedAt(0)); + assertValidationFailure(b -> b.withLastUpdate(0)); + } + + /** + * Test that sessions are loaded and saved properly + */ + @Test + public void persistence() + { + LocalSessions sessions = new LocalSessions(); + LocalSession expected = createSession(); + sessions.save(expected); + LocalSession actual = sessions.loadUnsafe(expected.sessionID); + Assert.assertEquals(expected, actual); + } + + @Test + public void prepareSuccessCase() + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + // replacing future so we can inspect state before and after anti compaction callback + sessions.pendingAntiCompactionFuture = SettableFuture.create(); + Assert.assertFalse(sessions.submitPendingAntiCompactionCalled); + sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); + Assert.assertTrue(sessions.submitPendingAntiCompactionCalled); + Assert.assertTrue(sessions.sentMessages.isEmpty()); + + // anti compaction hasn't finished yet, so state in memory and on disk should be PREPARING + LocalSession session = sessions.getSession(sessionID); + Assert.assertNotNull(session); + Assert.assertEquals(PREPARING, session.getState()); + Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); + + // anti compaction has now finished, so state in memory and on disk should be PREPARED + sessions.pendingAntiCompactionFuture.set(new Object()); + session = sessions.getSession(sessionID); + Assert.assertNotNull(session); + Assert.assertEquals(PREPARED, session.getState()); + Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); + + // ...and we should have sent a success message back to the coordinator + assertMessagesSent(sessions, COORDINATOR, new PrepareConsistentResponse(sessionID, PARTICIPANT1, true)); + } + + /** + * If anti compactionn fails, we should fail the session locally, + * and send a failure message back to the coordinator + */ + @Test + public void prepareAntiCompactFailure() + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + // replacing future so we can inspect state before and after anti compaction callback + sessions.pendingAntiCompactionFuture = SettableFuture.create(); + Assert.assertFalse(sessions.submitPendingAntiCompactionCalled); + sessions.handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); + Assert.assertTrue(sessions.submitPendingAntiCompactionCalled); + Assert.assertTrue(sessions.sentMessages.isEmpty()); + + // anti compaction hasn't finished yet, so state in memory and on disk should be PREPARING + LocalSession session = sessions.getSession(sessionID); + Assert.assertNotNull(session); + 
Assert.assertEquals(PREPARING, session.getState()); + Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); + + // anti compaction has now finished, so state in memory and on disk should be PREPARED + sessions.pendingAntiCompactionFuture.setException(new RuntimeException()); + session = sessions.getSession(sessionID); + Assert.assertNotNull(session); + Assert.assertEquals(FAILED, session.getState()); + Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); + + // ...and we should have sent a success message back to the coordinator + assertMessagesSent(sessions, COORDINATOR, new FailSession(sessionID)); + + } + + /** + * If a ParentRepairSession wasn't previously created, we shouldn't + * create a session locally, but we should send a failure message to + * the coordinator. + */ + @Test + public void prepareWithNonExistantParentSession() + { + UUID sessionID = UUIDGen.getTimeUUID(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); + Assert.assertNull(sessions.getSession(sessionID)); + assertMessagesSent(sessions, COORDINATOR, new FailSession(sessionID)); + } + + @Test + public void maybeSetRepairing() + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + LocalSession session = sessions.prepareForTest(sessionID); + Assert.assertEquals(PREPARED, session.getState()); + + sessions.sentMessages.clear(); + sessions.maybeSetRepairing(sessionID); + Assert.assertEquals(REPAIRING, session.getState()); + Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); + Assert.assertTrue(sessions.sentMessages.isEmpty()); + } + + /** + * Multiple calls to maybeSetRepairing shouldn't cause any problems + */ + @Test + public void maybeSetRepairingDuplicates() + { + + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new 
InstrumentedLocalSessions(); + sessions.start(); + + LocalSession session = sessions.prepareForTest(sessionID); + Assert.assertEquals(PREPARED, session.getState()); + + // initial set + sessions.sentMessages.clear(); + sessions.maybeSetRepairing(sessionID); + Assert.assertEquals(REPAIRING, session.getState()); + Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); + Assert.assertTrue(sessions.sentMessages.isEmpty()); + + // repeated call 1 + sessions.maybeSetRepairing(sessionID); + Assert.assertEquals(REPAIRING, session.getState()); + Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); + Assert.assertTrue(sessions.sentMessages.isEmpty()); + + // repeated call 2 + sessions.maybeSetRepairing(sessionID); + Assert.assertEquals(REPAIRING, session.getState()); + Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); + Assert.assertTrue(sessions.sentMessages.isEmpty()); + } + + /** + * We shouldn't fail if we don't have a session for the given session id + */ + @Test + public void maybeSetRepairingNonExistantSession() + { + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + UUID fakeID = UUIDGen.getTimeUUID(); + sessions.maybeSetRepairing(fakeID); + Assert.assertTrue(sessions.sentMessages.isEmpty()); + } + + /** + * In the success case, session state should be set to FINALIZE_PROMISED and + * persisted, and a FinalizePromise message should be sent back to the coordinator + */ + @Test + public void finalizeProposeSuccessCase() + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + // create session and move to preparing + LocalSession session = sessions.prepareForTest(sessionID); + sessions.maybeSetRepairing(sessionID); + + // + Assert.assertEquals(REPAIRING, session.getState()); + + // should send a promised message to coordinator and set session state accordingly + sessions.sentMessages.clear(); + 
sessions.handleFinalizeProposeMessage(COORDINATOR, new FinalizePropose(sessionID)); + Assert.assertEquals(FINALIZE_PROMISED, session.getState()); + Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); + assertMessagesSent(sessions, COORDINATOR, new FinalizePromise(sessionID, PARTICIPANT1, true)); + } + + /** + * Trying to propose finalization when the session isn't in the repaired + * state should fail the session and send a failure message to the proposer + */ + @Test + public void finalizeProposeInvalidStateFailure() + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + LocalSession session = sessions.prepareForTest(sessionID); + Assert.assertEquals(PREPARED, session.getState()); + + // should fail the session and send a failure message to the coordinator + sessions.sentMessages.clear(); + sessions.handleFinalizeProposeMessage(COORDINATOR, new FinalizePropose(sessionID)); + Assert.assertEquals(FAILED, session.getState()); + Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); + assertMessagesSent(sessions, COORDINATOR, new FailSession(sessionID)); + } + + @Test + public void finalizeProposeNonExistantSessionFailure() + { + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + UUID fakeID = UUIDGen.getTimeUUID(); + sessions.handleFinalizeProposeMessage(COORDINATOR, new FinalizePropose(fakeID)); + Assert.assertNull(sessions.getSession(fakeID)); + assertMessagesSent(sessions, COORDINATOR, new FailSession(fakeID)); + } + + /** + * Session state should be set to finalized, sstables should be promoted + * to repaired. 
No messages should be sent to the coordinator + */ + @Test + public void finalizeCommitSuccessCase() + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + // create session and move to finalized promised + sessions.prepareForTest(sessionID); + sessions.maybeSetRepairing(sessionID); + sessions.handleFinalizeProposeMessage(COORDINATOR, new FinalizePropose(sessionID)); + + sessions.sentMessages.clear(); + LocalSession session = sessions.getSession(sessionID); + sessions.handleFinalizeCommitMessage(PARTICIPANT1, new FinalizeCommit(sessionID)); + + Assert.assertEquals(FINALIZED, session.getState()); + Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); + Assert.assertTrue(sessions.sentMessages.isEmpty()); + } + + @Test + public void finalizeCommitNonExistantSession() + { + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + UUID fakeID = UUIDGen.getTimeUUID(); + sessions.handleFinalizeCommitMessage(PARTICIPANT1, new FinalizeCommit(fakeID)); + Assert.assertNull(sessions.getSession(fakeID)); + Assert.assertTrue(sessions.sentMessages.isEmpty()); + } + + @Test + public void failSession() + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + LocalSession session = sessions.prepareForTest(sessionID); + Assert.assertEquals(PREPARED, session.getState()); + sessions.sentMessages.clear(); + + // fail session + sessions.failSession(sessionID); + Assert.assertEquals(FAILED, session.getState()); + assertMessagesSent(sessions, COORDINATOR, new FailSession(sessionID)); + } + + /** + * Session should be failed, but no messages should be sent + */ + @Test + public void handleFailMessage() + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + LocalSession session = 
sessions.prepareForTest(sessionID); + Assert.assertEquals(PREPARED, session.getState()); + sessions.sentMessages.clear(); + + sessions.handleFailSessionMessage(PARTICIPANT1, new FailSession(sessionID)); + Assert.assertEquals(FAILED, session.getState()); + Assert.assertTrue(sessions.sentMessages.isEmpty()); + } + + @Test + public void sendStatusRequest() throws Exception + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + LocalSession session = sessions.prepareForTest(sessionID); + + sessions.sentMessages.clear(); + sessions.sendStatusRequest(session); + + assertNoMessagesSent(sessions, PARTICIPANT1); + StatusRequest expected = new StatusRequest(sessionID); + assertMessagesSent(sessions, PARTICIPANT2, expected); + assertMessagesSent(sessions, PARTICIPANT3, expected); + } + + @Test + public void handleStatusRequest() throws Exception + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + LocalSession session = sessions.prepareForTest(sessionID); + Assert.assertEquals(PREPARED, session.getState()); + + sessions.sentMessages.clear(); + sessions.handleStatusRequest(PARTICIPANT2, new StatusRequest(sessionID)); + assertNoMessagesSent(sessions, PARTICIPANT1); + assertMessagesSent(sessions, PARTICIPANT2, new StatusResponse(sessionID, PREPARED)); + assertNoMessagesSent(sessions, PARTICIPANT3); + } + + @Test + public void handleStatusRequestNoSession() throws Exception + { + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + sessions.sentMessages.clear(); + UUID sessionID = UUIDGen.getTimeUUID(); + sessions.handleStatusRequest(PARTICIPANT2, new StatusRequest(sessionID)); + assertNoMessagesSent(sessions, PARTICIPANT1); + assertMessagesSent(sessions, PARTICIPANT2, new StatusResponse(sessionID, FAILED)); + assertNoMessagesSent(sessions, PARTICIPANT3); + } + + @Test + public 
void handleStatusResponseFinalized() throws Exception + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + LocalSession session = sessions.prepareForTest(sessionID); + session.setState(FINALIZE_PROMISED); + + sessions.handleStatusResponse(PARTICIPANT1, new StatusResponse(sessionID, FINALIZED)); + Assert.assertEquals(FINALIZED, session.getState()); + } + + @Test + public void handleStatusResponseFailed() throws Exception + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + LocalSession session = sessions.prepareForTest(sessionID); + session.setState(FINALIZE_PROMISED); + + sessions.handleStatusResponse(PARTICIPANT1, new StatusResponse(sessionID, FAILED)); + Assert.assertEquals(FAILED, session.getState()); + } + + @Test + public void handleStatusResponseNoop() throws Exception + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + LocalSession session = sessions.prepareForTest(sessionID); + session.setState(REPAIRING); + + sessions.handleStatusResponse(PARTICIPANT1, new StatusResponse(sessionID, FINALIZE_PROMISED)); + Assert.assertEquals(REPAIRING, session.getState()); + } + + @Test + public void handleStatusResponseNoSession() throws Exception + { + UUID sessionID = UUIDGen.getTimeUUID(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + sessions.handleStatusResponse(PARTICIPANT1, new StatusResponse(sessionID, FINALIZE_PROMISED)); + Assert.assertNull(sessions.getSession(sessionID)); + } + + /** + * Check all states (except failed) + */ + @Test + public void isSessionInProgress() + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + sessions.pendingAntiCompactionFuture = SettableFuture.create(); // 
prevent moving to prepared + sessions.handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); + + LocalSession session = sessions.getSession(sessionID); + Assert.assertNotNull(session); + Assert.assertEquals(PREPARING, session.getState()); + Assert.assertTrue(sessions.isSessionInProgress(sessionID)); + + session.setState(PREPARED); + Assert.assertTrue(sessions.isSessionInProgress(sessionID)); + + session.setState(REPAIRING); + Assert.assertTrue(sessions.isSessionInProgress(sessionID)); + + session.setState(FINALIZE_PROMISED); + Assert.assertTrue(sessions.isSessionInProgress(sessionID)); + + session.setState(FINALIZED); + Assert.assertFalse(sessions.isSessionInProgress(sessionID)); + } + + @Test + public void isSessionInProgressFailed() + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + sessions.pendingAntiCompactionFuture = SettableFuture.create(); + sessions.handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); + sessions.pendingAntiCompactionFuture.set(new Object()); + + Assert.assertTrue(sessions.isSessionInProgress(sessionID)); + sessions.failSession(sessionID); + Assert.assertFalse(sessions.isSessionInProgress(sessionID)); + } + + @Test + public void isSessionInProgressNonExistantSession() + { + UUID fakeID = UUIDGen.getTimeUUID(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + Assert.assertFalse(sessions.isSessionInProgress(fakeID)); + } + + @Test + public void finalRepairedAtFinalized() + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + sessions.prepareForTest(sessionID); + sessions.maybeSetRepairing(sessionID); + sessions.handleFinalizeProposeMessage(COORDINATOR, new FinalizePropose(sessionID)); + 
sessions.handleFinalizeCommitMessage(PARTICIPANT1, new FinalizeCommit(sessionID)); + + LocalSession session = sessions.getSession(sessionID); + Assert.assertTrue(session.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE); + Assert.assertEquals(session.repairedAt, sessions.getFinalSessionRepairedAt(sessionID)); + } + + @Test + public void finalRepairedAtFailed() + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + sessions.prepareForTest(sessionID); + sessions.failSession(sessionID); + + LocalSession session = sessions.getSession(sessionID); + Assert.assertTrue(session.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE); + long repairedAt = sessions.getFinalSessionRepairedAt(sessionID); + Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, repairedAt); + } + + @Test + public void finalRepairedAtNoSession() + { + UUID fakeID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + long repairedAt = sessions.getFinalSessionRepairedAt(fakeID); + Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, repairedAt); + } + + @Test(expected = IllegalStateException.class) + public void finalRepairedAtInProgress() + { + UUID sessionID = registerSession(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + sessions.prepareForTest(sessionID); + + sessions.getFinalSessionRepairedAt(sessionID); + } + + /** + * Startup happy path + */ + @Test + public void startup() throws Exception + { + InstrumentedLocalSessions initialSessions = new InstrumentedLocalSessions(); + initialSessions.start(); + Assert.assertEquals(0, initialSessions.getNumSessions()); + UUID id1 = registerSession(); + UUID id2 = registerSession(); + + initialSessions.prepareForTest(id1); + initialSessions.prepareForTest(id2); + Assert.assertEquals(2, initialSessions.getNumSessions()); + LocalSession session1 = 
initialSessions.getSession(id1); + LocalSession session2 = initialSessions.getSession(id2); + + + // subsequent startups should load persisted sessions + InstrumentedLocalSessions nextSessions = new InstrumentedLocalSessions(); + Assert.assertEquals(0, nextSessions.getNumSessions()); + nextSessions.start(); + Assert.assertEquals(2, nextSessions.getNumSessions()); + + Assert.assertEquals(session1, nextSessions.getSession(id1)); + Assert.assertEquals(session2, nextSessions.getSession(id2)); + } + + /** + * If LocalSessions.start is called more than + * once, an exception should be thrown + */ + @Test (expected = IllegalArgumentException.class) + public void multipleStartupFailure() throws Exception + { + InstrumentedLocalSessions initialSessions = new InstrumentedLocalSessions(); + initialSessions.start(); + initialSessions.start(); + } + + /** + * If there are problems with the rows we're reading out of the repair table, we should + * do the best we can to repair them, but not refuse to startup. 
+ */ + @Test + public void loadCorruptRow() throws Exception + { + LocalSessions sessions = new LocalSessions(); + LocalSession session = createSession(); + sessions.save(session); + + sessions = new LocalSessions(); + sessions.start(); + Assert.assertNotNull(sessions.getSession(session.sessionID)); + + QueryProcessor.instance.executeInternal("DELETE participants FROM system.repairs WHERE parent_id=?", session.sessionID); + + sessions = new LocalSessions(); + sessions.start(); + Assert.assertNull(sessions.getSession(session.sessionID)); + } + + private static LocalSession sessionWithTime(int started, int updated) + { + LocalSession.Builder builder = createBuilder(); + builder.withStartedAt(started); + builder.withLastUpdate(updated); + return builder.build(); + } + + /** + * Sessions that shouldn't be failed or deleted are left alone + */ + @Test + public void cleanupNoOp() throws Exception + { + LocalSessions sessions = new LocalSessions(); + sessions.start(); + + int time = FBUtilities.nowInSeconds() - LocalSessions.AUTO_FAIL_TIMEOUT + 60; + LocalSession session = sessionWithTime(time - 1, time); + + sessions.putSessionUnsafe(session); + Assert.assertNotNull(sessions.getSession(session.sessionID)); + + sessions.cleanup(); + + Assert.assertNotNull(sessions.getSession(session.sessionID)); + } + + /** + * Sessions past the auto fail cutoff should be failed + */ + @Test + public void cleanupFail() throws Exception + { + LocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + int time = FBUtilities.nowInSeconds() - LocalSessions.AUTO_FAIL_TIMEOUT - 1; + LocalSession session = sessionWithTime(time - 1, time); + session.setState(REPAIRING); + + sessions.putSessionUnsafe(session); + Assert.assertNotNull(sessions.getSession(session.sessionID)); + + sessions.cleanup(); + + Assert.assertNotNull(sessions.getSession(session.sessionID)); + Assert.assertEquals(FAILED, session.getState()); + Assert.assertEquals(session, 
sessions.loadUnsafe(session.sessionID)); + } + + /** + * Sessions past the auto delete cutoff should be deleted + */ + @Test + public void cleanupDelete() throws Exception + { + LocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + int time = FBUtilities.nowInSeconds() - LocalSessions.AUTO_FAIL_TIMEOUT - 1; + LocalSession failed = sessionWithTime(time - 1, time); + failed.setState(FAILED); + + LocalSession finalized = sessionWithTime(time - 1, time); + finalized.setState(FINALIZED); + + sessions.putSessionUnsafe(failed); + sessions.putSessionUnsafe(finalized); + Assert.assertNotNull(sessions.getSession(failed.sessionID)); + Assert.assertNotNull(sessions.getSession(finalized.sessionID)); + + sessions.cleanup(); + + Assert.assertNull(sessions.getSession(failed.sessionID)); + Assert.assertNull(sessions.getSession(finalized.sessionID)); + + Assert.assertNull(sessions.loadUnsafe(failed.sessionID)); + Assert.assertNull(sessions.loadUnsafe(finalized.sessionID)); + } + + /** + * Sessions should start checking the status of their participants if + * there hasn't been activity for the CHECK_STATUS_TIMEOUT period + */ + @Test + public void cleanupStatusRequest() throws Exception + { + AtomicReference<LocalSession> checkedSession = new AtomicReference<>(); + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions() { + public void sendStatusRequest(LocalSession session) + { + Assert.assertTrue(checkedSession.compareAndSet(null, session)); + } + }; + sessions.start(); + + int time = FBUtilities.nowInSeconds() - LocalSessions.CHECK_STATUS_TIMEOUT - 1; + LocalSession session = sessionWithTime(time - 1, time); + session.setState(REPAIRING); + + sessions.putSessionUnsafe(session); + Assert.assertNotNull(sessions.getSession(session.sessionID)); + + sessions.cleanup(); + + Assert.assertEquals(session, checkedSession.get()); + } +} + 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java new file mode 100644 index 0000000..2cb6326 --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.consistent; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.concurrent.Transactional; + +public class PendingAntiCompactionTest +{ + private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompactionTest.class); + private static final Collection<Range<Token>> FULL_RANGE; + static + { + DatabaseDescriptor.daemonInitialization(); + Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken(); + FULL_RANGE = Collections.singleton(new Range<>(minToken, minToken)); + } + + private String 
ks; + private final String tbl = "tbl"; + private TableMetadata cfm; + private ColumnFamilyStore cfs; + + @BeforeClass + public static void setupClass() + { + SchemaLoader.prepareServer(); + } + + @Before + public void setup() + { + ks = "ks_" + System.currentTimeMillis(); + cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build(); + SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm); + cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); + } + + private void makeSSTables(int num) + { + for (int i = 0; i < num; i++) + { + int val = i * 2; // multiplied to prevent ranges from overlapping + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), val, val); + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), val+1, val+1); + cfs.forceBlockingFlush(); + } + Assert.assertEquals(num, cfs.getLiveSSTables().size()); + } + + private static class InstrumentedAcquisitionCallback extends PendingAntiCompaction.AcquisitionCallback + { + public InstrumentedAcquisitionCallback(UUID parentRepairSession, Collection<Range<Token>> ranges) + { + super(parentRepairSession, ranges); + } + + Set<TableId> submittedCompactions = new HashSet<>(); + + ListenableFuture<?> submitPendingAntiCompaction(PendingAntiCompaction.AcquireResult result) + { + submittedCompactions.add(result.cfs.metadata.id); + result.abort(); // prevent ref leak complaints + return ListenableFutureTask.create(() -> {}, null); + } + } + + /** + * verify the pending anti compaction happy path + */ + @Test + public void successCase() throws Exception + { + Assert.assertSame(ByteOrderedPartitioner.class, DatabaseDescriptor.getPartitioner().getClass()); + cfs.disableAutoCompaction(); + + // create 2 sstables, one that will be split, and another that will be moved + for (int i = 0; i < 8; i++) + { + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), i, i); + } + cfs.forceBlockingFlush(); + for (int i = 8; i < 12; i++) + { + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i); + } + cfs.forceBlockingFlush(); + Assert.assertEquals(2, cfs.getLiveSSTables().size()); + + Token left = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 6)); + Token right = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 16)); + Collection<Range<Token>> ranges = Collections.singleton(new Range<>(left, right)); + + // create a session so the anti compaction can find it + UUID sessionID = UUIDGen.getTimeUUID(); + ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddress.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true); + + PendingAntiCompaction pac; + ExecutorService executor = Executors.newSingleThreadExecutor(); + try + { + pac = new PendingAntiCompaction(sessionID, ranges, executor); + pac.run().get(); + } + finally + { + executor.shutdown(); + } + + Assert.assertEquals(3, cfs.getLiveSSTables().size()); + int pendingRepair = 0; + for (SSTableReader sstable : cfs.getLiveSSTables()) + { + if (sstable.isPendingRepair()) + pendingRepair++; + } + Assert.assertEquals(2, pendingRepair); + } + + @Test + public void acquisitionSuccess() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(6); + List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + List<SSTableReader> expected = sstables.subList(0, 3); + Collection<Range<Token>> ranges = new HashSet<>(); + for (SSTableReader sstable : expected) + { + ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); + } + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, ranges, UUIDGen.getTimeUUID()); + + logger.info("SSTables: {}", sstables); + logger.info("Expected: {}", expected); + PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call(); + Assert.assertNotNull(result); + logger.info("Originals: {}", result.txn.originals()); + Assert.assertEquals(3, result.txn.originals().size()); + for (SSTableReader sstable : expected) + { + logger.info("Checking {}", sstable); + Assert.assertTrue(result.txn.originals().contains(sstable)); + } + + Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state()); + result.abort(); + } + + @Test + public void repairedSSTablesAreNotAcquired() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + Assert.assertEquals(2, sstables.size()); + SSTableReader repaired = sstables.get(0); + SSTableReader unrepaired = sstables.get(1); + Assert.assertTrue(repaired.intersects(FULL_RANGE)); + Assert.assertTrue(unrepaired.intersects(FULL_RANGE)); + + repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 1, null); + repaired.reloadSSTableMetadata(); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + Assert.assertNotNull(result); + + logger.info("Originals: {}", result.txn.originals()); + Assert.assertEquals(1, result.txn.originals().size()); + Assert.assertTrue(result.txn.originals().contains(unrepaired)); + result.abort(); // release sstable refs + } + + @Test + public void pendingRepairSSTablesAreNotAcquired() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + Assert.assertEquals(2, sstables.size()); + SSTableReader repaired = sstables.get(0); + SSTableReader unrepaired = sstables.get(1); + Assert.assertTrue(repaired.intersects(FULL_RANGE)); + Assert.assertTrue(unrepaired.intersects(FULL_RANGE)); + + 
repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 0, UUIDGen.getTimeUUID()); + repaired.reloadSSTableMetadata(); + Assert.assertTrue(repaired.isPendingRepair()); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + Assert.assertNotNull(result); + + logger.info("Originals: {}", result.txn.originals()); + Assert.assertEquals(1, result.txn.originals().size()); + Assert.assertTrue(result.txn.originals().contains(unrepaired)); + result.abort(); // releases sstable refs + } + + /** + * anti compaction task should be submitted if everything is ok + */ + @Test + public void callbackSuccess() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + Assert.assertNotNull(result); + + InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE); + Assert.assertTrue(cb.submittedCompactions.isEmpty()); + cb.apply(Lists.newArrayList(result)); + + Assert.assertEquals(1, cb.submittedCompactions.size()); + Assert.assertTrue(cb.submittedCompactions.contains(cfm.id)); + } + + /** + * If one of the supplied AcquireResults is null, either an Exception was thrown, or + * we couldn't get a transaction for the sstables. 
In either case we need to cancel the repair, and release + * any sstables acquired for other tables + */ + @Test + public void callbackNullResult() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + Assert.assertNotNull(result); + Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state()); + + InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE); + Assert.assertTrue(cb.submittedCompactions.isEmpty()); + cb.apply(Lists.newArrayList(result, null)); + + Assert.assertTrue(cb.submittedCompactions.isEmpty()); + Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, result.txn.state()); + } + + /** + * If an AcquireResult has a null txn, there were no sstables to acquire references + * for, so no anti compaction should have been submitted. 
+ */ + @Test + public void callbackNullTxn() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + Assert.assertNotNull(result); + + ColumnFamilyStore cfs2 = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata("system", "peers").id); + PendingAntiCompaction.AcquireResult fakeResult = new PendingAntiCompaction.AcquireResult(cfs2, null, null); + + InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE); + Assert.assertTrue(cb.submittedCompactions.isEmpty()); + cb.apply(Lists.newArrayList(result, fakeResult)); + + Assert.assertEquals(1, cb.submittedCompactions.size()); + Assert.assertTrue(cb.submittedCompactions.contains(cfm.id)); + Assert.assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java index 3924045..449a5dc 100644 --- a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java +++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java @@ -159,13 +159,6 @@ public class RepairMessageSerializationsTest } @Test - public void antiCompactionRequestMessage() throws IOException - { - AnticompactionRequest msg = new AnticompactionRequest(UUID.randomUUID(), buildTokenRanges()); - serializeRoundTrip(msg, AnticompactionRequest.serializer); - } - - @Test public void prepareMessage() 
throws IOException { PrepareMessage msg = new PrepareMessage(UUID.randomUUID(), new ArrayList<TableId>() {{add(TableId.generate());}}, http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java new file mode 100644 index 0000000..9789b38 --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.messages; + +import java.io.IOException; +import java.net.InetAddress; + +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.UUIDGen; + +/** + * verifies repair message serializers are working as advertised + */ +public class RepairMessageSerializerTest +{ + private static int MS_VERSION = MessagingService.current_version; + + static RepairMessage serdes(RepairMessage message) + { + int expectedSize = (int) RepairMessage.serializer.serializedSize(message, MS_VERSION); + try (DataOutputBuffer out = new DataOutputBuffer(expectedSize)) + { + RepairMessage.serializer.serialize(message, out, MS_VERSION); + Assert.assertEquals(expectedSize, out.buffer().limit()); + try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false)) + { + return RepairMessage.serializer.deserialize(in, MS_VERSION); + } + } + catch (IOException e) + { + throw new AssertionError(e); + } + } + + @Test + public void prepareConsistentRequest() throws Exception + { + InetAddress coordinator = InetAddress.getByName("10.0.0.1"); + InetAddress peer1 = InetAddress.getByName("10.0.0.2"); + InetAddress peer2 = InetAddress.getByName("10.0.0.3"); + InetAddress peer3 = InetAddress.getByName("10.0.0.4"); + RepairMessage expected = new PrepareConsistentRequest(UUIDGen.getTimeUUID(), + coordinator, + Sets.newHashSet(peer1, peer2, peer3)); + RepairMessage actual = serdes(expected); + Assert.assertEquals(expected, actual); + } + + @Test + public void prepareConsistentResponse() throws Exception + { + RepairMessage expected = new PrepareConsistentResponse(UUIDGen.getTimeUUID(), + InetAddress.getByName("10.0.0.2"), + true); + RepairMessage actual = serdes(expected); + Assert.assertEquals(expected, actual); + } + + @Test + public void 
failSession() throws Exception + { + RepairMessage expected = new FailSession(UUIDGen.getTimeUUID()); + RepairMessage actual = serdes(expected); + Assert.assertEquals(expected, actual);; + } + + @Test + public void finalizeCommit() throws Exception + { + RepairMessage expected = new FinalizeCommit(UUIDGen.getTimeUUID()); + RepairMessage actual = serdes(expected); + Assert.assertEquals(expected, actual);; + } + + @Test + public void finalizePromise() throws Exception + { + RepairMessage expected = new FinalizePromise(UUIDGen.getTimeUUID(), + InetAddress.getByName("10.0.0.2"), + true); + RepairMessage actual = serdes(expected); + Assert.assertEquals(expected, actual); + } + + @Test + public void finalizePropose() throws Exception + { + RepairMessage expected = new FinalizePropose(UUIDGen.getTimeUUID()); + RepairMessage actual = serdes(expected); + Assert.assertEquals(expected, actual);; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java index 29d9756..9eb7c86 100644 --- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java +++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java @@ -149,14 +149,16 @@ public class RepairOptionTest } @Test - public void testIncrementalRepairWithSubrangesIsNotGlobal() throws Exception + public void testNonGlobalIncrementalRepairParse() throws Exception { - RepairOption ro = RepairOption.parse(ImmutableMap.of(RepairOption.INCREMENTAL_KEY, "true", RepairOption.RANGES_KEY, "42:42"), - Murmur3Partitioner.instance); - assertFalse(ro.isGlobal()); - ro = RepairOption.parse(ImmutableMap.of(RepairOption.INCREMENTAL_KEY, "true", RepairOption.RANGES_KEY, ""), - Murmur3Partitioner.instance); - 
assertTrue(ro.isGlobal()); + Map<String, String> options = new HashMap<>(); + options.put(RepairOption.PARALLELISM_KEY, "parallel"); + options.put(RepairOption.PRIMARY_RANGE_KEY, "false"); + options.put(RepairOption.INCREMENTAL_KEY, "true"); + options.put(RepairOption.COLUMNFAMILIES_KEY, "cf1,cf2,cf3"); + options.put(RepairOption.HOSTS_KEY, "127.0.0.1, 127.0.0.2"); + assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Incremental repairs cannot be run against a subset of tokens or ranges"); + } private void assertParseThrowsIllegalArgumentExceptionWithMessage(Map<String, String> optionsToParse, String expectedErrorMessage) http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/schema/MockSchema.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/schema/MockSchema.java b/test/unit/org/apache/cassandra/schema/MockSchema.java index b94b49c..99fff32 100644 --- a/test/unit/org/apache/cassandra/schema/MockSchema.java +++ b/test/unit/org/apache/cassandra/schema/MockSchema.java @@ -118,7 +118,7 @@ public class MockSchema } SerializationHeader header = SerializationHeader.make(cfs.metadata(), Collections.emptyList()); StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata().comparator) - .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, header) + .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, header) .get(MetadataType.STATS); SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata, RANDOM_ACCESS_READER_FACTORY.sharedCopy(), RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index 44bd58c..8f8fe6d 100644 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@ -32,7 +32,6 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowUpdateBuilder; -import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.dht.Range; @@ -226,81 +225,7 @@ public class ActiveRepairServiceTest } @Test - public void testGetActiveRepairedSSTableRefs() - { - ColumnFamilyStore store = prepareColumnFamilyStore(); - Set<SSTableReader> original = store.getLiveSSTables(); - - UUID prsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, 0, false); - ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); - prs.markSSTablesRepairing(store.metadata.id, prsId); - - //retrieve all sstable references from parent repair sessions - Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId); - Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); - assertEquals(original, retrieved); - refs.release(); - - //remove 1 sstable from data data tracker - Set<SSTableReader> newLiveSet = new HashSet<>(original); - Iterator<SSTableReader> it = newLiveSet.iterator(); - final SSTableReader removed = it.next(); - it.remove(); - store.getTracker().dropSSTables(new com.google.common.base.Predicate<SSTableReader>() - { - public boolean apply(SSTableReader reader) - { - return 
removed.equals(reader); - } - }, OperationType.COMPACTION, null); - - //retrieve sstable references from parent repair session again - removed sstable must not be present - refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId); - retrieved = Sets.newHashSet(refs.iterator()); - assertEquals(newLiveSet, retrieved); - assertFalse(retrieved.contains(removed)); - refs.release(); - } - - @Test - public void testAddingMoreSSTables() - { - ColumnFamilyStore store = prepareColumnFamilyStore(); - Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables); - UUID prsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, System.currentTimeMillis(), true); - - ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); - prs.markSSTablesRepairing(store.metadata.id, prsId); - try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId)) - { - Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); - assertEquals(original, retrieved); - } - createSSTables(store, 2); - boolean exception = false; - try - { - UUID newPrsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(newPrsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, System.currentTimeMillis(), true); - ActiveRepairService.instance.getParentRepairSession(newPrsId).markSSTablesRepairing(store.metadata.id, newPrsId); - } - catch (Throwable t) - { - exception = true; - } - assertTrue(exception); - - try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId)) - { - Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); - assertEquals(original, retrieved); - } - } - - @Test - public void 
testSnapshotAddSSTables() throws ExecutionException, InterruptedException + public void testSnapshotAddSSTables() throws Exception { ColumnFamilyStore store = prepareColumnFamilyStore(); UUID prsId = UUID.randomUUID(); @@ -312,40 +237,7 @@ public class ActiveRepairServiceTest ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true); createSSTables(store, 2); ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.id, prsId); - try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId)) - { - assertEquals(original, Sets.newHashSet(refs.iterator())); - } - store.forceMajorCompaction(); - // after a major compaction the original sstables will be gone and we will have no sstables to anticompact: - try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId)) - { - assertEquals(0, refs.size()); - } - } - - @Test - public void testSnapshotMultipleRepairs() - { - ColumnFamilyStore store = prepareColumnFamilyStore(); - Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables); - UUID prsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true); - ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.id, prsId); - - UUID prsId2 = UUID.randomUUID(); - 
ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true); - boolean exception = false; - try - { - ActiveRepairService.instance.getParentRepairSession(prsId2).maybeSnapshot(store.metadata.id, prsId2); - } - catch (Throwable t) - { - exception = true; - } - assertTrue(exception); - try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId)) + try (Refs<SSTableReader> refs = store.getSnapshotSSTableReaders(prsId.toString())) { assertEquals(original, Sets.newHashSet(refs.iterator())); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index ffe26a2..682e039 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -74,7 +74,7 @@ public class StreamTransferTaskTest public void testScheduleTimeout() throws Exception { InetAddress peer = FBUtilities.getBroadcastAddress(); - StreamSession session = new StreamSession(peer, peer, null, 0, true, false); + StreamSession session = new StreamSession(peer, peer, null, 0, true, false, null); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); // create two sstables @@ -120,9 +120,9 @@ public class StreamTransferTaskTest public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception { InetAddress peer = FBUtilities.getBroadcastAddress(); - 
StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null, false); + StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null, false, null); StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator); - StreamSession session = new StreamSession(peer, peer, null, 0, true, false); + StreamSession session = new StreamSession(peer, peer, null, 0, true, false, null); session.init(future); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);