Author: slebresne
Date: Fri Dec  2 10:52:57 2011
New Revision: 1209399

URL: http://svn.apache.org/viewvc?rev=1209399&view=rev
Log:
fix potential race in AES when repair fails
patch by slebresne; reviewed by amorton for CASSANDRA-3548

Modified:
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1209399&r1=1209398&r2=1209399&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Fri Dec  2 10:52:57 2011
@@ -7,6 +7,7 @@
    be qualified by keyspace (CASSANDRA-3419)
  * always remove endpoints from delevery queue in HH (CASSANDRA-3546)
  * fix race between cf flush and its 2ndary indexes flush (CASSANDRA-3547)
+ * fix potential race in AES when a repair fails (CASSANDRA-3548)
 
 
 1.0.5

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1209399&r1=1209398&r2=1209399&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java Fri Dec  2 10:52:57 2011
@@ -180,14 +180,15 @@ public class AntiEntropyService
             return;
         }
 
-        if (session.terminated())
+        RepairSession.RepairJob job = session.jobs.peek();
+        if (job == null)
+        {
+            assert session.terminated();
             return;
+        }
 
         logger.info(String.format("[repair #%s] Received merkle tree for %s from %s", session.getName(), request.cf.right, request.endpoint));
 
-        RepairSession.RepairJob job = session.jobs.peek();
-        assert job != null : "A repair should have at least some jobs scheduled";
-
         if (job.addTree(request, tree) == 0)
         {
             logger.debug("All trees received for " + session.getName() + "/" + request.cf.right);
@@ -704,14 +705,14 @@ public class AntiEntropyService
             }
             catch (InterruptedException e)
             {
-                throw new RuntimeException("Interrupted while waiting for repair: repair will continue in the background.");
+                throw new RuntimeException("Interrupted while waiting for repair.");
             }
             finally
             {
+                // mark this session as terminated
+                terminate();
                 FailureDetector.instance.unregisterFailureDetectionEventListener(this);
                 Gossiper.instance.unregister(this);
-                // mark this session as terminated
-                terminated = true;
                 AntiEntropyService.instance.sessions.remove(getName());
             }
         }
@@ -724,28 +725,36 @@ public class AntiEntropyService
             return terminated;
         }
 
+        public void terminate()
+        {
+            terminated = true;
+            jobs.clear();
+            activeJobs.clear();
+        }
+
         /**
          * clear all RepairJobs and terminate this session.
          */
         public void forceShutdown()
         {
-            jobs.clear();
-            activeJobs.clear();
             differencingDone.signalAll();
             completed.signalAll();
         }
 
         void completed(Differencer differencer)
         {
-            if (terminated)
-                return;
-
             logger.debug(String.format("[repair #%s] Repair completed between %s and %s on %s",
                                        getName(),
                                        differencer.r1.endpoint,
                                        differencer.r2.endpoint,
                                        differencer.cfname));
             RepairJob job = activeJobs.get(differencer.cfname);
+            if (job == null)
+            {
+                assert terminated;
+                return;
+            }
+
             if (job.completedSynchronization(differencer))
             {
                 activeJobs.remove(differencer.cfname);


Reply via email to