This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit a136517211342621be2dadd5d53affb2e4fbf583
Merge: 32a15f0 35446dc
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Mon Oct 4 14:58:00 2021 -0500

    Merge branch 'cassandra-3.0' into cassandra-3.11

 .../cassandra/db/commitlog/AbstractCommitLogSegmentManager.java       | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index df2e4f3,0000000..18b4374
mode 100755,000000..100755
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@@ -1,563 -1,0 +1,565 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.function.BooleanSupplier;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.util.concurrent.*;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import net.nicoulaj.compilecommand.annotations.DontInline;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.io.compress.BufferType;
 +import org.apache.cassandra.io.util.SimpleCachedBufferPool;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.WaitQueue;
 +
 +import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 +
 +/**
 + * Performs eager-creation of commit log segments in a background thread. All the
 + * public methods are thread safe.
 + */
 +public abstract class AbstractCommitLogSegmentManager
 +{
 +    static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
 +
 +    /**
 +     * Segment that is ready to be used. The management thread fills this and blocks until consumed.
 +     *
 +     * A single management thread produces this, and consumers are already synchronizing to make sure other work is
 +     * performed atomically with consuming this. Volatile to make sure writes by the management thread become
 +     * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must
 +     * synchronize on 'this'.
 +     */
 +    private volatile CommitLogSegment availableSegment = null;
 +
 +    private final WaitQueue segmentPrepared = new WaitQueue();
 +
 +    /** Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to */
 +    private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
 +
 +    /**
 +     * The segment we are currently allocating commit log records to.
 +     *
 +     * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value.
 +     */
 +    private volatile CommitLogSegment allocatingFrom = null;
 +
 +    final String storageDirectory;
 +
 +    /**
 +     * Tracks commitlog size, in multiples of the segment size.  We need to do this so we can "promise" size
 +     * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
 +     * can see the effect of recycling segments immediately (even though they're really happening asynchronously
 +     * on the manager thread, which will take a ms or two).
 +     */
 +    private final AtomicLong size = new AtomicLong();
 +
 +    private Thread managerThread;
 +    protected final CommitLog commitLog;
 +    private volatile boolean shutdown;
 +    private final BooleanSupplier managerThreadWaitCondition = () -> (availableSegment == null && !atSegmentBufferLimit()) || shutdown;
 +    private final WaitQueue managerThreadWaitQueue = new WaitQueue();
 +
 +    private volatile SimpleCachedBufferPool bufferPool;
 +
 +    AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory)
 +    {
 +        this.commitLog = commitLog;
 +        this.storageDirectory = storageDirectory;
 +    }
 +
 +    void start()
 +    {
 +        // The run loop for the manager thread
 +        Runnable runnable = new WrappedRunnable()
 +        {
 +            public void runMayThrow() throws Exception
 +            {
 +                while (!shutdown)
 +                {
 +                    try
 +                    {
 +                        assert availableSegment == null;
 +                        logger.trace("No segments in reserve; creating a fresh one");
 +                        availableSegment = createSegment();
 +                        if (shutdown)
 +                        {
 +                            // If shutdown() started and finished during segment creation, we are now left with a
 +                            // segment that no one will consume. Discard it.
 +                            discardAvailableSegment();
 +                            return;
 +                        }
 +
 +                        segmentPrepared.signalAll();
 +                        Thread.yield();
 +
 +                        if (availableSegment == null && !atSegmentBufferLimit())
 +                            // Writing threads need another segment now.
 +                            continue;
 +
 +                        // Writing threads are not waiting for new segments, we can spend time on other tasks.
 +                        // flush old Cfs if we're full
 +                        maybeFlushToReclaim();
 +                    }
 +                    catch (Throwable t)
 +                    {
 +                        if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
 +                            return;
 +                        // sleep some arbitrary period to avoid spamming CL
 +                        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
 +
 +                        // If we offered a segment, wait for it to be taken before reentering the loop.
 +                        // There could be a new segment in next not offered, but only on failure to discard it while
 +                        // shutting down-- nothing more can or needs to be done in that case.
 +                    }
 +
 +                    WaitQueue.waitOnCondition(managerThreadWaitCondition, managerThreadWaitQueue);
 +                }
 +            }
 +        };
 +
 +        // For encrypted segments we want to keep the compression buffers on-heap as we need those bytes for encryption,
 +        // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
 +        BufferType bufferType = commitLog.configuration.useEncryption() || !commitLog.configuration.useCompression()
 +                              ? BufferType.ON_HEAP
 +                              : commitLog.configuration.getCompressor().preferredBufferType();
 +
 +        this.bufferPool = new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(),
 +                                                     DatabaseDescriptor.getCommitLogSegmentSize(),
 +                                                     bufferType);
 +
 +        shutdown = false;
 +        managerThread = NamedThreadFactory.createThread(runnable, "COMMIT-LOG-ALLOCATOR");
 +        managerThread.start();
 +
 +        // for simplicity, ensure the first segment is allocated before continuing
 +        advanceAllocatingFrom(null);
 +    }
 +
 +    private boolean atSegmentBufferLimit()
 +    {
 +        return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
 +    }
 +
 +    private void maybeFlushToReclaim()
 +    {
 +        long unused = unusedCapacity();
 +        if (unused < 0)
 +        {
 +            long flushingSize = 0;
 +            List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
 +            for (CommitLogSegment segment : activeSegments)
 +            {
 +                if (segment == allocatingFrom)
 +                    break;
 +                flushingSize += segment.onDiskSize();
 +                segmentsToRecycle.add(segment);
 +                if (flushingSize + unused >= 0)
 +                    break;
 +            }
 +            flushDataFrom(segmentsToRecycle, false);
 +        }
 +    }
 +
 +
 +    /**
 +     * Allocate a segment within this CLSM. Should either succeed or throw.
 +     */
 +    public abstract Allocation allocate(Mutation mutation, int size);
 +
 +    /**
 +     * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM
 +     * implementations decide what to do with those segments on disk after they've been replayed.
 +     */
 +    abstract void handleReplayedSegment(final File file);
 +
 +    /**
 +     * Hook to allow segment managers to track state surrounding creation of new segments. Only perform as a task
 +     * submitted to the segment manager so it runs on the segment management thread.
 +     */
 +    abstract CommitLogSegment createSegment();
 +
 +    /**
 +     * Indicates that a segment file has been flushed and is no longer needed. Only perform as a task submitted to the
 +     * segment manager so it runs on the segment management thread, or perform while the segment management thread is
 +     * shut down during testing resets.
 +     *
 +     * @param segment segment to be discarded
 +     * @param delete  whether or not the segment is safe to be deleted.
 +     */
 +    abstract void discard(CommitLogSegment segment, boolean delete);
 +
 +    /**
 +     * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided.
 +     *
 +     * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
 +     */
 +    @DontInline
 +    void advanceAllocatingFrom(CommitLogSegment old)
 +    {
 +        while (true)
 +        {
 +            synchronized (this)
 +            {
 +                // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments
 +                if (allocatingFrom != old)
 +                    return;
 +
 +                // If a segment is ready, take it now, otherwise wait for the management thread to construct it.
 +                if (availableSegment != null)
 +                {
 +                    // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving
 +                    // the critical section.
 +                    activeSegments.add(allocatingFrom = availableSegment);
 +                    availableSegment = null;
 +                    break;
 +                }
 +            }
 +
 +            awaitAvailableSegment(old);
 +        }
 +
 +        // Signal the management thread to prepare a new segment.
 +        wakeManager();
 +
 +        if (old != null)
 +        {
 +            // Now we can run the user defined command just after switching to the new commit log.
 +            // (Do this here instead of in the recycle call so we can get a head start on the archive.)
 +            commitLog.archiver.maybeArchive(old);
 +
 +            // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
 +            old.discardUnusedTail();
 +        }
 +
 +        // request that the CL be synced out-of-band, as we've finished a segment
 +        commitLog.requestExtraSync();
 +    }
 +
 +    void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom)
 +    {
 +        do
 +        {
 +            WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time());
 +            if (availableSegment == null && allocatingFrom == currentAllocatingFrom)
 +                prepared.awaitUninterruptibly();
 +            else
 +                prepared.cancel();
 +        }
 +        while (availableSegment == null && allocatingFrom == currentAllocatingFrom);
 +    }
 +
 +    /**
 +     * Switch to a new segment, regardless of how much is left in the current one.
 +     *
-      * Flushes any dirty CFs for this segment and any older segments, and then discards the segments
++     * Flushes any dirty CFs for this segment and any older segments, and then discards the segments.
++     * This is necessary to avoid resurrecting data during replay if a user creates a new table with
++     * the same name and ID. See CASSANDRA-16986 for more details.
 +     */
 +    void forceRecycleAll(Iterable<UUID> droppedCfs)
 +    {
 +        List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
 +        CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
 +        advanceAllocatingFrom(last);
 +
 +        // wait for the commit log modifications
 +        last.waitForModifications();
 +
 +        // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
 +        // to complete
 +        Keyspace.writeOrder.awaitNewBarrier();
 +
 +        // flush and wait for all CFs that are dirty in segments up-to and including 'last'
 +        Future<?> future = flushDataFrom(segmentsToRecycle, true);
 +        try
 +        {
 +            future.get();
 +
 +            for (CommitLogSegment segment : activeSegments)
 +                for (UUID cfId : droppedCfs)
 +                    segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
 +
 +            // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
 +            // if the previous active segment was the only one to recycle (since an active segment isn't
 +            // necessarily dirty, and we only call dCS after a flush).
 +            for (CommitLogSegment segment : activeSegments)
 +            {
 +                if (segment.isUnused())
 +                    archiveAndDiscard(segment);
 +            }
 +
 +            CommitLogSegment first;
 +            if ((first = activeSegments.peek()) != null && first.id <= last.id)
 +                logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
 +        }
 +        catch (Throwable t)
 +        {
 +            // for now just log the error
 +            logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
 +        }
 +    }
 +
 +    /**
 +     * Indicates that a segment is no longer in use and that it should be discarded.
 +     *
 +     * @param segment segment that is no longer in use
 +     */
 +    void archiveAndDiscard(final CommitLogSegment segment)
 +    {
 +        boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
 +        if (!activeSegments.remove(segment))
 +            return; // already discarded
 +        // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
 +        logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script");
 +        discard(segment, archiveSuccess);
 +    }
 +
 +    /**
 +     * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
 +     * @param addedSize the size delta to apply, in bytes
 +     */
 +    void addSize(long addedSize)
 +    {
 +        size.addAndGet(addedSize);
 +    }
 +
 +    /**
 +     * @return the space (in bytes) used by all segment files.
 +     */
 +    public long onDiskSize()
 +    {
 +        return size.get();
 +    }
 +
 +    private long unusedCapacity()
 +    {
 +        long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
 +        long currentSize = size.get();
 +        logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
 +        return total - currentSize;
 +    }
 +
 +    /**
 +     * Force a flush on all CFs that are still dirty in @param segments.
 +     *
 +     * @return a Future that will finish when all the flushes are complete.
 +     */
 +    private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
 +    {
 +        if (segments.isEmpty())
 +            return Futures.immediateFuture(null);
 +        final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
 +
 +        // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
 +        final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
 +
 +        for (CommitLogSegment segment : segments)
 +        {
 +            for (UUID dirtyCFId : segment.getDirtyCFIDs())
 +            {
 +                Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
 +                if (pair == null)
 +                {
 +                    // even though we remove the schema entry before a final flush when dropping a CF,
 +                    // it's still possible for a writer to race and finish its append after the flush.
 +                    logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
 +                    segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
 +                }
 +                else if (!flushes.containsKey(dirtyCFId))
 +                {
 +                    String keyspace = pair.left;
 +                    final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
 +                    // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
 +                    // no deadlock possibility since switchLock removal
 +                    flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
 +                }
 +            }
 +        }
 +
 +        return Futures.allAsList(flushes.values());
 +    }
 +
 +    /**
 +     * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
 +     * Only call this after the AbstractCommitLogService is shut down.
 +     */
 +    public void stopUnsafe(boolean deleteSegments)
 +    {
 +        logger.debug("CLSM closing and clearing existing commit log segments...");
 +
 +        shutdown();
 +        try
 +        {
 +            awaitTermination();
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +
 +        for (CommitLogSegment segment : activeSegments)
 +            closeAndDeleteSegmentUnsafe(segment, deleteSegments);
 +        activeSegments.clear();
 +
 +        size.set(0L);
 +
 +        logger.trace("CLSM done with closing and clearing existing commit log segments.");
 +    }
 +
 +    /**
 +     * To be used by tests only. Not safe if mutation slots are being allocated concurrently.
 +     */
 +    void awaitManagementTasksCompletion()
 +    {
 +        if (availableSegment == null && !atSegmentBufferLimit())
 +        {
 +            awaitAvailableSegment(allocatingFrom);
 +        }
 +    }
 +
 +    /**
 +     * Explicitly for use only during resets in unit testing.
 +     */
 +    private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
 +    {
 +        try
 +        {
 +            discard(segment, delete);
 +        }
 +        catch (AssertionError ignored)
 +        {
 +            // segment file does not exist
 +        }
 +    }
 +
 +    /**
 +     * Initiates the shutdown process for the management thread.
 +     */
 +    public void shutdown()
 +    {
 +        assert !shutdown;
 +        shutdown = true;
 +
 +        // Release the management thread and delete prepared segment.
 +        // Do not block as another thread may claim the segment (this can happen during unit test initialization).
 +        discardAvailableSegment();
 +        wakeManager();
 +    }
 +
 +    private void discardAvailableSegment()
 +    {
 +        CommitLogSegment next = null;
 +        synchronized (this)
 +        {
 +            next = availableSegment;
 +            availableSegment = null;
 +        }
 +        if (next != null)
 +            next.discard(true);
 +    }
 +
 +    /**
 +     * Returns when the management thread terminates.
 +     */
 +    public void awaitTermination() throws InterruptedException
 +    {
 +        managerThread.join();
 +        managerThread = null;
 +
 +        for (CommitLogSegment segment : activeSegments)
 +            segment.close();
 +
 +        if (bufferPool != null)
 +            bufferPool.emptyBufferPool();
 +    }
 +
 +    /**
 +     * @return a read-only collection of the active commit log segments
 +     */
 +    @VisibleForTesting
 +    public Collection<CommitLogSegment> getActiveSegments()
 +    {
 +        return Collections.unmodifiableCollection(activeSegments);
 +    }
 +
 +    /**
 +     * @return the current CommitLogPosition of the active segment we're allocating from
 +     */
 +    CommitLogPosition getCurrentPosition()
 +    {
 +        return allocatingFrom.getCurrentCommitLogPosition();
 +    }
 +
 +    /**
 +     * Requests commit log files sync themselves, if needed. This may or may not involve flushing to disk.
 +     *
 +     * @param flush Request that the sync operation flush the file to disk.
 +     */
 +    public void sync(boolean flush) throws IOException
 +    {
 +        CommitLogSegment current = allocatingFrom;
 +        for (CommitLogSegment segment : getActiveSegments())
 +        {
 +            // Do not sync segments that became active after sync started.
 +            if (segment.id > current.id)
 +                return;
 +            segment.sync(flush);
 +        }
 +    }
 +
 +    /**
 +     * Used by compressed and encrypted segments to share a buffer pool across the CLSM.
 +     */
 +    SimpleCachedBufferPool getBufferPool()
 +    {
 +        return bufferPool;
 +    }
 +
 +    void wakeManager()
 +    {
 +        managerThreadWaitQueue.signalAll();
 +    }
 +
 +    /**
 +     * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for
 +     * a buffer to become available.
 +     */
 +    void notifyBufferFreed()
 +    {
 +        wakeManager();
 +    }
 +
 +    /** Read-only access to current segment for subclasses. */
 +    CommitLogSegment allocatingFrom()
 +    {
 +        return allocatingFrom;
 +    }
 +}
 +
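
A minimal, self-contained sketch (not part of the commit; class and field names are illustrative only) of the hand-off protocol the comments above describe: a single management thread publishes one prepared segment at a time through a volatile field, consumers take it inside a critical section and then wake the producer. Plain wait/notify stands in here for Cassandra's WaitQueue.

    import java.util.concurrent.atomic.AtomicInteger;

    public class SegmentHandoffSketch
    {
        // stands in for availableSegment: written by the producer, taken under 'lock'
        private volatile Integer available = null;
        private final Object lock = new Object();
        private final AtomicInteger produced = new AtomicInteger();

        // producer loop: prepare one item, signal consumers, then wait until it is taken
        void producerLoop() throws InterruptedException
        {
            while (produced.get() < 5)
            {
                available = produced.incrementAndGet();         // "createSegment()"
                synchronized (lock) { lock.notifyAll(); }       // "segmentPrepared.signalAll()"
                while (available != null)                       // "managerThreadWaitCondition"
                    synchronized (lock) { if (available != null) lock.wait(); }
            }
        }

        // consumer: take the prepared item inside the critical section, then wake the producer
        Integer take() throws InterruptedException
        {
            synchronized (lock)
            {
                while (available == null)
                    lock.wait();                                // "awaitAvailableSegment()"
                Integer taken = available;                      // "allocatingFrom = availableSegment"
                available = null;
                lock.notifyAll();                               // "wakeManager()"
                return taken;
            }
        }

        public static void main(String[] args) throws Exception
        {
            SegmentHandoffSketch s = new SegmentHandoffSketch();
            Thread producer = new Thread(() -> {
                try { s.producerLoop(); } catch (InterruptedException ignored) {}
            });
            producer.start();
            for (int i = 0; i < 5; i++)
                System.out.println("consumed " + s.take());
            producer.join();
        }
    }

As in AbstractCommitLogSegmentManager, the volatile write makes the prepared item visible early, while the critical section keeps taking the segment atomic with the consumer's other bookkeeping; the real class layers WaitQueue signaling and the buffer-pool limit on top of this basic shape.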
