Author: jbellis
Date: Thu Dec  8 20:33:15 2011
New Revision: 1212092

URL: http://svn.apache.org/viewvc?rev=1212092&view=rev
Log:
improve UserInterruptedException encapsulation (and rename it to
CompactionInterruptedException)
patch by jbellis; reviewed by slebresne for CASSANDRA-3582

Added:
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java
Removed:
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/UserInterruptedException.java
Modified:
    cassandra/trunk/CHANGES.txt
    
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1212092&r1=1212091&r2=1212092&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Dec  8 20:33:15 2011
@@ -1,6 +1,6 @@
 1.1-dev
  * multi-dc replication optimization supporting CL > ONE (CASSANDRA-3577)
- * add command to stop compactions (CASSANDRA-1740, 3566)
+ * add command to stop compactions (CASSANDRA-1740, 3566, 3582)
  * multithreaded streaming (CASSANDRA-3494)
  * removed in-tree redhat spec (CASSANDRA-3567)
  * "defragment" rows for name-based queries under STCS, again (CASSANDRA-2503)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1212092&r1=1212091&r2=1212092&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Thu Dec  8 20:33:15 2011
@@ -23,7 +23,6 @@ package org.apache.cassandra.concurrent;
 
 import java.util.concurrent.*;
 
-import org.apache.cassandra.db.compaction.UserInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,6 +116,23 @@ public class DebuggableThreadPoolExecuto
 
     public static void logExceptionsAfterExecute(Runnable r, Throwable t)
     {
+        if (t == null)
+            t = extractThrowable(r);
+
+        if (t != null)
+            handleOrLog(t);
+    }
+
+    public static void handleOrLog(Throwable t)
+    {
+        if (Thread.getDefaultUncaughtExceptionHandler() == null)
+            logger.error("Error in ThreadPoolExecutor", t);
+        else
+            Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
+    }
+
+    public static Throwable extractThrowable(Runnable r)
+    {
         // Check for exceptions wrapped by FutureTask.  We do this by calling get(), which will
         // cause it to throw any saved exception.
         //
@@ -138,23 +154,11 @@ public class DebuggableThreadPoolExecuto
             }
             catch (ExecutionException e)
             {
-                Throwable actualException = e.getCause();
-                if (actualException instanceof UserInterruptedException)
-                    logger.info("Task interrupted by user: " + actualException);
-                else if (Thread.getDefaultUncaughtExceptionHandler() == null)
-                    logger.error("Error in ThreadPoolExecutor", actualException);
-                else
-                    Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), actualException);
+                return e.getCause();
             }
         }
 
-        // exceptions for non-FutureTask runnables [i.e., added via execute() instead of submit()]
-        if (t != null)
-        {
-            if (Thread.getDefaultUncaughtExceptionHandler() == null)
-                logger.error("Error in ThreadPoolExecutor", t);
-            else
-                Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
-        }
+        return null;
     }
+
 }

Added: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java?rev=1212092&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java Thu Dec  8 20:33:15 2011
@@ -0,0 +1,11 @@
+package org.apache.cassandra.db.compaction;
+
+public class CompactionInterruptedException extends RuntimeException
+{
+    private static final long serialVersionUID = -8651427062512310398L;
+
+    public CompactionInterruptedException(CompactionInfo info)
+    {
+        super("Compaction interrupted: " + info);
+    }
+}

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1212092&r1=1212091&r2=1212092&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Dec  8 20:33:15 2011
@@ -488,7 +488,7 @@ public class CompactionManager implement
             while (!dataFile.isEOF())
             {
                 if (scrubInfo.isStopped())
-                    throw new UserInterruptedException(scrubInfo.getCompactionInfo());
+                    throw new CompactionInterruptedException(scrubInfo.getCompactionInfo());
                 long rowStart = dataFile.getFilePointer();
                 if (logger.isDebugEnabled())
                     logger.debug("Reading row at " + rowStart);
@@ -696,7 +696,7 @@ public class CompactionManager implement
                 while (scanner.hasNext())
                 {
                     if (ci.isStopped())
-                        throw new UserInterruptedException(ci.getCompactionInfo());
+                        throw new CompactionInterruptedException(ci.getCompactionInfo());
                     SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
                     if (Range.isInRanges(row.getKey().token, ranges))
                     {
@@ -827,7 +827,7 @@ public class CompactionManager implement
             while (nni.hasNext())
             {
                 if (ci.isStopped())
-                    throw new UserInterruptedException(ci.getCompactionInfo());
+                    throw new CompactionInterruptedException(ci.getCompactionInfo());
                 AbstractCompactedRow row = nni.next();
                 validator.add(row);
             }
@@ -975,7 +975,7 @@ public class CompactionManager implement
         return CompactionExecutor.compactions.size();
     }
 
-    private static class CompactionExecutor extends DebuggableThreadPoolExecutor implements CompactionExecutorStatsCollector
+    private static class CompactionExecutor extends ThreadPoolExecutor implements CompactionExecutorStatsCollector
     {
         // a synchronized identity set of running tasks to their compaction info
         private static final Set<CompactionInfo.Holder> compactions = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<CompactionInfo.Holder, Boolean>()));
@@ -983,6 +983,7 @@ public class CompactionManager implement
         protected CompactionExecutor(int minThreads, int maxThreads, String name, BlockingQueue<Runnable> queue)
         {
             super(minThreads, maxThreads, 60, TimeUnit.SECONDS, queue, new NamedThreadFactory(name, Thread.MIN_PRIORITY));
+            allowCoreThreadTimeOut(true);
         }
 
         private CompactionExecutor(int threadCount, String name)
@@ -1009,6 +1010,29 @@ public class CompactionManager implement
         {
             return new ArrayList<CompactionInfo.Holder>(compactions);
         }
+
+        // modified from DebuggableThreadPoolExecutor so that CompactionInterruptedExceptions are not logged
+        @Override
+        public void afterExecute(Runnable r, Throwable t)
+        {
+            super.afterExecute(r,t);
+
+            if (t == null)
+                t = DebuggableThreadPoolExecutor.extractThrowable(r);
+
+            if (t != null)
+            {
+                if (t instanceof CompactionInterruptedException)
+                {
+                    logger.info(t.getMessage());
+                    logger.debug("Full interruption stack trace:", t);
+                }
+                else
+                {
+                    DebuggableThreadPoolExecutor.handleOrLog(t);
+                }
+            }
+        }
     }
 
     private static class ValidationExecutor extends CompactionExecutor

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1212092&r1=1212091&r2=1212092&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Thu Dec  8 20:33:15 2011
@@ -153,7 +153,7 @@ public class CompactionTask extends Abst
             while (nni.hasNext())
             {
                 if (ci.isStopped())
-                    throw new UserInterruptedException(ci.getCompactionInfo());
+                    throw new CompactionInterruptedException(ci.getCompactionInfo());
 
                 AbstractCompactedRow row = nni.next();
                 if (row.isEmpty())

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java?rev=1212092&r1=1212091&r2=1212092&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java Thu Dec  8 20:33:15 2011
@@ -26,7 +26,7 @@ import org.apache.cassandra.db.Decorated
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.compaction.UserInterruptedException;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 
 /**
@@ -60,7 +60,7 @@ public class SecondaryIndexBuilder exten
         while (iter.hasNext())
         {
             if (isStopped())
-                throw new UserInterruptedException(getCompactionInfo());
+                throw new CompactionInterruptedException(getCompactionInfo());
             DecoratedKey<?> key = iter.next();
             Table.indexRow(key, cfs, columns);
         }


Reply via email to