Author: jbellis
Date: Fri Dec 2 23:30:32 2011
New Revision: 1209779

URL: http://svn.apache.org/viewvc?rev=1209779&view=rev
Log:
fix commitlog segment recycling
patch by Rick Branson; reviewed by jbellis for CASSANDRA-3557

Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1209779&r1=1209778&r2=1209779&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Fri Dec 2 23:30:32 2011 @@ -1,6 +1,7 @@ 1.1-dev * "defragment" rows for name-based queries under STCS, again (CASSANDRA-2503) - * Recycle commitlog segments for improved performance (CASSANDRA-3411, 3543) + * Recycle commitlog segments for improved performance + (CASSANDRA-3411, 3543, 3557) * update size-tiered compaction to prioritize small tiers (CASSANDRA-2407) * add message expiration logic to OutboundTcpConnection (CASSANDRA-3005) * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1209779&r1=1209778&r2=1209779&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Fri Dec 2 23:30:32 2011 @@ -65,7 +65,7 @@ public class CommitLog implements Commit private final ICommitLogExecutorService executor; - private final CommitLogAllocator allocator; + public final CommitLogAllocator allocator; public static final int END_OF_SEGMENT_MARKER = 0; // this is written out at the end of a segment public static final int 
END_OF_SEGMENT_MARKER_SIZE = 4; // number of bytes of ^^^ @@ -388,7 +388,7 @@ public class CommitLog implements Commit */ public int activeSegments() { - return allocator.activeSegments.size(); + return allocator.getActiveSegments().size(); } /** @@ -427,7 +427,7 @@ public class CommitLog implements Commit // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed // in the arguments. Any segments that become unused after they are marked clean will be // recycled or discarded. - for (Iterator<CommitLogSegment> iter = allocator.activeSegments.iterator(); iter.hasNext(); ) + for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();) { CommitLogSegment segment = iter.next(); segment.markClean(cfId, context); @@ -438,7 +438,6 @@ public class CommitLog implements Commit if (segment.isUnused() && iter.hasNext()) { logger.debug("Commit log segment {} is unused", segment); - iter.remove(); allocator.recycleSegment(segment); } else @@ -477,12 +476,9 @@ public class CommitLog implements Commit */ public void sync() throws IOException { - for (CommitLogSegment segment : allocator.activeSegments) + for (CommitLogSegment segment : allocator.getActiveSegments()) { - if (segment.needsSync()) - { - segment.sync(); - } + segment.sync(); } } @@ -515,12 +511,15 @@ public class CommitLog implements Commit */ public void forceNewSegment() throws ExecutionException, InterruptedException { + logger.debug("Forcing new segment creation"); + Callable<?> task = new Callable() { public Object call() throws IOException { if (activeSegment.position() > 0) activateNextSegment(); + return null; } }; Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java?rev=1209779&r1=1209778&r2=1209779&view=diff 
============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java Fri Dec 2 23:30:32 2011 @@ -22,6 +22,8 @@ import java.io.File; import java.io.IOError; import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -58,7 +60,7 @@ public class CommitLogAllocator private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); /** Active segments, containing unflushed data */ - final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<CommitLogSegment>(); + private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<CommitLogSegment>(); /** * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size @@ -113,7 +115,7 @@ public class CommitLogAllocator /** * Fetches an empty segment file. 
* - * @return the next writeable segment + * @return the next writable segment */ public CommitLogSegment fetchSegment() { @@ -142,6 +144,8 @@ public class CommitLogAllocator */ public void recycleSegment(final CommitLogSegment segment) { + activeSegments.remove(segment); + if (isCapExceeded()) { discardSegment(segment); @@ -152,7 +156,8 @@ public class CommitLogAllocator { public void run() { - segment.recycle(); + CommitLogSegment recycled = segment.recycle(); + internalAddReadySegment(recycled); } }); } @@ -197,11 +202,11 @@ public class CommitLogAllocator private void discardSegment(final CommitLogSegment segment) { size.addAndGet(-CommitLog.SEGMENT_SIZE); + queue.add(new Runnable() { public void run() { - activeSegments.remove(segment); segment.discard(); } }); @@ -253,11 +258,20 @@ public class CommitLogAllocator return segment; } - public boolean isCapExceeded() + /** + * Check to see if the speculative current size exceeds the cap. + * + * @return true if cap is exceeded + */ + private boolean isCapExceeded() { return size.get() > DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024; } + /** + * Throws a flag that enables the behavior of keeping at least one spare segment + * available at all times. 
+ */ public void enableReserveSegmentCreation() { createReserveSegments = true; @@ -323,5 +337,13 @@ public class CommitLogAllocator { allocationThread.join(); } + + /** + * @return a read-only collection of the active commit log segments + */ + public Collection<CommitLogSegment> getActiveSegments() + { + return Collections.unmodifiableCollection(activeSegments); + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1209779&r1=1209778&r2=1209779&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Fri Dec 2 23:30:32 2011 @@ -119,6 +119,8 @@ public class CommitLogSegment buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, (long) 0, (long) CommitLog.SEGMENT_SIZE); buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER); buffer.position(0); + + needsSync = true; } catch (IOException e) { @@ -178,13 +180,26 @@ public class CommitLogSegment * * @return a new CommitLogSegment representing the newly reusable segment. 
*/ - public void recycle() + public CommitLogSegment recycle() { // writes an end-of-segment marker at the very beginning of the file and closes it buffer.position(0); buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER); buffer.position(0); - needsSync = true; + + try + { + sync(); + } + catch (IOException e) + { + // This is a best effort thing anyway + logger.warn("I/O error flushing " + this + " " + e); + } + + close(); + + return new CommitLogSegment(getPath()); } /** @@ -253,8 +268,11 @@ public class CommitLogSegment */ public void sync() throws IOException { - buffer.force(); - needsSync = false; + if (needsSync) + { + buffer.force(); + needsSync = false; + } } /** @@ -346,14 +364,6 @@ public class CommitLogSegment } /** - * @return true if this segment file has unflushed writes - */ - public boolean needsSync() - { - return needsSync; - } - - /** * Check to see if a certain ReplayPosition is contained by this segment file. * * @param context the replay position to be checked @@ -384,13 +394,6 @@ public class CommitLogSegment public int position() { - try - { - return (int) logFileAccessor.getFilePointer(); - } - catch (IOException e) - { - throw new IOError(e); - } + return buffer.position(); } } Modified: cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java?rev=1209779&r1=1209778&r2=1209779&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java Fri Dec 2 23:30:32 2011 @@ -18,9 +18,11 @@ package org.apache.cassandra; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.utils.ByteBufferUtil; import com.google.common.base.Charsets; @@ -44,6 +46,8 @@ public class 
SchemaLoader @BeforeClass public static void loadSchema() { + CommitLog.instance.allocator.enableReserveSegmentCreation(); + Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e)