smiklosovic commented on code in PR #3374:
URL: https://github.com/apache/cassandra/pull/3374#discussion_r1874395344


##########
src/java/org/apache/cassandra/service/snapshot/SnapshotManager.java:
##########
@@ -17,156 +17,541 @@
  */
 package org.apache.cassandra.service.snapshot;
 
-
-import java.util.Collection;
-import java.util.PriorityQueue;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import javax.management.openmbean.TabularData;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Directories;
-
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-
-import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.notifications.INotification;
+import org.apache.cassandra.notifications.INotificationConsumer;
+import org.apache.cassandra.notifications.TableDroppedNotification;
+import org.apache.cassandra.notifications.TablePreScrubNotification;
+import org.apache.cassandra.notifications.TruncationNotification;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.ExecutorUtils;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 
-import static java.util.Comparator.comparing;
-import static java.util.stream.Collectors.toList;
+import static java.lang.String.format;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
-import static org.apache.cassandra.utils.FBUtilities.now;
+import static 
org.apache.cassandra.service.snapshot.ClearSnapshotTask.getClearSnapshotPredicate;
+import static 
org.apache.cassandra.service.snapshot.ClearSnapshotTask.getPredicateForCleanedSnapshots;
 
-public class SnapshotManager {
+public class SnapshotManager implements SnapshotManagerMBean, 
INotificationConsumer, AutoCloseable
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(SnapshotManager.class);
 
-    private static final ScheduledExecutorPlus executor = 
executorFactory().scheduled(false, "SnapshotCleanup");
+    private ScheduledExecutorPlus snapshotCleanupExecutor;
 
-    private static final Logger logger = 
LoggerFactory.getLogger(SnapshotManager.class);
+    public static final SnapshotManager instance = new SnapshotManager();
 
     private final long initialDelaySeconds;
     private final long cleanupPeriodSeconds;
-    private final SnapshotLoader snapshotLoader;
 
-    @VisibleForTesting
-    protected volatile ScheduledFuture<?> cleanupTaskFuture;
+    private volatile ScheduledFuture<?> cleanupTaskFuture;
+
+    private final String[] dataDirs;
+
+    private volatile boolean started = false;
 
     /**
-     * Expiring snapshots ordered by expiration date, to allow only iterating 
over snapshots
-     * that need to be removed on {@link this#clearExpiredSnapshots()}
+     * We read / list snapshots way more often than write / create them so COW 
is ideal to use here.
+     * This enables us to not submit listing tasks or tasks computing snapshot 
sizes to any executor's queue as they
+     * can be just run concurrently which gives way better throughput in case
+     * of excessive listing from clients (dashboards and similar) where 
snapshot metrics are gathered.
      */
-    private final PriorityQueue<TableSnapshot> expiringSnapshots = new 
PriorityQueue<>(comparing(TableSnapshot::getExpiresAt));
+    private final List<TableSnapshot> snapshots = new CopyOnWriteArrayList<>();
 
-    public SnapshotManager()
+    private SnapshotManager()
     {
         
this(CassandraRelevantProperties.SNAPSHOT_CLEANUP_INITIAL_DELAY_SECONDS.getInt(),
-             
CassandraRelevantProperties.SNAPSHOT_CLEANUP_PERIOD_SECONDS.getInt());
+             
CassandraRelevantProperties.SNAPSHOT_CLEANUP_PERIOD_SECONDS.getInt(),
+             DatabaseDescriptor.getAllDataFileLocations());
     }
 
     @VisibleForTesting
-    protected SnapshotManager(long initialDelaySeconds, long 
cleanupPeriodSeconds)
+    SnapshotManager(long initialDelaySeconds, long cleanupPeriodSeconds, 
String[] dataDirs)
     {
         this.initialDelaySeconds = initialDelaySeconds;
         this.cleanupPeriodSeconds = cleanupPeriodSeconds;
-        snapshotLoader = new 
SnapshotLoader(DatabaseDescriptor.getAllDataFileLocations());
+        this.dataDirs = dataDirs;
+        this.snapshotCleanupExecutor = createSnapshotCleanupExecutor();
     }
 
-    public Collection<TableSnapshot> getExpiringSnapshots()
+    public void registerMBean()
     {
-        return expiringSnapshots;
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
     }
 
-    public synchronized void start()
+    public void unregisterMBean()
     {
-        addSnapshots(loadSnapshots());
-        resumeSnapshotCleanup();
+        MBeanWrapper.instance.unregisterMBean(MBEAN_NAME);
     }
 
-    public synchronized void stop() throws InterruptedException, 
TimeoutException
+    public synchronized SnapshotManager start(boolean 
runPeriodicSnapshotCleaner)
+    {
+        if (started)
+            return this;
+
+        if (snapshotCleanupExecutor == null)
+            snapshotCleanupExecutor = createSnapshotCleanupExecutor();
+
+        executeTask(new ReloadSnapshotsTask(dataDirs));
+
+        if (runPeriodicSnapshotCleaner)
+            resumeSnapshotCleanup();
+
+        started = true;
+        return this;
+    }
+
+    @Override
+    public synchronized void close()
+    {
+        if (!started)
+            return;
+
+        pauseSnapshotCleanup();
+
+        shutdownAndWait(1, TimeUnit.MINUTES);
+        snapshots.clear();
+
+        started = false;
+    }
+
+    public synchronized void shutdownAndWait(long timeout, TimeUnit unit)
+    {
+        try
+        {
+            ExecutorUtils.shutdownNowAndWait(timeout, unit, 
snapshotCleanupExecutor);
+        }
+        catch (InterruptedException | TimeoutException ex)
+        {
+            throw new RuntimeException(ex);
+        }
+        finally
+        {
+            snapshotCleanupExecutor = null;
+        }
+    }
+
+    public synchronized void restart(boolean runPeriodicSnapshotCleaner)
+    {
+        if (!started)
+            return;
+
+        logger.debug("Restarting SnapshotManager");
+        close();
+        start(runPeriodicSnapshotCleaner);
+        logger.debug("SnapshotManager restarted");
+    }
+
+
+    public synchronized void restart()
+    {
+        restart(true);
+    }
+
+    private static class ReloadSnapshotsTask extends 
AbstractSnapshotTask<Set<TableSnapshot>>
+    {
+        private final String[] dataDirs;
+
+        public ReloadSnapshotsTask(String[] dataDirs)
+        {
+            super(null);
+            this.dataDirs = dataDirs;
+        }
+
+        @Override
+        public Set<TableSnapshot> call()
+        {
+            Set<TableSnapshot> tableSnapshots = new 
SnapshotLoader(dataDirs).loadSnapshots();
+            new ClearSnapshotTask(SnapshotManager.instance, snapshot -> true, 
false).call();
+            for (TableSnapshot snapshot : tableSnapshots)
+                SnapshotManager.instance.addSnapshot(snapshot);
+
+            return tableSnapshots;
+        }
+
+        @Override
+        public SnapshotTaskType getTaskType()
+        {
+            return SnapshotTaskType.RELOAD;
+        }
+    }
+
+    void addSnapshot(TableSnapshot snapshot)
+    {
+        logger.debug("Adding snapshot {}", snapshot);
+        snapshots.add(snapshot);
+    }
+
+    List<TableSnapshot> getSnapshots()
+    {
+        return snapshots;
+    }
+
+    public void resumeSnapshotCleanup()
+    {
+        if (cleanupTaskFuture == null)
+        {
+            logger.info("Scheduling expired snapshots cleanup with 
initialDelaySeconds={} and cleanupPeriodSeconds={}",
+                        initialDelaySeconds, cleanupPeriodSeconds);
+
+            cleanupTaskFuture = 
snapshotCleanupExecutor.scheduleWithFixedDelay(SnapshotManager.instance::clearExpiredSnapshots,
+                                                                               
initialDelaySeconds,
+                                                                               
cleanupPeriodSeconds,
+                                                                               
SECONDS);
+        }
+    }
+
+    private void pauseSnapshotCleanup()
     {
-        expiringSnapshots.clear();
         if (cleanupTaskFuture != null)
         {
             cleanupTaskFuture.cancel(false);
             cleanupTaskFuture = null;
         }
     }
 
-    public synchronized void addSnapshot(TableSnapshot snapshot)
+    /**
+     * Deletes snapshot and removes it from manager.
+     *
+     * @param snapshot snapshot to clear
+     */
+    void clearSnapshot(TableSnapshot snapshot)
+    {
+        executeTask(new ClearSnapshotTask(this, s -> s.equals(snapshot), 
true));
+    }
+
+    /**
+     * Returns list of snapshots of given keyspace
+     *
+     * @param keyspace keyspace of a snapshot
+     * @return list of snapshots of given keyspace.
+     */
+    public List<TableSnapshot> getSnapshots(String keyspace)
+    {
+        return getSnapshots(snapshot -> 
snapshot.getKeyspaceName().equals(keyspace));
+    }
+
+    /**
+     * Return snapshots based on given parameters.
+     *
+     * @param skipExpiring     if expiring snapshots should be skipped
+     * @param includeEphemeral if ephemeral snapshots should be included
+     * @return snapshots based on given parameters
+     */
+    public List<TableSnapshot> getSnapshots(boolean skipExpiring, boolean 
includeEphemeral)
     {
-        // We currently only care about expiring snapshots
-        if (snapshot.isExpiring())
+        return getSnapshots(s -> (!skipExpiring || !s.isExpiring()) && 
(includeEphemeral || !s.isEphemeral()));
+    }
+
+    /**
+     * Returns all snapshots passing the given predicate.
+     *
+     * @param predicate predicate to filter all snapshots of
+     * @return list of snapshots passing the predicate
+     */
+    public List<TableSnapshot> getSnapshots(Predicate<TableSnapshot> predicate)
+    {
+        return new GetSnapshotsTask(this, predicate, true).call();
+    }
+
+    /**
+     * Returns a snapshot or empty optional based on the given parameters.
+     *
+     * @param keyspace keyspace of a snapshot
+     * @param table    table of a snapshot
+     * @param tag      name of a snapshot
+     * @return empty optional if there is not such snapshot, non-empty 
otherwise
+     */
+    public Optional<TableSnapshot> getSnapshot(String keyspace, String table, 
String tag)
+    {
+        List<TableSnapshot> foundSnapshots = new GetSnapshotsTask(this,
+                                                                  snapshot -> 
snapshot.getKeyspaceName().equals(keyspace) &&
+                                                                              
snapshot.getTableName().equals(table) &&
+                                                                              
snapshot.getTag().equals(tag) || (tag != null && tag.isEmpty()),
+                                                                  true).call();
+
+        if (foundSnapshots.isEmpty())
+            return Optional.empty();
+        else
+            return Optional.of(foundSnapshots.get(0));
+    }
+
+    /**
+     * Clear snapshots of given tag from given keyspace. Does not remove 
ephemeral snapshots.
+     * <p>
+     *
+     * @param tag      snapshot name
+     * @param keyspace keyspace to clear all snapshots of a given tag of
+     */
+    public void clearSnapshots(String tag, String keyspace)
+    {
+        clearSnapshots(tag, Set.of(keyspace), 
Clock.Global.currentTimeMillis());
+    }
+
+    /**
+     * Removes a snapshot.
+     * <p>
+     *
+     * @param keyspace keyspace of a snapshot to remove
+     * @param table    table of a snapshot to remove
+     * @param tag      name of a snapshot to remove.
+     */
+    public void clearSnapshot(String keyspace, String table, String tag)
+    {
+        executeTask(new ClearSnapshotTask(this,
+                                          snapshot -> 
snapshot.getKeyspaceName().equals(keyspace)
+                                                      && 
snapshot.getTableName().equals(table)
+                                                      && 
snapshot.getTag().equals(tag),
+                                          true));
+    }
+
+    /**
+     * Removes all snapshots for given keyspace and table.
+     *
+     * @param keyspace keyspace to remove snapshots for
+     * @param table    table in a given keyspace to remove snapshots for
+     */
+    public void clearAllSnapshots(String keyspace, String table)
+    {
+        executeTask(new ClearSnapshotTask(this,
+                                          snapshot -> 
snapshot.getKeyspaceName().equals(keyspace)
+                                                      && 
snapshot.getTableName().equals(table),
+                                          true));
+    }
+
+    /**
+     * Clears all snapshots, expiring and ephemeral as well.
+     */
+    public void clearAllSnapshots()
+    {
+        executeTask(new ClearSnapshotTask(this, snapshot -> true, true));
+    }
+
+    /**
+     * Clear snapshots based on a given predicate
+     *
+     * @param predicate predicate to filter snapshots on
+     */
+    public void clearSnapshot(Predicate<TableSnapshot> predicate)
+    {
+        executeTask(new ClearSnapshotTask(this, predicate, true));
+    }
+
+    /**
+     * Clears all ephemeral snapshots in a node.
+     */
+    public void clearEphemeralSnapshots()
+    {
+        executeTask(new ClearSnapshotTask(this, TableSnapshot::isEphemeral, 
true));
+    }
+
+    /**
+     * Clears all expired snapshots in a node.
+     */
+    public void clearExpiredSnapshots()
+    {
+        Instant now = FBUtilities.now();
+        executeTask(new ClearSnapshotTask(this, s -> s.isExpired(now), true));
+    }
+
+    /**
+     * Clear snapshots of given tag from given keyspaces.
+     * <p>
+     * If tag is not present / is empty, all snapshots are considered to be 
cleared.
+     * If keyspaces are empty, all snapshots of given tag and older than 
maxCreatedAt are removed.
+     *
+     * @param tag          optional tag of snapshot to clear
+     * @param keyspaces    keyspaces to remove snapshots for
+     * @param maxCreatedAt clear all such snapshots which were created before 
this timestamp
+     */
+    private void clearSnapshots(String tag, Set<String> keyspaces, long 
maxCreatedAt)
+    {
+        executeTask(new ClearSnapshotTask(this, getClearSnapshotPredicate(tag, 
keyspaces, maxCreatedAt, false), true));
+    }
+
+    public List<TableSnapshot> takeSnapshot(SnapshotOptions options)
+    {
+        return executeTask(new TakeSnapshotTask(this, options));
+    }
+
+    // Super methods
+
+    @Override
+    public void takeSnapshot(String tag, String... entities)
+    {
+        takeSnapshot(SnapshotOptions.userSnapshot(tag, Map.of(), entities));
+    }
+
+    @Override
+    public void takeSnapshot(String tag, Map<String, String> optMap, String... 
entities) throws IOException
+    {
+        try
+        {
+            takeSnapshot(SnapshotOptions.userSnapshot(tag, optMap, entities));
+        }
+        catch (SnapshotException ex)
         {
-            logger.debug("Adding expiring snapshot {}", snapshot);
-            expiringSnapshots.add(snapshot);
+            // to be compatible with deprecated methods in StorageService
+            throw new IOException(ex);
         }
     }
 
-    public synchronized Set<TableSnapshot> loadSnapshots(String keyspace)
+    @Override
+    public void clearSnapshot(String tag, Map<String, Object> options, 
String... keyspaceNames)
     {
-        return snapshotLoader.loadSnapshots(keyspace);
+        executeTask(new ClearSnapshotTask(this, 
getPredicateForCleanedSnapshots(tag, options, keyspaceNames), true));
     }
 
-    public synchronized Set<TableSnapshot> loadSnapshots()
+    @Override
+    public Map<String, TabularData> listSnapshots(Map<String, String> options)
     {
-        return snapshotLoader.loadSnapshots();
+        return new ListSnapshotsTask(this, options, true).call();
     }
 
-    @VisibleForTesting
-    protected synchronized void addSnapshots(Collection<TableSnapshot> 
snapshots)
+    @Override
+    public long getTrueSnapshotSize()
     {
-        logger.debug("Adding snapshots: {}.", Joiner.on(", 
").join(snapshots.stream().map(TableSnapshot::getId).collect(toList())));
-        snapshots.forEach(this::addSnapshot);
+        return new TrueSnapshotSizeTask(this, s -> true).call();
     }
 
-    // TODO: Support pausing snapshot cleanup
-    @VisibleForTesting
-    synchronized void resumeSnapshotCleanup()
+    @Override
+    public long getTrueSnapshotsSize(String keyspace)
     {
-        if (cleanupTaskFuture == null)
+        return new TrueSnapshotSizeTask(this, s -> 
s.getKeyspaceName().equals(keyspace)).call();
+    }
+
+    @Override
+    public long getTrueSnapshotsSize(String keyspace, String table)
+    {
+        return new TrueSnapshotSizeTask(this, s -> 
s.getKeyspaceName().equals(keyspace) && s.getTableName().equals(table)).call();
+    }
+
+    @Override
+    public void setSnapshotLinksPerSecond(long throttle)
+    {
+        logger.info("Setting snapshot throttle to {}", throttle);
+        DatabaseDescriptor.setSnapshotLinksPerSecond(throttle);
+    }
+
+    @Override
+    public long getSnapshotLinksPerSecond()
+    {
+        return DatabaseDescriptor.getSnapshotLinksPerSecond();
+    }
+
+    @Override
+    public void handleNotification(INotification notification, Object sender)
+    {
+        if (notification instanceof TruncationNotification)
         {
-            logger.info("Scheduling expired snapshot cleanup with 
initialDelaySeconds={} and cleanupPeriodSeconds={}",
-                        initialDelaySeconds, cleanupPeriodSeconds);
-            cleanupTaskFuture = 
executor.scheduleWithFixedDelay(this::clearExpiredSnapshots, 
initialDelaySeconds,
-                                                                
cleanupPeriodSeconds, TimeUnit.SECONDS);
+            TruncationNotification truncationNotification = 
(TruncationNotification) notification;
+            ColumnFamilyStore cfs = truncationNotification.cfs;
+            if (!truncationNotification.disableSnapshot && 
cfs.isAutoSnapshotEnabled())
+            {
+                SnapshotOptions opts = 
SnapshotOptions.systemSnapshot(cfs.name, SnapshotType.TRUNCATE, 
cfs.getKeyspaceTableName())

Review Comment:
   @frankgh 
   
   Actually, no — it is fine as it is. We need to add it for dropping, but not for truncation. We also do not need it when reacting to `TablePreScrubNotification`.
   
   The reason we need it only when dropping a table is that, if we did not pass it there, the CFS would not be found later, when `TakeSnapshotTask` looks it up. Look at the `dropCf` method in `Keyspace`: there we remove the CFS from `columnFamilyStores` by `tableId`. Later, when `TakeSnapshotTask` tries to get the CFS, it looks in that very same map, which would return null. So, for the drop case only, we pass the just-removed CFS into `TableDroppedNotification`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to