Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/service/ActiveRepairService.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0cfeab60 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0cfeab60 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0cfeab60 Branch: refs/heads/trunk Commit: 0cfeab60a44bf80cdd60a7887012f33db3fc57ab Parents: 6e9aec3 8c003a2 Author: Yuki Morishita <yu...@apache.org> Authored: Mon Feb 2 16:56:46 2015 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Mon Feb 2 16:56:46 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/service/ActiveRepairService.java | 3 +++ 2 files changed, 4 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cfeab60/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cfeab60/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index fa9be8a,15e7641..1882a7b --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -109,40 -120,29 +109,43 @@@ public class ActiveRepairServic * * @return Future for asynchronous call or null if there is no need to repair */ - public RepairFuture submitRepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, String... cfnames) + public RepairSession submitRepairSession(UUID parentRepairSession, + Range<Token> range, + String keyspace, + RepairParallelism parallelismDegree, + Set<InetAddress> endpoints, + long repairedAt, + ListeningExecutorService executor, + String... 
cfnames) { - if (cfnames.length == 0) + if (endpoints.isEmpty()) return null; - RepairSession session = new RepairSession(parentRepairSession, range, keyspace, parallelismDegree, endpoints, cfnames); - if (session.endpoints.isEmpty()) + ++ if (cfnames.length == 0) + return null; - RepairFuture futureTask = new RepairFuture(session); - executor.execute(futureTask); - return futureTask; - } + - public void addToActiveSessions(RepairSession session) - { + final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, cfnames); + sessions.put(session.getId(), session); - Gossiper.instance.register(session); - FailureDetector.instance.registerFailureDetectionEventListener(session); - } + // register listeners + gossiper.register(session); + failureDetector.registerFailureDetectionEventListener(session); - public void removeFromActiveSessions(RepairSession session) - { - Gossiper.instance.unregister(session); - sessions.remove(session.getId()); + // unregister listeners at completion + session.addListener(new Runnable() + { + /** + * When repair finished, do clean up + */ + public void run() + { + failureDetector.unregisterFailureDetectionEventListener(session); + gossiper.unregister(session); + sessions.remove(session.getId()); + } + }, MoreExecutors.sameThreadExecutor()); + session.start(executor); + return session; } public synchronized void terminateSessions()