Repository: cassandra
Updated Branches:
  refs/heads/trunk e6ba61629 -> 20d5ce8b9


When lost notifications occur and periodically, check for the parent repair 
status and exit if we've completed/failed

Patch by Matt Byrd, reviewed by Chris Lohfink for CASSANDRA-13480


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/20d5ce8b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/20d5ce8b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/20d5ce8b

Branch: refs/heads/trunk
Commit: 20d5ce8b9b587be2f0b7bc5765254e8dc6e0bd3b
Parents: e6ba616
Author: Matt Byrd <matthew.l.b...@gmail.com>
Authored: Fri May 26 17:03:17 2017 -0700
Committer: Jeff Jirsa <jji...@apple.com>
Committed: Thu Jun 29 12:15:32 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/repair/RepairRunnable.java | 115 ++++++++++++-------
 .../cassandra/service/ActiveRepairService.java  |  30 +++++
 .../cassandra/service/StorageService.java       |   9 ++
 .../cassandra/service/StorageServiceMBean.java  |  11 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |   3 +
 .../apache/cassandra/tools/RepairRunner.java    |  73 ++++++++++--
 .../service/ActiveRepairServiceTest.java        |  26 ++++-
 8 files changed, 214 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6957f48..211769c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -79,6 +79,7 @@
  * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
  * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720)
  * Trivial format error in StorageProxy (CASSANDRA-13551)
+ * Nodetool repair can hang forever if we lose the notification for the repair 
completing/failing (CASSANDRA-13480)
 
 
 3.11.1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index a3b8f22..eca162e 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -109,7 +110,8 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         listeners.remove(listener);
     }
 
-    protected void fireProgressEvent(String tag, ProgressEvent event)
+
+    protected void fireProgressEvent(ProgressEvent event)
     {
         for (ProgressListener listener : listeners)
         {
@@ -117,14 +119,17 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         }
     }
 
-    protected void fireErrorAndComplete(String tag, int progressCount, int 
totalProgress, String message)
+    protected void fireErrorAndComplete(int progressCount, int totalProgress, 
String message)
     {
-        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, 
progressCount, totalProgress, message));
-        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, 
progressCount, totalProgress, String.format("Repair command #%d finished with 
error", cmd)));
+        fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, 
progressCount, totalProgress, message));
+        String completionMessage = String.format("Repair command #%d finished 
with error", cmd);
+        fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, 
progressCount, totalProgress, completionMessage));
+        recordFailure(message, completionMessage);
     }
 
     protected void runMayThrow() throws Exception
     {
+        ActiveRepairService.instance.recordRepairStatus(cmd, 
ActiveRepairService.ParentRepairStatus.IN_PROGRESS, ImmutableList.of());
         final TraceState traceState;
         final UUID parentSession = UUIDGen.getTimeUUID();
         final String tag = "repair:" + cmd;
@@ -142,7 +147,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         catch (IllegalArgumentException e)
         {
             logger.error("Repair failed:", e);
-            fireErrorAndComplete(tag, progress.get(), totalProgress, 
e.getMessage());
+            fireErrorAndComplete(progress.get(), totalProgress, 
e.getMessage());
             return;
         }
 
@@ -160,7 +165,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
             traceState = Tracing.instance.begin("repair", 
ImmutableMap.of("keyspace", keyspace, "columnFamilies",
                                                                           
cfsb.substring(2)));
             message = message + " tracing with " + sessionId;
-            fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 
0, 100, message));
+            fireProgressEvent(new ProgressEvent(ProgressEventType.START, 0, 
100, message));
             Tracing.traceRepair(message);
             traceState.enableActivityNotification(tag);
             for (ProgressListener listener : listeners)
@@ -171,7 +176,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         }
         else
         {
-            fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 
0, 100, message));
+            fireProgressEvent(new ProgressEvent(ProgressEventType.START, 0, 
100, message));
             traceState = null;
         }
 
@@ -199,7 +204,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         catch (IllegalArgumentException e)
         {
             logger.error("Repair failed:", e);
-            fireErrorAndComplete(tag, progress.get(), totalProgress, 
e.getMessage());
+            fireErrorAndComplete(progress.get(), totalProgress, 
e.getMessage());
             return;
         }
 
@@ -212,7 +217,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         }
         catch (IllegalArgumentException e)
         {
-            fireErrorAndComplete(tag, progress.get(), totalProgress, 
e.getMessage());
+            fireErrorAndComplete(progress.get(), totalProgress, 
e.getMessage());
             return;
         }
 
@@ -240,7 +245,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
             {
                 SystemDistributedKeyspace.failParentRepair(parentSession, t);
             }
-            fireErrorAndComplete(tag, progress.get(), totalProgress, 
t.getMessage());
+            fireErrorAndComplete(progress.get(), totalProgress, 
t.getMessage());
             return;
         }
 
@@ -352,22 +357,27 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
                     SyncStatSummary summary = new SyncStatSummary(true);
                     summary.consumeSessionResults(results);
 
+                    final String message;
                     if (summary.isEmpty())
                     {
-                        String message = previewKind == PreviewKind.REPAIRED ? 
"Repaired data is in sync" : "Previewed data was in sync";
+                        message = previewKind == PreviewKind.REPAIRED ? 
"Repaired data is in sync" : "Previewed data was in sync";
                         logger.info(message);
-                        fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, 
message));
+                        fireProgressEvent(new 
ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, 
message));
                     }
                     else
                     {
-                        String message =  (previewKind == PreviewKind.REPAIRED 
? "Repaired data is inconsistent\n" : "Preview complete\n") + 
summary.toString();
+                        message =  (previewKind == PreviewKind.REPAIRED ? 
"Repaired data is inconsistent\n" : "Preview complete\n") + summary.toString();
                         logger.info(message);
-                        fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, 
message));
+                        fireProgressEvent(new 
ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, 
message));
                     }
 
-                    fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
-                                                             "Repair preview 
completed successfully"));
-                    complete();
+                    String successMessage = "Repair preview completed 
successfully";
+                    fireProgressEvent(new 
ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
+                                                        successMessage));
+                    String completionMessage = complete();
+
+                    ActiveRepairService.instance.recordRepairStatus(cmd, 
ActiveRepairService.ParentRepairStatus.COMPLETED,
+                                                           
ImmutableList.of(message, successMessage, completionMessage));
                 }
                 catch (Throwable t)
                 {
@@ -378,20 +388,22 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
 
             public void onFailure(Throwable t)
             {
-                fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, 
t.getMessage()));
+                fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, 
progress.get(), totalProgress, t.getMessage()));
                 logger.error("Error completing preview repair", t);
-                complete();
+                String completionMessage = complete();
+                recordFailure(t.getMessage(), completionMessage);
             }
 
-            private void complete()
+            private String complete()
             {
                 logger.debug("Preview repair {} completed", parentSession);
 
                 String duration = 
DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
                                                                           
true, true);
                 String message = String.format("Repair preview #%d finished in 
%s", cmd, duration);
-                fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, 
message));
+                fireProgressEvent(new 
ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, 
message));
                 executor.shutdownNow();
+                return message;
             }
         });
     }
@@ -447,10 +459,10 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
             String message = String.format("Repair session %s for range %s 
finished", session.getId(),
                                            session.getRanges().toString());
             logger.info(message);
-            fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.PROGRESS,
-                                                     
progress.incrementAndGet(),
-                                                     totalProgress,
-                                                     message));
+            fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS,
+                                                progress.incrementAndGet(),
+                                                totalProgress,
+                                                message));
         }
 
         public void onFailure(Throwable t)
@@ -458,10 +470,10 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
             String message = String.format("Repair session %s for range %s 
failed with error %s",
                                            session.getId(), 
session.getRanges().toString(), t.getMessage());
             logger.error(message, t);
-            fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR,
-                                                     
progress.incrementAndGet(),
-                                                     totalProgress,
-                                                     message));
+            fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR,
+                                                progress.incrementAndGet(),
+                                                totalProgress,
+                                                message));
         }
     }
 
@@ -495,36 +507,49 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
             {
                 
SystemDistributedKeyspace.successfulParentRepair(parentSession, 
successfulRanges);
             }
+            final String message;
             if (hasFailure.get())
             {
-                fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
-                                                         "Some repair 
failed"));
+                message = "Some repair failed";
+                fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, 
progress.get(), totalProgress,
+                                                    message));
             }
             else
             {
-                fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
-                                                         "Repair completed 
successfully"));
+                message = "Repair completed successfully";
+                fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, 
progress.get(), totalProgress,
+                                                    message));
+            }
+            String completionMessage = repairComplete();
+            if (hasFailure.get())
+            {
+                recordFailure(message, completionMessage);
+            }
+            else
+            {
+                ActiveRepairService.instance.recordRepairStatus(cmd, 
ActiveRepairService.ParentRepairStatus.COMPLETED,
+                                                       
ImmutableList.of(message, completionMessage));
             }
-            repairComplete();
         }
 
         public void onFailure(Throwable t)
         {
-            fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, 
progress.get(), totalProgress, t.getMessage()));
+            fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, 
progress.get(), totalProgress, t.getMessage()));
             if (!options.isPreview())
             {
                 SystemDistributedKeyspace.failParentRepair(parentSession, t);
             }
-            repairComplete();
+            String completionMessage = repairComplete();
+            recordFailure(t.getMessage(), completionMessage);
         }
 
-        private void repairComplete()
+        private String repairComplete()
         {
             
ActiveRepairService.instance.removeParentRepairSession(parentSession);
             long durationMillis = System.currentTimeMillis() - startTime;
-            String duration = 
DurationFormatUtils.formatDurationWords(durationMillis, true, true);
+            String duration = 
DurationFormatUtils.formatDurationWords(durationMillis,true, true);
             String message = String.format("Repair command #%d finished in 
%s", cmd, duration);
-            fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, 
message));
+            fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, 
progress.get(), totalProgress, message));
             logger.info(message);
             if (options.isTraced() && traceState != null)
             {
@@ -541,9 +566,18 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
             }
             executor.shutdownNow();
             Keyspace.open(keyspace).metric.repairTime.update(durationMillis, 
TimeUnit.MILLISECONDS);
+            return message;
         }
     }
 
+    private void recordFailure(String failureMessage, String completionMessage)
+    {
+        // Note we rely on the first message being the reason for the failure
+        // when inspecting this state from RepairRunner.queryForCompletedRepair
+        ActiveRepairService.instance.recordRepairStatus(cmd, 
ActiveRepairService.ParentRepairStatus.FAILED,
+                                               
ImmutableList.of(failureMessage, completionMessage));
+    }
+
     private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends 
Collection<Range<Token>>>> neighborRangeList, Range<Token> range, 
Set<InetAddress> neighbors)
     {
         for (int i = 0; i < neighborRangeList.size(); i++)
@@ -622,8 +656,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
                         if (seen[si == 0 ? 1 : 0].contains(uuid))
                             continue;
                         String message = String.format("%s: %s", 
r.getInetAddress("source"), r.getString("activity"));
-                        fireProgressEvent("repair:" + cmd,
-                                          new 
ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message));
+                        fireProgressEvent(new 
ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message));
                     }
                     tlast = tcur;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index aadf7c1..a397ca2 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -29,6 +29,8 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.base.Predicate;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
@@ -72,6 +74,7 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 
 /**
@@ -90,6 +93,12 @@ import org.apache.cassandra.utils.UUIDGen;
  */
 public class ActiveRepairService implements IEndpointStateChangeSubscriber, 
IFailureDetectionEventListener, ActiveRepairServiceMBean
 {
+
+    public enum ParentRepairStatus
+    {
+        IN_PROGRESS, COMPLETED, FAILED
+    }
+
     public static class ConsistentSessions
     {
         public final LocalSessions local = new LocalSessions();
@@ -118,11 +127,21 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
 
     private final IFailureDetector failureDetector;
     private final Gossiper gossiper;
+    private final Cache<Integer, Pair<ParentRepairStatus, List<String>>> 
repairStatusByCmd;
 
     public ActiveRepairService(IFailureDetector failureDetector, Gossiper 
gossiper)
     {
         this.failureDetector = failureDetector;
         this.gossiper = gossiper;
+        this.repairStatusByCmd = CacheBuilder.newBuilder()
+                                             .expireAfterWrite(
+                                             
Long.getLong("cassandra.parent_repair_status_expiry_seconds",
+                                                          
TimeUnit.SECONDS.convert(1, TimeUnit.DAYS)), TimeUnit.SECONDS)
+                                             // using weight wouldn't work so 
well, since it doesn't reflect mutation of cached data
+                                             // see 
https://github.com/google/guava/wiki/CachesExplained
+                                             // We assume each entry is 
unlikely to be much more than 100 bytes, so bounding the size should be 
sufficient.
+                                             
.maximumSize(Long.getLong("cassandra.parent_repair_status_cache_size", 100_000))
+                                             .build();
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -230,6 +249,17 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         parentRepairSessions.clear();
     }
 
+    public void recordRepairStatus(int cmd, ParentRepairStatus 
parentRepairStatus, List<String> messages)
+    {
+        repairStatusByCmd.put(cmd, Pair.create(parentRepairStatus, messages));
+    }
+
+
+    Pair<ParentRepairStatus, List<String>> getRepairStatus(Integer cmd)
+    {
+        return repairStatusByCmd.getIfPresent(cmd);
+    }
+
     /**
      * Return all of the neighbors with whom we share the provided range.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index a7d4e9d..2b3e633 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3349,6 +3349,15 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         ActiveRepairService.instance.terminateSessions();
     }
 
+
+    @Nullable
+    public List<String> getParentRepairStatus(int cmd)
+    {
+        Pair<ActiveRepairService.ParentRepairStatus, List<String>> pair = 
ActiveRepairService.instance.getRepairStatus(cmd);
+        return pair == null ? null :
+               
ImmutableList.<String>builder().add(pair.left.name()).addAll(pair.right).build();
+    }
+
     /* End of MBean interface methods */
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index d94c263..9f66a7e 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
-
+import javax.annotation.Nullable;
 import javax.management.NotificationEmitter;
 import javax.management.openmbean.TabularData;
 
@@ -325,6 +325,15 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     public void forceTerminateAllRepairSessions();
 
     /**
+     * Get the status of a given parent repair session.
+     * @param cmd the int reference returned when issuing the repair
+     * @return status of parent repair from enum {@link 
org.apache.cassandra.repair.RepairRunnable.Status}
+     * followed by final message or messages of the session
+     */
+    @Nullable
+    public List<String> getParentRepairStatus(int cmd);
+
+    /**
      * transfer this node's data to other machines and remove it from service.
      * @param force Decommission even if this will reduce N to be less than RF.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index e603eae..fcae1f6 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -103,6 +103,9 @@ public class NodeProbe implements AutoCloseable
     private static final String fmtUrl = 
"service:jmx:rmi:///jndi/rmi://[%s]:%d/jmxrmi";
     private static final String ssObjName = 
"org.apache.cassandra.db:type=StorageService";
     private static final int defaultPort = 7199;
+
+    static long JMX_NOTIFICATION_POLL_INTERVAL_SECONDS = 
Long.getLong("cassandra.nodetool.jmx_notification_poll_interval_seconds", 
TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES));
+
     final String host;
     final int port;
     private String username;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/src/java/org/apache/cassandra/tools/RepairRunner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/RepairRunner.java 
b/src/java/org/apache/cassandra/tools/RepairRunner.java
index 961916a..ca894f0 100644
--- a/src/java/org/apache/cassandra/tools/RepairRunner.java
+++ b/src/java/org/apache/cassandra/tools/RepairRunner.java
@@ -20,9 +20,12 @@ package org.apache.cassandra.tools;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.text.SimpleDateFormat;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 import org.apache.cassandra.utils.progress.ProgressEvent;
@@ -40,7 +43,6 @@ public class RepairRunner extends 
JMXNotificationProgressListener
     private final Condition condition = new SimpleCondition();
 
     private int cmd;
-    private volatile boolean hasNotificationLost;
     private volatile Exception error;
 
     public RepairRunner(PrintStream out, StorageServiceMBean ssProxy, String 
keyspace, Map<String, String> options)
@@ -62,14 +64,14 @@ public class RepairRunner extends 
JMXNotificationProgressListener
         }
         else
         {
-            condition.await();
-            if (error != null)
+            while 
(!condition.await(NodeProbe.JMX_NOTIFICATION_POLL_INTERVAL_SECONDS, 
TimeUnit.SECONDS))
             {
-                throw error;
+                queryForCompletedRepair(String.format("After waiting for poll 
interval of %s seconds",
+                                                      
NodeProbe.JMX_NOTIFICATION_POLL_INTERVAL_SECONDS));
             }
-            if (hasNotificationLost)
+            if (error != null)
             {
-                out.println(String.format("There were some lost 
notification(s). You should check server log for repair status of keyspace %s", 
keyspace));
+                throw error;
             }
         }
     }
@@ -83,7 +85,11 @@ public class RepairRunner extends 
JMXNotificationProgressListener
     @Override
     public void handleNotificationLost(long timestamp, String message)
     {
-        hasNotificationLost = true;
+        if (cmd > 0)
+        {
+            // Check to see if the lost notification was a completion message
+            queryForCompletedRepair("After receiving lost notification");
+        }
     }
 
     @Override
@@ -96,8 +102,8 @@ public class RepairRunner extends 
JMXNotificationProgressListener
     public void handleConnectionFailed(long timestamp, String message)
     {
         error = new IOException(String.format("[%s] JMX connection closed. You 
should check server log for repair status of keyspace %s"
-                                               + "(Subsequent keyspaces are 
not going to be repaired).",
-                                  format.format(timestamp), keyspace));
+                                              + "(Subsequent keyspaces are not 
going to be repaired).",
+                                              format.format(timestamp), 
keyspace));
         condition.signalAll();
     }
 
@@ -108,7 +114,7 @@ public class RepairRunner extends 
JMXNotificationProgressListener
         String message = String.format("[%s] %s", 
format.format(System.currentTimeMillis()), event.getMessage());
         if (type == ProgressEventType.PROGRESS)
         {
-            message = message + " (progress: " + 
(int)event.getProgressPercentage() + "%)";
+            message = message + " (progress: " + (int) 
event.getProgressPercentage() + "%)";
         }
         out.println(message);
         if (type == ProgressEventType.ERROR)
@@ -120,4 +126,51 @@ public class RepairRunner extends 
JMXNotificationProgressListener
             condition.signalAll();
         }
     }
+
+
+    private void queryForCompletedRepair(String triggeringCondition)
+    {
+        List<String> status = ssProxy.getParentRepairStatus(cmd);
+        String queriedString = "queried for parent session status and";
+        if (status == null)
+        {
+            String message = String.format("[%s] %s %s couldn't find repair 
status for cmd: %s", triggeringCondition,
+                                           queriedString, 
format.format(System.currentTimeMillis()), cmd);
+            out.println(message);
+        }
+        else
+        {
+            ActiveRepairService.ParentRepairStatus parentRepairStatus = 
ActiveRepairService.ParentRepairStatus.valueOf(status.get(0));
+            List<String> messages = status.subList(1, status.size());
+            switch (parentRepairStatus)
+            {
+                case COMPLETED:
+                case FAILED:
+                    out.println(String.format("[%s] %s %s discovered repair 
%s.",
+                                              
this.format.format(System.currentTimeMillis()), triggeringCondition,
+                                              queriedString, 
parentRepairStatus.name().toLowerCase()));
+                    if (parentRepairStatus == 
ActiveRepairService.ParentRepairStatus.FAILED)
+                    {
+                        error = new IOException(messages.get(0));
+                    }
+                    printMessages(messages);
+                    condition.signalAll();
+                    break;
+                case IN_PROGRESS:
+                    break;
+                default:
+                    out.println(String.format("[%s] WARNING Encountered 
unexpected RepairRunnable.ParentRepairStatus: %s", System.currentTimeMillis(), 
parentRepairStatus));
+                    printMessages(messages);
+                    break;
+            }
+        }
+    }
+
+    private void printMessages(List<String> messages)
+    {
+        for (String message : messages)
+        {
+            out.println(String.format("[%s] %s", 
this.format.format(System.currentTimeMillis()), message));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java 
b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 9d5ccf4..57ffa7d 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service;
 import java.net.InetAddress;
 import java.util.*;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -45,8 +46,8 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Refs;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 public class ActiveRepairServiceTest
 {
@@ -211,6 +212,27 @@ public class ActiveRepairServiceTest
         ActiveRepairService.getNeighbors(KEYSPACE5, ranges, 
ranges.iterator().next(), null, hosts);
     }
 
+
+    @Test
+    public void testParentRepairStatus() throws Throwable
+    {
+        ActiveRepairService.instance.recordRepairStatus(1, 
ActiveRepairService.ParentRepairStatus.COMPLETED, ImmutableList.of("foo", 
"bar"));
+        List<String> res = StorageService.instance.getParentRepairStatus(1);
+        assertNotNull(res);
+        assertEquals(ActiveRepairService.ParentRepairStatus.COMPLETED, 
ActiveRepairService.ParentRepairStatus.valueOf(res.get(0)));
+        assertEquals("foo", res.get(1));
+        assertEquals("bar", res.get(2));
+
+        List<String> emptyRes = 
StorageService.instance.getParentRepairStatus(44);
+        assertNull(emptyRes);
+
+        ActiveRepairService.instance.recordRepairStatus(3, 
ActiveRepairService.ParentRepairStatus.FAILED, ImmutableList.of("some failure 
message", "bar"));
+        List<String> failed = StorageService.instance.getParentRepairStatus(3);
+        assertNotNull(failed);
+        assertEquals(ActiveRepairService.ParentRepairStatus.FAILED, 
ActiveRepairService.ParentRepairStatus.valueOf(failed.get(0)));
+
+    }
+
     Set<InetAddress> addTokens(int max) throws Throwable
     {
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to