Repository: cassandra Updated Branches: refs/heads/trunk 6b0247576 -> d14a9266c
Automatic sstable upgrades Patch by marcuse; reviewed by Ariel Weisberg for CASSANDRA-14197 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d14a9266 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d14a9266 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d14a9266 Branch: refs/heads/trunk Commit: d14a9266c7ddff0589fdbe7a1836217b8bb8b394 Parents: 6b02475 Author: Marcus Eriksson <marc...@apache.org> Authored: Thu Mar 15 09:25:23 2018 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Wed May 9 08:17:33 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 3 + conf/cassandra.yaml | 6 + .../org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 35 +++++ .../db/compaction/CompactionManager.java | 76 ++++++++++- .../db/compaction/CompactionManagerMBean.java | 21 +++ .../compaction/CompactionStrategyManager.java | 35 ++++- .../apache/cassandra/metrics/TableMetrics.java | 13 ++ .../org/apache/cassandra/tools/NodeProbe.java | 1 + .../tools/nodetool/stats/StatsTable.java | 1 + .../tools/nodetool/stats/TableStatsHolder.java | 1 + .../tools/nodetool/stats/TableStatsPrinter.java | 1 + .../CompactionStrategyManagerTest.java | 131 ++++++++++++++++++- .../cassandra/io/sstable/LegacySSTableTest.java | 37 ++++++ .../nodetool/stats/TableStatsPrinterTest.java | 6 + .../nodetool/stats/TableStatsTestBase.java | 1 + 17 files changed, 362 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 25c237f..cad0e28 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Automatic sstable upgrades (CASSANDRA-14197) * Replace deprecated 
junit.framework.Assert usages with org.junit.Assert (CASSANDRA-14431) * cassandra-stress throws NPE if insert section isn't specified in user profile (CASSSANDRA-14426) * List clients by protocol versions `nodetool clientstats --by-protocol` (CASSANDRA-14335) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index a13f633..4885a12 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -38,6 +38,9 @@ using the provided 'sstableupgrade' tool. New features ------------ + - There is now an option to automatically upgrade sstables after Cassandra upgrade, enable + either in `cassandra.yaml:automatic_sstable_upgrade` or via JMX during runtime. See + CASSANDRA-14197. - `nodetool refresh` has been deprecated in favour of `nodetool import` - see CASSANDRA-6719 for details - An experimental option to compare all merkle trees together has been added - for example, in http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 7e4b2c2..7cc9e32 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -1178,3 +1178,9 @@ back_pressure_strategy: # The full query log will recrusively delete the contents of this path at # times. Don't place links in this directory to other parts of the filesystem. 
#full_query_log_dir: /tmp/cassandrafullquerylog + +# Automatically upgrade sstables after upgrade - if there is no ordinary compaction to do, the +# oldest non-upgraded sstable will get upgraded to the latest version +# automatic_sstable_upgrade: false +# Limit the number of concurrent sstable upgrades +# max_concurrent_automatic_sstable_upgrades: 1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index aa4b028..2c28796 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -377,6 +377,8 @@ public class Config // parameters to adjust how much to delay startup until a certain amount of the cluster is connect to and marked alive public int block_for_peers_percentage = 70; public int block_for_peers_timeout_in_secs = 10; + public volatile boolean automatic_sstable_upgrade = false; + public volatile int max_concurrent_automatic_sstable_upgrades = 1; /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index c738971..6b11974 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -748,6 +748,8 @@ public class DatabaseDescriptor if (conf.otc_coalescing_enough_coalesced_messages <= 0) throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false); + + validateMaxConcurrentAutoUpgradeTasksConf(conf.max_concurrent_automatic_sstable_upgrades); } private static String 
storagedirFor(String type) @@ -2544,4 +2546,37 @@ public class DatabaseDescriptor { return conf.block_for_peers_timeout_in_secs; } + + public static boolean automaticSSTableUpgrade() + { + return conf.automatic_sstable_upgrade; + } + + public static void setAutomaticSSTableUpgradeEnabled(boolean enabled) + { + if (conf.automatic_sstable_upgrade != enabled) + logger.debug("Changing automatic_sstable_upgrade to {}", enabled); + conf.automatic_sstable_upgrade = enabled; + } + + public static int maxConcurrentAutoUpgradeTasks() + { + return conf.max_concurrent_automatic_sstable_upgrades; + } + + public static void setMaxConcurrentAutoUpgradeTasks(int value) + { + if (conf.max_concurrent_automatic_sstable_upgrades != value) + logger.debug("Changing max_concurrent_automatic_sstable_upgrades to {}", value); + validateMaxConcurrentAutoUpgradeTasksConf(value); + conf.max_concurrent_automatic_sstable_upgrades = value; + } + + private static void validateMaxConcurrentAutoUpgradeTasksConf(int value) + { + if (value < 0) + throw new ConfigurationException("max_concurrent_automatic_sstable_upgrades can't be negative"); + if (value > getConcurrentCompactors()) + logger.warn("max_concurrent_automatic_sstable_upgrades ({}) is larger than concurrent_compactors ({})", value, getConcurrentCompactors()); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 831d8ca..5602aab 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.*; import java.util.concurrent.*; +import 
java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongPredicate; import java.util.stream.Collectors; import javax.management.MBeanServer; @@ -41,6 +42,7 @@ import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.repair.ValidationPartitionIterator; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; @@ -94,6 +96,9 @@ public class CompactionManager implements CompactionManagerMBean private static final Logger logger = LoggerFactory.getLogger(CompactionManager.class); public static final CompactionManager instance; + @VisibleForTesting + public final AtomicInteger currentlyBackgroundUpgrading = new AtomicInteger(0); + public static final int NO_GC = Integer.MIN_VALUE; public static final int GC_ALL = Integer.MAX_VALUE; @@ -256,6 +261,7 @@ public class CompactionManager implements CompactionManagerMBean // the actual sstables to compact are not determined until we run the BCT; that way, if new sstables // are created between task submission and execution, we execute against the most up-to-date information + @VisibleForTesting class BackgroundCompactionCandidate implements Runnable { private final ColumnFamilyStore cfs; @@ -268,6 +274,7 @@ public class CompactionManager implements CompactionManagerMBean public void run() { + boolean ranCompaction = false; try { logger.trace("Checking {}.{}", cfs.keyspace.getName(), cfs.name); @@ -281,19 +288,53 @@ public class CompactionManager implements CompactionManagerMBean AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs, FBUtilities.nowInSeconds())); if (task == null) { - logger.trace("No tasks available"); - return; + if 
(DatabaseDescriptor.automaticSSTableUpgrade()) + ranCompaction = maybeRunUpgradeTask(strategy); + } + else + { + task.execute(metrics); + ranCompaction = true; } - task.execute(metrics); } finally { compactingCF.remove(cfs); } - submitBackground(cfs); + if (ranCompaction) // only submit background if we actually ran a compaction - otherwise we end up in an infinite loop submitting noop background tasks + submitBackground(cfs); + } + + boolean maybeRunUpgradeTask(CompactionStrategyManager strategy) + { + logger.debug("Checking for upgrade tasks {}.{}", cfs.keyspace.getName(), cfs.getTableName()); + try + { + if (currentlyBackgroundUpgrading.incrementAndGet() <= DatabaseDescriptor.maxConcurrentAutoUpgradeTasks()) + { + AbstractCompactionTask upgradeTask = strategy.findUpgradeSSTableTask(); + if (upgradeTask != null) + { + upgradeTask.execute(metrics); + return true; + } + } + } + finally + { + currentlyBackgroundUpgrading.decrementAndGet(); + } + logger.trace("No tasks available"); + return false; } } + @VisibleForTesting + public BackgroundCompactionCandidate getBackgroundCompactionCandidate(ColumnFamilyStore cfs) + { + return new BackgroundCompactionCandidate(cfs); + } + /** * Run an operation over all sstables using jobs threads * @@ -1834,6 +1875,33 @@ public class CompactionManager implements CompactionManagerMBean viewBuildExecutor.setMaximumPoolSize(number); } + public boolean getAutomaticSSTableUpgradeEnabled() + { + return DatabaseDescriptor.automaticSSTableUpgrade(); + } + + public void setAutomaticSSTableUpgradeEnabled(boolean enabled) + { + DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(enabled); + } + + public int getMaxConcurrentAutoUpgradeTasks() + { + return DatabaseDescriptor.maxConcurrentAutoUpgradeTasks(); + } + + public void setMaxConcurrentAutoUpgradeTasks(int value) + { + try + { + DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(value); + } + catch (ConfigurationException e) + { + throw new RuntimeException(e.getMessage()); + } + } + 
/** * Try to stop all of the compactions for given ColumnFamilies. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java index b98b371..e4d5392 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java @@ -138,4 +138,25 @@ public interface CompactionManagerMBean * @param number New maximum of view build threads */ public void setMaximumViewBuildThreads(int number); + + /** + * Get automatic sstable upgrade enabled + */ + public boolean getAutomaticSSTableUpgradeEnabled(); + /** + * Set if automatic sstable upgrade should be enabled + */ + public void setAutomaticSSTableUpgradeEnabled(boolean enabled); + + /** + * Get the number of concurrent sstable upgrade tasks we should run + * when automatic sstable upgrades are enabled + */ + public int getMaxConcurrentAutoUpgradeTasks(); + + /** + * Set the number of concurrent sstable upgrade tasks we should run + * when automatic sstable upgrades are enabled + */ + public void setMaxConcurrentAutoUpgradeTasks(int value); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 61f0630..c77ed92 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -18,6 +18,7 @@ package 
org.apache.cassandra.db.compaction; +import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.Callable; @@ -29,7 +30,9 @@ import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.DiskBoundaries; import org.apache.cassandra.index.Index; @@ -38,12 +41,12 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; -import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -202,6 +205,36 @@ public class CompactionStrategyManager implements INotificationConsumer } } + /** + * finds the oldest (by modification date) non-latest-version sstable on disk and creates an upgrade task for it + * @return + */ + @VisibleForTesting + AbstractCompactionTask findUpgradeSSTableTask() + { + if (!isEnabled() || !DatabaseDescriptor.automaticSSTableUpgrade()) + return null; + Set<SSTableReader> compacting = cfs.getTracker().getCompacting(); + List<SSTableReader> potentialUpgrade = cfs.getLiveSSTables() + .stream() + .filter(s -> !compacting.contains(s) && !s.descriptor.version.isLatestVersion()) + .sorted((o1, o2) -> { + File f1 = new File(o1.descriptor.filenameFor(Component.DATA)); + File f2 = new File(o2.descriptor.filenameFor(Component.DATA)); + return Longs.compare(f1.lastModified(), 
f2.lastModified()); + }).collect(Collectors.toList()); + for (SSTableReader sstable : potentialUpgrade) + { + LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.UPGRADE_SSTABLES); + if (txn != null) + { + logger.debug("Running automatic sstable upgrade for {}", sstable); + return getCompactionStrategyFor(sstable).getCompactionTask(txn, Integer.MIN_VALUE, Long.MAX_VALUE); + } + } + return null; + } + public boolean isEnabled() { return enabled && isActive; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/metrics/TableMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index d8cb18e..c0e7f7b 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -95,6 +95,8 @@ public class TableMetrics public final Gauge<Integer> pendingCompactions; /** Number of SSTables on disk for this CF */ public final Gauge<Integer> liveSSTableCount; + /** Number of SSTables with old version on disk for this CF */ + public final Gauge<Integer> oldVersionSSTableCount; /** Disk space used by SSTables belonging to this table */ public final Counter liveDiskSpaceUsed; /** Total disk space used by SSTables belonging to this table, including obsolete ones waiting to be GC'd */ @@ -541,6 +543,17 @@ public class TableMetrics return cfs.getTracker().getView().liveSSTables().size(); } }); + oldVersionSSTableCount = createTableGauge("OldVersionSSTableCount", new Gauge<Integer>() + { + public Integer getValue() + { + int count = 0; + for (SSTableReader sstable : cfs.getLiveSSTables()) + if (!sstable.descriptor.version.isLatestVersion()) + count++; + return count; + } + }); liveDiskSpaceUsed = createTableCounter("LiveDiskSpaceUsed"); totalDiskSpaceUsed = createTableCounter("TotalDiskSpaceUsed"); 
minPartitionSize = createTableGauge("MinPartitionSize", "MinRowSize", new Gauge<Long>() http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 4fdb563..e221e11 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1399,6 +1399,7 @@ public class NodeProbe implements AutoCloseable case "EstimatedPartitionCount": case "KeyCacheHitRate": case "LiveSSTableCount": + case "OldVersionSSTableCount": case "MaxPartitionSize": case "MeanPartitionSize": case "MemtableColumnsCount": http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java b/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java index 908c856..01d2164 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java +++ b/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java @@ -29,6 +29,7 @@ public class StatsTable public boolean isIndex; public boolean isLeveledSstable = false; public Object sstableCount; + public Object oldSSTableCount; public String spaceUsedLive; public String spaceUsedTotal; public String spaceUsedBySnapshotsTotal; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java index 79531c1..624484f 100644 --- 
a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java +++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java @@ -209,6 +209,7 @@ public class TableStatsHolder implements StatsHolder statsTable.tableName = tableName; statsTable.isIndex = tableName.contains("."); statsTable.sstableCount = probe.getColumnFamilyMetric(keyspaceName, tableName, "LiveSSTableCount"); + statsTable.oldSSTableCount = probe.getColumnFamilyMetric(keyspaceName, tableName, "OldVersionSSTableCount"); int[] leveledSStables = table.getSSTableCountPerLevel(); if (leveledSStables != null) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java index 4ea7562..b166803 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java +++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java @@ -75,6 +75,7 @@ public class TableStatsPrinter { out.println(indent + "Table" + (table.isIndex ? 
" (index): " + table.tableName : ": ") + tableDisplayName); out.println(indent + "SSTable count: " + table.sstableCount); + out.println(indent + "Old SSTable count: " + table.oldSSTableCount); if (table.isLeveledSstable) out.println(indent + "SSTables in each level: [" + String.join(", ", table.sstablesInEachLevel) + "]"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java index c315fb9..6f2551d 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java @@ -24,6 +24,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import com.google.common.collect.Sets; @@ -76,6 +78,10 @@ public class CompactionStrategyManagerTest * disk assignment based on its generation - See {@link this#getSSTableIndex(Integer[], SSTableReader)} */ originalPartitioner = StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance); + SchemaLoader.createKeyspace(KS_PREFIX, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX) + .compaction(CompactionParams.scts(Collections.emptyMap()))); } @AfterClass @@ -90,10 +96,6 @@ public class CompactionStrategyManagerTest { // Creates 100 SSTables with keys 0-99 int numSSTables = 100; - SchemaLoader.createKeyspace(KS_PREFIX, - KeyspaceParams.simple(1), - SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX) - .compaction(CompactionParams.scts(Collections.emptyMap()))); 
ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX); cfs.disableAutoCompaction(); Set<SSTableReader> previousSSTables = cfs.getLiveSSTables(); @@ -177,6 +179,81 @@ public class CompactionStrategyManagerTest } } + + + @Test + public void testAutomaticUpgradeConcurrency() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX); + DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(true); + DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(1); + + // latch to block CompactionManager.BackgroundCompactionCandidate#maybeRunUpgradeTask + // inside the currentlyBackgroundUpgrading check - with max_concurrent_auto_upgrade_tasks = 1 this will make + // sure that BackgroundCompactionCandidate#maybeRunUpgradeTask returns false until the latch has been counted down + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger upgradeTaskCount = new AtomicInteger(0); + MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, upgradeTaskCount); + + CompactionManager.BackgroundCompactionCandidate r = CompactionManager.instance.getBackgroundCompactionCandidate(mock); + CompactionStrategyManager mgr = mock.getCompactionStrategyManager(); + // basic idea is that we start a thread which will be able to get in to the currentlyBackgroundUpgrading-guarded + // code in CompactionManager, then we try to run a bunch more of the upgrade tasks which should return false + // due to the currentlyBackgroundUpgrading count being >= max_concurrent_auto_upgrade_tasks + Thread t = new Thread(() -> r.maybeRunUpgradeTask(mgr)); + t.start(); + Thread.sleep(100); // let the thread start and grab the task + assertEquals(1, CompactionManager.instance.currentlyBackgroundUpgrading.get()); + assertFalse(r.maybeRunUpgradeTask(mgr)); + assertFalse(r.maybeRunUpgradeTask(mgr)); + latch.countDown(); + t.join(); + assertEquals(1, upgradeTaskCount.get()); // we should only call findUpgradeSSTableTask once when concurrency = 1 + 
assertEquals(0, CompactionManager.instance.currentlyBackgroundUpgrading.get()); + + DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false); + } + + @Test + public void testAutomaticUpgradeConcurrency2() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX); + DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(true); + DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(2); + // latch to block CompactionManager.BackgroundCompactionCandidate#maybeRunUpgradeTask + // inside the currentlyBackgroundUpgrading check - with max_concurrent_auto_upgrade_tasks = 2 this will make + // sure that BackgroundCompactionCandidate#maybeRunUpgradeTask returns false until the latch has been counted down + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger upgradeTaskCount = new AtomicInteger(); + MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, upgradeTaskCount); + + CompactionManager.BackgroundCompactionCandidate r = CompactionManager.instance.getBackgroundCompactionCandidate(mock); + CompactionStrategyManager mgr = mock.getCompactionStrategyManager(); + + // basic idea is that we start 2 threads which will be able to get in to the currentlyBackgroundUpgrading-guarded + // code in CompactionManager, then we try to run a bunch more of the upgrade tasks which should return false + // due to the currentlyBackgroundUpgrading count being >= max_concurrent_auto_upgrade_tasks + Thread t = new Thread(() -> r.maybeRunUpgradeTask(mgr)); + t.start(); + Thread t2 = new Thread(() -> r.maybeRunUpgradeTask(mgr)); + t2.start(); + Thread.sleep(100); // let the threads start and grab the task + assertEquals(2, CompactionManager.instance.currentlyBackgroundUpgrading.get()); + assertFalse(r.maybeRunUpgradeTask(mgr)); + assertFalse(r.maybeRunUpgradeTask(mgr)); + assertFalse(r.maybeRunUpgradeTask(mgr)); + assertEquals(2, CompactionManager.instance.currentlyBackgroundUpgrading.get()); + latch.countDown(); + t.join(); + t2.join(); + 
assertEquals(2, upgradeTaskCount.get()); + assertEquals(0, CompactionManager.instance.currentlyBackgroundUpgrading.get()); + + DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(1); + DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false); + } + + private MockCFS createJBODMockCFS(int disks) { // Create #disks data directories to simulate JBOD @@ -306,4 +383,50 @@ public class CompactionStrategyManagerTest super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true); } } + + private static class MockCFSForCSM extends ColumnFamilyStore + { + private final CountDownLatch latch; + private final AtomicInteger upgradeTaskCount; + + private MockCFSForCSM(ColumnFamilyStore cfs, CountDownLatch latch, AtomicInteger upgradeTaskCount) + { + super(cfs.keyspace, cfs.name, 10, cfs.metadata, cfs.getDirectories(), true, false, false); + this.latch = latch; + this.upgradeTaskCount = upgradeTaskCount; + } + @Override + public CompactionStrategyManager getCompactionStrategyManager() + { + return new MockCSM(this, latch, upgradeTaskCount); + } + } + + private static class MockCSM extends CompactionStrategyManager + { + private final CountDownLatch latch; + private final AtomicInteger upgradeTaskCount; + + private MockCSM(ColumnFamilyStore cfs, CountDownLatch latch, AtomicInteger upgradeTaskCount) + { + super(cfs); + this.latch = latch; + this.upgradeTaskCount = upgradeTaskCount; + } + + @Override + public AbstractCompactionTask findUpgradeSSTableTask() + { + try + { + latch.await(); + upgradeTaskCount.incrementAndGet(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + return null; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java 
b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java index 8dd8197..13d3eac 100644 --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java @@ -42,6 +42,8 @@ import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.AbstractCompactionTask; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.Verifier; import org.apache.cassandra.db.streaming.CassandraOutgoingFile; import org.apache.cassandra.dht.IPartitioner; @@ -61,6 +63,8 @@ import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -234,6 +238,39 @@ public class LegacySSTableTest } } + @Test + public void testAutomaticUpgrade() throws Exception + { + for (String legacyVersion : legacyVersions) + { + logger.info("Loading legacy version: {}", legacyVersion); + truncateLegacyTables(legacyVersion); + loadLegacyTables(legacyVersion); + ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion)); + AbstractCompactionTask act = cfs.getCompactionStrategyManager().getNextBackgroundTask(0); + // there should be no compactions to run with auto upgrades disabled: + assertEquals(null, act); + } + + DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(true); + for (String legacyVersion : legacyVersions) + { + logger.info("Loading legacy version: {}", legacyVersion); + truncateLegacyTables(legacyVersion); + loadLegacyTables(legacyVersion); + ColumnFamilyStore cfs = 
Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion)); + if (cfs.getLiveSSTables().stream().anyMatch(s -> !s.descriptor.version.isLatestVersion())) + assertTrue(cfs.metric.oldVersionSSTableCount.getValue() > 0); + while (cfs.getLiveSSTables().stream().anyMatch(s -> !s.descriptor.version.isLatestVersion())) + { + CompactionManager.instance.submitBackground(cfs); + Thread.sleep(100); + } + assertTrue(cfs.metric.oldVersionSSTableCount.getValue() == 0); + } + DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false); + } + private void streamLegacyTables(String legacyVersion) throws Exception { logger.info("Streaming legacy version {}", legacyVersion); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java b/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java index 26b2ff6..32b0b62 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java +++ b/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java @@ -35,6 +35,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase public static final String expectedDefaultTable1Output = "\tTable: %s\n" + "\tSSTable count: 60000\n" + + "\tOld SSTable count: 0\n" + "\tSpace used (live): 0\n" + "\tSpace used (total): 9001\n" + "\tSpace used by snapshots (total): 1111\n" + @@ -65,6 +66,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase public static final String expectedDefaultTable2Output = "\tTable: %s\n" + "\tSSTable count: 3000\n" + + "\tOld SSTable count: 0\n" + "\tSpace used (live): 22\n" + "\tSpace used (total): 1024\n" + "\tSpace used by snapshots (total): 222\n" + @@ -100,6 +102,7 @@ public class TableStatsPrinterTest 
extends TableStatsTestBase public static final String expectedDefaultTable3Output = "\tTable: %s\n" + "\tSSTable count: 50000\n" + + "\tOld SSTable count: 0\n" + "\tSpace used (live): 0\n" + "\tSpace used (total): 512\n" + "\tSpace used by snapshots (total): 0\n" + @@ -130,6 +133,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase public static final String expectedDefaultTable4Output = "\tTable: %s\n" + "\tSSTable count: 2000\n" + + "\tOld SSTable count: 0\n" + "\tSpace used (live): 4444\n" + "\tSpace used (total): 256\n" + "\tSpace used by snapshots (total): 44\n" + @@ -165,6 +169,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase public static final String expectedDefaultTable5Output = "\tTable: %s\n" + "\tSSTable count: 40000\n" + + "\tOld SSTable count: 0\n" + "\tSpace used (live): 55555\n" + "\tSpace used (total): 64\n" + "\tSpace used by snapshots (total): 55555\n" + @@ -195,6 +200,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase public static final String expectedDefaultTable6Output = "\tTable: %s\n" + "\tSSTable count: 1000\n" + + "\tOld SSTable count: 0\n" + "\tSpace used (live): 666666\n" + "\tSpace used (total): 0\n" + "\tSpace used by snapshots (total): 0\n" + http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java b/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java index bb56ef8..b2f1663 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java +++ b/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java @@ -70,6 +70,7 @@ public class TableStatsTestBase template.tableName = new String(tableName); template.isIndex = false; template.sstableCount = 0L; + template.oldSSTableCount = 0L; 
template.spaceUsedLive = "0"; template.spaceUsedTotal = "0"; template.spaceUsedBySnapshotsTotal = "0"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org