Merge branch cassandra-2.2 into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/88d2ac4f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/88d2ac4f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/88d2ac4f

Branch: refs/heads/cassandra-3.0
Commit: 88d2ac4f2fadba44a9b72286ef924441014a97ba
Parents: 7de853b b08843d
Author: Benjamin Lerer <b.le...@gmail.com>
Authored: Fri Jul 14 17:14:38 2017 +0200
Committer: Benjamin Lerer <b.le...@gmail.com>
Committed: Fri Jul 14 17:26:34 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   7 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   3 +-
 src/java/org/apache/cassandra/db/DataRange.java |   5 +
 .../cassandra/db/PartitionRangeReadCommand.java |   7 +-
 .../org/apache/cassandra/db/ReadCommand.java    |   2 +-
 src/java/org/apache/cassandra/db/ReadQuery.java |  12 +
 .../db/SinglePartitionReadCommand.java          |  21 +-
 .../apache/cassandra/db/filter/DataLimits.java  |  63 +++--
 .../apache/cassandra/db/filter/RowFilter.java   |  15 ++
 .../apache/cassandra/service/CacheService.java  |   2 +-
 .../apache/cassandra/service/DataResolver.java  |   4 +-
 .../apache/cassandra/service/StorageProxy.java  |   8 +-
 .../service/pager/AbstractQueryPager.java       |   2 +-
 .../service/pager/MultiPartitionPager.java      |   9 +-
 .../cassandra/service/pager/QueryPagers.java    |   2 +-
 .../org/apache/cassandra/cql3/CQLTester.java    |   8 +-
 .../validation/operations/SelectLimitTest.java  | 256 ++++++++++++++++++-
 .../db/rows/UnfilteredRowIteratorsTest.java     |  10 +-
 18 files changed, 382 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index fffda7f,bda510f..4a823c9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,65 -1,18 +1,66 @@@
-2.2.11
+3.0.15
+ * Make concat work with iterators that have different subsets of columns (CASSANDRA-13482)
+ * Set test.runners based on cores and memory size (CASSANDRA-13078)
+ * Allow different NUMACTL_ARGS to be passed in (CASSANDRA-13557)
+ * Allow native function calls in CQLSSTableWriter (CASSANDRA-12606)
+ * Fix secondary index queries on COMPACT tables (CASSANDRA-13627)
+ * Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568)
+ Merged from 2.2:
- * Fix potential NPE when resume bootstrap fails (CASSANDRA-13272)
- * Fix toJSONString for the UDT, tuple and collection types (CASSANDRA-13592)
- * Fix nested Tuples/UDTs validation (CASSANDRA-13646)
+ * Fix queries with LIMIT and filtering on clustering columns (CASSANDRA-11223)
+ * Fix potential NPE when resume bootstrap fails (CASSANDRA-13272)
+ * Fix toJSONString for the UDT, tuple and collection types (CASSANDRA-13592)
+ * Fix nested Tuples/UDTs validation (CASSANDRA-13646)
- * Remove unused max_value_size_in_mb config setting from yaml (CASSANDRA-13625)
-
-2.2.10
+3.0.14
+ * Ensure int overflow doesn't occur when calculating large partition warning size (CASSANDRA-13172)
+ * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004)
+ * Failed unregistering mbean during drop keyspace (CASSANDRA-13346)
+ * nodetool scrub/cleanup/upgradesstables exit code is wrong (CASSANDRA-13542)
+ * Fix the reported number of sstable data files accessed per read (CASSANDRA-13120)
+ * Fix schema digest mismatch during rolling upgrades from versions before 3.0.12 (CASSANDRA-13559)
+ * Upgrade JNA version to 4.4.0 (CASSANDRA-13072)
+ * Interned ColumnIdentifiers should use minimal ByteBuffers (CASSANDRA-13533)
+ * ReverseIndexedReader may drop rows during 2.1 to 3.0 upgrade (CASSANDRA-13525)
+ * Fix repair process violating start/end token limits for small ranges (CASSANDRA-13052)
+ * Add storage port options to sstableloader (CASSANDRA-13518)
+ * Properly handle quoted index names in cqlsh DESCRIBE output (CASSANDRA-12847)
+ * Avoid reading static row twice from old format sstables (CASSANDRA-13236)
+ * Fix NPE in StorageService.excise() (CASSANDRA-13163)
+ * Expire OutboundTcpConnection messages by a single Thread (CASSANDRA-13265)
+ * Fail repair if insufficient responses received (CASSANDRA-13397)
+ * Fix SSTableLoader fail when the loaded table contains dropped columns (CASSANDRA-13276)
+ * Avoid name clashes in CassandraIndexTest (CASSANDRA-13427)
+ * Handling partially written hint files (CASSANDRA-12728)
+ * Interrupt replaying hints on decommission (CASSANDRA-13308)
+ * Fix schema version calculation for rolling upgrades (CASSANDRA-13441)
+Merged from 2.2:
 * Nodes started with join_ring=False should be able to serve requests when authentication is enabled (CASSANDRA-11381)
 * cqlsh COPY FROM: increment error count only for failures, not for attempts (CASSANDRA-13209)
- * nodetool upgradesstables should upgrade system tables (CASSANDRA-13119)
+
+3.0.13
+ * Make reading of range tombstones more reliable (CASSANDRA-12811)
+ * Fix startup problems due to schema tables not completely flushed (CASSANDRA-12213)
+ * Fix view builder bug that can filter out data on restart (CASSANDRA-13405)
+ * Fix 2i page size calculation when there are no regular columns (CASSANDRA-13400)
+ * Fix the conversion of 2.X expired rows without regular column data (CASSANDRA-13395)
+ * Fix hint delivery when using ext+internal IPs with prefer_local enabled (CASSANDRA-13020)
+ * Fix possible NPE on upgrade to 3.0/3.X in case of IO errors (CASSANDRA-13389)
+ * Legacy deserializer can create empty range tombstones (CASSANDRA-13341)
+ * Use the Kernel32 library to retrieve the PID on Windows and fix startup checks (CASSANDRA-13333)
+ * Fix code to not exchange schema across major versions (CASSANDRA-13274)
+ * Dropping column results in "corrupt" SSTable (CASSANDRA-13337)
+ * Bugs handling range tombstones in the sstable iterators (CASSANDRA-13340)
+ * Fix CONTAINS filtering for null collections (CASSANDRA-13246)
+ * Applying: Use a unique metric reservoir per test run when using Cassandra-wide metrics residing in MBeans (CASSANDRA-13216)
+ * Propagate row deletions in 2i tables on upgrade (CASSANDRA-13320)
+ * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
+ * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
+ * Legacy caching options can prevent 3.0 upgrade (CASSANDRA-13384)
+ * Nodetool upgradesstables/scrub/compact ignores system tables (CASSANDRA-13410)
+ * Fix NPE issue in StorageService (CASSANDRA-13060)
+Merged from 2.2:
 * Avoid starting gossiper in RemoveTest (CASSANDRA-13407)
 * Fix weightedSize() for row-cache reported by JMX and NodeTool (CASSANDRA-13393)
- * Fix JVM metric paths (CASSANDRA-13103)
 * Honor truststore-password parameter in cassandra-stress (CASSANDRA-12773)
 * Discard in-flight shadow round responses (CASSANDRA-12653)
 * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
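
The headline change merged up from 2.2 is CASSANDRA-11223: queries combining LIMIT with filtering on clustering columns could return fewer rows than expected. The sketch below illustrates the query shape it repairs. The real coverage is the SelectLimitTest expansion listed in the diffstat above; the class name, schema, and data here are illustrative assumptions, and only the createTable/execute helpers come from the CQLTester harness this commit also touches.

import org.junit.Assert;
import org.junit.Test;

import org.apache.cassandra.cql3.CQLTester;

public class SelectLimitFilteringSketchTest extends CQLTester
{
    @Test
    public void testLimitWithFilteringOnClusteringColumn() throws Throwable
    {
        createTable("CREATE TABLE %s (pk int, c int, s int static, v int, PRIMARY KEY (pk, c))");

        // Partition pk holds rows c = 0..pk, plus a live static column.
        for (int pk = 0; pk <= 3; pk++)
        {
            execute("INSERT INTO %s (pk, s) VALUES (?, ?)", pk, pk);
            for (int c = 0; c <= pk; c++)
                execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)", pk, c, c);
        }

        // Rows with c = 2 exist only in partitions 2 and 3, so LIMIT 2 is
        // satisfiable. Before the fix, partitions 0 and 1 (whose rows are all
        // rejected by the filter) could still consume LIMIT slots through
        // their live static rows, so fewer than 2 rows came back.
        Assert.assertEquals(2, execute("SELECT pk, c, v FROM %s WHERE c = 2 LIMIT 2 ALLOW FILTERING").size());
    }
}
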
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index e82f1d3,2e52eb2..f720330 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -1461,37 -1712,284 +1461,38 @@@ public class ColumnFamilyStore implemen return data.getUncompacting(); } - public ColumnFamily getColumnFamily(DecoratedKey key, - Composite start, - Composite finish, - boolean reversed, - int limit, - long timestamp) + public boolean isFilterFullyCoveredBy(ClusteringIndexFilter filter, DataLimits limits, CachedPartition cached, int nowInSec) { - return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit, timestamp)); + // We can use the cached value only if we know that no data it doesn't contain could be covered + // by the query filter, that is if: + // 1) either the whole partition is cached + // 2) or we can ensure than any data the filter selects is in the cached partition + + // We can guarantee that a partition is fully cached if the number of rows it contains is less than + // what we're caching. Wen doing that, we should be careful about expiring cells: we should count + // something expired that wasn't when the partition was cached, or we could decide that the whole + // partition is cached when it's not. This is why we use CachedPartition#cachedLiveRows. + if (cached.cachedLiveRows() < metadata.params.caching.rowsPerPartitionToCache()) + return true; + + // If the whole partition isn't cached, then we must guarantee that the filter cannot select data that + // is not in the cache. We can guarantee that if either the filter is a "head filter" and the cached + // partition has more live rows that queried (where live rows refers to the rows that are live now), + // or if we can prove that everything the filter selects is in the cached partition based on its content. - return (filter.isHeadFilter() && limits.hasEnoughLiveData(cached, nowInSec)) || filter.isFullyCoveredBy(cached); ++ return (filter.isHeadFilter() && limits.hasEnoughLiveData(cached, nowInSec, filter.selectsAllPartition())) ++ || filter.isFullyCoveredBy(cached); } - /** - * Fetch the row and columns given by filter.key if it is in the cache; if not, read it from disk and cache it - * - * If row is cached, and the filter given is within its bounds, we return from cache, otherwise from disk - * - * If row is not cached, we figure out what filter is "biggest", read that from disk, then - * filter the result and either cache that or return it. - * - * @param cfId the column family to read the row from - * @param filter the columns being queried. - * @return the requested data for the filter provided - */ - private ColumnFamily getThroughCache(UUID cfId, QueryFilter filter) + public int gcBefore(int nowInSec) { - assert isRowCacheEnabled() - : String.format("Row cache is not enabled on table [" + name + "]"); - - RowCacheKey key = new RowCacheKey(metadata.ksAndCFName, filter.key); + return nowInSec - metadata.params.gcGraceSeconds; + } - // attempt a sentinel-read-cache sequence. if a write invalidates our sentinel, we'll return our - // (now potentially obsolete) data, but won't cache it. 
see CASSANDRA-3862 - // TODO: don't evict entire rows on writes (#2864) - IRowCacheEntry cached = CacheService.instance.rowCache.get(key); - if (cached != null) - { - if (cached instanceof RowCacheSentinel) - { - // Some other read is trying to cache the value, just do a normal non-caching read - Tracing.trace("Row cache miss (race)"); - metric.rowCacheMiss.inc(); - return getTopLevelColumns(filter, Integer.MIN_VALUE); - } - - ColumnFamily cachedCf = (ColumnFamily)cached; - if (isFilterFullyCoveredBy(filter.filter, cachedCf, filter.timestamp)) - { - metric.rowCacheHit.inc(); - Tracing.trace("Row cache hit"); - ColumnFamily result = filterColumnFamily(cachedCf, filter); - metric.updateSSTableIterated(0); - return result; - } - - metric.rowCacheHitOutOfRange.inc(); - Tracing.trace("Ignoring row cache as cached value could not satisfy query"); - return getTopLevelColumns(filter, Integer.MIN_VALUE); - } - - metric.rowCacheMiss.inc(); - Tracing.trace("Row cache miss"); - RowCacheSentinel sentinel = new RowCacheSentinel(); - boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel); - ColumnFamily data = null; - ColumnFamily toCache = null; - try - { - // If we are explicitely asked to fill the cache with full partitions, we go ahead and query the whole thing - if (metadata.getCaching().rowCache.cacheFullPartitions()) - { - data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name, filter.timestamp), Integer.MIN_VALUE); - toCache = data; - Tracing.trace("Populating row cache with the whole partition"); - if (sentinelSuccess && toCache != null) - CacheService.instance.rowCache.replace(key, sentinel, toCache); - return filterColumnFamily(data, filter); - } - - // Otherwise, if we want to cache the result of the query we're about to do, we must make sure this query - // covers what needs to be cached. And if the user filter does not satisfy that, we sometimes extend said - // filter so we can populate the cache but only if: - // 1) we can guarantee it is a strict extension, i.e. that we will still fetch the data asked by the user. - // 2) the extension does not make us query more than getRowsPerPartitionToCache() (as a mean to limit the - // amount of extra work we'll do on a user query for the purpose of populating the cache). - // - // In practice, we can only guarantee those 2 points if the filter is one that queries the head of the - // partition (and if that filter actually counts CQL3 rows since that's what we cache and it would be - // bogus to compare the filter count to the 'rows to cache' otherwise). - if (filter.filter.isHeadFilter() && filter.filter.countCQL3Rows(metadata.comparator)) - { - SliceQueryFilter sliceFilter = (SliceQueryFilter)filter.filter; - int rowsToCache = metadata.getCaching().rowCache.rowsToCache; - - SliceQueryFilter cacheSlice = readFilterForCache(); - QueryFilter cacheFilter = new QueryFilter(filter.key, name, cacheSlice, filter.timestamp); - - // If the filter count is less than the number of rows cached, we simply extend it to make sure we do cover the - // number of rows to cache, and if that count is greater than the number of rows to cache, we simply filter what - // needs to be cached afterwards. 
- if (sliceFilter.count < rowsToCache) - { - toCache = getTopLevelColumns(cacheFilter, Integer.MIN_VALUE); - if (toCache != null) - { - Tracing.trace("Populating row cache ({} rows cached)", cacheSlice.lastCounted()); - data = filterColumnFamily(toCache, filter); - } - } - else - { - data = getTopLevelColumns(filter, Integer.MIN_VALUE); - if (data != null) - { - // The filter limit was greater than the number of rows to cache. But, if the filter had a non-empty - // finish bound, we may have gotten less than what needs to be cached, in which case we shouldn't cache it - // (otherwise a cache hit would assume the whole partition is cached which is not the case). - if (sliceFilter.finish().isEmpty() || sliceFilter.lastCounted() >= rowsToCache) - { - toCache = filterColumnFamily(data, cacheFilter); - Tracing.trace("Caching {} rows (out of {} requested)", cacheSlice.lastCounted(), sliceFilter.count); - } - else - { - Tracing.trace("Not populating row cache, not enough rows fetched ({} fetched but {} required for the cache)", sliceFilter.lastCounted(), rowsToCache); - } - } - } - - if (sentinelSuccess && toCache != null) - CacheService.instance.rowCache.replace(key, sentinel, toCache); - return data; - } - else - { - Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition"); - return getTopLevelColumns(filter, Integer.MIN_VALUE); - } - } - finally - { - if (sentinelSuccess && toCache == null) - invalidateCachedRow(key); - } - } - - public SliceQueryFilter readFilterForCache() - { - // We create a new filter everytime before for now SliceQueryFilter is unfortunatly mutable. - return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, metadata.getCaching().rowCache.rowsToCache, metadata.clusteringColumns().size()); - } - - public boolean isFilterFullyCoveredBy(IDiskAtomFilter filter, ColumnFamily cachedCf, long now) - { - // We can use the cached value only if we know that no data it doesn't contain could be covered - // by the query filter, that is if: - // 1) either the whole partition is cached - // 2) or we can ensure than any data the filter selects are in the cached partition - - // When counting rows to decide if the whole row is cached, we should be careful with expiring - // columns: if we use a timestamp newer than the one that was used when populating the cache, we might - // end up deciding the whole partition is cached when it's really not (just some rows expired since the - // cf was cached). This is the reason for Integer.MIN_VALUE below. - boolean wholePartitionCached = cachedCf.liveCQL3RowCount(Integer.MIN_VALUE) < metadata.getCaching().rowCache.rowsToCache; - - // Contrarily to the "wholePartitionCached" check above, we do want isFullyCoveredBy to take the - // timestamp of the query into account when dealing with expired columns. Otherwise, we could think - // the cached partition has enough live rows to satisfy the filter when it doesn't because some - // are now expired. - return wholePartitionCached || filter.isFullyCoveredBy(cachedCf, now); - } - - public int gcBefore(long now) - { - return (int) (now / 1000) - metadata.getGcGraceSeconds(); - } - - /** - * get a list of columns starting from a given column, in a specified order. - * only the latest version of a column is returned. 
- * @return null if there is no data and no tombstones; otherwise a ColumnFamily - */ - public ColumnFamily getColumnFamily(QueryFilter filter) - { - assert name.equals(filter.getColumnFamilyName()) : filter.getColumnFamilyName(); - - ColumnFamily result = null; - - long start = System.nanoTime(); - try - { - int gcBefore = gcBefore(filter.timestamp); - if (isRowCacheEnabled()) - { - assert !isIndex(); // CASSANDRA-5732 - UUID cfId = metadata.cfId; - - ColumnFamily cached = getThroughCache(cfId, filter); - if (cached == null) - { - logger.trace("cached row is empty"); - return null; - } - - result = cached; - } - else - { - ColumnFamily cf = getTopLevelColumns(filter, gcBefore); - - if (cf == null) - return null; - - result = removeDeletedCF(cf, gcBefore); - } - - removeDroppedColumns(result); - - if (filter.filter instanceof SliceQueryFilter) - { - // Log the number of tombstones scanned on single key queries - metric.tombstoneScannedHistogram.update(((SliceQueryFilter) filter.filter).lastTombstones()); - metric.liveScannedHistogram.update(((SliceQueryFilter) filter.filter).lastLive()); - } - } - finally - { - metric.readLatency.addNano(System.nanoTime() - start); - } - - return result; - } - - /** - * Filter a cached row, which will not be modified by the filter, but may be modified by throwing out - * tombstones that are no longer relevant. - * The returned column family won't be thread safe. - */ - ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter) - { - if (cached == null) - return null; - - ColumnFamily cf = cached.cloneMeShallow(ArrayBackedSortedColumns.factory, filter.filter.isReversed()); - int gcBefore = gcBefore(filter.timestamp); - filter.collateOnDiskAtom(cf, filter.getIterator(cached), gcBefore); - return removeDeletedCF(cf, gcBefore); - } - - public Set<SSTableReader> getUnrepairedSSTables() - { - Set<SSTableReader> unRepairedSSTables = new HashSet<>(getSSTables()); - Iterator<SSTableReader> sstableIterator = unRepairedSSTables.iterator(); - while(sstableIterator.hasNext()) - { - SSTableReader sstable = sstableIterator.next(); - if (sstable.isRepaired()) - sstableIterator.remove(); - } - return unRepairedSSTables; - } - - public Set<SSTableReader> getRepairedSSTables() - { - Set<SSTableReader> repairedSSTables = new HashSet<>(getSSTables()); - Iterator<SSTableReader> sstableIterator = repairedSSTables.iterator(); - while(sstableIterator.hasNext()) - { - SSTableReader sstable = sstableIterator.next(); - if (!sstable.isRepaired()) - sstableIterator.remove(); - } - return repairedSSTables; - } - - @SuppressWarnings("resource") - public RefViewFragment selectAndReference(Function<View, List<SSTableReader>> filter) - { - long failingSince = -1L; - while (true) + @SuppressWarnings("resource") + public RefViewFragment selectAndReference(Function<View, Iterable<SSTableReader>> filter) + { + long failingSince = -1L; + while (true) { ViewFragment view = select(filter); Refs<SSTableReader> refs = Refs.tryRef(view.sstables); http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/DataRange.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/DataRange.java index ffe041e,1e6f8c8..d2f9c76 --- a/src/java/org/apache/cassandra/db/DataRange.java +++ b/src/java/org/apache/cassandra/db/DataRange.java @@@ -179,30 -119,16 +179,35 @@@ public class DataRang return keyRange.contains(pos); } - public int getLiveCount(ColumnFamily data, long now) + /** + * Whether this {@code 
DataRange} queries everything (has no restriction neither on the + * partition queried, nor within the queried partition). + * + * @return Whether this {@code DataRange} queries everything. + */ + public boolean isUnrestricted() + { + return startKey().isMinimum() && stopKey().isMinimum() && clusteringIndexFilter.selectsAllPartition(); ++ } ++ ++ public boolean selectsAllPartition() + { - return columnFilter instanceof SliceQueryFilter - ? ((SliceQueryFilter)columnFilter).lastCounted() - : columnFilter.getLiveCount(data, now); ++ return clusteringIndexFilter.selectsAllPartition(); } - public boolean selectsFullRowFor(ByteBuffer rowKey) + /** + * The clustering index filter to use for the provided key. + * <p> + * This may or may not be the same filter for all keys (that is, paging range + * use a different filter for their start key). + * + * @param key the partition key for which we want the clustering index filter. + * + * @return the clustering filter to use for {@code key}. + */ + public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key) { - return selectFullRow; + return clusteringIndexFilter; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 1cf332d,0000000..617e2f5 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@@ -1,341 -1,0 +1,346 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import com.google.common.collect.Iterables; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.filter.*; - import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.rows.BaseRowIterator; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReadsListener; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.pager.*; +import org.apache.cassandra.thrift.ThriftResultsMerger; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; + +/** + * A read command that selects a (part of a) range of partitions. + */ +public class PartitionRangeReadCommand extends ReadCommand +{ + protected static final SelectionDeserializer selectionDeserializer = new Deserializer(); + + private final DataRange dataRange; + private int oldestUnrepairedTombstone = Integer.MAX_VALUE; + + public PartitionRangeReadCommand(boolean isDigest, + int digestVersion, + boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DataRange dataRange, + Optional<IndexMetadata> index) + { + super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); + this.dataRange = dataRange; + this.index = index; + } + + public PartitionRangeReadCommand(CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DataRange dataRange, + Optional<IndexMetadata> index) + { + this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index); + } + + /** + * Creates a new read command that query all the data in the table. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * + * @return a newly created read command that queries everything in the table. 
+ */ + public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec) + { + return new PartitionRangeReadCommand(metadata, + nowInSec, + ColumnFilter.all(metadata), + RowFilter.NONE, + DataLimits.NONE, + DataRange.allData(metadata.partitioner), + Optional.empty()); + } + + public DataRange dataRange() + { + return dataRange; + } + + public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key) + { + return dataRange.clusteringIndexFilter(key); + } + + public boolean isNamesQuery() + { + return dataRange.isNamesQuery(); + } + + public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range) + { + return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index); + } + + public PartitionRangeReadCommand copy() + { + return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index); + } + + public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits) + { + return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index); + } + + public long getTimeout() + { + return DatabaseDescriptor.getRangeRpcTimeout(); + } + + public boolean selectsKey(DecoratedKey key) + { + if (!dataRange().contains(key)) + return false; + + return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator()); + } + + public boolean selectsClustering(DecoratedKey key, Clustering clustering) + { + if (clustering == Clustering.STATIC_CLUSTERING) + return !columnFilter().fetchedColumns().statics.isEmpty(); + + if (!dataRange().clusteringIndexFilter(key).selects(clustering)) + return false; + return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering); + } + + public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException + { + return StorageProxy.getRangeSlice(this, consistency); + } + + public QueryPager getPager(PagingState pagingState, int protocolVersion) + { + return new PartitionRangeQueryPager(this, pagingState, protocolVersion); + } + + protected void recordLatency(TableMetrics metric, long latencyNanos) + { + metric.rangeLatency.addNano(latencyNanos); + } + + protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup) + { + ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange())); + Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator())); + + // fetch data from current memtable, historical memtables, and SSTables in the correct order. + final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size()); + + try + { + for (Memtable memtable : view.memtables) + { + @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method + Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift()); + oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime()); + iterators.add(isForThrift() ? 
ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter); + } + + SSTableReadsListener readCountUpdater = newReadCountUpdater(); + for (SSTableReader sstable : view.sstables) + { + @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method + UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), isForThrift(), readCountUpdater); + iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter); + if (!sstable.isRepaired()) + oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); + } + return iterators.isEmpty() ? EmptyIterators.unfilteredPartition(metadata(), isForThrift()) + : checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs); + } + catch (RuntimeException | Error e) + { + try + { + FBUtilities.closeAll(iterators); + } + catch (Exception suppressed) + { + e.addSuppressed(suppressed); + } + + throw e; + } + } + + /** + * Creates a new {@code SSTableReadsListener} to update the SSTables read counts. + * @return a new {@code SSTableReadsListener} to update the SSTables read counts. + */ + private static SSTableReadsListener newReadCountUpdater() + { + return new SSTableReadsListener() + { + @Override + public void onScanningStarted(SSTableReader sstable) + { + sstable.incrementReadCount(); + } + }; + } + + @Override + protected int oldestUnrepairedTombstone() + { + return oldestUnrepairedTombstone; + } + + private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs) + { + class CacheFilter extends Transformation + { + @Override + public BaseRowIterator applyToPartition(BaseRowIterator iter) + { + // Note that we rely on the fact that until we actually advance 'iter', no really costly operation is actually done + // (except for reading the partition key from the index file) due to the call to mergeLazily in queryStorage. + DecoratedKey dk = iter.partitionKey(); + + // Check if this partition is in the rowCache and if it is, if it covers our filter + CachedPartition cached = cfs.getRawCachedPartition(dk); + ClusteringIndexFilter filter = dataRange().clusteringIndexFilter(dk); + + if (cached != null && cfs.isFilterFullyCoveredBy(filter, limits(), cached, nowInSec())) + { + // We won't use 'iter' so close it now. + iter.close(); + + return filter.getUnfilteredRowIterator(columnFilter(), cached); + } + + return iter; + } + } + return Transformation.apply(iter, new CacheFilter()); + } + + public MessageOut<ReadCommand> createMessage(int version) + { + return dataRange().isPaging() + ? new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, pagedRangeSerializer) + : new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, rangeSliceSerializer); + } + + protected void appendCQLWhereClause(StringBuilder sb) + { + if (dataRange.isUnrestricted() && rowFilter().isEmpty()) + return; + + sb.append(" WHERE "); + // We put the row filter first because the data range can end by "ORDER BY" + if (!rowFilter().isEmpty()) + { + sb.append(rowFilter()); + if (!dataRange.isUnrestricted()) + sb.append(" AND "); + } + if (!dataRange.isUnrestricted()) + sb.append(dataRange.toCQLString(metadata())); + } + + /** + * Allow to post-process the result of the query after it has been reconciled on the coordinator + * but before it is passed to the CQL layer to return the ResultSet. + * + * See CASSANDRA-8717 for why this exists. 
+ */ + public PartitionIterator postReconciliationProcessing(PartitionIterator result) + { + ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName); + Index index = getIndex(cfs); + return index == null ? result : index.postProcessorFor(this).apply(result, this); + } + + @Override ++ public boolean selectsFullPartition() ++ { ++ return dataRange.selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns(); ++ } ++ ++ @Override + public String toString() + { + return String.format("Read(%s.%s columns=%s rowfilter=%s limits=%s %s)", + metadata().ksName, + metadata().cfName, + columnFilter(), + rowFilter(), + limits(), + dataRange().toString(metadata())); + } + + protected void serializeSelection(DataOutputPlus out, int version) throws IOException + { + DataRange.serializer.serialize(dataRange(), out, version, metadata()); + } + + protected long selectionSerializedSize(int version) + { + return DataRange.serializer.serializedSize(dataRange(), version, metadata()); + } + + private static class Deserializer extends SelectionDeserializer + { + public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) + throws IOException + { + DataRange range = DataRange.serializer.deserialize(in, version, metadata); + return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range, index); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ReadCommand.java index 64da428,cd86336..76180cc --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@@ -264,699 -95,55 +264,699 @@@ public abstract class ReadCommand imple return this; } - public String getColumnFamilyName() + /** + * Sets the digest version, for when digest for that command is requested. + * <p> + * Note that we allow setting this independently of setting the command as a digest query as + * this allows us to use the command as a carrier of the digest version even if we only call + * setIsDigestQuery on some copy of it. + * + * @param digestVersion the version for the digest is this command is used for digest query.. + * @return this read command. + */ + public ReadCommand setDigestVersion(int digestVersion) { - return cfName; + this.digestVersion = digestVersion; + return this; } + /** + * Whether this query is for thrift or not. + * + * @return whether this query is for thrift. + */ + public boolean isForThrift() + { + return isForThrift; + } + + /** + * The clustering index filter this command to use for the provided key. + * <p> + * Note that that method should only be called on a key actually queried by this command + * and in practice, this will almost always return the same filter, but for the sake of + * paging, the filter on the first key of a range command might be slightly different. + * + * @param key a partition key queried by this command. + * + * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}. + */ + public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key); + + /** + * Returns a copy of this command. 
+ * + * @return a copy of this command. + */ public abstract ReadCommand copy(); - public abstract Row getRow(Keyspace keyspace); + protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup); + + protected abstract int oldestUnrepairedTombstone(); - public abstract IDiskAtomFilter filter(); + public ReadResponse createResponse(UnfilteredPartitionIterator iterator) + { + return isDigestQuery() + ? ReadResponse.createDigestResponse(iterator, this) + : ReadResponse.createDataResponse(iterator, this); + } - public String getKeyspace() + public long indexSerializedSize(int version) { - return ksName; + if (index.isPresent()) + return IndexMetadata.serializer.serializedSize(index.get(), version); + else + return 0; } - // maybeGenerateRetryCommand is used to generate a retry for short reads - public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row) + public Index getIndex(ColumnFamilyStore cfs) { - return null; + // if we've already consulted the index manager, and it returned a valid index + // the result should be cached here. + if(index.isPresent()) + return cfs.indexManager.getIndex(index.get()); + + // if no cached index is present, but we've already consulted the index manager + // then no registered index is suitable for this command, so just return null. + if (indexManagerQueried) + return null; + + // do the lookup, set the flag to indicate so and cache the result if not null + Index selected = cfs.indexManager.getBestIndexFor(this); + indexManagerQueried = true; + + if (selected == null) + return null; + + index = Optional.of(selected.getIndexMetadata()); + return selected; } - // maybeTrim removes columns from a response that is too long - public Row maybeTrim(Row row) + /** + * If the index manager for the CFS determines that there's an applicable + * 2i that can be used to execute this command, call its (optional) + * validation method to check that nothing in this command's parameters + * violates the implementation specific validation rules. + */ + public void maybeValidateIndex() { - return row; + Index index = getIndex(Keyspace.openAndGetStore(metadata)); + if (null != index) + index.validate(this); } - public long getTimeout() + /** + * Executes this command on the local host. + * + * @param orderGroup the operation group spanning this command + * + * @return an iterator over the result of executing this command locally. + */ + @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary + // iterators created inside the try as long as we do close the original resultIterator), or by closing the result. + public UnfilteredPartitionIterator executeLocally(ReadOrderGroup orderGroup) { - return DatabaseDescriptor.getReadRpcTimeout(); + long startTimeNanos = System.nanoTime(); + + ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata()); + Index index = getIndex(cfs); + + Index.Searcher searcher = null; + if (index != null) + { + if (!cfs.indexManager.isIndexQueryable(index)) + throw new IndexNotAvailableException(index); + + searcher = index.searcherFor(this); + Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name); + } + + UnfilteredPartitionIterator resultIterator = searcher == null + ? 
queryStorage(cfs, orderGroup) + : searcher.search(orderGroup); + + try + { + resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos); + + // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so + // no point in checking it again. + RowFilter updatedFilter = searcher == null + ? rowFilter() + : index.getPostIndexQueryFilter(rowFilter()); + + // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However, + // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it + // would be more efficient (the sooner we discard stuff we know we don't care, the less useless + // processing we do on it). - return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec()); ++ return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition()); + } + catch (RuntimeException | Error e) + { + resultIterator.close(); + throw e; + } } -} -class ReadCommandSerializer implements IVersionedSerializer<ReadCommand> -{ - public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + protected abstract void recordLatency(TableMetrics metric, long latencyNanos); + + public PartitionIterator executeInternal(ReadOrderGroup orderGroup) + { + return UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec()); + } + + public ReadOrderGroup startOrderGroup() + { + return ReadOrderGroup.forCommand(this); + } + + /** + * Wraps the provided iterator so that metrics on what is scanned by the command are recorded. + * This also log warning/trow TombstoneOverwhelmingException if appropriate. + */ + private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos) + { + class MetricRecording extends Transformation<UnfilteredRowIterator> + { + private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold(); + private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold(); + + private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName); + + private int liveRows = 0; + private int tombstones = 0; + + private DecoratedKey currentKey; + + @Override + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter) + { + currentKey = iter.partitionKey(); + return Transformation.apply(iter, this); + } + + @Override + public Row applyToStatic(Row row) + { + return applyToRow(row); + } + + @Override + public Row applyToRow(Row row) + { + if (row.hasLiveData(ReadCommand.this.nowInSec())) + ++liveRows; + + for (Cell cell : row.cells()) + { + if (!cell.isLive(ReadCommand.this.nowInSec())) + countTombstone(row.clustering()); + } + return row; + } + + @Override + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + countTombstone(marker.clustering()); + return marker; + } + + private void countTombstone(ClusteringPrefix clustering) + { + ++tombstones; + if (tombstones > failureThreshold && respectTombstoneThresholds) + { + String query = ReadCommand.this.toCQLString(); + Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query); + throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering); + } + } + + @Override + public void onClose() + { + 
recordLatency(metric, System.nanoTime() - startTimeNanos); + + metric.tombstoneScannedHistogram.update(tombstones); + metric.liveScannedHistogram.update(liveRows); + + boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds; + if (warnTombstones) + { + String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString()); + ClientWarn.instance.warn(msg); + logger.warn(msg); + } + + Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : "")); + } + }; + + return Transformation.apply(iter, new MetricRecording()); + } + + /** + * Creates a message for this command. + */ + public abstract MessageOut<ReadCommand> createMessage(int version); + + protected abstract void appendCQLWhereClause(StringBuilder sb); + + // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it + // can save us some bandwith, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which + // are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive). + protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs) + { + final boolean isForThrift = iterator.isForThrift(); + class WithoutPurgeableTombstones extends PurgeFunction + { + public WithoutPurgeableTombstones() + { + super(isForThrift, nowInSec(), cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones()); + } + + protected Predicate<Long> getPurgeEvaluator() + { + return time -> true; + } + } + return Transformation.apply(iterator, new WithoutPurgeableTombstones()); + } + + /** + * Recreate the CQL string corresponding to this query. + * <p> + * Note that in general the returned string will not be exactly the original user string, first + * because there isn't always a single syntax for a given query, but also because we don't have + * all the information needed (we know the non-PK columns queried but not the PK ones as internally + * we query them all). So this shouldn't be relied too strongly, but this should be good enough for + * debugging purpose which is what this is for. + */ + public String toCQLString() + { + StringBuilder sb = new StringBuilder(); + sb.append("SELECT ").append(columnFilter()); + sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName); + appendCQLWhereClause(sb); + + if (limits() != DataLimits.NONE) + sb.append(' ').append(limits()); + return sb.toString(); + } + + private static class Serializer implements IVersionedSerializer<ReadCommand> + { + private static int digestFlag(boolean isDigest) + { + return isDigest ? 0x01 : 0; + } + + private static boolean isDigest(int flags) + { + return (flags & 0x01) != 0; + } + + private static int thriftFlag(boolean isForThrift) + { + return isForThrift ? 0x02 : 0; + } + + private static boolean isForThrift(int flags) + { + return (flags & 0x02) != 0; + } + + private static int indexFlag(boolean hasIndex) + { + return hasIndex ? 
0x04 : 0; + } + + private static boolean hasIndex(int flags) + { + return (flags & 0x04) != 0; + } + + public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + { + assert version >= MessagingService.VERSION_30; + + out.writeByte(command.kind.ordinal()); + out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent())); + if (command.isDigestQuery()) + out.writeUnsignedVInt(command.digestVersion()); + CFMetaData.serializer.serialize(command.metadata(), out, version); + out.writeInt(command.nowInSec()); + ColumnFilter.serializer.serialize(command.columnFilter(), out, version); + RowFilter.serializer.serialize(command.rowFilter(), out, version); + DataLimits.serializer.serialize(command.limits(), out, version); + if (command.index.isPresent()) + IndexMetadata.serializer.serialize(command.index.get(), out, version); + + command.serializeSelection(out, version); + } + + public ReadCommand deserialize(DataInputPlus in, int version) throws IOException + { + assert version >= MessagingService.VERSION_30; + + Kind kind = Kind.values()[in.readByte()]; + int flags = in.readByte(); + boolean isDigest = isDigest(flags); + boolean isForThrift = isForThrift(flags); + boolean hasIndex = hasIndex(flags); + int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0; + CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); + int nowInSec = in.readInt(); + ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata); + RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata); + DataLimits limits = DataLimits.serializer.deserialize(in, version); + Optional<IndexMetadata> index = hasIndex + ? deserializeIndexMetadata(in, version, metadata) + : Optional.empty(); + + return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index); + } + + private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException + { + try + { + return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm)); + } + catch (UnknownIndexException e) + { + String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " + + "If an index was just created, this is likely due to the schema not " + + "being fully propagated. Local read will proceed without using the " + + "index. Please wait for schema agreement after index creation.", + cfm.ksName, cfm.cfName, e.indexId.toString()); + logger.info(message); + return Optional.empty(); + } + } + + public long serializedSize(ReadCommand command, int version) + { + assert version >= MessagingService.VERSION_30; + + return 2 // kind + flags + + (command.isDigestQuery() ? 
TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0) + + CFMetaData.serializer.serializedSize(command.metadata(), version) + + TypeSizes.sizeof(command.nowInSec()) + + ColumnFilter.serializer.serializedSize(command.columnFilter(), version) + + RowFilter.serializer.serializedSize(command.rowFilter(), version) + + DataLimits.serializer.serializedSize(command.limits(), version) + + command.selectionSerializedSize(version) + + command.indexSerializedSize(version); + } + } + + private enum LegacyType + { + GET_BY_NAMES((byte)1), + GET_SLICES((byte)2); + + public final byte serializedValue; + + LegacyType(byte b) + { + this.serializedValue = b; + } + + public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind) + { + return kind == ClusteringIndexFilter.Kind.SLICE + ? GET_SLICES + : GET_BY_NAMES; + } + + public static LegacyType fromSerializedValue(byte b) + { + return b == 1 ? GET_BY_NAMES : GET_SLICES; + } + } + + /** + * Serializer for pre-3.0 RangeSliceCommands. + */ + private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand> { - out.writeByte(command.commandType.serializedValue); - switch (command.commandType) + public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + { + assert version < MessagingService.VERSION_30; + + PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command; + assert !rangeCommand.dataRange().isPaging(); + + // convert pre-3.0 incompatible names filters to slice filters + rangeCommand = maybeConvertNamesToSlice(rangeCommand); + + CFMetaData metadata = rangeCommand.metadata(); + + out.writeUTF(metadata.ksName); + out.writeUTF(metadata.cfName); + out.writeLong(rangeCommand.nowInSec() * 1000L); // convert from seconds to millis + + // begin DiskAtomFilterSerializer.serialize() + if (rangeCommand.isNamesQuery()) + { + out.writeByte(1); // 0 for slices, 1 for names + ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter; + LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out); + } + else + { + out.writeByte(0); // 0 for slices, 1 for names + + // slice filter serialization + ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter; + + boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata); + + out.writeBoolean(filter.isReversed()); + + // limit + DataLimits limits = rangeCommand.limits(); + if (limits.isDistinct()) + out.writeInt(1); + else + out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices())); + + int compositesToGroup; + boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT) + compositesToGroup = -1; + else if (limits.isDistinct() && !selectsStatics) + compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490) + else + compositesToGroup = metadata.isDense() ? 
-1 : metadata.clusteringColumns().size(); + + out.writeInt(compositesToGroup); + } + + serializeRowFilter(out, rangeCommand.rowFilter()); + AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version); + + // maxResults + out.writeInt(rangeCommand.limits().count()); + + // countCQL3Rows + if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1) // if for Thrift or DISTINCT + out.writeBoolean(false); + else + out.writeBoolean(true); + + // isPaging + out.writeBoolean(false); + } + + public ReadCommand deserialize(DataInputPlus in, int version) throws IOException + { + assert version < MessagingService.VERSION_30; + + String keyspace = in.readUTF(); + String columnFamily = in.readUTF(); + + CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily); + if (metadata == null) + { + String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily); + throw new UnknownColumnFamilyException(message, null); + } + + int nowInSec = (int) (in.readLong() / 1000); // convert from millis to seconds + + ClusteringIndexFilter filter; + ColumnFilter selection; + int compositesToGroup = 0; + int perPartitionLimit = -1; + byte readType = in.readByte(); // 0 for slices, 1 for names + if (readType == 1) + { + Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata); + selection = selectionAndFilter.left; + filter = selectionAndFilter.right; + } + else + { + Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata); + filter = p.left; + perPartitionLimit = in.readInt(); + compositesToGroup = in.readInt(); + selection = getColumnSelectionForSlice(p.right, compositesToGroup, metadata); + } + + RowFilter rowFilter = deserializeRowFilter(in, metadata); + + AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version); + int maxResults = in.readInt(); + + boolean countCQL3Rows = in.readBoolean(); // countCQL3Rows (not needed) + in.readBoolean(); // isPaging (not needed) + + boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING)); + // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former, + // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter one is slightly less + // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows to false so we use + // that fact. 
+ boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows); + DataLimits limits; + if (isDistinct) + limits = DataLimits.distinctLimits(maxResults); + else if (compositesToGroup == -1) + limits = DataLimits.thriftLimits(maxResults, perPartitionLimit); + else + limits = DataLimits.cqlLimits(maxResults); + + return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty()); + } + + static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException + { + ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator()); + out.writeInt(indexExpressions.size()); + for (RowFilter.Expression expression : indexExpressions) + { + ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out); + expression.operator().writeTo(out); + ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out); + } + } + + static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException + { + int numRowFilters = in.readInt(); + if (numRowFilters == 0) + return RowFilter.NONE; + + RowFilter rowFilter = RowFilter.create(numRowFilters); + for (int i = 0; i < numRowFilters; i++) + { + ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in); + ColumnDefinition column = metadata.getColumnDefinition(columnName); + Operator op = Operator.readFrom(in); + ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in); + rowFilter.add(column, op, indexValue); + } + return rowFilter; + } + + static long serializedRowFilterSize(RowFilter rowFilter) + { + long size = TypeSizes.sizeof(0); // rowFilterCount + for (RowFilter.Expression expression : rowFilter) + { + size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes); + size += TypeSizes.sizeof(0); // operator int value + size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue()); + } + return size; + } + + public long serializedSize(ReadCommand command, int version) + { + assert version < MessagingService.VERSION_30; + assert command.kind == Kind.PARTITION_RANGE; + + PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command; + rangeCommand = maybeConvertNamesToSlice(rangeCommand); + CFMetaData metadata = rangeCommand.metadata(); + + long size = TypeSizes.sizeof(metadata.ksName); + size += TypeSizes.sizeof(metadata.cfName); + size += TypeSizes.sizeof((long) rangeCommand.nowInSec()); + + size += 1; // single byte flag: 0 for slices, 1 for names + if (rangeCommand.isNamesQuery()) + { + PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns(); + ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter; + size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns); + } + else + { + ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter; + boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata); + size += TypeSizes.sizeof(filter.isReversed()); + size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount()); + size += TypeSizes.sizeof(0); // compositesToGroup + } + + if (rangeCommand.rowFilter().equals(RowFilter.NONE)) + { + 
+
+    public long serializedSize(ReadCommand command, int version)
+    {
+        assert version < MessagingService.VERSION_30;
+        assert command.kind == Kind.PARTITION_RANGE;
+
+        PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+        rangeCommand = maybeConvertNamesToSlice(rangeCommand);
+        CFMetaData metadata = rangeCommand.metadata();
+
+        long size = TypeSizes.sizeof(metadata.ksName);
+        size += TypeSizes.sizeof(metadata.cfName);
+        size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
+
+        size += 1; // single byte flag: 0 for slices, 1 for names
+        if (rangeCommand.isNamesQuery())
+        {
+            PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns();
+            ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
+            size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns);
+        }
+        else
+        {
+            ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+            boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+            size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
+            size += TypeSizes.sizeof(filter.isReversed());
+            size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
+            size += TypeSizes.sizeof(0); // compositesToGroup
+        }
+
+        if (rangeCommand.rowFilter().equals(RowFilter.NONE))
+        {
+            size += TypeSizes.sizeof(0);
+        }
+        else
+        {
+            ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rangeCommand.rowFilter().iterator());
+            size += TypeSizes.sizeof(indexExpressions.size());
+            for (RowFilter.Expression expression : indexExpressions)
+            {
+                size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
+                size += TypeSizes.sizeof(expression.operator().ordinal());
+                size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
+            }
+        }
+
+        size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
+        size += TypeSizes.sizeof(rangeCommand.limits().count());
+        size += TypeSizes.sizeof(!rangeCommand.isForThrift());
+        return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging());
+    }
+
+    static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command)
     {
-        case GET_BY_NAMES:
-            SliceByNamesReadCommand.serializer.serialize(command, out, version);
-            break;
-        case GET_SLICES:
-            SliceFromReadCommand.serializer.serialize(command, out, version);
-            break;
-        default:
-            throw new AssertionError();
+        if (!command.dataRange().isNamesQuery())
+            return command;
+
+        CFMetaData metadata = command.metadata();
+        if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns()))
+            return command;
+
+        ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter;
+        ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
+        DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
+        return new PartitionRangeReadCommand(
+            command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
+            command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
+    }
+
+    static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata)
+    {
+        // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys.
+        // In that case we'll basically be querying the first row of the partition, but we must make sure we include
+        // all columns so that we get at least one cell if the row is live; pre-3.0 nodes would otherwise be confused.
+        if (compositesToGroup == -2)
+            return ColumnFilter.all(metadata);
+
+        // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
+        PartitionColumns columns = selectsStatics
+                                 ? metadata.partitionColumns()
+                                 : metadata.partitionColumns().withoutStatics();
+        return ColumnFilter.selectionBuilder().addAll(columns).build();
     }
 }
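
[The serializedSize methods above have to predict, byte for byte, what the matching serialize methods will write; if the two ever disagree, message framing to pre-3.0 nodes breaks. A minimal standalone check of that invariant, in plain Java with hypothetical names (SizePredictionCheck), not the project's own test code:]

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class SizePredictionCheck
    {
        public static void main(String[] args) throws IOException
        {
            byte[] column = "age".getBytes("UTF-8");
            byte[] value = { 0, 0, 0, 42 };

            // Write one expression in the legacy layout.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(1);               // expression count
            out.writeShort(column.length); // short-length column name
            out.write(column);
            out.writeInt(4);               // operator ordinal
            out.writeShort(value.length);  // short-length value
            out.write(value);
            out.flush();

            // Predict the size the same way serializedRowFilterSize does.
            long predicted = 4                  // count (sizeof an int)
                           + 2 + column.length  // short length + name bytes
                           + 4                  // operator int value
                           + 2 + value.length;  // short length + value bytes

            System.out.println(predicted + " == " + buffer.size()); // 19 == 19
        }
    }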
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadQuery.java
index 178ca7c,0000000..75ba8f5
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@@ -1,141 -1,0 +1,153 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.service.pager.PagingState;
+
+/**
+ * Generic abstraction for read queries.
+ * <p>
+ * The main implementation of this is {@link ReadCommand}, but we have this interface because
+ * {@link SinglePartitionReadCommand.Group} is also considered a "read query" while not being a
+ * {@code ReadCommand}.
+ */
+public interface ReadQuery
+{
+    ReadQuery EMPTY = new ReadQuery()
+    {
+        public ReadOrderGroup startOrderGroup()
+        {
+            return ReadOrderGroup.emptyGroup();
+        }
+
+        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+        {
+            return EmptyIterators.partition();
+        }
+
+        public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
+        {
+            return EmptyIterators.partition();
+        }
+
+        public DataLimits limits()
+        {
+            // What we return here doesn't matter much in practice. However, returning DataLimits.NONE means
+            // "no particular limit", which makes SelectStatement.execute() take the slightly more complex "paging"
+            // path. Not a big deal, but it's easy enough to return a limit of 0 rows, which avoids this.
+            return DataLimits.cqlLimits(0);
+        }
+
+        public QueryPager getPager(PagingState state, int protocolVersion)
+        {
+            return QueryPager.EMPTY;
+        }
+
+        public QueryPager getLocalPager()
+        {
+            return QueryPager.EMPTY;
+        }
+
+        public boolean selectsKey(DecoratedKey key)
+        {
+            return false;
+        }
+
+        public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+        {
+            return false;
+        }
++
++        @Override
++        public boolean selectsFullPartition()
++        {
++            return false;
++        }
+    };
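
[A side note on ReadQuery.EMPTY above: it is the classic null-object pattern, so call sites can hand back "no query" without null checks or special cases. A tiny standalone illustration of the idea, in plain Java with hypothetical names (Query, NullObjectSketch):]

    import java.util.Collections;
    import java.util.Iterator;

    public class NullObjectSketch
    {
        interface Query
        {
            Iterator<String> execute();

            // Always-empty implementation: callers never have to branch on null.
            Query EMPTY = Collections::emptyIterator;
        }

        public static void main(String[] args)
        {
            Query query = Query.EMPTY;          // stands in for "no query"
            Iterator<String> rows = query.execute();
            System.out.println(rows.hasNext()); // false: empty, but never null
        }
    }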
+
+    /**
+     * Starts a new read operation.
+     * <p>
+     * This must be called before {@link #executeInternal} and passed to it to protect the read.
+     * The returned object <b>must</b> be closed on all paths, and it is thus strongly advised to
+     * use it in a try-with-resources construction.
+     *
+     * @return a newly started order group for this {@code ReadQuery}.
+     */
+    public ReadOrderGroup startOrderGroup();
+
+    /**
+     * Executes the query at the provided consistency level.
+     *
+     * @param consistency the consistency level to achieve for the query.
+     * @param clientState the {@code ClientState} for the query. In practice, this can be null unless
+     * {@code consistency} is a serial consistency.
+     *
+     * @return the result of the query.
+     */
+    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException;
+
+    /**
+     * Executes the query locally (that is, for internal queries).
+     *
+     * @param orderGroup the {@code ReadOrderGroup} protecting the read.
+     * @return the result of the query.
+     */
+    public PartitionIterator executeInternal(ReadOrderGroup orderGroup);
+
+    /**
+     * Returns a pager for the query.
+     *
+     * @param pagingState the {@code PagingState} to start from if this is a paging continuation. This can be
+     * {@code null} if this is the start of paging.
+     * @param protocolVersion the protocol version to use for the paging state of that pager.
+     *
+     * @return a pager for the query.
+     */
+    public QueryPager getPager(PagingState pagingState, int protocolVersion);
+
+    /**
+     * The limits for the query.
+     *
+     * @return the limits for the query.
+     */
+    public DataLimits limits();
+
+    /**
+     * @return true if the read query would select the given key, including checks against the row filter.
+     */
+    public boolean selectsKey(DecoratedKey key);
+
+    /**
+     * @return true if the read query would select the given clustering, including checks against the row filter.
+     */
+    public boolean selectsClustering(DecoratedKey key, Clustering clustering);
++
++    /**
++     * Checks if this {@code ReadQuery} selects full partitions, that is, it has no filtering on clustering or regular columns.
++     * @return {@code true} if this {@code ReadQuery} selects full partitions, {@code false} otherwise.
++     */
++    public boolean selectsFullPartition();
+}
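
[The getPager() contract above boils down to: fetch a page, remember the paging state, resume from it until exhausted. A standalone sketch of that loop in plain Java with hypothetical names (Pager, RangePager, PagingSketch) — not Cassandra's QueryPager API:]

    import java.util.ArrayList;
    import java.util.List;

    public class PagingSketch
    {
        interface Pager<T>
        {
            List<T> fetchPage(int pageSize); // next page, at most pageSize items
            boolean isExhausted();           // true once everything was returned
        }

        // Toy pager over an int range, standing in for a pager over partitions;
        // the "next" cursor plays the role of the opaque PagingState.
        static final class RangePager implements Pager<Integer>
        {
            private int next;
            private final int end;

            RangePager(int start, int end) { this.next = start; this.end = end; }

            public List<Integer> fetchPage(int pageSize)
            {
                List<Integer> page = new ArrayList<>();
                while (page.size() < pageSize && next < end)
                    page.add(next++);
                return page;
            }

            public boolean isExhausted() { return next >= end; }
        }

        public static void main(String[] args)
        {
            Pager<Integer> pager = new RangePager(0, 10);
            while (!pager.isExhausted())
                System.out.println(pager.fetchPage(4)); // [0..3], then [4..7], then [8, 9]
        }
    }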