Author: jbellis
Date: Tue Jun 14 21:39:47 2011
New Revision: 1135809

URL: http://svn.apache.org/viewvc?rev=1135809&view=rev
Log:
add commitlog_total_space_in_mb
patch by Patricio Echague; reviewed by jbellis for CASSANDRA-2427

Added:
    
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
Removed:
    
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorServiceMBean.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorServiceMBean.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Jun 14 21:39:47 2011
@@ -1,5 +1,6 @@
 1.0-dev
  * removed binarymemtable (CASSANDRA-2692)
+ * add commitlog_total_space_in_mb to prevent fragmented logs (CASSANDRA-2427)
 
 
 0.8.1

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Tue Jun 14 
21:39:47 2011
@@ -58,6 +58,7 @@ public class Config
     
     public Integer memtable_flush_writers = null; // will get set to the 
length of data dirs in DatabaseDescriptor
     public Integer memtable_total_space_in_mb;
+    public Integer commitlog_total_space_in_mb = 4096;
 
     public Integer sliced_buffer_size_in_kb = 64;
     

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
Tue Jun 14 21:39:47 2011
@@ -1075,4 +1075,9 @@ public class DatabaseDescriptor
     {
         return conf.memtable_total_space_in_mb > 0;
     }
+
+    public static long getTotalCommitlogSpaceInMB()
+    {
+        return conf.commitlog_total_space_in_mb;
+    }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
 Tue Jun 14 21:39:47 2011
@@ -32,27 +32,6 @@ public abstract class AbstractCommitLogE
 {
     protected volatile long completedTaskCount = 0;
 
-    protected static void registerMBean(Object o)
-    {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(o, new 
ObjectName("org.apache.cassandra.db:type=Commitlog"));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Get the current number of running tasks
-     */
-    public int getActiveCount()
-    {
-        return 1;
-    }
-
     /**
      * Get the number of completed tasks
      */

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
 Tue Jun 14 21:39:47 2011
@@ -28,7 +28,7 @@ import java.util.concurrent.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService 
implements ICommitLogExecutorService, BatchCommitLogExecutorServiceMBean
+class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
 {
     private final BlockingQueue<CheaterFutureTask> queue;
     private final Thread appendingThread;
@@ -56,7 +56,6 @@ class BatchCommitLogExecutorService exte
         appendingThread = new Thread(runnable, "COMMIT-LOG-WRITER");
         appendingThread.start();
 
-        registerMBean(this);
     }
 
     public long getPendingTasks()
@@ -194,4 +193,5 @@ class BatchCommitLogExecutorService exte
             super.set(v);
         }
     }
+
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java 
Tue Jun 14 21:39:47 2011
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db.commitlog;
 
 import java.io.*;
+import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -27,11 +28,9 @@ import java.util.concurrent.atomic.Atomi
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -49,6 +48,9 @@ import org.apache.cassandra.utils.ByteBu
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 /*
  * Commit Log tracks every write operation into the system. The aim
  * of the commit log is to be able to successfully recover data that was
@@ -77,7 +79,7 @@ import org.apache.cassandra.utils.Wrappe
  * means that either the CF was clean in the old CL or it has been flushed 
since the
  * switch in the new.)
  */
-public class CommitLog
+public class CommitLog implements CommitLogMBean
 {
     private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
     
@@ -116,6 +118,16 @@ public class CommitLog
         executor = DatabaseDescriptor.getCommitLogSync() == 
Config.CommitLogSync.batch
                  ? new BatchCommitLogExecutorService()
                  : new PeriodicCommitLogExecutorService(this);
+
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new 
ObjectName("org.apache.cassandra.db:type=Commitlog"));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     public void resetUnsafe()
@@ -480,6 +492,36 @@ public class CommitLog
         currentSegment().sync();
     }
 
+    /**
+     * @return the total size occupied by the commitlog segments expressed in 
bytes.
+     */
+    public long getSize()
+    {
+        long commitlogTotalSize = 0;
+
+        for (CommitLogSegment segment : segments)
+        {
+            commitlogTotalSize += segment.length();
+        }
+
+        return commitlogTotalSize;
+    }
+
+    public long getCompletedTasks()
+    {
+        return executor.getCompletedTasks();
+    }
+
+    public long getPendingTasks()
+    {
+        return executor.getPendingTasks();
+    }
+
+    public long getTotalCommitlogSize()
+    {
+        return getSize();
+    }
+
     // TODO this should be a Runnable since it doesn't actually return 
anything, but it's difficult to do that
     // without breaking the fragile CheaterFutureTask in BatchCLES.
     class LogRecordAdder implements Callable, Runnable
@@ -501,6 +543,19 @@ public class CommitLog
                 {
                     sync();
                     segments.add(new CommitLogSegment());
+
+                    // Maintain desired CL size cap
+                    if (getSize() >= 
DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024)
+                    {
+                        // Force a flush on all CFs keeping the oldest segment 
from being removed
+                        CommitLogSegment oldestSegment = segments.peek();
+                        assert oldestSegment != null; // has to be at least 
the one we just added
+                        for (Integer dirtyCFId : oldestSegment.cfDirty)
+                        {
+                            String keypace = CFMetaData.getCF(dirtyCFId).left;
+                            
Table.open(keypace).getColumnFamilyStore(dirtyCFId).forceFlush();
+                        }
+                    }
                 }
             }
             catch (IOException e)

Added: 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java?rev=1135809&view=auto
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java 
(added)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java 
Tue Jun 14 21:39:47 2011
@@ -0,0 +1,78 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.commitlog;
+
+
+
+public interface CommitLogMBean
+{
+    /**
+     * Get the number of completed tasks
+     */
+    public long getCompletedTasks();
+
+    /**
+     * Get the number of tasks waiting to be executed
+     */
+    public long getPendingTasks();
+
+    /**
+     * Get the current size used by all the commitlog segments.
+     */
+    public long getTotalCommitlogSize();
+}
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.commitlog;
+
+
+
+public interface CommitLogMBean
+{
+    /**
+     * Get the number of completed tasks
+     */
+    public long getCompletedTasks();
+
+    /**
+     * Get the number of tasks waiting to be executed
+     */
+    public long getPendingTasks();
+
+    /**
+     * Get the current size used by all the commitlog segments.
+     */
+    public long getTotalCommitlogSize();
+}

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
 Tue Jun 14 21:39:47 2011
@@ -47,7 +47,7 @@ public class CommitLogSegment
     private final BufferedRandomAccessFile logWriter;
 
     // cache which cf is dirty in this segment to avoid having to lookup all 
ReplayPositions to decide if we could delete this segment
-    private Set<Integer> cfDirty = new HashSet<Integer>();
+    public final Set<Integer> cfDirty = new HashSet<Integer>();
 
     public CommitLogSegment()
     {
@@ -196,5 +196,4 @@ public class CommitLogSegment
     {
         return "CommitLogSegment(" + logWriter.getPath() + ')';
     }
-
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
 Tue Jun 14 21:39:47 2011
@@ -29,8 +29,19 @@ import org.apache.cassandra.concurrent.I
 /**
  * Like ExecutorService, but customized for batch and periodic commitlog 
execution.
  */
-public interface ICommitLogExecutorService extends IExecutorMBean
+public interface ICommitLogExecutorService
 {
+    /**
+     * Get the number of completed tasks
+     */
+    public long getCompletedTasks();
+
+    /**
+     * Get the number of tasks waiting to be executed
+     */
+    public long getPendingTasks();
+
+
     public <T> Future<T> submit(Callable<T> task);
 
     /**

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
 Tue Jun 14 21:39:47 2011
@@ -27,7 +27,7 @@ import java.util.concurrent.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-class PeriodicCommitLogExecutorService implements ICommitLogExecutorService, 
PeriodicCommitLogExecutorServiceMBean
+class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
 {
     private final BlockingQueue<Runnable> queue;
     protected volatile long completedTaskCount = 0;
@@ -87,7 +87,6 @@ class PeriodicCommitLogExecutorService i
             }
         }, "PERIODIC-COMMIT-LOG-SYNCER").start();
 
-        AbstractCommitLogExecutorService.registerMBean(this);
     }
 
     public void add(CommitLog.LogRecordAdder adder)
@@ -140,13 +139,9 @@ class PeriodicCommitLogExecutorService i
         return queue.size();
     }
 
-    public int getActiveCount()
-    {
-        return 1;
-    }
-
     public long getCompletedTasks()
     {
         return completedTaskCount;
     }
+
 }
\ No newline at end of file


Reply via email to