Merge branch 'cassandra-2.1' into cassandra-2.2

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionManager.java

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/efc57cdf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/efc57cdf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/efc57cdf

Branch: refs/heads/cassandra-2.2
Commit: efc57cdfe262c78db82ba029f5ba7f9294c38c04
Parents: 1735249 bdbc61f
Author: Marcus Eriksson <marc...@apache.org>
Authored: Mon May 18 06:45:36 2015 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon May 18 06:45:36 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                                        | 1 +
 src/java/org/apache/cassandra/db/compaction/CompactionManager.java | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/efc57cdf/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9f14fba,198935b..7885e0e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,119 -1,5 +1,120 @@@
 +2.2.0-beta1
 + * Introduce Transactional API for internal state changes (CASSANDRA-8984)
 + * Add a flag in cassandra.yaml to enable UDFs (CASSANDRA-9404)
 + * Better support of null for UDF (CASSANDRA-8374)
 + * Use ecj instead of javassist for UDFs (CASSANDRA-8241)
 + * faster async logback configuration for tests (CASSANDRA-9376)
 + * Add `smallint` and `tinyint` data types (CASSANDRA-8951)
 + * Avoid thrift schema creation when native driver is used in stress tool (CASSANDRA-9374)
 + * Populate TokenMetadata early during startup (CASSANDRA-9317)
 + * Make Functions.declared thread-safe
 + * Add client warnings to native protocol v4 (CASSANDRA-8930)
 + * Allow roles cache to be invalidated (CASSANDRA-8967)
 + * Upgrade Snappy (CASSANDRA-9063)
 + * Don't start Thrift rpc by default (CASSANDRA-9319)
 + * Only stream from unrepaired sstables with incremental repair (CASSANDRA-8267)
 + * Aggregate UDFs allow SFUNC return type to differ from STYPE if FFUNC specified (CASSANDRA-9321)
 + * Remove Thrift dependencies in bundled tools (CASSANDRA-8358)
 + * Disable memory mapping of hsperfdata file for JVM statistics (CASSANDRA-9242)
 + * Add pre-startup checks to detect potential incompatibilities (CASSANDRA-8049)
 + * Distinguish between null and unset in protocol v4 (CASSANDRA-7304)
 + * Add user/role permissions for user-defined functions (CASSANDRA-7557)
 + * Allow cassandra config to be updated to restart daemon without unloading classes (CASSANDRA-9046)
 + * Don't initialize compaction writer before checking if iter is empty (CASSANDRA-9117)
 + * Don't execute any functions at prepare-time (CASSANDRA-9037)
 + * Share file handles between all instances of a SegmentedFile (CASSANDRA-8893)
 + * Make it possible to major compact LCS (CASSANDRA-7272)
 + * Make FunctionExecutionException extend RequestExecutionException
 +   (CASSANDRA-9055)
 + * Add support for SELECT JSON, INSERT JSON syntax and new toJson(), fromJson()
 +   functions (CASSANDRA-7970)
 + * Optimise max purgeable timestamp calculation in compaction (CASSANDRA-8920)
 + * Constrain internode message buffer sizes, and improve IO class hierarchy (CASSANDRA-8670)
 + * New tool added to validate all sstables in a node (CASSANDRA-5791)
 + * Push notification when tracing completes for an operation (CASSANDRA-7807)
 + * Delay "node up" and "node added" notifications until native protocol server is started (CASSANDRA-8236)
 + * Compressed Commit Log (CASSANDRA-6809)
 + * Optimise IntervalTree (CASSANDRA-8988)
 + * Add a key-value payload for third party usage (CASSANDRA-8553, 9212)
 + * Bump metrics-reporter-config dependency for metrics 3.0 (CASSANDRA-8149)
 + * Partition intra-cluster message streams by size, not type (CASSANDRA-8789)
 + * Add WriteFailureException to native protocol, notify coordinator of
 +   write failures (CASSANDRA-8592)
 + * Convert SequentialWriter to nio (CASSANDRA-8709)
 + * Add role based access control (CASSANDRA-7653, 8650, 7216, 8760, 8849, 8761, 8850)
 + * Record client ip address in tracing sessions (CASSANDRA-8162)
 + * Indicate partition key columns in response metadata for prepared
 +   statements (CASSANDRA-7660)
 + * Merge UUIDType and TimeUUIDType parse logic (CASSANDRA-8759)
 + * Avoid memory allocation when searching index summary (CASSANDRA-8793)
 + * Optimise (Time)?UUIDType Comparisons (CASSANDRA-8730)
 + * Make CRC32Ex into a separate maven dependency (CASSANDRA-8836)
 + * Use preloaded jemalloc w/ Unsafe (CASSANDRA-8714, 9197)
 + * Avoid accessing partitioner through StorageProxy (CASSANDRA-8244, 8268)
 + * Upgrade Metrics library and remove depricated metrics (CASSANDRA-5657)
 + * Serializing Row cache alternative, fully off heap (CASSANDRA-7438)
 + * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707)
 + * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560)
 + * Support direct buffer decompression for reads (CASSANDRA-8464)
 + * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039)
 + * Group sstables for anticompaction correctly (CASSANDRA-8578)
 + * Add ReadFailureException to native protocol, respond
 +   immediately when replicas encounter errors while handling
 +   a read request (CASSANDRA-7886)
 + * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308)
 + * Allow mixing token and partition key restrictions (CASSANDRA-7016)
 + * Support index key/value entries on map collections (CASSANDRA-8473)
 + * Modernize schema tables (CASSANDRA-8261)
 + * Support for user-defined aggregation functions (CASSANDRA-8053)
 + * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
 + * Refactor SelectStatement, return IN results in natural order instead
 +   of IN value list order and ignore duplicate values in partition key IN restrictions (CASSANDRA-7981)
 + * Support UDTs, tuples, and collections in user-defined
 +   functions (CASSANDRA-7563)
 + * Fix aggregate fn results on empty selection, result column name,
 +   and cqlsh parsing (CASSANDRA-8229)
 + * Mark sstables as repaired after full repair (CASSANDRA-7586)
 + * Extend Descriptor to include a format value and refactor reader/writer
 +   APIs (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any partition key column (CASSANDRA-7855)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063, 7813, 7708)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * Improve concurrency of repair (CASSANDRA-6455, 8208, 9145)
 + * Select optimal CRC32 implementation at runtime (CASSANDRA-8614)
 + * Evaluate MurmurHash of Token once per query (CASSANDRA-7096)
 + * Generalize progress reporting (CASSANDRA-8901)
 + * Resumable bootstrap streaming (CASSANDRA-8838, CASSANDRA-8942)
 + * Allow scrub for secondary index (CASSANDRA-5174)
 + * Save repair data to system table (CASSANDRA-5839)
 + * fix nodetool names that reference column families (CASSANDRA-8872)
 +
 +
  2.1.6
+  * Use configured gcgs in anticompaction (CASSANDRA-9397)
   * Warn on misuse of unlogged batches (CASSANDRA-9282)
   * Failure detector detects and ignores local pauses (CASSANDRA-9183)
   * Add utility class to support for rate limiting a given log statement (CASSANDRA-9029)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/efc57cdf/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index fc83cc5,c7232a0..5d5464c
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1153,83 -1060,68 +1153,83 @@@ public class CompactionManager implemen
              if (!new File(sstable.getFilename()).exists())
              {
                  logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
 +                i.remove();
                  continue;
              }
 +            if (groupMaxDataAge < sstable.maxDataAge)
 +                groupMaxDataAge = sstable.maxDataAge;
 +        }

 -            logger.info("Anticompacting {}", sstable);
 -            Set<SSTableReader> sstableAsSet = new HashSet<>();
 -            sstableAsSet.add(sstable);
 +
 +        if (anticompactionGroup.size() == 0)
 +        {
 +            logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
 +            return 0;
 +        }

 -            File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
 -            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
 -            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
 +        logger.info("Anticompacting {}", anticompactionGroup);
 +        Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
 +
 +        File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
 +        long repairedKeyCount = 0;
 +        long unrepairedKeyCount = 0;
 +        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
 +        try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
 +             SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
 +             AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup);
-              CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
++             CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
 +        {
 +            int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));

 -            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(new HashSet<>(Collections.singleton(sstable)));
 -                 CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
 -            {
 -                int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)sstable.estimatedKeys());
 -                repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
 -                unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
 +            repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
 +            unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));

 -                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
 -                Iterator<AbstractCompactedRow> iter = ci.iterator();
 -                metrics.beginCompaction(ci);
 -                try
 +            CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat());
 +            Iterator<AbstractCompactedRow> iter = ci.iterator();
 +            metrics.beginCompaction(ci);
 +            try
 +            {
 +                while (iter.hasNext())
                  {
 -                    while (iter.hasNext())
 +                    AbstractCompactedRow row = iter.next();
 +                    // if current range from sstable is repaired, save it into the new repaired sstable
 +                    if (Range.isInRanges(row.key.getToken(), ranges))
                      {
 -                        AbstractCompactedRow row = iter.next();
 -                        // if current range from sstable is repaired, save it into the new repaired sstable
 -                        if (Range.isInRanges(row.key.getToken(), ranges))
 -                        {
 -                            repairedSSTableWriter.append(row);
 -                            repairedKeyCount++;
 -                        }
 -                        // otherwise save into the new 'non-repaired' table
 -                        else
 -                        {
 -                            unRepairedSSTableWriter.append(row);
 -                            unrepairedKeyCount++;
 -                        }
 +                        repairedSSTableWriter.append(row);
 +                        repairedKeyCount++;
 +                    }
 +                    // otherwise save into the new 'non-repaired' table
 +                    else
 +                    {
 +                        unRepairedSSTableWriter.append(row);
 +                        unrepairedKeyCount++;
                      }
                  }
 -                finally
 -                {
 -                    metrics.finishCompaction(ci);
 -                }
 -                anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
 -                anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
 -                cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
              }
 -            catch (Throwable e)
 +            finally
              {
 -                JVMStabilityInspector.inspectThrowable(e);
 -                logger.error("Error anticompacting " + sstable, e);
 -                repairedSSTableWriter.abort();
 -                unRepairedSSTableWriter.abort();
 +                metrics.finishCompaction(ci);
              }
 +            // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
 +            // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
 +            List<SSTableReader> anticompactedSSTables = new ArrayList<>();
 +            anticompactedSSTables.addAll(repairedSSTableWriter.setRepairedAt(repairedAt).finish());
 +            anticompactedSSTables.addAll(unRepairedSSTableWriter.setRepairedAt(ActiveRepairService.UNREPAIRED_SSTABLE).finish());
 +            cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
 +
 +            logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
 +                                                                       repairedKeyCount + unrepairedKeyCount,
 +                                                                       cfs.keyspace.getName(),
 +                                                                       cfs.getColumnFamilyName(),
 +                                                                       anticompactionGroup);
 +            return anticompactedSSTables.size();
          }
 -        String format = "Repaired {} keys of {} for {}/{}";
 -        logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
 -        String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
 -        logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size());
 -
 -        return anticompactedSSTables;
 +        catch (Throwable e)
 +        {
 +            JVMStabilityInspector.inspectThrowable(e);
 +            logger.error("Error anticompacting " + anticompactionGroup, e);
 +        }
 +        return 0;
      }

      /**
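On the 2.2 side of the CompactionManager hunk, an sstable that was compacted away since the repair started is now dropped from anticompactionGroup with i.remove() instead of only being skipped, so everything later built from that collection (sstableAsSet, strategy.getScanners(anticompactionGroup) and the empty-group check) never sees it; the same loop also computes groupMaxDataAge. The Java sketch below isolates that prune-while-iterating pattern. SSTableStub, AnticompactionGroupFilter and the exists predicate are hypothetical stand-ins (the real loop tests new File(sstable.getFilename()).exists()), not Cassandra classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for an sstable: only the two fields the filtering step reads.
final class SSTableStub
{
    final String name;
    final long maxDataAge;

    SSTableStub(String name, long maxDataAge)
    {
        this.name = name;
        this.maxDataAge = maxDataAge;
    }
}

final class AnticompactionGroupFilter
{
    /**
     * Removes, in place, every sstable for which exists is false, and returns the
     * largest maxDataAge among the remaining ones (-1 if none remain).
     */
    static long pruneAndGetMaxDataAge(List<SSTableStub> group, Predicate<SSTableStub> exists)
    {
        long groupMaxDataAge = -1;
        for (Iterator<SSTableStub> i = group.iterator(); i.hasNext();)
        {
            SSTableStub sstable = i.next();
            if (!exists.test(sstable))
            {
                // Remove through the iterator (not group.remove(sstable)) so that
                // ArrayList's fail-fast iterator stays valid for the rest of the loop.
                i.remove();
                continue;
            }
            if (groupMaxDataAge < sstable.maxDataAge)
                groupMaxDataAge = sstable.maxDataAge;
        }
        return groupMaxDataAge;
    }
}
```

For example, pruning [a (age 10), b (age 30), c (age 20)] when b no longer exists leaves [a, c] and returns 20. A group in which nothing survives comes back empty with -1, which is the situation the diff's "No valid anticompactions for this group" early return handles.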
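The heart of the anticompaction loop survives the merge unchanged: each compacted row whose token lies in one of the repaired ranges is appended to the repaired writer, and every other row goes to the unrepaired writer, decided by Range.isInRanges(row.key.getToken(), ranges). The sketch below reproduces that split on plain long tokens. TokenRange and AnticompactionSplit are hypothetical. The containment rule assumes Cassandra's convention of ranges that are open on the left and closed on the right, (left, right], and that wrap around the ring when left >= right.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical token range (left, right] on a ring of long tokens.
final class TokenRange
{
    final long left;  // exclusive
    final long right; // inclusive

    TokenRange(long left, long right)
    {
        this.left = left;
        this.right = right;
    }

    boolean contains(long token)
    {
        // left >= right means the range wraps past the end of the ring
        // (left == right therefore covers the whole ring).
        if (left >= right)
            return token > left || token <= right;
        return token > left && token <= right;
    }

    static boolean isInRanges(long token, List<TokenRange> ranges)
    {
        for (TokenRange range : ranges)
            if (range.contains(token))
                return true;
        return false;
    }
}

// Hypothetical equivalent of the repaired/unrepaired split in the anticompaction loop.
final class AnticompactionSplit
{
    final List<Long> repaired = new ArrayList<>();
    final List<Long> unrepaired = new ArrayList<>();

    static AnticompactionSplit split(List<Long> tokens, List<TokenRange> repairedRanges)
    {
        AnticompactionSplit result = new AnticompactionSplit();
        for (long token : tokens)
        {
            if (TokenRange.isInRanges(token, repairedRanges))
                result.repaired.add(token);   // the diff appends these rows to repairedSSTableWriter
            else
                result.unrepaired.add(token); // ...and these to unRepairedSSTableWriter
        }
        return result;
    }
}
```

With repaired ranges (0, 100] and the wrapping range (900, 10], tokens 0, 5, 50, 100 and 950 land on the repaired side, while 101 and 500 stay unrepaired.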
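The only real conflict in this merge is the CompactionController line. The 2.2 side passed CFMetaData.DEFAULT_GC_GRACE_SECONDS, and the resolution (the ++ line) takes getDefaultGcBefore(cfs) from the 2.1 fix listed in CHANGES.txt as "Use configured gcgs in anticompaction (CASSANDRA-9397)". Assuming, as the method name suggests, that this argument is a gcBefore cutoff in seconds since the epoch (roughly now minus the table's gc_grace_seconds) rather than a grace period, the difference matters: Cassandra's default grace of 864000 seconds (10 days), used directly as a cutoff, points to 11 January 1970. The sketch below models only that time cutoff. GcBeforeSketch and its methods are hypothetical, and the real compaction code applies further checks before dropping a tombstone.

```java
// Hypothetical helper illustrating a gcBefore cutoff; not Cassandra code.
final class GcBeforeSketch
{
    // Cassandra's default gc_grace_seconds: 864000 s = 10 days.
    static final int DEFAULT_GC_GRACE_SECONDS = 864000;

    /** Cutoff in seconds since the epoch: now minus the grace period. */
    static int gcBefore(long nowMillis, int gcGraceSeconds)
    {
        return (int) (nowMillis / 1000) - gcGraceSeconds;
    }

    /** Assumed rule: a tombstone is droppable once its local deletion time (seconds) is before the cutoff. */
    static boolean isPurgeable(int localDeletionTime, int gcBefore)
    {
        return localDeletionTime < gcBefore;
    }
}
```

With "now" at 1,500,000,000 seconds, the cutoff is 1,499,136,000, so a tombstone from 1,400,000,000 is purgeable and one from 1,499,999,000 is not. Passing the raw 864000 as the cutoff instead would keep both.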