Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
    CHANGES.txt
    src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/730cc606
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/730cc606
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/730cc606

Branch: refs/heads/cassandra-2.1
Commit: 730cc60640a683ef455c1b8856e4ed8fc39a460f
Parents: 0ba5f27 2cf4ca3
Author: Marcus Eriksson <marc...@apache.org>
Authored: Wed Apr 1 14:39:50 2015 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Apr 1 14:39:50 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                                     | 1 +
 src/java/org/apache/cassandra/db/compaction/CompactionTask.java | 3 +++
 2 files changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/730cc606/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ab5fb2d,b6b5caf..3b99c61
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,70 -1,8 +1,71 @@@
-2.0.14:
+2.1.4
+ * Buffer bloom filter serialization (CASSANDRA-9066)
+ * Fix anti-compaction target bloom filter size (CASSANDRA-9060)
+ * Make FROZEN and TUPLE unreserved keywords in CQL (CASSANDRA-9047)
+ * Prevent AssertionError from SizeEstimatesRecorder (CASSANDRA-9034)
+ * Avoid overwriting index summaries for sstables with an older format that
+   does not support downsampling; rebuild summaries on startup when this
+   is detected (CASSANDRA-8993)
+ * Fix potential data loss in CompressedSequentialWriter (CASSANDRA-8949)
+ * Make PasswordAuthenticator number of hashing rounds configurable (CASSANDRA-8085)
+ * Fix AssertionError when binding nested collections in DELETE (CASSANDRA-8900)
+ * Check for overlap with non-early sstables in LCS (CASSANDRA-8739)
+ * Only calculate max purgable timestamp if we have to (CASSANDRA-8914)
+ * (cqlsh) Greatly improve performance of COPY FROM (CASSANDRA-8225)
+ * IndexSummary effectiveIndexInterval is now a guideline, not a rule (CASSANDRA-8993)
+ * Use correct bounds for page cache eviction of compressed files (CASSANDRA-8746)
+ * SSTableScanner enforces its bounds (CASSANDRA-8946)
+ * Cleanup cell equality (CASSANDRA-8947)
+ * Introduce intra-cluster message coalescing (CASSANDRA-8692)
+ * DatabaseDescriptor throws NPE when rpc_interface is used (CASSANDRA-8839)
+ * Don't check if an sstable is live for offline compactions (CASSANDRA-8841)
+ * Don't set clientMode in SSTableLoader (CASSANDRA-8238)
+ * Fix SSTableRewriter with disabled early open (CASSANDRA-8535)
+ * Allow invalidating permissions and cache time (CASSANDRA-8722)
+ * Log warning when queries that will require ALLOW FILTERING in Cassandra 3.0
+   are executed (CASSANDRA-8418)
+ * Fix cassandra-stress so it respects the CL passed in user mode (CASSANDRA-8948)
+ * Fix rare NPE in ColumnDefinition#hasIndexOption() (CASSANDRA-8786)
+ * cassandra-stress reports per-operation statistics, plus misc (CASSANDRA-8769)
+ * Add SimpleDate (cql date) and Time (cql time) types (CASSANDRA-7523)
+ * Use long for key count in cfstats (CASSANDRA-8913)
+ * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)
+ * Remove cold_reads_to_omit from STCS (CASSANDRA-8860)
+ * Make EstimatedHistogram#percentile() use ceil instead of floor (CASSANDRA-8883)
+ * Fix top partitions reporting wrong cardinality (CASSANDRA-8834)
+ * Fix rare NPE in KeyCacheSerializer (CASSANDRA-8067)
+ * Pick sstables for validation as late as possible inc repairs (CASSANDRA-8366)
+ * Fix commitlog getPendingTasks to not increment (CASSANDRA-8856)
+ * Fix parallelism adjustment in range and secondary index queries
+   when the first fetch does not satisfy the limit (CASSANDRA-8856)
+ * Check if the filtered sstables is non-empty in STCS (CASSANDRA-8843)
+ * Upgrade java-driver used for cassandra-stress (CASSANDRA-8842)
+ * Fix CommitLog.forceRecycleAllSegments() memory access error (CASSANDRA-8812)
+ * Improve assertions in Memory (CASSANDRA-8792)
+ * Fix SSTableRewriter cleanup (CASSANDRA-8802)
+ * Introduce SafeMemory for CompressionMetadata.Writer (CASSANDRA-8758)
+ * 'nodetool info' prints exception against older node (CASSANDRA-8796)
+ * Ensure SSTableReader.last corresponds exactly with the file end (CASSANDRA-8750)
+ * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
+ * Enforce SSTableReader.first/last (CASSANDRA-8744)
+ * Cleanup SegmentedFile API (CASSANDRA-8749)
+ * Avoid overlap with early compaction replacement (CASSANDRA-8683)
+ * Safer Resource Management++ (CASSANDRA-8707)
+ * Write partition size estimates into a system table (CASSANDRA-7688)
+ * cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
+   (CASSANDRA-8154)
+ * Show progress of streaming in nodetool netstats (CASSANDRA-8886)
+ * IndexSummaryBuilder utilises offheap memory, and shares data between
+   each IndexSummary opened from it (CASSANDRA-8757)
+ * markCompacting only succeeds if the exact SSTableReader instances being
+   marked are in the live set (CASSANDRA-8689)
+ * cassandra-stress support for varint (CASSANDRA-8882)
+ * Fix Adler32 digest for compressed sstables (CASSANDRA-8778)
+ * Add nodetool statushandoff/statusbackup (CASSANDRA-8912)
+ * Use stdout for progress and stats in sstableloader (CASSANDRA-8982)
+Merged from 2.0:
+ * Avoid race in cancelling compactions (CASSANDRA-9070)
  * More aggressive check for expired sstables in DTCS (CASSANDRA-8359)
- * Don't set clientMode to true when bulk-loading sstables to avoid
-   a NullPointerException (CASSANDRA-8238)
  * Fix ignored index_interval change in ALTER TABLE statements (CASSANDRA-7976)
  * Do more aggressive compaction in old time windows in DTCS (CASSANDRA-8360)
  * java.lang.AssertionError when reading saved cache (CASSANDRA-8740)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/730cc606/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4d9b463,9f7c8dd..392034c
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -137,148 -117,188 +137,151 @@@ public class CompactionTask extends Abs
          // new sstables from flush can be added during a compaction, but only the compaction can remove them,
          // so in our single-threaded compaction world this is a valid way of determining if we're compacting
          // all the sstables (that existed when we started)
-         logger.info("Compacting {}", toCompact);
+         logger.info("Compacting {}", sstables);
          long start = System.nanoTime();
-         long totalkeysWritten = 0;
-
-         long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
-         long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
-         long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
-         if (logger.isDebugEnabled())
-             logger.debug("Expected bloom filter size : " + keysPerSSTable);
-
-         AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
-                                       ? new ParallelCompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller)
-                                       : new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
-         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-         Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
-
-         // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
-         // replace the old entries. Track entries to preheat here until then.
-         Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap = new HashMap<Descriptor, Map<DecoratedKey, RowIndexEntry>>();
-
-         Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
-         Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
-
-         if (collector != null)
-             collector.beginCompaction(ci);
-         try
-         {
-             if (!controller.cfs.getCompactionStrategy().isActive)
-                 throw new CompactionInterruptedException(ci.getCompactionInfo());
+
-             if (!iter.hasNext())
-             {
-                 // don't mark compacted in the finally block, since if there _is_ nondeleted data,
-                 // we need to sync it (via closeAndOpen) first, so there is no period during which
-                 // a crash could cause data loss.
-                 cfs.markObsolete(toCompact, compactionType);
-                 return;
-             }
+         long totalKeysWritten = 0;
-             long writeSize = getExpectedWriteSize() / estimatedSSTables;
-             Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
-             SSTableWriter writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
-             writers.add(writer);
-             while (iter.hasNext())
-             {
-                 if (ci.isStopRequested())
-                     throw new CompactionInterruptedException(ci.getCompactionInfo());
+         try (CompactionController controller = getCompactionController(sstables);)
+         {
+             Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
-                 AbstractCompactedRow row = iter.next();
-                 RowIndexEntry indexEntry = writer.append(row);
-                 if (indexEntry == null)
-                 {
-                     controller.invalidateCachedRow(row.key);
-                     row.close();
-                     continue;
-                 }
+             long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+             long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
+             long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+             long expectedSSTableSize = Math.min(getExpectedWriteSize(), strategy.getMaxSSTableBytes());
+             logger.debug("Expected bloom filter size : {}", keysPerSSTable);
-                 totalkeysWritten++;
+             List<SSTableReader> newSStables;
+             AbstractCompactionIterable ci;
-                 if (DatabaseDescriptor.getPreheatKeyCache())
+             // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
+             // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
+             // See CASSANDRA-8019 and CASSANDRA-8399
+             try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+             {
+                 ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+                 Iterator<AbstractCompactedRow> iter = ci.iterator();
+                 // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+                 // replace the old entries. Track entries to preheat here until then.
+                 long minRepairedAt = getMinRepairedAt(actuallyCompact);
+                 // we only need the age of the data that we're actually retaining
+                 long maxAge = getMaxDataAge(actuallyCompact);
+                 if (collector != null)
+                     collector.beginCompaction(ci);
+                 long lastCheckObsoletion = start;
+                 SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, offline);
+                 try
                  {
-                     for (SSTableReader sstable : actuallyCompact)
++                    if (!controller.cfs.getCompactionStrategy().isActive)
++                        throw new CompactionInterruptedException(ci.getCompactionInfo());
+                     if (!iter.hasNext())
+                     {
+                         // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+                         // we need to sync it (via closeAndOpen) first, so there is no period during which
+                         // a crash could cause data loss.
+                         cfs.markObsolete(sstables, compactionType);
+                         return;
+                     }
+
+                     writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(expectedSSTableSize)), keysPerSSTable, minRepairedAt));
+                     while (iter.hasNext())
                      {
-                         if (sstable.getCachedPosition(row.key, false) != null)
+                         if (ci.isStopRequested())
+                             throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+                         AbstractCompactedRow row = iter.next();
+                         if (writer.append(row) != null)
+                         {
+                             totalKeysWritten++;
+                             if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                             {
+                                 writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(expectedSSTableSize)), keysPerSSTable, minRepairedAt));
+                             }
+                         }
+
+                         if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
                          {
-                             cachedKeys.put(row.key, indexEntry);
-                             break;
+                             controller.maybeRefreshOverlaps();
+                             lastCheckObsoletion = System.nanoTime();
                          }
                      }
-                 }
-                 if (newSSTableSegmentThresholdReached(writer))
+                     // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                     newSStables = writer.finish();
+                 }
+                 catch (Throwable t)
                  {
-                     // tmp = false because later we want to query it with descriptor from SSTableReader
-                     cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-                     writeSize = getExpectedWriteSize() / estimatedSSTables;
-                     dataDirectory = getWriteDirectory(writeSize);
-                     writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
-                     writers.add(writer);
-                     cachedKeys = new HashMap<>();
+                     try
+                     {
+                         writer.abort();
+                     }
+                     catch (Throwable t2)
+                     {
+                         t.addSuppressed(t2);
+                     }
+                     throw t;
                  }
-             }
-
-             if (writer.getFilePointer() > 0)
-             {
-                 cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-             }
-             else
-             {
-                 writer.abort();
-                 writers.remove(writer);
-             }
+                 finally
+                 {
+                     // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+                     // (in replaceCompactedSSTables)
+                     if (taskId != null)
+                         SystemKeyspace.finishCompaction(taskId);
-             long maxAge = getMaxDataAge(toCompact);
-             for (SSTableWriter completedWriter : writers)
-                 sstables.add(completedWriter.closeAndOpenReader(maxAge));
-         }
-         catch (Throwable t)
-         {
-             for (SSTableWriter writer : writers)
-                 writer.abort();
-             // also remove already completed SSTables
-             for (SSTableReader sstable : sstables)
-             {
-                 sstable.markObsolete();
-                 sstable.releaseReference();
+                     if (collector != null)
+                         collector.finishCompaction(ci);
+                 }
              }
-             throw Throwables.propagate(t);
-         }
-         finally
-         {
-             controller.close();
-
-             // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-             // (in replaceCompactedSSTables)
-             if (taskId != null)
-                 SystemKeyspace.finishCompaction(taskId);
-
-             if (collector != null)
-                 collector.finishCompaction(ci);
-             try
+             Collection<SSTableReader> oldSStables = this.sstables;
+             if (!offline)
+                 cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+             // log a bunch of statistics about the result and save to system table compaction_history
+             long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+             long startsize = SSTableReader.getTotalBytes(oldSStables);
+             long endsize = SSTableReader.getTotalBytes(newSStables);
+             double ratio = (double) endsize / (double) startsize;
+
+             StringBuilder newSSTableNames = new StringBuilder();
+             for (SSTableReader reader : newSStables)
+                 newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+             double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+             long totalSourceRows = 0;
+             long[] counts = ci.getMergedRowCounts();
+             StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+             Map<Integer, Long> mergedRows = new HashMap<>();
+             for (int i = 0; i < counts.length; i++)
              {
-                 // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
-                 // we don't end up with compaction information hanging around indefinitely in limbo.
-                 iter.close();
-             }
-             catch (IOException e)
-             {
-                 throw new RuntimeException(e);
-             }
-         }
+                 long count = counts[i];
+                 if (count == 0)
+                     continue;
-         replaceCompactedSSTables(toCompact, sstables);
-         // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
-         for (SSTableReader sstable : sstables)
-         {
-             if (sstable.acquireReference())
-             {
-                 try
-                 {
-                     sstable.preheat(cachedKeyMap.get(sstable.descriptor));
-                 }
-                 finally
-                 {
-                     sstable.releaseReference();
-                 }
+                 int rows = i + 1;
+                 totalSourceRows += rows * count;
+                 mergeSummary.append(String.format("%d:%d, ", rows, count));
+                 mergedRows.put(rows, count);
              }
-         }
-         // log a bunch of statistics about the result and save to system table compaction_history
-         long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-         long startsize = SSTable.getTotalBytes(toCompact);
-         long endsize = SSTable.getTotalBytes(sstables);
-         double ratio = (double) endsize / (double) startsize;
-
-         StringBuilder builder = new StringBuilder();
-         for (SSTableReader reader : sstables)
-             builder.append(reader.descriptor.baseFilename()).append(",");
-
-         double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-         long totalSourceRows = 0;
-         long[] counts = ci.getMergedRowCounts();
-         StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
-         Map<Integer, Long> mergedRows = new HashMap<Integer, Long>();
-         for (int i = 0; i < counts.length; i++)
-         {
-             long count = counts[i];
-             if (count == 0)
-                 continue;
-
-             int rows = i + 1;
-             totalSourceRows += rows * count;
-             mergeSummary.append(String.format("%d:%d, ", rows, count));
-             mergedRows.put(rows, count);
+             SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+             logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
+                                       oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+             logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+             logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
          }
+     }
-         SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
-         logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
-                                   toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalkeysWritten, mergeSummary.toString()));
-         logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+     private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
+     {
+         long minRepairedAt= Long.MAX_VALUE;
+         for (SSTableReader sstable : actuallyCompact)
+             minRepairedAt = Math.min(minRepairedAt, sstable.getSSTableMetadata().repairedAt);
+         if (minRepairedAt == Long.MAX_VALUE)
+             return ActiveRepairService.UNREPAIRED_SSTABLE;
+         return minRepairedAt;
      }
 
      protected void checkAvailableDiskSpace(long estimatedSSTables)
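
A note for readers following the new 2.1 code path in the hunk above: when the rewrite fails, the SSTableRewriter is aborted, any exception thrown by the abort itself is attached to the original failure via addSuppressed before rethrowing, and the finally block still records the compaction as finished. The standalone sketch below (hypothetical names, not part of this commit) shows that same abort-and-suppress shape in isolation:

    // Illustrative sketch only (hypothetical names, not part of this commit): the same
    // abort-and-suppress error handling used by the new compaction path above.
    public final class RewriterErrorHandlingSketch
    {
        interface Rewriter
        {
            void append(String row);
            void finish();   // publishes the new output; only reached on success
            void abort();    // rolls back any partially written output
        }

        static void rewriteAll(Iterable<String> rows, Rewriter writer, Runnable finishBookkeeping)
        {
            try
            {
                for (String row : rows)
                    writer.append(row);
                writer.finish();
            }
            catch (Throwable t)
            {
                try
                {
                    writer.abort();
                }
                catch (Throwable t2)
                {
                    // keep the original failure as the primary exception
                    t.addSuppressed(t2);
                }
                throw t;
            }
            finally
            {
                // analogous to SystemKeyspace.finishCompaction(taskId) / collector.finishCompaction(ci)
                finishBookkeeping.run();
            }
        }
    }

Compared with the removed 2.0 path (per-writer abort followed by Throwables.propagate), this keeps a single primary exception and does not lose a failure that happens during the abort itself.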