Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/db/Directories.java
	src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/75378c20
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/75378c20
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/75378c20

Branch: refs/heads/cassandra-2.1
Commit: 75378c204c30f5d5f679009885e2ace105793a67
Parents: 5364083 c20d415
Author: Marcus Eriksson <marc...@apache.org>
Authored: Mon Jan 12 18:43:47 2015 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Jan 12 18:43:47 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/Directories.java    | 18 ++++++++++++
 .../cassandra/db/compaction/CompactionTask.java | 29 ++++++++++++++++----
 .../cassandra/io/util/DiskAwareRunnable.java    | 17 +----------
 4 files changed, 43 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/75378c20/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 55ca55d,9b20a06..f2e25c4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,56 -1,5 +1,57 @@@
-2.0.12:
+2.1.3
+ * Don't reuse the same cleanup strategy for all sstables (CASSANDRA-8537)
+ * Fix case-sensitivity of index name on CREATE and DROP INDEX
+   statements (CASSANDRA-8365)
+ * Better detection/logging for corruption in compressed sstables (CASSANDRA-8192)
+ * Use the correct repairedAt value when closing writer (CASSANDRA-8570)
+ * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
+ * Properly calculate expected write size during compaction (CASSANDRA-8532)
+ * Invalidate affected prepared statements when a table's columns
+   are altered (CASSANDRA-7910)
+ * Stress - user defined writes should populate sequentially (CASSANDRA-8524)
+ * Fix regression in SSTableRewriter causing some rows to become unreadable
+   during compaction (CASSANDRA-8429)
+ * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
+ * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
+   is disabled (CASSANDRA-8288)
+ * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
+ * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
+ * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
+ * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
+ * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
+ * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
+ * Disable mmap on Windows (CASSANDRA-6993)
+ * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
+ * Add auth support to cassandra-stress (CASSANDRA-7985)
+ * Fix ArrayIndexOutOfBoundsException when generating error message
+   for some CQL syntax errors (CASSANDRA-8455)
+ * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
+ * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
+ * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
+ * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
+ * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
+ * Remove tmplink files for offline compactions (CASSANDRA-8321)
+ * Reduce maxHintsInProgress (CASSANDRA-8415)
+ * BTree updates may call provided update function twice (CASSANDRA-8018)
+ * Release sstable references after anticompaction (CASSANDRA-8386)
+ * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
+ * Fix high size calculations for prepared statements (CASSANDRA-8231)
+ * Centralize shared executors (CASSANDRA-8055)
+ * Fix filtering for CONTAINS (KEY) relations on frozen collection
+   clustering columns when the query is restricted to a single
+   partition (CASSANDRA-8203)
+ * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
+ * Add more log info if readMeter is null (CASSANDRA-8238)
+ * Add check of the system wall clock time at startup (CASSANDRA-8305)
+ * Support for frozen collections (CASSANDRA-7859)
+ * Fix overflow on histogram computation (CASSANDRA-8028)
+ * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
+ * Fix incremental repair not removing parent session on remote (CASSANDRA-8291)
+ * Improve JBOD disk utilization (CASSANDRA-7386)
+ * Log failed host when preparing incremental repair (CASSANDRA-8228)
+ * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
+Merged from 2.0:
+ * Check for available disk space before starting a compaction (CASSANDRA-8562)
  * Fix DISTINCT queries with LIMITs or paging when some partitions contain
    only tombstones (CASSANDRA-8490)
  * Introduce background cache refreshing to permissions cache


http://git-wip-us.apache.org/repos/asf/cassandra/blob/75378c20/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Directories.java
index eb33bd8,8fd1762..6af3082
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@@ -342,6 -308,25 +342,24 @@@ public class Directorie
          Collections.sort(candidates);
      }

+     public boolean hasAvailableDiskSpace(long estimatedSSTables, long expectedTotalWriteSize)
+     {
+         long writeSize = expectedTotalWriteSize / estimatedSSTables;
+         long totalAvailable = 0L;
+
-         for (DataDirectory dataDir : dataFileLocations)
++        for (DataDirectory dataDir : dataDirectories)
+         {
+             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
+                 continue;
+             DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
+             // exclude the directory if a single output sstable of writeSize bytes does not fit in it
+             if (candidate.availableSpace < writeSize)
+                 continue;
+             totalAvailable += candidate.availableSpace;
+         }
+         return totalAvailable > expectedTotalWriteSize;
+     }
+
-     public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
      {
          return getOrCreate(desc.directory, SNAPSHOT_SUBDIR, snapshotName);
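
The hasAvailableDiskSpace() check above divides the expected total write size evenly across the estimated output sstables, skips any blacklisted data directory and any directory that cannot hold even one such sstable, and requires the remaining directories to jointly offer more free space than the whole compaction needs. A rough, self-contained sketch of that arithmetic (hypothetical sizes; plain longs stand in for DataDirectory/DataDirectoryCandidate, and the BlacklistedDirectories check is elided):

    // Sketch only -- simplified stand-in for Directories.hasAvailableDiskSpace().
    public final class DiskSpaceCheckSketch
    {
        static boolean hasAvailableDiskSpace(long estimatedSSTables, long expectedTotalWriteSize, long[] freeBytesPerDirectory)
        {
            long writeSize = expectedTotalWriteSize / estimatedSSTables; // expected size of one output sstable
            long totalAvailable = 0L;
            for (long free : freeBytesPerDirectory)
            {
                if (free < writeSize) // cannot hold even a single output sstable, skip it
                    continue;
                totalAvailable += free;
            }
            return totalAvailable > expectedTotalWriteSize;
        }

        public static void main(String[] args)
        {
            // hypothetical: a 30 GB compaction expected to produce ~3 sstables of ~10 GB each,
            // over directories with 12 GB, 8 GB and 25 GB free; the 8 GB directory is skipped,
            // and 12 + 25 = 37 GB > 30 GB, so the check passes
            long gb = 1L << 30;
            System.out.println(hasAvailableDiskSpace(3, 30 * gb, new long[]{ 12 * gb, 8 * gb, 25 * gb })); // true
        }
    }
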

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75378c20/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index d215b4c,6c6d3a2..eda09c0
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -19,19 -19,10 +19,20 @@@ package org.apache.cassandra.db.compact
  import java.io.File;
  import java.io.IOException;
-import java.util.*;
++import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
  import java.util.concurrent.TimeUnit;

-import com.google.common.base.Throwables;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.Iterables;
  import com.google.common.collect.Sets;
  import org.apache.commons.lang3.StringUtils;
  import org.slf4j.Logger;

@@@ -83,18 -68,18 +84,20 @@@ public class CompactionTask extends Abs
      public boolean reduceScopeForLimitedSpace()
      {
-         if (partialCompactionsAcceptable() && toCompact.size() > 1)
+         if (partialCompactionsAcceptable() && sstables.size() > 1)
          {
              // Try again w/o the largest one.
-             logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
+             logger.warn("insufficient space to compact all requested files {}", StringUtils.join(sstables, ", "));
              // Note that we have removed files that are still marked as compacting.
              // This is suboptimal but ok since the caller will unmark all the sstables at the end.
-             return sstables.remove(cfs.getMaxSizeFile(sstables));
-         }
-         else
-         {
-             return false;
-             return toCompact.remove(cfs.getMaxSizeFile(toCompact));
-         }
-         else
-         {
-             return false;
++            SSTableReader removedSSTable = cfs.getMaxSizeFile(sstables);
++            if (sstables.remove(removedSSTable))
++            {
++                cfs.getDataTracker().unmarkCompacting(Arrays.asList(removedSSTable));
++                return true;
++            }
          }
++        return false;
      }

      /**
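
Together with the checkAvailableDiskSpace() loop added later in this patch, the hunk above gives compactions a shrink-until-it-fits retry: if the estimated output will not fit, the largest input sstable is dropped (and, new on this branch, immediately unmarked as compacting) and the space check is repeated. A toy simulation of that loop, with sstables reduced to plain sizes and the write-size estimate crudely taken as the sum of the inputs (all names and numbers here are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    // Toy model of the checkAvailableDiskSpace()/reduceScopeForLimitedSpace() interplay.
    public final class ReduceScopeSketch
    {
        public static void main(String[] args)
        {
            List<Long> inputs = new ArrayList<>(Arrays.asList(40L, 25L, 10L)); // input sstable sizes in GB
            long availableSpace = 30L;                                         // free disk in GB

            while (expectedWriteSize(inputs) > availableSpace)
            {
                if (inputs.size() <= 1) // mirrors the caller giving up and throwing
                    throw new RuntimeException("Not enough space for compaction");
                Long largest = Collections.max(inputs);
                inputs.remove(largest); // the real code also unmarks the dropped sstable as compacting
                System.out.println("dropped " + largest + " GB input");
            }
            System.out.println("compacting " + inputs + ", expected output <= " + expectedWriteSize(inputs) + " GB");
        }

        // pessimistic stand-in for cfs.getExpectedCompactedFileSize(): assume no size reduction
        static long expectedWriteSize(List<Long> inputs)
        {
            return inputs.stream().mapToLong(Long::longValue).sum();
        }
    }
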
@@@ -118,159 -100,214 +121,173 @@@
          if (DatabaseDescriptor.isSnapshotBeforeCompaction())
              cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name);

+         // note that we need to do a rough estimate early if we can fit the compaction on disk - this is pessimistic, but
+         // since we might remove sstables from the compaction in checkAvailableDiskSpace it needs to be done here
-         long earlySSTableEstimate = Math.max(1, cfs.getExpectedCompactedFileSize(toCompact, compactionType) / strategy.getMaxSSTableBytes());
++        long earlySSTableEstimate = Math.max(1, cfs.getExpectedCompactedFileSize(sstables, compactionType) / strategy.getMaxSSTableBytes());
+         checkAvailableDiskSpace(earlySSTableEstimate);
+
          // sanity check: all sstables must belong to the same cfs
-         for (SSTableReader sstable : toCompact)
-             assert sstable.descriptor.cfname.equals(cfs.name);
-
-         UUID taskId = SystemKeyspace.startCompaction(cfs, toCompact);
+         assert !Iterables.any(sstables, new Predicate<SSTableReader>()
+         {
+             @Override
+             public boolean apply(SSTableReader sstable)
+             {
+                 return !sstable.descriptor.cfname.equals(cfs.name);
+             }
+         });

-         CompactionController controller = getCompactionController(toCompact);
-         Set<SSTableReader> actuallyCompact = Sets.difference(toCompact, controller.getFullyExpiredSSTables());
+         UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);

          // new sstables from flush can be added during a compaction, but only the compaction can remove them,
          // so in our single-threaded compaction world this is a valid way of determining if we're compacting
          // all the sstables (that existed when we started)
-         logger.info("Compacting {}", toCompact);
+         logger.info("Compacting {}", sstables);

          long start = System.nanoTime();
-         long totalkeysWritten = 0;
-
-         long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
-         long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
-         long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
-         if (logger.isDebugEnabled())
-             logger.debug("Expected bloom filter size : " + keysPerSSTable);
-
-         AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
-                                       ? new ParallelCompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller)
-                                       : new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
-         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-         Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
-
-         // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
-         // replace the old entries. Track entries to preheat here until then.
-         Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap = new HashMap<Descriptor, Map<DecoratedKey, RowIndexEntry>>();
-
-         Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
-         Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
-
-         if (collector != null)
-             collector.beginCompaction(ci);
-         try
+         long totalKeysWritten = 0;
+
+         try (CompactionController controller = getCompactionController(sstables);)
          {
-             if (!iter.hasNext())
-             {
-                 // don't mark compacted in the finally block, since if there _is_ nondeleted data,
-                 // we need to sync it (via closeAndOpen) first, so there is no period during which
-                 // a crash could cause data loss.
-                 cfs.markObsolete(toCompact, compactionType);
-                 return;
-             }
+             Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());

-             long writeSize = getExpectedWriteSize() / estimatedSSTables;
-             Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
-             SSTableWriter writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
-             writers.add(writer);
-             while (iter.hasNext())
-             {
-                 if (ci.isStopRequested())
-                     throw new CompactionInterruptedException(ci.getCompactionInfo());
+             long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
-             long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
++            long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
+             long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+             long expectedSSTableSize = Math.min(getExpectedWriteSize(), strategy.getMaxSSTableBytes());
+             logger.debug("Expected bloom filter size : {}", keysPerSSTable);

-                 AbstractCompactedRow row = iter.next();
-                 RowIndexEntry indexEntry = writer.append(row);
-                 if (indexEntry == null)
-                 {
-                     controller.invalidateCachedRow(row.key);
-                     row.close();
-                     continue;
-                 }
+             List<SSTableReader> newSStables;
+             AbstractCompactionIterable ci;

-                 totalkeysWritten++;
-
-                 if (DatabaseDescriptor.getPreheatKeyCache())
+             // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
+             // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
+             // See CASSANDRA-8019 and CASSANDRA-8399
+             try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+             {
+                 ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+                 Iterator<AbstractCompactedRow> iter = ci.iterator();
+                 // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+                 // replace the old entries. Track entries to preheat here until then.
+                 long minRepairedAt = getMinRepairedAt(actuallyCompact);
+                 // we only need the age of the data that we're actually retaining
+                 long maxAge = getMaxDataAge(actuallyCompact);
+                 if (collector != null)
+                     collector.beginCompaction(ci);
+                 long lastCheckObsoletion = start;
+                 SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, offline);
+                 try
                  {
-                     for (SSTableReader sstable : actuallyCompact)
+                     if (!iter.hasNext())
+                     {
+                         // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+                         // we need to sync it (via closeAndOpen) first, so there is no period during which
+                         // a crash could cause data loss.
+                         cfs.markObsolete(sstables, compactionType);
+                         return;
+                     }
+
+                     writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(expectedSSTableSize)), keysPerSSTable, minRepairedAt));
+                     while (iter.hasNext())
                      {
-                         if (sstable.getCachedPosition(row.key, false) != null)
+                         if (ci.isStopRequested())
+                             throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+                         AbstractCompactedRow row = iter.next();
+                         if (writer.append(row) != null)
+                         {
+                             totalKeysWritten++;
+                             if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                             {
+                                 writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(expectedSSTableSize)), keysPerSSTable, minRepairedAt));
+                             }
+                         }
+
+                         if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
                          {
-                             cachedKeys.put(row.key, indexEntry);
-                             break;
+                             controller.maybeRefreshOverlaps();
+                             lastCheckObsoletion = System.nanoTime();
                          }
                      }
-                 }

-                 if (newSSTableSegmentThresholdReached(writer))
+                     // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                     newSStables = writer.finish();
+                 }
+                 catch (Throwable t)
                  {
-                     // tmp = false because later we want to query it with descriptor from SSTableReader
-                     cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-                     writeSize = getExpectedWriteSize() / estimatedSSTables;
-                     dataDirectory = getWriteDirectory(writeSize);
-                     writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
-                     writers.add(writer);
-                     cachedKeys = new HashMap<>();
+                     writer.abort();
+                     throw t;
                  }
-             }
-
-             if (writer.getFilePointer() > 0)
-             {
-                 cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-             }
-             else
-             {
-                 writer.abort();
-                 writers.remove(writer);
-             }
+                 finally
+                 {
+                     // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+                     // (in replaceCompactedSSTables)
+                     if (taskId != null)
+                         SystemKeyspace.finishCompaction(taskId);

-             long maxAge = getMaxDataAge(toCompact);
-             for (SSTableWriter completedWriter : writers)
-                 sstables.add(completedWriter.closeAndOpenReader(maxAge));
-         }
-         catch (Throwable t)
-         {
-             for (SSTableWriter writer : writers)
-                 writer.abort();
-             // also remove already completed SSTables
-             for (SSTableReader sstable : sstables)
-             {
-                 sstable.markObsolete();
-                 sstable.releaseReference();
+                     if (collector != null)
+                         collector.finishCompaction(ci);
+                 }
              }
-             throw Throwables.propagate(t);
-         }
-         finally
-         {
-             controller.close();
-
-             // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-             // (in replaceCompactedSSTables)
-             if (taskId != null)
-                 SystemKeyspace.finishCompaction(taskId);
-             if (collector != null)
-                 collector.finishCompaction(ci);
-
-             try
-             {
-                 // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
-                 // we don't end up with compaction information hanging around indefinitely in limbo.
-                 iter.close();
-             }
-             catch (IOException e)
+             Collection<SSTableReader> oldSStables = this.sstables;
+             if (!offline)
+                 cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+             // log a bunch of statistics about the result and save to system table compaction_history
+             long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+             long startsize = SSTableReader.getTotalBytes(oldSStables);
+             long endsize = SSTableReader.getTotalBytes(newSStables);
+             double ratio = (double) endsize / (double) startsize;
+
+             StringBuilder newSSTableNames = new StringBuilder();
+             for (SSTableReader reader : newSStables)
+                 newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+             double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+             long totalSourceRows = 0;
+             long[] counts = ci.getMergedRowCounts();
+             StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+             Map<Integer, Long> mergedRows = new HashMap<>();
+             for (int i = 0; i < counts.length; i++)
              {
-                 throw new RuntimeException(e);
-             }
-         }
+                 long count = counts[i];
+                 if (count == 0)
+                     continue;

-         replaceCompactedSSTables(toCompact, sstables);
-         // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
-         for (SSTableReader sstable : sstables)
-         {
-             if (sstable.acquireReference())
-             {
-                 try
-                 {
-                     sstable.preheat(cachedKeyMap.get(sstable.descriptor));
-                 }
-                 finally
-                 {
-                     sstable.releaseReference();
-                 }
+                 int rows = i + 1;
+                 totalSourceRows += rows * count;
+                 mergeSummary.append(String.format("%d:%d, ", rows, count));
+                 mergedRows.put(rows, count);
              }
-         }

-         // log a bunch of statistics about the result and save to system table compaction_history
-         long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-         long startsize = SSTable.getTotalBytes(toCompact);
-         long endsize = SSTable.getTotalBytes(sstables);
-         double ratio = (double) endsize / (double) startsize;
-
-         StringBuilder builder = new StringBuilder();
-         for (SSTableReader reader : sstables)
-             builder.append(reader.descriptor.baseFilename()).append(",");
-
-         double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-         long totalSourceRows = 0;
-         long[] counts = ci.getMergedRowCounts();
-         StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
-         Map<Integer, Long> mergedRows = new HashMap<Integer, Long>();
-         for (int i = 0; i < counts.length; i++)
-         {
-             long count = counts[i];
-             if (count == 0)
-                 continue;
-
-             int rows = i + 1;
-             totalSourceRows += rows * count;
-             mergeSummary.append(String.format("%d:%d, ", rows, count));
-             mergedRows.put(rows, count);
+             SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
Partition merge counts were {%s}", + oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString())); + logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); + logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten)); } + } - SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows); - logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", - toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalkeysWritten, mergeSummary.toString())); - logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); + private long getMinRepairedAt(Set<SSTableReader> actuallyCompact) + { + long minRepairedAt= Long.MAX_VALUE; + for (SSTableReader sstable : actuallyCompact) + minRepairedAt = Math.min(minRepairedAt, sstable.getSSTableMetadata().repairedAt); + if (minRepairedAt == Long.MAX_VALUE) + return ActiveRepairService.UNREPAIRED_SSTABLE; + return minRepairedAt; } + protected void checkAvailableDiskSpace(long estimatedSSTables) + { + while (!getDirectories().hasAvailableDiskSpace(estimatedSSTables, getExpectedWriteSize())) + { + if (!reduceScopeForLimitedSpace()) + throw new RuntimeException(String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, getExpectedWriteSize())); + } + } + - private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable) + private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable, long repairedAt) { assert sstableDirectory != null; return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),