Repository: cassandra Updated Branches: refs/heads/cassandra-2.2 975284cd5 -> be6e6ea66 refs/heads/cassandra-3.0 45d017629 -> 695065e27 refs/heads/cassandra-3.X 47c473ae3 -> 316e1cd7b refs/heads/trunk b9191871c -> a333a2f3b
Fix leak errors and execution rejected exceptions when draining Patch by Stefania Alborghetti; reviewed by Marcus Eriksson for CASSANDRA-12457 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/be6e6ea6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/be6e6ea6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/be6e6ea6 Branch: refs/heads/cassandra-2.2 Commit: be6e6ea662b7da556a9e4ba5fd402b7451bdde10 Parents: 975284c Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Fri Aug 19 12:07:41 2016 +0800 Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com> Committed: Fri Oct 7 16:49:00 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../DebuggableScheduledThreadPoolExecutor.java | 2 +- .../concurrent/ScheduledExecutors.java | 4 - .../db/compaction/CompactionManager.java | 127 +++++++++++-------- .../db/lifecycle/LifecycleTransaction.java | 13 +- .../io/sstable/format/SSTableReader.java | 15 ++- .../apache/cassandra/net/MessagingService.java | 3 +- .../cassandra/service/StorageService.java | 20 +-- .../org/apache/cassandra/utils/ExpiringMap.java | 4 +- 9 files changed, 118 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 97bc70a..54425fa 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.9 + * Fix leak errors and execution rejected exceptions when draining (CASSANDRA-12457) * Fix merkle tree depth calculation (CASSANDRA-12580) * Make Collections deserialization more robust (CASSANDRA-12618) http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java index a722b87..ea0715c 100644 --- a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java @@ -54,7 +54,7 @@ public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolEx if (task instanceof Future) ((Future) task).cancel(false); - logger.trace("ScheduledThreadPoolExecutor has shut down as part of C* shutdown"); + logger.debug("ScheduledThreadPoolExecutor has shut down as part of C* shutdown"); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java index 5935669..5962db9 100644 --- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java +++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java @@ -31,10 +31,6 @@ public class ScheduledExecutors * This executor is used for tasks that can have longer execution times, and usually are non periodic. */ public static final DebuggableScheduledThreadPoolExecutor nonPeriodicTasks = new DebuggableScheduledThreadPoolExecutor("NonPeriodicTasks"); - static - { - nonPeriodicTasks.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - } /** * This executor is used for tasks that do not need to be waited for on shutdown/drain. http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 78fa23c..626bd27 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -165,16 +165,14 @@ public class CompactionManager implements CompactionManagerMBean cfs.keyspace.getName(), cfs.name, cfs.getCompactionStrategy().getName()); - List<Future<?>> futures = new ArrayList<>(); - // we must schedule it at least once, otherwise compaction will stop for a CF until next flush - if (executor.isShutdown()) + + List<Future<?>> futures = new ArrayList<>(1); + Future<?> fut = executor.submitIfRunning(new BackgroundCompactionCandidate(cfs), "background task"); + if (!fut.isCancelled()) { - logger.info("Executor has shut down, not submitting background task"); - return Collections.emptyList(); + compactingCF.add(cfs); + futures.add(fut); } - compactingCF.add(cfs); - futures.add(executor.submit(new BackgroundCompactionCandidate(cfs))); - return futures; } @@ -209,7 +207,8 @@ public class CompactionManager implements CompactionManagerMBean { try { - exec.awaitTermination(1, TimeUnit.MINUTES); + if (!exec.awaitTermination(1, TimeUnit.MINUTES)) + logger.warn("Failed to wait for compaction executors shutdown"); } catch (InterruptedException e) { @@ -286,16 +285,10 @@ public class CompactionManager implements CompactionManagerMBean return AllSSTableOpStatus.SUCCESSFUL; } - List<Future<Object>> futures = new ArrayList<>(); + List<Future<?>> futures = new ArrayList<>(); for (final SSTableReader sstable : sstables) { - if (executor.isShutdown()) - { - logger.info("Executor has shut down, not submitting task"); - return AllSSTableOpStatus.ABORTED; - } - final LifecycleTransaction txn = compacting.split(singleton(sstable)); transactions.add(txn); Callable<Object> callable = new Callable<Object>() @@ -307,7 +300,12 @@ public class CompactionManager implements CompactionManagerMBean return this; } }; - futures.add(executor.submit(callable)); + Future<?> fut = executor.submitIfRunning(callable, "paralell sstable operation"); + if (!fut.isCancelled()) + futures.add(fut); + else + return AllSSTableOpStatus.ABORTED; + if (jobs > 0 && futures.size() == jobs) { FBUtilities.waitOnFutures(futures); @@ -472,16 +470,18 @@ public class CompactionManager implements CompactionManagerMBean performAnticompaction(cfs, ranges, sstables, modifier, repairedAt); } }; - if (executor.isShutdown()) + + ListenableFuture<?> ret = null; + try { - logger.info("Compaction executor has shut down, not submitting anticompaction"); - sstables.release(); - return Futures.immediateCancelledFuture(); + ret = executor.submitIfRunning(runnable, "anticompaction"); + return ret; + } + finally + { + if (ret == null || ret.isCancelled()) + sstables.release(); } - - ListenableFutureTask<?> task = ListenableFutureTask.create(runnable, null); - executor.submit(task); - return task; } /** @@ -599,12 +599,10 @@ public class CompactionManager implements CompactionManagerMBean task.execute(metrics); } }; - if (executor.isShutdown()) - { - logger.info("Compaction executor has shut down, not submitting task"); - return Collections.emptyList(); - } - futures.add(executor.submit(runnable)); + + Future<?> fut = executor.submitIfRunning(runnable, "maximal task"); + if (!fut.isCancelled()) + futures.add(fut); } if (nonEmptyTasks > 1) logger.info("Cannot perform a full major compaction as repaired and unrepaired sstables cannot be compacted together. These two set of sstables will be compacted separately."); @@ -671,13 +669,8 @@ public class CompactionManager implements CompactionManagerMBean } } }; - if (executor.isShutdown()) - { - logger.info("Compaction executor has shut down, not submitting task"); - return Futures.immediateCancelledFuture(); - } - return executor.submit(runnable); + return executor.submitIfRunning(runnable, "user defined task"); } // This acquire a reference on the sstable @@ -695,7 +688,7 @@ public class CompactionManager implements CompactionManagerMBean /** * Does not mutate data, so is not scheduled. */ - public Future<Object> submitValidation(final ColumnFamilyStore cfStore, final Validator validator) + public Future<?> submitValidation(final ColumnFamilyStore cfStore, final Validator validator) { Callable<Object> callable = new Callable<Object>() { @@ -714,7 +707,8 @@ public class CompactionManager implements CompactionManagerMBean return this; } }; - return validationExecutor.submit(callable); + + return validationExecutor.submitIfRunning(callable, "validation"); } /* Used in tests. */ @@ -1344,13 +1338,8 @@ public class CompactionManager implements CompactionManagerMBean } } }; - if (executor.isShutdown()) - { - logger.info("Compaction executor has shut down, not submitting index build"); - return null; - } - return executor.submit(runnable); + return executor.submitIfRunning(runnable, "index build"); } public Future<?> submitCacheWrite(final AutoSavingCache.Writer writer) @@ -1382,12 +1371,8 @@ public class CompactionManager implements CompactionManagerMBean } } }; - if (executor.isShutdown()) - { - logger.info("Executor has shut down, not submitting background task"); - Futures.immediateCancelledFuture(); - } - return executor.submit(runnable); + + return executor.submitIfRunning(runnable, "cache write"); } public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException @@ -1509,6 +1494,46 @@ public class CompactionManager implements CompactionManagerMBean // unmap those segments which could free up a snapshot for successful deletion. SnapshotDeletingTask.rescheduleFailedTasks(); } + + public ListenableFuture<?> submitIfRunning(Runnable task, String name) + { + return submitIfRunning(Executors.callable(task, null), name); + } + + /** + * Submit the task but only if the executor has not been shutdown.If the executor has + * been shutdown, or in case of a rejected execution exception return a cancelled future. + * + * @param task - the task to submit + * @param name - the task name to use in log messages + * + * @return the future that will deliver the task result, or a future that has already been + * cancelled if the task could not be submitted. + */ + public ListenableFuture<?> submitIfRunning(Callable<?> task, String name) + { + if (isShutdown()) + { + logger.info("Executor has been shut down, not submitting {}", name); + return Futures.immediateCancelledFuture(); + } + + try + { + ListenableFutureTask ret = ListenableFutureTask.create(task); + submit(ret); + return ret; + } + catch (RejectedExecutionException ex) + { + if (isShutdown()) + logger.info("Executor has shut down, could not submit {}", name); + else + logger.error("Failed to submit {}", name, ex); + + return Futures.immediateCancelledFuture(); + } + } } private static class ValidationExecutor extends CompactionExecutor http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java index 59cee50..a95c4a8 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java @@ -83,6 +83,12 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional update.clear(); obsolete.clear(); } + + @Override + public String toString() + { + return String.format("[obsolete: %s, update: %s]", obsolete, update); + } } public final Tracker tracker; @@ -150,7 +156,8 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional { assert staged.isEmpty() : "must be no actions introduced between prepareToCommit and a commit"; - logger.trace("Committing update:{}, obsolete:{}", staged.update, staged.obsolete); + if (logger.isTraceEnabled()) + logger.trace("Committing transaction over {} staged: {}, logged: {}", originals, staged, logged); // this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done // we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size @@ -168,7 +175,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional public Throwable doAbort(Throwable accumulate) { if (logger.isTraceEnabled()) - logger.trace("Aborting transaction over {}, with ({},{}) logged and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, staged.obsolete); + logger.trace("Aborting transaction over {} staged: {}, logged: {}", originals, staged, logged); if (logged.isEmpty() && staged.isEmpty()) return accumulate; @@ -225,7 +232,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional private Throwable checkpoint(Throwable accumulate) { if (logger.isTraceEnabled()) - logger.trace("Checkpointing update:{}, obsolete:{}", staged.update, staged.obsolete); + logger.trace("Checkpointing staged {}", staged); if (staged.isEmpty()) return accumulate; http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index c303975..fddf058 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -2037,7 +2037,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS private DescriptorTypeTidy type; private GlobalTidy global; - private boolean setup; + private volatile boolean setup; void setup(SSTableReader reader, boolean trackHotness) { @@ -2062,6 +2062,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public void tidy() { + if (logger.isTraceEnabled()) + logger.trace("Running instance tidier for {} with setup {}", descriptor, setup); + // don't try to cleanup if the sstablereader was never fully constructed if (!setup) return; @@ -2080,8 +2083,15 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { public void run() { + if (logger.isTraceEnabled()) + logger.trace("Async instance tidier for {}, before barrier", descriptor); + if (barrier != null) barrier.await(); + + if (logger.isTraceEnabled()) + logger.trace("Async instance tidier for {}, after barrier", descriptor); + if (bf != null) bf.close(); if (summary != null) @@ -2093,6 +2103,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS if (ifile != null) ifile.close(); typeRef.release(); + + if (logger.isTraceEnabled()) + logger.trace("Async instance tidier for {}, completed", descriptor); } }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index fb0c9ca..f0e2fbf 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -769,7 +769,8 @@ public final class MessagingService implements MessagingServiceMBean assert !StageManager.getStage(Stage.MUTATION).isShutdown(); // the important part - callbacks.shutdownBlocking(); + if (!callbacks.shutdownBlocking()) + logger.warn("Failed to wait for messaging service callbacks shutdown"); // attempt to humor tests that try to stop and restart MS try http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 0b6e851..db86294 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3982,8 +3982,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE FBUtilities.waitOnFuture(f); remainingCFs--; } - // flush the system ones after all the rest are done, just in case flushing modifies any system state - // like CASSANDRA-5151. don't bother with progress tracking since system data is tiny. + + BatchlogManager.shutdown(); + + // Interrupt on going compaction and shutdown to prevent further compaction + CompactionManager.instance.forceShutdown(); + + // Flush the system tables after all other tables are flushed, just in case flushing modifies any system state + // like CASSANDRA-5151. Don't bother with progress tracking since system data is tiny. + // Flush system tables after stopping the batchlog manager and compactions since they both modify + // system tables (for example compactions can obsolete sstables and the tidiers in SSTableReader update + // system tables, see SSTableReader.GlobalTidy) flushes.clear(); for (Keyspace keyspace : Keyspace.system()) { @@ -3992,11 +4001,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } FBUtilities.waitOnFutures(flushes); - BatchlogManager.shutdown(); - - // Interrupt on going compaction and shutdown to prevent further compaction - CompactionManager.instance.forceShutdown(); - // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure // there are no segments to replay, so we force the recycling of any remaining (should be at most one) CommitLog.instance.forceRecycleAllSegments(); @@ -4006,7 +4010,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // wait for miscellaneous tasks like sstable and commitlog segment deletion ScheduledExecutors.nonPeriodicTasks.shutdown(); if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES)) - logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown"); + logger.warn("Failed to wait for non periodic tasks to shutdown"); ColumnFamilyStore.shutdownPostFlushExecutor(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/utils/ExpiringMap.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java index e7b626c..8359918 100644 --- a/src/java/org/apache/cassandra/utils/ExpiringMap.java +++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java @@ -105,12 +105,12 @@ public class ExpiringMap<K, V> service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS); } - public void shutdownBlocking() + public boolean shutdownBlocking() { service.shutdown(); try { - service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS); + return service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS); } catch (InterruptedException e) {