Author: jbellis Date: Thu Dec 8 20:33:15 2011 New Revision: 1212092 URL: http://svn.apache.org/viewvc?rev=1212092&view=rev Log: improve UserInterruptedException encapsulation (and rename it to CompactionInterruptedException); patch by jbellis; reviewed by slebresne for CASSANDRA-3582
Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java Removed: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/UserInterruptedException.java Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1212092&r1=1212091&r2=1212092&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Thu Dec 8 20:33:15 2011 @@ -1,6 +1,6 @@ 1.1-dev * multi-dc replication optimization supporting CL > ONE (CASSANDRA-3577) - * add command to stop compactions (CASSANDRA-1740, 3566) + * add command to stop compactions (CASSANDRA-1740, 3566, 3582) * multithreaded streaming (CASSANDRA-3494) * removed in-tree redhat spec (CASSANDRA-3567) * "defragment" rows for name-based queries under STCS, again (CASSANDRA-2503) Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1212092&r1=1212091&r2=1212092&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Thu Dec 8 20:33:15 2011 @@ -23,7 +23,6 @@ package org.apache.cassandra.concurrent; import java.util.concurrent.*; -import 
org.apache.cassandra.db.compaction.UserInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,6 +116,23 @@ public class DebuggableThreadPoolExecuto public static void logExceptionsAfterExecute(Runnable r, Throwable t) { + if (t == null) + t = extractThrowable(r); + + if (t != null) + handleOrLog(t); + } + + public static void handleOrLog(Throwable t) + { + if (Thread.getDefaultUncaughtExceptionHandler() == null) + logger.error("Error in ThreadPoolExecutor", t); + else + Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t); + } + + public static Throwable extractThrowable(Runnable r) + { // Check for exceptions wrapped by FutureTask. We do this by calling get(), which will // cause it to throw any saved exception. // @@ -138,23 +154,11 @@ public class DebuggableThreadPoolExecuto } catch (ExecutionException e) { - Throwable actualException = e.getCause(); - if (actualException instanceof UserInterruptedException) - logger.info("Task interrupted by user: " + actualException); - else if (Thread.getDefaultUncaughtExceptionHandler() == null) - logger.error("Error in ThreadPoolExecutor", actualException); - else - Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), actualException); + return e.getCause(); } } - // exceptions for non-FutureTask runnables [i.e., added via execute() instead of submit()] - if (t != null) - { - if (Thread.getDefaultUncaughtExceptionHandler() == null) - logger.error("Error in ThreadPoolExecutor", t); - else - Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t); - } + return null; } + } Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java?rev=1212092&view=auto 
============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java Thu Dec 8 20:33:15 2011 @@ -0,0 +1,11 @@ +package org.apache.cassandra.db.compaction; + +public class CompactionInterruptedException extends RuntimeException +{ + private static final long serialVersionUID = -8651427062512310398L; + + public CompactionInterruptedException(CompactionInfo info) + { + super("Compaction interrupted: " + info); + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1212092&r1=1212091&r2=1212092&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Dec 8 20:33:15 2011 @@ -488,7 +488,7 @@ public class CompactionManager implement while (!dataFile.isEOF()) { if (scrubInfo.isStopped()) - throw new UserInterruptedException(scrubInfo.getCompactionInfo()); + throw new CompactionInterruptedException(scrubInfo.getCompactionInfo()); long rowStart = dataFile.getFilePointer(); if (logger.isDebugEnabled()) logger.debug("Reading row at " + rowStart); @@ -696,7 +696,7 @@ public class CompactionManager implement while (scanner.hasNext()) { if (ci.isStopped()) - throw new UserInterruptedException(ci.getCompactionInfo()); + throw new CompactionInterruptedException(ci.getCompactionInfo()); SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); if (Range.isInRanges(row.getKey().token, ranges)) { @@ -827,7 +827,7 @@ public class CompactionManager implement while (nni.hasNext()) 
{ if (ci.isStopped()) - throw new UserInterruptedException(ci.getCompactionInfo()); + throw new CompactionInterruptedException(ci.getCompactionInfo()); AbstractCompactedRow row = nni.next(); validator.add(row); } @@ -975,7 +975,7 @@ public class CompactionManager implement return CompactionExecutor.compactions.size(); } - private static class CompactionExecutor extends DebuggableThreadPoolExecutor implements CompactionExecutorStatsCollector + private static class CompactionExecutor extends ThreadPoolExecutor implements CompactionExecutorStatsCollector { // a synchronized identity set of running tasks to their compaction info private static final Set<CompactionInfo.Holder> compactions = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<CompactionInfo.Holder, Boolean>())); @@ -983,6 +983,7 @@ public class CompactionManager implement protected CompactionExecutor(int minThreads, int maxThreads, String name, BlockingQueue<Runnable> queue) { super(minThreads, maxThreads, 60, TimeUnit.SECONDS, queue, new NamedThreadFactory(name, Thread.MIN_PRIORITY)); + allowCoreThreadTimeOut(true); } private CompactionExecutor(int threadCount, String name) @@ -1009,6 +1010,29 @@ public class CompactionManager implement { return new ArrayList<CompactionInfo.Holder>(compactions); } + + // modified from DebuggableThreadPoolExecutor so that CompactionInterruptedExceptions are not logged + @Override + public void afterExecute(Runnable r, Throwable t) + { + super.afterExecute(r,t); + + if (t == null) + t = DebuggableThreadPoolExecutor.extractThrowable(r); + + if (t != null) + { + if (t instanceof CompactionInterruptedException) + { + logger.info(t.getMessage()); + logger.debug("Full interruption stack trace:", t); + } + else + { + DebuggableThreadPoolExecutor.handleOrLog(t); + } + } + } } private static class ValidationExecutor extends CompactionExecutor Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1212092&r1=1212091&r2=1212092&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Thu Dec 8 20:33:15 2011 @@ -153,7 +153,7 @@ public class CompactionTask extends Abst while (nni.hasNext()) { if (ci.isStopped()) - throw new UserInterruptedException(ci.getCompactionInfo()); + throw new CompactionInterruptedException(ci.getCompactionInfo()); AbstractCompactedRow row = nni.next(); if (row.isEmpty()) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java?rev=1212092&r1=1212091&r2=1212092&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java Thu Dec 8 20:33:15 2011 @@ -26,7 +26,7 @@ import org.apache.cassandra.db.Decorated import org.apache.cassandra.db.Table; import org.apache.cassandra.db.compaction.CompactionInfo; import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.db.compaction.UserInterruptedException; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; import org.apache.cassandra.io.sstable.ReducingKeyIterator; /** @@ -60,7 +60,7 @@ public class SecondaryIndexBuilder exten while (iter.hasNext()) { if (isStopped()) - throw new UserInterruptedException(getCompactionInfo()); + throw new CompactionInterruptedException(getCompactionInfo()); DecoratedKey<?> key = iter.next(); Table.indexRow(key, cfs, columns); }