Handle JMX notification failure for repair

patch by yukim; reviewed by pcmanus for CASSANDRA-6097
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fbe19b8a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fbe19b8a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fbe19b8a Branch: refs/heads/trunk Commit: fbe19b8ad7cd01f810109731b64acaaa4edf8db2 Parents: 1bba280 Author: Yuki Morishita <yu...@apache.org> Authored: Thu Oct 3 11:26:10 2013 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed Oct 9 10:34:12 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/tools/NodeProbe.java | 57 ++++++++++++++------ 2 files changed, 42 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe19b8a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 59cc0f1..f0ee993 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -19,6 +19,7 @@ * Update sstablesPerReadHistogram to use biased sampling (CASSANDRA-6164) * Log UnknownColumnfamilyException when closing socket (CASSANDRA-5725) * Properly error out on CREATE INDEX for counters table (CASSANDRA-6160) + * Handle JMX notification failure for repair (CASSANDRA-6097) 1.2.10 http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe19b8a/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 7829b60..1557a6a 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Condition; import 
javax.management.*; +import javax.management.remote.JMXConnectionNotification; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; @@ -217,6 +218,7 @@ public class NodeProbe RepairRunner runner = new RepairRunner(out, tableName, columnFamilies); try { + jmxc.addConnectionNotificationListener(runner, null, null); ssProxy.addNotificationListener(runner, null, null); if (!runner.repairAndWait(ssProxy, isSequential, isLocal, primaryRange)) failed = true; @@ -229,9 +231,10 @@ public class NodeProbe { try { - ssProxy.removeNotificationListener(runner); + ssProxy.removeNotificationListener(runner); + jmxc.removeConnectionNotificationListener(runner); } - catch (ListenerNotFoundException ignored) {} + catch (Throwable ignored) {} } } @@ -240,6 +243,7 @@ public class NodeProbe RepairRunner runner = new RepairRunner(out, tableName, columnFamilies); try { + jmxc.addConnectionNotificationListener(runner, null, null); ssProxy.addNotificationListener(runner, null, null); if (!runner.repairRangeAndWait(ssProxy, isSequential, isLocal, startToken, endToken)) failed = true; @@ -253,8 +257,9 @@ public class NodeProbe try { ssProxy.removeNotificationListener(runner); + jmxc.removeConnectionNotificationListener(runner); } - catch (ListenerNotFoundException ignored) {} + catch (Throwable ignored) {} } } @@ -985,7 +990,8 @@ class RepairRunner implements NotificationListener private final String keyspace; private final String[] columnFamilies; private int cmd; - private boolean success = true; + private volatile boolean success = true; + private volatile Exception error = null; RepairRunner(PrintStream out, String keyspace, String... 
columnFamilies) { @@ -994,24 +1000,22 @@ class RepairRunner implements NotificationListener this.columnFamilies = columnFamilies; } - public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, boolean primaryRangeOnly) throws InterruptedException + public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, boolean primaryRangeOnly) throws Exception { cmd = ssProxy.forceRepairAsync(keyspace, isSequential, isLocal, primaryRangeOnly, columnFamilies); - if (cmd > 0) - { - condition.await(); - } - else - { - String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace); - out.println(message); - } + waitForRepair(); return success; } - public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, String startToken, String endToken) throws InterruptedException + public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, String startToken, String endToken) throws Exception { cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, isLocal, columnFamilies); + waitForRepair(); + return success; + } + + private void waitForRepair() throws Exception + { if (cmd > 0) { condition.await(); @@ -1021,7 +1025,10 @@ class RepairRunner implements NotificationListener String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace); out.println(message); } - return success; + if (error != null) + { + throw error; + } } public void handleNotification(Notification notification, Object handback) @@ -1041,5 +1048,23 @@ class RepairRunner implements NotificationListener condition.signalAll(); } } + else if (JMXConnectionNotification.NOTIFS_LOST.equals(notification.getType())) + { + String message = String.format("[%s] Lost notification. 
You should check server log for repair status of keyspace %s", + format.format(notification.getTimeStamp()), + keyspace); + out.println(message); + success = false; + condition.signalAll(); + } + else if (JMXConnectionNotification.FAILED.equals(notification.getType()) + || JMXConnectionNotification.CLOSED.equals(notification.getType())) + { + String message = String.format("JMX connection closed. You should check server log for repair status of keyspace %s" + + "(Subsequent keyspaces are not going to be repaired).", + keyspace); + error = new IOException(message); + condition.signalAll(); + } } }