Repository: cassandra
Updated Branches:
  refs/heads/trunk 50ba850be -> 9308159bc


Cleanup isIncremental/repairedAt usage

Patch by Blake Eggleston; reviewed by Marcus Eriksson for CASSANDRA-13430


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9308159b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9308159b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9308159b

Branch: refs/heads/trunk
Commit: 9308159bc87d29c33e64dbcfcefa948d7f5643dd
Parents: 50ba850
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Fri Apr 7 10:38:56 2017 -0700
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Thu Apr 20 16:31:22 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/dht/RangeStreamer.java |  3 +-
 .../cassandra/io/sstable/SSTableLoader.java     |  4 +-
 .../io/sstable/format/SSTableReader.java        | 10 +++
 .../net/IncomingStreamingConnection.java        |  2 +-
 .../apache/cassandra/repair/LocalSyncTask.java  | 40 +++++------
 .../org/apache/cassandra/repair/RepairJob.java  |  4 +-
 .../repair/RepairMessageVerbHandler.java        |  6 +-
 .../apache/cassandra/repair/RepairRunnable.java |  6 +-
 .../apache/cassandra/repair/RepairSession.java  |  3 -
 .../cassandra/repair/StreamingRepairTask.java   | 23 +++---
 .../cassandra/service/ActiveRepairService.java  |  3 +-
 .../cassandra/streaming/ConnectionHandler.java  |  1 -
 .../cassandra/streaming/StreamCoordinator.java  | 15 ++--
 .../apache/cassandra/streaming/StreamPlan.java  | 20 +++---
 .../cassandra/streaming/StreamReader.java       | 13 +++-
 .../cassandra/streaming/StreamRequest.java      |  9 +--
 .../cassandra/streaming/StreamResultFuture.java |  7 +-
 .../cassandra/streaming/StreamSession.java      | 36 +++-------
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../compress/CompressedStreamReader.java        |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++++-
 .../streaming/messages/OutgoingFileMessage.java |  5 +-
 .../streaming/messages/StreamInitMessage.java   |  9 +--
 .../cassandra/dht/StreamStateStoreTest.java     |  8 +--
 .../cassandra/io/sstable/LegacySSTableTest.java |  2 +-
 .../io/sstable/SSTableRewriterTest.java         |  4 +-
 .../cassandra/repair/LocalSyncTaskTest.java     | 73 +++++++++++++++++---
 .../cassandra/repair/RepairSessionTest.java     |  2 +-
 .../repair/StreamingRepairTaskTest.java         | 11 ++-
 .../cassandra/streaming/StreamSessionTest.java  |  1 -
 .../streaming/StreamTransferTaskTest.java       | 10 +--
 .../streaming/StreamingTransferTest.java        |  2 +-
 33 files changed, 201 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2710060..e72c7a5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430)
  * Change protocol to allow sending key space independent of query string 
(CASSANDRA-10145)
  * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
  * Take number of files in L0 in account when estimating remaining compaction 
tasks (CASSANDRA-13354)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java 
b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 89a96cd..fd976c9 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -156,8 +156,7 @@ public class RangeStreamer
         this.tokens = tokens;
         this.address = address;
         this.description = streamOperation.getDescription();
-        this.streamPlan = new StreamPlan(streamOperation, 
ActiveRepairService.UNREPAIRED_SSTABLE, connectionsPerHost,
-                true, false, connectSequentially, null);
+        this.streamPlan = new StreamPlan(streamOperation, connectionsPerHost, 
true, connectSequentially, null);
         this.useStrictConsistency = useStrictConsistency;
         this.snitch = snitch;
         this.stateStore = stateStore;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 759fa0f..e9ea35a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -131,7 +131,7 @@ public class SSTableLoader implements StreamEventHandler
                                                   List<Pair<Long, Long>> 
sstableSections = sstable.getPositionsForRanges(tokenRanges);
                                                   long estimatedKeys = 
sstable.estimatedKeysForRanges(tokenRanges);
                                                   Ref<SSTableReader> ref = 
sstable.ref();
-                                                  
StreamSession.SSTableStreamingSections details = new 
StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys, 
ActiveRepairService.UNREPAIRED_SSTABLE);
+                                                  
StreamSession.SSTableStreamingSections details = new 
StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys);
                                                   
streamingDetails.put(endpoint, details);
                                               }
 
@@ -159,7 +159,7 @@ public class SSTableLoader implements StreamEventHandler
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
 
-        StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, 0, 
connectionsPerHost, false, false, false, 
null).connectionFactory(client.getConnectionFactory());
+        StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, 
connectionsPerHost, false, false, 
null).connectionFactory(client.getConnectionFactory());
 
         Map<InetAddress, Collection<Range<Token>>> endpointToRanges = 
client.getEndpointToRangesMap();
         openSSTables(endpointToRanges);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 568336e..4495edf 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1760,6 +1760,16 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
         return sstableMetadata.pendingRepair != 
ActiveRepairService.NO_PENDING_REPAIR;
     }
 
+    public UUID getPendingRepair()
+    {
+        return sstableMetadata.pendingRepair;
+    }
+
+    public long getRepairedAt()
+    {
+        return sstableMetadata.repairedAt;
+    }
+
     public boolean intersects(Collection<Range<Token>> ranges)
     {
         Bounds<Token> range = new Bounds<>(first.getToken(), last.getToken());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java 
b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index eee0042..e5fdc99 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -73,7 +73,7 @@ public class IncomingStreamingConnection extends Thread 
implements Closeable
             // The receiving side distinguish two connections by looking at 
StreamInitMessage#isForOutgoing.
             // Note: we cannot use the same socket for incoming and outgoing 
streams because we want to
             // parallelize said streams and the socket is blocking, so we 
might deadlock.
-            StreamResultFuture.initReceivingSide(init.sessionIndex, 
init.planId, init.streamOperation, init.from, this, init.isForOutgoing, 
version, init.keepSSTableLevel, init.isIncremental, init.pendingRepair);
+            StreamResultFuture.initReceivingSide(init.sessionIndex, 
init.planId, init.streamOperation, init.from, this, init.isForOutgoing, 
version, init.keepSSTableLevel, init.pendingRepair);
         }
         catch (Throwable t)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java 
b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 3dd6532..06b1661 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -21,6 +21,7 @@ import java.net.InetAddress;
 import java.util.List;
 import java.util.UUID;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,19 +48,33 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
 
     private static final Logger logger = 
LoggerFactory.getLogger(LocalSyncTask.class);
 
-    private final long repairedAt;
     private final UUID pendingRepair;
-
     private final boolean pullRepair;
 
-    public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, 
long repairedAt, UUID pendingRepair, boolean pullRepair)
+    public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, 
UUID pendingRepair, boolean pullRepair)
     {
         super(desc, r1, r2);
-        this.repairedAt = repairedAt;
         this.pendingRepair = pendingRepair;
         this.pullRepair = pullRepair;
     }
 
+
+    @VisibleForTesting
+    StreamPlan createStreamPlan(InetAddress dst, InetAddress preferred, 
List<Range<Token>> differences)
+    {
+        StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, 
false, pendingRepair)
+                          .listeners(this)
+                          .flushBeforeTransfer(pendingRepair == null)
+                          .requestRanges(dst, preferred, desc.keyspace, 
differences, desc.columnFamily);  // request ranges from the remote node
+        if (!pullRepair)
+        {
+            // send ranges to the remote node if we are not performing a pull 
repair
+            plan.transferRanges(dst, preferred, desc.keyspace, differences, 
desc.columnFamily);
+        }
+
+        return plan;
+    }
+
     /**
      * Starts sending/receiving our list of differences to/from the remote 
endpoint: creates a callback
      * that will be called out of band once the streams complete.
@@ -73,24 +88,9 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
 
         String message = String.format("Performing streaming repair of %d 
ranges with %s", differences.size(), dst);
         logger.info("[repair #{}] {}", desc.sessionId, message);
-        boolean isIncremental = false;
-        if (desc.parentSessionId != null)
-        {
-            ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
-            isIncremental = prs.isIncremental;
-        }
         Tracing.traceRepair(message);
-        StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, repairedAt, 
1, false, isIncremental, false, pendingRepair).listeners(this)
-                                                                               
                                            .flushBeforeTransfer(true)
-                                                                               
                                            // request ranges from the remote 
node
-                                                                               
                                            .requestRanges(dst, preferred, 
desc.keyspace, differences, desc.columnFamily);
-        if (!pullRepair)
-        {
-            // send ranges to the remote node if we are not performing a pull 
repair
-            plan.transferRanges(dst, preferred, desc.keyspace, differences, 
desc.columnFamily);
-        }
 
-        plan.execute();
+        createStreamPlan(dst, preferred, differences).execute();
     }
 
     public void handleStreamEvent(StreamEvent event)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java 
b/src/java/org/apache/cassandra/repair/RepairJob.java
index 07bc1e2..d1f6a94 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -40,7 +40,6 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
     private final RepairSession session;
     private final RepairJobDesc desc;
     private final RepairParallelism parallelismDegree;
-    private final long repairedAt;
     private final ListeningExecutorService taskExecutor;
     private final boolean isConsistent;
 
@@ -54,7 +53,6 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
     {
         this.session = session;
         this.desc = new RepairJobDesc(session.parentRepairSession, 
session.getId(), session.keyspace, columnFamily, session.getRanges());
-        this.repairedAt = session.repairedAt;
         this.taskExecutor = session.taskExecutor;
         this.parallelismDegree = session.parallelismDegree;
         this.isConsistent = isConsistent;
@@ -130,7 +128,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
                         SyncTask task;
                         if (r1.endpoint.equals(local) || 
r2.endpoint.equals(local))
                         {
-                            task = new LocalSyncTask(desc, r1, r2, repairedAt, 
isConsistent ? desc.parentSessionId : null, session.pullRepair);
+                            task = new LocalSyncTask(desc, r1, r2, 
isConsistent ? desc.parentSessionId : null, session.pullRepair);
                         }
                         else
                         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java 
b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 4f412f0..b6b9f87 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -135,11 +135,7 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                     // forwarded sync request
                     SyncRequest request = (SyncRequest) message.payload;
                     logger.debug("Syncing {}", request);
-                    long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
-                    if (desc.parentSessionId != null && 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != 
null)
-                        repairedAt = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).getRepairedAt();
-
-                    StreamingRepairTask task = new StreamingRepairTask(desc, 
request, repairedAt, isConsistent(desc.parentSessionId));
+                    StreamingRepairTask task = new StreamingRepairTask(desc, 
request, isConsistent(desc.parentSessionId) ? desc.parentSessionId : null);
                     task.run();
                     break;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 986bd7c..b8bef95 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -255,7 +255,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         ListeningExecutorService executor = createExecutor();
 
         // Setting the repairedAt time to UNREPAIRED_SSTABLE causes the 
repairedAt times to be preserved across streamed sstables
-        final ListenableFuture<List<RepairSessionResult>> allSessions = 
submitRepairSessions(parentSession, ActiveRepairService.UNREPAIRED_SSTABLE, 
false, executor, commonRanges, cfnames);
+        final ListenableFuture<List<RepairSessionResult>> allSessions = 
submitRepairSessions(parentSession, false, executor, commonRanges, cfnames);
 
         // After all repair sessions completes(successful or not),
         // run anticompaction if necessary and send finish notice back to 
client
@@ -301,7 +301,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         ListeningExecutorService executor = createExecutor();
         AtomicBoolean hasFailure = new AtomicBoolean(false);
         ListenableFuture repairResult = coordinatorSession.execute(executor,
-                                                                   () -> 
submitRepairSessions(parentSession, repairedAt, true, executor, commonRanges, 
cfnames),
+                                                                   () -> 
submitRepairSessions(parentSession, true, executor, commonRanges, cfnames),
                                                                    hasFailure);
         Collection<Range<Token>> ranges = new HashSet<>();
         for (Collection<Range<Token>> range : 
Iterables.transform(commonRanges, cr -> cr.right))
@@ -312,7 +312,6 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
     }
 
     private ListenableFuture<List<RepairSessionResult>> 
submitRepairSessions(UUID parentSession,
-                                                                             
long repairedAt,
                                                                              
boolean isConsistent,
                                                                              
ListeningExecutorService executor,
                                                                              
List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges,
@@ -326,7 +325,6 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
                                                                                
      keyspace,
                                                                                
      options.getParallelism(),
                                                                                
      p.left,
-                                                                               
      repairedAt,
                                                                                
      isConsistent,
                                                                                
      options.isPullRepair(),
                                                                                
      executor,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java 
b/src/java/org/apache/cassandra/repair/RepairSession.java
index 43a9bfb..2aa068c 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -90,7 +90,6 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
     /** Range to repair */
     public final Collection<Range<Token>> ranges;
     public final Set<InetAddress> endpoints;
-    public final long repairedAt;
     public final boolean isConsistent;
 
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
@@ -124,7 +123,6 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
                          String keyspace,
                          RepairParallelism parallelismDegree,
                          Set<InetAddress> endpoints,
-                         long repairedAt,
                          boolean isConsistent,
                          boolean pullRepair,
                          String... cfnames)
@@ -138,7 +136,6 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         this.cfnames = cfnames;
         this.ranges = ranges;
         this.endpoints = endpoints;
-        this.repairedAt = repairedAt;
         this.isConsistent = isConsistent;
         this.pullRepair = pullRepair;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java 
b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index c5f3c95..7042de1 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
+import java.util.UUID;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -44,15 +45,13 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
 
     private final RepairJobDesc desc;
     private final SyncRequest request;
-    private final long repairedAt;
-    private final boolean isConsistent;
+    private final UUID pendingRepair;
 
-    public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, long 
repairedAt, boolean isConsistent)
+    public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, UUID 
pendingRepair)
     {
         this.desc = desc;
         this.request = request;
-        this.repairedAt = repairedAt;
-        this.isConsistent = isConsistent;
+        this.pendingRepair = pendingRepair;
     }
 
     public void run()
@@ -60,21 +59,15 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
         InetAddress dest = request.dst;
         InetAddress preferred = SystemKeyspace.getPreferredIP(dest);
         logger.info("[streaming task #{}] Performing streaming repair of {} 
ranges with {}", desc.sessionId, request.ranges.size(), request.dst);
-        boolean isIncremental = false;
-        if (desc.parentSessionId != null)
-        {
-            ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
-            isIncremental = prs.isIncremental;
-        }
-        createStreamPlan(dest, preferred, isIncremental).execute();
+        createStreamPlan(dest, preferred).execute();
     }
 
     @VisibleForTesting
-    StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred, 
boolean isIncremental)
+    StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred)
     {
-        return new StreamPlan(StreamOperation.REPAIR, repairedAt, 1, false, 
isIncremental, false, isConsistent ? desc.parentSessionId : null)
+        return new StreamPlan(StreamOperation.REPAIR, 1, false, false, 
pendingRepair)
                .listeners(this)
-               .flushBeforeTransfer(!isIncremental) // sstables are isolated 
at the beginning of an incremental repair session, so flushing isn't neccessary
+               .flushBeforeTransfer(pendingRepair == null) // sstables are 
isolated at the beginning of an incremental repair session, so flushing isn't 
neccessary
                .requestRanges(dest, preferred, desc.keyspace, request.ranges, 
desc.columnFamily) // request ranges from the remote node
                .transferRanges(dest, preferred, desc.keyspace, request.ranges, 
desc.columnFamily); // send ranges to the remote node
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index c03c470..fd98b37 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -165,7 +165,6 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
                                              String keyspace,
                                              RepairParallelism 
parallelismDegree,
                                              Set<InetAddress> endpoints,
-                                             long repairedAt,
                                              boolean isConsistent,
                                              boolean pullRepair,
                                              ListeningExecutorService executor,
@@ -177,7 +176,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         if (cfnames.length == 0)
             return null;
 
-        final RepairSession session = new RepairSession(parentRepairSession, 
UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, 
repairedAt, isConsistent, pullRepair, cfnames);
+        final RepairSession session = new RepairSession(parentRepairSession, 
UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, 
isConsistent, pullRepair, cfnames);
 
         sessions.put(session.getId(), session);
         // register listeners

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java 
b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 91f1249..86340a5 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -198,7 +198,6 @@ public class ConnectionHandler
                     session.streamOperation(),
                     !isOutgoingHandler,
                     session.keepSSTableLevel(),
-                    session.isIncremental(),
                     session.getPendingRepair());
             ByteBuffer messageBuf = message.createMessage(false, 
protocolVersion);
             DataOutputStreamPlus out = getWriteChannel(socket);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java 
b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 81d0498..6aa34cd 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -47,17 +47,15 @@ public class StreamCoordinator
     private final int connectionsPerHost;
     private StreamConnectionFactory factory;
     private final boolean keepSSTableLevel;
-    private final boolean isIncremental;
     private Iterator<StreamSession> sessionsToConnect = null;
     private final UUID pendingRepair;
 
-    public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, 
boolean isIncremental,
-                             StreamConnectionFactory factory, boolean 
connectSequentially, UUID pendingRepair)
+    public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, 
StreamConnectionFactory factory,
+                             boolean connectSequentially, UUID pendingRepair)
     {
         this.connectionsPerHost = connectionsPerHost;
         this.factory = factory;
         this.keepSSTableLevel = keepSSTableLevel;
-        this.isIncremental = isIncremental;
         this.connectSequentially = connectSequentially;
         this.pendingRepair = pendingRepair;
     }
@@ -251,6 +249,11 @@ public class StreamCoordinator
         return data;
     }
 
+    public UUID getPendingRepair()
+    {
+        return pendingRepair;
+    }
+
     private static class StreamSessionConnector implements Runnable
     {
         private final StreamSession session;
@@ -290,7 +293,7 @@ public class StreamCoordinator
             // create
             if (streamSessions.size() < connectionsPerHost)
             {
-                StreamSession session = new StreamSession(peer, connecting, 
factory, streamSessions.size(), keepSSTableLevel, isIncremental, pendingRepair);
+                StreamSession session = new StreamSession(peer, connecting, 
factory, streamSessions.size(), keepSSTableLevel, pendingRepair);
                 streamSessions.put(++lastReturned, session);
                 return session;
             }
@@ -322,7 +325,7 @@ public class StreamCoordinator
             StreamSession session = streamSessions.get(id);
             if (session == null)
             {
-                session = new StreamSession(peer, connecting, factory, id, 
keepSSTableLevel, isIncremental, pendingRepair);
+                session = new StreamSession(peer, connecting, factory, id, 
keepSSTableLevel, pendingRepair);
                 streamSessions.put(id, session);
             }
             return session;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java 
b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index faaac0e..b5a6214 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -36,7 +36,6 @@ public class StreamPlan
     private final UUID planId = UUIDGen.getTimeUUID();
     private final StreamOperation streamOperation;
     private final List<StreamEventHandler> handlers = new ArrayList<>();
-    private final long repairedAt;
     private final StreamCoordinator coordinator;
 
     private boolean flushBeforeTransfer = true;
@@ -48,20 +47,19 @@ public class StreamPlan
      */
     public StreamPlan(StreamOperation streamOperation)
     {
-        this(streamOperation, ActiveRepairService.UNREPAIRED_SSTABLE, 1, 
false, false, false, null);
+        this(streamOperation, 1, false, false, null);
     }
 
     public StreamPlan(StreamOperation streamOperation, boolean 
keepSSTableLevels, boolean connectSequentially)
     {
-        this(streamOperation, ActiveRepairService.UNREPAIRED_SSTABLE, 1, 
keepSSTableLevels, false, connectSequentially, null);
+        this(streamOperation, 1, keepSSTableLevels, connectSequentially, null);
     }
 
-    public StreamPlan(StreamOperation streamOperation, long repairedAt, int 
connectionsPerHost, boolean keepSSTableLevels,
-                      boolean isIncremental, boolean connectSequentially, UUID 
pendingRepair)
+    public StreamPlan(StreamOperation streamOperation, int connectionsPerHost, 
boolean keepSSTableLevels,
+                      boolean connectSequentially, UUID pendingRepair)
     {
         this.streamOperation = streamOperation;
-        this.repairedAt = repairedAt;
-        this.coordinator = new StreamCoordinator(connectionsPerHost, 
keepSSTableLevels, isIncremental, new DefaultConnectionFactory(),
+        this.coordinator = new StreamCoordinator(connectionsPerHost, 
keepSSTableLevels, new DefaultConnectionFactory(),
                                                  connectSequentially, 
pendingRepair);
     }
 
@@ -92,7 +90,7 @@ public class StreamPlan
     public StreamPlan requestRanges(InetAddress from, InetAddress connecting, 
String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
     {
         StreamSession session = coordinator.getOrCreateNextSession(from, 
connecting);
-        session.addStreamRequest(keyspace, ranges, 
Arrays.asList(columnFamilies), repairedAt);
+        session.addStreamRequest(keyspace, ranges, 
Arrays.asList(columnFamilies));
         return this;
     }
 
@@ -133,7 +131,7 @@ public class StreamPlan
     public StreamPlan transferRanges(InetAddress to, InetAddress connecting, 
String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
     {
         StreamSession session = coordinator.getOrCreateNextSession(to, 
connecting);
-        session.addTransferRanges(keyspace, ranges, 
Arrays.asList(columnFamilies), flushBeforeTransfer, repairedAt);
+        session.addTransferRanges(keyspace, ranges, 
Arrays.asList(columnFamilies), flushBeforeTransfer);
         return this;
     }
 
@@ -203,9 +201,9 @@ public class StreamPlan
         return this;
     }
 
-    public long getRepairedAt()
+    public UUID getPendingRepair()
     {
-        return repairedAt;
+        return coordinator.getPendingRepair();
     }
 
     public boolean getFlushBeforeTransfer()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java 
b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 7d00e48..3a95015 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -23,6 +23,7 @@ import java.nio.channels.ReadableByteChannel;
 import java.util.Collection;
 import java.util.UUID;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.UnmodifiableIterator;
 
@@ -59,6 +60,7 @@ public class StreamReader
     protected final StreamSession session;
     protected final Version inputVersion;
     protected final long repairedAt;
+    protected final UUID pendingRepair;
     protected final SSTableFormat.Type format;
     protected final int sstableLevel;
     protected final SerializationHeader.Component header;
@@ -66,12 +68,19 @@ public class StreamReader
 
     public StreamReader(FileMessageHeader header, StreamSession session)
     {
+        if (session.getPendingRepair() != null)
+        {
+            // we should only ever be streaming pending repair
+            // sstables if the session has a pending repair id
+            assert session.getPendingRepair().equals(header.pendingRepair);
+        }
         this.session = session;
         this.tableId = header.tableId;
         this.estimatedKeys = header.estimatedKeys;
         this.sections = header.sections;
         this.inputVersion = header.version;
         this.repairedAt = header.repairedAt;
+        this.pendingRepair = header.pendingRepair;
         this.format = header.format;
         this.sstableLevel = header.sstableLevel;
         this.header = header.header;
@@ -97,14 +106,14 @@ public class StreamReader
 
         logger.debug("[Stream #{}] Start receiving file #{} from {}, 
repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.",
                      session.planId(), fileSeqNum, session.peer, repairedAt, 
totalSize, cfs.keyspace.getName(),
-                     cfs.getTableName(), session.getPendingRepair());
+                     cfs.getTableName(), pendingRepair);
 
         TrackedInputStream in = new TrackedInputStream(new 
LZFInputStream(Channels.newInputStream(channel)));
         StreamDeserializer deserializer = new 
StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
         SSTableMultiWriter writer = null;
         try
         {
-            writer = createWriter(cfs, totalSize, repairedAt, 
session.getPendingRepair(), format);
+            writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, 
format);
             while (in.getBytesRead() < totalSize)
             {
                 writePartition(deserializer, writer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/StreamRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java 
b/src/java/org/apache/cassandra/streaming/StreamRequest.java
index 93726e7..4a3761e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequest.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java
@@ -38,13 +38,11 @@ public class StreamRequest
     public final String keyspace;
     public final Collection<Range<Token>> ranges;
     public final Collection<String> columnFamilies = new HashSet<>();
-    public final long repairedAt;
-    public StreamRequest(String keyspace, Collection<Range<Token>> ranges, 
Collection<String> columnFamilies, long repairedAt)
+    public StreamRequest(String keyspace, Collection<Range<Token>> ranges, 
Collection<String> columnFamilies)
     {
         this.keyspace = keyspace;
         this.ranges = ranges;
         this.columnFamilies.addAll(columnFamilies);
-        this.repairedAt = repairedAt;
     }
 
     public static class StreamRequestSerializer implements 
IVersionedSerializer<StreamRequest>
@@ -52,7 +50,6 @@ public class StreamRequest
         public void serialize(StreamRequest request, DataOutputPlus out, int 
version) throws IOException
         {
             out.writeUTF(request.keyspace);
-            out.writeLong(request.repairedAt);
             out.writeInt(request.ranges.size());
             for (Range<Token> range : request.ranges)
             {
@@ -68,7 +65,6 @@ public class StreamRequest
         public StreamRequest deserialize(DataInputPlus in, int version) throws 
IOException
         {
             String keyspace = in.readUTF();
-            long repairedAt = in.readLong();
             int rangeCount = in.readInt();
             List<Range<Token>> ranges = new ArrayList<>(rangeCount);
             for (int i = 0; i < rangeCount; i++)
@@ -81,13 +77,12 @@ public class StreamRequest
             List<String> columnFamilies = new ArrayList<>(cfCount);
             for (int i = 0; i < cfCount; i++)
                 columnFamilies.add(in.readUTF());
-            return new StreamRequest(keyspace, ranges, columnFamilies, 
repairedAt);
+            return new StreamRequest(keyspace, ranges, columnFamilies);
         }
 
         public long serializedSize(StreamRequest request, int version)
         {
             int size = TypeSizes.sizeof(request.keyspace);
-            size += TypeSizes.sizeof(request.repairedAt);
             size += TypeSizes.sizeof(request.ranges.size());
             for (Range<Token> range : request.ranges)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java 
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 4890b63..7845986 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -71,9 +71,9 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
             set(getCurrentState());
     }
 
-    private StreamResultFuture(UUID planId, StreamOperation streamOperation, 
boolean keepSSTableLevels, boolean isIncremental, UUID pendingRepair)
+    private StreamResultFuture(UUID planId, StreamOperation streamOperation, 
boolean keepSSTableLevels, UUID pendingRepair)
     {
-        this(planId, streamOperation, new StreamCoordinator(0, 
keepSSTableLevels, isIncremental, new DefaultConnectionFactory(), false, 
pendingRepair));
+        this(planId, streamOperation, new StreamCoordinator(0, 
keepSSTableLevels, new DefaultConnectionFactory(), false, pendingRepair));
     }
 
     static StreamResultFuture init(UUID planId, StreamOperation 
streamOperation, Collection<StreamEventHandler> listeners,
@@ -107,7 +107,6 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
                                                                     boolean 
isForOutgoing,
                                                                     int 
version,
                                                                     boolean 
keepSSTableLevel,
-                                                                    boolean 
isIncremental,
                                                                     UUID 
pendingRepair) throws IOException
     {
         StreamResultFuture future = 
StreamManager.instance.getReceivingStream(planId);
@@ -116,7 +115,7 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
             logger.info("[Stream #{} ID#{}] Creating new streaming plan for 
{}", planId, sessionIndex, streamOperation.getDescription());
 
             // The main reason we create a StreamResultFuture on the receiving 
side is for JMX exposure.
-            future = new StreamResultFuture(planId, streamOperation, 
keepSSTableLevel, isIncremental, pendingRepair);
+            future = new StreamResultFuture(planId, streamOperation, 
keepSSTableLevel, pendingRepair);
             StreamManager.instance.registerReceiving(future);
         }
         future.attachConnection(from, sessionIndex, connection, isForOutgoing, 
version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 62fa317..adb8e79 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -163,7 +163,6 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
 
     private AtomicBoolean isAborted = new AtomicBoolean(false);
     private final boolean keepSSTableLevel;
-    private final boolean isIncremental;
     private ScheduledFuture<?> keepAliveFuture = null;
     private final UUID pendingRepair;
 
@@ -187,7 +186,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
      * @param connecting Actual connecting address
      * @param factory is used for establishing connection
      */
-    public StreamSession(InetAddress peer, InetAddress connecting, 
StreamConnectionFactory factory, int index, boolean keepSSTableLevel, boolean 
isIncremental, UUID pendingRepair)
+    public StreamSession(InetAddress peer, InetAddress connecting, 
StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID 
pendingRepair)
     {
         this.peer = peer;
         this.connecting = connecting;
@@ -198,7 +197,6 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
                                                    
DatabaseDescriptor.getStreamingSocketTimeout());
         this.metrics = StreamingMetrics.get(connecting);
         this.keepSSTableLevel = keepSSTableLevel;
-        this.isIncremental = isIncremental;
         this.pendingRepair = pendingRepair;
     }
 
@@ -222,11 +220,6 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         return keepSSTableLevel;
     }
 
-    public boolean isIncremental()
-    {
-        return isIncremental;
-    }
-
     public UUID getPendingRepair()
     {
         return pendingRepair;
@@ -298,9 +291,9 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
      * @param ranges Ranges to retrieve data
      * @param columnFamilies ColumnFamily names. Can be empty if requesting 
all CF under the keyspace.
      */
-    public void addStreamRequest(String keyspace, Collection<Range<Token>> 
ranges, Collection<String> columnFamilies, long repairedAt)
+    public void addStreamRequest(String keyspace, Collection<Range<Token>> 
ranges, Collection<String> columnFamilies)
     {
-        requests.add(new StreamRequest(keyspace, ranges, columnFamilies, 
repairedAt));
+        requests.add(new StreamRequest(keyspace, ranges, columnFamilies));
     }
 
     /**
@@ -312,9 +305,8 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
      * @param ranges Transfer ranges
      * @param columnFamilies Transfer ColumnFamilies
      * @param flushTables flush tables?
-     * @param repairedAt the time the repair started.
      */
-    public synchronized void addTransferRanges(String keyspace, 
Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean 
flushTables, long repairedAt)
+    public synchronized void addTransferRanges(String keyspace, 
Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean 
flushTables)
     {
         failIfFinished();
         Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, 
columnFamilies);
@@ -322,7 +314,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
             flushSSTables(stores);
 
         List<Range<Token>> normalizedRanges = Range.normalize(ranges);
-        List<SSTableStreamingSections> sections = 
getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, 
pendingRepair);
+        List<SSTableStreamingSections> sections = 
getSSTableSectionsForRanges(normalizedRanges, stores, pendingRepair);
         try
         {
             addTransferFiles(sections);
@@ -364,7 +356,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     }
 
     @VisibleForTesting
-    public static List<SSTableStreamingSections> 
getSSTableSectionsForRanges(Collection<Range<Token>> ranges, 
Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, UUID 
pendingRepair)
+    public static List<SSTableStreamingSections> 
getSSTableSectionsForRanges(Collection<Range<Token>> ranges, 
Collection<ColumnFamilyStore> stores, UUID pendingRepair)
     {
         Refs<SSTableReader> refs = new Refs<>();
         try
@@ -410,13 +402,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
             List<SSTableStreamingSections> sections = new 
ArrayList<>(refs.size());
             for (SSTableReader sstable : refs)
             {
-                long repairedAt = overriddenRepairedAt;
-                if (overriddenRepairedAt == 
ActiveRepairService.UNREPAIRED_SSTABLE)
-                    repairedAt = sstable.getSSTableMetadata().repairedAt;
-                sections.add(new SSTableStreamingSections(refs.get(sstable),
-                                                          
sstable.getPositionsForRanges(ranges),
-                                                          
sstable.estimatedKeysForRanges(ranges),
-                                                          repairedAt));
+                sections.add(new SSTableStreamingSections(refs.get(sstable), 
sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges)));
             }
             return sections;
         }
@@ -452,7 +438,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
                 if (task == null)
                     task = newTask;
             }
-            task.addTransferFile(details.ref, details.estimatedKeys, 
details.sections, details.repairedAt);
+            task.addTransferFile(details.ref, details.estimatedKeys, 
details.sections);
             iter.remove();
         }
     }
@@ -462,14 +448,12 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         public final Ref<SSTableReader> ref;
         public final List<Pair<Long, Long>> sections;
         public final long estimatedKeys;
-        public final long repairedAt;
 
-        public SSTableStreamingSections(Ref<SSTableReader> ref, 
List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt)
+        public SSTableStreamingSections(Ref<SSTableReader> ref, 
List<Pair<Long, Long>> sections, long estimatedKeys)
         {
             this.ref = ref;
             this.sections = sections;
             this.estimatedKeys = estimatedKeys;
-            this.repairedAt = repairedAt;
         }
     }
 
@@ -623,7 +607,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         // prepare tasks
         state(State.PREPARING);
         for (StreamRequest request : requests)
-            addTransferRanges(request.keyspace, request.ranges, 
request.columnFamilies, true, request.repairedAt); // always flush on stream 
request
+            addTransferRanges(request.keyspace, request.ranges, 
request.columnFamilies, true); // always flush on stream request
         for (StreamSummary summary : summaries)
             prepareReceiving(summary);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java 
b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index aa3251b..748da8b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -53,10 +53,10 @@ public class StreamTransferTask extends StreamTask
         super(session, tableId);
     }
 
-    public synchronized void addTransferFile(Ref<SSTableReader> ref, long 
estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+    public synchronized void addTransferFile(Ref<SSTableReader> ref, long 
estimatedKeys, List<Pair<Long, Long>> sections)
     {
         assert ref.get() != null && tableId.equals(ref.get().metadata().id);
-        OutgoingFileMessage message = new OutgoingFileMessage(ref, 
sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, 
session.keepSSTableLevel());
+        OutgoingFileMessage message = new OutgoingFileMessage(ref, 
sequenceNumber.getAndIncrement(), estimatedKeys, sections, 
session.keepSSTableLevel());
         message = StreamHook.instance.reportOutgoingFile(session, ref.get(), 
message);
         files.put(message.header.sequenceNumber, message);
         totalSize += message.header.size();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java 
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 3e53fa2..f8e4b40 100644
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ 
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -73,7 +73,7 @@ public class CompressedStreamReader extends StreamReader
         }
 
         logger.debug("[Stream #{}] Start receiving file #{} from {}, 
repairedAt = {}, size = {}, ks = '{}', pendingRepair = '{}', table = '{}'.",
-                     session.planId(), fileSeqNum, session.peer, repairedAt, 
totalSize, cfs.keyspace.getName(), session.getPendingRepair(),
+                     session.planId(), fileSeqNum, session.peer, repairedAt, 
totalSize, cfs.keyspace.getName(), pendingRepair,
                      cfs.getTableName());
 
         CompressedInputStream cis = new 
CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
@@ -84,7 +84,7 @@ public class CompressedStreamReader extends StreamReader
         SSTableMultiWriter writer = null;
         try
         {
-            writer = createWriter(cfs, totalSize, repairedAt, 
session.getPendingRepair(), format);
+            writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, 
format);
             String filename = writer.getFilename();
             int sectionIdx = 0;
             for (Pair<Long, Long> section : sections)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java 
b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index f449982..c65e1d4 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.streaming.messages;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.TypeSizes;
@@ -28,10 +29,10 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.compress.CompressionInfo;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
  * StreamingFileHeader is appended before sending actual data to describe what 
it's sending.
@@ -57,6 +58,7 @@ public class FileMessageHeader
     public final CompressionInfo compressionInfo;
     private final CompressionMetadata compressionMetadata;
     public final long repairedAt;
+    public final UUID pendingRepair;
     public final int sstableLevel;
     public final SerializationHeader.Component header;
 
@@ -71,6 +73,7 @@ public class FileMessageHeader
                              List<Pair<Long, Long>> sections,
                              CompressionInfo compressionInfo,
                              long repairedAt,
+                             UUID pendingRepair,
                              int sstableLevel,
                              SerializationHeader.Component header)
     {
@@ -83,6 +86,7 @@ public class FileMessageHeader
         this.compressionInfo = compressionInfo;
         this.compressionMetadata = null;
         this.repairedAt = repairedAt;
+        this.pendingRepair = pendingRepair;
         this.sstableLevel = sstableLevel;
         this.header = header;
         this.size = calculateSize();
@@ -96,6 +100,7 @@ public class FileMessageHeader
                              List<Pair<Long, Long>> sections,
                              CompressionMetadata compressionMetadata,
                              long repairedAt,
+                             UUID pendingRepair,
                              int sstableLevel,
                              SerializationHeader.Component header)
     {
@@ -108,6 +113,7 @@ public class FileMessageHeader
         this.compressionInfo = null;
         this.compressionMetadata = compressionMetadata;
         this.repairedAt = repairedAt;
+        this.pendingRepair = pendingRepair;
         this.sstableLevel = sstableLevel;
         this.header = header;
         this.size = calculateSize();
@@ -159,6 +165,7 @@ public class FileMessageHeader
         sb.append(", transfer size: ").append(size());
         sb.append(", compressed?: ").append(isCompressed());
         sb.append(", repairedAt: ").append(repairedAt);
+        sb.append(", pendingRepair: ").append(pendingRepair);
         sb.append(", level: ").append(sstableLevel);
         sb.append(')');
         return sb.toString();
@@ -203,6 +210,11 @@ public class FileMessageHeader
                 compressionInfo = new 
CompressionInfo(header.compressionMetadata.getChunksForSections(header.sections),
 header.compressionMetadata.parameters);
             CompressionInfo.serializer.serialize(compressionInfo, out, 
version);
             out.writeLong(header.repairedAt);
+            out.writeBoolean(header.pendingRepair != null);
+            if (header.pendingRepair != null)
+            {
+                UUIDSerializer.serializer.serialize(header.pendingRepair, out, 
version);
+            }
             out.writeInt(header.sstableLevel);
 
             SerializationHeader.serializer.serialize(header.version, 
header.header, out);
@@ -223,10 +235,11 @@ public class FileMessageHeader
                 sections.add(Pair.create(in.readLong(), in.readLong()));
             CompressionInfo compressionInfo = 
CompressionInfo.serializer.deserialize(in, version);
             long repairedAt = in.readLong();
+            UUID pendingRepair = in.readBoolean() ? 
UUIDSerializer.serializer.deserialize(in, version) : null;
             int sstableLevel = in.readInt();
             SerializationHeader.Component header =  
SerializationHeader.serializer.deserialize(sstableVersion, in);
 
-            return new FileMessageHeader(tableId, sequenceNumber, 
sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, 
sstableLevel, header);
+            return new FileMessageHeader(tableId, sequenceNumber, 
sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, 
pendingRepair, sstableLevel, header);
         }
 
         public long serializedSize(FileMessageHeader header, int version)
@@ -244,6 +257,9 @@ public class FileMessageHeader
                 size += TypeSizes.sizeof(section.right);
             }
             size += 
CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
+            size += TypeSizes.sizeof(header.repairedAt);
+            size += TypeSizes.sizeof(header.pendingRepair != null);
+            size += header.pendingRepair != null ? 
UUIDSerializer.serializer.serializedSize(header.pendingRepair, version) : 0;
             size += TypeSizes.sizeof(header.sstableLevel);
 
             size += 
SerializationHeader.serializer.serializedSize(header.version, header.header);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index fba9ec4..e3e6b9b 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -65,7 +65,7 @@ public class OutgoingFileMessage extends StreamMessage
     private boolean completed = false;
     private boolean transferring = false;
 
-    public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, 
long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean 
keepSSTableLevel)
+    public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, 
long estimatedKeys, List<Pair<Long, Long>> sections, boolean keepSSTableLevel)
     {
         super(Type.FILE);
         this.ref = ref;
@@ -79,7 +79,8 @@ public class OutgoingFileMessage extends StreamMessage
                                             estimatedKeys,
                                             sections,
                                             sstable.compression ? 
sstable.getCompressionMetadata() : null,
-                                            repairedAt,
+                                            sstable.getRepairedAt(),
+                                            sstable.getPendingRepair(),
                                             keepSSTableLevel ? 
sstable.getSSTableLevel() : 0,
                                             sstable.header.toComponent());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 59f28e0..4619561 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -49,10 +49,9 @@ public class StreamInitMessage
     // true if this init message is to connect for outgoing message on 
receiving side
     public final boolean isForOutgoing;
     public final boolean keepSSTableLevel;
-    public final boolean isIncremental;
     public final UUID pendingRepair;
 
-    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, 
StreamOperation streamOperation, boolean isForOutgoing, boolean 
keepSSTableLevel, boolean isIncremental, UUID pendingRepair)
+    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, 
StreamOperation streamOperation, boolean isForOutgoing, boolean 
keepSSTableLevel, UUID pendingRepair)
     {
         this.from = from;
         this.sessionIndex = sessionIndex;
@@ -60,7 +59,6 @@ public class StreamInitMessage
         this.streamOperation = streamOperation;
         this.isForOutgoing = isForOutgoing;
         this.keepSSTableLevel = keepSSTableLevel;
-        this.isIncremental = isIncremental;
         this.pendingRepair = pendingRepair;
     }
 
@@ -116,7 +114,6 @@ public class StreamInitMessage
             out.writeUTF(message.streamOperation.getDescription());
             out.writeBoolean(message.isForOutgoing);
             out.writeBoolean(message.keepSSTableLevel);
-            out.writeBoolean(message.isIncremental);
 
             out.writeBoolean(message.pendingRepair != null);
             if (message.pendingRepair != null)
@@ -134,9 +131,8 @@ public class StreamInitMessage
             boolean sentByInitiator = in.readBoolean();
             boolean keepSSTableLevel = in.readBoolean();
 
-            boolean isIncremental = in.readBoolean();
             UUID pendingRepair = in.readBoolean() ? 
UUIDSerializer.serializer.deserialize(in, version) : null;
-            return new StreamInitMessage(from, sessionIndex, planId, 
StreamOperation.fromString(description), sentByInitiator, keepSSTableLevel, 
isIncremental, pendingRepair);
+            return new StreamInitMessage(from, sessionIndex, planId, 
StreamOperation.fromString(description), sentByInitiator, keepSSTableLevel, 
pendingRepair);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
@@ -147,7 +143,6 @@ public class StreamInitMessage
             size += TypeSizes.sizeof(message.streamOperation.getDescription());
             size += TypeSizes.sizeof(message.isForOutgoing);
             size += TypeSizes.sizeof(message.keepSSTableLevel);
-            size += TypeSizes.sizeof(message.isIncremental);
             size += TypeSizes.sizeof(message.pendingRepair != null);
             if (message.pendingRepair != null)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java 
b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
index cebceca..f11362f 100644
--- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -50,8 +50,8 @@ public class StreamStateStoreTest
         Range<Token> range = new Range<>(factory.fromString("0"), 
factory.fromString("100"));
 
         InetAddress local = FBUtilities.getBroadcastAddress();
-        StreamSession session = new StreamSession(local, local, new 
DefaultConnectionFactory(), 0, true, false, null);
-        session.addStreamRequest("keyspace1", Collections.singleton(range), 
Collections.singleton("cf"), 0);
+        StreamSession session = new StreamSession(local, local, new 
DefaultConnectionFactory(), 0, true, null);
+        session.addStreamRequest("keyspace1", Collections.singleton(range), 
Collections.singleton("cf"));
 
         StreamStateStore store = new StreamStateStore();
         // session complete event that is not completed makes data not 
available for keyspace/ranges
@@ -71,8 +71,8 @@ public class StreamStateStoreTest
 
         // add different range within the same keyspace
         Range<Token> range2 = new Range<>(factory.fromString("100"), 
factory.fromString("200"));
-        session = new StreamSession(local, local, new 
DefaultConnectionFactory(), 0, true, false, null);
-        session.addStreamRequest("keyspace1", Collections.singleton(range2), 
Collections.singleton("cf"), 0);
+        session = new StreamSession(local, local, new 
DefaultConnectionFactory(), 0, true, null);
+        session.addStreamRequest("keyspace1", Collections.singleton(range2), 
Collections.singleton("cf"));
         session.state(StreamSession.State.COMPLETE);
         store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java 
b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 944e320..0f11aee 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -194,7 +194,7 @@ public class LegacySSTableTest
         ArrayList<StreamSession.SSTableStreamingSections> details = new 
ArrayList<>();
         details.add(new StreamSession.SSTableStreamingSections(sstable.ref(),
                                                                
sstable.getPositionsForRanges(ranges),
-                                                               
sstable.estimatedKeysForRanges(ranges), 
sstable.getSSTableMetadata().repairedAt));
+                                                               
sstable.estimatedKeysForRanges(ranges)));
         new 
StreamPlan(StreamOperation.OTHER).transferFiles(FBUtilities.getBroadcastAddress(),
 details)
                                   .execute().get();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index b8595af..97bd321 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -789,7 +789,7 @@ public class SSTableRewriterTest extends 
SSTableWriterTestBase
 
         List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = 
StreamSession.getSSTableSectionsForRanges(
             Collections.singleton(new Range<Token>(firstToken, firstToken)),
-            Collections.singleton(cfs), 0L, null);
+            Collections.singleton(cfs), null);
         assertEquals(1, sectionsBeforeRewrite.size());
         for (StreamSession.SSTableStreamingSections section : 
sectionsBeforeRewrite)
             section.ref.release();
@@ -804,7 +804,7 @@ public class SSTableRewriterTest extends 
SSTableWriterTestBase
                 while (!done.get())
                 {
                     Set<Range<Token>> range = Collections.singleton(new 
Range<Token>(firstToken, firstToken));
-                    List<StreamSession.SSTableStreamingSections> sections = 
StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 
0L, null);
+                    List<StreamSession.SSTableStreamingSections> sections = 
StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 
null);
                     if (sections.size() != 1)
                         failed.set(true);
                     for (StreamSession.SSTableStreamingSections section : 
sections)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java 
b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index b799d66..75742dc 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -24,9 +24,11 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.collect.Lists;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.IPartitioner;
@@ -34,18 +36,28 @@ import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.MerkleTrees;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static 
org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
+import static 
org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
-public class LocalSyncTaskTest extends SchemaLoader
+public class LocalSyncTaskTest extends AbstractRepairTest
 {
-    private static final IPartitioner partirioner = 
Murmur3Partitioner.instance;
+    private static final IPartitioner partitioner = 
Murmur3Partitioner.instance;
     public static final String KEYSPACE1 = "DifferencerTest";
     public static final String CF_STANDARD = "Standard1";
+    public static ColumnFamilyStore cfs;
 
     @BeforeClass
     public static void defineSchema() throws Exception
@@ -54,6 +66,9 @@ public class LocalSyncTaskTest extends SchemaLoader
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, 
CF_STANDARD));
+
+        TableId tid = Schema.instance.getTableMetadata(KEYSPACE1, 
CF_STANDARD).id;
+        cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
     }
 
     /**
@@ -65,7 +80,7 @@ public class LocalSyncTaskTest extends SchemaLoader
         final InetAddress ep1 = InetAddress.getByName("127.0.0.1");
         final InetAddress ep2 = InetAddress.getByName("127.0.0.1");
 
-        Range<Token> range = new Range<>(partirioner.getMinimumToken(), 
partirioner.getRandomToken());
+        Range<Token> range = new Range<>(partitioner.getMinimumToken(), 
partitioner.getRandomToken());
         RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), 
UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
 
         MerkleTrees tree1 = createInitialTree(desc);
@@ -76,7 +91,7 @@ public class LocalSyncTaskTest extends SchemaLoader
         // note: we reuse the same endpoint which is bogus in theory but fine 
here
         TreeResponse r1 = new TreeResponse(ep1, tree1);
         TreeResponse r2 = new TreeResponse(ep2, tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
ActiveRepairService.UNREPAIRED_SSTABLE, null, false);
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, null, false);
         task.run();
 
         assertEquals(0, task.get().numberOfDifferences);
@@ -85,7 +100,7 @@ public class LocalSyncTaskTest extends SchemaLoader
     @Test
     public void testDifference() throws Throwable
     {
-        Range<Token> range = new Range<>(partirioner.getMinimumToken(), 
partirioner.getRandomToken());
+        Range<Token> range = new Range<>(partitioner.getMinimumToken(), 
partitioner.getRandomToken());
         UUID parentRepairSession = UUID.randomUUID();
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
@@ -101,7 +116,7 @@ public class LocalSyncTaskTest extends SchemaLoader
         MerkleTrees tree2 = createInitialTree(desc);
 
         // change a range in one of the trees
-        Token token = partirioner.midpoint(range.left, range.right);
+        Token token = partitioner.midpoint(range.left, range.right);
         tree1.invalidate(token);
         MerkleTree.TreeRange changed = tree1.get(token);
         changed.hash("non-empty hash!".getBytes());
@@ -113,16 +128,50 @@ public class LocalSyncTaskTest extends SchemaLoader
         // note: we reuse the same endpoint which is bogus in theory but fine 
here
         TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), 
tree1);
         TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), 
tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
ActiveRepairService.UNREPAIRED_SSTABLE, null, false);
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, null, false);
         task.run();
 
         // ensure that the changed range was recorded
         assertEquals("Wrong differing ranges", interesting.size(), 
task.getCurrentStat().numberOfDifferences);
     }
 
-    private MerkleTrees createInitialTree(RepairJobDesc desc)
+    @Test
+    public void fullRepairStreamPlan() throws Exception
+    {
+        UUID sessionID = registerSession(cfs, true, true);
+        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionID);
+        RepairJobDesc desc = new RepairJobDesc(sessionID, 
UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+        TreeResponse r1 = new TreeResponse(PARTICIPANT1, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+        TreeResponse r2 = new TreeResponse(PARTICIPANT2, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
NO_PENDING_REPAIR, false);
+        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, PARTICIPANT2, 
Lists.newArrayList(RANGE1));
+
+        assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair());
+        assertTrue(plan.getFlushBeforeTransfer());
+    }
+
+    @Test
+    public void incrementalRepairStreamPlan() throws Exception
     {
-        MerkleTrees tree = new MerkleTrees(partirioner);
+        UUID sessionID = registerSession(cfs, true, true);
+        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionID);
+        RepairJobDesc desc = new RepairJobDesc(sessionID, 
UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+        TreeResponse r1 = new TreeResponse(PARTICIPANT1, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+        TreeResponse r2 = new TreeResponse(PARTICIPANT2, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
desc.parentSessionId, false);
+        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, PARTICIPANT2, 
Lists.newArrayList(RANGE1));
+
+        assertEquals(desc.parentSessionId, plan.getPendingRepair());
+        assertFalse(plan.getFlushBeforeTransfer());
+    }
+
+    private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner 
partitioner)
+    {
+        MerkleTrees tree = new MerkleTrees(partitioner);
         tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges);
         tree.init();
         for (MerkleTree.TreeRange r : tree.invalids())
@@ -131,4 +180,10 @@ public class LocalSyncTaskTest extends SchemaLoader
         }
         return tree;
     }
+
+    private MerkleTrees createInitialTree(RepairJobDesc desc)
+    {
+        return createInitialTree(desc, partitioner);
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java 
b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index 7be8cb5..0260cd0 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -62,7 +62,7 @@ public class RepairSessionTest
         IPartitioner p = Murmur3Partitioner.instance;
         Range<Token> repairRange = new 
Range<>(p.getToken(ByteBufferUtil.bytes(0)), 
p.getToken(ByteBufferUtil.bytes(100)));
         Set<InetAddress> endpoints = Sets.newHashSet(remote);
-        RepairSession session = new RepairSession(parentSessionId, sessionId, 
Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, 
endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, false, false, "Standard1");
+        RepairSession session = new RepairSession(parentSessionId, sessionId, 
Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, 
endpoints, false, false, "Standard1");
 
         // perform convict
         session.convict(remote, Double.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java 
b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
index 90988ae..5f13e3d 100644
--- a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
@@ -65,12 +65,10 @@ public class StreamingRepairTaskTest extends 
AbstractRepairTest
         ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionID);
         RepairJobDesc desc = new RepairJobDesc(sessionID, 
UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges());
         SyncRequest request = new SyncRequest(desc, PARTICIPANT1, 
PARTICIPANT2, PARTICIPANT3, prs.getRanges());
-        StreamingRepairTask task = new StreamingRepairTask(desc, request, 
prs.getRepairedAt(), prs.isIncremental);
+        StreamingRepairTask task = new StreamingRepairTask(desc, request, 
desc.sessionId);
 
-        StreamPlan plan = task.createStreamPlan(request.src, request.dst, 
prs.isIncremental);
+        StreamPlan plan = task.createStreamPlan(request.src, request.dst);
         Assert.assertFalse(plan.getFlushBeforeTransfer());
-        Assert.assertEquals(prs.repairedAt, plan.getRepairedAt());
-
     }
 
     @Test
@@ -80,10 +78,9 @@ public class StreamingRepairTaskTest extends 
AbstractRepairTest
         ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionID);
         RepairJobDesc desc = new RepairJobDesc(sessionID, 
UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges());
         SyncRequest request = new SyncRequest(desc, PARTICIPANT1, 
PARTICIPANT2, PARTICIPANT3, prs.getRanges());
-        StreamingRepairTask task = new StreamingRepairTask(desc, request, 
prs.getRepairedAt(), prs.isIncremental);
+        StreamingRepairTask task = new StreamingRepairTask(desc, request, 
null);
 
-        StreamPlan plan = task.createStreamPlan(request.src, request.dst, 
prs.isIncremental);
+        StreamPlan plan = task.createStreamPlan(request.src, request.dst);
         Assert.assertTrue(plan.getFlushBeforeTransfer());
-        Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, 
plan.getRepairedAt());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
index 8d388ab..84053d4 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
@@ -97,7 +97,6 @@ public class StreamSessionTest
         Collection<Range<Token>> ranges = Lists.newArrayList(new 
Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
         List<StreamSession.SSTableStreamingSections> sections = 
StreamSession.getSSTableSectionsForRanges(ranges,
                                                                                
                           Lists.newArrayList(cfs),
-                                                                               
                           ActiveRepairService.UNREPAIRED_SSTABLE,
                                                                                
                           pendingRepair);
         Set<SSTableReader> sstables = new HashSet<>();
         for (StreamSession.SSTableStreamingSections section: sections)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index ce8d2dd..57d40e9 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -74,7 +74,7 @@ public class StreamTransferTaskTest
     public void testScheduleTimeout() throws Exception
     {
         InetAddress peer = FBUtilities.getBroadcastAddress();
-        StreamSession session = new StreamSession(peer, peer, null, 0, true, 
false, null);
+        StreamSession session = new StreamSession(peer, peer, null, 0, true, 
null);
         ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
         // create two sstables
@@ -90,7 +90,7 @@ public class StreamTransferTaskTest
         {
             List<Range<Token>> ranges = new ArrayList<>();
             ranges.add(new Range<>(sstable.first.getToken(), 
sstable.last.getToken()));
-            task.addTransferFile(sstable.selfRef(), 1, 
sstable.getPositionsForRanges(ranges), 0);
+            task.addTransferFile(sstable.selfRef(), 1, 
sstable.getPositionsForRanges(ranges));
         }
         assertEquals(2, task.getTotalNumberOfFiles());
 
@@ -120,9 +120,9 @@ public class StreamTransferTaskTest
     public void testFailSessionDuringTransferShouldNotReleaseReferences() 
throws Exception
     {
         InetAddress peer = FBUtilities.getBroadcastAddress();
-        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, 
false, null, false, null);
+        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, 
null, false, null);
         StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), 
StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), 
streamCoordinator);
-        StreamSession session = new StreamSession(peer, peer, null, 0, true, 
false, null);
+        StreamSession session = new StreamSession(peer, peer, null, 0, true, 
null);
         session.init(future);
         ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
@@ -142,7 +142,7 @@ public class StreamTransferTaskTest
             ranges.add(new Range<>(sstable.first.getToken(), 
sstable.last.getToken()));
             Ref<SSTableReader> ref = sstable.selfRef();
             refs.add(ref);
-            task.addTransferFile(ref, 1, 
sstable.getPositionsForRanges(ranges), 0);
+            task.addTransferFile(ref, 1, 
sstable.getPositionsForRanges(ranges));
         }
         assertEquals(2, task.getTotalNumberOfFiles());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9308159b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 36329f4..aa9e666 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -299,7 +299,7 @@ public class StreamingTransferTest
         {
             details.add(new 
StreamSession.SSTableStreamingSections(sstables.get(sstable),
                                                                    
sstable.getPositionsForRanges(ranges),
-                                                                   
sstable.estimatedKeysForRanges(ranges), 
sstable.getSSTableMetadata().repairedAt));
+                                                                   
sstable.estimatedKeysForRanges(ranges)));
         }
         return details;
     }

Reply via email to