Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 621899355 -> 351e35b64
Move all hints related tasks to hints internal executor patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for CASSANDRA-8285 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/351e35b6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/351e35b6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/351e35b6 Branch: refs/heads/cassandra-2.0 Commit: 351e35b64ac7f364bd4149da399f86b7f36f5ae9 Parents: 6218993 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Fri Dec 19 20:27:32 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Fri Dec 19 20:27:32 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../DebuggableScheduledThreadPoolExecutor.java | 5 + .../JMXEnabledScheduledThreadPoolExecutor.java | 137 +++++++++++++++++++ ...EnabledScheduledThreadPoolExecutorMBean.java | 26 ++++ .../cassandra/db/HintedHandOffManager.java | 25 ++-- 5 files changed, 179 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 516b4a2..1ad2de5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.12: + * Move all hints related tasks to hints internal executor (CASSANDRA-8285) * Fix paging for multi-partition IN queries (CASSANDRA-8408) * Fix MOVED_NODE topology event never being emitted when a node moves its token (CASSANDRA-8373) http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java index a41df54..1699c0f 100644 --- a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java @@ -33,6 +33,11 @@ public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolEx super(corePoolSize, new NamedThreadFactory(threadPoolName, priority)); } + public DebuggableScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) + { + super(corePoolSize, threadFactory); + } + public DebuggableScheduledThreadPoolExecutor(String threadPoolName) { this(1, threadPoolName, Thread.NORM_PRIORITY); http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java new file mode 100644 index 0000000..64d9267 --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.concurrent; + +import java.lang.management.ManagementFactory; +import java.util.List; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.cassandra.metrics.ThreadPoolMetrics; + +/** + * A JMX enabled wrapper for DebuggableScheduledThreadPoolExecutor. + */ +public class JMXEnabledScheduledThreadPoolExecutor extends DebuggableScheduledThreadPoolExecutor implements JMXEnabledScheduledThreadPoolExecutorMBean +{ + private final String mbeanName; + private final ThreadPoolMetrics metrics; + + public JMXEnabledScheduledThreadPoolExecutor(int corePoolSize, NamedThreadFactory threadFactory, String jmxPath) + { + super(corePoolSize, threadFactory); + + metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id); + + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + threadFactory.id; + + try + { + mbs.registerMBean(this, new ObjectName(mbeanName)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + private void unregisterMBean() + { + try + { + ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(mbeanName)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + // release metrics + metrics.release(); + } + + @Override + public synchronized void shutdown() + { + // synchronized, because there is no way to access super.mainLock, which would be + // the preferred way to make this threadsafe + if (!isShutdown()) + unregisterMBean(); + + super.shutdown(); + } + + @Override + public synchronized List<Runnable> shutdownNow() + { + // synchronized, because there is no way to access super.mainLock, which would be + // the preferred way to make this threadsafe + if (!isShutdown()) + unregisterMBean(); + + return super.shutdownNow(); + } + + /** + * Get the number of completed tasks + */ + public long getCompletedTasks() + { + return getCompletedTaskCount(); + } + + /** + * Get the number of tasks waiting to be executed + */ + public long getPendingTasks() + { + return getTaskCount() - getCompletedTaskCount(); + } + + public int getTotalBlockedTasks() + { + return (int) metrics.totalBlocked.count(); + } + + public int getCurrentlyBlockedTasks() + { + return (int) metrics.currentBlocked.count(); + } + + public int getCoreThreads() + { + return getCorePoolSize(); + } + + public void setCoreThreads(int number) + { + setCorePoolSize(number); + } + + public int getMaximumThreads() + { + return getMaximumPoolSize(); + } + + public void setMaximumThreads(int number) + { + setMaximumPoolSize(number); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java new file mode 100644 index 0000000..d9c45e3 --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.concurrent; + +/** + * @see org.apache.cassandra.metrics.ThreadPoolMetrics + */ +@Deprecated +public interface JMXEnabledScheduledThreadPoolExecutorMBean extends JMXEnabledThreadPoolExecutorMBean +{ +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 87260b2..c8c3845 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -34,20 +34,16 @@ import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Lists; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.CompositeType; -import org.apache.cassandra.db.marshal.Int32Type; -import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -106,12 +102,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>(); - private final ThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getMaxHintsThread(), - Integer.MAX_VALUE, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY), - "internal"); + private final JMXEnabledScheduledThreadPoolExecutor executor = + new JMXEnabledScheduledThreadPoolExecutor( + DatabaseDescriptor.getMaxHintsThread(), + new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY), + "internal"); private final ColumnFamilyStore hintStore = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.HINTS_CF); @@ -174,7 +169,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean metrics.log(); } }; - StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES); + executor.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES); } private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp) @@ -225,7 +220,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean } } }; - StorageService.optionalTasks.submit(runnable); + executor.submit(runnable); } //foobar @@ -246,7 +241,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean } } }; - StorageService.optionalTasks.submit(runnable).get(); + executor.submit(runnable).get(); }