Repository: cassandra
Updated Branches:
  refs/heads/trunk c718bfff9 -> c878b6968


StreamPlan for incremental repairs flushing memtables unnecessarily
Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-13226


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c878b696
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c878b696
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c878b696

Branch: refs/heads/trunk
Commit: c878b6968be88fa89fb1d1d0212411bcbc4fae7c
Parents: c718bff
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Wed Feb 15 10:47:24 2017 -0800
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Fri Feb 17 13:57:17 2017 -0800

----------------------------------------------------------------------
 .../cassandra/repair/StreamingRepairTask.java   | 19 ++--
 .../apache/cassandra/streaming/StreamPlan.java  | 10 +++
 .../compaction/AbstractPendingRepairTest.java   |  4 +-
 ...pactionStrategyManagerPendingRepairTest.java | 14 +--
 .../db/compaction/PendingRepairManagerTest.java | 22 ++---
 .../cassandra/repair/AbstractRepairTest.java    | 91 ++++++++++++++++++++
 .../repair/StreamingRepairTaskTest.java         | 89 +++++++++++++++++++
 .../consistent/CoordinatorSessionTest.java      |  3 +-
 .../consistent/CoordinatorSessionsTest.java     |  5 +-
 .../repair/consistent/LocalSessionTest.java     |  5 +-
 10 files changed, 230 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java 
b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index f24a79a..6bce1fa 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,13 +65,17 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
             ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
             isIncremental = prs.isIncremental;
         }
-        new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false, 
isConsistent ? desc.parentSessionId : null).listeners(this)
-                                            .flushBeforeTransfer(true)
-                                            // request ranges from the remote 
node
-                                            .requestRanges(dest, preferred, 
desc.keyspace, request.ranges, desc.columnFamily)
-                                            // send ranges to the remote node
-                                            .transferRanges(dest, preferred, 
desc.keyspace, request.ranges, desc.columnFamily)
-                                            .execute();
+        createStreamPlan(dest, preferred, isIncremental).execute();
+    }
+
+    @VisibleForTesting
+    StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred, 
boolean isIncremental)
+    {
+        return new StreamPlan("Repair", repairedAt, 1, false, isIncremental, 
false, isConsistent ? desc.parentSessionId : null)
+               .listeners(this)
+               .flushBeforeTransfer(!isIncremental) // sstables are isolated 
at the beginning of an incremental repair session, so flushing isn't necessary
+               .requestRanges(dest, preferred, desc.keyspace, request.ranges, 
desc.columnFamily) // request ranges from the remote node
+               .transferRanges(dest, preferred, desc.keyspace, request.ranges, 
desc.columnFamily); // send ranges to the remote node
     }
 
     public void handleStreamEvent(StreamEvent event)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java 
b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 5526da8..5a2ce77 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -202,4 +202,14 @@ public class StreamPlan
         this.flushBeforeTransfer = flushBeforeTransfer;
         return this;
     }
+
+    public long getRepairedAt()
+    {
+        return repairedAt;
+    }
+
+    public boolean getFlushBeforeTransfer()
+    {
+        return flushBeforeTransfer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java 
b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
index 08be550..75f555d 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
@@ -40,13 +40,13 @@ import org.apache.cassandra.net.IMessageSink;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.consistent.AbstractConsistentSessionTest;
+import org.apache.cassandra.repair.AbstractRepairTest;
 import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.ActiveRepairService;
 
 @Ignore
-public class AbstractPendingRepairTest extends AbstractConsistentSessionTest
+public class AbstractPendingRepairTest extends AbstractRepairTest
 {
     protected String ks;
     protected final String tbl = "tbl";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
index 8f53781..27bff20 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
@@ -73,7 +73,7 @@ public class CompactionStrategyManagerPendingRepairTest 
extends AbstractPendingR
     @Test
     public void sstableAdded()
     {
-        UUID repairID = registerSession(cfs);
+        UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
         Assert.assertTrue(csm.pendingRepairs().isEmpty());
 
@@ -97,7 +97,7 @@ public class CompactionStrategyManagerPendingRepairTest 
extends AbstractPendingR
     @Test
     public void sstableListChangedAddAndRemove()
     {
-        UUID repairID = registerSession(cfs);
+        UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
 
         SSTableReader sstable1 = makeSSTable(true);
@@ -144,7 +144,7 @@ public class CompactionStrategyManagerPendingRepairTest 
extends AbstractPendingR
     @Test
     public void sstableRepairStatusChanged()
     {
-        UUID repairID = registerSession(cfs);
+        UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
 
         // add as unrepaired
@@ -176,7 +176,7 @@ public class CompactionStrategyManagerPendingRepairTest 
extends AbstractPendingR
     @Test
     public void sstableDeleted()
     {
-        UUID repairID = registerSession(cfs);
+        UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
 
         SSTableReader sstable = makeSSTable(true);
@@ -199,7 +199,7 @@ public class CompactionStrategyManagerPendingRepairTest 
extends AbstractPendingR
     @Test
     public void getStrategies()
     {
-        UUID repairID = registerSession(cfs);
+        UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
 
         List<List<AbstractCompactionStrategy>> strategies;
@@ -224,7 +224,7 @@ public class CompactionStrategyManagerPendingRepairTest 
extends AbstractPendingR
     @Test
     public void cleanupCompactionFinalized()
     {
-        UUID repairID = registerSession(cfs);
+        UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
         mutateRepaired(sstable, repairID);
@@ -260,7 +260,7 @@ public class CompactionStrategyManagerPendingRepairTest 
extends AbstractPendingR
     @Test
     public void cleanupCompactionFailed()
     {
-        UUID repairID = registerSession(cfs);
+        UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
         mutateRepaired(sstable, repairID);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java 
b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
index 1c2f02e..1b43217 100644
--- a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
@@ -41,7 +41,7 @@ public class PendingRepairManagerTest extends 
AbstractPendingRepairTest
     {
         PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 
-        UUID repairID = registerSession(cfs);
+        UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
         mutateRepaired(sstable, repairID);
@@ -59,7 +59,7 @@ public class PendingRepairManagerTest extends 
AbstractPendingRepairTest
     {
         PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 
-        UUID repairID = registerSession(cfs);
+        UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
         mutateRepaired(sstable, repairID);
@@ -78,7 +78,7 @@ public class PendingRepairManagerTest extends 
AbstractPendingRepairTest
     {
         PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 
-        UUID repairID = registerSession(cfs);
+        UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
         mutateRepaired(sstable, repairID);
@@ -102,7 +102,7 @@ public class PendingRepairManagerTest extends 
AbstractPendingRepairTest
     {
         PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 
-        UUID repairID = registerSession(cfs);
+        UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
         mutateRepaired(sstable, repairID);
@@ -118,7 +118,7 @@ public class PendingRepairManagerTest extends 
AbstractPendingRepairTest
     {
         PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 
-        UUID repairID = registerSession(cfs);
+        UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
         mutateRepaired(sstable, repairID);
@@ -136,13 +136,13 @@ public class PendingRepairManagerTest extends 
AbstractPendingRepairTest
     {
         PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 
-        UUID repairID = registerSession(cfs);
+        UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
         mutateRepaired(sstable, repairID);
         prm.addSSTable(sstable);
 
-        repairID = registerSession(cfs);
+        repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
         sstable = makeSSTable(true);
         mutateRepaired(sstable, repairID);
@@ -170,7 +170,7 @@ public class PendingRepairManagerTest extends 
AbstractPendingRepairTest
     {
         PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
 
-        UUID repairID = registerSession(cfs);
+        UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, 
PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
         mutateRepaired(sstable, repairID);
@@ -186,7 +186,7 @@ public class PendingRepairManagerTest extends 
AbstractPendingRepairTest
     public void userDefinedTaskTest()
     {
         PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
-        UUID repairId = registerSession(cfs);
+        UUID repairId = registerSession(cfs, true, true);
         SSTableReader sstable = makeSSTable(true);
         mutateRepaired(sstable, repairId);
         prm.addSSTable(sstable);
@@ -198,8 +198,8 @@ public class PendingRepairManagerTest extends 
AbstractPendingRepairTest
     public void mixedPendingSessionsTest()
     {
         PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
-        UUID repairId = registerSession(cfs);
-        UUID repairId2 = registerSession(cfs);
+        UUID repairId = registerSession(cfs, true, true);
+        UUID repairId2 = registerSession(cfs, true, true);
         SSTableReader sstable = makeSSTable(true);
         SSTableReader sstable2 = makeSSTable(true);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java 
b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
new file mode 100644
index 0000000..1c508a0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.junit.Ignore;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+
+@Ignore
+public abstract class AbstractRepairTest
+{
+    protected static final InetAddress COORDINATOR;
+    protected static final InetAddress PARTICIPANT1;
+    protected static final InetAddress PARTICIPANT2;
+    protected static final InetAddress PARTICIPANT3;
+
+    static
+    {
+        try
+        {
+            COORDINATOR = InetAddress.getByName("10.0.0.1");
+            PARTICIPANT1 = InetAddress.getByName("10.0.0.1");
+            PARTICIPANT2 = InetAddress.getByName("10.0.0.2");
+            PARTICIPANT3 = InetAddress.getByName("10.0.0.3");
+        }
+        catch (UnknownHostException e)
+        {
+
+            throw new AssertionError(e);
+        }
+
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    protected static final Set<InetAddress> PARTICIPANTS = 
ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3);
+
+    protected static Token t(int v)
+    {
+        return 
DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(v));
+    }
+
+    protected static final Range<Token> RANGE1 = new Range<>(t(1), t(2));
+    protected static final Range<Token> RANGE2 = new Range<>(t(2), t(3));
+    protected static final Range<Token> RANGE3 = new Range<>(t(4), t(5));
+
+    protected static UUID registerSession(ColumnFamilyStore cfs, boolean 
isIncremental, boolean isGlobal)
+    {
+        UUID sessionId = UUIDGen.getTimeUUID();
+
+        long repairedAt = isIncremental ? System.currentTimeMillis() : 
ActiveRepairService.UNREPAIRED_SSTABLE;
+        ActiveRepairService.instance.registerParentRepairSession(sessionId,
+                                                                 COORDINATOR,
+                                                                 
Lists.newArrayList(cfs),
+                                                                 
Sets.newHashSet(RANGE1, RANGE2, RANGE3),
+                                                                 isIncremental,
+                                                                 repairedAt,
+                                                                 isGlobal);
+        return sessionId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java 
b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
new file mode 100644
index 0000000..90988ae
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.util.UUID;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class StreamingRepairTaskTest extends AbstractRepairTest
+{
+    protected String ks;
+    protected final String tbl = "tbl";
+    protected TableMetadata cfm;
+    protected ColumnFamilyStore cfs;
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        SchemaLoader.prepareServer();
+    }
+
+    @Before
+    public void setup()
+    {
+        ks = "ks_" + System.currentTimeMillis();
+        cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k 
INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
+        SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
+        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+    }
+
+    @Test
+    public void incrementalStreamPlan() throws Exception
+    {
+        UUID sessionID = registerSession(cfs, true, true);
+        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionID);
+        RepairJobDesc desc = new RepairJobDesc(sessionID, 
UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges());
+        SyncRequest request = new SyncRequest(desc, PARTICIPANT1, 
PARTICIPANT2, PARTICIPANT3, prs.getRanges());
+        StreamingRepairTask task = new StreamingRepairTask(desc, request, 
prs.getRepairedAt(), prs.isIncremental);
+
+        StreamPlan plan = task.createStreamPlan(request.src, request.dst, 
prs.isIncremental);
+        Assert.assertFalse(plan.getFlushBeforeTransfer());
+        Assert.assertEquals(prs.repairedAt, plan.getRepairedAt());
+
+    }
+
+    @Test
+    public void fullStreamPlan() throws Exception
+    {
+        UUID sessionID = registerSession(cfs, false, true);
+        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionID);
+        RepairJobDesc desc = new RepairJobDesc(sessionID, 
UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges());
+        SyncRequest request = new SyncRequest(desc, PARTICIPANT1, 
PARTICIPANT2, PARTICIPANT3, prs.getRanges());
+        StreamingRepairTask task = new StreamingRepairTask(desc, request, 
prs.getRepairedAt(), prs.isIncremental);
+
+        StreamPlan plan = task.createStreamPlan(request.src, request.dst, 
prs.isIncremental);
+        Assert.assertTrue(plan.getFlushBeforeTransfer());
+        Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, 
plan.getRepairedAt());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java 
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
index b570920..4f5b7e6 100644
--- 
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
+++ 
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.SettableFuture;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.cassandra.repair.AbstractRepairTest;
 import org.apache.cassandra.repair.RepairSessionResult;
 import org.apache.cassandra.repair.messages.FailSession;
 import org.apache.cassandra.repair.messages.FinalizeCommit;
@@ -47,7 +48,7 @@ import org.apache.cassandra.utils.UUIDGen;
 
 import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
 
-public class CoordinatorSessionTest extends AbstractConsistentSessionTest
+public class CoordinatorSessionTest extends AbstractRepairTest
 {
 
     static CoordinatorSession.Builder createBuilder()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java 
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
index dfa9bc5..b7adb27 100644
--- 
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
+++ 
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.repair.AbstractRepairTest;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -40,7 +41,7 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.UUIDGen;
 
-public class CoordinatorSessionsTest extends AbstractConsistentSessionTest
+public class CoordinatorSessionsTest extends AbstractRepairTest
 {
     private static TableMetadata cfm;
     private static ColumnFamilyStore cfs;
@@ -110,7 +111,7 @@ public class CoordinatorSessionsTest extends 
AbstractConsistentSessionTest
 
     private static UUID registerSession()
     {
-        return registerSession(cfs);
+        return registerSession(cfs, true, true);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java 
b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index a85e01b..2a4ce9a 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -41,6 +41,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.repair.AbstractRepairTest;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
@@ -64,7 +65,7 @@ import org.apache.cassandra.utils.UUIDGen;
 
 import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
 
-public class LocalSessionTest extends AbstractConsistentSessionTest
+public class LocalSessionTest extends AbstractRepairTest
 {
 
     static LocalSession.Builder createBuilder()
@@ -196,7 +197,7 @@ public class LocalSessionTest extends 
AbstractConsistentSessionTest
 
     private static UUID registerSession()
     {
-        return registerSession(cfs);
+        return registerSession(cfs, true, true);
     }
 
     @Test

Reply via email to