Author: jbellis Date: Tue Jun 14 21:39:47 2011 New Revision: 1135809 URL: http://svn.apache.org/viewvc?rev=1135809&view=rev Log: add commitlog_total_space_in_mb patch by Patricio Echague; reviewed by jbellis for CASSANDRA-2427
Added: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java Removed: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorServiceMBean.java cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorServiceMBean.java Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/config/Config.java cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1135809&r1=1135808&r2=1135809&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Tue Jun 14 21:39:47 2011 @@ -1,5 +1,6 @@ 1.0-dev * removed binarymemtable (CASSANDRA-2692) + * add commitlog_total_space_in_mb to prevent fragmented logs (CASSANDRA-2427) 0.8.1 Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1135809&r1=1135808&r2=1135809&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Tue Jun 14 21:39:47 2011 @@ -58,6 +58,7 @@ public class Config public Integer memtable_flush_writers = null; // will get set to the length of data dirs in DatabaseDescriptor public Integer memtable_total_space_in_mb; + public Integer commitlog_total_space_in_mb = 4096; public Integer sliced_buffer_size_in_kb = 64; Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1135809&r1=1135808&r2=1135809&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Tue Jun 14 21:39:47 2011 @@ -1075,4 +1075,9 @@ public class DatabaseDescriptor { return conf.memtable_total_space_in_mb > 0; } + + public static long getTotalCommitlogSpaceInMB() + { + return conf.commitlog_total_space_in_mb; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java?rev=1135809&r1=1135808&r2=1135809&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java Tue Jun 14 21:39:47 2011 @@ -32,27 +32,6 @@ public abstract class AbstractCommitLogE { protected volatile long completedTaskCount = 0; - protected static void registerMBean(Object o) - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(o, new ObjectName("org.apache.cassandra.db:type=Commitlog")); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - /** - * Get the current number of running tasks - */ - public int getActiveCount() - { - return 1; - } - /** * Get the number of completed tasks */ Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java?rev=1135809&r1=1135808&r2=1135809&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java Tue Jun 14 21:39:47 2011 @@ -28,7 +28,7 @@ import java.util.concurrent.*; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.utils.WrappedRunnable; -class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService implements ICommitLogExecutorService, BatchCommitLogExecutorServiceMBean +class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService { private final BlockingQueue<CheaterFutureTask> queue; private final Thread appendingThread; @@ -56,7 +56,6 @@ class BatchCommitLogExecutorService exte appendingThread = new Thread(runnable, "COMMIT-LOG-WRITER"); appendingThread.start(); - registerMBean(this); } public long getPendingTasks() @@ -194,4 +193,5 @@ class BatchCommitLogExecutorService exte super.set(v); } } + } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1135809&r1=1135808&r2=1135809&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Tue Jun 14 21:39:47 2011 @@ -19,6 +19,7 @@ package org.apache.cassandra.db.commitlog; import java.io.*; +import java.lang.management.ManagementFactory; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -27,11 +28,9 @@ import java.util.concurrent.atomic.Atomi import java.util.zip.CRC32; import java.util.zip.Checksum; -import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import org.apache.cassandra.db.*; -import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.net.MessagingService; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -49,6 +48,9 @@ import org.apache.cassandra.utils.ByteBu import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; +import javax.management.MBeanServer; +import javax.management.ObjectName; + /* * Commit Log tracks every write operation into the system. The aim * of the commit log is to be able to successfully recover data that was @@ -77,7 +79,7 @@ import org.apache.cassandra.utils.Wrappe * means that either the CF was clean in the old CL or it has been flushed since the * switch in the new.) */ -public class CommitLog +public class CommitLog implements CommitLogMBean { private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024; @@ -116,6 +118,16 @@ public class CommitLog executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch ? new BatchCommitLogExecutorService() : new PeriodicCommitLogExecutorService(this); + + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try + { + mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=Commitlog")); + } + catch (Exception e) + { + throw new RuntimeException(e); + } } public void resetUnsafe() @@ -480,6 +492,36 @@ public class CommitLog currentSegment().sync(); } + /** + * @return the total size occupied by the commitlog segments expressed in bytes. + */ + public long getSize() + { + long commitlogTotalSize = 0; + + for (CommitLogSegment segment : segments) + { + commitlogTotalSize += segment.length(); + } + + return commitlogTotalSize; + } + + public long getCompletedTasks() + { + return executor.getCompletedTasks(); + } + + public long getPendingTasks() + { + return executor.getPendingTasks(); + } + + public long getTotalCommitlogSize() + { + return getSize(); + } + // TODO this should be a Runnable since it doesn't actually return anything, but it's difficult to do that // without breaking the fragile CheaterFutureTask in BatchCLES. class LogRecordAdder implements Callable, Runnable @@ -501,6 +543,19 @@ public class CommitLog { sync(); segments.add(new CommitLogSegment()); + + // Maintain desired CL size cap + if (getSize() >= DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024) + { + // Force a flush on all CFs keeping the oldest segment from being removed + CommitLogSegment oldestSegment = segments.peek(); + assert oldestSegment != null; // has to be at least the one we just added + for (Integer dirtyCFId : oldestSegment.cfDirty) + { + String keypace = CFMetaData.getCF(dirtyCFId).left; + Table.open(keypace).getColumnFamilyStore(dirtyCFId).forceFlush(); + } + } } } catch (IOException e) Added: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java?rev=1135809&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java Tue Jun 14 21:39:47 2011 @@ -0,0 +1,78 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.db.commitlog; + + + +public interface CommitLogMBean +{ + /** + * Get the number of completed tasks + */ + public long getCompletedTasks(); + + /** + * Get the number of tasks waiting to be executed + */ + public long getPendingTasks(); + + /** + * Get the current size used by all the commitlog segments. + */ + public long getTotalCommitlogSize(); +} +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.db.commitlog; + + + +public interface CommitLogMBean +{ + /** + * Get the number of completed tasks + */ + public long getCompletedTasks(); + + /** + * Get the number of tasks waiting to be executed + */ + public long getPendingTasks(); + + /** + * Get the current size used by all the commitlog segments. + */ + public long getTotalCommitlogSize(); +} Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1135809&r1=1135808&r2=1135809&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Tue Jun 14 21:39:47 2011 @@ -47,7 +47,7 @@ public class CommitLogSegment private final BufferedRandomAccessFile logWriter; // cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we could delete this segment - private Set<Integer> cfDirty = new HashSet<Integer>(); + public final Set<Integer> cfDirty = new HashSet<Integer>(); public CommitLogSegment() { @@ -196,5 +196,4 @@ public class CommitLogSegment { return "CommitLogSegment(" + logWriter.getPath() + ')'; } - } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java?rev=1135809&r1=1135808&r2=1135809&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java Tue Jun 14 21:39:47 2011 @@ -29,8 +29,19 @@ import org.apache.cassandra.concurrent.I /** * Like ExecutorService, but customized for batch and periodic commitlog execution. */ -public interface ICommitLogExecutorService extends IExecutorMBean +public interface ICommitLogExecutorService { + /** + * Get the number of completed tasks + */ + public long getCompletedTasks(); + + /** + * Get the number of tasks waiting to be executed + */ + public long getPendingTasks(); + + public <T> Future<T> submit(Callable<T> task); /** Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java?rev=1135809&r1=1135808&r2=1135809&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java Tue Jun 14 21:39:47 2011 @@ -27,7 +27,7 @@ import java.util.concurrent.*; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.utils.WrappedRunnable; -class PeriodicCommitLogExecutorService implements ICommitLogExecutorService, PeriodicCommitLogExecutorServiceMBean +class PeriodicCommitLogExecutorService implements ICommitLogExecutorService { private final BlockingQueue<Runnable> queue; protected volatile long completedTaskCount = 0; @@ -87,7 +87,6 @@ class PeriodicCommitLogExecutorService i } }, "PERIODIC-COMMIT-LOG-SYNCER").start(); - AbstractCommitLogExecutorService.registerMBean(this); } public void add(CommitLog.LogRecordAdder adder) @@ -140,13 +139,9 @@ class PeriodicCommitLogExecutorService i return queue.size(); } - public int getActiveCount() - { - return 1; - } - public long getCompletedTasks() { return completedTaskCount; } + } \ No newline at end of file