/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.repair.consistent;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.InetAddressType;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.FailSession;
import org.apache.cassandra.repair.messages.FinalizeCommit;
import org.apache.cassandra.repair.messages.FinalizePromise;
import org.apache.cassandra.repair.messages.FinalizePropose;
import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.StatusRequest;
import org.apache.cassandra.repair.messages.StatusResponse;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;

import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;

/**
 * Manages all consistent repair sessions a node is participating in.
 * <p/>
 * Since sessions need to be loaded, and since we need to handle cases where sessions might not exist, most of the logic
 * around local sessions is implemented in this class, with the LocalSession class being treated more like a simple struct,
 * in contrast with {@link CoordinatorSession}
 * <p/>
 * The set of known sessions is held in an immutable map that is swapped out as a whole whenever a session is
 * added or removed; state changes of an individual session are performed while holding that session's monitor.
 */
public class LocalSessions
{
    private static final Logger logger = LoggerFactory.getLogger(LocalSessions.class);

    /**
     * Amount of time a session can go without any activity before we start checking the status of other
     * participants to see if we've missed a message
     */
    static final int CHECK_STATUS_TIMEOUT = Integer.getInteger("cassandra.repair_status_check_timeout_seconds",
                                                               Ints.checkedCast(TimeUnit.HOURS.toSeconds(1)));

    /**
     * Amount of time a session can go without any activity before being automatically set to FAILED
     */
    static final int AUTO_FAIL_TIMEOUT = Integer.getInteger("cassandra.repair_fail_timeout_seconds",
                                                            Ints.checkedCast(TimeUnit.DAYS.toSeconds(1)));

    /**
     * Amount of time a completed session is kept around after completion before being deleted, this gives
     * compaction plenty of time to move sstables from successful sessions into the repaired bucket
     */
    static final int AUTO_DELETE_TIMEOUT = Integer.getInteger("cassandra.repair_delete_timeout_seconds",
                                                              Ints.checkedCast(TimeUnit.DAYS.toSeconds(1)));

    /**
     * How often LocalSessions.cleanup is run
     */
    public static final int CLEANUP_INTERVAL = Integer.getInteger("cassandra.repair_cleanup_interval_seconds",
                                                                  Ints.checkedCast(TimeUnit.MINUTES.toSeconds(10)));

    /**
     * Converts a set of raw UUIDs into an immutable set of table ids
     */
    private static Set<TableId> uuidToTableId(Set<UUID> src)
    {
        return ImmutableSet.copyOf(Iterables.transform(src, TableId::fromUUID));
    }

    /**
     * Converts a set of table ids into an immutable set of raw UUIDs
     */
    private static Set<UUID> tableIdToUuid(Set<TableId> src)
    {
        return ImmutableSet.copyOf(Iterables.transform(src, TableId::asUUID));
    }

    private final String keyspace = SchemaConstants.SYSTEM_KEYSPACE_NAME;
    private final String table = SystemKeyspace.REPAIRS;

    // written inside the synchronized start(), but read without a lock by isStarted(), hence volatile
    private volatile boolean started = false;
    private volatile ImmutableMap<UUID, LocalSession> sessions = ImmutableMap.of();

    @VisibleForTesting
    int getNumSessions()
    {
        return sessions.size();
    }

    @VisibleForTesting
    protected InetAddress getBroadcastAddress()
    {
        return FBUtilities.getBroadcastAddress();
    }

    @VisibleForTesting
    protected boolean isAlive(InetAddress address)
    {
        return FailureDetector.instance.isAlive(address);
    }

    @VisibleForTesting
    protected boolean isNodeInitialized()
    {
        return StorageService.instance.isInitialized();
    }

    /**
     * Returns a map representation of the sessions currently held in memory.
     *
     * @param all if false, completed sessions are left out of the result
     */
    public List<Map<String, String>> sessionInfo(boolean all)
    {
        Iterable<LocalSession> currentSessions = sessions.values();
        if (!all)
        {
            currentSessions = Iterables.filter(currentSessions, s -> !s.isCompleted());
        }
        return Lists.newArrayList(Iterables.transform(currentSessions, LocalSessionInfo::sessionToMap));
    }

    /**
     * hook for operators to cancel sessions, cancelling from a non-coordinator is an error, unless
     * force is set to true. Messages are sent out to other participants, but we don't wait for a response
     *
     * @throws IllegalArgumentException if the session is unknown, or if this node is not the coordinator
     *                                  and force is false
     */
    public void cancelSession(UUID sessionID, boolean force)
    {
        logger.debug("cancelling session {}", sessionID);
        LocalSession session = getSession(sessionID);
        // Guava's Preconditions only understands %s placeholders, not the {} used by slf4j
        Preconditions.checkArgument(session != null, "Session %s does not exist", sessionID);
        Preconditions.checkArgument(force || session.coordinator.equals(getBroadcastAddress()),
                                    "Cancel session %s from it's coordinator (%s) or use --force",
                                    sessionID, session.coordinator);

        setStateAndSave(session, FAILED);
        for (InetAddress participant : session.participants)
        {
            if (!participant.equals(getBroadcastAddress()))
                sendMessage(participant, new FailSession(sessionID));
        }
    }

    /**
     * Loads sessions out of the repairs table and sets state to started
     *
     * @throws IllegalArgumentException if called more than once, or if sessions were added before the call
     */
    public synchronized void start()
    {
        Preconditions.checkArgument(!started, "LocalSessions.start can only be called once");
        Preconditions.checkArgument(sessions.isEmpty(), "No sessions should be added before start");
        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(String.format("SELECT * FROM %s.%s", keyspace, table), 1000);
        Map<UUID, LocalSession> loadedSessions = new HashMap<>();
        for (UntypedResultSet.Row row : rows)
        {
            try
            {
                LocalSession session = load(row);
                loadedSessions.put(session.sessionID, session);
            }
            catch (IllegalArgumentException | NullPointerException e)
            {
                // a malformed row must not prevent the remaining sessions from loading; the exception
                // is passed as the trailing argument so the cause ends up in the log
                logger.warn("Unable to load malformed repair session {}, ignoring",
                            row.has("parent_id") ? row.getUUID("parent_id") : null, e);
            }
        }
        sessions = ImmutableMap.copyOf(loadedSessions);
        started = true;
    }

    public boolean isStarted()
    {
        return started;
    }

    private static boolean shouldCheckStatus(LocalSession session, int now)
    {
        return !session.isCompleted() && (now > session.getLastUpdate() + CHECK_STATUS_TIMEOUT);
    }

    private static boolean shouldFail(LocalSession session, int now)
    {
        return !session.isCompleted() && (now > session.getLastUpdate() + AUTO_FAIL_TIMEOUT);
    }

    private static boolean shouldDelete(LocalSession session, int now)
    {
        return session.isCompleted() && (now > session.getLastUpdate() + AUTO_DELETE_TIMEOUT);
    }

    /**
     * Auto fails and auto deletes timed out and old sessions
     * Compaction will clean up the sstables still owned by a deleted session
     */
    public void cleanup()
    {
        logger.debug("Running LocalSessions.cleanup");
        if (!isNodeInitialized())
        {
            logger.debug("node not initialized, aborting local session cleanup");
            return;
        }
        // iterate over a copy, deleteSession replaces the sessions map while we're looping
        Set<LocalSession> currentSessions = new HashSet<>(sessions.values());
        for (LocalSession session : currentSessions)
        {
            synchronized (session)
            {
                int now = FBUtilities.nowInSeconds();
                if (shouldFail(session, now))
                {
                    logger.warn("Auto failing timed out repair session {}", session);
                    failSession(session.sessionID, false);
                }
                else if (shouldDelete(session, now))
                {
                    logger.warn("Auto deleting repair session {}", session);
                    deleteSession(session.sessionID);
                }
                else if (shouldCheckStatus(session, now))
                {
                    sendStatusRequest(session);
                }
            }
        }
    }

    /**
     * Serializes the left token followed by the right token of the range into a single buffer
     */
    private static ByteBuffer serializeRange(Range<Token> range)
    {
        int size = (int) Token.serializer.serializedSize(range.left, 0);
        size += (int) Token.serializer.serializedSize(range.right, 0);
        try (DataOutputBuffer buffer = new DataOutputBuffer(size))
        {
            Token.serializer.serialize(range.left, buffer, 0);
            Token.serializer.serialize(range.right, buffer, 0);
            return buffer.buffer();
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }

    private static Set<ByteBuffer> serializeRanges(Set<Range<Token>> ranges)
    {
        Set<ByteBuffer> buffers = new HashSet<>(ranges.size());
        ranges.forEach(r -> buffers.add(serializeRange(r)));
        return buffers;
    }

    /**
     * Inverse of {@link #serializeRange(Range)}, tokens are read using the partitioner returned by
     * {@link DatabaseDescriptor#getPartitioner()}
     */
    private static Range<Token> deserializeRange(ByteBuffer bb)
    {
        try (DataInputBuffer in = new DataInputBuffer(bb, false))
        {
            IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
            Token left = Token.serializer.deserialize(in, partitioner, 0);
            Token right = Token.serializer.deserialize(in, partitioner, 0);
            return new Range<>(left, right);
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }

    private static Set<Range<Token>> deserializeRanges(Set<ByteBuffer> buffers)
    {
        Set<Range<Token>> ranges = new HashSet<>(buffers.size());
        buffers.forEach(bb -> ranges.add(deserializeRange(bb)));
        return ranges;
    }

    /**
     * Save session state to table
     */
    @VisibleForTesting
    void save(LocalSession session)
    {
        String query = "INSERT INTO %s.%s " +
                       "(parent_id, " +
                       "started_at, " +
                       "last_update, " +
                       "repaired_at, " +
                       "state, " +
                       "coordinator, " +
                       "participants, " +
                       "ranges, " +
                       "cfids) " +
                       "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";

        // startedAt and lastUpdate are kept in seconds, repairedAt in milliseconds
        QueryProcessor.executeInternal(String.format(query, keyspace, table),
                                       session.sessionID,
                                       Date.from(Instant.ofEpochSecond(session.startedAt)),
                                       Date.from(Instant.ofEpochSecond(session.getLastUpdate())),
                                       Date.from(Instant.ofEpochMilli(session.repairedAt)),
                                       session.getState().ordinal(),
                                       session.coordinator,
                                       session.participants,
                                       serializeRanges(session.ranges),
                                       tableIdToUuid(session.tableIds));
    }

    private static int dateToSeconds(Date d)
    {
        return Ints.checkedCast(TimeUnit.MILLISECONDS.toSeconds(d.getTime()));
    }

    /**
     * Builds a session from a row of the repairs table, this is the inverse of {@link #save(LocalSession)}
     */
    private LocalSession load(UntypedResultSet.Row row)
    {
        LocalSession.Builder builder = LocalSession.builder();
        builder.withState(ConsistentSession.State.valueOf(row.getInt("state")));
        builder.withSessionID(row.getUUID("parent_id"));
        builder.withCoordinator(row.getInetAddress("coordinator"));
        builder.withTableIds(uuidToTableId(row.getSet("cfids", UUIDType.instance)));
        builder.withRepairedAt(row.getTimestamp("repaired_at").getTime());
        builder.withRanges(deserializeRanges(row.getSet("ranges", BytesType.instance)));
        builder.withParticipants(row.getSet("participants", InetAddressType.instance));

        builder.withStartedAt(dateToSeconds(row.getTimestamp("started_at")));
        builder.withLastUpdate(dateToSeconds(row.getTimestamp("last_update")));

        return buildSession(builder);
    }

    private void deleteRow(UUID sessionID)
    {
        String query = "DELETE FROM %s.%s WHERE parent_id=?";
        QueryProcessor.executeInternal(String.format(query, keyspace, table), sessionID);
    }

    /**
     * Loads a session directly from the table. Should be used for testing only
     *
     * @return the stored session, or null if the table has no row for the given id
     */
    @VisibleForTesting
    LocalSession loadUnsafe(UUID sessionId)
    {
        String query = "SELECT * FROM %s.%s WHERE parent_id=?";
        UntypedResultSet result = QueryProcessor.executeInternal(String.format(query, keyspace, table), sessionId);
        if (result.isEmpty())
            return null;

        UntypedResultSet.Row row = result.one();
        return load(row);
    }

    @VisibleForTesting
    protected LocalSession buildSession(LocalSession.Builder builder)
    {
        return new LocalSession(builder);
    }

    /**
     * @return the in memory session for the given id, or null if there isn't one
     */
    protected LocalSession getSession(UUID sessionID)
    {
        return sessions.get(sessionID);
    }

    @VisibleForTesting
    synchronized void putSessionUnsafe(LocalSession session)
    {
        putSession(session);
        save(session);
    }

    private synchronized void putSession(LocalSession session)
    {
        // Guava's Preconditions only understands %s placeholders, not the {} used by slf4j
        Preconditions.checkArgument(!sessions.containsKey(session.sessionID),
                                    "LocalSession %s already exists", session.sessionID);
        Preconditions.checkArgument(started, "sessions cannot be added before LocalSessions is started");
        sessions = ImmutableMap.<UUID, LocalSession>builder()
                               .putAll(sessions)
                               .put(session.sessionID, session)
                               .build();
    }

    private synchronized void removeSession(UUID sessionID)
    {
        Preconditions.checkArgument(sessionID != null);
        Map<UUID, LocalSession> temp = new HashMap<>(sessions);
        temp.remove(sessionID);
        sessions = ImmutableMap.copyOf(temp);
    }

    /**
     * Builds a new session in the PREPARING state from the given parent repair session. The session
     * is neither registered with this instance nor saved.
     */
    @VisibleForTesting
    LocalSession createSessionUnsafe(UUID sessionId, ActiveRepairService.ParentRepairSession prs, Set<InetAddress> peers)
    {
        LocalSession.Builder builder = LocalSession.builder();
        builder.withState(ConsistentSession.State.PREPARING);
        builder.withSessionID(sessionId);
        builder.withCoordinator(prs.coordinator);

        builder.withTableIds(prs.getTableIds());
        builder.withRepairedAt(prs.repairedAt);
        builder.withRanges(prs.getRanges());
        builder.withParticipants(peers);

        int now = FBUtilities.nowInSeconds();
        builder.withStartedAt(now);
        builder.withLastUpdate(now);

        return buildSession(builder);
    }

    protected ActiveRepairService.ParentRepairSession getParentRepairSession(UUID sessionID)
    {
        return ActiveRepairService.instance.getParentRepairSession(sessionID);
    }

    /**
     * Sends the message one way, no response is expected or waited for
     */
    protected void sendMessage(InetAddress destination, RepairMessage message)
    {
        logger.debug("sending {} to {}", message, destination);
        MessageOut<RepairMessage> messageOut = new MessageOut<RepairMessage>(MessagingService.Verb.REPAIR_MESSAGE, message, RepairMessage.serializer);
        MessagingService.instance().sendOneWay(messageOut, destination);
    }

    /**
     * Moves the session to the given state, bumps its last update time, and saves it
     *
     * @throws IllegalArgumentException if the session can't transition from its current state to the given one
     */
    private void setStateAndSave(LocalSession session, ConsistentSession.State state)
    {
        synchronized (session)
        {
            Preconditions.checkArgument(session.getState().canTransitionTo(state),
                                        "Invalid state transition %s -> %s",
                                        session.getState(), state);
            logger.debug("Setting LocalSession state from {} -> {} for {}", session.getState(), state, session.sessionID);
            session.setState(state);
            session.setLastUpdate();
            save(session);
        }
    }

    public void failSession(UUID sessionID)
    {
        failSession(sessionID, true);
    }

    /**
     * Sets the session to FAILED, unknown sessions are ignored
     *
     * @param sendMessage if true, the session's coordinator is notified of the failure
     */
    public void failSession(UUID sessionID, boolean sendMessage)
    {
        logger.debug("failing session {}", sessionID);
        LocalSession session = getSession(sessionID);
        if (session != null)
        {
            setStateAndSave(session, FAILED);
            if (sendMessage)
            {
                sendMessage(session.coordinator, new FailSession(sessionID));
            }
        }
    }

    /**
     * Removes a completed session from the repairs table, and from memory
     *
     * @throws IllegalArgumentException if the session is unknown, or hasn't completed
     */
    public synchronized void deleteSession(UUID sessionID)
    {
        logger.debug("deleting session {}", sessionID);
        LocalSession session = getSession(sessionID);
        // fail with a descriptive message instead of a bare NullPointerException on the check below
        Preconditions.checkArgument(session != null, "Session %s does not exist", sessionID);
        Preconditions.checkArgument(session.isCompleted(), "Cannot delete incomplete sessions");

        deleteRow(sessionID);
        removeSession(sessionID);
    }

    @VisibleForTesting
    ListenableFuture submitPendingAntiCompaction(LocalSession session, ExecutorService executor)
    {
        PendingAntiCompaction pac = new PendingAntiCompaction(session.sessionID, session.ranges, executor);
        return pac.run();
    }

    /**
     * The PrepareConsistentRequest effectively promotes the parent repair session to a consistent
     * incremental session, and begins the 'pending anti compaction' which moves all sstable data
     * that is to be repaired into it's own silo, preventing it from mixing with other data.
     *
     * No response is sent to the repair coordinator until the pending anti compaction has completed
     * successfully. If the pending anti compaction fails, a failure message is sent to the coordinator,
     * cancelling the session.
     */
    public void handlePrepareMessage(InetAddress from, PrepareConsistentRequest request)
    {
        logger.debug("received {} from {}", request, from);
        UUID sessionID = request.parentSession;
        InetAddress coordinator = request.coordinator;
        Set<InetAddress> peers = request.participants;

        ActiveRepairService.ParentRepairSession parentSession;
        try
        {
            parentSession = getParentRepairSession(sessionID);
        }
        catch (Throwable e)
        {
            logger.debug("Error retrieving ParentRepairSession for session {}, responding with failure", sessionID, e);
            sendMessage(coordinator, new FailSession(sessionID));
            return;
        }

        LocalSession session = createSessionUnsafe(sessionID, parentSession, peers);
        putSessionUnsafe(session);
        logger.debug("created local session for {}", sessionID);

        ExecutorService executor = Executors.newFixedThreadPool(parentSession.getColumnFamilyStores().size());

        ListenableFuture pendingAntiCompaction = submitPendingAntiCompaction(session, executor);
        Futures.addCallback(pendingAntiCompaction, new FutureCallback()
        {
            public void onSuccess(@Nullable Object result)
            {
                try
                {
                    logger.debug("pending anti-compaction for {} completed", sessionID);
                    setStateAndSave(session, PREPARED);
                    sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddress(), true));
                }
                finally
                {
                    // the pool only exists for this session, don't leak its threads if the above throws
                    executor.shutdown();
                }
            }

            public void onFailure(Throwable t)
            {
                try
                {
                    logger.debug("pending anti-compaction for {} failed", sessionID);
                    failSession(sessionID);
                }
                finally
                {
                    executor.shutdown();
                }
            }
        });
    }

    /**
     * Sets the session to REPAIRING if it exists and isn't already in that state
     */
    public void maybeSetRepairing(UUID sessionID)
    {
        LocalSession session = getSession(sessionID);
        if (session != null && session.getState() != REPAIRING)
        {
            logger.debug("Setting local session {} to REPAIRING", session);
            setStateAndSave(session, REPAIRING);
        }
    }

    /**
     * Sets the session to FINALIZE_PROMISED and responds to the sender with a promise. A failure message
     * is sent if the session is unknown, and the session is failed if the state transition isn't valid.
     */
    public void handleFinalizeProposeMessage(InetAddress from, FinalizePropose propose)
    {
        logger.debug("received {} from {}", propose, from);
        UUID sessionID = propose.sessionID;
        LocalSession session = getSession(sessionID);
        if (session == null)
        {
            logger.debug("No LocalSession found for session {}, responding with failure", sessionID);
            sendMessage(from, new FailSession(sessionID));
            return;
        }

        try
        {
            setStateAndSave(session, FINALIZE_PROMISED);
            sendMessage(from, new FinalizePromise(sessionID, getBroadcastAddress(), true));
        }
        catch (IllegalArgumentException e)
        {
            logger.error("error setting session to FINALIZE_PROMISED", e);
            failSession(sessionID);
        }
    }

    /**
     * Finalizes the repair session, completing it as successful.
     *
     * This only changes the state of the session, it doesn't promote the siloed sstables to repaired. That will happen
     * as part of the compaction process, and avoids having to worry about in progress compactions interfering with the
     * promotion.
     */
    public void handleFinalizeCommitMessage(InetAddress from, FinalizeCommit commit)
    {
        logger.debug("received {} from {}", commit, from);
        UUID sessionID = commit.sessionID;
        LocalSession session = getSession(sessionID);
        if (session == null)
        {
            logger.warn("Received finalize commit message for unknown session {}", sessionID);
            return;
        }

        setStateAndSave(session, FINALIZED);
    }

    /**
     * Fails the session locally, without notifying the coordinator
     */
    public void handleFailSessionMessage(InetAddress from, FailSession msg)
    {
        logger.debug("received {} from {}", msg, from);
        failSession(msg.sessionID, false);
    }

    /**
     * Asks every other participant that is currently alive for its view of the session's state
     */
    public void sendStatusRequest(LocalSession session)
    {
        StatusRequest request = new StatusRequest(session.sessionID);
        for (InetAddress participant : session.participants)
        {
            if (!getBroadcastAddress().equals(participant) && isAlive(participant))
            {
                sendMessage(participant, request);
            }
        }
    }

    /**
     * Responds to the sender with the state of the requested session, unknown sessions are reported as FAILED
     */
    public void handleStatusRequest(InetAddress from, StatusRequest request)
    {
        logger.debug("received {} from {}", request, from);
        UUID sessionID = request.sessionID;
        LocalSession session = getSession(sessionID);
        if (session == null)
        {
            logger.warn("Received status request message for unknown session {}", sessionID);
            sendMessage(from, new StatusResponse(sessionID, FAILED));
        }
        else
        {
            sendMessage(from, new StatusResponse(sessionID, session.getState()));
        }
    }

    /**
     * Applies the state reported by another participant, if that state completes the session
     */
    public void handleStatusResponse(InetAddress from, StatusResponse response)
    {
        logger.debug("received {} from {}", response, from);
        UUID sessionID = response.sessionID;
        LocalSession session = getSession(sessionID);
        if (session == null)
        {
            logger.warn("Received status response message for unknown session {}", sessionID);
            return;
        }

        // only change local state if response state is FINALIZED or FAILED, since those are
        // the only statuses that would indicate we've missed a message completing the session
        if (response.state == FINALIZED || response.state == FAILED)
        {
            setStateAndSave(session, response.state);
        }
        else
        {
            logger.debug("{} is not actionable", response);
        }
    }

    /**
     * determines if a local session exists, and if it's not finalized or failed
     */
    public boolean isSessionInProgress(UUID sessionID)
    {
        LocalSession session = getSession(sessionID);
        return session != null && session.getState() != FINALIZED && session.getState() != FAILED;
    }

    /**
     * Returns the repairedAt time for a sessions which is unknown, failed, or finalized
     * calling this for a session which is in progress throws an exception
     *
     * @throws IllegalStateException if the session is still in progress
     */
    public long getFinalSessionRepairedAt(UUID sessionID)
    {
        LocalSession session = getSession(sessionID);
        if (session == null || session.getState() == FAILED)
        {
            return ActiveRepairService.UNREPAIRED_SSTABLE;
        }
        else if (session.getState() == FINALIZED)
        {
            return session.repairedAt;
        }
        else
        {
            throw new IllegalStateException("Cannot get final repaired at value for in progress session: " + session);
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.repair.consistent;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.concurrent.Refs;

/**
 * Performs an anti compaction on a set of tables and token ranges, isolating the unrepaired sstables
 * for a given token range into a pending repair group so they can't be compacted with other sstables
 * while they are being repaired.
 */
public class PendingAntiCompaction
{
    private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompaction.class);

    /**
     * The outcome of acquiring the sstables of a single table. refs and txn are both null when
     * the table had no sstables to acquire.
     */
    static class AcquireResult
    {
        final ColumnFamilyStore cfs;
        final Refs<SSTableReader> refs;
        final LifecycleTransaction txn;

        AcquireResult(ColumnFamilyStore cfs, Refs<SSTableReader> refs, LifecycleTransaction txn)
        {
            this.cfs = cfs;
            this.refs = refs;
            this.txn = txn;
        }

        /**
         * Aborts the transaction and releases the sstable references, if there are any
         */
        void abort()
        {
            // results for tables without any matching sstables carry neither a txn nor refs
            if (txn != null)
                txn.abort();
            if (refs != null)
                refs.release();
        }
    }

    /**
     * Acquires the sstables of a single table that are neither repaired nor pending repair, and
     * which intersect with the ranges being repaired
     */
    static class AcquisitionCallable implements Callable<AcquireResult>
    {
        private final ColumnFamilyStore cfs;
        private final Collection<Range<Token>> ranges;
        private final UUID sessionID;

        public AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID)
        {
            this.cfs = cfs;
            this.ranges = ranges;
            this.sessionID = sessionID;
        }

        private Iterable<SSTableReader> getSSTables()
        {
            return Iterables.filter(cfs.getLiveSSTables(), s -> !s.isRepaired() && !s.isPendingRepair() && s.intersects(ranges));
        }

        /**
         * @return a result without txn or refs if there is nothing to acquire, a result holding both
         * if the sstables could be acquired, and null if they couldn't
         */
        private AcquireResult acquireTuple()
        {
            List<SSTableReader> sstables = Lists.newArrayList(getSSTables());
            if (sstables.isEmpty())
                return new AcquireResult(cfs, null, null);

            LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
            if (txn != null)
                return new AcquireResult(cfs, Refs.ref(sstables), txn);
            else
                return null;
        }

        public AcquireResult call() throws Exception
        {
            logger.debug("acquiring sstables for pending anti compaction on session {}", sessionID);
            AcquireResult refTxn = acquireTuple();
            if (refTxn != null)
                return refTxn;

            // try to modify after cancelling running compactions. This will attempt to cancel in flight compactions for
            // up to a minute, after which point, null will be returned
            return cfs.runWithCompactionsDisabled(this::acquireTuple, false, false);
        }
    }

    /**
     * Submits the pending anti compactions once the sstables of every table have been acquired. If
     * the acquisition failed for any table, everything acquired is released and a failed future is returned.
     */
    static class AcquisitionCallback implements AsyncFunction<List<AcquireResult>, Object>
    {
        private final UUID parentRepairSession;
        private final Collection<Range<Token>> ranges;

        public AcquisitionCallback(UUID parentRepairSession, Collection<Range<Token>> ranges)
        {
            this.parentRepairSession = parentRepairSession;
            this.ranges = ranges;
        }

        ListenableFuture<?> submitPendingAntiCompaction(AcquireResult result)
        {
            return CompactionManager.instance.submitPendingAntiCompaction(result.cfs, ranges, result.refs, result.txn, parentRepairSession);
        }

        public ListenableFuture apply(List<AcquireResult> results) throws Exception
        {
            if (Iterables.any(results, t -> t == null))
            {
                // Release all sstables, and report failure back to coordinator
                for (AcquireResult result : results)
                {
                    if (result != null)
                    {
                        logger.info("Releasing acquired sstables for {}.{}", result.cfs.metadata.keyspace, result.cfs.metadata.name);
                        result.abort();
                    }
                }
                return Futures.immediateFailedFuture(new RuntimeException("unable to acquire sstables"));
            }
            else
            {
                List<ListenableFuture<?>> pendingAntiCompactions = new ArrayList<>(results.size());
                for (AcquireResult result : results)
                {
                    // tables without any sstables to anti compact have no transaction
                    if (result.txn != null)
                    {
                        ListenableFuture<?> future = submitPendingAntiCompaction(result);
                        pendingAntiCompactions.add(future);
                    }
                }

                return Futures.allAsList(pendingAntiCompactions);
            }
        }
    }

    private final UUID prsId;
    private final Collection<Range<Token>> ranges;
    private final ExecutorService executor;

    public PendingAntiCompaction(UUID prsId, Collection<Range<Token>> ranges, ExecutorService executor)
    {
        this.prsId = prsId;
        this.ranges = ranges;
        this.executor = executor;
    }

    /**
     * Flushes each table of the parent repair session, submits an sstable acquisition task per table to
     * the executor, and chains the pending anti compactions onto the combined acquisition result
     *
     * @return a future for the outcome of the anti compactions
     */
    public ListenableFuture run()
    {
        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
        List<ListenableFutureTask<AcquireResult>> tasks = new ArrayList<>();
        for (ColumnFamilyStore cfs : prs.getColumnFamilyStores())
        {
            cfs.forceBlockingFlush();
            ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(new AcquisitionCallable(cfs, ranges, prsId));
            executor.submit(task);
            tasks.add(task);
        }
        // successfulAsList puts a null in place of any failed acquisition, which the callback treats as failure
        ListenableFuture<List<AcquireResult>> acquisitionResults = Futures.successfulAsList(tasks);
        ListenableFuture compactionResult = Futures.transform(acquisitionResults, new AcquisitionCallback(prsId, ranges));
        return compactionResult;
    }
}
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.repair.messages; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Objects; -import java.util.UUID; - -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.utils.UUIDSerializer; - -public class AnticompactionRequest extends RepairMessage -{ - public static MessageSerializer serializer = new AnticompactionRequestSerializer(); - public final UUID parentRepairSession; - /** - * Successfully repaired ranges. Does not contain null. - */ - public final Collection<Range<Token>> successfulRanges; - - public AnticompactionRequest(UUID parentRepairSession, Collection<Range<Token>> ranges) - { - super(Type.ANTICOMPACTION_REQUEST, null); - this.parentRepairSession = parentRepairSession; - this.successfulRanges = ranges; - } - - @Override - public boolean equals(Object o) - { - if (!(o instanceof AnticompactionRequest)) - return false; - AnticompactionRequest other = (AnticompactionRequest)o; - return messageType == other.messageType && - parentRepairSession.equals(other.parentRepairSession) && - successfulRanges.equals(other.successfulRanges); - } - - @Override - public int hashCode() - { - return Objects.hash(messageType, parentRepairSession, successfulRanges); - } - - public static class AnticompactionRequestSerializer implements MessageSerializer<AnticompactionRequest> - { - public void serialize(AnticompactionRequest message, DataOutputPlus out, int version) throws IOException - { - UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version); - out.writeInt(message.successfulRanges.size()); - for (Range<Token> r : 
message.successfulRanges) - { - MessagingService.validatePartitioner(r); - Range.tokenSerializer.serialize(r, out, version); - } - } - - public AnticompactionRequest deserialize(DataInputPlus in, int version) throws IOException - { - UUID parentRepairSession = UUIDSerializer.serializer.deserialize(in, version); - int rangeCount = in.readInt(); - List<Range<Token>> ranges = new ArrayList<>(rangeCount); - for (int i = 0; i < rangeCount; i++) - ranges.add((Range<Token>) Range.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version)); - return new AnticompactionRequest(parentRepairSession, ranges); - } - - public long serializedSize(AnticompactionRequest message, int version) - { - long size = UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version); - size += Integer.BYTES; // count of items in successfulRanges - for (Range<Token> r : message.successfulRanges) - size += Range.tokenSerializer.serializedSize(r, version); - return size; - } - } - - @Override - public String toString() - { - return "AnticompactionRequest{" + - "parentRepairSession=" + parentRepairSession + - "} " + super.toString(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/FailSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/FailSession.java b/src/java/org/apache/cassandra/repair/messages/FailSession.java new file mode 100644 index 0000000..1227cc3 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/FailSession.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.messages; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.UUIDSerializer; + +public class FailSession extends RepairMessage +{ + public final UUID sessionID; + + public FailSession(UUID sessionID) + { + super(Type.FAILED_SESSION, null); + assert sessionID != null; + this.sessionID = sessionID; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + FailSession that = (FailSession) o; + + return sessionID.equals(that.sessionID); + } + + public int hashCode() + { + return sessionID.hashCode(); + } + + public static final MessageSerializer serializer = new MessageSerializer<FailSession>() + { + public void serialize(FailSession msg, DataOutputPlus out, int version) throws IOException + { + UUIDSerializer.serializer.serialize(msg.sessionID, out, version); + } + + public FailSession deserialize(DataInputPlus in, int version) throws IOException + { + return new FailSession(UUIDSerializer.serializer.deserialize(in, version)); + } + + public long serializedSize(FailSession msg, int version) + { + return UUIDSerializer.serializer.serializedSize(msg.sessionID, version); + } + }; +} 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java b/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java new file mode 100644 index 0000000..a4eb111 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.messages; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.UUIDSerializer; + +public class FinalizeCommit extends RepairMessage +{ + public final UUID sessionID; + + public FinalizeCommit(UUID sessionID) + { + super(Type.FINALIZE_COMMIT, null); + assert sessionID != null; + this.sessionID = sessionID; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + FinalizeCommit that = (FinalizeCommit) o; + + return sessionID.equals(that.sessionID); + } + + public int hashCode() + { + return sessionID.hashCode(); + } + + public String toString() + { + return "FinalizeCommit{" + + "sessionID=" + sessionID + + '}'; + } + + public static MessageSerializer serializer = new MessageSerializer<FinalizeCommit>() + { + public void serialize(FinalizeCommit msg, DataOutputPlus out, int version) throws IOException + { + UUIDSerializer.serializer.serialize(msg.sessionID, out, version); + } + + public FinalizeCommit deserialize(DataInputPlus in, int version) throws IOException + { + return new FinalizeCommit(UUIDSerializer.serializer.deserialize(in, version)); + } + + public long serializedSize(FinalizeCommit msg, int version) + { + return UUIDSerializer.serializer.serializedSize(msg.sessionID, version); + } + }; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java new file mode 100644 index 0000000..6c28347 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java @@ -0,0 +1,95 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.messages; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.UUID; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.serializers.InetAddressSerializer; +import org.apache.cassandra.serializers.TypeSerializer; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDSerializer; + +public class FinalizePromise extends RepairMessage +{ + public final UUID sessionID; + public final InetAddress participant; + public final boolean promised; + + public FinalizePromise(UUID sessionID, InetAddress participant, boolean promised) + { + super(Type.FINALIZE_PROMISE, null); + assert sessionID != null; + assert participant != null; + this.sessionID = sessionID; + this.participant = participant; + this.promised = promised; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + FinalizePromise that = (FinalizePromise) o; + + if (promised != that.promised) return false; + if 
(!sessionID.equals(that.sessionID)) return false; + return participant.equals(that.participant); + } + + public int hashCode() + { + int result = sessionID.hashCode(); + result = 31 * result + participant.hashCode(); + result = 31 * result + (promised ? 1 : 0); + return result; + } + + public static MessageSerializer serializer = new MessageSerializer<FinalizePromise>() + { + private TypeSerializer<InetAddress> inetSerializer = InetAddressSerializer.instance; + + public void serialize(FinalizePromise msg, DataOutputPlus out, int version) throws IOException + { + UUIDSerializer.serializer.serialize(msg.sessionID, out, version); + ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(msg.participant), out); + out.writeBoolean(msg.promised); + } + + public FinalizePromise deserialize(DataInputPlus in, int version) throws IOException + { + return new FinalizePromise(UUIDSerializer.serializer.deserialize(in, version), + inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)), + in.readBoolean()); + } + + public long serializedSize(FinalizePromise msg, int version) + { + long size = UUIDSerializer.serializer.serializedSize(msg.sessionID, version); + size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(msg.participant)); + size += TypeSizes.sizeof(msg.promised); + return size; + } + }; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java b/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java new file mode 100644 index 0000000..c0c49df --- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.messages; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.UUIDSerializer; + +public class FinalizePropose extends RepairMessage +{ + public final UUID sessionID; + + public FinalizePropose(UUID sessionID) + { + super(Type.FINALIZE_PROPOSE, null); + assert sessionID != null; + this.sessionID = sessionID; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + FinalizePropose that = (FinalizePropose) o; + + return sessionID.equals(that.sessionID); + } + + public int hashCode() + { + return sessionID.hashCode(); + } + + public String toString() + { + return "FinalizePropose{" + + "sessionID=" + sessionID + + '}'; + } + + public static MessageSerializer serializer = new MessageSerializer<FinalizePropose>() + { + public void serialize(FinalizePropose msg, DataOutputPlus out, int version) throws IOException + { + UUIDSerializer.serializer.serialize(msg.sessionID, out, version); + } + + public FinalizePropose deserialize(DataInputPlus in, int version) throws IOException + { + return new 
FinalizePropose(UUIDSerializer.serializer.deserialize(in, version)); + } + + public long serializedSize(FinalizePropose msg, int version) + { + return UUIDSerializer.serializer.serializedSize(msg.sessionID, version); + } + }; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java new file mode 100644 index 0000000..57056ef --- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.messages; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.serializers.InetAddressSerializer; +import org.apache.cassandra.serializers.TypeSerializer; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDSerializer; + +public class PrepareConsistentRequest extends RepairMessage +{ + public final UUID parentSession; + public final InetAddress coordinator; + public final Set<InetAddress> participants; + + public PrepareConsistentRequest(UUID parentSession, InetAddress coordinator, Set<InetAddress> participants) + { + super(Type.CONSISTENT_REQUEST, null); + assert parentSession != null; + assert coordinator != null; + assert participants != null && !participants.isEmpty(); + this.parentSession = parentSession; + this.coordinator = coordinator; + this.participants = ImmutableSet.copyOf(participants); + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + PrepareConsistentRequest that = (PrepareConsistentRequest) o; + + if (!parentSession.equals(that.parentSession)) return false; + if (!coordinator.equals(that.coordinator)) return false; + return participants.equals(that.participants); + } + + public int hashCode() + { + int result = parentSession.hashCode(); + result = 31 * result + coordinator.hashCode(); + result = 31 * result + participants.hashCode(); + return result; + } + + public String toString() + { + return "PrepareConsistentRequest{" + + "parentSession=" + parentSession + + ", coordinator=" + coordinator + + ", participants=" + participants + + '}'; + } + + public static 
MessageSerializer serializer = new MessageSerializer<PrepareConsistentRequest>() + { + private TypeSerializer<InetAddress> inetSerializer = InetAddressSerializer.instance; + + public void serialize(PrepareConsistentRequest request, DataOutputPlus out, int version) throws IOException + { + UUIDSerializer.serializer.serialize(request.parentSession, out, version); + ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(request.coordinator), out); + out.writeInt(request.participants.size()); + for (InetAddress peer : request.participants) + { + ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(peer), out); + } + } + + public PrepareConsistentRequest deserialize(DataInputPlus in, int version) throws IOException + { + UUID sessionId = UUIDSerializer.serializer.deserialize(in, version); + InetAddress coordinator = inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)); + int numPeers = in.readInt(); + Set<InetAddress> peers = new HashSet<>(numPeers); + for (int i = 0; i < numPeers; i++) + { + InetAddress peer = inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)); + peers.add(peer); + } + return new PrepareConsistentRequest(sessionId, coordinator, peers); + } + + public long serializedSize(PrepareConsistentRequest request, int version) + { + long size = UUIDSerializer.serializer.serializedSize(request.parentSession, version); + size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(request.coordinator)); + size += TypeSizes.sizeof(request.participants.size()); + for (InetAddress peer : request.participants) + { + size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(peer)); + } + return size; + } + }; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java new file mode 100644 index 0000000..cf4410a --- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.messages; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.UUID; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.serializers.InetAddressSerializer; +import org.apache.cassandra.serializers.TypeSerializer; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDSerializer; + +public class PrepareConsistentResponse extends RepairMessage +{ + public final UUID parentSession; + public final InetAddress participant; + public final boolean success; + + public PrepareConsistentResponse(UUID parentSession, InetAddress participant, boolean success) + { + super(Type.CONSISTENT_RESPONSE, null); + assert parentSession != null; + assert participant != null; + this.parentSession = parentSession; + this.participant = participant; + this.success = success; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + PrepareConsistentResponse that = (PrepareConsistentResponse) o; + + if (success != that.success) return false; + if (!parentSession.equals(that.parentSession)) return false; + return participant.equals(that.participant); + } + + public int hashCode() + { + int result = parentSession.hashCode(); + result = 31 * result + participant.hashCode(); + result = 31 * result + (success ? 
1 : 0); + return result; + } + + public static MessageSerializer serializer = new MessageSerializer<PrepareConsistentResponse>() + { + private TypeSerializer<InetAddress> inetSerializer = InetAddressSerializer.instance; + public void serialize(PrepareConsistentResponse response, DataOutputPlus out, int version) throws IOException + { + UUIDSerializer.serializer.serialize(response.parentSession, out, version); + ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(response.participant), out); + out.writeBoolean(response.success); + } + + public PrepareConsistentResponse deserialize(DataInputPlus in, int version) throws IOException + { + return new PrepareConsistentResponse(UUIDSerializer.serializer.deserialize(in, version), + inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)), + in.readBoolean()); + } + + public long serializedSize(PrepareConsistentResponse response, int version) + { + long size = UUIDSerializer.serializer.serializedSize(response.parentSession, version); + size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(response.participant)); + size += TypeSizes.sizeof(response.success); + return size; + } + }; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/RepairMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java index 55fdb66..3cb913a 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java @@ -37,16 +37,24 @@ public abstract class RepairMessage public static interface MessageSerializer<T extends RepairMessage> extends IVersionedSerializer<T> {} - public static enum Type + public enum Type { VALIDATION_REQUEST(0, ValidationRequest.serializer), VALIDATION_COMPLETE(1, 
ValidationComplete.serializer), SYNC_REQUEST(2, SyncRequest.serializer), SYNC_COMPLETE(3, SyncComplete.serializer), - ANTICOMPACTION_REQUEST(4, AnticompactionRequest.serializer), PREPARE_MESSAGE(5, PrepareMessage.serializer), SNAPSHOT(6, SnapshotMessage.serializer), - CLEANUP(7, CleanupMessage.serializer); + CLEANUP(7, CleanupMessage.serializer), + + CONSISTENT_REQUEST(8, PrepareConsistentRequest.serializer), + CONSISTENT_RESPONSE(9, PrepareConsistentResponse.serializer), + FINALIZE_PROPOSE(10, FinalizePropose.serializer), + FINALIZE_PROMISE(11, FinalizePromise.serializer), + FINALIZE_COMMIT(12, FinalizeCommit.serializer), + FAILED_SESSION(13, FailSession.serializer), + STATUS_REQUEST(14, StatusRequest.serializer), + STATUS_RESPONSE(15, StatusResponse.serializer); private final byte type; private final MessageSerializer<RepairMessage> serializer; http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/RepairOption.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java index ced6e43..3b13cd8 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@ -237,6 +237,11 @@ public class RepairOption } } + if (option.isIncremental() && !option.isGlobal()) + { + throw new IllegalArgumentException("Incremental repairs cannot be run against a subset of tokens or ranges"); + } + return option; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/StatusRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/StatusRequest.java b/src/java/org/apache/cassandra/repair/messages/StatusRequest.java new file mode 100644 index 0000000..f6a2b82 
--- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/StatusRequest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.messages; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.UUIDSerializer; + +public class StatusRequest extends RepairMessage +{ + public final UUID sessionID; + + public StatusRequest(UUID sessionID) + { + super(Type.STATUS_REQUEST, null); + this.sessionID = sessionID; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + StatusRequest request = (StatusRequest) o; + + return sessionID.equals(request.sessionID); + } + + public int hashCode() + { + return sessionID.hashCode(); + } + + public String toString() + { + return "StatusRequest{" + + "sessionID=" + sessionID + + '}'; + } + + public static MessageSerializer serializer = new MessageSerializer<StatusRequest>() + { + public void serialize(StatusRequest msg, DataOutputPlus out, int version) throws IOException + { + 
UUIDSerializer.serializer.serialize(msg.sessionID, out, version); + } + + public StatusRequest deserialize(DataInputPlus in, int version) throws IOException + { + return new StatusRequest(UUIDSerializer.serializer.deserialize(in, version)); + } + + public long serializedSize(StatusRequest msg, int version) + { + return UUIDSerializer.serializer.serializedSize(msg.sessionID, version); + } + }; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/StatusResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/StatusResponse.java b/src/java/org/apache/cassandra/repair/messages/StatusResponse.java new file mode 100644 index 0000000..99eb76b --- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/StatusResponse.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.messages; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.repair.consistent.ConsistentSession; +import org.apache.cassandra.utils.UUIDSerializer; + +public class StatusResponse extends RepairMessage +{ + public final UUID sessionID; + public final ConsistentSession.State state; + + public StatusResponse(UUID sessionID, ConsistentSession.State state) + { + super(Type.STATUS_RESPONSE, null); + assert sessionID != null; + assert state != null; + this.sessionID = sessionID; + this.state = state; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + StatusResponse that = (StatusResponse) o; + + if (!sessionID.equals(that.sessionID)) return false; + return state == that.state; + } + + public int hashCode() + { + int result = sessionID.hashCode(); + result = 31 * result + state.hashCode(); + return result; + } + + public String toString() + { + return "StatusResponse{" + + "sessionID=" + sessionID + + ", state=" + state + + '}'; + } + + public static final MessageSerializer serializer = new MessageSerializer<StatusResponse>() + { + public void serialize(StatusResponse msg, DataOutputPlus out, int version) throws IOException + { + UUIDSerializer.serializer.serialize(msg.sessionID, out, version); + out.writeInt(msg.state.ordinal()); + } + + public StatusResponse deserialize(DataInputPlus in, int version) throws IOException + { + return new StatusResponse(UUIDSerializer.serializer.deserialize(in, version), + ConsistentSession.State.valueOf(in.readInt())); + } + + public long serializedSize(StatusResponse msg, int version) + { + return UUIDSerializer.serializer.serializedSize(msg.sessionID, version) + + TypeSizes.sizeof(msg.state.ordinal()); + } + }; +}