Fix leak errors and execution rejected exceptions when draining

Patch by Stefania Alborghetti; reviewed by Marcus Eriksson for CASSANDRA-12457


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/be6e6ea6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/be6e6ea6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/be6e6ea6

Branch: refs/heads/cassandra-3.0
Commit: be6e6ea662b7da556a9e4ba5fd402b7451bdde10
Parents: 975284c
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Fri Aug 19 12:07:41 2016 +0800
Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Committed: Fri Oct 7 16:49:00 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../DebuggableScheduledThreadPoolExecutor.java  |   2 +-
 .../concurrent/ScheduledExecutors.java          |   4 -
 .../db/compaction/CompactionManager.java        | 127 +++++++++++--------
 .../db/lifecycle/LifecycleTransaction.java      |  13 +-
 .../io/sstable/format/SSTableReader.java        |  15 ++-
 .../apache/cassandra/net/MessagingService.java  |   3 +-
 .../cassandra/service/StorageService.java       |  20 +--
 .../org/apache/cassandra/utils/ExpiringMap.java |   4 +-
 9 files changed, 118 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 97bc70a..54425fa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Fix leak errors and execution rejected exceptions when draining (CASSANDRA-12457)
  * Fix merkle tree depth calculation (CASSANDRA-12580)
  * Make Collections deserialization more robust (CASSANDRA-12618)
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
index a722b87..ea0715c 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
@@ -54,7 +54,7 @@ public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolEx
                 if (task instanceof Future)
                     ((Future) task).cancel(false);
 
-                logger.trace("ScheduledThreadPoolExecutor has shut down as part of C* shutdown");
+                logger.debug("ScheduledThreadPoolExecutor has shut down as part of C* shutdown");
             }
             else
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
index 5935669..5962db9 100644
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
@@ -31,10 +31,6 @@ public class ScheduledExecutors
      * This executor is used for tasks that can have longer execution times, and usually are non periodic.
      */
     public static final DebuggableScheduledThreadPoolExecutor nonPeriodicTasks = new DebuggableScheduledThreadPoolExecutor("NonPeriodicTasks");
-    static
-    {
-        nonPeriodicTasks.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
-    }
 
     /**
      * This executor is used for tasks that do not need to be waited for on shutdown/drain.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 78fa23c..626bd27 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -165,16 +165,14 @@ public class CompactionManager implements CompactionManagerMBean
                      cfs.keyspace.getName(),
                      cfs.name,
                      cfs.getCompactionStrategy().getName());
-        List<Future<?>> futures = new ArrayList<>();
-        // we must schedule it at least once, otherwise compaction will stop for a CF until next flush
-        if (executor.isShutdown())
+
+        List<Future<?>> futures = new ArrayList<>(1);
+        Future<?> fut = executor.submitIfRunning(new BackgroundCompactionCandidate(cfs), "background task");
+        if (!fut.isCancelled())
         {
-            logger.info("Executor has shut down, not submitting background task");
-            return Collections.emptyList();
+            compactingCF.add(cfs);
+            futures.add(fut);
         }
-        compactingCF.add(cfs);
-        futures.add(executor.submit(new BackgroundCompactionCandidate(cfs)));
-
         return futures;
     }
 
@@ -209,7 +207,8 @@ public class CompactionManager implements CompactionManagerMBean
         {
             try
             {
-                exec.awaitTermination(1, TimeUnit.MINUTES);
+                if (!exec.awaitTermination(1, TimeUnit.MINUTES))
+                    logger.warn("Failed to wait for compaction executors shutdown");
             }
             catch (InterruptedException e)
             {
@@ -286,16 +285,10 @@ public class CompactionManager implements CompactionManagerMBean
                 return AllSSTableOpStatus.SUCCESSFUL;
             }
 
-            List<Future<Object>> futures = new ArrayList<>();
+            List<Future<?>> futures = new ArrayList<>();
 
             for (final SSTableReader sstable : sstables)
             {
-                if (executor.isShutdown())
-                {
-                    logger.info("Executor has shut down, not submitting task");
-                    return AllSSTableOpStatus.ABORTED;
-                }
-
                 final LifecycleTransaction txn = compacting.split(singleton(sstable));
                 transactions.add(txn);
                 Callable<Object> callable = new Callable<Object>()
@@ -307,7 +300,12 @@ public class CompactionManager implements CompactionManagerMBean
                         return this;
                     }
                 };
-                futures.add(executor.submit(callable));
+                Future<?> fut = executor.submitIfRunning(callable, "paralell sstable operation");
+                if (!fut.isCancelled())
+                    futures.add(fut);
+                else
+                    return AllSSTableOpStatus.ABORTED;
+
                 if (jobs > 0 && futures.size() == jobs)
                 {
                     FBUtilities.waitOnFutures(futures);
@@ -472,16 +470,18 @@ public class CompactionManager implements CompactionManagerMBean
                 performAnticompaction(cfs, ranges, sstables, modifier, repairedAt);
             }
         };
-        if (executor.isShutdown())
+
+        ListenableFuture<?> ret = null;
+        try
         {
-            logger.info("Compaction executor has shut down, not submitting anticompaction");
-            sstables.release();
-            return Futures.immediateCancelledFuture();
+            ret = executor.submitIfRunning(runnable, "anticompaction");
+            return ret;
+        }
+        finally
+        {
+            if (ret == null || ret.isCancelled())
+                sstables.release();
         }
-
-        ListenableFutureTask<?> task = ListenableFutureTask.create(runnable, null);
-        executor.submit(task);
-        return task;
     }
 
     /**
@@ -599,12 +599,10 @@ public class CompactionManager implements CompactionManagerMBean
                     task.execute(metrics);
                 }
             };
-            if (executor.isShutdown())
-            {
-                logger.info("Compaction executor has shut down, not submitting task");
-                return Collections.emptyList();
-            }
-            futures.add(executor.submit(runnable));
+
+            Future<?> fut = executor.submitIfRunning(runnable, "maximal task");
+            if (!fut.isCancelled())
+                futures.add(fut);
         }
         if (nonEmptyTasks > 1)
             logger.info("Cannot perform a full major compaction as repaired and unrepaired sstables cannot be compacted together. These two set of sstables will be compacted separately.");
@@ -671,13 +669,8 @@ public class CompactionManager implements CompactionManagerMBean
                 }
             }
         };
-        if (executor.isShutdown())
-        {
-            logger.info("Compaction executor has shut down, not submitting task");
-            return Futures.immediateCancelledFuture();
-        }
 
-        return executor.submit(runnable);
+        return executor.submitIfRunning(runnable, "user defined task");
     }
 
     // This acquire a reference on the sstable
@@ -695,7 +688,7 @@ public class CompactionManager implements CompactionManagerMBean
     /**
      * Does not mutate data, so is not scheduled.
      */
-    public Future<Object> submitValidation(final ColumnFamilyStore cfStore, final Validator validator)
+    public Future<?> submitValidation(final ColumnFamilyStore cfStore, final Validator validator)
     {
         Callable<Object> callable = new Callable<Object>()
         {
@@ -714,7 +707,8 @@ public class CompactionManager implements CompactionManagerMBean
                 return this;
             }
         };
-        return validationExecutor.submit(callable);
+
+        return validationExecutor.submitIfRunning(callable, "validation");
     }
 
     /* Used in tests. */
@@ -1344,13 +1338,8 @@ public class CompactionManager implements CompactionManagerMBean
                 }
             }
         };
-        if (executor.isShutdown())
-        {
-            logger.info("Compaction executor has shut down, not submitting index build");
-            return null;
-        }
 
-        return executor.submit(runnable);
+        return executor.submitIfRunning(runnable, "index build");
     }
 
     public Future<?> submitCacheWrite(final AutoSavingCache.Writer writer)
@@ -1382,12 +1371,8 @@ public class CompactionManager implements CompactionManagerMBean
                 }
             }
         };
-        if (executor.isShutdown())
-        {
-            logger.info("Executor has shut down, not submitting background task");
-            Futures.immediateCancelledFuture();
-        }
-        return executor.submit(runnable);
+
+        return executor.submitIfRunning(runnable, "cache write");
     }
 
     public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
@@ -1509,6 +1494,46 @@ public class CompactionManager implements CompactionManagerMBean
             // unmap those segments which could free up a snapshot for successful deletion.
             SnapshotDeletingTask.rescheduleFailedTasks();
         }
+
+        public ListenableFuture<?> submitIfRunning(Runnable task, String name)
+        {
+            return submitIfRunning(Executors.callable(task, null), name);
+        }
+
+        /**
+         * Submit the task but only if the executor has not been shutdown.If the executor has
+         * been shutdown, or in case of a rejected execution exception return a cancelled future.
+         *
+         * @param task - the task to submit
+         * @param name - the task name to use in log messages
+         *
+         * @return the future that will deliver the task result, or a future that has already been
+         *         cancelled if the task could not be submitted.
+         */
+        public ListenableFuture<?> submitIfRunning(Callable<?> task, String name)
+        {
+            if (isShutdown())
+            {
+                logger.info("Executor has been shut down, not submitting {}", name);
+                return Futures.immediateCancelledFuture();
+            }
+
+            try
+            {
+                ListenableFutureTask ret = ListenableFutureTask.create(task);
+                submit(ret);
+                return ret;
+            }
+            catch (RejectedExecutionException ex)
+            {
+                if (isShutdown())
+                    logger.info("Executor has shut down, could not submit {}", name);
+                else
+                    logger.error("Failed to submit {}", name, ex);
+
+                return Futures.immediateCancelledFuture();
+            }
+        }
     }
 
     private static class ValidationExecutor extends CompactionExecutor

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 59cee50..a95c4a8 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -83,6 +83,12 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
             update.clear();
             obsolete.clear();
         }
+
+        @Override
+        public String toString()
+        {
+            return String.format("[obsolete: %s, update: %s]", obsolete, update);
+        }
     }
 
     public final Tracker tracker;
@@ -150,7 +156,8 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
     {
         assert staged.isEmpty() : "must be no actions introduced between prepareToCommit and a commit";
 
-        logger.trace("Committing update:{}, obsolete:{}", staged.update, staged.obsolete);
+        if (logger.isTraceEnabled())
+            logger.trace("Committing transaction over {} staged: {}, logged: {}", originals, staged, logged);
 
         // this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done
         // we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size
@@ -168,7 +175,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
     public Throwable doAbort(Throwable accumulate)
     {
         if (logger.isTraceEnabled())
-            logger.trace("Aborting transaction over {}, with ({},{}) logged and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, staged.obsolete);
+            logger.trace("Aborting transaction over {} staged: {}, logged: {}", originals, staged, logged);
 
         if (logged.isEmpty() && staged.isEmpty())
             return accumulate;
@@ -225,7 +232,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
     private Throwable checkpoint(Throwable accumulate)
     {
         if (logger.isTraceEnabled())
-            logger.trace("Checkpointing update:{}, obsolete:{}", staged.update, staged.obsolete);
+            logger.trace("Checkpointing staged {}", staged);
 
         if (staged.isEmpty())
             return accumulate;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index c303975..fddf058 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -2037,7 +2037,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         private DescriptorTypeTidy type;
         private GlobalTidy global;
 
-        private boolean setup;
+        private volatile boolean setup;
 
         void setup(SSTableReader reader, boolean trackHotness)
         {
@@ -2062,6 +2062,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
         public void tidy()
         {
+            if (logger.isTraceEnabled())
+                logger.trace("Running instance tidier for {} with setup {}", descriptor, setup);
+
             // don't try to cleanup if the sstablereader was never fully constructed
             if (!setup)
                 return;
@@ -2080,8 +2083,15 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             {
                 public void run()
                 {
+                    if (logger.isTraceEnabled())
+                        logger.trace("Async instance tidier for {}, before barrier", descriptor);
+
                     if (barrier != null)
                         barrier.await();
+
+                    if (logger.isTraceEnabled())
+                        logger.trace("Async instance tidier for {}, after barrier", descriptor);
+
                     if (bf != null)
                         bf.close();
                     if (summary != null)
@@ -2093,6 +2103,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                     if (ifile != null)
                         ifile.close();
                     typeRef.release();
+
+                    if (logger.isTraceEnabled())
+                        logger.trace("Async instance tidier for {}, completed", descriptor);
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index fb0c9ca..f0e2fbf 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -769,7 +769,8 @@ public final class MessagingService implements MessagingServiceMBean
         assert !StageManager.getStage(Stage.MUTATION).isShutdown();
 
         // the important part
-        callbacks.shutdownBlocking();
+        if (!callbacks.shutdownBlocking())
+            logger.warn("Failed to wait for messaging service callbacks shutdown");
 
         // attempt to humor tests that try to stop and restart MS
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 0b6e851..db86294 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3982,8 +3982,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             FBUtilities.waitOnFuture(f);
             remainingCFs--;
         }
-        // flush the system ones after all the rest are done, just in case flushing modifies any system state
-        // like CASSANDRA-5151. don't bother with progress tracking since system data is tiny.
+
+        BatchlogManager.shutdown();
+
+        // Interrupt on going compaction and shutdown to prevent further compaction
+        CompactionManager.instance.forceShutdown();
+
+        // Flush the system tables after all other tables are flushed, just in case flushing modifies any system state
+        // like CASSANDRA-5151. Don't bother with progress tracking since system data is tiny.
+        // Flush system tables after stopping the batchlog manager and compactions since they both modify
+        // system tables (for example compactions can obsolete sstables and the tidiers in SSTableReader update
+        // system tables, see SSTableReader.GlobalTidy)
         flushes.clear();
         for (Keyspace keyspace : Keyspace.system())
         {
@@ -3992,11 +4001,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         FBUtilities.waitOnFutures(flushes);
 
-        BatchlogManager.shutdown();
-
-        // Interrupt on going compaction and shutdown to prevent further compaction
-        CompactionManager.instance.forceShutdown();
-
         // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
         // there are no segments to replay, so we force the recycling of any remaining (should be at most one)
         CommitLog.instance.forceRecycleAllSegments();
@@ -4006,7 +4010,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         // wait for miscellaneous tasks like sstable and commitlog segment deletion
         ScheduledExecutors.nonPeriodicTasks.shutdown();
         if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
-            logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown");
+            logger.warn("Failed to wait for non periodic tasks to shutdown");
 
         ColumnFamilyStore.shutdownPostFlushExecutor();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be6e6ea6/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index e7b626c..8359918 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -105,12 +105,12 @@ public class ExpiringMap<K, V>
         service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
     }
 
-    public void shutdownBlocking()
+    public boolean shutdownBlocking()
     {
         service.shutdown();
         try
         {
-            service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
+            return service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
         }
         catch (InterruptedException e)
         {

Reply via email to