Repository: cassandra Updated Branches: refs/heads/trunk 50ba850be -> 9308159bc
Cleanup isIncremental/repairedAt usage Patch by Blake Eggleston; reviewed by Marcus Eriksson for CASSANDRA-13430 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9308159b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9308159b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9308159b Branch: refs/heads/trunk Commit: 9308159bc87d29c33e64dbcfcefa948d7f5643dd Parents: 50ba850 Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Fri Apr 7 10:38:56 2017 -0700 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Thu Apr 20 16:31:22 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/dht/RangeStreamer.java | 3 +- .../cassandra/io/sstable/SSTableLoader.java | 4 +- .../io/sstable/format/SSTableReader.java | 10 +++ .../net/IncomingStreamingConnection.java | 2 +- .../apache/cassandra/repair/LocalSyncTask.java | 40 +++++------ .../org/apache/cassandra/repair/RepairJob.java | 4 +- .../repair/RepairMessageVerbHandler.java | 6 +- .../apache/cassandra/repair/RepairRunnable.java | 6 +- .../apache/cassandra/repair/RepairSession.java | 3 - .../cassandra/repair/StreamingRepairTask.java | 23 +++--- .../cassandra/service/ActiveRepairService.java | 3 +- .../cassandra/streaming/ConnectionHandler.java | 1 - .../cassandra/streaming/StreamCoordinator.java | 15 ++-- .../apache/cassandra/streaming/StreamPlan.java | 20 +++--- .../cassandra/streaming/StreamReader.java | 13 +++- .../cassandra/streaming/StreamRequest.java | 9 +-- .../cassandra/streaming/StreamResultFuture.java | 7 +- .../cassandra/streaming/StreamSession.java | 36 +++------- .../cassandra/streaming/StreamTransferTask.java | 4 +- .../compress/CompressedStreamReader.java | 4 +- .../streaming/messages/FileMessageHeader.java | 20 +++++- .../streaming/messages/OutgoingFileMessage.java | 5 +- .../streaming/messages/StreamInitMessage.java | 9 +-- .../cassandra/dht/StreamStateStoreTest.java | 8 +-- .../cassandra/io/sstable/LegacySSTableTest.java | 2 +- .../io/sstable/SSTableRewriterTest.java | 4 +- .../cassandra/repair/LocalSyncTaskTest.java | 73 +++++++++++++++++--- .../cassandra/repair/RepairSessionTest.java | 2 +- .../repair/StreamingRepairTaskTest.java | 11 ++- .../cassandra/streaming/StreamSessionTest.java | 1 - .../streaming/StreamTransferTaskTest.java | 10 +-- .../streaming/StreamingTransferTest.java | 2 +- 33 files changed, 201 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2710060..e72c7a5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430) * Change protocol to allow sending key space independent of query string (CASSANDRA-10145) * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661) * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 89a96cd..fd976c9 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -156,8 +156,7 @@ public class RangeStreamer this.tokens = tokens; this.address = address; this.description = streamOperation.getDescription(); - this.streamPlan = new StreamPlan(streamOperation, ActiveRepairService.UNREPAIRED_SSTABLE, connectionsPerHost, - true, false, connectSequentially, null); + this.streamPlan = new StreamPlan(streamOperation, connectionsPerHost, true, connectSequentially, null); this.useStrictConsistency = useStrictConsistency; this.snitch = snitch; this.stateStore = stateStore; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 759fa0f..e9ea35a 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -131,7 +131,7 @@ public class SSTableLoader implements StreamEventHandler List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges); long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges); Ref<SSTableReader> ref = sstable.ref(); - StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE); + StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys); streamingDetails.put(endpoint, details); } @@ -159,7 +159,7 @@ public class SSTableLoader implements StreamEventHandler client.init(keyspace); outputHandler.output("Established connection to initial hosts"); - StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, 0, connectionsPerHost, false, false, false, null).connectionFactory(client.getConnectionFactory()); + StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, connectionsPerHost, false, false, null).connectionFactory(client.getConnectionFactory()); Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap(); openSSTables(endpointToRanges); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 568336e..4495edf 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -1760,6 +1760,16 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return sstableMetadata.pendingRepair != ActiveRepairService.NO_PENDING_REPAIR; } + public UUID getPendingRepair() + { + return sstableMetadata.pendingRepair; + } + + public long getRepairedAt() + { + return sstableMetadata.repairedAt; + } + public boolean intersects(Collection<Range<Token>> ranges) { Bounds<Token> range = new Bounds<>(first.getToken(), last.getToken()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java index eee0042..e5fdc99 100644 --- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java @@ -73,7 +73,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing. // Note: we cannot use the same socket for incoming and outgoing streams because we want to // parallelize said streams and the socket is blocking, so we might deadlock. - StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental, init.pendingRepair); + StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.pendingRepair); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index 3dd6532..06b1661 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -21,6 +21,7 @@ import java.net.InetAddress; import java.util.List; import java.util.UUID; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,19 +48,33 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class); - private final long repairedAt; private final UUID pendingRepair; - private final boolean pullRepair; - public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt, UUID pendingRepair, boolean pullRepair) + public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, UUID pendingRepair, boolean pullRepair) { super(desc, r1, r2); - this.repairedAt = repairedAt; this.pendingRepair = pendingRepair; this.pullRepair = pullRepair; } + + @VisibleForTesting + StreamPlan createStreamPlan(InetAddress dst, InetAddress preferred, List<Range<Token>> differences) + { + StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair) + .listeners(this) + .flushBeforeTransfer(pendingRepair == null) + .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily); // request ranges from the remote node + if (!pullRepair) + { + // send ranges to the remote node if we are not performing a pull repair + plan.transferRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily); + } + + return plan; + } + /** * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback * that will be called out of band once the streams complete. @@ -73,24 +88,9 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst); logger.info("[repair #{}] {}", desc.sessionId, message); - boolean isIncremental = false; - if (desc.parentSessionId != null) - { - ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId); - isIncremental = prs.isIncremental; - } Tracing.traceRepair(message); - StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, repairedAt, 1, false, isIncremental, false, pendingRepair).listeners(this) - .flushBeforeTransfer(true) - // request ranges from the remote node - .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily); - if (!pullRepair) - { - // send ranges to the remote node if we are not performing a pull repair - plan.transferRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily); - } - plan.execute(); + createStreamPlan(dst, preferred, differences).execute(); } public void handleStreamEvent(StreamEvent event) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 07bc1e2..d1f6a94 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -40,7 +40,6 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable private final RepairSession session; private final RepairJobDesc desc; private final RepairParallelism parallelismDegree; - private final long repairedAt; private final ListeningExecutorService taskExecutor; private final boolean isConsistent; @@ -54,7 +53,6 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable { this.session = session; this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges()); - this.repairedAt = session.repairedAt; this.taskExecutor = session.taskExecutor; this.parallelismDegree = session.parallelismDegree; this.isConsistent = isConsistent; @@ -130,7 +128,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable SyncTask task; if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) { - task = new LocalSyncTask(desc, r1, r2, repairedAt, isConsistent ? desc.parentSessionId : null, session.pullRepair); + task = new LocalSyncTask(desc, r1, r2, isConsistent ? desc.parentSessionId : null, session.pullRepair); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 4f412f0..b6b9f87 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -135,11 +135,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> // forwarded sync request SyncRequest request = (SyncRequest) message.payload; logger.debug("Syncing {}", request); - long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE; - if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null) - repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).getRepairedAt(); - - StreamingRepairTask task = new StreamingRepairTask(desc, request, repairedAt, isConsistent(desc.parentSessionId)); + StreamingRepairTask task = new StreamingRepairTask(desc, request, isConsistent(desc.parentSessionId) ? desc.parentSessionId : null); task.run(); break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 986bd7c..b8bef95 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -255,7 +255,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti ListeningExecutorService executor = createExecutor(); // Setting the repairedAt time to UNREPAIRED_SSTABLE causes the repairedAt times to be preserved across streamed sstables - final ListenableFuture<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, ActiveRepairService.UNREPAIRED_SSTABLE, false, executor, commonRanges, cfnames); + final ListenableFuture<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, false, executor, commonRanges, cfnames); // After all repair sessions completes(successful or not), // run anticompaction if necessary and send finish notice back to client @@ -301,7 +301,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti ListeningExecutorService executor = createExecutor(); AtomicBoolean hasFailure = new AtomicBoolean(false); ListenableFuture repairResult = coordinatorSession.execute(executor, - () -> submitRepairSessions(parentSession, repairedAt, true, executor, commonRanges, cfnames), + () -> submitRepairSessions(parentSession, true, executor, commonRanges, cfnames), hasFailure); Collection<Range<Token>> ranges = new HashSet<>(); for (Collection<Range<Token>> range : Iterables.transform(commonRanges, cr -> cr.right)) @@ -312,7 +312,6 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti } private ListenableFuture<List<RepairSessionResult>> submitRepairSessions(UUID parentSession, - long repairedAt, boolean isConsistent, ListeningExecutorService executor, List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges, @@ -326,7 +325,6 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti keyspace, options.getParallelism(), p.left, - repairedAt, isConsistent, options.isPullRepair(), executor, http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 43a9bfb..2aa068c 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -90,7 +90,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement /** Range to repair */ public final Collection<Range<Token>> ranges; public final Set<InetAddress> endpoints; - public final long repairedAt; public final boolean isConsistent; private final AtomicBoolean isFailed = new AtomicBoolean(false); @@ -124,7 +123,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, - long repairedAt, boolean isConsistent, boolean pullRepair, String... cfnames) @@ -138,7 +136,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement this.cfnames = cfnames; this.ranges = ranges; this.endpoints = endpoints; - this.repairedAt = repairedAt; this.isConsistent = isConsistent; this.pullRepair = pullRepair; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index c5f3c95..7042de1 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -18,6 +18,7 @@ package org.apache.cassandra.repair; import java.net.InetAddress; +import java.util.UUID; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -44,15 +45,13 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler private final RepairJobDesc desc; private final SyncRequest request; - private final long repairedAt; - private final boolean isConsistent; + private final UUID pendingRepair; - public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, long repairedAt, boolean isConsistent) + public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, UUID pendingRepair) { this.desc = desc; this.request = request; - this.repairedAt = repairedAt; - this.isConsistent = isConsistent; + this.pendingRepair = pendingRepair; } public void run() @@ -60,21 +59,15 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler InetAddress dest = request.dst; InetAddress preferred = SystemKeyspace.getPreferredIP(dest); logger.info("[streaming task #{}] Performing streaming repair of {} ranges with {}", desc.sessionId, request.ranges.size(), request.dst); - boolean isIncremental = false; - if (desc.parentSessionId != null) - { - ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId); - isIncremental = prs.isIncremental; - } - createStreamPlan(dest, preferred, isIncremental).execute(); + createStreamPlan(dest, preferred).execute(); } @VisibleForTesting - StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred, boolean isIncremental) + StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred) { - return new StreamPlan(StreamOperation.REPAIR, repairedAt, 1, false, isIncremental, false, isConsistent ? desc.parentSessionId : null) + return new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair) .listeners(this) - .flushBeforeTransfer(!isIncremental) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't neccessary + .flushBeforeTransfer(pendingRepair == null) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't neccessary .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) // request ranges from the remote node .transferRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily); // send ranges to the remote node } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index c03c470..fd98b37 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -165,7 +165,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, - long repairedAt, boolean isConsistent, boolean pullRepair, ListeningExecutorService executor, @@ -177,7 +176,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai if (cfnames.length == 0) return null; - final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, isConsistent, pullRepair, cfnames); + final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, cfnames); sessions.put(session.getId(), session); // register listeners http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index 91f1249..86340a5 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -198,7 +198,6 @@ public class ConnectionHandler session.streamOperation(), !isOutgoingHandler, session.keepSSTableLevel(), - session.isIncremental(), session.getPendingRepair()); ByteBuffer messageBuf = message.createMessage(false, protocolVersion); DataOutputStreamPlus out = getWriteChannel(socket); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java index 81d0498..6aa34cd 100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@ -47,17 +47,15 @@ public class StreamCoordinator private final int connectionsPerHost; private StreamConnectionFactory factory; private final boolean keepSSTableLevel; - private final boolean isIncremental; private Iterator<StreamSession> sessionsToConnect = null; private final UUID pendingRepair; - public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental, - StreamConnectionFactory factory, boolean connectSequentially, UUID pendingRepair) + public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, StreamConnectionFactory factory, + boolean connectSequentially, UUID pendingRepair) { this.connectionsPerHost = connectionsPerHost; this.factory = factory; this.keepSSTableLevel = keepSSTableLevel; - this.isIncremental = isIncremental; this.connectSequentially = connectSequentially; this.pendingRepair = pendingRepair; } @@ -251,6 +249,11 @@ public class StreamCoordinator return data; } + public UUID getPendingRepair() + { + return pendingRepair; + } + private static class StreamSessionConnector implements Runnable { private final StreamSession session; @@ -290,7 +293,7 @@ public class StreamCoordinator // create if (streamSessions.size() < connectionsPerHost) { - StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, isIncremental, pendingRepair); + StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, pendingRepair); streamSessions.put(++lastReturned, session); return session; } @@ -322,7 +325,7 @@ public class StreamCoordinator StreamSession session = streamSessions.get(id); if (session == null) { - session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, isIncremental, pendingRepair); + session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, pendingRepair); streamSessions.put(id, session); } return session; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index faaac0e..b5a6214 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -36,7 +36,6 @@ public class StreamPlan private final UUID planId = UUIDGen.getTimeUUID(); private final StreamOperation streamOperation; private final List<StreamEventHandler> handlers = new ArrayList<>(); - private final long repairedAt; private final StreamCoordinator coordinator; private boolean flushBeforeTransfer = true; @@ -48,20 +47,19 @@ public class StreamPlan */ public StreamPlan(StreamOperation streamOperation) { - this(streamOperation, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false, null); + this(streamOperation, 1, false, false, null); } public StreamPlan(StreamOperation streamOperation, boolean keepSSTableLevels, boolean connectSequentially) { - this(streamOperation, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially, null); + this(streamOperation, 1, keepSSTableLevels, connectSequentially, null); } - public StreamPlan(StreamOperation streamOperation, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels, - boolean isIncremental, boolean connectSequentially, UUID pendingRepair) + public StreamPlan(StreamOperation streamOperation, int connectionsPerHost, boolean keepSSTableLevels, + boolean connectSequentially, UUID pendingRepair) { this.streamOperation = streamOperation; - this.repairedAt = repairedAt; - this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(), + this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, new DefaultConnectionFactory(), connectSequentially, pendingRepair); } @@ -92,7 +90,7 @@ public class StreamPlan public StreamPlan requestRanges(InetAddress from, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) { StreamSession session = coordinator.getOrCreateNextSession(from, connecting); - session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies), repairedAt); + session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies)); return this; } @@ -133,7 +131,7 @@ public class StreamPlan public StreamPlan transferRanges(InetAddress to, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) { StreamSession session = coordinator.getOrCreateNextSession(to, connecting); - session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer, repairedAt); + session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer); return this; } @@ -203,9 +201,9 @@ public class StreamPlan return this; } - public long getRepairedAt() + public UUID getPendingRepair() { - return repairedAt; + return coordinator.getPendingRepair(); } public boolean getFlushBeforeTransfer() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index 7d00e48..3a95015 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -23,6 +23,7 @@ import java.nio.channels.ReadableByteChannel; import java.util.Collection; import java.util.UUID; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.UnmodifiableIterator; @@ -59,6 +60,7 @@ public class StreamReader protected final StreamSession session; protected final Version inputVersion; protected final long repairedAt; + protected final UUID pendingRepair; protected final SSTableFormat.Type format; protected final int sstableLevel; protected final SerializationHeader.Component header; @@ -66,12 +68,19 @@ public class StreamReader public StreamReader(FileMessageHeader header, StreamSession session) { + if (session.getPendingRepair() != null) + { + // we should only ever be streaming pending repair + // sstables if the session has a pending repair id + assert session.getPendingRepair().equals(header.pendingRepair); + } this.session = session; this.tableId = header.tableId; this.estimatedKeys = header.estimatedKeys; this.sections = header.sections; this.inputVersion = header.version; this.repairedAt = header.repairedAt; + this.pendingRepair = header.pendingRepair; this.format = header.format; this.sstableLevel = header.sstableLevel; this.header = header.header; @@ -97,14 +106,14 @@ public class StreamReader logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.", session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), - cfs.getTableName(), session.getPendingRepair()); + cfs.getTableName(), pendingRepair); TrackedInputStream in = new TrackedInputStream(new LZFInputStream(Channels.newInputStream(channel))); StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); SSTableMultiWriter writer = null; try { - writer = createWriter(cfs, totalSize, repairedAt, session.getPendingRepair(), format); + writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); while (in.getBytesRead() < totalSize) { writePartition(deserializer, writer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/StreamRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java index 93726e7..4a3761e 100644 --- a/src/java/org/apache/cassandra/streaming/StreamRequest.java +++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java @@ -38,13 +38,11 @@ public class StreamRequest public final String keyspace; public final Collection<Range<Token>> ranges; public final Collection<String> columnFamilies = new HashSet<>(); - public final long repairedAt; - public StreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, long repairedAt) + public StreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies) { this.keyspace = keyspace; this.ranges = ranges; this.columnFamilies.addAll(columnFamilies); - this.repairedAt = repairedAt; } public static class StreamRequestSerializer implements IVersionedSerializer<StreamRequest> @@ -52,7 +50,6 @@ public class StreamRequest public void serialize(StreamRequest request, DataOutputPlus out, int version) throws IOException { out.writeUTF(request.keyspace); - out.writeLong(request.repairedAt); out.writeInt(request.ranges.size()); for (Range<Token> range : request.ranges) { @@ -68,7 +65,6 @@ public class StreamRequest public StreamRequest deserialize(DataInputPlus in, int version) throws IOException { String keyspace = in.readUTF(); - long repairedAt = in.readLong(); int rangeCount = in.readInt(); List<Range<Token>> ranges = new ArrayList<>(rangeCount); for (int i = 0; i < rangeCount; i++) @@ -81,13 +77,12 @@ public class StreamRequest List<String> columnFamilies = new ArrayList<>(cfCount); for (int i = 0; i < cfCount; i++) columnFamilies.add(in.readUTF()); - return new StreamRequest(keyspace, ranges, columnFamilies, repairedAt); + return new StreamRequest(keyspace, ranges, columnFamilies); } public long serializedSize(StreamRequest request, int version) { int size = TypeSizes.sizeof(request.keyspace); - size += TypeSizes.sizeof(request.repairedAt); size += TypeSizes.sizeof(request.ranges.size()); for (Range<Token> range : request.ranges) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 4890b63..7845986 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -71,9 +71,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> set(getCurrentState()); } - private StreamResultFuture(UUID planId, StreamOperation streamOperation, boolean keepSSTableLevels, boolean isIncremental, UUID pendingRepair) + private StreamResultFuture(UUID planId, StreamOperation streamOperation, boolean keepSSTableLevels, UUID pendingRepair) { - this(planId, streamOperation, new StreamCoordinator(0, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(), false, pendingRepair)); + this(planId, streamOperation, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory(), false, pendingRepair)); } static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners, @@ -107,7 +107,6 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> boolean isForOutgoing, int version, boolean keepSSTableLevel, - boolean isIncremental, UUID pendingRepair) throws IOException { StreamResultFuture future = StreamManager.instance.getReceivingStream(planId); @@ -116,7 +115,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, streamOperation.getDescription()); // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure. - future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, isIncremental, pendingRepair); + future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, pendingRepair); StreamManager.instance.registerReceiving(future); } future.attachConnection(from, sessionIndex, connection, isForOutgoing, version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 62fa317..adb8e79 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -163,7 +163,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber private AtomicBoolean isAborted = new AtomicBoolean(false); private final boolean keepSSTableLevel; - private final boolean isIncremental; private ScheduledFuture<?> keepAliveFuture = null; private final UUID pendingRepair; @@ -187,7 +186,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber * @param connecting Actual connecting address * @param factory is used for establishing connection */ - public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, boolean isIncremental, UUID pendingRepair) + public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair) { this.peer = peer; this.connecting = connecting; @@ -198,7 +197,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber DatabaseDescriptor.getStreamingSocketTimeout()); this.metrics = StreamingMetrics.get(connecting); this.keepSSTableLevel = keepSSTableLevel; - this.isIncremental = isIncremental; this.pendingRepair = pendingRepair; } @@ -222,11 +220,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber return keepSSTableLevel; } - public boolean isIncremental() - { - return isIncremental; - } - public UUID getPendingRepair() { return pendingRepair; @@ -298,9 +291,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber * @param ranges Ranges to retrieve data * @param columnFamilies ColumnFamily names. Can be empty if requesting all CF under the keyspace. */ - public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, long repairedAt) + public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies) { - requests.add(new StreamRequest(keyspace, ranges, columnFamilies, repairedAt)); + requests.add(new StreamRequest(keyspace, ranges, columnFamilies)); } /** @@ -312,9 +305,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber * @param ranges Transfer ranges * @param columnFamilies Transfer ColumnFamilies * @param flushTables flush tables? - * @param repairedAt the time the repair started. */ - public synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt) + public synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables) { failIfFinished(); Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies); @@ -322,7 +314,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber flushSSTables(stores); List<Range<Token>> normalizedRanges = Range.normalize(ranges); - List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, pendingRepair); + List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, pendingRepair); try { addTransferFiles(sections); @@ -364,7 +356,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber } @VisibleForTesting - public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, UUID pendingRepair) + public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, UUID pendingRepair) { Refs<SSTableReader> refs = new Refs<>(); try @@ -410,13 +402,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber List<SSTableStreamingSections> sections = new ArrayList<>(refs.size()); for (SSTableReader sstable : refs) { - long repairedAt = overriddenRepairedAt; - if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) - repairedAt = sstable.getSSTableMetadata().repairedAt; - sections.add(new SSTableStreamingSections(refs.get(sstable), - sstable.getPositionsForRanges(ranges), - sstable.estimatedKeysForRanges(ranges), - repairedAt)); + sections.add(new SSTableStreamingSections(refs.get(sstable), sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges))); } return sections; } @@ -452,7 +438,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber if (task == null) task = newTask; } - task.addTransferFile(details.ref, details.estimatedKeys, details.sections, details.repairedAt); + task.addTransferFile(details.ref, details.estimatedKeys, details.sections); iter.remove(); } } @@ -462,14 +448,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber public final Ref<SSTableReader> ref; public final List<Pair<Long, Long>> sections; public final long estimatedKeys; - public final long repairedAt; - public SSTableStreamingSections(Ref<SSTableReader> ref, List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt) + public SSTableStreamingSections(Ref<SSTableReader> ref, List<Pair<Long, Long>> sections, long estimatedKeys) { this.ref = ref; this.sections = sections; this.estimatedKeys = estimatedKeys; - this.repairedAt = repairedAt; } } @@ -623,7 +607,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber // prepare tasks state(State.PREPARING); for (StreamRequest request : requests) - addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true, request.repairedAt); // always flush on stream request + addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true); // always flush on stream request for (StreamSummary summary : summaries) prepareReceiving(summary); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index aa3251b..748da8b 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -53,10 +53,10 @@ public class StreamTransferTask extends StreamTask super(session, tableId); } - public synchronized void addTransferFile(Ref<SSTableReader> ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt) + public synchronized void addTransferFile(Ref<SSTableReader> ref, long estimatedKeys, List<Pair<Long, Long>> sections) { assert ref.get() != null && tableId.equals(ref.get().metadata().id); - OutgoingFileMessage message = new OutgoingFileMessage(ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel()); + OutgoingFileMessage message = new OutgoingFileMessage(ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, session.keepSSTableLevel()); message = StreamHook.instance.reportOutgoingFile(session, ref.get(), message); files.put(message.header.sequenceNumber, message); totalSize += message.header.size(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 3e53fa2..f8e4b40 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -73,7 +73,7 @@ public class CompressedStreamReader extends StreamReader } logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', pendingRepair = '{}', table = '{}'.", - session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), session.getPendingRepair(), + session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), pendingRepair, cfs.getTableName()); CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, @@ -84,7 +84,7 @@ public class CompressedStreamReader extends StreamReader SSTableMultiWriter writer = null; try { - writer = createWriter(cfs, totalSize, repairedAt, session.getPendingRepair(), format); + writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); String filename = writer.getFilename(); int sectionIdx = 0; for (Pair<Long, Long> section : sections) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java index f449982..c65e1d4 100644 --- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java +++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java @@ -20,6 +20,7 @@ package org.apache.cassandra.streaming.messages; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.TypeSizes; @@ -28,10 +29,10 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.sstable.format.Version; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.compress.CompressionInfo; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.UUIDSerializer; /** * StreamingFileHeader is appended before sending actual data to describe what it's sending. @@ -57,6 +58,7 @@ public class FileMessageHeader public final CompressionInfo compressionInfo; private final CompressionMetadata compressionMetadata; public final long repairedAt; + public final UUID pendingRepair; public final int sstableLevel; public final SerializationHeader.Component header; @@ -71,6 +73,7 @@ public class FileMessageHeader List<Pair<Long, Long>> sections, CompressionInfo compressionInfo, long repairedAt, + UUID pendingRepair, int sstableLevel, SerializationHeader.Component header) { @@ -83,6 +86,7 @@ public class FileMessageHeader this.compressionInfo = compressionInfo; this.compressionMetadata = null; this.repairedAt = repairedAt; + this.pendingRepair = pendingRepair; this.sstableLevel = sstableLevel; this.header = header; this.size = calculateSize(); @@ -96,6 +100,7 @@ public class FileMessageHeader List<Pair<Long, Long>> sections, CompressionMetadata compressionMetadata, long repairedAt, + UUID pendingRepair, int sstableLevel, SerializationHeader.Component header) { @@ -108,6 +113,7 @@ public class FileMessageHeader this.compressionInfo = null; this.compressionMetadata = compressionMetadata; this.repairedAt = repairedAt; + this.pendingRepair = pendingRepair; this.sstableLevel = sstableLevel; this.header = header; this.size = calculateSize(); @@ -159,6 +165,7 @@ public class FileMessageHeader sb.append(", transfer size: ").append(size()); sb.append(", compressed?: ").append(isCompressed()); sb.append(", repairedAt: ").append(repairedAt); + sb.append(", pendingRepair: ").append(pendingRepair); sb.append(", level: ").append(sstableLevel); sb.append(')'); return sb.toString(); @@ -203,6 +210,11 @@ public class FileMessageHeader compressionInfo = new CompressionInfo(header.compressionMetadata.getChunksForSections(header.sections), header.compressionMetadata.parameters); CompressionInfo.serializer.serialize(compressionInfo, out, version); out.writeLong(header.repairedAt); + out.writeBoolean(header.pendingRepair != null); + if (header.pendingRepair != null) + { + UUIDSerializer.serializer.serialize(header.pendingRepair, out, version); + } out.writeInt(header.sstableLevel); SerializationHeader.serializer.serialize(header.version, header.header, out); @@ -223,10 +235,11 @@ public class FileMessageHeader sections.add(Pair.create(in.readLong(), in.readLong())); CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version); long repairedAt = in.readLong(); + UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null; int sstableLevel = in.readInt(); SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in); - return new FileMessageHeader(tableId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel, header); + return new FileMessageHeader(tableId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, pendingRepair, sstableLevel, header); } public long serializedSize(FileMessageHeader header, int version) @@ -244,6 +257,9 @@ public class FileMessageHeader size += TypeSizes.sizeof(section.right); } size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version); + size += TypeSizes.sizeof(header.repairedAt); + size += TypeSizes.sizeof(header.pendingRepair != null); + size += header.pendingRepair != null ? UUIDSerializer.serializer.serializedSize(header.pendingRepair, version) : 0; size += TypeSizes.sizeof(header.sstableLevel); size += SerializationHeader.serializer.serializedSize(header.version, header.header); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java index fba9ec4..e3e6b9b 100644 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@ -65,7 +65,7 @@ public class OutgoingFileMessage extends StreamMessage private boolean completed = false; private boolean transferring = false; - public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel) + public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, boolean keepSSTableLevel) { super(Type.FILE); this.ref = ref; @@ -79,7 +79,8 @@ public class OutgoingFileMessage extends StreamMessage estimatedKeys, sections, sstable.compression ? sstable.getCompressionMetadata() : null, - repairedAt, + sstable.getRepairedAt(), + sstable.getPendingRepair(), keepSSTableLevel ? sstable.getSSTableLevel() : 0, sstable.header.toComponent()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java index 59f28e0..4619561 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java @@ -49,10 +49,9 @@ public class StreamInitMessage // true if this init message is to connect for outgoing message on receiving side public final boolean isForOutgoing; public final boolean keepSSTableLevel; - public final boolean isIncremental; public final UUID pendingRepair; - public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean isForOutgoing, boolean keepSSTableLevel, boolean isIncremental, UUID pendingRepair) + public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean isForOutgoing, boolean keepSSTableLevel, UUID pendingRepair) { this.from = from; this.sessionIndex = sessionIndex; @@ -60,7 +59,6 @@ public class StreamInitMessage this.streamOperation = streamOperation; this.isForOutgoing = isForOutgoing; this.keepSSTableLevel = keepSSTableLevel; - this.isIncremental = isIncremental; this.pendingRepair = pendingRepair; } @@ -116,7 +114,6 @@ public class StreamInitMessage out.writeUTF(message.streamOperation.getDescription()); out.writeBoolean(message.isForOutgoing); out.writeBoolean(message.keepSSTableLevel); - out.writeBoolean(message.isIncremental); out.writeBoolean(message.pendingRepair != null); if (message.pendingRepair != null) @@ -134,9 +131,8 @@ public class StreamInitMessage boolean sentByInitiator = in.readBoolean(); boolean keepSSTableLevel = in.readBoolean(); - boolean isIncremental = in.readBoolean(); UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null; - return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), sentByInitiator, keepSSTableLevel, isIncremental, pendingRepair); + return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), sentByInitiator, keepSSTableLevel, pendingRepair); } public long serializedSize(StreamInitMessage message, int version) @@ -147,7 +143,6 @@ public class StreamInitMessage size += TypeSizes.sizeof(message.streamOperation.getDescription()); size += TypeSizes.sizeof(message.isForOutgoing); size += TypeSizes.sizeof(message.keepSSTableLevel); - size += TypeSizes.sizeof(message.isIncremental); size += TypeSizes.sizeof(message.pendingRepair != null); if (message.pendingRepair != null) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java index cebceca..f11362f 100644 --- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java +++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java @@ -50,8 +50,8 @@ public class StreamStateStoreTest Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100")); InetAddress local = FBUtilities.getBroadcastAddress(); - StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, false, null); - session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf"), 0); + StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null); + session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf")); StreamStateStore store = new StreamStateStore(); // session complete event that is not completed makes data not available for keyspace/ranges @@ -71,8 +71,8 @@ public class StreamStateStoreTest // add different range within the same keyspace Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200")); - session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, false, null); - session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf"), 0); + session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null); + session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf")); session.state(StreamSession.State.COMPLETE); store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java index 944e320..0f11aee 100644 --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java @@ -194,7 +194,7 @@ public class LegacySSTableTest ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>(); details.add(new StreamSession.SSTableStreamingSections(sstable.ref(), sstable.getPositionsForRanges(ranges), - sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt)); + sstable.estimatedKeysForRanges(ranges))); new StreamPlan(StreamOperation.OTHER).transferFiles(FBUtilities.getBroadcastAddress(), details) .execute().get(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index b8595af..97bd321 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -789,7 +789,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges( Collections.singleton(new Range<Token>(firstToken, firstToken)), - Collections.singleton(cfs), 0L, null); + Collections.singleton(cfs), null); assertEquals(1, sectionsBeforeRewrite.size()); for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite) section.ref.release(); @@ -804,7 +804,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase while (!done.get()) { Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken)); - List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, null); + List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), null); if (sections.size() != 1) failed.set(true); for (StreamSession.SSTableStreamingSections section : sections) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index b799d66..75742dc 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -24,9 +24,11 @@ import java.util.HashSet; import java.util.Set; import java.util.UUID; +import com.google.common.collect.Lists; import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.IPartitioner; @@ -34,18 +36,28 @@ import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.MerkleTrees; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; +import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; -public class LocalSyncTaskTest extends SchemaLoader +public class LocalSyncTaskTest extends AbstractRepairTest { - private static final IPartitioner partirioner = Murmur3Partitioner.instance; + private static final IPartitioner partitioner = Murmur3Partitioner.instance; public static final String KEYSPACE1 = "DifferencerTest"; public static final String CF_STANDARD = "Standard1"; + public static ColumnFamilyStore cfs; @BeforeClass public static void defineSchema() throws Exception @@ -54,6 +66,9 @@ public class LocalSyncTaskTest extends SchemaLoader SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD)); + + TableId tid = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD).id; + cfs = Schema.instance.getColumnFamilyStoreInstance(tid); } /** @@ -65,7 +80,7 @@ public class LocalSyncTaskTest extends SchemaLoader final InetAddress ep1 = InetAddress.getByName("127.0.0.1"); final InetAddress ep2 = InetAddress.getByName("127.0.0.1"); - Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken()); + Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); MerkleTrees tree1 = createInitialTree(desc); @@ -76,7 +91,7 @@ public class LocalSyncTaskTest extends SchemaLoader // note: we reuse the same endpoint which is bogus in theory but fine here TreeResponse r1 = new TreeResponse(ep1, tree1); TreeResponse r2 = new TreeResponse(ep2, tree2); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE, null, false); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, null, false); task.run(); assertEquals(0, task.get().numberOfDifferences); @@ -85,7 +100,7 @@ public class LocalSyncTaskTest extends SchemaLoader @Test public void testDifference() throws Throwable { - Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken()); + Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); UUID parentRepairSession = UUID.randomUUID(); Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); @@ -101,7 +116,7 @@ public class LocalSyncTaskTest extends SchemaLoader MerkleTrees tree2 = createInitialTree(desc); // change a range in one of the trees - Token token = partirioner.midpoint(range.left, range.right); + Token token = partitioner.midpoint(range.left, range.right); tree1.invalidate(token); MerkleTree.TreeRange changed = tree1.get(token); changed.hash("non-empty hash!".getBytes()); @@ -113,16 +128,50 @@ public class LocalSyncTaskTest extends SchemaLoader // note: we reuse the same endpoint which is bogus in theory but fine here TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1); TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE, null, false); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, null, false); task.run(); // ensure that the changed range was recorded assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences); } - private MerkleTrees createInitialTree(RepairJobDesc desc) + @Test + public void fullRepairStreamPlan() throws Exception + { + UUID sessionID = registerSession(cfs, true, true); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); + RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); + + TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false); + StreamPlan plan = task.createStreamPlan(PARTICIPANT1, PARTICIPANT2, Lists.newArrayList(RANGE1)); + + assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair()); + assertTrue(plan.getFlushBeforeTransfer()); + } + + @Test + public void incrementalRepairStreamPlan() throws Exception { - MerkleTrees tree = new MerkleTrees(partirioner); + UUID sessionID = registerSession(cfs, true, true); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); + RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges()); + + TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); + + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false); + StreamPlan plan = task.createStreamPlan(PARTICIPANT1, PARTICIPANT2, Lists.newArrayList(RANGE1)); + + assertEquals(desc.parentSessionId, plan.getPendingRepair()); + assertFalse(plan.getFlushBeforeTransfer()); + } + + private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner partitioner) + { + MerkleTrees tree = new MerkleTrees(partitioner); tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges); tree.init(); for (MerkleTree.TreeRange r : tree.invalids()) @@ -131,4 +180,10 @@ public class LocalSyncTaskTest extends SchemaLoader } return tree; } + + private MerkleTrees createInitialTree(RepairJobDesc desc) + { + return createInitialTree(desc, partitioner); + + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java index 7be8cb5..0260cd0 100644 --- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java @@ -62,7 +62,7 @@ public class RepairSessionTest IPartitioner p = Murmur3Partitioner.instance; Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100))); Set<InetAddress> endpoints = Sets.newHashSet(remote); - RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, false, false, "Standard1"); + RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, false, false, "Standard1"); // perform convict session.convict(remote, Double.MAX_VALUE); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java index 90988ae..5f13e3d 100644 --- a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java @@ -65,12 +65,10 @@ public class StreamingRepairTaskTest extends AbstractRepairTest ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges()); SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges()); - StreamingRepairTask task = new StreamingRepairTask(desc, request, prs.getRepairedAt(), prs.isIncremental); + StreamingRepairTask task = new StreamingRepairTask(desc, request, desc.sessionId); - StreamPlan plan = task.createStreamPlan(request.src, request.dst, prs.isIncremental); + StreamPlan plan = task.createStreamPlan(request.src, request.dst); Assert.assertFalse(plan.getFlushBeforeTransfer()); - Assert.assertEquals(prs.repairedAt, plan.getRepairedAt()); - } @Test @@ -80,10 +78,9 @@ public class StreamingRepairTaskTest extends AbstractRepairTest ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges()); SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges()); - StreamingRepairTask task = new StreamingRepairTask(desc, request, prs.getRepairedAt(), prs.isIncremental); + StreamingRepairTask task = new StreamingRepairTask(desc, request, null); - StreamPlan plan = task.createStreamPlan(request.src, request.dst, prs.isIncremental); + StreamPlan plan = task.createStreamPlan(request.src, request.dst); Assert.assertTrue(plan.getFlushBeforeTransfer()); - Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, plan.getRepairedAt()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java index 8d388ab..84053d4 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java @@ -97,7 +97,6 @@ public class StreamSessionTest Collection<Range<Token>> ranges = Lists.newArrayList(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken())); List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(ranges, Lists.newArrayList(cfs), - ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair); Set<SSTableReader> sstables = new HashSet<>(); for (StreamSession.SSTableStreamingSections section: sections) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index ce8d2dd..57d40e9 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -74,7 +74,7 @@ public class StreamTransferTaskTest public void testScheduleTimeout() throws Exception { InetAddress peer = FBUtilities.getBroadcastAddress(); - StreamSession session = new StreamSession(peer, peer, null, 0, true, false, null); + StreamSession session = new StreamSession(peer, peer, null, 0, true, null); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); // create two sstables @@ -90,7 +90,7 @@ public class StreamTransferTaskTest { List<Range<Token>> ranges = new ArrayList<>(); ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); - task.addTransferFile(sstable.selfRef(), 1, sstable.getPositionsForRanges(ranges), 0); + task.addTransferFile(sstable.selfRef(), 1, sstable.getPositionsForRanges(ranges)); } assertEquals(2, task.getTotalNumberOfFiles()); @@ -120,9 +120,9 @@ public class StreamTransferTaskTest public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception { InetAddress peer = FBUtilities.getBroadcastAddress(); - StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null, false, null); + StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, null, false, null); StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), streamCoordinator); - StreamSession session = new StreamSession(peer, peer, null, 0, true, false, null); + StreamSession session = new StreamSession(peer, peer, null, 0, true, null); session.init(future); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); @@ -142,7 +142,7 @@ public class StreamTransferTaskTest ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); Ref<SSTableReader> ref = sstable.selfRef(); refs.add(ref); - task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0); + task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges)); } assertEquals(2, task.getTotalNumberOfFiles()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 36329f4..aa9e666 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -299,7 +299,7 @@ public class StreamingTransferTest { details.add(new StreamSession.SSTableStreamingSections(sstables.get(sstable), sstable.getPositionsForRanges(ranges), - sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt)); + sstable.estimatedKeysForRanges(ranges))); } return details; }