Updated Branches: refs/heads/trunk 35be1cc5a -> dbc8bd85f
use waitOnFuture more often Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dbc8bd85 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dbc8bd85 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dbc8bd85 Branch: refs/heads/trunk Commit: dbc8bd85f23acd78de70e22b50c7168ca338dc2e Parents: 35be1cc Author: Jonathan Ellis <jbel...@apache.org> Authored: Tue May 7 17:13:41 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue May 7 17:13:41 2013 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/db/ColumnFamilyStore.java | 13 +-------- .../commitlog/BatchCommitLogExecutorService.java | 14 +-------- .../apache/cassandra/db/commitlog/CommitLog.java | 14 +-------- .../PeriodicCommitLogExecutorService.java | 15 +--------- .../db/compaction/ParallelCompactionIterable.java | 17 ++--------- .../apache/cassandra/db/index/SecondaryIndex.java | 22 +++------------ .../cassandra/db/index/SecondaryIndexManager.java | 14 +-------- .../cassandra/io/sstable/SSTableDeletingTask.java | 15 ++-------- .../org/apache/cassandra/utils/FBUtilities.java | 4 +- 9 files changed, 21 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index bbbb554..832189f 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -804,18 +804,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public void forceBlockingFlush() { - try - { - forceFlush().get(); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - catch (ExecutionException e) - { - throw new RuntimeException(e); - } + FBUtilities.waitOnFuture(forceFlush()); } public void maybeUpdateRowCache(DecoratedKey key, ColumnFamily columnFamily) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java index 4434532..340dd24 100644 --- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java +++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.concurrent.*; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService @@ -127,18 +128,7 @@ class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService public void add(CommitLog.LogRecordAdder adder) { - try - { - submit((Callable)adder).get(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - catch (ExecutionException e) - { - throw new RuntimeException(e); - } + FBUtilities.waitOnFuture(submit((Callable)adder)); } public void shutdown() http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 88a9706..5677836 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -36,6 +36,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.metrics.CommitLogMetrics; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.FBUtilities; /* * Commit Log tracks every write operation into the system. The aim of the commit log is to be able to @@ -246,18 +247,7 @@ public class CommitLog implements CommitLogMBean } }; - try - { - executor.submit(task).get(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - catch (ExecutionException e) - { - throw new RuntimeException(e); - } + FBUtilities.waitOnFuture(executor.submit(task)); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java index 94f593e..d8869a3 100644 --- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java +++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java @@ -67,19 +67,8 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService { while (run) { - try - { - submit(syncer).get(); - Thread.sleep(DatabaseDescriptor.getCommitLogSyncPeriod()); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - catch (ExecutionException e) - { - throw new RuntimeException(e); - } + FBUtilities.waitOnFuture(submit(syncer)); + FBUtilities.sleep(DatabaseDescriptor.getCommitLogSyncPeriod()); } } }, "PERIODIC-COMMIT-LOG-SYNCER").start(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java index c0bce30..1d380f6 100644 --- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java @@ -90,20 +90,9 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable CompactedRowContainer container = reducer.next(); AbstractCompactedRow compactedRow; - try - { - compactedRow = container.future == null - ? container.row - : new PrecompactedRow(container.key, container.future.get()); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - catch (ExecutionException e) - { - throw new RuntimeException(e); - } + compactedRow = container.future == null + ? container.row + : new PrecompactedRow(container.key, FBUtilities.waitOnFuture(container.future)); return compactedRow; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/db/index/SecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java index 06633c2..40ff1cc 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java @@ -42,6 +42,7 @@ import org.apache.cassandra.dht.*; import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; /** * Abstract base class for different types of secondary indexes. @@ -183,25 +184,10 @@ public abstract class SecondaryIndex Collections.singleton(getIndexName()), new ReducingKeyIterator(sstables)); Future<?> future = CompactionManager.instance.submitIndexBuild(builder); - try - { - future.get(); - forceBlockingFlush(); + FBUtilities.waitOnFuture(future); + forceBlockingFlush(); - setIndexBuilt(); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - catch (ExecutionException e) - { - throw new RuntimeException(e); - } - finally - { - SSTableReader.releaseReferences(sstables); - } + setIndexBuilt(); logger.info("Index build of " + getIndexName() + " complete"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index f949823..2550d8c 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@ -35,6 +35,7 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.thrift.IndexExpression; import org.apache.cassandra.thrift.IndexType; +import org.apache.cassandra.utils.FBUtilities; /** * Manages all the indexes associated with a given CFS @@ -139,18 +140,7 @@ public class SecondaryIndexManager SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, idxNames, new ReducingKeyIterator(sstables)); Future<?> future = CompactionManager.instance.submitIndexBuild(builder); - try - { - future.get(); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - catch (ExecutionException e) - { - throw new RuntimeException(e); - } + FBUtilities.waitOnFuture(future); flushIndexesBlocking(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java index 2335b7d..23237b8 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.DataTracker; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; public class SSTableDeletingTask implements Runnable { @@ -101,18 +102,8 @@ public class SSTableDeletingTask implements Runnable { } }; - try - { - StorageService.tasks.schedule(runnable, 0, TimeUnit.MILLISECONDS).get(); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - catch (ExecutionException e) - { - throw new RuntimeException(e); - } + + FBUtilities.waitOnFuture(StorageService.tasks.schedule(runnable, 0, TimeUnit.MILLISECONDS)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index a7e2775..fbe63d6 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -366,11 +366,11 @@ public class FBUtilities waitOnFuture(f); } - public static void waitOnFuture(Future<?> future) + public static <T> T waitOnFuture(Future<T> future) { try { - future.get(); + return future.get(); } catch (ExecutionException ee) {