This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit a136517211342621be2dadd5d53affb2e4fbf583 Merge: 32a15f0 35446dc Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Mon Oct 4 14:58:00 2021 -0500 Merge branch 'cassandra-3.0' into cassandra-3.11 .../cassandra/db/commitlog/AbstractCommitLogSegmentManager.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java index df2e4f3,0000000..18b4374 mode 100755,000000..100755 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java @@@ -1,563 -1,0 +1,565 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BooleanSupplier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import net.nicoulaj.compilecommand.annotations.DontInline; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.*; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.util.SimpleCachedBufferPool; +import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.WaitQueue; + +import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; + +/** + * Performs eager-creation of commit log segments in a background thread. All the + * public methods are thread safe. + */ +public abstract class AbstractCommitLogSegmentManager +{ + static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class); + + /** + * Segment that is ready to be used. The management thread fills this and blocks until consumed. + * + * A single management thread produces this, and consumers are already synchronizing to make sure other work is + * performed atomically with consuming this. Volatile to make sure writes by the management thread become + * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must + * synchronize on 'this'. + */ + private volatile CommitLogSegment availableSegment = null; + + private final WaitQueue segmentPrepared = new WaitQueue(); + + /** Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to */ + private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>(); + + /** + * The segment we are currently allocating commit log records to. + * + * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value. + */ + private volatile CommitLogSegment allocatingFrom = null; + + final String storageDirectory; + + /** + * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size + * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic + * can see the effect of recycling segments immediately (even though they're really happening asynchronously + * on the manager thread, which will take a ms or two). + */ + private final AtomicLong size = new AtomicLong(); + + private Thread managerThread; + protected final CommitLog commitLog; + private volatile boolean shutdown; + private final BooleanSupplier managerThreadWaitCondition = () -> (availableSegment == null && !atSegmentBufferLimit()) || shutdown; + private final WaitQueue managerThreadWaitQueue = new WaitQueue(); + + private volatile SimpleCachedBufferPool bufferPool; + + AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory) + { + this.commitLog = commitLog; + this.storageDirectory = storageDirectory; + } + + void start() + { + // The run loop for the manager thread + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() throws Exception + { + while (!shutdown) + { + try + { + assert availableSegment == null; + logger.trace("No segments in reserve; creating a fresh one"); + availableSegment = createSegment(); + if (shutdown) + { + // If shutdown() started and finished during segment creation, we are now left with a + // segment that no one will consume. Discard it. + discardAvailableSegment(); + return; + } + + segmentPrepared.signalAll(); + Thread.yield(); + + if (availableSegment == null && !atSegmentBufferLimit()) + // Writing threads need another segment now. + continue; + + // Writing threads are not waiting for new segments, we can spend time on other tasks. + // flush old Cfs if we're full + maybeFlushToReclaim(); + } + catch (Throwable t) + { + if (!CommitLog.handleCommitError("Failed managing commit log segments", t)) + return; + // sleep some arbitrary period to avoid spamming CL + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + + // If we offered a segment, wait for it to be taken before reentering the loop. + // There could be a new segment in next not offered, but only on failure to discard it while + // shutting down-- nothing more can or needs to be done in that case. + } + + WaitQueue.waitOnCondition(managerThreadWaitCondition, managerThreadWaitQueue); + } + } + }; + + // For encrypted segments we want to keep the compression buffers on-heap as we need those bytes for encryption, + // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs + BufferType bufferType = commitLog.configuration.useEncryption() || !commitLog.configuration.useCompression() + ? BufferType.ON_HEAP + : commitLog.configuration.getCompressor().preferredBufferType(); + + this.bufferPool = new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), + DatabaseDescriptor.getCommitLogSegmentSize(), + bufferType); + + shutdown = false; + managerThread = NamedThreadFactory.createThread(runnable, "COMMIT-LOG-ALLOCATOR"); + managerThread.start(); + + // for simplicity, ensure the first segment is allocated before continuing + advanceAllocatingFrom(null); + } + + private boolean atSegmentBufferLimit() + { + return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit(); + } + + private void maybeFlushToReclaim() + { + long unused = unusedCapacity(); + if (unused < 0) + { + long flushingSize = 0; + List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(); + for (CommitLogSegment segment : activeSegments) + { + if (segment == allocatingFrom) + break; + flushingSize += segment.onDiskSize(); + segmentsToRecycle.add(segment); + if (flushingSize + unused >= 0) + break; + } + flushDataFrom(segmentsToRecycle, false); + } + } + + + /** + * Allocate a segment within this CLSM. Should either succeed or throw. + */ + public abstract Allocation allocate(Mutation mutation, int size); + + /** + * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM + * decide what to do with those segments on disk after they've been replayed. + */ + abstract void handleReplayedSegment(final File file); + + /** + * Hook to allow segment managers to track state surrounding creation of new segments. Onl perform as task submit + * to segment manager so it's performed on segment management thread. + */ + abstract CommitLogSegment createSegment(); + + /** + * Indicates that a segment file has been flushed and is no longer needed. Only perform as task submit to segment + * manager so it's performend on segment management thread, or perform while segment management thread is shutdown + * during testing resets. + * + * @param segment segment to be discarded + * @param delete whether or not the segment is safe to be deleted. + */ + abstract void discard(CommitLogSegment segment, boolean delete); + + /** + * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided. + * + * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM. + */ + @DontInline + void advanceAllocatingFrom(CommitLogSegment old) + { + while (true) + { + synchronized (this) + { + // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments + if (allocatingFrom != old) + return; + + // If a segment is ready, take it now, otherwise wait for the management thread to construct it. + if (availableSegment != null) + { + // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving + // the critical section. + activeSegments.add(allocatingFrom = availableSegment); + availableSegment = null; + break; + } + } + + awaitAvailableSegment(old); + } + + // Signal the management thread to prepare a new segment. + wakeManager(); + + if (old != null) + { + // Now we can run the user defined command just after switching to the new commit log. + // (Do this here instead of in the recycle call so we can get a head start on the archive.) + commitLog.archiver.maybeArchive(old); + + // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it + old.discardUnusedTail(); + } + + // request that the CL be synced out-of-band, as we've finished a segment + commitLog.requestExtraSync(); + } + + void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom) + { + do + { + WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time()); + if (availableSegment == null && allocatingFrom == currentAllocatingFrom) + prepared.awaitUninterruptibly(); + else + prepared.cancel(); + } + while (availableSegment == null && allocatingFrom == currentAllocatingFrom); + } + + /** + * Switch to a new segment, regardless of how much is left in the current one. + * - * Flushes any dirty CFs for this segment and any older segments, and then discards the segments ++ * Flushes any dirty CFs for this segment and any older segments, and then discards the segments. ++ * This is necessary to avoid resurrecting data during replay if a user creates a new table with ++ * the same name and ID. See CASSANDRA-16986 for more details. + */ + void forceRecycleAll(Iterable<UUID> droppedCfs) + { + List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments); + CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1); + advanceAllocatingFrom(last); + + // wait for the commit log modifications + last.waitForModifications(); + + // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes + // to complete + Keyspace.writeOrder.awaitNewBarrier(); + + // flush and wait for all CFs that are dirty in segments up-to and including 'last' + Future<?> future = flushDataFrom(segmentsToRecycle, true); + try + { + future.get(); + + for (CommitLogSegment segment : activeSegments) + for (UUID cfId : droppedCfs) + segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition()); + + // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments() + // if the previous active segment was the only one to recycle (since an active segment isn't + // necessarily dirty, and we only call dCS after a flush). + for (CommitLogSegment segment : activeSegments) + { + if (segment.isUnused()) + archiveAndDiscard(segment); + } + + CommitLogSegment first; + if ((first = activeSegments.peek()) != null && first.id <= last.id) + logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs."); + } + catch (Throwable t) + { + // for now just log the error + logger.error("Failed waiting for a forced recycle of in-use commit log segments", t); + } + } + + /** + * Indicates that a segment is no longer in use and that it should be discarded. + * + * @param segment segment that is no longer in use + */ + void archiveAndDiscard(final CommitLogSegment segment) + { + boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName()); + if (!activeSegments.remove(segment)) + return; // already discarded + // if archiving (command) was not successful then leave the file alone. don't delete or recycle. + logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script"); + discard(segment, archiveSuccess); + } + + /** + * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards. + * @param addedSize + */ + void addSize(long addedSize) + { + size.addAndGet(addedSize); + } + + /** + * @return the space (in bytes) used by all segment files. + */ + public long onDiskSize() + { + return size.get(); + } + + private long unusedCapacity() + { + long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024; + long currentSize = size.get(); + logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total); + return total - currentSize; + } + + /** + * Force a flush on all CFs that are still dirty in @param segments. + * + * @return a Future that will finish when all the flushes are complete. + */ + private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force) + { + if (segments.isEmpty()) + return Futures.immediateFuture(null); + final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition(); + + // a map of CfId -> forceFlush() to ensure we only queue one flush per cf + final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>(); + + for (CommitLogSegment segment : segments) + { + for (UUID dirtyCFId : segment.getDirtyCFIDs()) + { + Pair<String,String> pair = Schema.instance.getCF(dirtyCFId); + if (pair == null) + { + // even though we remove the schema entry before a final flush when dropping a CF, + // it's still possible for a writer to race and finish his append after the flush. + logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId); + segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition()); + } + else if (!flushes.containsKey(dirtyCFId)) + { + String keyspace = pair.left; + final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId); + // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush, + // no deadlock possibility since switchLock removal + flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition)); + } + } + } + + return Futures.allAsList(flushes.values()); + } + + /** + * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS. + * Only call this after the AbstractCommitLogService is shut down. + */ + public void stopUnsafe(boolean deleteSegments) + { + logger.debug("CLSM closing and clearing existing commit log segments..."); + + shutdown(); + try + { + awaitTermination(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + for (CommitLogSegment segment : activeSegments) + closeAndDeleteSegmentUnsafe(segment, deleteSegments); + activeSegments.clear(); + + size.set(0L); + + logger.trace("CLSM done with closing and clearing existing commit log segments."); + } + + /** + * To be used by tests only. Not safe if mutation slots are being allocated concurrently. + */ + void awaitManagementTasksCompletion() + { + if (availableSegment == null && !atSegmentBufferLimit()) + { + awaitAvailableSegment(allocatingFrom); + } + } + + /** + * Explicitly for use only during resets in unit testing. + */ + private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete) + { + try + { + discard(segment, delete); + } + catch (AssertionError ignored) + { + // segment file does not exist + } + } + + /** + * Initiates the shutdown process for the management thread. + */ + public void shutdown() + { + assert !shutdown; + shutdown = true; + + // Release the management thread and delete prepared segment. + // Do not block as another thread may claim the segment (this can happen during unit test initialization). + discardAvailableSegment(); + wakeManager(); + } + + private void discardAvailableSegment() + { + CommitLogSegment next = null; + synchronized (this) + { + next = availableSegment; + availableSegment = null; + } + if (next != null) + next.discard(true); + } + + /** + * Returns when the management thread terminates. + */ + public void awaitTermination() throws InterruptedException + { + managerThread.join(); + managerThread = null; + + for (CommitLogSegment segment : activeSegments) + segment.close(); + + if (bufferPool != null) + bufferPool.emptyBufferPool(); + } + + /** + * @return a read-only collection of the active commit log segments + */ + @VisibleForTesting + public Collection<CommitLogSegment> getActiveSegments() + { + return Collections.unmodifiableCollection(activeSegments); + } + + /** + * @return the current CommitLogPosition of the active segment we're allocating from + */ + CommitLogPosition getCurrentPosition() + { + return allocatingFrom.getCurrentCommitLogPosition(); + } + + /** + * Requests commit log files sync themselves, if needed. This may or may not involve flushing to disk. + * + * @param flush Request that the sync operation flush the file to disk. + */ + public void sync(boolean flush) throws IOException + { + CommitLogSegment current = allocatingFrom; + for (CommitLogSegment segment : getActiveSegments()) + { + // Do not sync segments that became active after sync started. + if (segment.id > current.id) + return; + segment.sync(flush); + } + } + + /** + * Used by compressed and encrypted segments to share a buffer pool across the CLSM. + */ + SimpleCachedBufferPool getBufferPool() + { + return bufferPool; + } + + void wakeManager() + { + managerThreadWaitQueue.signalAll(); + } + + /** + * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for + * a buffer to become available. + */ + void notifyBufferFreed() + { + wakeManager(); + } + + /** Read-only access to current segment for subclasses. */ + CommitLogSegment allocatingFrom() + { + return allocatingFrom; + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org