Author: brandonwilliams Date: Tue Oct 4 21:20:41 2011 New Revision: 1178957
URL: http://svn.apache.org/viewvc?rev=1178957&view=rev Log: Nodetool closes JMX connections to avoid leaking timer threads. Patch by vijay, reviewed by brandonwilliams for CASSANDRA-3309 Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java cassandra/branches/cassandra-1.0.0/CHANGES.txt cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/tools/NodeCmd.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1178957&r1=1178956&r2=1178957&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Oct 4 21:20:41 2011 @@ -21,6 +21,7 @@ * Fix bug where the failure detector can take too long to mark a host down (CASSANDRA-3273) * Fix stress COUNTER_GET option (CASSANDRA-3301) + * Nodetool no longer leaks threads and closes JMX connections (CASSANDRA-3309) 0.8.6 Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1178957&r1=1178956&r2=1178957&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java Tue Oct 4 21:20:41 2011 @@ -585,132 +585,145 @@ public class NodeCmd { err(ioe, "Error connection to remote JMX agent!"); } - - NodeCommand command = null; - try { - command = cmd.getCommand(); - } - catch (IllegalArgumentException e) - { - badUse(e.getMessage()); - } - + NodeCommand command = null; - NodeCmd nodeCmd = new NodeCmd(probe); - - // Execute the requested command. - String[] arguments = cmd.getCommandArguments(); + try + { + command = cmd.getCommand(); + } + catch (IllegalArgumentException e) + { + badUse(e.getMessage()); + } - switch (command) - { - case RING : nodeCmd.printRing(System.out); break; - case INFO : nodeCmd.printInfo(System.out); break; - case CFSTATS : nodeCmd.printColumnFamilyStats(System.out); break; - case DECOMMISSION : probe.decommission(); break; - case TPSTATS : nodeCmd.printThreadPoolStats(System.out); break; - case VERSION : nodeCmd.printReleaseVersion(System.out); break; - case COMPACTIONSTATS : nodeCmd.printCompactionStats(System.out); break; - case DISABLEGOSSIP : probe.stopGossiping(); break; - case ENABLEGOSSIP : probe.startGossiping(); break; - case DISABLETHRIFT : probe.stopThriftServer(); break; - case ENABLETHRIFT : probe.startThriftServer(); break; - case STATUSTHRIFT : nodeCmd.printIsThriftServerRunning(System.out); break; - case DRAIN : - try { probe.drain(); } - catch (ExecutionException ee) { err(ee, "Error occured during flushing"); } - break; + NodeCmd nodeCmd = new NodeCmd(probe); - case NETSTATS : - if (arguments.length > 0) { nodeCmd.printNetworkStats(InetAddress.getByName(arguments[0]), System.out); } - else { nodeCmd.printNetworkStats(null, System.out); } - break; + // Execute the requested command. + String[] arguments = cmd.getCommandArguments(); - case SNAPSHOT : - case CLEARSNAPSHOT : - String tag = cmd.getOptionValue(TAG_OPT.left); - handleSnapshots(command, tag, arguments, probe); - break; + switch (command) + { + case RING : nodeCmd.printRing(System.out); break; + case INFO : nodeCmd.printInfo(System.out); break; + case CFSTATS : nodeCmd.printColumnFamilyStats(System.out); break; + case DECOMMISSION : probe.decommission(); break; + case TPSTATS : nodeCmd.printThreadPoolStats(System.out); break; + case VERSION : nodeCmd.printReleaseVersion(System.out); break; + case COMPACTIONSTATS : nodeCmd.printCompactionStats(System.out); break; + case DISABLEGOSSIP : probe.stopGossiping(); break; + case ENABLEGOSSIP : probe.startGossiping(); break; + case DISABLETHRIFT : probe.stopThriftServer(); break; + case ENABLETHRIFT : probe.startThriftServer(); break; + case STATUSTHRIFT : nodeCmd.printIsThriftServerRunning(System.out); break; + + case DRAIN : + try { probe.drain(); } + catch (ExecutionException ee) { err(ee, "Error occured during flushing"); } + break; - case MOVE : - if (arguments.length != 1) { badUse("Missing token argument for move."); } - probe.move(arguments[0]); - break; + case NETSTATS : + if (arguments.length > 0) { nodeCmd.printNetworkStats(InetAddress.getByName(arguments[0]), System.out); } + else { nodeCmd.printNetworkStats(null, System.out); } + break; - case JOIN: - if (probe.isJoined()) - { - System.err.println("This node has already joined the ring."); - System.exit(1); - } + case SNAPSHOT : + case CLEARSNAPSHOT : + String tag = cmd.getOptionValue(TAG_OPT.left); + handleSnapshots(command, tag, arguments, probe); + break; - probe.joinRing(); - break; + case MOVE : + if (arguments.length != 1) { badUse("Missing token argument for move."); } + probe.move(arguments[0]); + break; - case SETCOMPACTIONTHROUGHPUT : - if (arguments.length != 1) { badUse("Missing value argument."); } - probe.setCompactionThroughput(Integer.valueOf(arguments[0])); - break; + case JOIN: + if (probe.isJoined()) + { + System.err.println("This node has already joined the ring."); + System.exit(1); + } - case REMOVETOKEN : - if (arguments.length != 1) { badUse("Missing an argument for removetoken (either status, force, or a token)"); } - else if (arguments[0].equals("status")) { nodeCmd.printRemovalStatus(System.out); } - else if (arguments[0].equals("force")) { nodeCmd.printRemovalStatus(System.out); probe.forceRemoveCompletion(); } - else { probe.removeToken(arguments[0]); } - break; + probe.joinRing(); + break; - case CLEANUP : - case COMPACT : - case REPAIR : - case FLUSH : - case SCRUB : - case INVALIDATEKEYCACHE : - case INVALIDATEROWCACHE : - optionalKSandCFs(command, arguments, probe); - break; + case SETCOMPACTIONTHROUGHPUT : + if (arguments.length != 1) { badUse("Missing value argument."); } + probe.setCompactionThroughput(Integer.valueOf(arguments[0])); + break; - case GETCOMPACTIONTHRESHOLD : - if (arguments.length != 2) { badUse("getcompactionthreshold requires ks and cf args."); } - probe.getCompactionThreshold(System.out, arguments[0], arguments[1]); - break; + case REMOVETOKEN : + if (arguments.length != 1) { badUse("Missing an argument for removetoken (either status, force, or a token)"); } + else if (arguments[0].equals("status")) { nodeCmd.printRemovalStatus(System.out); } + else if (arguments[0].equals("force")) { nodeCmd.printRemovalStatus(System.out); probe.forceRemoveCompletion(); } + else { probe.removeToken(arguments[0]); } + break; - case CFHISTOGRAMS : - if (arguments.length != 2) { badUse("cfhistograms requires ks and cf args"); } - nodeCmd.printCfHistograms(arguments[0], arguments[1], System.out); - break; + case CLEANUP : + case COMPACT : + case REPAIR : + case FLUSH : + case SCRUB : + case INVALIDATEKEYCACHE : + case INVALIDATEROWCACHE : + optionalKSandCFs(command, arguments, probe); + break; - case SETCACHECAPACITY : - if (arguments.length != 4) { badUse("setcachecapacity requires ks, cf, keycachecap, and rowcachecap args."); } - probe.setCacheCapacities(arguments[0], arguments[1], Integer.parseInt(arguments[2]), Integer.parseInt(arguments[3])); - break; + case GETCOMPACTIONTHRESHOLD : + if (arguments.length != 2) { badUse("getcompactionthreshold requires ks and cf args."); } + probe.getCompactionThreshold(System.out, arguments[0], arguments[1]); + break; - case SETCOMPACTIONTHRESHOLD : - if (arguments.length != 4) { badUse("setcompactionthreshold requires ks, cf, min, and max threshold args."); } - int minthreshold = Integer.parseInt(arguments[2]); - int maxthreshold = Integer.parseInt(arguments[3]); - if ((minthreshold < 0) || (maxthreshold < 0)) { badUse("Thresholds must be positive integers"); } - if (minthreshold > maxthreshold) { badUse("Min threshold cannot be greater than max."); } - if (minthreshold < 2 && maxthreshold != 0) { badUse("Min threshold must be at least 2"); } - probe.setCompactionThreshold(arguments[0], arguments[1], minthreshold, maxthreshold); - break; + case CFHISTOGRAMS : + if (arguments.length != 2) { badUse("cfhistograms requires ks and cf args"); } + nodeCmd.printCfHistograms(arguments[0], arguments[1], System.out); + break; - case GETENDPOINTS : - if (arguments.length != 3) { badUse("getendpoints requires ks, cf and key args"); } - nodeCmd.printEndPoints(arguments[0], arguments[1], arguments[2], System.out); - break; + case SETCACHECAPACITY : + if (arguments.length != 4) { badUse("setcachecapacity requires ks, cf, keycachecap, and rowcachecap args."); } + probe.setCacheCapacities(arguments[0], arguments[1], Integer.parseInt(arguments[2]), Integer.parseInt(arguments[3])); + break; - case REFRESH: - if (arguments.length != 2) { badUse("load_new_sstables requires ks and cf args"); } - probe.loadNewSSTables(arguments[0], arguments[1]); - break; + case SETCOMPACTIONTHRESHOLD : + if (arguments.length != 4) { badUse("setcompactionthreshold requires ks, cf, min, and max threshold args."); } + int minthreshold = Integer.parseInt(arguments[2]); + int maxthreshold = Integer.parseInt(arguments[3]); + if ((minthreshold < 0) || (maxthreshold < 0)) { badUse("Thresholds must be positive integers"); } + if (minthreshold > maxthreshold) { badUse("Min threshold cannot be greater than max."); } + if (minthreshold < 2 && maxthreshold != 0) { badUse("Min threshold must be at least 2"); } + probe.setCompactionThreshold(arguments[0], arguments[1], minthreshold, maxthreshold); + break; - default : - throw new RuntimeException("Unreachable code."); + case GETENDPOINTS : + if (arguments.length != 3) { badUse("getendpoints requires ks, cf and key args"); } + nodeCmd.printEndPoints(arguments[0], arguments[1], arguments[2], System.out); + break; + case REFRESH: + if (arguments.length != 2) { badUse("load_new_sstables requires ks and cf args"); } + probe.loadNewSSTables(arguments[0], arguments[1]); + break; + default : + throw new RuntimeException("Unreachable code."); + } + } + finally + { + if (probe != null) + { + try + { + probe.close(); + } + catch (IOException ex) + { + // swallow the exception so the user will see the real one. + } + } } - System.exit(0); } Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/CHANGES.txt?rev=1178957&r1=1178956&r2=1178957&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/CHANGES.txt (original) +++ cassandra/branches/cassandra-1.0.0/CHANGES.txt Tue Oct 4 21:20:41 2011 @@ -15,6 +15,7 @@ (CASSANDRA-3295) * Fix missing fields in CLI `show schema` output (CASSANDRA-3304) * Fix broken CompressedRandomAccessReaderTest (CASSANDRA-3298) + * Nodetool no longer leaks threads and closes JMX connections (CASSANDRA-3309) 1.0.0-rc2 * Log a meaningful warning when a node receives a message for a repair session Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/tools/NodeCmd.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1178957&r1=1178956&r2=1178957&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/tools/NodeCmd.java (original) +++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/tools/NodeCmd.java Tue Oct 4 21:20:41 2011 @@ -588,134 +588,148 @@ public class NodeCmd { err(ioe, "Error connection to remote JMX agent!"); } - - NodeCommand command = null; - try { - command = cmd.getCommand(); - } - catch (IllegalArgumentException e) - { - badUse(e.getMessage()); - } - + NodeCommand command = null; - NodeCmd nodeCmd = new NodeCmd(probe); + try + { + command = cmd.getCommand(); + } + catch (IllegalArgumentException e) + { + badUse(e.getMessage()); + } - // Execute the requested command. - String[] arguments = cmd.getCommandArguments(); - switch (command) - { - case RING : nodeCmd.printRing(System.out); break; - case INFO : nodeCmd.printInfo(System.out); break; - case CFSTATS : nodeCmd.printColumnFamilyStats(System.out); break; - case DECOMMISSION : probe.decommission(); break; - case TPSTATS : nodeCmd.printThreadPoolStats(System.out); break; - case VERSION : nodeCmd.printReleaseVersion(System.out); break; - case COMPACTIONSTATS : nodeCmd.printCompactionStats(System.out); break; - case DISABLEGOSSIP : probe.stopGossiping(); break; - case ENABLEGOSSIP : probe.startGossiping(); break; - case DISABLETHRIFT : probe.stopThriftServer(); break; - case ENABLETHRIFT : probe.startThriftServer(); break; - case STATUSTHRIFT : nodeCmd.printIsThriftServerRunning(System.out); break; + NodeCmd nodeCmd = new NodeCmd(probe); - case DRAIN : - try { probe.drain(); } - catch (ExecutionException ee) { err(ee, "Error occured during flushing"); } - break; + // Execute the requested command. + String[] arguments = cmd.getCommandArguments(); - case NETSTATS : - if (arguments.length > 0) { nodeCmd.printNetworkStats(InetAddress.getByName(arguments[0]), System.out); } - else { nodeCmd.printNetworkStats(null, System.out); } - break; + switch (command) + { + case RING : nodeCmd.printRing(System.out); break; + case INFO : nodeCmd.printInfo(System.out); break; + case CFSTATS : nodeCmd.printColumnFamilyStats(System.out); break; + case DECOMMISSION : probe.decommission(); break; + case TPSTATS : nodeCmd.printThreadPoolStats(System.out); break; + case VERSION : nodeCmd.printReleaseVersion(System.out); break; + case COMPACTIONSTATS : nodeCmd.printCompactionStats(System.out); break; + case DISABLEGOSSIP : probe.stopGossiping(); break; + case ENABLEGOSSIP : probe.startGossiping(); break; + case DISABLETHRIFT : probe.stopThriftServer(); break; + case ENABLETHRIFT : probe.startThriftServer(); break; + case STATUSTHRIFT : nodeCmd.printIsThriftServerRunning(System.out); break; + + case DRAIN : + try { probe.drain(); } + catch (ExecutionException ee) { err(ee, "Error occured during flushing"); } + break; - case SNAPSHOT : - case CLEARSNAPSHOT : - String tag = cmd.getOptionValue(TAG_OPT.left); - handleSnapshots(command, tag, arguments, probe); - break; + case NETSTATS : + if (arguments.length > 0) { nodeCmd.printNetworkStats(InetAddress.getByName(arguments[0]), System.out); } + else { nodeCmd.printNetworkStats(null, System.out); } + break; - case MOVE : - if (arguments.length != 1) { badUse("Missing token argument for move."); } - probe.move(arguments[0]); - break; + case SNAPSHOT : + case CLEARSNAPSHOT : + String tag = cmd.getOptionValue(TAG_OPT.left); + handleSnapshots(command, tag, arguments, probe); + break; - case JOIN: - if (probe.isJoined()) - { - System.err.println("This node has already joined the ring."); - System.exit(1); - } + case MOVE : + if (arguments.length != 1) { badUse("Missing token argument for move."); } + probe.move(arguments[0]); + break; - probe.joinRing(); - break; + case JOIN: + if (probe.isJoined()) + { + System.err.println("This node has already joined the ring."); + System.exit(1); + } - case SETCOMPACTIONTHROUGHPUT : - if (arguments.length != 1) { badUse("Missing value argument."); } - probe.setCompactionThroughput(Integer.valueOf(arguments[0])); - break; + probe.joinRing(); + break; - case REMOVETOKEN : - if (arguments.length != 1) { badUse("Missing an argument for removetoken (either status, force, or a token)"); } - else if (arguments[0].equals("status")) { nodeCmd.printRemovalStatus(System.out); } - else if (arguments[0].equals("force")) { nodeCmd.printRemovalStatus(System.out); probe.forceRemoveCompletion(); } - else { probe.removeToken(arguments[0]); } - break; + case SETCOMPACTIONTHROUGHPUT : + if (arguments.length != 1) { badUse("Missing value argument."); } + probe.setCompactionThroughput(Integer.valueOf(arguments[0])); + break; - case CLEANUP : - case COMPACT : - case REPAIR : - case FLUSH : - case SCRUB : - case INVALIDATEKEYCACHE : - case INVALIDATEROWCACHE : - optionalKSandCFs(command, cmd, arguments, probe); - break; + case REMOVETOKEN : + if (arguments.length != 1) { badUse("Missing an argument for removetoken (either status, force, or a token)"); } + else if (arguments[0].equals("status")) { nodeCmd.printRemovalStatus(System.out); } + else if (arguments[0].equals("force")) { nodeCmd.printRemovalStatus(System.out); probe.forceRemoveCompletion(); } + else { probe.removeToken(arguments[0]); } + break; - case GETCOMPACTIONTHRESHOLD : - if (arguments.length != 2) { badUse("getcompactionthreshold requires ks and cf args."); } - probe.getCompactionThreshold(System.out, arguments[0], arguments[1]); - break; + case CLEANUP : + case COMPACT : + case REPAIR : + case FLUSH : + case SCRUB : + case INVALIDATEKEYCACHE : + case INVALIDATEROWCACHE : + optionalKSandCFs(command, cmd, arguments, probe); + break; - case CFHISTOGRAMS : - if (arguments.length != 2) { badUse("cfhistograms requires ks and cf args"); } - nodeCmd.printCfHistograms(arguments[0], arguments[1], System.out); - break; + case GETCOMPACTIONTHRESHOLD : + if (arguments.length != 2) { badUse("getcompactionthreshold requires ks and cf args."); } + probe.getCompactionThreshold(System.out, arguments[0], arguments[1]); + break; - case SETCACHECAPACITY : - if (arguments.length != 4) { badUse("setcachecapacity requires ks, cf, keycachecap, and rowcachecap args."); } - probe.setCacheCapacities(arguments[0], arguments[1], Integer.parseInt(arguments[2]), Integer.parseInt(arguments[3])); - break; + case CFHISTOGRAMS : + if (arguments.length != 2) { badUse("cfhistograms requires ks and cf args"); } + nodeCmd.printCfHistograms(arguments[0], arguments[1], System.out); + break; - case SETCOMPACTIONTHRESHOLD : - if (arguments.length != 4) { badUse("setcompactionthreshold requires ks, cf, min, and max threshold args."); } - int minthreshold = Integer.parseInt(arguments[2]); - int maxthreshold = Integer.parseInt(arguments[3]); - if ((minthreshold < 0) || (maxthreshold < 0)) { badUse("Thresholds must be positive integers"); } - if (minthreshold > maxthreshold) { badUse("Min threshold cannot be greater than max."); } - if (minthreshold < 2 && maxthreshold != 0) { badUse("Min threshold must be at least 2"); } - probe.setCompactionThreshold(arguments[0], arguments[1], minthreshold, maxthreshold); - break; + case SETCACHECAPACITY : + if (arguments.length != 4) { badUse("setcachecapacity requires ks, cf, keycachecap, and rowcachecap args."); } + probe.setCacheCapacities(arguments[0], arguments[1], Integer.parseInt(arguments[2]), Integer.parseInt(arguments[3])); + break; - case GETENDPOINTS : - if (arguments.length != 3) { badUse("getendpoints requires ks, cf and key args"); } - nodeCmd.printEndPoints(arguments[0], arguments[1], arguments[2], System.out); - break; + case SETCOMPACTIONTHRESHOLD : + if (arguments.length != 4) { badUse("setcompactionthreshold requires ks, cf, min, and max threshold args."); } + int minthreshold = Integer.parseInt(arguments[2]); + int maxthreshold = Integer.parseInt(arguments[3]); + if ((minthreshold < 0) || (maxthreshold < 0)) { badUse("Thresholds must be positive integers"); } + if (minthreshold > maxthreshold) { badUse("Min threshold cannot be greater than max."); } + if (minthreshold < 2 && maxthreshold != 0) { badUse("Min threshold must be at least 2"); } + probe.setCompactionThreshold(arguments[0], arguments[1], minthreshold, maxthreshold); + break; - case REFRESH: - if (arguments.length != 2) { badUse("load_new_sstables requires ks and cf args"); } - probe.loadNewSSTables(arguments[0], arguments[1]); - break; + case GETENDPOINTS : + if (arguments.length != 3) { badUse("getendpoints requires ks, cf and key args"); } + nodeCmd.printEndPoints(arguments[0], arguments[1], arguments[2], System.out); + break; - case GOSSIPINFO : nodeCmd.printGossipInfo(System.out); break; + case REFRESH: + if (arguments.length != 2) { badUse("load_new_sstables requires ks and cf args"); } + probe.loadNewSSTables(arguments[0], arguments[1]); + break; - default : - throw new RuntimeException("Unreachable code."); + case GOSSIPINFO : nodeCmd.printGossipInfo(System.out); break; + default : + throw new RuntimeException("Unreachable code."); + } + } + finally + { + if (probe != null) + { + try + { + probe.close(); + } + catch (IOException ex) + { + // swallow the exception so the user will see the real one. + } + } } - System.exit(0); }