Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c3a1a4fa Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c3a1a4fa Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c3a1a4fa Branch: refs/heads/trunk Commit: c3a1a4fa8083b2c4c6a5454551979954a0a71339 Parents: d577918 05cb556 Author: Jason Brown <jasedbr...@gmail.com> Authored: Tue Dec 5 05:07:18 2017 -0800 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Tue Dec 5 05:08:22 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 10 +- .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 10 ++ .../AbstractCommitLogSegmentManager.java | 8 +- .../db/commitlog/AbstractCommitLogService.java | 84 ++++++++++++++--- .../cassandra/db/commitlog/CommitLog.java | 4 +- .../db/commitlog/CommitLogSegment.java | 85 +++++++++++------ .../db/commitlog/CompressedSegment.java | 4 +- .../db/commitlog/EncryptedSegment.java | 3 - .../db/commitlog/FileDirectSegment.java | 14 +++ .../db/commitlog/MemoryMappedSegment.java | 4 + .../db/commitlog/PeriodicCommitLogService.java | 2 +- .../commitlog/CommitLogChainedMarkersTest.java | 98 ++++++++++++++++++++ .../CommitLogSegmentBackpressureTest.java | 4 +- .../cassandra/db/commitlog/CommitLogTest.java | 10 +- .../db/commitlog/CommitLogTestReplayer.java | 2 +- 17 files changed, 281 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index e26aeb8,2683dc2..7c0af91 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,12 -1,5 +1,13 @@@ -3.0.16 +3.11.2 + * Remove OpenJDK log warning (CASSANDRA-13916) + * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079) + * Cache disk boundaries (CASSANDRA-13215) + * Add asm jar to build.xml for maven builds (CASSANDRA-11193) + * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897) + * Update jackson JSON jars (CASSANDRA-13949) + * Avoid locks when checking LCS fanout and if we should defrag (CASSANDRA-13930) +Merged from 3.0: + * More frequent commitlog chained markers (CASSANDRA-13987) * Fix serialized size of DataLimits (CASSANDRA-14057) * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975) * Fix SSTableLoader logger message (CASSANDRA-14003) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/conf/cassandra.yaml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/Config.java index a01203c,0796183..5fe752e --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@@ -198,8 -190,9 +198,9 @@@ public class Confi public String commitlog_directory; public Integer commitlog_total_space_in_mb; public CommitLogSync commitlog_sync; - public Double commitlog_sync_batch_window_in_ms; - public Integer commitlog_sync_period_in_ms; - public Integer commitlog_marker_period_in_ms = 0; + public double commitlog_sync_batch_window_in_ms = Double.NaN; + public int commitlog_sync_period_in_ms; ++ public int 
commitlog_marker_period_in_ms = 0; public int commitlog_segment_size_in_mb = 32; public ParameterizedClass commitlog_compression; public int commitlog_max_compression_buffers_in_pool = 3; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java index 7f3c9f8,0000000..2c324aa mode 100755,000000..100755 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java @@@ -1,550 -1,0 +1,552 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BooleanSupplier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import net.nicoulaj.compilecommand.annotations.DontInline; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.*; +import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.WaitQueue; + +import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; + +/** + * Performs eager-creation of commit log segments in a background thread. All the + * public methods are thread safe. + */ +public abstract class AbstractCommitLogSegmentManager +{ + static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class); + + /** + * Segment that is ready to be used. The management thread fills this and blocks until consumed. + * + * A single management thread produces this, and consumers are already synchronizing to make sure other work is + * performed atomically with consuming this. Volatile to make sure writes by the management thread become + * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must + * synchronize on 'this'. + */ + private volatile CommitLogSegment availableSegment = null; + + private final WaitQueue segmentPrepared = new WaitQueue(); + + /** Active segments, containing unflushed data. 
The tail of this queue is the one we allocate writes to */ + private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>(); + + /** + * The segment we are currently allocating commit log records to. + * + * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value. + */ + private volatile CommitLogSegment allocatingFrom = null; + + final String storageDirectory; + + /** + * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size + * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic + * can see the effect of recycling segments immediately (even though they're really happening asynchronously + * on the manager thread, which will take a ms or two). + */ + private final AtomicLong size = new AtomicLong(); + + private Thread managerThread; + protected final CommitLog commitLog; + private volatile boolean shutdown; + private final BooleanSupplier managerThreadWaitCondition = () -> (availableSegment == null && !atSegmentBufferLimit()) || shutdown; + private final WaitQueue managerThreadWaitQueue = new WaitQueue(); + + private static final SimpleCachedBufferPool bufferPool = + new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize()); + + AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory) + { + this.commitLog = commitLog; + this.storageDirectory = storageDirectory; + } + + void start() + { + // The run loop for the manager thread + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() throws Exception + { + while (!shutdown) + { + try + { + assert availableSegment == null; + logger.debug("No segments in reserve; creating a fresh one"); + availableSegment = createSegment(); + if (shutdown) + { + // If shutdown() started and finished during segment creation, we are now left with a + // segment that no one will consume. Discard it. + discardAvailableSegment(); + return; + } + + segmentPrepared.signalAll(); + Thread.yield(); + + if (availableSegment == null && !atSegmentBufferLimit()) + // Writing threads need another segment now. + continue; + + // Writing threads are not waiting for new segments, we can spend time on other tasks. + // flush old Cfs if we're full + maybeFlushToReclaim(); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + if (!CommitLog.handleCommitError("Failed managing commit log segments", t)) + return; + // sleep some arbitrary period to avoid spamming CL + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + + // If we offered a segment, wait for it to be taken before reentering the loop. + // There could be a new segment in next not offered, but only on failure to discard it while + // shutting down-- nothing more can or needs to be done in that case. 
+ } + + WaitQueue.waitOnCondition(managerThreadWaitCondition, managerThreadWaitQueue); + } + } + }; + + shutdown = false; + managerThread = NamedThreadFactory.createThread(runnable, "COMMIT-LOG-ALLOCATOR"); + managerThread.start(); + + // for simplicity, ensure the first segment is allocated before continuing + advanceAllocatingFrom(null); + } + + private boolean atSegmentBufferLimit() + { + return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit(); + } + + private void maybeFlushToReclaim() + { + long unused = unusedCapacity(); + if (unused < 0) + { + long flushingSize = 0; + List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(); + for (CommitLogSegment segment : activeSegments) + { + if (segment == allocatingFrom) + break; + flushingSize += segment.onDiskSize(); + segmentsToRecycle.add(segment); + if (flushingSize + unused >= 0) + break; + } + flushDataFrom(segmentsToRecycle, false); + } + } + + + /** + * Allocate a segment within this CLSM. Should either succeed or throw. + */ + public abstract Allocation allocate(Mutation mutation, int size); + + /** + * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM + * decide what to do with those segments on disk after they've been replayed. + */ + abstract void handleReplayedSegment(final File file); + + /** + * Hook to allow segment managers to track state surrounding creation of new segments. Onl perform as task submit + * to segment manager so it's performed on segment management thread. + */ + abstract CommitLogSegment createSegment(); + + /** + * Indicates that a segment file has been flushed and is no longer needed. Only perform as task submit to segment + * manager so it's performend on segment management thread, or perform while segment management thread is shutdown + * during testing resets. + * + * @param segment segment to be discarded + * @param delete whether or not the segment is safe to be deleted. + */ + abstract void discard(CommitLogSegment segment, boolean delete); + + /** + * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided. + * + * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM. + */ + @DontInline + void advanceAllocatingFrom(CommitLogSegment old) + { + while (true) + { + synchronized (this) + { + // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments + if (allocatingFrom != old) + return; + + // If a segment is ready, take it now, otherwise wait for the management thread to construct it. + if (availableSegment != null) + { + // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving + // the critical section. + activeSegments.add(allocatingFrom = availableSegment); + availableSegment = null; + break; + } + } + + awaitAvailableSegment(old); + } + + // Signal the management thread to prepare a new segment. + wakeManager(); + + if (old != null) + { + // Now we can run the user defined command just after switching to the new commit log. + // (Do this here instead of in the recycle call so we can get a head start on the archive.) 
+ commitLog.archiver.maybeArchive(old); + + // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it + old.discardUnusedTail(); + } + + // request that the CL be synced out-of-band, as we've finished a segment + commitLog.requestExtraSync(); + } + + void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom) + { + do + { + WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time()); + if (availableSegment == null && allocatingFrom == currentAllocatingFrom) + prepared.awaitUninterruptibly(); + else + prepared.cancel(); + } + while (availableSegment == null && allocatingFrom == currentAllocatingFrom); + } + + /** + * Switch to a new segment, regardless of how much is left in the current one. + * + * Flushes any dirty CFs for this segment and any older segments, and then discards the segments + */ + void forceRecycleAll(Iterable<UUID> droppedCfs) + { + List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments); + CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1); + advanceAllocatingFrom(last); + + // wait for the commit log modifications + last.waitForModifications(); + + // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes + // to complete + Keyspace.writeOrder.awaitNewBarrier(); + + // flush and wait for all CFs that are dirty in segments up-to and including 'last' + Future<?> future = flushDataFrom(segmentsToRecycle, true); + try + { + future.get(); + + for (CommitLogSegment segment : activeSegments) + for (UUID cfId : droppedCfs) + segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition()); + + // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments() + // if the previous active segment was the only one to recycle (since an active segment isn't + // necessarily dirty, and we only call dCS after a flush). + for (CommitLogSegment segment : activeSegments) + { + if (segment.isUnused()) + archiveAndDiscard(segment); + } + + CommitLogSegment first; + if ((first = activeSegments.peek()) != null && first.id <= last.id) + logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs."); + } + catch (Throwable t) + { + // for now just log the error + logger.error("Failed waiting for a forced recycle of in-use commit log segments", t); + } + } + + /** + * Indicates that a segment is no longer in use and that it should be discarded. + * + * @param segment segment that is no longer in use + */ + void archiveAndDiscard(final CommitLogSegment segment) + { + boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName()); + if (!activeSegments.remove(segment)) + return; // already discarded + // if archiving (command) was not successful then leave the file alone. don't delete or recycle. + logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script"); + discard(segment, archiveSuccess); + } + + /** + * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards. + * @param addedSize + */ + void addSize(long addedSize) + { + size.addAndGet(addedSize); + } + + /** + * @return the space (in bytes) used by all segment files. 
+ */ + public long onDiskSize() + { + return size.get(); + } + + private long unusedCapacity() + { + long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024; + long currentSize = size.get(); + logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total); + return total - currentSize; + } + + /** + * Force a flush on all CFs that are still dirty in @param segments. + * + * @return a Future that will finish when all the flushes are complete. + */ + private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force) + { + if (segments.isEmpty()) + return Futures.immediateFuture(null); + final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition(); + + // a map of CfId -> forceFlush() to ensure we only queue one flush per cf + final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>(); + + for (CommitLogSegment segment : segments) + { + for (UUID dirtyCFId : segment.getDirtyCFIDs()) + { + Pair<String,String> pair = Schema.instance.getCF(dirtyCFId); + if (pair == null) + { + // even though we remove the schema entry before a final flush when dropping a CF, + // it's still possible for a writer to race and finish his append after the flush. + logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId); + segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition()); + } + else if (!flushes.containsKey(dirtyCFId)) + { + String keyspace = pair.left; + final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId); + // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush, + // no deadlock possibility since switchLock removal + flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition)); + } + } + } + + return Futures.allAsList(flushes.values()); + } + + /** + * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS. + * Only call this after the AbstractCommitLogService is shut down. + */ + public void stopUnsafe(boolean deleteSegments) + { + logger.debug("CLSM closing and clearing existing commit log segments..."); + + shutdown(); + try + { + awaitTermination(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + for (CommitLogSegment segment : activeSegments) + closeAndDeleteSegmentUnsafe(segment, deleteSegments); + activeSegments.clear(); + + size.set(0L); + + logger.trace("CLSM done with closing and clearing existing commit log segments."); + } + + /** + * To be used by tests only. Not safe if mutation slots are being allocated concurrently. + */ + void awaitManagementTasksCompletion() + { + if (availableSegment == null && !atSegmentBufferLimit()) + { + awaitAvailableSegment(allocatingFrom); + } + } + + /** + * Explicitly for use only during resets in unit testing. + */ + private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete) + { + try + { + discard(segment, delete); + } + catch (AssertionError ignored) + { + // segment file does not exist + } + } + + /** + * Initiates the shutdown process for the management thread. + */ + public void shutdown() + { + assert !shutdown; + shutdown = true; + + // Release the management thread and delete prepared segment. + // Do not block as another thread may claim the segment (this can happen during unit test initialization). 
+ discardAvailableSegment(); + wakeManager(); + } + + private void discardAvailableSegment() + { + CommitLogSegment next = null; + synchronized (this) + { + next = availableSegment; + availableSegment = null; + } + if (next != null) + next.discard(true); + } + + /** + * Returns when the management thread terminates. + */ + public void awaitTermination() throws InterruptedException + { + managerThread.join(); + managerThread = null; + + for (CommitLogSegment segment : activeSegments) + segment.close(); + + bufferPool.shutdown(); + } + + /** + * @return a read-only collection of the active commit log segments + */ + @VisibleForTesting + public Collection<CommitLogSegment> getActiveSegments() + { + return Collections.unmodifiableCollection(activeSegments); + } + + /** + * @return the current CommitLogPosition of the active segment we're allocating from + */ + CommitLogPosition getCurrentPosition() + { + return allocatingFrom.getCurrentCommitLogPosition(); + } + + /** - * Forces a disk flush on the commit log files that need it. Blocking. ++ * Requests commit log files sync themselves, if needed. This may or may not involve flushing to disk. ++ * ++ * @param flush Request that the sync operation flush the file to disk. + */ - public void sync() throws IOException ++ public void sync(boolean flush) throws IOException + { + CommitLogSegment current = allocatingFrom; + for (CommitLogSegment segment : getActiveSegments()) + { + // Do not sync segments that became active after sync started. + if (segment.id > current.id) + return; - segment.sync(); ++ segment.sync(flush); + } + } + + /** + * Used by compressed and encrypted segments to share a buffer pool across the CLSM. + */ + SimpleCachedBufferPool getBufferPool() + { + return bufferPool; + } + + void wakeManager() + { + managerThreadWaitQueue.signalAll(); + } + + /** + * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for + * a buffer to become available. + */ + void notifyBufferFreed() + { + wakeManager(); + } + + /** Read-only access to current segment for subclasses. */ + CommitLogSegment allocatingFrom() + { + return allocatingFrom; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index 71100a3,8a03b2f..0410650 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@@ -17,16 -17,9 +17,18 @@@ */ package org.apache.cassandra.db.commitlog; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.Timer.Context; + import org.apache.cassandra.concurrent.NamedThreadFactory; + import org.apache.cassandra.config.Config; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.concurrent.WaitQueue; @@@ -48,7 -47,25 +50,24 @@@ public abstract class AbstractCommitLog final CommitLog commitLog; private final String name; - private final long pollIntervalNanos; + + /** + * The duration between syncs to disk. 
+ */ - private final long syncIntervalMillis; ++ private final long syncIntervalNanos; + + /** + * The duration between updating the chained markers in the the commit log file. This value should be - * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}. ++ * 0 < {@link #markerIntervalNanos} <= {@link #syncIntervalNanos}. + */ - private final long markerIntervalMillis; ++ private final long markerIntervalNanos; + + /** + * A flag that callers outside of the sync thread can use to signal they want the commitlog segments + * to be flushed to disk. Note: this flag is primarily to support commit log's batch mode, which requires + * an immediate flush to disk on every mutation; see {@link BatchCommitLogService#maybeWaitForSync(Allocation)}. + */ + private volatile boolean syncRequested; private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class); @@@ -62,15 -90,30 +92,30 @@@ { this.commitLog = commitLog; this.name = name; - this.pollIntervalNanos = TimeUnit.NANOSECONDS.convert(pollIntervalMillis, TimeUnit.MILLISECONDS); - this.syncIntervalMillis = syncIntervalMillis; ++ this.syncIntervalNanos = TimeUnit.NANOSECONDS.convert(syncIntervalMillis, TimeUnit.MILLISECONDS); + - // if we are not using periodic mode, or we using compression, we shouldn't update the chained markers ++ // if we are not using periodic mode, or we using compression/encryption, we shouldn't update the chained markers + // faster than the sync interval - if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression()) ++ if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression() || commitLog.configuration.useEncryption()) + markerIntervalMillis = syncIntervalMillis; + + // apply basic bounds checking on the marker interval + if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis) + { + logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval", + markerIntervalMillis, syncIntervalMillis); + markerIntervalMillis = syncIntervalMillis; + } + - this.markerIntervalMillis = markerIntervalMillis; ++ this.markerIntervalNanos = TimeUnit.NANOSECONDS.convert(markerIntervalMillis, TimeUnit.MILLISECONDS); } // Separated into individual method to ensure relevant objects are constructed before this is started. 
void start() { - if (pollIntervalNanos < 1) - if (syncIntervalMillis < 1) ++ if (syncIntervalNanos < 1) throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms", - pollIntervalNanos * 1e-6)); - syncIntervalMillis * 1e-6)); ++ syncIntervalNanos * 1e-6)); Runnable runnable = new Runnable() { @@@ -82,25 -125,34 +127,33 @@@ int lagCount = 0; int syncCount = 0; - boolean run = true; - while (run) + while (true) { + // always run once after shutdown signalled + boolean shutdownRequested = shutdown; + try { - // always run once after shutdown signalled - run = !shutdown; - // sync and signal - long syncStarted = System.nanoTime(); - // This is a target for Byteman in CommitLogSegmentManagerTest - commitLog.sync(); - lastSyncedAt = syncStarted; - syncComplete.signalAll(); - - long pollStarted = System.currentTimeMillis(); - if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested) ++ long pollStarted = System.nanoTime(); ++ if (lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested) + { + // in this branch, we want to flush the commit log to disk - commitLog.sync(shutdown, true); ++ commitLog.sync(true); + syncRequested = false; + lastSyncedAt = pollStarted; + syncComplete.signalAll(); + } + else + { + // in this branch, just update the commit log sync headers - commitLog.sync(false, false); ++ commitLog.sync(false); + } // sleep any time we have left before the next one is due - long now = System.currentTimeMillis(); - long sleep = pollStarted + markerIntervalMillis - now; - if (sleep < 0) + long now = System.nanoTime(); - long wakeUpAt = syncStarted + pollIntervalNanos; ++ long wakeUpAt = pollStarted + markerIntervalNanos; + if (wakeUpAt < now) { // if we have lagged noticeably, update our lag counter if (firstLagAt == 0) @@@ -143,7 -200,14 +196,7 @@@ break; // sleep for full poll-interval after an error, so we don't spam the log file - LockSupport.parkNanos(pollIntervalNanos); - try - { - haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - throw new AssertionError(); - } ++ LockSupport.parkNanos(markerIntervalNanos); } } } @@@ -166,11 -229,14 +219,12 @@@ protected abstract void maybeWaitForSync(Allocation alloc); /** - * Sync immediately, but don't block for the sync to cmplete + * Request an additional sync cycle without blocking. */ - public void requestExtraSync() - public WaitQueue.Signal requestExtraSync() ++ void requestExtraSync() { + syncRequested = true; - WaitQueue.Signal signal = syncComplete.register(); - haveWork.release(1); - return signal; + LockSupport.unpark(thread); } public void shutdown() http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 750fabc,ff1b712..da29258 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@@ -221,9 -220,15 +221,9 @@@ public class CommitLog implements Commi /** * Forces a disk flush on the commit log files that need it. Blocking. 
*/ - public void sync() throws IOException - public void sync(boolean syncAllSegments, boolean flush) ++ public void sync(boolean flush) throws IOException { - segmentManager.sync(); - CommitLogSegment current = allocator.allocatingFrom(); - for (CommitLogSegment segment : allocator.getActiveSegments()) - { - if (!syncAllSegments && segment.id > current.id) - return; - segment.sync(flush); - } ++ segmentManager.sync(flush); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index a618d0b,8834c8c..7c02892 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@@ -169,26 -170,12 +175,27 @@@ public abstract class CommitLogSegmen } buffer = createBuffer(commitLog); - // write the header - CommitLogDescriptor.writeHeader(buffer, descriptor); + } + + /** + * Deferred writing of the commit log header until subclasses have had a chance to initialize + */ + void writeLogHeader() + { + CommitLogDescriptor.writeHeader(buffer, descriptor, additionalHeaderParameters()); endOfBuffer = buffer.capacity(); - lastSyncedOffset = buffer.position(); + + lastSyncedOffset = lastMarkerOffset = buffer.position(); allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE); + headerWritten = true; + } + + /** + * Provide any additional header data that should be stored in the {@link CommitLogDescriptor}. + */ + protected Map<String, String> additionalHeaderParameters() + { + return Collections.<String, String>emptyMap(); } abstract ByteBuffer createBuffer(CommitLog commitLog); @@@ -291,15 -278,18 +298,20 @@@ } /** - * Forces a disk flush for this segment file. + * Update the chained markers in the commit log buffer and possibly force a disk flush for this segment file. + * + * @param flush true if the segment should flush to disk; else, false for just updating the chained markers. */ - synchronized void sync() + synchronized void sync(boolean flush) { + if (!headerWritten) + throw new IllegalStateException("commit log header has not been written"); - boolean close = false; + assert lastMarkerOffset >= lastSyncedOffset : String.format("commit log segment positions are incorrect: last marked = %d, last synced = %d", + lastMarkerOffset, lastSyncedOffset); // check we have more work to do - if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE) + final boolean needToMarkData = allocatePosition.get() > lastMarkerOffset + SYNC_MARKER_SIZE; + final boolean hasDataToFlush = lastSyncedOffset != lastMarkerOffset; + if (!(needToMarkData || hasDataToFlush)) return; // Note: Even if the very first allocation of this sync section failed, we still want to enter this // to ensure the segment is closed. 
As allocatePosition is set to 1 beyond the capacity of the buffer, http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java index 967db15,8e05112..d5e6113 --- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java @@@ -17,27 -17,51 +17,26 @@@ */ package org.apache.cassandra.db.commitlog; -import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.compress.ICompressor; -import org.apache.cassandra.io.util.FileUtils; --import org.apache.cassandra.utils.SyncUtil; -/* +/** * Compressed commit log segment. Provides an in-memory buffer for the mutation threads. On sync compresses the written * section of the buffer and writes it to the destination channel. + * + * The format of the compressed commit log is as follows: + * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)}) + * - a series of 'sync segments' that are written every time the commit log is sync()'ed - * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)} ++ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(long, ByteBuffer, int, int, int)} + * -- total plain text length for this section + * -- a block of compressed data */ -public class CompressedSegment extends CommitLogSegment +public class CompressedSegment extends FileDirectSegment { - private static final ThreadLocal<ByteBuffer> compressedBufferHolder = new ThreadLocal<ByteBuffer>() { - protected ByteBuffer initialValue() - { - return ByteBuffer.allocate(0); - } - }; - static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>(); - - /** - * The number of buffers in use - */ - private static AtomicInteger usedBuffers = new AtomicInteger(0); - - - /** - * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that - * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use - * more, depending on how soon the sync policy stops all writing threads. - */ - static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(); - static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4; final ICompressor compressor; - final Runnable onClose; - - volatile long lastWrittenPos = 0; /** * Constructs a new segment file. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java index 87825ab,0000000..21b7c11 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java @@@ -1,159 -1,0 +1,156 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import javax.crypto.Cipher; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.security.EncryptionUtils; +import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.utils.Hex; - import org.apache.cassandra.utils.SyncUtil; + +import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE; + +/** + * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the size of the data into + * the encryption algorithms. + * + * The format of the encrypted commit log is as follows: + * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)}) + * - a series of 'sync segments' that are written every time the commit log is sync()'ed + * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(long, ByteBuffer, int, int, int)} + * -- total plain text length for this section + * -- a series of encrypted data blocks, each of which contains: + * --- the length of the encrypted block (cipher text) + * --- the length of the unencrypted data (compressed text) + * --- the encrypted block, which contains: + * ---- the length of the plain text (raw) data + * ---- block of compressed data + * + * Notes: + * - "length of the unencrypted data" is different from the length of resulting decrypted buffer as encryption adds padding + * to the output buffer, and we need to ignore that padding when processing. 
+ */ +public class EncryptedSegment extends FileDirectSegment +{ + private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class); + + private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4; + + private final EncryptionContext encryptionContext; + private final Cipher cipher; + + public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager) + { + super(commitLog, manager); + this.encryptionContext = commitLog.configuration.getEncryptionContext(); + + try + { + cipher = encryptionContext.getEncryptor(); + } + catch (IOException e) + { + throw new FSWriteError(e, logFile); + } + logger.debug("created a new encrypted commit log segment: {}", logFile); + // Keep reusable buffers on-heap regardless of compression preference so we avoid copy off/on repeatedly during decryption + manager.getBufferPool().setPreferredReusableBufferType(BufferType.ON_HEAP); + } + + protected Map<String, String> additionalHeaderParameters() + { + Map<String, String> map = encryptionContext.toHeaderParameters(); + map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV())); + return map; + } + + ByteBuffer createBuffer(CommitLog commitLog) + { + // Note: we want to keep the compression buffers on-heap as we need those bytes for encryption, + // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs + return manager.getBufferPool().createBuffer(BufferType.ON_HEAP); + } + + void write(int startMarker, int nextMarker) + { + int contentStart = startMarker + SYNC_MARKER_SIZE; + final int length = nextMarker - contentStart; + // The length may be 0 when the segment is being closed. + assert length > 0 || length == 0 && !isStillAllocating(); + + final ICompressor compressor = encryptionContext.getCompressor(); + final int blockSize = encryptionContext.getChunkLength(); + try + { + ByteBuffer inputBuffer = buffer.duplicate(); + inputBuffer.limit(contentStart + length).position(contentStart); + ByteBuffer buffer = manager.getBufferPool().getThreadLocalReusableBuffer(DatabaseDescriptor.getCommitLogSegmentSize()); + + // save space for the sync marker at the beginning of this section + final long syncMarkerPosition = lastWrittenPos; + channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE); + + // loop over the segment data in encryption buffer sized chunks + while (contentStart < nextMarker) + { + int nextBlockSize = nextMarker - blockSize > contentStart ? 
blockSize : nextMarker - contentStart; + ByteBuffer slice = inputBuffer.duplicate(); + slice.limit(contentStart + nextBlockSize).position(contentStart); + + buffer = EncryptionUtils.compress(slice, buffer, true, compressor); + + // reuse the same buffer for the input and output of the encryption operation + buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher); + + contentStart += nextBlockSize; + manager.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE); + } + + lastWrittenPos = channel.position(); + + // rewind to the beginning of the section and write out the sync marker + buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE); + writeSyncMarker(id, buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos); + buffer.putInt(SYNC_MARKER_SIZE, length); + buffer.rewind(); + manager.addSize(buffer.limit()); + + channel.position(syncMarkerPosition); + channel.write(buffer); - - SyncUtil.force(channel, true); + } + catch (Exception e) + { + throw new FSWriteError(e, getPath()); + } + } + + public long onDiskSize() + { + return lastWrittenPos; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java index 55084be,0000000..d5431f8 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java @@@ -1,66 -1,0 +1,80 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.cassandra.io.FSWriteError; ++import org.apache.cassandra.utils.SyncUtil; + +/** + * Writes to the backing commit log file only on sync, allowing transformations of the mutations, + * such as compression or encryption, before writing out to disk. 
+ */ +public abstract class FileDirectSegment extends CommitLogSegment +{ + volatile long lastWrittenPos = 0; + + FileDirectSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager) + { + super(commitLog, manager); + } + + @Override + void writeLogHeader() + { + super.writeLogHeader(); + try + { + channel.write((ByteBuffer) buffer.duplicate().flip()); + manager.addSize(lastWrittenPos = buffer.position()); + } + catch (IOException e) + { + throw new FSWriteError(e, getPath()); + } + } + + @Override + protected void internalClose() + { + try + { + manager.getBufferPool().releaseBuffer(buffer); + super.internalClose(); + } + finally + { + manager.notifyBufferFreed(); + } + } ++ ++ @Override ++ protected void flush(int startMarker, int nextMarker) ++ { ++ try ++ { ++ SyncUtil.force(channel, true); ++ } ++ catch (Exception e) ++ { ++ throw new FSWriteError(e, getPath()); ++ } ++ } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java index 0000000,e2b9f72..663e7af mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java @@@ -1,0 -1,128 +1,98 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.cassandra.db.commitlog; + + import java.io.File; + import java.io.IOException; + import java.nio.ByteBuffer; ++import java.util.ArrayList; + import java.util.Random; + + import org.junit.Assert; + import org.junit.Test; + import org.junit.runner.RunWith; + + import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.config.CFMetaData; + import org.apache.cassandra.config.Config; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.Mutation; + import org.apache.cassandra.db.RowUpdateBuilder; + import org.apache.cassandra.db.compaction.CompactionManager; + import org.apache.cassandra.db.marshal.AsciiType; + import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.db.rows.SerializationHelper; -import org.apache.cassandra.io.util.DataInputBuffer; -import org.apache.cassandra.io.util.RebufferingInputStream; + import org.apache.cassandra.schema.KeyspaceParams; + import org.jboss.byteman.contrib.bmunit.BMRule; + import org.jboss.byteman.contrib.bmunit.BMUnitRunner; + + /** + * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers + * in the commit log segment but do not flush the file to disk. + */ + @RunWith(BMUnitRunner.class) + public class CommitLogChainedMarkersTest + { + private static final String KEYSPACE1 = "CommitLogTest"; + private static final String STANDARD1 = "CommitLogChainedMarkersTest"; + + @Test + @BMRule(name = "force all calls to sync() to not flush to disk", + targetClass = "CommitLogSegment", + targetMethod = "sync(boolean)", + action = "$flush = false") + public void replayCommitLogWithoutFlushing() throws IOException + { ++ // this method is blend of CommitLogSegmentBackpressureTest & CommitLogReaderTest methods ++ DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setCommitLogSegmentSize(5); + DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic); + DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000); + DatabaseDescriptor.setCommitLogMarkerPeriod(1); + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance)); + + CompactionManager.instance.disableAutoCompaction(); + + ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + + byte[] entropy = new byte[1024]; + new Random().nextBytes(entropy); + final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.wrap(entropy)) + .build(); + + int samples = 10000; + for (int i = 0; i < samples; i++) + CommitLog.instance.add(m); + - CommitLog.instance.sync(false, true); ++ CommitLog.instance.sync(false); + - Replayer replayer = new Replayer(cfs1.metadata); - File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation()); - replayer.recover(commitLogDir.listFiles()); - Assert.assertEquals(samples, replayer.count); - } - - private static class Replayer extends CommitLogReplayer - { - private final CFMetaData cfm; - private int count; - - Replayer(CFMetaData cfm) - { - super(CommitLog.instance, ReplayPosition.NONE, null, ReplayFilter.create()); - this.cfm = cfm; - } - - @Override - void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) - { - RebufferingInputStream bufIn = new 
DataInputBuffer(inputBuffer, 0, size); - try - { - Mutation mutation = Mutation.serializer.deserialize(bufIn, - desc.getMessagingVersion(), - SerializationHelper.Flag.LOCAL); ++ ArrayList<File> toCheck = CommitLogReaderTest.getCommitLogs(); ++ CommitLogReader reader = new CommitLogReader(); ++ CommitLogReaderTest.TestCLRHandler testHandler = new CommitLogReaderTest.TestCLRHandler(cfs1.metadata); ++ for (File f : toCheck) ++ reader.readCommitLogSegment(testHandler, f, CommitLogReader.ALL_MUTATIONS, false); + - if (cfm == null || mutation.get(cfm) != null) - count++; - } - catch (IOException e) - { - // Test fails. - throw new AssertionError(e); - } - } ++ Assert.assertEquals(samples, testHandler.seenMutationCount()); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java index 3956de5,a1999ef..46a3fb0 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java @@@ -67,12 -66,12 +67,12 @@@ public class CommitLogSegmentBackpressu @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync", targetClass = "AbstractCommitLogService$1", targetMethod = "run", - targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync", - targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)", ++ targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean)", action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"), @BMRule(name = "Release Semaphore after sync", targetClass = "AbstractCommitLogService$1", targetMethod = "run", - targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync", - targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)", ++ targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean)", action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")}) public void testCompressedCommitLogBackpressure() throws Throwable { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index 267813e,b8f68ed..215ad6c --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@@ -360,10 -339,10 +360,10 @@@ public class CommitLogTes // "Flush": this won't delete anything UUID cfid1 = rm.getColumnFamilyIds().iterator().next(); - CommitLog.instance.sync(); - CommitLog.instance.sync(true, true); - CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext()); ++ CommitLog.instance.sync(true); + CommitLog.instance.discardCompletedSegments(cfid1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition()); - assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); + assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size()); // Adding new mutation on another CF, large 
enough (including CL entry overhead) that a new segment is created Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k") @@@ -678,114 -612,6 +678,114 @@@ } @Test + public void replaySimple() throws IOException + { + int cellCount = 0; + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1") + .clustering("bytes") + .add("val", bytes("this is a string")) + .build(); + cellCount += 1; + CommitLog.instance.add(rm1); + + final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2") + .clustering("bytes") + .add("val", bytes("this is a string")) + .build(); + cellCount += 1; + CommitLog.instance.add(rm2); + - CommitLog.instance.sync(); ++ CommitLog.instance.sync(true); + + SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata); + List<String> activeSegments = CommitLog.instance.getActiveSegmentNames(); + Assert.assertFalse(activeSegments.isEmpty()); + + File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name)); + replayer.replayFiles(files); + + assertEquals(cellCount, replayer.cells); + } + + @Test + public void replayWithDiscard() throws IOException + { + int cellCount = 0; + int max = 1024; + int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay + CommitLogPosition commitLogPosition = null; + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + + for (int i = 0; i < max; i++) + { + final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1) + .clustering("bytes") + .add("val", bytes("this is a string")) + .build(); + CommitLogPosition position = CommitLog.instance.add(rm1); + + if (i == discardPosition) + commitLogPosition = position; + if (i > discardPosition) + { + cellCount += 1; + } + } + - CommitLog.instance.sync(); ++ CommitLog.instance.sync(true); + + SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata); + List<String> activeSegments = CommitLog.instance.getActiveSegmentNames(); + Assert.assertFalse(activeSegments.isEmpty()); + + File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name)); + replayer.replayFiles(files); + + assertEquals(cellCount, replayer.cells); + } + + class SimpleCountingReplayer extends CommitLogReplayer + { + private final CommitLogPosition filterPosition; + private final CFMetaData metadata; + int cells; + int skipped; + + SimpleCountingReplayer(CommitLog commitLog, CommitLogPosition filterPosition, CFMetaData cfm) + { + super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create()); + this.filterPosition = filterPosition; + this.metadata = cfm; + } + + @SuppressWarnings("resource") + @Override + public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc) + { + // Filter out system writes that could flake the test. + if (!KEYSPACE1.equals(m.getKeyspaceName())) + return; + + if (entryLocation <= filterPosition.position) + { + // Skip over this mutation. + skipped++; + return; + } + for (PartitionUpdate partitionUpdate : m.getPartitionUpdates()) + { + // Only process mutations for the CF's we're testing against, since we can't deterministically predict + // whether or not system keyspaces will be mutated during a test. 
+ if (partitionUpdate.metadata().cfName.equals(metadata.cfName)) + { + for (Row row : partitionUpdate) + cells += Iterables.size(row.cells()); + } + } + } + } + public void testUnwriteableFlushRecovery() throws ExecutionException, InterruptedException, IOException { CommitLog.instance.resetUnsafe(true); @@@ -826,7 -652,7 +826,7 @@@ DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); } - CommitLog.instance.sync(); - CommitLog.instance.sync(true, true); ++ CommitLog.instance.sync(true); System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1); // Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog. // If retries work subsequent flushes should clear up error and this should change to expect 0. @@@ -859,7 -685,7 +859,7 @@@ for (SSTableReader reader : cfs.getLiveSSTables()) reader.reloadSSTableMetadata(); - CommitLog.instance.sync(); - CommitLog.instance.sync(true, true); ++ CommitLog.instance.sync(true); System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1); // In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have // persisted all data in the commit log. Because we know there was an error, there must be something left to http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java index 7f43378,36973f2..9a22b04 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java @@@ -35,44 -36,44 +35,44 @@@ import org.apache.cassandra.io.util.Reb */ public class CommitLogTestReplayer extends CommitLogReplayer { - public static void examineCommitLog(Predicate<Mutation> processor) throws IOException - { - CommitLog.instance.sync(true, true); - - CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor); - File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation()); - replayer.recover(commitLogDir.listFiles()); - } - - final private Predicate<Mutation> processor; + private final Predicate<Mutation> processor; - public CommitLogTestReplayer(CommitLog log, Predicate<Mutation> processor) + public CommitLogTestReplayer(Predicate<Mutation> processor) throws IOException { - this(log, ReplayPosition.NONE, processor); + super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create()); - CommitLog.instance.sync(); ++ CommitLog.instance.sync(true); + + this.processor = processor; + commitLogReader = new CommitLogTestReader(); } - public CommitLogTestReplayer(CommitLog log, ReplayPosition discardedPos, Predicate<Mutation> processor) + public void examineCommitLog() throws IOException { - super(log, discardedPos, null, ReplayFilter.create()); - this.processor = processor; + replayFiles(new File(DatabaseDescriptor.getCommitLogLocation()).listFiles()); } - @Override - void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) + private class CommitLogTestReader extends CommitLogReader { - RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size); - Mutation mutation; - try - { - mutation = Mutation.serializer.deserialize(bufIn, - desc.getMessagingVersion(), - SerializationHelper.Flag.LOCAL); - Assert.assertTrue(processor.apply(mutation)); - } - catch 
(IOException e) + @Override + protected void readMutation(CommitLogReadHandler handler, + byte[] inputBuffer, + int size, + CommitLogPosition minPosition, + final int entryLocation, + final CommitLogDescriptor desc) throws IOException { - // Test fails. - throw new AssertionError(e); + RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size); + Mutation mutation; + try + { + mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL); + Assert.assertTrue(processor.apply(mutation)); + } + catch (IOException e) + { + // Test fails. + throw new AssertionError(e); + } } } }