Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
        CHANGES.txt
        src/java/org/apache/cassandra/db/compaction/CompactionTask.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/730cc606
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/730cc606
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/730cc606

Branch: refs/heads/cassandra-2.1
Commit: 730cc60640a683ef455c1b8856e4ed8fc39a460f
Parents: 0ba5f27 2cf4ca3
Author: Marcus Eriksson <marc...@apache.org>
Authored: Wed Apr 1 14:39:50 2015 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Apr 1 14:39:50 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                                     | 1 +
 src/java/org/apache/cassandra/db/compaction/CompactionTask.java | 3 +++
 2 files changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/730cc606/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ab5fb2d,b6b5caf..3b99c61
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,70 -1,8 +1,71 @@@
 -2.0.14:
 +2.1.4
 + * Buffer bloom filter serialization (CASSANDRA-9066)
 + * Fix anti-compaction target bloom filter size (CASSANDRA-9060)
 + * Make FROZEN and TUPLE unreserved keywords in CQL (CASSANDRA-9047)
 + * Prevent AssertionError from SizeEstimatesRecorder (CASSANDRA-9034)
 + * Avoid overwriting index summaries for sstables with an older format that
 +   does not support downsampling; rebuild summaries on startup when this
 +   is detected (CASSANDRA-8993)
 + * Fix potential data loss in CompressedSequentialWriter (CASSANDRA-8949)
 + * Make PasswordAuthenticator number of hashing rounds configurable (CASSANDRA-8085)
 + * Fix AssertionError when binding nested collections in DELETE (CASSANDRA-8900)
 + * Check for overlap with non-early sstables in LCS (CASSANDRA-8739)
 + * Only calculate max purgable timestamp if we have to (CASSANDRA-8914)
 + * (cqlsh) Greatly improve performance of COPY FROM (CASSANDRA-8225)
 + * IndexSummary effectiveIndexInterval is now a guideline, not a rule (CASSANDRA-8993)
 + * Use correct bounds for page cache eviction of compressed files (CASSANDRA-8746)
 + * SSTableScanner enforces its bounds (CASSANDRA-8946)
 + * Cleanup cell equality (CASSANDRA-8947)
 + * Introduce intra-cluster message coalescing (CASSANDRA-8692)
 + * DatabaseDescriptor throws NPE when rpc_interface is used (CASSANDRA-8839)
 + * Don't check if an sstable is live for offline compactions (CASSANDRA-8841)
 + * Don't set clientMode in SSTableLoader (CASSANDRA-8238)
 + * Fix SSTableRewriter with disabled early open (CASSANDRA-8535)
 + * Allow invalidating permissions and cache time (CASSANDRA-8722)
 + * Log warning when queries that will require ALLOW FILTERING in Cassandra 3.0
 +   are executed (CASSANDRA-8418)
 + * Fix cassandra-stress so it respects the CL passed in user mode (CASSANDRA-8948)
 + * Fix rare NPE in ColumnDefinition#hasIndexOption() (CASSANDRA-8786)
 + * cassandra-stress reports per-operation statistics, plus misc (CASSANDRA-8769)
 + * Add SimpleDate (cql date) and Time (cql time) types (CASSANDRA-7523)
 + * Use long for key count in cfstats (CASSANDRA-8913)
 + * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)
 + * Remove cold_reads_to_omit from STCS (CASSANDRA-8860)
 + * Make EstimatedHistogram#percentile() use ceil instead of floor (CASSANDRA-8883)
 + * Fix top partitions reporting wrong cardinality (CASSANDRA-8834)
 + * Fix rare NPE in KeyCacheSerializer (CASSANDRA-8067)
 + * Pick sstables for validation as late as possible inc repairs (CASSANDRA-8366)
 + * Fix commitlog getPendingTasks to not increment (CASSANDRA-8856)
 + * Fix parallelism adjustment in range and secondary index queries
 +   when the first fetch does not satisfy the limit (CASSANDRA-8856)
 + * Check if the filtered sstables is non-empty in STCS (CASSANDRA-8843)
 + * Upgrade java-driver used for cassandra-stress (CASSANDRA-8842)
 + * Fix CommitLog.forceRecycleAllSegments() memory access error (CASSANDRA-8812)
 + * Improve assertions in Memory (CASSANDRA-8792)
 + * Fix SSTableRewriter cleanup (CASSANDRA-8802)
 + * Introduce SafeMemory for CompressionMetadata.Writer (CASSANDRA-8758)
 + * 'nodetool info' prints exception against older node (CASSANDRA-8796)
 + * Ensure SSTableReader.last corresponds exactly with the file end (CASSANDRA-8750)
 + * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
 + * Enforce SSTableReader.first/last (CASSANDRA-8744)
 + * Cleanup SegmentedFile API (CASSANDRA-8749)
 + * Avoid overlap with early compaction replacement (CASSANDRA-8683)
 + * Safer Resource Management++ (CASSANDRA-8707)
 + * Write partition size estimates into a system table (CASSANDRA-7688)
 + * cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
 +   (CASSANDRA-8154)
 + * Show progress of streaming in nodetool netstats (CASSANDRA-8886)
 + * IndexSummaryBuilder utilises offheap memory, and shares data between
 +   each IndexSummary opened from it (CASSANDRA-8757)
 + * markCompacting only succeeds if the exact SSTableReader instances being 
 +   marked are in the live set (CASSANDRA-8689)
 + * cassandra-stress support for varint (CASSANDRA-8882)
 + * Fix Adler32 digest for compressed sstables (CASSANDRA-8778)
 + * Add nodetool statushandoff/statusbackup (CASSANDRA-8912)
 + * Use stdout for progress and stats in sstableloader (CASSANDRA-8982)
 +Merged from 2.0:
+  * Avoid race in cancelling compactions (CASSANDRA-9070)
   * More aggressive check for expired sstables in DTCS (CASSANDRA-8359)
 - * Don't set clientMode to true when bulk-loading sstables to avoid
 -   a NullPointerException (CASSANDRA-8238)
   * Fix ignored index_interval change in ALTER TABLE statements (CASSANDRA-7976)
   * Do more aggressive compaction in old time windows in DTCS (CASSANDRA-8360)
   * java.lang.AssertionError when reading saved cache (CASSANDRA-8740)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/730cc606/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4d9b463,9f7c8dd..392034c
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -137,148 -117,188 +137,151 @@@ public class CompactionTask extends Abs
          // new sstables from flush can be added during a compaction, but only the compaction can remove them,
          // so in our single-threaded compaction world this is a valid way of determining if we're compacting
          // all the sstables (that existed when we started)
 -        logger.info("Compacting {}", toCompact);
 +        logger.info("Compacting {}", sstables);
  
          long start = System.nanoTime();
 -        long totalkeysWritten = 0;
 -
 -        long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
 -        long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
 -        long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
 -        if (logger.isDebugEnabled())
 -            logger.debug("Expected bloom filter size : " + keysPerSSTable);
 -
 -        AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
 -                                      ? new ParallelCompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller)
 -                                      : new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
 -        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
 -        Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
 -
 -        // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
 -        // replace the old entries.  Track entries to preheat here until then.
 -        Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap =  new HashMap<Descriptor, Map<DecoratedKey, RowIndexEntry>>();
 -
 -        Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
 -        Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
 -
 -        if (collector != null)
 -            collector.beginCompaction(ci);
 -        try
 -        {
 -            if (!controller.cfs.getCompactionStrategy().isActive)
 -                throw new CompactionInterruptedException(ci.getCompactionInfo());
+ 
 -            if (!iter.hasNext())
 -            {
 -                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
 -                // we need to sync it (via closeAndOpen) first, so there is no period during which
 -                // a crash could cause data loss.
 -                cfs.markObsolete(toCompact, compactionType);
 -                return;
 -            }
 +        long totalKeysWritten = 0;
  
 -            long writeSize = getExpectedWriteSize() / estimatedSSTables;
 -            Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
 -            SSTableWriter writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
 -            writers.add(writer);
 -            while (iter.hasNext())
 -            {
 -                if (ci.isStopRequested())
 -                    throw new CompactionInterruptedException(ci.getCompactionInfo());
 +        try (CompactionController controller = getCompactionController(sstables);)
 +        {
 +            Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
  
 -                AbstractCompactedRow row = iter.next();
 -                RowIndexEntry indexEntry = writer.append(row);
 -                if (indexEntry == null)
 -                {
 -                    controller.invalidateCachedRow(row.key);
 -                    row.close();
 -                    continue;
 -                }
 +            long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
 +            long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
 +            long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
 +            long expectedSSTableSize = Math.min(getExpectedWriteSize(), strategy.getMaxSSTableBytes());
 +            logger.debug("Expected bloom filter size : {}", keysPerSSTable);
  
 -                totalkeysWritten++;
 +            List<SSTableReader> newSStables;
 +            AbstractCompactionIterable ci;
  
 -                if (DatabaseDescriptor.getPreheatKeyCache())
 +            // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
 +            // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
 +            // See CASSANDRA-8019 and CASSANDRA-8399
 +            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
 +            {
 +                ci = new CompactionIterable(compactionType, scanners.scanners, controller);
 +                Iterator<AbstractCompactedRow> iter = ci.iterator();
 +                // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
 +                // replace the old entries.  Track entries to preheat here until then.
 +                long minRepairedAt = getMinRepairedAt(actuallyCompact);
 +                // we only need the age of the data that we're actually retaining
 +                long maxAge = getMaxDataAge(actuallyCompact);
 +                if (collector != null)
 +                    collector.beginCompaction(ci);
 +                long lastCheckObsoletion = start;
 +                SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, offline);
 +                try
                  {
 -                    for (SSTableReader sstable : actuallyCompact)
++                    if (!controller.cfs.getCompactionStrategy().isActive)
++                       throw new CompactionInterruptedException(ci.getCompactionInfo());
 +                    if (!iter.hasNext())
 +                    {
 +                        // don't mark compacted in the finally block, since if there _is_ nondeleted data,
 +                        // we need to sync it (via closeAndOpen) first, so there is no period during which
 +                        // a crash could cause data loss.
 +                        cfs.markObsolete(sstables, compactionType);
 +                        return;
 +                    }
 +
 +                        writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(expectedSSTableSize)), keysPerSSTable, minRepairedAt));
 +                    while (iter.hasNext())
                      {
 -                        if (sstable.getCachedPosition(row.key, false) != null)
 +                        if (ci.isStopRequested())
 +                            throw new CompactionInterruptedException(ci.getCompactionInfo());
 +
 +                        AbstractCompactedRow row = iter.next();
 +                        if (writer.append(row) != null)
 +                        {
 +                            totalKeysWritten++;
 +                            if (newSSTableSegmentThresholdReached(writer.currentWriter()))
 +                            {
 +                                writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(expectedSSTableSize)), keysPerSSTable, minRepairedAt));
 +                            }
 +                        }
 +
 +                        if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
                          {
 -                            cachedKeys.put(row.key, indexEntry);
 -                            break;
 +                            controller.maybeRefreshOverlaps();
 +                            lastCheckObsoletion = System.nanoTime();
                          }
                      }
 -                }
  
 -                if (newSSTableSegmentThresholdReached(writer))
 +                    // don't replace old sstables yet, as we need to mark the compaction finished in the system table
 +                    newSStables = writer.finish();
 +                }
 +                catch (Throwable t)
                  {
 -                    // tmp = false because later we want to query it with descriptor from SSTableReader
 -                    cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
 -                    writeSize = getExpectedWriteSize() / estimatedSSTables;
 -                    dataDirectory = getWriteDirectory(writeSize);
 -                    writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
 -                    writers.add(writer);
 -                    cachedKeys = new HashMap<>();
 +                    try
 +                    {
 +                        writer.abort();
 +                    }
 +                    catch (Throwable t2)
 +                    {
 +                        t.addSuppressed(t2);
 +                    }
 +                    throw t;
                  }
 -            }
 -
 -            if (writer.getFilePointer() > 0)
 -            {
 -                cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
 -            }
 -            else
 -            {
 -                writer.abort();
 -                writers.remove(writer);
 -            }
 +                finally
 +                {
 +                    // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
 +                    // (in replaceCompactedSSTables)
 +                    if (taskId != null)
 +                        SystemKeyspace.finishCompaction(taskId);
  
 -            long maxAge = getMaxDataAge(toCompact);
 -            for (SSTableWriter completedWriter : writers)
 -                sstables.add(completedWriter.closeAndOpenReader(maxAge));
 -        }
 -        catch (Throwable t)
 -        {
 -            for (SSTableWriter writer : writers)
 -                writer.abort();
 -            // also remove already completed SSTables
 -            for (SSTableReader sstable : sstables)
 -            {
 -                sstable.markObsolete();
 -                sstable.releaseReference();
 +                    if (collector != null)
 +                        collector.finishCompaction(ci);
 +                }
              }
 -            throw Throwables.propagate(t);
 -        }
 -        finally
 -        {
 -            controller.close();
 -
 -            // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
 -            // (in replaceCompactedSSTables)
 -            if (taskId != null)
 -                SystemKeyspace.finishCompaction(taskId);
 -
 -            if (collector != null)
 -                collector.finishCompaction(ci);
  
 -            try
 +            Collection<SSTableReader> oldSStables = this.sstables;
 +            if (!offline)
 +                cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
 +
 +            // log a bunch of statistics about the result and save to system table compaction_history
 +            long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
 +            long startsize = SSTableReader.getTotalBytes(oldSStables);
 +            long endsize = SSTableReader.getTotalBytes(newSStables);
 +            double ratio = (double) endsize / (double) startsize;
 +
 +            StringBuilder newSSTableNames = new StringBuilder();
 +            for (SSTableReader reader : newSStables)
 +                newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
 +
 +            double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
 +            long totalSourceRows = 0;
 +            long[] counts = ci.getMergedRowCounts();
 +            StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
 +            Map<Integer, Long> mergedRows = new HashMap<>();
 +            for (int i = 0; i < counts.length; i++)
              {
 -                // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
 -                // we don't end up with compaction information hanging around indefinitely in limbo.
 -                iter.close();
 -            }
 -            catch (IOException e)
 -            {
 -                throw new RuntimeException(e);
 -            }
 -        }
 +                long count = counts[i];
 +                if (count == 0)
 +                    continue;
  
 -        replaceCompactedSSTables(toCompact, sstables);
 -        // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
 -        for (SSTableReader sstable : sstables)
 -        {
 -            if (sstable.acquireReference())
 -            {
 -                try
 -                {
 -                    sstable.preheat(cachedKeyMap.get(sstable.descriptor));
 -                }
 -                finally
 -                {
 -                    sstable.releaseReference();
 -                }
 +                int rows = i + 1;
 +                totalSourceRows += rows * count;
 +                mergeSummary.append(String.format("%d:%d, ", rows, count));
 +                mergedRows.put(rows, count);
              }
 -        }
  
 -        // log a bunch of statistics about the result and save to system table compaction_history
 -        long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
 -        long startsize = SSTable.getTotalBytes(toCompact);
 -        long endsize = SSTable.getTotalBytes(sstables);
 -        double ratio = (double) endsize / (double) startsize;
 -
 -        StringBuilder builder = new StringBuilder();
 -        for (SSTableReader reader : sstables)
 -            builder.append(reader.descriptor.baseFilename()).append(",");
 -
 -        double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
 -        long totalSourceRows = 0;
 -        long[] counts = ci.getMergedRowCounts();
 -        StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
 -        Map<Integer, Long> mergedRows = new HashMap<Integer, Long>();
 -        for (int i = 0; i < counts.length; i++)
 -        {
 -            long count = counts[i];
 -            if (count == 0)
 -                continue;
 -
 -            int rows = i + 1;
 -            totalSourceRows += rows * count;
 -            mergeSummary.append(String.format("%d:%d, ", rows, count));
 -            mergedRows.put(rows, count);
 +            SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
 +            logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
 +                                      oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
 +            logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
 +            logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
          }
 +    }
  
 -        SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
 -        logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
 -                                  toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalkeysWritten, mergeSummary.toString()));
 -        logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
 +    private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
 +    {
 +        long minRepairedAt= Long.MAX_VALUE;
 +        for (SSTableReader sstable : actuallyCompact)
 +            minRepairedAt = Math.min(minRepairedAt, sstable.getSSTableMetadata().repairedAt);
 +        if (minRepairedAt == Long.MAX_VALUE)
 +            return ActiveRepairService.UNREPAIRED_SSTABLE;
 +        return minRepairedAt;
      }
  
      protected void checkAvailableDiskSpace(long estimatedSSTables)

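The resolved 2.1 path above amounts to a cancellation-safe rewrite loop: check for a stop request before appending each row, abort the writer on any failure while keeping the original exception via addSuppressed, and only publish new sstables once finish() returns. Below is a minimal, self-contained Java sketch of that pattern; the Compaction and Writer types are illustrative stand-ins rather than the real Cassandra classes (CompactionIterable, SSTableRewriter), and the interrupted case is modelled with a plain RuntimeException instead of CompactionInterruptedException.

import java.util.List;

// Illustrative stand-in for the compaction iterator used above
interface Compaction
{
    boolean isStopRequested();
    boolean hasNext();
    Object next();
}

// Illustrative stand-in for the sstable rewriter used above
interface Writer
{
    void append(Object row);  // may write nothing for fully purged rows
    List<String> finish();    // commit point: returns the new artifacts
    void abort();             // discards partially written output
}

final class CancellationSafeRewrite
{
    static List<String> run(Compaction ci, Writer writer)
    {
        try
        {
            while (ci.hasNext())
            {
                // honour a stop request between rows, as the resolved loop does
                if (ci.isStopRequested())
                    throw new RuntimeException("compaction interrupted"); // stand-in for CompactionInterruptedException
                writer.append(ci.next());
            }
            // nothing is published until finish(); a failure before this point leaves the old data untouched
            return writer.finish();
        }
        catch (Throwable t)
        {
            try
            {
                writer.abort();
            }
            catch (Throwable t2)
            {
                t.addSuppressed(t2); // keep the original failure as the primary exception
            }
            throw t;
        }
    }
}

The real code additionally defers replacing the old sstables until the compaction has been marked finished in the system table, which this sketch leaves out.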