Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 4d90573c5 -> e80ede6d3
  refs/heads/cassandra-3.0 f791c2690 -> b0eba5f9c
  refs/heads/cassandra-3.11 bed7fa5ef -> 37d67306a
  refs/heads/trunk e5f3bb6e5 -> c8d15f04f
Fix compaction and flush exception not captured issue

patch by Jay Zhuang; reviewed by marcuse for CASSANDRA-13833

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e80ede6d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e80ede6d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e80ede6d

Branch: refs/heads/cassandra-2.2
Commit: e80ede6d393460f22ee2b313d4bac7e3fbbfe893
Parents: 4d90573
Author: Jay Zhuang <jay.zhu...@yahoo.com>
Authored: Thu Aug 31 11:07:07 2017 -0700
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Sep 4 15:01:02 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                    |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java |   4 +-
 .../db/compaction/CompactionManager.java       |   4 +-
 .../db/compaction/CompactionExecutorTest.java  | 131 +++++++++++++++++++
 4 files changed, 136 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e80ede6d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4e68ddc..03a78fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.11
+ * Fix compaction and flush exception not captured (CASSANDRA-13833)
  * Make BatchlogManagerMBean.forceBatchlogReplay() blocking (CASSANDRA-13809)
  * Uncaught exceptions in Netty pipeline (CASSANDRA-13649)
  * Prevent integer overflow on exabyte filesystems (CASSANDRA-13067)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e80ede6d/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 2e52eb2..7e36e11 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -906,9 +906,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             logFlush();
             Flush flush = new Flush(false);
             ListenableFutureTask<Void> flushTask = ListenableFutureTask.create(flush, null);
-            flushExecutor.submit(flushTask);
+            flushExecutor.execute(flushTask);
             ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(flush.postFlush);
-            postFlushExecutor.submit(task);
+            postFlushExecutor.execute(task);
 
             @SuppressWarnings("unchecked")
             ListenableFuture<ReplayPosition> future =

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e80ede6d/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index d21f1e8..cd50646 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1457,7 +1457,7 @@ public class CompactionManager implements CompactionManagerMBean
         return CompactionMetrics.getCompactions().size();
     }
 
-    private static class CompactionExecutor extends JMXEnabledThreadPoolExecutor
+    static class CompactionExecutor extends JMXEnabledThreadPoolExecutor
     {
         protected CompactionExecutor(int minThreads, int maxThreads, String name, BlockingQueue<Runnable> queue)
         {
@@ -1537,7 +1537,7 @@ public class CompactionManager implements CompactionManagerMBean
             try
             {
                 ListenableFutureTask ret = ListenableFutureTask.create(task);
-                submit(ret);
+                execute(ret);
                 return ret;
             }
             catch (RejectedExecutionException ex)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e80ede6d/test/unit/org/apache/cassandra/db/compaction/CompactionExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionExecutorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionExecutorTest.java
new file mode 100644
index 0000000..c6feb3f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionExecutorTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class CompactionExecutorTest
+{
+    static Throwable testTaskThrowable = null;
+    private static class TestTaskExecutor extends CompactionManager.CompactionExecutor
+    {
+        @Override
+        public void afterExecute(Runnable r, Throwable t)
+        {
+            if (t == null)
+            {
+                t = DebuggableThreadPoolExecutor.extractThrowable(r);
+            }
+            testTaskThrowable = t;
+        }
+        @Override
+        protected void beforeExecute(Thread t, Runnable r)
+        {
+        }
+    }
+    private CompactionManager.CompactionExecutor executor;
+
+    @Before
+    public void setup()
+    {
+        executor = new TestTaskExecutor();
+    }
+
+    @After
+    public void destroy() throws Exception
+    {
+        executor.shutdown();
+        executor.awaitTermination(1, TimeUnit.MINUTES);
+    }
+
+    @Test
+    public void testFailedRunnable() throws Exception
+    {
+        testTaskThrowable = null;
+
+        Future<?> tt = executor.submitIfRunning(
+            new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    assert false : "testFailedRunnable";
+                }
+            }, "compactionExecutorTest"
+        );
+
+        while (!tt.isDone())
+            Thread.sleep(10);
+        assertNotNull(testTaskThrowable);
+        assertEquals(testTaskThrowable.getMessage(), "testFailedRunnable");
+    }
+
+    @Test
+    public void testFailedCallable() throws Exception
+    {
+        testTaskThrowable = null;
+        Future<?> tt = executor.submitIfRunning(
+            new Callable<Integer>()
+            {
+                @Override
+                public Integer call() throws Exception
+                {
+                    assert false : "testFailedCallable";
+                    return 1;
+                }
+            }
+            , "compactionExecutorTest");
+
+        while (!tt.isDone())
+            Thread.sleep(10);
+        assertNotNull(testTaskThrowable);
+        assertEquals(testTaskThrowable.getMessage(), "testFailedCallable");
+    }
+
+    @Test
+    public void testExceptionRunnable() throws Exception
+    {
+        testTaskThrowable = null;
+        Future<?> tt = executor.submitIfRunning(
+            new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    throw new RuntimeException("testExceptionRunnable");
+                }
+            }
+            , "compactionExecutorTest");
+
+        while (!tt.isDone())
+            Thread.sleep(10);
+        assertNotNull(testTaskThrowable);
+        assertEquals(testTaskThrowable.getMessage(), "testExceptionRunnable");
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org