Updated Branches: refs/heads/cassandra-1.1.0 3136c2092 -> b12c34f30
merge from 1.0 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b12c34f3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b12c34f3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b12c34f3 Branch: refs/heads/cassandra-1.1.0 Commit: b12c34f309cba15fb0d4187461a7065121f38e7b Parents: 3136c20 fbdf7b0 Author: Pavel Yaskevich <xe...@apache.org> Authored: Thu Mar 22 16:26:11 2012 +0300 Committer: Pavel Yaskevich <xe...@apache.org> Committed: Thu Mar 22 16:45:57 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Directories.java | 32 ++++++++++++--- .../cassandra/db/compaction/CompactionTask.java | 32 ++++++++------- 3 files changed, 44 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b12c34f3/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 70db8e5,c1e1cfe..c770868 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -38,96 -10,9 +38,97 @@@ Merged from 1.0 * don't change manifest level for cleanup, scrub, and upgradesstables operations under LeveledCompactionStrategy (CASSANDRA-3989) * fix race leading to super columns assertion failure (CASSANDRA-3957) + * ensure that directory is selected for compaction (CASSANDRA-3985) +1.1-beta1 + * (cqlsh) + + add SOURCE and CAPTURE commands, and --file option (CASSANDRA-3479) + + add ALTER COLUMNFAMILY WITH (CASSANDRA-3523) + + bundle Python dependencies with Cassandra (CASSANDRA-3507) + + added to Debian package (CASSANDRA-3458) + + display byte data instead of erroring out on decode failure + (CASSANDRA-3874) + * add nodetool rebuild_index (CASSANDRA-3583) + * add nodetool rangekeysample (CASSANDRA-2917) + * Fix streaming too much data during move operations (CASSANDRA-3639) + * Nodetool and CLI connect to localhost by default (CASSANDRA-3568) + * Reduce memory used by primary index sample (CASSANDRA-3743) + * (Hadoop) separate input/output configurations (CASSANDRA-3197, 3765) + * avoid returning internal Cassandra classes over JMX (CASSANDRA-2805) + * add row-level isolation via SnapTree (CASSANDRA-2893) + * Optimize key count estimation when opening sstable on startup + (CASSANDRA-2988) + * multi-dc replication optimization supporting CL > ONE (CASSANDRA-3577) + * add command to stop compactions (CASSANDRA-1740, 3566, 3582) + * multithreaded streaming (CASSANDRA-3494) + * removed in-tree redhat spec (CASSANDRA-3567) + * "defragment" rows for name-based queries under STCS, again (CASSANDRA-2503) + * Recycle commitlog segments for improved performance + (CASSANDRA-3411, 3543, 3557, 3615) + * update size-tiered compaction to prioritize small tiers (CASSANDRA-2407) + * add message expiration logic to OutboundTcpConnection (CASSANDRA-3005) + * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271) + * EACH_QUORUM is only supported for writes (CASSANDRA-3272) + * replace compactionlock use in schema migration by checking CFS.isValid + (CASSANDRA-3116) + * recognize that "SELECT first ... *" isn't really "SELECT *" (CASSANDRA-3445) + * Use faster bytes comparison (CASSANDRA-3434) + * Bulk loader is no longer a fat client, (HADOOP) bulk load output format + (CASSANDRA-3045) + * (Hadoop) add support for KeyRange.filter + * remove assumption that keys and token are in bijection + (CASSANDRA-1034, 3574, 3604) + * always remove endpoints from delevery queue in HH (CASSANDRA-3546) + * fix race between cf flush and its 2ndary indexes flush (CASSANDRA-3547) + * fix potential race in AES when a repair fails (CASSANDRA-3548) + * Remove columns shadowed by a deleted container even when we cannot purge + (CASSANDRA-3538) + * Improve memtable slice iteration performance (CASSANDRA-3545) + * more efficient allocation of small bloom filters (CASSANDRA-3618) + * Use separate writer thread in SSTableSimpleUnsortedWriter (CASSANDRA-3619) + * fsync the directory after new sstable or commitlog segment are created (CASSANDRA-3250) + * fix minor issues reported by FindBugs (CASSANDRA-3658) + * global key/row caches (CASSANDRA-3143, 3849) + * optimize memtable iteration during range scan (CASSANDRA-3638) + * introduce 'crc_check_chance' in CompressionParameters to support + a checksum percentage checking chance similarly to read-repair (CASSANDRA-3611) + * a way to deactivate global key/row cache on per-CF basis (CASSANDRA-3667) + * fix LeveledCompactionStrategy broken because of generation pre-allocation + in LeveledManifest (CASSANDRA-3691) + * finer-grained control over data directories (CASSANDRA-2749) + * Fix ClassCastException during hinted handoff (CASSANDRA-3694) + * Upgrade Thrift to 0.7 (CASSANDRA-3213) + * Make stress.java insert operation to use microseconds (CASSANDRA-3725) + * Allows (internally) doing a range query with a limit of columns instead of + rows (CASSANDRA-3742) + * Allow rangeSlice queries to be start/end inclusive/exclusive (CASSANDRA-3749) + * Fix BulkLoader to support new SSTable layout and add stream + throttling to prevent an NPE when there is no yaml config (CASSANDRA-3752) + * Allow concurrent schema migrations (CASSANDRA-1391, 3832) + * Add SnapshotCommand to trigger snapshot on remote node (CASSANDRA-3721) + * Make CFMetaData conversions to/from thrift/native schema inverses + (CASSANDRA_3559) + * Add initial code for CQL 3.0-beta (CASSANDRA-3781, 3753) + * Add wide row support for ColumnFamilyInputFormat (CASSANDRA-3264) + * Allow extending CompositeType comparator (CASSANDRA-3657) + * Avoids over-paging during get_count (CASSANDRA-3798) + * Add new command to rebuild a node without (repair) merkle tree calculations + (CASSANDRA-3483, 3922) + * respect not only row cache capacity but caching mode when + trying to read data (CASSANDRA-3812) + * fix system tests (CASSANDRA-3827) + * CQL support for altering row key type in ALTER TABLE (CASSANDRA-3781) + * turn compression on by default (CASSANDRA-3871) + * make hexToBytes refuse invalid input (CASSANDRA-2851) + * Make secondary indexes CF inherit compression and compaction from their + parent CF (CASSANDRA-3877) + * Finish cleanup up tombstone purge code (CASSANDRA-3872) + * Avoid NPE on aboarted stream-out sessions (CASSANDRA-3904) + * BulkRecordWriter throws NPE for counter columns (CASSANDRA-3906) + * Support compression using BulkWriter (CASSANDRA-3907) + + 1.0.8 * fix race between cleanup and flush on secondary index CFSes (CASSANDRA-3712) * avoid including non-queried nodes in rangeslice read repair http://git-wip-us.apache.org/repos/asf/cassandra/blob/b12c34f3/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Directories.java index 7f383ff,0000000..d2c98c8 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@@ -1,532 -1,0 +1,552 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOError; +import java.io.IOException; +import java.util.*; + +import org.apache.commons.lang.StringUtils; +import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.*; +import org.apache.cassandra.db.compaction.LeveledManifest; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.MmappedSegmentedFile; +import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.CLibrary; +import org.apache.cassandra.utils.Pair; + +/** + * Encapsulate handling of paths to the data files. + * + * The directory layout is the following: + * /<path_to_data_dir>/ks/cf1/ks-cf1-hb-1-Data.db + * /cf2/ks-cf2-hb-1-Data.db + * ... + * + * In addition, more that one 'root' data directory can be specified so that + * <path_to_data_dir> potentially represents multiple locations. + * Note that in the case of multiple locations, the manifest for the leveled + * compaction is only in one of the location. + * + * Snapshots (resp. backups) are always created along the sstables thare are + * snapshoted (resp. backuped) but inside a subdirectory named 'snapshots' + * (resp. backups) (and snapshots are furter inside a subdirectory of the name + * of the snapshot). + * + * This class abstracts all those details from the rest of the code. + */ +public class Directories +{ + private static Logger logger = LoggerFactory.getLogger(Directories.class); + + public static final String BACKUPS_SUBDIR = "backups"; + public static final String SNAPSHOT_SUBDIR = "snapshots"; + public static final char SECONDARY_INDEX_NAME_SEPARATOR = '.'; + + public static final File[] dataFileLocations; + static + { + String[] locations = DatabaseDescriptor.getAllDataFileLocations(); + dataFileLocations = new File[locations.length]; + for (int i = 0; i < locations.length; ++i) + dataFileLocations[i] = new File(locations[i]); + } + + private final String tablename; + private final String cfname; + private final File[] sstableDirectories; + + public static Directories create(String tablename, String cfname) + { + int idx = cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR); + if (idx > 0) + // secondary index, goes in the same directory than the base cf + return new Directories(tablename, cfname, cfname.substring(0, idx)); + else + return new Directories(tablename, cfname, cfname); + } + + private Directories(String tablename, String cfname, String directoryName) + { + this.tablename = tablename; + this.cfname = cfname; + this.sstableDirectories = new File[dataFileLocations.length]; + for (int i = 0; i < dataFileLocations.length; ++i) + sstableDirectories[i] = new File(dataFileLocations[i], join(tablename, directoryName)); + + if (!StorageService.instance.isClientMode()) + { + try + { + for (File dir : sstableDirectories) + FileUtils.createDirectory(dir); + } + catch (IOException e) + { + throw new IOError(e); + } + } + } + + public File getDirectoryForNewSSTables(long estimatedSize) + { - File path = getLocationWithMaximumAvailableSpace(estimatedSize); ++ return getDirectoryForNewSSTables(estimatedSize, true); ++ } ++ ++ public File getDirectoryForNewSSTables(long estimatedSize, boolean ensureFreeSpace) ++ { ++ File path = getLocationWithMaximumAvailableSpace(estimatedSize, ensureFreeSpace); + // Requesting GC has a chance to free space only if we're using mmap and a non SUN jvm + if (path == null + && (DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap || DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap) + && !MmappedSegmentedFile.isCleanerAvailable()) + { + StorageService.instance.requestGC(); + // retry after GCing has forced unmap of compacted SSTables so they can be deleted + // Note: GCInspector will do this already, but only sun JVM supports GCInspector so far + SSTableDeletingTask.rescheduleFailedTasks(); + try + { + Thread.sleep(10000); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } - path = getLocationWithMaximumAvailableSpace(estimatedSize); ++ path = getLocationWithMaximumAvailableSpace(estimatedSize, ensureFreeSpace); + } + return path; + } + + /* + * Loop through all the disks to see which disk has the max free space + * return the disk with max free space for compactions. If the size of the expected + * compacted file is greater than the max disk space available return null, we cannot + * do compaction in this case. + */ - public File getLocationWithMaximumAvailableSpace(long estimatedSize) ++ public File getLocationWithMaximumAvailableSpace(long estimatedSize, boolean ensureFreeSpace) + { + long maxFreeDisk = 0; + File maxLocation = null; + + for (File dir : sstableDirectories) + { + if (maxFreeDisk < dir.getUsableSpace()) + { + maxFreeDisk = dir.getUsableSpace(); + maxLocation = dir; + } + } - logger.debug(String.format("expected data files size is %d; largest free partition (%s) has %d bytes free", estimatedSize, maxLocation, maxFreeDisk)); ++ logger.debug(String.format("expected data files size is %d; largest free partition (%s) has %d bytes free", ++ estimatedSize, ++ maxLocation, ++ maxFreeDisk)); + + // Load factor of 0.9 we do not want to use the entire disk that is too risky. - maxFreeDisk = (long)(0.9 * maxFreeDisk); - return estimatedSize < maxFreeDisk ? maxLocation : null; ++ maxFreeDisk = (long) (0.9 * maxFreeDisk); ++ ++ if (!ensureFreeSpace || estimatedSize < maxFreeDisk) ++ { ++ if (estimatedSize >= maxFreeDisk) ++ logger.warn(String.format("Data file location %s only has %d free, estimated size is %d", ++ maxLocation, ++ maxFreeDisk, ++ estimatedSize)); ++ ++ return maxLocation; ++ } ++ ++ return null; + } + + public static File getSnapshotDirectory(Descriptor desc, String snapshotName) + { + return getOrCreate(desc.directory, SNAPSHOT_SUBDIR, snapshotName); + } + + public static File getBackupsDirectory(Descriptor desc) + { + return getOrCreate(desc.directory, BACKUPS_SUBDIR); + } + + public SSTableLister sstableLister() + { + return new SSTableLister(); + } + + public class SSTableLister + { + private boolean skipCompacted; + private boolean skipTemporary; + private boolean includeBackups; + private boolean onlyBackups; + private int nbFiles; + private final Map<Descriptor, Set<Component>> components = new HashMap<Descriptor, Set<Component>>(); + private boolean filtered; + + public SSTableLister skipCompacted(boolean b) + { + if (filtered) + throw new IllegalStateException("list() has already been called"); + skipCompacted = b; + return this; + } + + public SSTableLister skipTemporary(boolean b) + { + if (filtered) + throw new IllegalStateException("list() has already been called"); + skipTemporary = b; + return this; + } + + public SSTableLister includeBackups(boolean b) + { + if (filtered) + throw new IllegalStateException("list() has already been called"); + includeBackups = b; + return this; + } + + public SSTableLister onlyBackups(boolean b) + { + if (filtered) + throw new IllegalStateException("list() has already been called"); + onlyBackups = b; + includeBackups = b; + return this; + } + + public Map<Descriptor, Set<Component>> list() + { + filter(); + return ImmutableMap.copyOf(components); + } + + public List<File> listFiles() + { + filter(); + List<File> l = new ArrayList<File>(nbFiles); + for (Map.Entry<Descriptor, Set<Component>> entry : components.entrySet()) + { + for (Component c : entry.getValue()) + { + l.add(new File(entry.getKey().filenameFor(c))); + } + } + return l; + } + + private void filter() + { + if (filtered) + return; + + for (File location : sstableDirectories) + { + if (!onlyBackups) + location.listFiles(getFilter()); + + if (includeBackups) + new File(location, BACKUPS_SUBDIR).listFiles(getFilter()); + } + filtered = true; + } + + private FileFilter getFilter() + { + // Note: the prefix needs to include cfname + separator to distinguish between a cfs and it's secondary indexes + final String sstablePrefix = tablename + Component.separator + cfname + Component.separator; + return new FileFilter() + { + // This function always return false since accepts adds to the components map + public boolean accept(File file) + { + // we are only interested in the SSTable files that belong to the specific ColumnFamily + if (file.isDirectory() || !file.getName().startsWith(sstablePrefix)) + return false; + + Pair<Descriptor, Component> pair = SSTable.tryComponentFromFilename(file.getParentFile(), file.getName()); + if (pair == null) + return false; + + if (skipCompacted && new File(pair.left.filenameFor(Component.COMPACTED_MARKER)).exists()) + return false; + if (skipTemporary && pair.left.temporary) + return false; + + Set<Component> previous = components.get(pair.left); + if (previous == null) + { + previous = new HashSet<Component>(); + components.put(pair.left, previous); + } + previous.add(pair.right); + nbFiles++; + return false; + } + }; + } + } + + public File tryGetLeveledManifest() + { + for (File dir : sstableDirectories) + { + File manifestFile = new File(dir, cfname + LeveledManifest.EXTENSION); + if (manifestFile.exists()) + { + logger.debug("Found manifest at {}", manifestFile); + return manifestFile; + } + } + logger.debug("No level manifest found"); + return null; + } + + public File getOrCreateLeveledManifest() + { + File manifestFile = tryGetLeveledManifest(); + if (manifestFile == null) + manifestFile = new File(sstableDirectories[0], cfname + LeveledManifest.EXTENSION); + return manifestFile; + } + + public void snapshotLeveledManifest(String snapshotName) throws IOException + { + File manifest = tryGetLeveledManifest(); + if (manifest != null) + { + File snapshotDirectory = getOrCreate(manifest.getParentFile(), SNAPSHOT_SUBDIR, snapshotName); + CLibrary.createHardLink(manifest, new File(snapshotDirectory, manifest.getName())); + } + } + + public boolean snapshotExists(String snapshotName) + { + for (File dir : sstableDirectories) + { + File snapshotDir = new File(dir, join(SNAPSHOT_SUBDIR, snapshotName)); + if (snapshotDir.exists()) + return true; + } + return false; + } + + public void clearSnapshot(String snapshotName) throws IOException + { + // If snapshotName is empty or null, we will delete the entire snapshot directory + String tag = snapshotName == null ? "" : snapshotName; + for (File dir : sstableDirectories) + { + File snapshotDir = new File(dir, join(SNAPSHOT_SUBDIR, tag)); + if (snapshotDir.exists()) + { + if (logger.isDebugEnabled()) + logger.debug("Removing snapshot directory " + snapshotDir); + FileUtils.deleteRecursive(snapshotDir); + } + } + } + + private static File getOrCreate(File base, String... subdirs) + { + File dir = subdirs == null || subdirs.length == 0 ? base : new File(base, join(subdirs)); + if (dir.exists()) + { + if (!dir.isDirectory()) + throw new IOError(new IOException(String.format("Invalid directory path %s: path exists but is not a directory", dir))); + } + else if (!dir.mkdirs()) + { + throw new IOError(new IOException("Unable to create directory " + dir)); + } + return dir; + } + + private static String join(String... s) + { + return StringUtils.join(s, File.separator); + } + + /** + * To check if sstables needs migration, we look at the System directory. + * If it contains a directory for the status cf, we'll attempt a sstable + * migration. + * Note that it is mostly harmless to try a migration uselessly, except + * maybe for some wasted cpu cycles. + */ + public static boolean sstablesNeedsMigration() + { + if (StorageService.instance.isClientMode()) + return false; + + boolean hasSystemKeyspace = false; + for (File location : dataFileLocations) + { + File systemDir = new File(location, Table.SYSTEM_TABLE); + hasSystemKeyspace |= (systemDir.exists() && systemDir.isDirectory()); + File statusCFDir = new File(systemDir, SystemTable.STATUS_CF); + if (statusCFDir.exists()) + return false; + } + if (!hasSystemKeyspace) + // This is a brand new node. + return false; + + // Check whether the migration migth create too long a filename + int longestLocation = -1; + try + { + for (File loc : dataFileLocations) + longestLocation = Math.max(longestLocation, loc.getCanonicalPath().length()); + } + catch (IOException e) + { + throw new IOError(e); + } + + for (KSMetaData ksm : Schema.instance.getTableDefinitions()) + { + String ksname = ksm.name; + for (Map.Entry<String, CFMetaData> entry : ksm.cfMetaData().entrySet()) + { + String cfname = entry.getKey(); + // max path is roughly (guess-estimate) <location>/ksname/cfname/snapshots/1324314347102-somename/ksname-cfname-tmp-hb-1024-Statistics.db + if (longestLocation + (ksname.length() + cfname.length()) * 2 + 62 > 256) + throw new RuntimeException("Starting with 1.1, keyspace names and column family names must be less than 32 characters long. " + + ksname + "/" + cfname + " doesn't respect that restriction. Please rename your keyspace/column families to respect that restriction before updating."); + } + } + return true; + } + + /** + * Move sstables from the pre-#2749 layout to their new location/names. + * This involves: + * - moving each sstable to their CF specific directory + * - rename the sstable to include the keyspace in the filename + * + * Note that this also move leveled manifests, snapshots and backups. + */ + public static void migrateSSTables() + { + logger.info("Upgrade from pre-1.1 version detected: migrating sstables to new directory layout"); + + for (File location : dataFileLocations) + { + if (!location.exists() || !location.isDirectory()) + continue; + + for (File ksDir : location.listFiles()) + { + if (!ksDir.isDirectory()) + continue; + + for (File file : ksDir.listFiles()) + migrateFile(file, ksDir, null); + + migrateSnapshots(ksDir); + migrateBackups(ksDir); + } + } + } + + private static void migrateSnapshots(File ksDir) + { + File snapshotDir = new File(ksDir, SNAPSHOT_SUBDIR); + if (!snapshotDir.exists()) + return; + + for (File snapshot : snapshotDir.listFiles()) + { + if (!snapshot.isDirectory()) + continue; + + for (File f : snapshot.listFiles()) + migrateFile(f, ksDir, join(SNAPSHOT_SUBDIR, snapshot.getName())); + + if (!snapshot.delete()) + logger.info("Old snapsot directory {} not deleted by migraation as it is not empty", snapshot); + } + if (!snapshotDir.delete()) + logger.info("Old directory {} not deleted by migration as it is not empty", snapshotDir); + } + + private static void migrateBackups(File ksDir) + { + File backupDir = new File(ksDir, BACKUPS_SUBDIR); + if (!backupDir.exists()) + return; + + for (File f : backupDir.listFiles()) + migrateFile(f, ksDir, BACKUPS_SUBDIR); + + if (!backupDir.delete()) + logger.info("Old directory {} not deleted by migration as it is not empty", backupDir); + } + + private static void migrateFile(File file, File ksDir, String additionalPath) + { + try + { + if (file.isDirectory()) + return; + + String name = file.getName(); + boolean isManifest = name.endsWith(LeveledManifest.EXTENSION); + String cfname = isManifest + ? name.substring(0, name.length() - LeveledManifest.EXTENSION.length()) + : name.substring(0, name.indexOf(Component.separator)); + + int idx = cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR); // idx > 0 => secondary index + String dirname = idx > 0 ? cfname.substring(0, idx) : cfname; + File destDir = getOrCreate(ksDir, dirname, additionalPath); + + File destFile = new File(destDir, isManifest ? name : ksDir.getName() + Component.separator + name); + logger.debug(String.format("[upgrade to 1.1] Moving %s to %s", file, destFile)); + FileUtils.renameWithConfirm(file, destFile); + } + catch (IOException e) + { + throw new IOError(e); + } + } + + // Hack for tests, don't use otherwise + static void overrideDataDirectoriesForTest(String loc) + { + for (int i = 0; i < dataFileLocations.length; ++i) + dataFileLocations[i] = new File(loc); + } + + // Hack for tests, don't use otherwise + static void resetDataDirectoriesAfterTest() + { + String[] locations = DatabaseDescriptor.getAllDataFileLocations(); + for (int i = 0; i < locations.length; ++i) + dataFileLocations[i] = new File(locations[i]); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b12c34f3/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 9c19070,7f389bd..e93725c --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@@ -75,33 -76,30 +75,30 @@@ public class CompactionTask extends Abs if (!isCompactionInteresting(toCompact)) return 0; - // If use defined, we don't want to "trust" our space estimation. If - // there isn't enough room, it's the user problem - long expectedSize = isUserDefined ? 0 : cfs.getExpectedCompactedFileSize(toCompact); - File compactionFileLocation = cfs.directories.getDirectoryForNewSSTables(expectedSize); - if (partialCompactionsAcceptable()) - if (compactionFileLocation == null) - compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact), ensureFreeSpace()); ++ File compactionFileLocation = cfs.directories.getDirectoryForNewSSTables(cfs.getExpectedCompactedFileSize(toCompact), ++ ensureFreeSpace()); + + if (compactionFileLocation == null && partialCompactionsAcceptable()) { // If the compaction file path is null that means we have no space left for this compaction. // Try again w/o the largest one. - if (compactionFileLocation == null) + while (compactionFileLocation == null && toCompact.size() > 1) { - while (compactionFileLocation == null && toCompact.size() > 1) - { - logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", ")); - // Note that we have removed files that are still marked as compacting. This suboptimal but ok since the caller will unmark all - // the sstables at the end. - toCompact.remove(cfs.getMaxSizeFile(toCompact)); - compactionFileLocation = cfs.directories.getDirectoryForNewSSTables(cfs.getExpectedCompactedFileSize(toCompact)); - } + logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", ")); + // Note that we have removed files that are still marked as compacting. + // This suboptimal but ok since the caller will unmark all the sstables at the end. + toCompact.remove(cfs.getMaxSizeFile(toCompact)); - compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact), - ensureFreeSpace()); ++ compactionFileLocation = cfs.directories.getDirectoryForNewSSTables(cfs.getExpectedCompactedFileSize(toCompact), ++ ensureFreeSpace()); } - + } - if (compactionFileLocation == null) - { - logger.warn("insufficient space to compact even the two smallest files, aborting"); - return 0; - } + if (compactionFileLocation == null) + { + logger.warn("insufficient space to compact even the two smallest files, aborting"); + return 0; } + assert compactionFileLocation != null; if (DatabaseDescriptor.isSnapshotBeforeCompaction()) cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);