smiklosovic commented on code in PR #3374:
URL: https://github.com/apache/cassandra/pull/3374#discussion_r1874395344
##########
src/java/org/apache/cassandra/service/snapshot/SnapshotManager.java:
##########
@@ -17,156 +17,541 @@
*/
package org.apache.cassandra.service.snapshot;
-
-import java.util.Collection;
-import java.util.PriorityQueue;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import javax.management.openmbean.TabularData;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Directories;
-
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-
-import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.notifications.INotification;
+import org.apache.cassandra.notifications.INotificationConsumer;
+import org.apache.cassandra.notifications.TableDroppedNotification;
+import org.apache.cassandra.notifications.TablePreScrubNotification;
+import org.apache.cassandra.notifications.TruncationNotification;
+import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.ExecutorUtils;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
-import static java.util.Comparator.comparing;
-import static java.util.stream.Collectors.toList;
+import static java.lang.String.format;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
-import static org.apache.cassandra.utils.FBUtilities.now;
+import static
org.apache.cassandra.service.snapshot.ClearSnapshotTask.getClearSnapshotPredicate;
+import static
org.apache.cassandra.service.snapshot.ClearSnapshotTask.getPredicateForCleanedSnapshots;
-public class SnapshotManager {
+public class SnapshotManager implements SnapshotManagerMBean,
INotificationConsumer, AutoCloseable
+{
+ private static final Logger logger =
LoggerFactory.getLogger(SnapshotManager.class);
- private static final ScheduledExecutorPlus executor =
executorFactory().scheduled(false, "SnapshotCleanup");
+ private ScheduledExecutorPlus snapshotCleanupExecutor;
- private static final Logger logger =
LoggerFactory.getLogger(SnapshotManager.class);
+ public static final SnapshotManager instance = new SnapshotManager();
private final long initialDelaySeconds;
private final long cleanupPeriodSeconds;
- private final SnapshotLoader snapshotLoader;
- @VisibleForTesting
- protected volatile ScheduledFuture<?> cleanupTaskFuture;
+ private volatile ScheduledFuture<?> cleanupTaskFuture;
+
+ private final String[] dataDirs;
+
+ private volatile boolean started = false;
/**
- * Expiring snapshots ordered by expiration date, to allow only iterating
over snapshots
- * that need to be removed on {@link this#clearExpiredSnapshots()}
+ * We read / list snapshots way more often than write / create them so COW
is ideal to use here.
+ * This enables us to not submit listing tasks or tasks computing snapshot
sizes to any executor's queue as they
+ * can be just run concurrently which gives way better throughput in case
+ * of excessive listing from clients (dashboards and similar) where
snapshot metrics are gathered.
*/
- private final PriorityQueue<TableSnapshot> expiringSnapshots = new
PriorityQueue<>(comparing(TableSnapshot::getExpiresAt));
+ private final List<TableSnapshot> snapshots = new CopyOnWriteArrayList<>();
- public SnapshotManager()
+ private SnapshotManager()
{
this(CassandraRelevantProperties.SNAPSHOT_CLEANUP_INITIAL_DELAY_SECONDS.getInt(),
-
CassandraRelevantProperties.SNAPSHOT_CLEANUP_PERIOD_SECONDS.getInt());
+
CassandraRelevantProperties.SNAPSHOT_CLEANUP_PERIOD_SECONDS.getInt(),
+ DatabaseDescriptor.getAllDataFileLocations());
}
@VisibleForTesting
- protected SnapshotManager(long initialDelaySeconds, long
cleanupPeriodSeconds)
+ SnapshotManager(long initialDelaySeconds, long cleanupPeriodSeconds,
String[] dataDirs)
{
this.initialDelaySeconds = initialDelaySeconds;
this.cleanupPeriodSeconds = cleanupPeriodSeconds;
- snapshotLoader = new
SnapshotLoader(DatabaseDescriptor.getAllDataFileLocations());
+ this.dataDirs = dataDirs;
+ this.snapshotCleanupExecutor = createSnapshotCleanupExecutor();
}
- public Collection<TableSnapshot> getExpiringSnapshots()
+ public void registerMBean()
{
- return expiringSnapshots;
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
- public synchronized void start()
+ public void unregisterMBean()
{
- addSnapshots(loadSnapshots());
- resumeSnapshotCleanup();
+ MBeanWrapper.instance.unregisterMBean(MBEAN_NAME);
}
- public synchronized void stop() throws InterruptedException,
TimeoutException
+ public synchronized SnapshotManager start(boolean
runPeriodicSnapshotCleaner)
+ {
+ if (started)
+ return this;
+
+ if (snapshotCleanupExecutor == null)
+ snapshotCleanupExecutor = createSnapshotCleanupExecutor();
+
+ executeTask(new ReloadSnapshotsTask(dataDirs));
+
+ if (runPeriodicSnapshotCleaner)
+ resumeSnapshotCleanup();
+
+ started = true;
+ return this;
+ }
+
+ @Override
+ public synchronized void close()
+ {
+ if (!started)
+ return;
+
+ pauseSnapshotCleanup();
+
+ shutdownAndWait(1, TimeUnit.MINUTES);
+ snapshots.clear();
+
+ started = false;
+ }
+
+ public synchronized void shutdownAndWait(long timeout, TimeUnit unit)
+ {
+ try
+ {
+ ExecutorUtils.shutdownNowAndWait(timeout, unit,
snapshotCleanupExecutor);
+ }
+ catch (InterruptedException | TimeoutException ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ finally
+ {
+ snapshotCleanupExecutor = null;
+ }
+ }
+
+ public synchronized void restart(boolean runPeriodicSnapshotCleaner)
+ {
+ if (!started)
+ return;
+
+ logger.debug("Restarting SnapshotManager");
+ close();
+ start(runPeriodicSnapshotCleaner);
+ logger.debug("SnapshotManager restarted");
+ }
+
+
+ public synchronized void restart()
+ {
+ restart(true);
+ }
+
+ private static class ReloadSnapshotsTask extends
AbstractSnapshotTask<Set<TableSnapshot>>
+ {
+ private final String[] dataDirs;
+
+ public ReloadSnapshotsTask(String[] dataDirs)
+ {
+ super(null);
+ this.dataDirs = dataDirs;
+ }
+
+ @Override
+ public Set<TableSnapshot> call()
+ {
+ Set<TableSnapshot> tableSnapshots = new
SnapshotLoader(dataDirs).loadSnapshots();
+ new ClearSnapshotTask(SnapshotManager.instance, snapshot -> true,
false).call();
+ for (TableSnapshot snapshot : tableSnapshots)
+ SnapshotManager.instance.addSnapshot(snapshot);
+
+ return tableSnapshots;
+ }
+
+ @Override
+ public SnapshotTaskType getTaskType()
+ {
+ return SnapshotTaskType.RELOAD;
+ }
+ }
+
+ void addSnapshot(TableSnapshot snapshot)
+ {
+ logger.debug("Adding snapshot {}", snapshot);
+ snapshots.add(snapshot);
+ }
+
+ List<TableSnapshot> getSnapshots()
+ {
+ return snapshots;
+ }
+
+ public void resumeSnapshotCleanup()
+ {
+ if (cleanupTaskFuture == null)
+ {
+ logger.info("Scheduling expired snapshots cleanup with
initialDelaySeconds={} and cleanupPeriodSeconds={}",
+ initialDelaySeconds, cleanupPeriodSeconds);
+
+ cleanupTaskFuture =
snapshotCleanupExecutor.scheduleWithFixedDelay(SnapshotManager.instance::clearExpiredSnapshots,
+
initialDelaySeconds,
+
cleanupPeriodSeconds,
+
SECONDS);
+ }
+ }
+
+ private void pauseSnapshotCleanup()
{
- expiringSnapshots.clear();
if (cleanupTaskFuture != null)
{
cleanupTaskFuture.cancel(false);
cleanupTaskFuture = null;
}
}
- public synchronized void addSnapshot(TableSnapshot snapshot)
+ /**
+ * Deletes snapshot and removes it from manager.
+ *
+ * @param snapshot snapshot to clear
+ */
+ void clearSnapshot(TableSnapshot snapshot)
+ {
+ executeTask(new ClearSnapshotTask(this, s -> s.equals(snapshot),
true));
+ }
+
+ /**
+ * Returns list of snapshots of given keyspace
+ *
+ * @param keyspace keyspace of a snapshot
+ * @return list of snapshots of given keyspace.
+ */
+ public List<TableSnapshot> getSnapshots(String keyspace)
+ {
+ return getSnapshots(snapshot ->
snapshot.getKeyspaceName().equals(keyspace));
+ }
+
+ /**
+ * Return snapshots based on given parameters.
+ *
+ * @param skipExpiring if expiring snapshots should be skipped
+ * @param includeEphemeral if ephemeral snapshots should be included
+ * @return snapshots based on given parameters
+ */
+ public List<TableSnapshot> getSnapshots(boolean skipExpiring, boolean
includeEphemeral)
{
- // We currently only care about expiring snapshots
- if (snapshot.isExpiring())
+ return getSnapshots(s -> (!skipExpiring || !s.isExpiring()) &&
(includeEphemeral || !s.isEphemeral()));
+ }
+
+ /**
+ * Returns all snapshots passing the given predicate.
+ *
+ * @param predicate predicate to filter all snapshots of
+ * @return list of snapshots passing the predicate
+ */
+ public List<TableSnapshot> getSnapshots(Predicate<TableSnapshot> predicate)
+ {
+ return new GetSnapshotsTask(this, predicate, true).call();
+ }
+
+ /**
+ * Returns a snapshot or empty optional based on the given parameters.
+ *
+ * @param keyspace keyspace of a snapshot
+ * @param table table of a snapshot
+ * @param tag name of a snapshot
+ * @return empty optional if there is not such snapshot, non-empty
otherwise
+ */
+ public Optional<TableSnapshot> getSnapshot(String keyspace, String table,
String tag)
+ {
+ List<TableSnapshot> foundSnapshots = new GetSnapshotsTask(this,
+ snapshot ->
snapshot.getKeyspaceName().equals(keyspace) &&
+
snapshot.getTableName().equals(table) &&
+
snapshot.getTag().equals(tag) || (tag != null && tag.isEmpty()),
+ true).call();
+
+ if (foundSnapshots.isEmpty())
+ return Optional.empty();
+ else
+ return Optional.of(foundSnapshots.get(0));
+ }
+
+ /**
+ * Clear snapshots of given tag from given keyspace. Does not remove
ephemeral snapshots.
+ * <p>
+ *
+ * @param tag snapshot name
+ * @param keyspace keyspace to clear all snapshots of a given tag of
+ */
+ public void clearSnapshots(String tag, String keyspace)
+ {
+ clearSnapshots(tag, Set.of(keyspace),
Clock.Global.currentTimeMillis());
+ }
+
+ /**
+ * Removes a snapshot.
+ * <p>
+ *
+ * @param keyspace keyspace of a snapshot to remove
+ * @param table table of a snapshot to remove
+ * @param tag name of a snapshot to remove.
+ */
+ public void clearSnapshot(String keyspace, String table, String tag)
+ {
+ executeTask(new ClearSnapshotTask(this,
+ snapshot ->
snapshot.getKeyspaceName().equals(keyspace)
+ &&
snapshot.getTableName().equals(table)
+ &&
snapshot.getTag().equals(tag),
+ true));
+ }
+
+ /**
+ * Removes all snapshots for given keyspace and table.
+ *
+ * @param keyspace keyspace to remove snapshots for
+ * @param table table in a given keyspace to remove snapshots for
+ */
+ public void clearAllSnapshots(String keyspace, String table)
+ {
+ executeTask(new ClearSnapshotTask(this,
+ snapshot ->
snapshot.getKeyspaceName().equals(keyspace)
+ &&
snapshot.getTableName().equals(table),
+ true));
+ }
+
+ /**
+ * Clears all snapshots, expiring and ephemeral as well.
+ */
+ public void clearAllSnapshots()
+ {
+ executeTask(new ClearSnapshotTask(this, snapshot -> true, true));
+ }
+
+ /**
+ * Clear snapshots based on a given predicate
+ *
+ * @param predicate predicate to filter snapshots on
+ */
+ public void clearSnapshot(Predicate<TableSnapshot> predicate)
+ {
+ executeTask(new ClearSnapshotTask(this, predicate, true));
+ }
+
+ /**
+ * Clears all ephemeral snapshots in a node.
+ */
+ public void clearEphemeralSnapshots()
+ {
+ executeTask(new ClearSnapshotTask(this, TableSnapshot::isEphemeral,
true));
+ }
+
+ /**
+ * Clears all expired snapshots in a node.
+ */
+ public void clearExpiredSnapshots()
+ {
+ Instant now = FBUtilities.now();
+ executeTask(new ClearSnapshotTask(this, s -> s.isExpired(now), true));
+ }
+
+ /**
+ * Clear snapshots of given tag from given keyspaces.
+ * <p>
+ * If tag is not present / is empty, all snapshots are considered to be
cleared.
+ * If keyspaces are empty, all snapshots of given tag and older than
maxCreatedAt are removed.
+ *
+ * @param tag optional tag of snapshot to clear
+ * @param keyspaces keyspaces to remove snapshots for
+ * @param maxCreatedAt clear all such snapshots which were created before
this timestamp
+ */
+ private void clearSnapshots(String tag, Set<String> keyspaces, long
maxCreatedAt)
+ {
+ executeTask(new ClearSnapshotTask(this, getClearSnapshotPredicate(tag,
keyspaces, maxCreatedAt, false), true));
+ }
+
+ public List<TableSnapshot> takeSnapshot(SnapshotOptions options)
+ {
+ return executeTask(new TakeSnapshotTask(this, options));
+ }
+
+ // Super methods
+
+ @Override
+ public void takeSnapshot(String tag, String... entities)
+ {
+ takeSnapshot(SnapshotOptions.userSnapshot(tag, Map.of(), entities));
+ }
+
+ @Override
+ public void takeSnapshot(String tag, Map<String, String> optMap, String...
entities) throws IOException
+ {
+ try
+ {
+ takeSnapshot(SnapshotOptions.userSnapshot(tag, optMap, entities));
+ }
+ catch (SnapshotException ex)
{
- logger.debug("Adding expiring snapshot {}", snapshot);
- expiringSnapshots.add(snapshot);
+ // to be compatible with deprecated methods in StorageService
+ throw new IOException(ex);
}
}
- public synchronized Set<TableSnapshot> loadSnapshots(String keyspace)
+ @Override
+ public void clearSnapshot(String tag, Map<String, Object> options,
String... keyspaceNames)
{
- return snapshotLoader.loadSnapshots(keyspace);
+ executeTask(new ClearSnapshotTask(this,
getPredicateForCleanedSnapshots(tag, options, keyspaceNames), true));
}
- public synchronized Set<TableSnapshot> loadSnapshots()
+ @Override
+ public Map<String, TabularData> listSnapshots(Map<String, String> options)
{
- return snapshotLoader.loadSnapshots();
+ return new ListSnapshotsTask(this, options, true).call();
}
- @VisibleForTesting
- protected synchronized void addSnapshots(Collection<TableSnapshot>
snapshots)
+ @Override
+ public long getTrueSnapshotSize()
{
- logger.debug("Adding snapshots: {}.", Joiner.on(",
").join(snapshots.stream().map(TableSnapshot::getId).collect(toList())));
- snapshots.forEach(this::addSnapshot);
+ return new TrueSnapshotSizeTask(this, s -> true).call();
}
- // TODO: Support pausing snapshot cleanup
- @VisibleForTesting
- synchronized void resumeSnapshotCleanup()
+ @Override
+ public long getTrueSnapshotsSize(String keyspace)
{
- if (cleanupTaskFuture == null)
+ return new TrueSnapshotSizeTask(this, s ->
s.getKeyspaceName().equals(keyspace)).call();
+ }
+
+ @Override
+ public long getTrueSnapshotsSize(String keyspace, String table)
+ {
+ return new TrueSnapshotSizeTask(this, s ->
s.getKeyspaceName().equals(keyspace) && s.getTableName().equals(table)).call();
+ }
+
+ @Override
+ public void setSnapshotLinksPerSecond(long throttle)
+ {
+ logger.info("Setting snapshot throttle to {}", throttle);
+ DatabaseDescriptor.setSnapshotLinksPerSecond(throttle);
+ }
+
+ @Override
+ public long getSnapshotLinksPerSecond()
+ {
+ return DatabaseDescriptor.getSnapshotLinksPerSecond();
+ }
+
+ @Override
+ public void handleNotification(INotification notification, Object sender)
+ {
+ if (notification instanceof TruncationNotification)
{
- logger.info("Scheduling expired snapshot cleanup with
initialDelaySeconds={} and cleanupPeriodSeconds={}",
- initialDelaySeconds, cleanupPeriodSeconds);
- cleanupTaskFuture =
executor.scheduleWithFixedDelay(this::clearExpiredSnapshots,
initialDelaySeconds,
-
cleanupPeriodSeconds, TimeUnit.SECONDS);
+ TruncationNotification truncationNotification =
(TruncationNotification) notification;
+ ColumnFamilyStore cfs = truncationNotification.cfs;
+ if (!truncationNotification.disableSnapshot &&
cfs.isAutoSnapshotEnabled())
+ {
+ SnapshotOptions opts =
SnapshotOptions.systemSnapshot(cfs.name, SnapshotType.TRUNCATE,
cfs.getKeyspaceTableName())
Review Comment:
@frankgh
Actually, no ... it is good as it is: for dropping we need to add it, but
for truncation we do not.
The reason we need it when dropping a table is that, if we did not pass it
there like that, then later on, when we look up the CFS in
`TakeSnapshotTask`, that CFS would not be found. Look at the `dropCf` method in
`Keyspace`: there we remove the cfs from `columnFamilyStores` by `tableId`.
Later on, when we get the CFS in `TakeSnapshotTask`, we would try to get it
from the very same map - which would return null to us. So, just for the
dropping case, we pass the just-removed cfs into `TableDroppedNotification`.
On the other hand, I think we do not need it in the `TablePreScrubNotification`
case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]