Repository: cassandra Updated Branches: refs/heads/trunk c718bfff9 -> c878b6968
StreamPlan for incremental repairs flushing memtables unnecessarily Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-13226 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c878b696 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c878b696 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c878b696 Branch: refs/heads/trunk Commit: c878b6968be88fa89fb1d1d0212411bcbc4fae7c Parents: c718bff Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Wed Feb 15 10:47:24 2017 -0800 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Fri Feb 17 13:57:17 2017 -0800 ---------------------------------------------------------------------- .../cassandra/repair/StreamingRepairTask.java | 19 ++-- .../apache/cassandra/streaming/StreamPlan.java | 10 +++ .../compaction/AbstractPendingRepairTest.java | 4 +- ...pactionStrategyManagerPendingRepairTest.java | 14 +-- .../db/compaction/PendingRepairManagerTest.java | 22 ++--- .../cassandra/repair/AbstractRepairTest.java | 91 ++++++++++++++++++++ .../repair/StreamingRepairTaskTest.java | 89 +++++++++++++++++++ .../consistent/CoordinatorSessionTest.java | 3 +- .../consistent/CoordinatorSessionsTest.java | 5 +- .../repair/consistent/LocalSessionTest.java | 5 +- 10 files changed, 230 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index f24a79a..6bce1fa 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -19,6 +19,7 @@ package org.apache.cassandra.repair; import java.net.InetAddress; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,13 +65,17 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId); isIncremental = prs.isIncremental; } - new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false, isConsistent ? desc.parentSessionId : null).listeners(this) - .flushBeforeTransfer(true) - // request ranges from the remote node - .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) - // send ranges to the remote node - .transferRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) - .execute(); + createStreamPlan(dest, preferred, isIncremental).execute(); + } + + @VisibleForTesting + StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred, boolean isIncremental) + { + return new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false, isConsistent ? desc.parentSessionId : null) + .listeners(this) + .flushBeforeTransfer(!isIncremental) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't neccessary + .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) // request ranges from the remote node + .transferRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily); // send ranges to the remote node } public void handleStreamEvent(StreamEvent event) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index 5526da8..5a2ce77 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -202,4 +202,14 @@ public class StreamPlan this.flushBeforeTransfer = flushBeforeTransfer; return this; } + + public long getRepairedAt() + { + return repairedAt; + } + + public boolean getFlushBeforeTransfer() + { + return flushBeforeTransfer; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java index 08be550..75f555d 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java @@ -40,13 +40,13 @@ import org.apache.cassandra.net.IMessageSink; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.repair.consistent.AbstractConsistentSessionTest; +import org.apache.cassandra.repair.AbstractRepairTest; import org.apache.cassandra.repair.consistent.LocalSessionAccessor; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.ActiveRepairService; @Ignore -public class AbstractPendingRepairTest extends AbstractConsistentSessionTest +public class AbstractPendingRepairTest extends AbstractRepairTest { protected String ks; protected final String tbl = "tbl"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java index 8f53781..27bff20 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java @@ -73,7 +73,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR @Test public void sstableAdded() { - UUID repairID = registerSession(cfs); + UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); Assert.assertTrue(csm.pendingRepairs().isEmpty()); @@ -97,7 +97,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR @Test public void sstableListChangedAddAndRemove() { - UUID repairID = registerSession(cfs); + UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable1 = makeSSTable(true); @@ -144,7 +144,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR @Test public void sstableRepairStatusChanged() { - UUID repairID = registerSession(cfs); + UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); // add as unrepaired @@ -176,7 +176,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR @Test public void sstableDeleted() { - UUID repairID = registerSession(cfs); + UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); @@ -199,7 +199,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR @Test public void getStrategies() { - UUID repairID = registerSession(cfs); + UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); List<List<AbstractCompactionStrategy>> strategies; @@ -224,7 +224,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR @Test public void cleanupCompactionFinalized() { - UUID repairID = registerSession(cfs); + UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); mutateRepaired(sstable, repairID); @@ -260,7 +260,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR @Test public void cleanupCompactionFailed() { - UUID repairID = registerSession(cfs); + UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); mutateRepaired(sstable, repairID); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java index 1c2f02e..1b43217 100644 --- a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java @@ -41,7 +41,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest { PendingRepairManager prm = csm.getPendingRepairManagers().get(0); - UUID repairID = registerSession(cfs); + UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); mutateRepaired(sstable, repairID); @@ -59,7 +59,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest { PendingRepairManager prm = csm.getPendingRepairManagers().get(0); - UUID repairID = registerSession(cfs); + UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); mutateRepaired(sstable, repairID); @@ -78,7 +78,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest { PendingRepairManager prm = csm.getPendingRepairManagers().get(0); - UUID repairID = registerSession(cfs); + UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); mutateRepaired(sstable, repairID); @@ -102,7 +102,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest { PendingRepairManager prm = csm.getPendingRepairManagers().get(0); - UUID repairID = registerSession(cfs); + UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); mutateRepaired(sstable, repairID); @@ -118,7 +118,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest { PendingRepairManager prm = csm.getPendingRepairManagers().get(0); - UUID repairID = registerSession(cfs); + UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); mutateRepaired(sstable, repairID); @@ -136,13 +136,13 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest { PendingRepairManager prm = csm.getPendingRepairManagers().get(0); - UUID repairID = registerSession(cfs); + UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); mutateRepaired(sstable, repairID); prm.addSSTable(sstable); - repairID = registerSession(cfs); + repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); sstable = makeSSTable(true); mutateRepaired(sstable, repairID); @@ -170,7 +170,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest { PendingRepairManager prm = csm.getPendingRepairManagers().get(0); - UUID repairID = registerSession(cfs); + UUID repairID = registerSession(cfs, true, true); LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); mutateRepaired(sstable, repairID); @@ -186,7 +186,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest public void userDefinedTaskTest() { PendingRepairManager prm = csm.getPendingRepairManagers().get(0); - UUID repairId = registerSession(cfs); + UUID repairId = registerSession(cfs, true, true); SSTableReader sstable = makeSSTable(true); mutateRepaired(sstable, repairId); prm.addSSTable(sstable); @@ -198,8 +198,8 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest public void mixedPendingSessionsTest() { PendingRepairManager prm = csm.getPendingRepairManagers().get(0); - UUID repairId = registerSession(cfs); - UUID repairId2 = registerSession(cfs); + UUID repairId = registerSession(cfs, true, true); + UUID repairId2 = registerSession(cfs, true, true); SSTableReader sstable = makeSSTable(true); SSTableReader sstable2 = makeSSTable(true); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java new file mode 100644 index 0000000..1c508a0 --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Set; +import java.util.UUID; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.junit.Ignore; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDGen; + +@Ignore +public abstract class AbstractRepairTest +{ + protected static final InetAddress COORDINATOR; + protected static final InetAddress PARTICIPANT1; + protected static final InetAddress PARTICIPANT2; + protected static final InetAddress PARTICIPANT3; + + static + { + try + { + COORDINATOR = InetAddress.getByName("10.0.0.1"); + PARTICIPANT1 = InetAddress.getByName("10.0.0.1"); + PARTICIPANT2 = InetAddress.getByName("10.0.0.2"); + PARTICIPANT3 = InetAddress.getByName("10.0.0.3"); + } + catch (UnknownHostException e) + { + + throw new AssertionError(e); + } + + DatabaseDescriptor.daemonInitialization(); + } + + protected static final Set<InetAddress> PARTICIPANTS = ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3); + + protected static Token t(int v) + { + return DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(v)); + } + + protected static final Range<Token> RANGE1 = new Range<>(t(1), t(2)); + protected static final Range<Token> RANGE2 = new Range<>(t(2), t(3)); + protected static final Range<Token> RANGE3 = new Range<>(t(4), t(5)); + + protected static UUID registerSession(ColumnFamilyStore cfs, boolean isIncremental, boolean isGlobal) + { + UUID sessionId = UUIDGen.getTimeUUID(); + + long repairedAt = isIncremental ? System.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE; + ActiveRepairService.instance.registerParentRepairSession(sessionId, + COORDINATOR, + Lists.newArrayList(cfs), + Sets.newHashSet(RANGE1, RANGE2, RANGE3), + isIncremental, + repairedAt, + isGlobal); + return sessionId; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java new file mode 100644 index 0000000..90988ae --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair; + +import java.util.UUID; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.repair.messages.SyncRequest; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.StreamPlan; +import org.apache.cassandra.utils.UUIDGen; + +public class StreamingRepairTaskTest extends AbstractRepairTest +{ + protected String ks; + protected final String tbl = "tbl"; + protected TableMetadata cfm; + protected ColumnFamilyStore cfs; + + @BeforeClass + public static void setupClass() + { + SchemaLoader.prepareServer(); + } + + @Before + public void setup() + { + ks = "ks_" + System.currentTimeMillis(); + cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build(); + SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm); + cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); + } + + @Test + public void incrementalStreamPlan() throws Exception + { + UUID sessionID = registerSession(cfs, true, true); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); + RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges()); + SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges()); + StreamingRepairTask task = new StreamingRepairTask(desc, request, prs.getRepairedAt(), prs.isIncremental); + + StreamPlan plan = task.createStreamPlan(request.src, request.dst, prs.isIncremental); + Assert.assertFalse(plan.getFlushBeforeTransfer()); + Assert.assertEquals(prs.repairedAt, plan.getRepairedAt()); + + } + + @Test + public void fullStreamPlan() throws Exception + { + UUID sessionID = registerSession(cfs, false, true); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); + RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges()); + SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges()); + StreamingRepairTask task = new StreamingRepairTask(desc, request, prs.getRepairedAt(), prs.isIncremental); + + StreamPlan plan = task.createStreamPlan(request.src, request.dst, prs.isIncremental); + Assert.assertTrue(plan.getFlushBeforeTransfer()); + Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, plan.getRepairedAt()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java index b570920..4f5b7e6 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java @@ -37,6 +37,7 @@ import com.google.common.util.concurrent.SettableFuture; import org.junit.Assert; import org.junit.Test; +import org.apache.cassandra.repair.AbstractRepairTest; import org.apache.cassandra.repair.RepairSessionResult; import org.apache.cassandra.repair.messages.FailSession; import org.apache.cassandra.repair.messages.FinalizeCommit; @@ -47,7 +48,7 @@ import org.apache.cassandra.utils.UUIDGen; import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*; -public class CoordinatorSessionTest extends AbstractConsistentSessionTest +public class CoordinatorSessionTest extends AbstractRepairTest { static CoordinatorSession.Builder createBuilder() http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java index dfa9bc5..b7adb27 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java @@ -30,6 +30,7 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.repair.AbstractRepairTest; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.ColumnFamilyStore; @@ -40,7 +41,7 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.UUIDGen; -public class CoordinatorSessionsTest extends AbstractConsistentSessionTest +public class CoordinatorSessionsTest extends AbstractRepairTest { private static TableMetadata cfm; private static ColumnFamilyStore cfs; @@ -110,7 +111,7 @@ public class CoordinatorSessionsTest extends AbstractConsistentSessionTest private static UUID registerSession() { - return registerSession(cfs); + return registerSession(cfs, true, true); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/c878b696/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java index a85e01b..2a4ce9a 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java @@ -41,6 +41,7 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.repair.AbstractRepairTest; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; @@ -64,7 +65,7 @@ import org.apache.cassandra.utils.UUIDGen; import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*; -public class LocalSessionTest extends AbstractConsistentSessionTest +public class LocalSessionTest extends AbstractRepairTest { static LocalSession.Builder createBuilder() @@ -196,7 +197,7 @@ public class LocalSessionTest extends AbstractConsistentSessionTest private static UUID registerSession() { - return registerSession(cfs); + return registerSession(cfs, true, true); } @Test