Author: jbellis
Date: Fri Dec  2 23:30:32 2011
New Revision: 1209779

URL: http://svn.apache.org/viewvc?rev=1209779&view=rev
Log:
fix commitlog segment recycling
patch by Rick Branson; reviewed by jbellis for CASSANDRA-3557

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
    cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1209779&r1=1209778&r2=1209779&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Dec  2 23:30:32 2011
@@ -1,6 +1,7 @@
 1.1-dev
  * "defragment" rows for name-based queries under STCS, again (CASSANDRA-2503)
- * Recycle commitlog segments for improved performance (CASSANDRA-3411, 3543)
+ * Recycle commitlog segments for improved performance 
+   (CASSANDRA-3411, 3543, 3557)
  * update size-tiered compaction to prioritize small tiers (CASSANDRA-2407)
  * add message expiration logic to OutboundTcpConnection (CASSANDRA-3005)
  * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1209779&r1=1209778&r2=1209779&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java 
Fri Dec  2 23:30:32 2011
@@ -65,7 +65,7 @@ public class CommitLog implements Commit
 
     private final ICommitLogExecutorService executor;
 
-    private final CommitLogAllocator allocator;
+    public final CommitLogAllocator allocator;
 
     public static final int END_OF_SEGMENT_MARKER = 0;          // this is 
written out at the end of a segment
     public static final int END_OF_SEGMENT_MARKER_SIZE = 4;     // number of 
bytes of ^^^
@@ -388,7 +388,7 @@ public class CommitLog implements Commit
      */
     public int activeSegments()
     {
-        return allocator.activeSegments.size();
+        return allocator.getActiveSegments().size();
     }
 
     /**
@@ -427,7 +427,7 @@ public class CommitLog implements Commit
                 // flushed CF as clean, until we reach the segment file 
containing the ReplayPosition passed
                 // in the arguments. Any segments that become unused after 
they are marked clean will be
                 // recycled or discarded.
-                for (Iterator<CommitLogSegment> iter = 
allocator.activeSegments.iterator(); iter.hasNext(); )
+                for (Iterator<CommitLogSegment> iter = 
allocator.getActiveSegments().iterator(); iter.hasNext();)
                 {
                     CommitLogSegment segment = iter.next();
                     segment.markClean(cfId, context);
@@ -438,7 +438,6 @@ public class CommitLog implements Commit
                     if (segment.isUnused() && iter.hasNext())
                     {
                         logger.debug("Commit log segment {} is unused", 
segment);
-                        iter.remove();
                         allocator.recycleSegment(segment);
                     }
                     else
@@ -477,12 +476,9 @@ public class CommitLog implements Commit
      */
     public void sync() throws IOException
     {
-        for (CommitLogSegment segment : allocator.activeSegments)
+        for (CommitLogSegment segment : allocator.getActiveSegments())
         {
-            if (segment.needsSync())
-            {
-                segment.sync();
-            }
+            segment.sync();
         }
     }
 
@@ -515,12 +511,15 @@ public class CommitLog implements Commit
      */
     public void forceNewSegment() throws ExecutionException, 
InterruptedException
     {
+        logger.debug("Forcing new segment creation");
+
         Callable<?> task = new Callable()
         {
             public Object call() throws IOException
             {
                 if (activeSegment.position() > 0)
                     activateNextSegment();
+
                 return null;
             }
         };

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java?rev=1209779&r1=1209778&r2=1209779&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
 Fri Dec  2 23:30:32 2011
@@ -22,6 +22,8 @@ import java.io.File;
 
 import java.io.IOError;
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -58,7 +60,7 @@ public class CommitLogAllocator
     private final BlockingQueue<Runnable> queue = new 
LinkedBlockingQueue<Runnable>();
 
     /** Active segments, containing unflushed data */
-    final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new 
ConcurrentLinkedQueue<CommitLogSegment>();
+    private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new 
ConcurrentLinkedQueue<CommitLogSegment>();
 
     /**
      * Tracks commitlog size, in multiples of the segment size.  We need to do 
this so we can "promise" size
@@ -113,7 +115,7 @@ public class CommitLogAllocator
     /**
      * Fetches an empty segment file.
      *
-     * @return the next writeable segment
+     * @return the next writable segment
      */
     public CommitLogSegment fetchSegment()
     {
@@ -142,6 +144,8 @@ public class CommitLogAllocator
      */
     public void recycleSegment(final CommitLogSegment segment)
     {
+        activeSegments.remove(segment);
+
         if (isCapExceeded())
         {
             discardSegment(segment);
@@ -152,7 +156,8 @@ public class CommitLogAllocator
         {
             public void run()
             {
-                segment.recycle();
+                CommitLogSegment recycled = segment.recycle();
+                internalAddReadySegment(recycled);
             }
         });
     }
@@ -197,11 +202,11 @@ public class CommitLogAllocator
     private void discardSegment(final CommitLogSegment segment)
     {
         size.addAndGet(-CommitLog.SEGMENT_SIZE);
+
         queue.add(new Runnable()
         {
             public void run()
             {
-                activeSegments.remove(segment);
                 segment.discard();
             }
         });
@@ -253,11 +258,20 @@ public class CommitLogAllocator
         return segment;
     }
 
-    public boolean isCapExceeded()
+    /**
+     * Check to see if the speculative current size exceeds the cap.
+     *
+     * @return true if cap is exceeded
+     */
+    private boolean isCapExceeded()
     {
         return size.get() > DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 
1024 * 1024;
     }
 
+    /**
+     * Throws a flag that enables the behavior of keeping at least one spare 
segment
+     * available at all times.
+     */
     public void enableReserveSegmentCreation()
     {
         createReserveSegments = true;
@@ -323,5 +337,13 @@ public class CommitLogAllocator
     {
         allocationThread.join(); 
     }
+
+    /**
+     * @return a read-only collection of the active commit log segments
+     */
+    public Collection<CommitLogSegment> getActiveSegments()
+    {
+        return Collections.unmodifiableCollection(activeSegments);
+    }
 }
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1209779&r1=1209778&r2=1209779&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
 Fri Dec  2 23:30:32 2011
@@ -119,6 +119,8 @@ public class CommitLogSegment
             buffer = 
logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, (long) 0, 
(long) CommitLog.SEGMENT_SIZE);
             buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
             buffer.position(0);
+
+            needsSync = true;
         }
         catch (IOException e)
         {
@@ -178,13 +180,26 @@ public class CommitLogSegment
      * 
      * @return a new CommitLogSegment representing the newly reusable segment.
      */
-    public void recycle()
+    public CommitLogSegment recycle()
     {
         // writes an end-of-segment marker at the very beginning of the file 
and closes it
         buffer.position(0);
         buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
         buffer.position(0);
-        needsSync = true;
+
+        try
+        {
+            sync();
+        }
+        catch (IOException e)
+        {
+            // This is a best effort thing anyway
+            logger.warn("I/O error flushing " + this + " " + e);
+        }
+
+        close();
+
+        return new CommitLogSegment(getPath());
     }
 
     /**
@@ -253,8 +268,11 @@ public class CommitLogSegment
      */
     public void sync() throws IOException
     {
-        buffer.force();
-        needsSync = false;
+        if (needsSync)
+        {
+            buffer.force();
+            needsSync = false;
+        }
     }
 
     /**
@@ -346,14 +364,6 @@ public class CommitLogSegment
     }
 
     /**
-     * @return true if this segment file has unflushed writes
-     */
-    public boolean needsSync()
-    {
-        return needsSync;
-    }
-
-    /**
      * Check to see if a certain ReplayPosition is contained by this segment 
file.
      *
      * @param   context the replay position to be checked 
@@ -384,13 +394,6 @@ public class CommitLogSegment
 
     public int position()
     {
-        try
-        {
-            return (int) logFileAccessor.getFilePointer();
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
+        return buffer.position();
     }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java?rev=1209779&r1=1209778&r2=1209779&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java Fri Dec  2 
23:30:32 2011
@@ -18,9 +18,11 @@
 
 package org.apache.cassandra;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import com.google.common.base.Charsets;
@@ -44,6 +46,8 @@ public class SchemaLoader
     @BeforeClass
     public static void loadSchema()
     {
+        CommitLog.instance.allocator.enableReserveSegmentCreation();
+
         Thread.setDefaultUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler()
         {
             public void uncaughtException(Thread t, Throwable e)


Reply via email to