Repository: cassandra Updated Branches: refs/heads/trunk e6ba61629 -> 20d5ce8b9
When notifications are lost, and also periodically, check the parent repair status and exit if the repair has completed or failed. Patch by Matt Byrd, reviewed by Chris Lohfink for CASSANDRA-13480 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/20d5ce8b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/20d5ce8b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/20d5ce8b Branch: refs/heads/trunk Commit: 20d5ce8b9b587be2f0b7bc5765254e8dc6e0bd3b Parents: e6ba616 Author: Matt Byrd <matthew.l.b...@gmail.com> Authored: Fri May 26 17:03:17 2017 -0700 Committer: Jeff Jirsa <jji...@apple.com> Committed: Thu Jun 29 12:15:32 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/repair/RepairRunnable.java | 115 ++++++++++++------- .../cassandra/service/ActiveRepairService.java | 30 +++++ .../cassandra/service/StorageService.java | 9 ++ .../cassandra/service/StorageServiceMBean.java | 11 +- .../org/apache/cassandra/tools/NodeProbe.java | 3 + .../apache/cassandra/tools/RepairRunner.java | 73 ++++++++++-- .../service/ActiveRepairServiceTest.java | 26 ++++- 8 files changed, 214 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6957f48..211769c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -79,6 +79,7 @@ * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307) * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720) * Trivial format error in StorageProxy (CASSANDRA-13551) + * Nodetool repair can hang forever if we lose the notification for the repair completing/failing (CASSANDRA-13480) 3.11.1
http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index a3b8f22..eca162e 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -109,7 +110,8 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti listeners.remove(listener); } - protected void fireProgressEvent(String tag, ProgressEvent event) + + protected void fireProgressEvent(ProgressEvent event) { for (ProgressListener listener : listeners) { @@ -117,14 +119,17 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti } } - protected void fireErrorAndComplete(String tag, int progressCount, int totalProgress, String message) + protected void fireErrorAndComplete(int progressCount, int totalProgress, String message) { - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message)); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress, String.format("Repair command #%d finished with error", cmd))); + fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message)); + String completionMessage = String.format("Repair command #%d finished with error", cmd); + fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress, 
completionMessage)); + recordFailure(message, completionMessage); } protected void runMayThrow() throws Exception { + ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.IN_PROGRESS, ImmutableList.of()); final TraceState traceState; final UUID parentSession = UUIDGen.getTimeUUID(); final String tag = "repair:" + cmd; @@ -142,7 +147,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti catch (IllegalArgumentException e) { logger.error("Repair failed:", e); - fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage()); + fireErrorAndComplete(progress.get(), totalProgress, e.getMessage()); return; } @@ -160,7 +165,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies", cfsb.substring(2))); message = message + " tracing with " + sessionId; - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message)); + fireProgressEvent(new ProgressEvent(ProgressEventType.START, 0, 100, message)); Tracing.traceRepair(message); traceState.enableActivityNotification(tag); for (ProgressListener listener : listeners) @@ -171,7 +176,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti } else { - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message)); + fireProgressEvent(new ProgressEvent(ProgressEventType.START, 0, 100, message)); traceState = null; } @@ -199,7 +204,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti catch (IllegalArgumentException e) { logger.error("Repair failed:", e); - fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage()); + fireErrorAndComplete(progress.get(), totalProgress, e.getMessage()); return; } @@ -212,7 +217,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti } catch 
(IllegalArgumentException e) { - fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage()); + fireErrorAndComplete(progress.get(), totalProgress, e.getMessage()); return; } @@ -240,7 +245,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti { SystemDistributedKeyspace.failParentRepair(parentSession, t); } - fireErrorAndComplete(tag, progress.get(), totalProgress, t.getMessage()); + fireErrorAndComplete(progress.get(), totalProgress, t.getMessage()); return; } @@ -352,22 +357,27 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti SyncStatSummary summary = new SyncStatSummary(true); summary.consumeSessionResults(results); + final String message; if (summary.isEmpty()) { - String message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync"; + message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync"; logger.info(message); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, message)); + fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, message)); } else { - String message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary.toString(); + message = (previewKind == PreviewKind.REPAIRED ? 
"Repaired data is inconsistent\n" : "Preview complete\n") + summary.toString(); logger.info(message); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, message)); + fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, message)); } - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress, - "Repair preview completed successfully")); - complete(); + String successMessage = "Repair preview completed successfully"; + fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress, + successMessage)); + String completionMessage = complete(); + + ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.COMPLETED, + ImmutableList.of(message, successMessage, completionMessage)); } catch (Throwable t) { @@ -378,20 +388,22 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti public void onFailure(Throwable t) { - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage())); + fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage())); logger.error("Error completing preview repair", t); - complete(); + String completionMessage = complete(); + recordFailure(t.getMessage(), completionMessage); } - private void complete() + private String complete() { logger.debug("Preview repair {} completed", parentSession); String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, true, true); String message = String.format("Repair preview #%d finished in %s", cmd, duration); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message)); + fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message)); executor.shutdownNow(); 
+ return message; } }); } @@ -447,10 +459,10 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti String message = String.format("Repair session %s for range %s finished", session.getId(), session.getRanges().toString()); logger.info(message); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, - progress.incrementAndGet(), - totalProgress, - message)); + fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS, + progress.incrementAndGet(), + totalProgress, + message)); } public void onFailure(Throwable t) @@ -458,10 +470,10 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti String message = String.format("Repair session %s for range %s failed with error %s", session.getId(), session.getRanges().toString(), t.getMessage()); logger.error(message, t); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, - progress.incrementAndGet(), - totalProgress, - message)); + fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, + progress.incrementAndGet(), + totalProgress, + message)); } } @@ -495,36 +507,49 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti { SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges); } + final String message; if (hasFailure.get()) { - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, - "Some repair failed")); + message = "Some repair failed"; + fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, + message)); } else { - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress, - "Repair completed successfully")); + message = "Repair completed successfully"; + fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress, + message)); + } + String completionMessage = repairComplete(); + if (hasFailure.get()) + { + 
recordFailure(message, completionMessage); + } + else + { + ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.COMPLETED, + ImmutableList.of(message, completionMessage)); } - repairComplete(); } public void onFailure(Throwable t) { - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage())); + fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage())); if (!options.isPreview()) { SystemDistributedKeyspace.failParentRepair(parentSession, t); } - repairComplete(); + String completionMessage = repairComplete(); + recordFailure(t.getMessage(), completionMessage); } - private void repairComplete() + private String repairComplete() { ActiveRepairService.instance.removeParentRepairSession(parentSession); long durationMillis = System.currentTimeMillis() - startTime; - String duration = DurationFormatUtils.formatDurationWords(durationMillis, true, true); + String duration = DurationFormatUtils.formatDurationWords(durationMillis,true, true); String message = String.format("Repair command #%d finished in %s", cmd, duration); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message)); + fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message)); logger.info(message); if (options.isTraced() && traceState != null) { @@ -541,9 +566,18 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti } executor.shutdownNow(); Keyspace.open(keyspace).metric.repairTime.update(durationMillis, TimeUnit.MILLISECONDS); + return message; } } + private void recordFailure(String failureMessage, String completionMessage) + { + // Note we rely on the first message being the reason for the failure + // when inspecting this state from RepairRunner.queryForCompletedRepair + ActiveRepairService.instance.recordRepairStatus(cmd, 
ActiveRepairService.ParentRepairStatus.FAILED, + ImmutableList.of(failureMessage, completionMessage)); + } + private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors) { for (int i = 0; i < neighborRangeList.size(); i++) @@ -622,8 +656,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti if (seen[si == 0 ? 1 : 0].contains(uuid)) continue; String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity")); - fireProgressEvent("repair:" + cmd, - new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message)); + fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message)); } tlast = tcur; http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index aadf7c1..a397ca2 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -29,6 +29,8 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.base.Predicate; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; @@ -72,6 +74,7 @@ import org.apache.cassandra.schema.TableId; import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; /** @@ -90,6 +93,12 @@ import org.apache.cassandra.utils.UUIDGen; */ 
public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener, ActiveRepairServiceMBean { + + public enum ParentRepairStatus + { + IN_PROGRESS, COMPLETED, FAILED + } + public static class ConsistentSessions { public final LocalSessions local = new LocalSessions(); @@ -118,11 +127,21 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai private final IFailureDetector failureDetector; private final Gossiper gossiper; + private final Cache<Integer, Pair<ParentRepairStatus, List<String>>> repairStatusByCmd; public ActiveRepairService(IFailureDetector failureDetector, Gossiper gossiper) { this.failureDetector = failureDetector; this.gossiper = gossiper; + this.repairStatusByCmd = CacheBuilder.newBuilder() + .expireAfterWrite( + Long.getLong("cassandra.parent_repair_status_expiry_seconds", + TimeUnit.SECONDS.convert(1, TimeUnit.DAYS)), TimeUnit.SECONDS) + // using weight wouldn't work so well, since it doesn't reflect mutation of cached data + // see https://github.com/google/guava/wiki/CachesExplained + // We assume each entry is unlikely to be much more than 100 bytes, so bounding the size should be sufficient. + .maximumSize(Long.getLong("cassandra.parent_repair_status_cache_size", 100_000)) + .build(); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try @@ -230,6 +249,17 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai parentRepairSessions.clear(); } + public void recordRepairStatus(int cmd, ParentRepairStatus parentRepairStatus, List<String> messages) + { + repairStatusByCmd.put(cmd, Pair.create(parentRepairStatus, messages)); + } + + + Pair<ParentRepairStatus, List<String>> getRepairStatus(Integer cmd) + { + return repairStatusByCmd.getIfPresent(cmd); + } + /** * Return all of the neighbors with whom we share the provided range. 
* http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index a7d4e9d..2b3e633 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3349,6 +3349,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE ActiveRepairService.instance.terminateSessions(); } + + @Nullable + public List<String> getParentRepairStatus(int cmd) + { + Pair<ActiveRepairService.ParentRepairStatus, List<String>> pair = ActiveRepairService.instance.getRepairStatus(cmd); + return pair == null ? null : + ImmutableList.<String>builder().add(pair.left.name()).addAll(pair.right).build(); + } + /* End of MBean interface methods */ /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index d94c263..9f66a7e 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; - +import javax.annotation.Nullable; import javax.management.NotificationEmitter; import javax.management.openmbean.TabularData; @@ -325,6 +325,15 @@ public interface StorageServiceMBean extends NotificationEmitter public void forceTerminateAllRepairSessions(); /** + * Get the status of a given parent repair session. 
+ * @param cmd the int reference returned when issuing the repair + * @return status of parent repair from enum {@link org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus} + * followed by final message or messages of the session + */ + @Nullable + public List<String> getParentRepairStatus(int cmd); + + /** * transfer this node's data to other machines and remove it from service. * @param force Decommission even if this will reduce N to be less than RF. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index e603eae..fcae1f6 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -103,6 +103,9 @@ public class NodeProbe implements AutoCloseable private static final String fmtUrl = "service:jmx:rmi:///jndi/rmi://[%s]:%d/jmxrmi"; private static final String ssObjName = "org.apache.cassandra.db:type=StorageService"; private static final int defaultPort = 7199; + + static long JMX_NOTIFICATION_POLL_INTERVAL_SECONDS = Long.getLong("cassandra.nodetool.jmx_notification_poll_interval_seconds", TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES)); + final String host; final int port; private String username; http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/src/java/org/apache/cassandra/tools/RepairRunner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/RepairRunner.java b/src/java/org/apache/cassandra/tools/RepairRunner.java index 961916a..ca894f0 100644 --- a/src/java/org/apache/cassandra/tools/RepairRunner.java +++ b/src/java/org/apache/cassandra/tools/RepairRunner.java @@ -20,9 +20,12 @@ package org.apache.cassandra.tools; import java.io.IOException; import
java.io.PrintStream; import java.text.SimpleDateFormat; +import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageServiceMBean; import org.apache.cassandra.utils.concurrent.SimpleCondition; import org.apache.cassandra.utils.progress.ProgressEvent; @@ -40,7 +43,6 @@ public class RepairRunner extends JMXNotificationProgressListener private final Condition condition = new SimpleCondition(); private int cmd; - private volatile boolean hasNotificationLost; private volatile Exception error; public RepairRunner(PrintStream out, StorageServiceMBean ssProxy, String keyspace, Map<String, String> options) @@ -62,14 +64,14 @@ public class RepairRunner extends JMXNotificationProgressListener } else { - condition.await(); - if (error != null) + while (!condition.await(NodeProbe.JMX_NOTIFICATION_POLL_INTERVAL_SECONDS, TimeUnit.SECONDS)) { - throw error; + queryForCompletedRepair(String.format("After waiting for poll interval of %s seconds", + NodeProbe.JMX_NOTIFICATION_POLL_INTERVAL_SECONDS)); } - if (hasNotificationLost) + if (error != null) { - out.println(String.format("There were some lost notification(s). You should check server log for repair status of keyspace %s", keyspace)); + throw error; } } } @@ -83,7 +85,11 @@ public class RepairRunner extends JMXNotificationProgressListener @Override public void handleNotificationLost(long timestamp, String message) { - hasNotificationLost = true; + if (cmd > 0) + { + // Check to see if the lost notification was a completion message + queryForCompletedRepair("After receiving lost notification"); + } } @Override @@ -96,8 +102,8 @@ public class RepairRunner extends JMXNotificationProgressListener public void handleConnectionFailed(long timestamp, String message) { error = new IOException(String.format("[%s] JMX connection closed. 
You should check server log for repair status of keyspace %s" - + "(Subsequent keyspaces are not going to be repaired).", - format.format(timestamp), keyspace)); + + "(Subsequent keyspaces are not going to be repaired).", + format.format(timestamp), keyspace)); condition.signalAll(); } @@ -108,7 +114,7 @@ public class RepairRunner extends JMXNotificationProgressListener String message = String.format("[%s] %s", format.format(System.currentTimeMillis()), event.getMessage()); if (type == ProgressEventType.PROGRESS) { - message = message + " (progress: " + (int)event.getProgressPercentage() + "%)"; + message = message + " (progress: " + (int) event.getProgressPercentage() + "%)"; } out.println(message); if (type == ProgressEventType.ERROR) @@ -120,4 +126,51 @@ public class RepairRunner extends JMXNotificationProgressListener condition.signalAll(); } } + + + private void queryForCompletedRepair(String triggeringCondition) + { + List<String> status = ssProxy.getParentRepairStatus(cmd); + String queriedString = "queried for parent session status and"; + if (status == null) + { + String message = String.format("[%s] %s %s couldn't find repair status for cmd: %s", triggeringCondition, + queriedString, format.format(System.currentTimeMillis()), cmd); + out.println(message); + } + else + { + ActiveRepairService.ParentRepairStatus parentRepairStatus = ActiveRepairService.ParentRepairStatus.valueOf(status.get(0)); + List<String> messages = status.subList(1, status.size()); + switch (parentRepairStatus) + { + case COMPLETED: + case FAILED: + out.println(String.format("[%s] %s %s discovered repair %s.", + this.format.format(System.currentTimeMillis()), triggeringCondition, + queriedString, parentRepairStatus.name().toLowerCase())); + if (parentRepairStatus == ActiveRepairService.ParentRepairStatus.FAILED) + { + error = new IOException(messages.get(0)); + } + printMessages(messages); + condition.signalAll(); + break; + case IN_PROGRESS: + break; + default: + 
out.println(String.format("[%s] WARNING Encountered unexpected RepairRunnable.ParentRepairStatus: %s", System.currentTimeMillis(), parentRepairStatus)); + printMessages(messages); + break; + } + } + } + + private void printMessages(List<String> messages) + { + for (String message : messages) + { + out.println(String.format("[%s] %s", this.format.format(System.currentTimeMillis()), message)); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/20d5ce8b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index 9d5ccf4..57ffa7d 100644 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.service; import java.net.InetAddress; import java.util.*; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import org.junit.Before; import org.junit.BeforeClass; @@ -45,8 +46,8 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.Refs; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; public class ActiveRepairServiceTest { @@ -211,6 +212,27 @@ public class ActiveRepairServiceTest ActiveRepairService.getNeighbors(KEYSPACE5, ranges, ranges.iterator().next(), null, hosts); } + + @Test + public void testParentRepairStatus() throws Throwable + { + ActiveRepairService.instance.recordRepairStatus(1, ActiveRepairService.ParentRepairStatus.COMPLETED, ImmutableList.of("foo", "bar")); + List<String> res = StorageService.instance.getParentRepairStatus(1); + 
assertNotNull(res); + assertEquals(ActiveRepairService.ParentRepairStatus.COMPLETED, ActiveRepairService.ParentRepairStatus.valueOf(res.get(0))); + assertEquals("foo", res.get(1)); + assertEquals("bar", res.get(2)); + + List<String> emptyRes = StorageService.instance.getParentRepairStatus(44); + assertNull(emptyRes); + + ActiveRepairService.instance.recordRepairStatus(3, ActiveRepairService.ParentRepairStatus.FAILED, ImmutableList.of("some failure message", "bar")); + List<String> failed = StorageService.instance.getParentRepairStatus(3); + assertNotNull(failed); + assertEquals(ActiveRepairService.ParentRepairStatus.FAILED, ActiveRepairService.ParentRepairStatus.valueOf(failed.get(0))); + + } + Set<InetAddress> addTokens(int max) throws Throwable { TokenMetadata tmd = StorageService.instance.getTokenMetadata(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org