Expose bulk loading progress over JMX patch by Tyler Hobbs; reviewed by Nick Bailey for CASSANDRA-4757
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cd26f48c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cd26f48c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cd26f48c Branch: refs/heads/trunk Commit: cd26f48ce88d7db7c05f01ca6fbe82d087b3b3f7 Parents: 9fb44ee Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu Feb 6 10:08:36 2014 -0600 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu Feb 6 10:08:36 2014 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/service/StorageService.java | 26 ++++++++++++++------ .../cassandra/service/StorageServiceMBean.java | 7 ++++++ .../org/apache/cassandra/tools/BulkLoader.java | 6 ++++- .../io/sstable/SSTableScannerTest.java | 8 +++--- 5 files changed, 34 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd26f48c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7ba8044..728c57a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.6 + * Expose bulk loading progress over JMX (CASSANDRA-4757) * Correctly handle null with IF conditions and TTL (CASSANDRA-6623) Merged from 1.2: * Fix partition and range deletes not triggering flush (CASSANDRA-6655) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd26f48c/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index c222570..9a6b50f 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3661,6 +3661,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void bulkLoad(String directory) { + try + { + bulkLoadInternal(directory).get(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + public String bulkLoadAsync(String directory) + { + return bulkLoadInternal(directory).planId.toString(); + } + + private StreamResultFuture bulkLoadInternal(String directory) + { File dir = new File(directory); if (!dir.exists() || !dir.isDirectory()) @@ -3693,14 +3710,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE }; SSTableLoader loader = new SSTableLoader(dir, client, new OutputHandler.LogOutput()); - try - { - loader.stream().get(); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + return loader.stream(); } public int getExceptionCount() http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd26f48c/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index d31e8b9..f949dcc 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -439,8 +439,15 @@ public interface StorageServiceMBean extends NotificationEmitter */ public void rebuild(String sourceDc); + /** Starts a bulk load and blocks until it completes. */ public void bulkLoad(String directory); + /** + * Starts a bulk load asynchronously and returns the String representation of the planID for the new + * streaming session. + */ + public String bulkLoadAsync(String directory); + public void rescheduleFailedDeletions(); /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd26f48c/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index 4756bd3..6c157e2 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -92,7 +92,11 @@ public class BulkLoader System.err.println("Run with --debug to get full stack trace or --help to get help."); System.exit(1); } - future.addEventListener(new ProgressIndicator()); + + handler.output(String.format("Streaming session ID: %s", future.planId)); + if (!options.noProgress) + future.addEventListener(new ProgressIndicator()); + try { future.get(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd26f48c/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java index 67d9d2c..6dca637 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java @@ -306,12 +306,10 @@ public class SSTableScannerTest extends SchemaLoader assertScanContainsRanges(fullScanner, 205, 205); // scan three ranges separately - ICompactionScanner scanner = sstable.getScanner(makeRanges( - 101, 109, - 201, 209), - null); + ICompactionScanner scanner = sstable.getScanner(makeRanges(101, 109, + 201, 209), null); - // Test for #6638 bug + // this will currently fail assertScanContainsRanges(scanner, 205, 205); } }