Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eb41380c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eb41380c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eb41380c Branch: refs/heads/cassandra-3.0 Commit: eb41380cc27277e34edf2c74f535588fd1382a9a Parents: 14f36fc 7d2fdfe Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Fri Nov 18 12:35:32 2016 +0200 Committer: Branimir Lambov <branimir.lam...@datastax.com> Committed: Fri Nov 18 12:36:26 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ReadCommand.java | 5 +- .../db/compaction/CompactionController.java | 50 ++++--- .../db/compaction/CompactionIterator.java | 22 +-- .../db/compaction/CompactionManager.java | 5 +- .../db/compaction/SSTableSplitter.java | 5 +- .../cassandra/db/compaction/Upgrader.java | 5 +- .../cassandra/db/compaction/Verifier.java | 5 +- .../cassandra/db/partitions/PurgeFunction.java | 6 +- .../db/compaction/CompactionControllerTest.java | 21 ++- .../db/compaction/CompactionsPurgeTest.java | 138 ++++++++++++++++++- 11 files changed, 213 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index efc681d,54dc4b5..8a3ac65 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,42 -1,5 +1,43 @@@ -2.2.9 +3.0.11 + * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535) + +3.0.10 + * Disallow offheap_buffers memtable allocation (CASSANDRA-11039) + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283) + * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889) + * Fix partition count log during compaction (CASSANDRA-12184) + * Batch with multiple conditional updates 
for the same partition causes AssertionError (CASSANDRA-12867) + * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788) + * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854) + * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296) + * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689) + * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801) + * Include SSTable filename in compacting large row message (CASSANDRA-12384) + * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330) + * Fix ViewTest.testCompaction (CASSANDRA-12789) + * Improve avg aggregate functions (CASSANDRA-12417) + * Preserve quoted reserved keyword column names in MV creation (CASSANDRA-11803) + * nodetool stopdaemon errors out (CASSANDRA-12646) + * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268) + * mx4j does not work in 3.0.8 (CASSANDRA-12274) + * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740) + * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582) + * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478) + * Fix exceptions with new vnode allocation (CASSANDRA-12715) + * Unify drain and shutdown processes (CASSANDRA-12509) + * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706) + * Fix failure in LogTransactionTest (CASSANDRA-12632) + * Fix potentially incomplete non-frozen UDT values when querying with the + full primary key specified (CASSANDRA-12605) + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670) + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060) + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516) + * Backport CASSANDRA-10756 (race condition in NativeTransportService 
shutdown) (CASSANDRA-12472) + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499) + * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776) + * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545) +Merged from 2.2: + * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792) * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901) * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863) * Fix Util.spinAssertEquals (CASSANDRA-12283) http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ReadCommand.java index 70c770d,cd86336..64da428 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@@ -17,124 -17,39 +17,125 @@@ */ package org.apache.cassandra.db; -import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.*; ++import java.util.function.Predicate; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.db.filter.NamesQueryFilter; -import org.apache.cassandra.db.filter.SliceQueryFilter; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.*; +import org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.index.IndexNotAvailableException; 
+import org.apache.cassandra.io.ForwardingVersionedSerializer; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.IReadCommand; -import org.apache.cassandra.service.RowDataResolver; -import org.apache.cassandra.service.pager.Pageable; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.schema.UnknownIndexException; +import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; -public abstract class ReadCommand implements IReadCommand, Pageable +/** + * General interface for storage-engine read commands (common to both range and + * single partition commands). + * <p> + * This contains all the informations needed to do a local read. + */ +public abstract class ReadCommand implements ReadQuery { - public enum Type + protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class); + + public static final IVersionedSerializer<ReadCommand> serializer = new Serializer(); + + // For READ verb: will either dispatch on 'serializer' for 3.0 or 'legacyReadCommandSerializer' for earlier version. + // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility. + public static final IVersionedSerializer<ReadCommand> readSerializer = new ForwardingVersionedSerializer<ReadCommand>() { - GET_BY_NAMES((byte)1), - GET_SLICES((byte)2); + protected IVersionedSerializer<ReadCommand> delegate(int version) + { + return version < MessagingService.VERSION_30 + ? 
legacyReadCommandSerializer : serializer; + } + }; - public final byte serializedValue; + // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 'legacyRangeSliceCommandSerializer' for earlier version. + // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility. + public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new ForwardingVersionedSerializer<ReadCommand>() + { + protected IVersionedSerializer<ReadCommand> delegate(int version) + { + return version < MessagingService.VERSION_30 + ? legacyRangeSliceCommandSerializer : serializer; + } + }; - private Type(byte b) + // For PAGED_RANGE verb: will either dispatch on 'serializer' for 3.0 or 'legacyPagedRangeCommandSerializer' for earlier version. + // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility. + public static final IVersionedSerializer<ReadCommand> pagedRangeSerializer = new ForwardingVersionedSerializer<ReadCommand>() + { + protected IVersionedSerializer<ReadCommand> delegate(int version) { - this.serializedValue = b; + return version < MessagingService.VERSION_30 + ? legacyPagedRangeCommandSerializer : serializer; } + }; + + public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer(); + public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer(); + public static final IVersionedSerializer<ReadCommand> legacyReadCommandSerializer = new LegacyReadCommandSerializer(); + + private final Kind kind; + private final CFMetaData metadata; + private final int nowInSec; + + private final ColumnFilter columnFilter; + private final RowFilter rowFilter; + private final DataLimits limits; + + // SecondaryIndexManager will attempt to provide the most selective of any available indexes + // during execution. 
Here we also store an the results of that lookup to repeating it over + // the lifetime of the command. + protected Optional<IndexMetadata> index = Optional.empty(); - public static Type fromSerializedValue(byte b) + // Flag to indicate whether the index manager has been queried to select an index for this + // command. This is necessary as the result of that lookup may be null, in which case we + // still don't want to repeat it. + private boolean indexManagerQueried = false; + + private boolean isDigestQuery; + // if a digest query, the version for which the digest is expected. Ignored if not a digest. + private int digestVersion; + private final boolean isForThrift; + + protected static abstract class SelectionDeserializer + { + public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException; + } + + protected enum Kind + { + SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer), + PARTITION_RANGE (PartitionRangeReadCommand.selectionDeserializer); + + private final SelectionDeserializer selectionDeserializer; + + Kind(SelectionDeserializer selectionDeserializer) { - return b == 1 ? GET_BY_NAMES : GET_SLICES; + this.selectionDeserializer = selectionDeserializer; } } @@@ -263,699 -95,55 +264,699 @@@ return this; } - public String getColumnFamilyName() + /** + * Sets the digest version, for when digest for that command is requested. + * <p> + * Note that we allow setting this independently of setting the command as a digest query as + * this allows us to use the command as a carrier of the digest version even if we only call + * setIsDigestQuery on some copy of it. + * + * @param digestVersion the version for the digest is this command is used for digest query.. + * @return this read command. 
+ */ + public ReadCommand setDigestVersion(int digestVersion) { - return cfName; + this.digestVersion = digestVersion; + return this; } + /** + * Whether this query is for thrift or not. + * + * @return whether this query is for thrift. + */ + public boolean isForThrift() + { + return isForThrift; + } + + /** + * The clustering index filter this command to use for the provided key. + * <p> + * Note that that method should only be called on a key actually queried by this command + * and in practice, this will almost always return the same filter, but for the sake of + * paging, the filter on the first key of a range command might be slightly different. + * + * @param key a partition key queried by this command. + * + * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}. + */ + public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key); + + /** + * Returns a copy of this command. + * + * @return a copy of this command. + */ public abstract ReadCommand copy(); - public abstract Row getRow(Keyspace keyspace); + protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup); + + protected abstract int oldestUnrepairedTombstone(); - public abstract IDiskAtomFilter filter(); + public ReadResponse createResponse(UnfilteredPartitionIterator iterator) + { + return isDigestQuery() + ? 
ReadResponse.createDigestResponse(iterator, this) + : ReadResponse.createDataResponse(iterator, this); + } - public String getKeyspace() + public long indexSerializedSize(int version) { - return ksName; + if (index.isPresent()) + return IndexMetadata.serializer.serializedSize(index.get(), version); + else + return 0; } - // maybeGenerateRetryCommand is used to generate a retry for short reads - public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row) + public Index getIndex(ColumnFamilyStore cfs) { - return null; + // if we've already consulted the index manager, and it returned a valid index + // the result should be cached here. + if(index.isPresent()) + return cfs.indexManager.getIndex(index.get()); + + // if no cached index is present, but we've already consulted the index manager + // then no registered index is suitable for this command, so just return null. + if (indexManagerQueried) + return null; + + // do the lookup, set the flag to indicate so and cache the result if not null + Index selected = cfs.indexManager.getBestIndexFor(this); + indexManagerQueried = true; + + if (selected == null) + return null; + + index = Optional.of(selected.getIndexMetadata()); + return selected; } - // maybeTrim removes columns from a response that is too long - public Row maybeTrim(Row row) + /** + * If the index manager for the CFS determines that there's an applicable + * 2i that can be used to execute this command, call its (optional) + * validation method to check that nothing in this command's parameters + * violates the implementation specific validation rules. + */ + public void maybeValidateIndex() { - return row; + Index index = getIndex(Keyspace.openAndGetStore(metadata)); + if (null != index) + index.validate(this); } - public long getTimeout() + /** + * Executes this command on the local host. + * + * @param orderGroup the operation group spanning this command + * + * @return an iterator over the result of executing this command locally. 
+ */ + @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary + // iterators created inside the try as long as we do close the original resultIterator), or by closing the result. + public UnfilteredPartitionIterator executeLocally(ReadOrderGroup orderGroup) { - return DatabaseDescriptor.getReadRpcTimeout(); + long startTimeNanos = System.nanoTime(); + + ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata()); + Index index = getIndex(cfs); + + Index.Searcher searcher = null; + if (index != null) + { + if (!cfs.indexManager.isIndexQueryable(index)) + throw new IndexNotAvailableException(index); + + searcher = index.searcherFor(this); + Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name); + } + + UnfilteredPartitionIterator resultIterator = searcher == null + ? queryStorage(cfs, orderGroup) + : searcher.search(orderGroup); + + try + { + resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos); + + // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so + // no point in checking it again. + RowFilter updatedFilter = searcher == null + ? rowFilter() + : index.getPostIndexQueryFilter(rowFilter()); + + // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However, + // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it + // would be more efficient (the sooner we discard stuff we know we don't care, the less useless + // processing we do on it). 
+ return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec()); + } + catch (RuntimeException | Error e) + { + resultIterator.close(); + throw e; + } } -} -class ReadCommandSerializer implements IVersionedSerializer<ReadCommand> -{ - public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + protected abstract void recordLatency(TableMetrics metric, long latencyNanos); + + public PartitionIterator executeInternal(ReadOrderGroup orderGroup) + { + return UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec()); + } + + public ReadOrderGroup startOrderGroup() + { + return ReadOrderGroup.forCommand(this); + } + + /** + * Wraps the provided iterator so that metrics on what is scanned by the command are recorded. + * This also log warning/trow TombstoneOverwhelmingException if appropriate. + */ + private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos) + { + class MetricRecording extends Transformation<UnfilteredRowIterator> + { + private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold(); + private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold(); + + private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName); + + private int liveRows = 0; + private int tombstones = 0; + + private DecoratedKey currentKey; + + @Override + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter) + { + currentKey = iter.partitionKey(); + return Transformation.apply(iter, this); + } + + @Override + public Row applyToStatic(Row row) + { + return applyToRow(row); + } + + @Override + public Row applyToRow(Row row) + { + if (row.hasLiveData(ReadCommand.this.nowInSec())) + ++liveRows; + + for (Cell cell : row.cells()) + { + if (!cell.isLive(ReadCommand.this.nowInSec())) + countTombstone(row.clustering()); + 
} + return row; + } + + @Override + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + countTombstone(marker.clustering()); + return marker; + } + + private void countTombstone(ClusteringPrefix clustering) + { + ++tombstones; + if (tombstones > failureThreshold && respectTombstoneThresholds) + { + String query = ReadCommand.this.toCQLString(); + Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query); + throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering); + } + } + + @Override + public void onClose() + { + recordLatency(metric, System.nanoTime() - startTimeNanos); + + metric.tombstoneScannedHistogram.update(tombstones); + metric.liveScannedHistogram.update(liveRows); + + boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds; + if (warnTombstones) + { + String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString()); + ClientWarn.instance.warn(msg); + logger.warn(msg); + } + + Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : "")); + } + }; + + return Transformation.apply(iter, new MetricRecording()); + } + + /** + * Creates a message for this command. + */ + public abstract MessageOut<ReadCommand> createMessage(int version); + + protected abstract void appendCQLWhereClause(StringBuilder sb); + + // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it + // can save us some bandwith, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which + // are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive). 
+ protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs) + { + final boolean isForThrift = iterator.isForThrift(); + class WithoutPurgeableTombstones extends PurgeFunction + { + public WithoutPurgeableTombstones() + { + super(isForThrift, nowInSec(), cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones()); + } + - protected long getMaxPurgeableTimestamp() ++ protected Predicate<Long> getPurgeEvaluator() + { - return Long.MAX_VALUE; ++ return time -> true; + } + } + return Transformation.apply(iterator, new WithoutPurgeableTombstones()); + } + + /** + * Recreate the CQL string corresponding to this query. + * <p> + * Note that in general the returned string will not be exactly the original user string, first + * because there isn't always a single syntax for a given query, but also because we don't have + * all the information needed (we know the non-PK columns queried but not the PK ones as internally + * we query them all). So this shouldn't be relied too strongly, but this should be good enough for + * debugging purpose which is what this is for. + */ + public String toCQLString() + { + StringBuilder sb = new StringBuilder(); + sb.append("SELECT ").append(columnFilter()); + sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName); + appendCQLWhereClause(sb); + + if (limits() != DataLimits.NONE) + sb.append(' ').append(limits()); + return sb.toString(); + } + + private static class Serializer implements IVersionedSerializer<ReadCommand> + { + private static int digestFlag(boolean isDigest) + { + return isDigest ? 0x01 : 0; + } + + private static boolean isDigest(int flags) + { + return (flags & 0x01) != 0; + } + + private static int thriftFlag(boolean isForThrift) + { + return isForThrift ? 
0x02 : 0; + } + + private static boolean isForThrift(int flags) + { + return (flags & 0x02) != 0; + } + + private static int indexFlag(boolean hasIndex) + { + return hasIndex ? 0x04 : 0; + } + + private static boolean hasIndex(int flags) + { + return (flags & 0x04) != 0; + } + + public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + { + assert version >= MessagingService.VERSION_30; + + out.writeByte(command.kind.ordinal()); + out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent())); + if (command.isDigestQuery()) + out.writeUnsignedVInt(command.digestVersion()); + CFMetaData.serializer.serialize(command.metadata(), out, version); + out.writeInt(command.nowInSec()); + ColumnFilter.serializer.serialize(command.columnFilter(), out, version); + RowFilter.serializer.serialize(command.rowFilter(), out, version); + DataLimits.serializer.serialize(command.limits(), out, version); + if (command.index.isPresent()) + IndexMetadata.serializer.serialize(command.index.get(), out, version); + + command.serializeSelection(out, version); + } + + public ReadCommand deserialize(DataInputPlus in, int version) throws IOException + { + assert version >= MessagingService.VERSION_30; + + Kind kind = Kind.values()[in.readByte()]; + int flags = in.readByte(); + boolean isDigest = isDigest(flags); + boolean isForThrift = isForThrift(flags); + boolean hasIndex = hasIndex(flags); + int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0; + CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); + int nowInSec = in.readInt(); + ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata); + RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata); + DataLimits limits = DataLimits.serializer.deserialize(in, version); + Optional<IndexMetadata> index = hasIndex + ? 
deserializeIndexMetadata(in, version, metadata) + : Optional.empty(); + + return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index); + } + + private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException + { + try + { + return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm)); + } + catch (UnknownIndexException e) + { + String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " + + "If an index was just created, this is likely due to the schema not " + + "being fully propagated. Local read will proceed without using the " + + "index. Please wait for schema agreement after index creation.", + cfm.ksName, cfm.cfName, e.indexId.toString()); + logger.info(message); + return Optional.empty(); + } + } + + public long serializedSize(ReadCommand command, int version) + { + assert version >= MessagingService.VERSION_30; + + return 2 // kind + flags + + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0) + + CFMetaData.serializer.serializedSize(command.metadata(), version) + + TypeSizes.sizeof(command.nowInSec()) + + ColumnFilter.serializer.serializedSize(command.columnFilter(), version) + + RowFilter.serializer.serializedSize(command.rowFilter(), version) + + DataLimits.serializer.serializedSize(command.limits(), version) + + command.selectionSerializedSize(version) + + command.indexSerializedSize(version); + } + } + + private enum LegacyType + { + GET_BY_NAMES((byte)1), + GET_SLICES((byte)2); + + public final byte serializedValue; + + LegacyType(byte b) + { + this.serializedValue = b; + } + + public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind) + { + return kind == ClusteringIndexFilter.Kind.SLICE + ? 
GET_SLICES + : GET_BY_NAMES; + } + + public static LegacyType fromSerializedValue(byte b) + { + return b == 1 ? GET_BY_NAMES : GET_SLICES; + } + } + + /** + * Serializer for pre-3.0 RangeSliceCommands. + */ + private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand> { - out.writeByte(command.commandType.serializedValue); - switch (command.commandType) + public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + { + assert version < MessagingService.VERSION_30; + + PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command; + assert !rangeCommand.dataRange().isPaging(); + + // convert pre-3.0 incompatible names filters to slice filters + rangeCommand = maybeConvertNamesToSlice(rangeCommand); + + CFMetaData metadata = rangeCommand.metadata(); + + out.writeUTF(metadata.ksName); + out.writeUTF(metadata.cfName); + out.writeLong(rangeCommand.nowInSec() * 1000L); // convert from seconds to millis + + // begin DiskAtomFilterSerializer.serialize() + if (rangeCommand.isNamesQuery()) + { + out.writeByte(1); // 0 for slices, 1 for names + ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter; + LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out); + } + else + { + out.writeByte(0); // 0 for slices, 1 for names + + // slice filter serialization + ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter; + + boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata); + + out.writeBoolean(filter.isReversed()); + + // limit + DataLimits limits = rangeCommand.limits(); + if (limits.isDistinct()) + out.writeInt(1); + else + 
out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices())); + + int compositesToGroup; + boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT) + compositesToGroup = -1; + else if (limits.isDistinct() && !selectsStatics) + compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490) + else + compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size(); + + out.writeInt(compositesToGroup); + } + + serializeRowFilter(out, rangeCommand.rowFilter()); + AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version); + + // maxResults + out.writeInt(rangeCommand.limits().count()); + + // countCQL3Rows + if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1) // if for Thrift or DISTINCT + out.writeBoolean(false); + else + out.writeBoolean(true); + + // isPaging + out.writeBoolean(false); + } + + public ReadCommand deserialize(DataInputPlus in, int version) throws IOException + { + assert version < MessagingService.VERSION_30; + + String keyspace = in.readUTF(); + String columnFamily = in.readUTF(); + + CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily); + if (metadata == null) + { + String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily); + throw new UnknownColumnFamilyException(message, null); + } + + int nowInSec = (int) (in.readLong() / 1000); // convert from millis to seconds + + ClusteringIndexFilter filter; + ColumnFilter selection; + int compositesToGroup = 0; + int perPartitionLimit = -1; + byte readType = in.readByte(); // 0 for slices, 1 for names + if (readType == 1) + { + Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = 
LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata); + selection = selectionAndFilter.left; + filter = selectionAndFilter.right; + } + else + { + Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata); + filter = p.left; + perPartitionLimit = in.readInt(); + compositesToGroup = in.readInt(); + selection = getColumnSelectionForSlice(p.right, compositesToGroup, metadata); + } + + RowFilter rowFilter = deserializeRowFilter(in, metadata); + + AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version); + int maxResults = in.readInt(); + + boolean countCQL3Rows = in.readBoolean(); // countCQL3Rows (not needed) + in.readBoolean(); // isPaging (not needed) + + boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING)); + // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former, + // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter one is slightly less + // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows to false so we use + // that fact. 
+ boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows); + DataLimits limits; + if (isDistinct) + limits = DataLimits.distinctLimits(maxResults); + else if (compositesToGroup == -1) + limits = DataLimits.thriftLimits(maxResults, perPartitionLimit); + else + limits = DataLimits.cqlLimits(maxResults); + + return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty()); + } + + static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException + { + ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator()); + out.writeInt(indexExpressions.size()); + for (RowFilter.Expression expression : indexExpressions) + { + ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out); + expression.operator().writeTo(out); + ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out); + } + } + + static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException + { + int numRowFilters = in.readInt(); + if (numRowFilters == 0) + return RowFilter.NONE; + + RowFilter rowFilter = RowFilter.create(numRowFilters); + for (int i = 0; i < numRowFilters; i++) + { + ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in); + ColumnDefinition column = metadata.getColumnDefinition(columnName); + Operator op = Operator.readFrom(in); + ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in); + rowFilter.add(column, op, indexValue); + } + return rowFilter; + } + + static long serializedRowFilterSize(RowFilter rowFilter) + { + long size = TypeSizes.sizeof(0); // rowFilterCount + for (RowFilter.Expression expression : rowFilter) + { + size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes); + size += TypeSizes.sizeof(0); // operator int value + size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue()); 
+ } + return size; + } + + public long serializedSize(ReadCommand command, int version) + { + assert version < MessagingService.VERSION_30; + assert command.kind == Kind.PARTITION_RANGE; + + PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command; + rangeCommand = maybeConvertNamesToSlice(rangeCommand); + CFMetaData metadata = rangeCommand.metadata(); + + long size = TypeSizes.sizeof(metadata.ksName); + size += TypeSizes.sizeof(metadata.cfName); + size += TypeSizes.sizeof((long) rangeCommand.nowInSec()); + + size += 1; // single byte flag: 0 for slices, 1 for names + if (rangeCommand.isNamesQuery()) + { + PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns(); + ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter; + size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns); + } + else + { + ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter; + boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata); + size += TypeSizes.sizeof(filter.isReversed()); + size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount()); + size += TypeSizes.sizeof(0); // compositesToGroup + } + + if (rangeCommand.rowFilter().equals(RowFilter.NONE)) + { + size += TypeSizes.sizeof(0); + } + else + { + ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rangeCommand.rowFilter().iterator()); + size += TypeSizes.sizeof(indexExpressions.size()); + for (RowFilter.Expression expression : indexExpressions) + { + size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes); + size += TypeSizes.sizeof(expression.operator().ordinal()); + size += 
ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue()); + } + } + + size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version); + size += TypeSizes.sizeof(rangeCommand.limits().count()); + size += TypeSizes.sizeof(!rangeCommand.isForThrift()); + return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging()); + } + + static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command) { - case GET_BY_NAMES: - SliceByNamesReadCommand.serializer.serialize(command, out, version); - break; - case GET_SLICES: - SliceFromReadCommand.serializer.serialize(command, out, version); - break; - default: - throw new AssertionError(); + if (!command.dataRange().isNamesQuery()) + return command; + + CFMetaData metadata = command.metadata(); + if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns())) + return command; + + ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter; + ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata); + DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter); + return new PartitionRangeReadCommand( + command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(), + command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty()); + } + + static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata) + { + // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys. + // In that case, we'll basically be querying the first row of the partition, but we must make sure we include + // all columns so we get at least one cell if there is a live row as it would confuse pre-3.0 nodes otherwise. 
+ if (compositesToGroup == -2) + return ColumnFilter.all(metadata); + + // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all + PartitionColumns columns = selectsStatics + ? metadata.partitionColumns() + : metadata.partitionColumns().withoutStatics(); + return ColumnFilter.selectionBuilder().addAll(columns).build(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java index fbf29e3,e895573..34d093e --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@@ -18,12 -18,10 +18,12 @@@ package org.apache.cassandra.db.compaction; import java.util.*; ++import java.util.function.Predicate; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; +import org.apache.cassandra.db.Memtable; - import org.apache.cassandra.db.lifecycle.SSTableSet; +import com.google.common.collect.Iterables; +import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -194,36 -189,59 +194,52 @@@ public class CompactionController imple } /** - * @return the largest timestamp before which it's okay to drop tombstones for the given partition; - * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed - * in other sstables. This returns the minimum timestamp for any SSTable that contains this partition and is not - * participating in this compaction, or memtable that contains this partition, - * or LONG.MAX_VALUE if no SSTable or memtable exist. 
+ * @param key + * @return a predicate for whether tombstones marked for deletion at the given time for the given partition are + * purgeable; we calculate this by checking whether the deletion time is less than the min timestamp of all SSTables + * containing this partition and not participating in the compaction. This means there isn't any data in those + * sstables that might still need to be suppressed by a tombstone at this timestamp. */ - public long maxPurgeableTimestamp(DecoratedKey key) + public Predicate<Long> getPurgeEvaluator(DecoratedKey key) { - if (NEVER_PURGE_TOMBSTONES) - return Predicates.alwaysFalse(); + if (!compactingRepaired() || NEVER_PURGE_TOMBSTONES) - return Long.MIN_VALUE; ++ return time -> false; - long min = Long.MAX_VALUE; overlapIterator.update(key); - for (SSTableReader sstable : overlapIterator.overlaps()) + Set<SSTableReader> filteredSSTables = overlapIterator.overlaps(); + Iterable<Memtable> memtables = cfs.getTracker().getView().getAllMemtables(); + long minTimestampSeen = Long.MAX_VALUE; + boolean hasTimestamp = false; + - for (SSTableReader sstable: filteredSSTables) ++ for (SSTableReader sstable : filteredSSTables) { // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing), // we check index file instead. 
- if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null) - min = Math.min(min, sstable.getMinTimestamp()); - else if (sstable.getBloomFilter().isPresent(key)) - min = Math.min(min, sstable.getMinTimestamp()); + if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null + || sstable.getBloomFilter().isPresent(key)) + { + minTimestampSeen = Math.min(minTimestampSeen, sstable.getMinTimestamp()); + hasTimestamp = true; + } - } - for (Memtable memtable : cfs.getTracker().getView().getAllMemtables()) + for (Memtable memtable : memtables) { - ColumnFamily cf = memtable.getColumnFamily(key); - if (cf != null) + Partition partition = memtable.getPartition(key); + if (partition != null) - min = Math.min(min, partition.stats().minTimestamp); + { - minTimestampSeen = Math.min(minTimestampSeen, memtable.getMinTimestamp()); ++ minTimestampSeen = Math.min(minTimestampSeen, partition.stats().minTimestamp); + hasTimestamp = true; + } + } + + if (!hasTimestamp) - return Predicates.alwaysTrue(); ++ return time -> true; + else + { + final long finalTimestamp = minTimestampSeen; - return new Predicate<Long>() - { - public boolean apply(Long time) - { - return time < finalTimestamp; - } - }; ++ return time -> time < finalTimestamp; } - return min; } public void close() http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index f8f620c,0000000..9f0984f mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@@ -1,309 -1,0 +1,309 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + +import java.util.List; +import java.util.UUID; ++import java.util.function.Predicate; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.PurgeFunction; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.index.transactions.CompactionTransaction; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.metrics.CompactionMetrics; + +/** + * Merge multiple iterators over the content of sstable into a "compacted" iterator. + * <p> + * On top of the actual merging the source iterators, this class: + * <ul> + * <li>purge gc-able tombstones if possible (see PurgeIterator below).</li> + * <li>update 2ndary indexes if necessary (as we don't read-before-write on index updates, index entries are + * not deleted on deletion of the base table data, which is ok because we'll fix index inconsistency + * on reads. 
This however means that potentially obsolete index entries could be kept a long time for + * data that is not read often, so compaction "pro-actively" fixes such index entries. This is mainly + * an optimization).</li> + * <li>invalidate cached partitions that are empty post-compaction. This avoids keeping partitions with + * only purgeable tombstones in the row cache.</li> + * <li>keep track of the compaction progress.</li> + * </ul> + */ +public class CompactionIterator extends CompactionInfo.Holder implements UnfilteredPartitionIterator +{ + private static final Logger logger = LoggerFactory.getLogger(CompactionIterator.class); + private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100; + + private final OperationType type; + private final CompactionController controller; + private final List<ISSTableScanner> scanners; + private final int nowInSec; + private final UUID compactionId; + + private final long totalBytes; + private long bytesRead; + + /* + * counters for merged rows. + * array index represents (number of merged rows - 1), so index 0 is counter for no merge (1 row), + * index 1 is counter for 2 rows merged, and so on. 
+ */ + private final long[] mergeCounters; + + private final UnfilteredPartitionIterator compacted; + private final CompactionMetrics metrics; + + public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId) + { + this(type, scanners, controller, nowInSec, compactionId, null); + } + + @SuppressWarnings("resource") // We make sure to close mergedIterator in close() and CompactionIterator is itself an AutoCloseable + public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, CompactionMetrics metrics) + { + this.controller = controller; + this.type = type; + this.scanners = scanners; + this.nowInSec = nowInSec; + this.compactionId = compactionId; + this.bytesRead = 0; + + long bytes = 0; + for (ISSTableScanner scanner : scanners) + bytes += scanner.getLengthInBytes(); + this.totalBytes = bytes; + this.mergeCounters = new long[scanners.size()]; + this.metrics = metrics; + + if (metrics != null) + metrics.beginCompaction(this); + + UnfilteredPartitionIterator merged = scanners.isEmpty() + ? 
EmptyIterators.unfilteredPartition(controller.cfs.metadata, false) + : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()); + boolean isForThrift = merged.isForThrift(); // to stop capture of iterator in Purger, which is confusing for debug + this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec)); + } + + public boolean isForThrift() + { + return false; + } + + public CFMetaData metadata() + { + return controller.cfs.metadata; + } + + public CompactionInfo getCompactionInfo() + { + return new CompactionInfo(controller.cfs.metadata, + type, + bytesRead, + totalBytes, + compactionId); + } + + private void updateCounterFor(int rows) + { + assert rows > 0 && rows - 1 < mergeCounters.length; + mergeCounters[rows - 1] += 1; + } + + public long[] getMergedRowCounts() + { + return mergeCounters; + } + + private UnfilteredPartitionIterators.MergeListener listener() + { + return new UnfilteredPartitionIterators.MergeListener() + { + public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) + { + int merged = 0; + for (UnfilteredRowIterator iter : versions) + { + if (iter != null) + merged++; + } + + assert merged > 0; + + CompactionIterator.this.updateCounterFor(merged); + + if (type != OperationType.COMPACTION || !controller.cfs.indexManager.hasIndexes()) + return null; + + Columns statics = Columns.NONE; + Columns regulars = Columns.NONE; + for (UnfilteredRowIterator iter : versions) + { + if (iter != null) + { + statics = statics.mergeTo(iter.columns().statics); + regulars = regulars.mergeTo(iter.columns().regulars); + } + } + final PartitionColumns partitionColumns = new PartitionColumns(statics, regulars); + + // If we have a 2ndary index, we must update it with deleted/shadowed cells. + // we can reuse a single CleanupTransaction for the duration of a partition. 
+ // Currently, it doesn't do any batching of row updates, so every merge event + // for a single partition results in a fresh cycle of: + // * Get new Indexer instances + // * Indexer::start + // * Indexer::onRowMerge (for every row being merged by the compaction) + // * Indexer::commit + // A new OpOrder.Group is opened in an ARM block wrapping the commits + // TODO: this should probably be done asynchronously and batched. + final CompactionTransaction indexTransaction = + controller.cfs.indexManager.newCompactionTransaction(partitionKey, + partitionColumns, + versions.size(), + nowInSec); + + return new UnfilteredRowIterators.MergeListener() + { + public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) + { + } + + public void onMergedRows(Row merged, Row[] versions) + { + indexTransaction.start(); + indexTransaction.onRowMerge(merged, versions); + indexTransaction.commit(); + } + + public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions) + { + } + + public void close() + { + } + }; + } + + public void close() + { + } + }; + } + + private void updateBytesRead() + { + long n = 0; + for (ISSTableScanner scanner : scanners) + n += scanner.getCurrentPosition(); + bytesRead = n; + } + + public boolean hasNext() + { + return compacted.hasNext(); + } + + public UnfilteredRowIterator next() + { + return compacted.next(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + + public void close() + { + try + { + compacted.close(); + } + finally + { + if (metrics != null) + metrics.finishCompaction(this); + } + } + + public String toString() + { + return this.getCompactionInfo().toString(); + } + + private class Purger extends PurgeFunction + { + private final CompactionController controller; + + private DecoratedKey currentKey; - private long maxPurgeableTimestamp; - private boolean hasCalculatedMaxPurgeableTimestamp; ++ private Predicate<Long> 
purgeEvaluator; + + private long compactedUnfiltered; + + private Purger(boolean isForThrift, CompactionController controller, int nowInSec) + { + super(isForThrift, nowInSec, controller.gcBefore, controller.compactingRepaired() ? Integer.MAX_VALUE : Integer.MIN_VALUE, controller.cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones()); + this.controller = controller; + } + + @Override + protected void onEmptyPartitionPostPurge(DecoratedKey key) + { + if (type == OperationType.COMPACTION) + controller.cfs.invalidateCachedPartition(key); + } + + @Override + protected void onNewPartition(DecoratedKey key) + { + currentKey = key; - hasCalculatedMaxPurgeableTimestamp = false; ++ purgeEvaluator = null; + } + + @Override + protected void updateProgress() + { + if ((++compactedUnfiltered) % UNFILTERED_TO_UPDATE_PROGRESS == 0) + updateBytesRead(); + } + + /* - * Tombstones with a localDeletionTime before this can be purged. This is the minimum timestamp for any sstable - * containing `currentKey` outside of the set of sstables involved in this compaction. This is computed lazily - * on demand as we only need this if there is tombstones and this a bit expensive (see #8914). ++ * Evaluates whether a tombstone with the given deletion timestamp can be purged. This is the minimum ++ * timestamp for any sstable containing `currentKey` outside of the set of sstables involved in this compaction. ++ * This is computed lazily on demand as we only need this if there are tombstones and this is a bit expensive ++ * (see #8914). 
+ */ - protected long getMaxPurgeableTimestamp() ++ protected Predicate<Long> getPurgeEvaluator() + { - if (!hasCalculatedMaxPurgeableTimestamp) ++ if (purgeEvaluator == null) + { - hasCalculatedMaxPurgeableTimestamp = true; - maxPurgeableTimestamp = controller.maxPurgeableTimestamp(currentKey); ++ purgeEvaluator = controller.getPurgeEvaluator(currentKey); + } - return maxPurgeableTimestamp; ++ return purgeEvaluator; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 478b896,8a3c11e..a77cefb --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -22,6 -22,6 +22,7 @@@ import java.io.IOException import java.lang.management.ManagementFactory; import java.util.*; import java.util.concurrent.*; ++import java.util.function.Predicate; import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.openmbean.OpenDataException; @@@ -1431,7 -1434,7 +1432,7 @@@ public class CompactionManager implemen * a tombstone that could shadow a column in another sstable, but this is doubly not a concern * since validation compaction is read-only. 
*/ - return Long.MAX_VALUE; - return Predicates.alwaysTrue(); ++ return time -> true; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java index 3655a37,6b302d2..fce8c2e --- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java +++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java @@@ -18,6 -18,9 +18,7 @@@ package org.apache.cassandra.db.compaction; import java.util.*; - -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; ++import java.util.function.Predicate; import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; @@@ -97,9 -97,9 +98,9 @@@ public class SSTableSplitter } @Override - public long maxPurgeableTimestamp(DecoratedKey key) + public Predicate<Long> getPurgeEvaluator(DecoratedKey key) { - return Long.MIN_VALUE; - return Predicates.alwaysFalse(); ++ return time -> false; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/Upgrader.java index 822bb85,d6ef60e..77831a7 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@@ -19,10 -19,12 +19,11 @@@ package org.apache.cassandra.db.compact import java.io.File; import java.util.*; ++import java.util.function.Predicate; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; import com.google.common.base.Throwables; +import com.google.common.collect.Sets; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import 
org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@@ -111,9 -120,9 +112,9 @@@ public class Upgrade } @Override - public long maxPurgeableTimestamp(DecoratedKey key) + public Predicate<Long> getPurgeEvaluator(DecoratedKey key) { - return Long.MIN_VALUE; - return Predicates.alwaysFalse(); ++ return time -> false; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/Verifier.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/Verifier.java index ce04ad3,42302fe..88bc3a7 --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@@ -42,6 -43,6 +42,7 @@@ import java.io.IOError import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; ++import java.util.function.Predicate; public class Verifier implements Closeable { @@@ -281,9 -278,9 +282,9 @@@ } @Override - public long maxPurgeableTimestamp(DecoratedKey key) + public Predicate<Long> getPurgeEvaluator(DecoratedKey key) { - return Long.MIN_VALUE; - return Predicates.alwaysFalse(); ++ return time -> false; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/partitions/PurgeFunction.java index d3255d3,0000000..6679bdf mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java +++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java @@@ -1,125 -1,0 +1,127 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + ++import java.util.function.Predicate; ++ +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.Transformation; + +public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator> +{ + private final boolean isForThrift; + private final DeletionPurger purger; + private final int nowInSec; + private boolean isReverseOrder; + + public PurgeFunction(boolean isForThrift, int nowInSec, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones) + { + this.isForThrift = isForThrift; + this.nowInSec = nowInSec; + this.purger = (timestamp, localDeletionTime) -> + !(onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone) + && localDeletionTime < gcBefore - && timestamp < getMaxPurgeableTimestamp(); ++ && getPurgeEvaluator().test(timestamp); + } + - protected abstract long getMaxPurgeableTimestamp(); ++ protected abstract Predicate<Long> getPurgeEvaluator(); + + // Called at the beginning of each new partition + protected void onNewPartition(DecoratedKey partitionKey) + { + } + + // Called for each partition that had only purged infos and are empty post-purge. + protected void onEmptyPartitionPostPurge(DecoratedKey partitionKey) + { + } + + // Called for every unfiltered. 
Meant for CompactionIterator to update progress + protected void updateProgress() + { + } + + @Override + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + onNewPartition(partition.partitionKey()); + + isReverseOrder = partition.isReverseOrder(); + UnfilteredRowIterator purged = Transformation.apply(partition, this); + if (!isForThrift && purged.isEmpty()) + { + onEmptyPartitionPostPurge(purged.partitionKey()); + purged.close(); + return null; + } + + return purged; + } + + @Override + protected DeletionTime applyToDeletion(DeletionTime deletionTime) + { + return purger.shouldPurge(deletionTime) ? DeletionTime.LIVE : deletionTime; + } + + @Override + protected Row applyToStatic(Row row) + { + updateProgress(); + return row.purge(purger, nowInSec); + } + + @Override + protected Row applyToRow(Row row) + { + updateProgress(); + return row.purge(purger, nowInSec); + } + + @Override + protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + updateProgress(); + boolean reversed = isReverseOrder; + if (marker.isBoundary()) + { + // We can only skip the whole marker if both deletion time are purgeable. + // If only one of them is, filterTombstoneMarker will deal with it. + RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker; + boolean shouldPurgeClose = purger.shouldPurge(boundary.closeDeletionTime(reversed)); + boolean shouldPurgeOpen = purger.shouldPurge(boundary.openDeletionTime(reversed)); + + if (shouldPurgeClose) + { + if (shouldPurgeOpen) + return null; + + return boundary.createCorrespondingOpenMarker(reversed); + } + + return shouldPurgeOpen + ? boundary.createCorrespondingCloseMarker(reversed) + : marker; + } + else + { + return purger.shouldPurge(((RangeTombstoneBoundMarker)marker).deletionTime()) ? 
null : marker; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java index e781716,3184159..1b400e8 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java @@@ -20,7 -20,8 +20,8 @@@ package org.apache.cassandra.db.compact import java.nio.ByteBuffer; import java.util.Set; ++import java.util.function.Predicate; -import com.google.common.base.Predicate; import com.google.common.collect.Sets; import org.junit.BeforeClass; import org.junit.Test; @@@ -41,7 -40,10 +42,9 @@@ import org.apache.cassandra.schema.Keys import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import static org.apache.cassandra.Util.cellname; import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertFalse; + import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNotNull; public class CompactionControllerTest extends SchemaLoader @@@ -87,26 -83,26 +90,26 @@@ // check max purgeable timestamp without any sstables try(CompactionController controller = new CompactionController(cfs, null, 0)) { - assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only + assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp1); //memtable only cfs.forceBlockingFlush(); - assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables - assertTrue(controller.getPurgeEvaluator(key).apply(Long.MAX_VALUE)); //no memtables and no sstables ++ assertTrue(controller.getPurgeEvaluator(key).test(Long.MAX_VALUE)); //no memtables and no sstables } - Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // 
first sstable is compacting + Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables()); // first sstable is compacting // create another sstable - applyMutation(CF1, rowKey, timestamp2); + applyMutation(cfs.metadata, key, timestamp2); cfs.forceBlockingFlush(); // check max purgeable timestamp when compacting the first sstable with and without a memtable try (CompactionController controller = new CompactionController(cfs, compacting, 0)) { - assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only + assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp2); - applyMutation(CF1, rowKey, timestamp3); + applyMutation(cfs.metadata, key, timestamp3); - assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable + assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp3); //second sstable and second memtable } // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable @@@ -115,11 -111,11 +118,11 @@@ //newest to oldest try (CompactionController controller = new CompactionController(cfs, null, 0)) { - applyMutation(CF1, rowKey, timestamp1); - applyMutation(CF1, rowKey, timestamp2); - applyMutation(CF1, rowKey, timestamp3); + applyMutation(cfs.metadata, key, timestamp1); + applyMutation(cfs.metadata, key, timestamp2); + applyMutation(cfs.metadata, key, timestamp3); - assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only + assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp3); //memtable only } cfs.forceBlockingFlush(); @@@ -127,11 -123,11 +130,11 @@@ //oldest to newest try (CompactionController controller = new CompactionController(cfs, null, 0)) { - applyMutation(CF1, rowKey, timestamp3); - applyMutation(CF1, rowKey, timestamp2); - applyMutation(CF1, rowKey, timestamp1); + applyMutation(cfs.metadata, key, timestamp3); + applyMutation(cfs.metadata, key, timestamp2); + 
applyMutation(cfs.metadata, key, timestamp1); - assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only + assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp3); } } @@@ -176,20 -172,26 +179,26 @@@ assertEquals(0, expired.size()); } - private void applyMutation(String cf, ByteBuffer rowKey, long timestamp) + private void applyMutation(CFMetaData cfm, DecoratedKey key, long timestamp) { - CellName colName = cellname("birthdate"); ByteBuffer val = ByteBufferUtil.bytes(1L); - Mutation rm = new Mutation(KEYSPACE, rowKey); - rm.add(cf, colName, val, timestamp); - rm.applyUnsafe(); + new RowUpdateBuilder(cfm, timestamp, key) + .clustering("ck") + .add("val", val) + .build() + .applyUnsafe(); } - private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp) + private void applyDeleteMutation(CFMetaData cfm, DecoratedKey key, long timestamp) { - Mutation rm = new Mutation(KEYSPACE, rowKey); - rm.delete(cf, timestamp); - rm.applyUnsafe(); + new Mutation(PartitionUpdate.fullPartitionDelete(cfm, key, timestamp, FBUtilities.nowInSeconds())) + .applyUnsafe(); } + + private void assertPurgeBoundary(Predicate<Long> evaluator, long boundary) + { - assertFalse(evaluator.apply(boundary)); - assertTrue(evaluator.apply(boundary - 1)); ++ assertFalse(evaluator.test(boundary)); ++ assertTrue(evaluator.test(boundary - 1)); + } }