Fix null static columns during paging, reversed queries. Patch by Tyler Hobbs; reviewed by Sylvain Lebresne for CASSANDRA-8502.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d075540c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d075540c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d075540c Branch: refs/heads/cassandra-2.2 Commit: d075540c46209fdabde74db1e210114965372605 Parents: 63165a7 Author: Tyler Hobbs <tylerlho...@gmail.com> Authored: Wed May 27 13:48:52 2015 -0500 Committer: Tyler Hobbs <tylerlho...@gmail.com> Committed: Wed May 27 13:48:52 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/db/ColumnFamilyStore.java | 4 +- src/java/org/apache/cassandra/db/DataRange.java | 81 +++++++++++++++++--- .../cassandra/db/SliceFromReadCommand.java | 24 ++++++ .../cassandra/db/filter/ColumnCounter.java | 67 +++++++++++++++- .../cassandra/db/filter/ExtendedFilter.java | 13 ++++ .../cassandra/db/filter/SliceQueryFilter.java | 79 ++++++++++++++++++- .../service/pager/AbstractQueryPager.java | 40 ++++++++-- .../service/pager/RangeSliceQueryPager.java | 4 +- .../service/pager/SliceQueryPager.java | 6 +- .../cassandra/cql3/MultiColumnRelationTest.java | 2 + .../service/pager/AbstractQueryPagerTest.java | 8 +- 12 files changed, 293 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 709100b..054cf79 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 2.0.16: + * Fix null static columns in pages after the first, paged reversed + queries (CASSANDRA-8502) * Fix failing bound statement after adding a collection (CASSANDRA-9411) * Fix counting cache serialization in request metrics (CASSANDRA-9466) * (cqlsh) Add LOGIN command to switch users (CASSANDRA-7212) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index eec4044..f81ec82 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1682,7 +1682,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean boolean countCQL3Rows, long now) { - DataRange dataRange = new DataRange.Paging(keyRange, columnRange, columnStart, columnStop, metadata.comparator); + DataRange dataRange = new DataRange.Paging(keyRange, columnRange, columnStart, columnStop, metadata); return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, countCQL3Rows, now); } @@ -1714,7 +1714,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // create a new SliceQueryFilter that selects all cells, but pass the original slice start and finish // through to DataRange.Paging to be used on the first and last partitions SliceQueryFilter newFilter = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, sfilter.isReversed(), sfilter.count); - dataRange = new DataRange.Paging(range, newFilter, sfilter.start(), sfilter.finish(), metadata.comparator); + dataRange = new DataRange.Paging(range, newFilter, sfilter.start(), sfilter.finish(), metadata); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/DataRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java index 774a3aa..1be9469 100644 --- a/src/java/org/apache/cassandra/db/DataRange.java +++ b/src/java/org/apache/cassandra/db/DataRange.java @@ -22,10 +22,12 @@ import java.util.ArrayList; import 
java.util.Comparator; import java.util.List; +import com.google.common.base.Objects; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.*; +import org.apache.cassandra.utils.ByteBufferUtil; /** * Groups key range and column filter for range queries. @@ -41,7 +43,7 @@ import org.apache.cassandra.dht.*; */ public class DataRange { - private final AbstractBounds<RowPosition> keyRange; + protected final AbstractBounds<RowPosition> keyRange; protected IDiskAtomFilter columnFilter; protected final boolean selectFullRow; @@ -146,6 +148,8 @@ public class DataRange // The slice of columns that we want to fetch for each row, ignoring page start/end issues. private final SliceQueryFilter sliceFilter; + private final CFMetaData cfm; + private final Comparator<ByteBuffer> comparator; // used to restrict the start of the slice for the first partition in the range @@ -154,7 +158,11 @@ public class DataRange // used to restrict the end of the slice for the last partition in the range private final ByteBuffer lastPartitionColumnFinish; - private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer firstPartitionColumnStart, ByteBuffer lastPartitionColumnFinish, Comparator<ByteBuffer> comparator) + // tracks the last key that we updated the filter for to avoid duplicating work + private ByteBuffer lastKeyFilterWasUpdatedFor; + + private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer firstPartitionColumnStart, + ByteBuffer lastPartitionColumnFinish, CFMetaData cfm, Comparator<ByteBuffer> comparator) { super(range, filter); @@ -163,14 +171,16 @@ public class DataRange assert !(range instanceof Range) || !((Range)range).isWrapAround() || range.right.isMinimum() : range; this.sliceFilter = filter; + this.cfm = cfm; this.comparator = comparator; 
this.firstPartitionColumnStart = firstPartitionColumnStart; this.lastPartitionColumnFinish = lastPartitionColumnFinish; + this.lastKeyFilterWasUpdatedFor = null; } - public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, AbstractType<?> comparator) + public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, CFMetaData cfm) { - this(range, filter, columnStart, columnFinish, filter.isReversed() ? comparator.reverseComparator : comparator); + this(range, filter, columnStart, columnFinish, cfm, filter.isReversed() ? cfm.comparator.reverseComparator : cfm.comparator); } @Override @@ -181,7 +191,7 @@ public class DataRange return false; if (!equals(startKey(), rowKey) && !equals(stopKey(), rowKey)) - return selectFullRow; + return true; return isFullRowSlice((SliceQueryFilter)columnFilter(rowKey)); } @@ -201,12 +211,29 @@ public class DataRange * Maybe we should just remove that hack, but in the meantime, we * need to keep a reference the last returned filter. */ - columnFilter = equals(startKey(), rowKey) || equals(stopKey(), rowKey) - ? sliceFilter.withUpdatedSlices(slicesForKey(rowKey)) - : sliceFilter; + if (equals(startKey(), rowKey) || equals(stopKey(), rowKey)) + { + if (!rowKey.equals(lastKeyFilterWasUpdatedFor)) + { + this.lastKeyFilterWasUpdatedFor = rowKey; + columnFilter = sliceFilter.withUpdatedSlices(slicesForKey(rowKey)); + } + } + else + { + columnFilter = sliceFilter; + } + return columnFilter; } + /** Returns true if the slice includes static columns, false otherwise. */ + private boolean sliceIncludesStatics(ColumnSlice slice, boolean reversed, CFMetaData cfm) + { + return cfm.hasStaticColumns() && + cfm.getStaticColumnNameBuilder().build().compareTo(reversed ? 
slice.finish : slice.start) >= 0; + } + private ColumnSlice[] slicesForKey(ByteBuffer key) { // We don't call that until it's necessary, so assume we have to do some hard work @@ -216,19 +243,37 @@ public class DataRange ByteBuffer newStart = equals(startKey(), key) && firstPartitionColumnStart.hasRemaining() ? firstPartitionColumnStart : null; ByteBuffer newFinish = equals(stopKey(), key) && lastPartitionColumnFinish.hasRemaining() ? lastPartitionColumnFinish : null; - List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>(sliceFilter.slices.length); // in the common case, we'll have the same number of slices + // in the common case, we'll have the same number of slices + List<ColumnSlice> newSlices = new ArrayList<>(sliceFilter.slices.length); + // Check our slices to see if any fall before the page start (in which case they can be removed) or + // if they contain the page start (in which case they should start from the page start). However, if the + // slices would include static columns, we need to ensure they are also fetched, and so a separate + // slice for the static columns may be required. + // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so + // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details. 
for (ColumnSlice slice : sliceFilter.slices) { if (newStart != null) { if (slice.isBefore(comparator, newStart)) - continue; // we skip that slice + { + if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm)) + newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange())); + + continue; + } if (slice.includes(comparator, newStart)) + { + if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm) && !newStart.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)) + newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange())); + slice = new ColumnSlice(newStart, slice.finish); + } - // Whether we've updated the slice or not, we don't have to bother about newStart anymore + // once we see a slice that either includes the page start or is after it, we can stop checking + // against the page start (because the slices are ordered) newStart = null; } @@ -252,5 +297,17 @@ public class DataRange columnFilter.updateColumnsLimit(count); sliceFilter.updateColumnsLimit(count); } + + @Override + public String toString() + { + return Objects.toStringHelper(this) + .add("keyRange", keyRange) + .add("sliceFilter", sliceFilter) + .add("columnFilter", columnFilter) + .add("firstPartitionColumnStart", firstPartitionColumnStart == null ? "null" : ByteBufferUtil.bytesToHex(firstPartitionColumnStart)) + .add("lastPartitionColumnFinish", lastPartitionColumnFinish == null ? 
"null" : ByteBufferUtil.bytesToHex(lastPartitionColumnFinish)) + .toString(); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/SliceFromReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java index afca338..0ea2de5 100644 --- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java +++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java @@ -22,6 +22,7 @@ import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +62,30 @@ public class SliceFromReadCommand extends ReadCommand public Row getRow(Keyspace keyspace) { + CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName); DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); + + // If we're doing a reversed query and the filter includes static columns, we need to issue two separate + // reads in order to guarantee that the static columns are fetched. See CASSANDRA-8502 for more details. 
+ if (filter.reversed && filter.hasStaticSlice(cfm)) + { + logger.debug("Splitting reversed slice with static columns into two reads"); + Pair<SliceQueryFilter, SliceQueryFilter> newFilters = filter.splitOutStaticSlice(cfm); + + Row normalResults = keyspace.getRow(new QueryFilter(dk, cfName, newFilters.right, timestamp)); + Row staticResults = keyspace.getRow(new QueryFilter(dk, cfName, newFilters.left, timestamp)); + + // add the static results to the start of the normal results + if (normalResults.cf == null) + return staticResults; + + if (staticResults.cf != null) + for (Column col : staticResults.cf.getReverseSortedColumns()) + normalResults.cf.addColumn(col); + + return normalResults; + } + return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/filter/ColumnCounter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java index 2d0df1f..ddd74b3 100644 --- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java @@ -75,10 +75,10 @@ public class ColumnCounter public static class GroupByPrefix extends ColumnCounter { - private final CompositeType type; - private final int toGroup; - private ByteBuffer[] previous; - private boolean previousGroupIsStatic; + protected final CompositeType type; + protected final int toGroup; + protected ByteBuffer[] previous; + protected boolean previousGroupIsStatic; /** * A column counter that count only 1 for all the columns sharing a @@ -157,4 +157,63 @@ public class ColumnCounter previous = current; } } + + /** + * Similar to GroupByPrefix, but designed to handle counting cells in reverse order. 
+ */ + public static class GroupByPrefixReversed extends GroupByPrefix + { + public GroupByPrefixReversed(long timestamp, CompositeType type, int toGroup) + { + super(timestamp, type, toGroup); + } + + @Override + public void count(Column column, DeletionInfo.InOrderTester tester) + { + if (tester.isDeleted(column)) + return; + + if (!column.isLive(timestamp)) + { + tombstones++; + return; + } + + if (toGroup == 0) + { + live = 1; + return; + } + + ByteBuffer[] current = type.split(column.name()); + assert current.length >= toGroup; + + boolean isStatic = CompositeType.isStaticName(column.name()); + if (previous == null) + { + // This is the first group we've seen, and it's static. In this case we want to return a count of 1, + // because there are no other live groups. + previousGroupIsStatic = true; + previous = current; + live++; + } + else if (isStatic) + { + // Ignore statics if we've seen any other statics or any other groups + return; + } + + for (int i = 0; i < toGroup; i++) + { + if (ByteBufferUtil.compareUnsigned(previous[i], current[i]) != 0) + { + // it's a new group + live++; + previous = current; + return; + } + } + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java index 82e889d..e03eba1 100644 --- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.SortedSet; import java.util.TreeSet; +import com.google.common.base.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -158,6 +159,18 @@ public abstract class ExtendedFilter } } + @Override + public String toString() + { + return Objects.toStringHelper(this) + .add("cfs", 
cfs) + .add("dataRange", dataRange) + .add("maxResults", maxResults) + .add("countCQL3Rows", countCQL3Rows) + .add("currentLimit", currentLimit) + .toString(); + } + public static class WithClauses extends ExtendedFilter { private final List<IndexExpression> clause; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java index 6e6ab6b..ecf02c1 100644 --- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java @@ -23,6 +23,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,11 +102,61 @@ public class SliceQueryFilter implements IDiskAtomFilter return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup); } - public SliceQueryFilter withUpdatedStart(ByteBuffer newStart, AbstractType<?> comparator) + /** Returns true if the slice includes static columns, false otherwise. */ + private boolean sliceIncludesStatics(ColumnSlice slice, CFMetaData cfm) { - Comparator<ByteBuffer> cmp = reversed ? comparator.reverseComparator : comparator; + return cfm.hasStaticColumns() && + cfm.getStaticColumnNameBuilder().build().compareTo(reversed ? 
slice.finish : slice.start) >= 0; + } + + public boolean hasStaticSlice(CFMetaData cfm) + { + for (ColumnSlice slice : slices) + if (sliceIncludesStatics(slice, cfm)) + return true; + + return false; + } + + /** + * Splits this filter into two SliceQueryFilters: one that slices only the static columns, and one that slices the + * remainder of the normal data. + * + * This should only be called when the filter is reversed and the filter is known to cover static columns (through + * hasStaticSlice()). + * + * @return a pair of (static, normal) SliceQueryFilters + */ + public Pair<SliceQueryFilter, SliceQueryFilter> splitOutStaticSlice(CFMetaData cfm) + { + assert reversed; + + ByteBuffer staticSliceEnd = cfm.getStaticColumnNameBuilder().buildAsEndOfRange(); + List<ColumnSlice> nonStaticSlices = new ArrayList<>(slices.length); + for (ColumnSlice slice : slices) + { + if (sliceIncludesStatics(slice, cfm)) + nonStaticSlices.add(new ColumnSlice(slice.start, staticSliceEnd)); + else + nonStaticSlices.add(slice); + } - List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>(); + return Pair.create( + new SliceQueryFilter(staticSliceEnd, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, count, compositesToGroup), + new SliceQueryFilter(nonStaticSlices.toArray(new ColumnSlice[nonStaticSlices.size()]), true, count, compositesToGroup)); + } + + public SliceQueryFilter withUpdatedStart(ByteBuffer newStart, CFMetaData cfm) + { + Comparator<ByteBuffer> cmp = reversed ? cfm.comparator.reverseComparator : cfm.comparator; + + // Check our slices to see if any fall before the new start (in which case they can be removed) or + // if they contain the new start (in which case they should start from the page start). However, if the + // slices would include static columns, we need to ensure they are also fetched, and so a separate + // slice for the static columns may be required. 
+ // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so + // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details. + List<ColumnSlice> newSlices = new ArrayList<>(); boolean pastNewStart = false; for (int i = 0; i < slices.length; i++) { @@ -115,12 +169,23 @@ public class SliceQueryFilter implements IDiskAtomFilter } if (slices[i].isBefore(cmp, newStart)) + { + if (!reversed && sliceIncludesStatics(slice, cfm)) + newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange())); + continue; + } + else if (slice.includes(cmp, newStart)) + { + if (!reversed && sliceIncludesStatics(slice, cfm) && !newStart.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)) + newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange())); - if (slice.includes(cmp, newStart)) newSlices.add(new ColumnSlice(newStart, slice.finish)); + } else + { newSlices.add(slice); + } pastNewStart = true; } @@ -254,12 +319,18 @@ public class SliceQueryFilter implements IDiskAtomFilter return new ColumnCounter(now); else if (compositesToGroup == 0) return new ColumnCounter.GroupByPrefix(now, null, 0); + else if (reversed) + return new ColumnCounter.GroupByPrefixReversed(now, (CompositeType)comparator, compositesToGroup); else return new ColumnCounter.GroupByPrefix(now, (CompositeType)comparator, compositesToGroup); } public void trim(ColumnFamily cf, int trimTo, long now) { + // each cell can increment the count by at most one, so if we have fewer cells than trimTo, we can skip trimming + if (cf.getColumnCount() < trimTo) + return; + ColumnCounter counter = columnCounter(cf.getComparator(), now); Collection<Column> columns = reversed http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index c45dd07..155e538 100644 --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@ -17,14 +17,12 @@ */ package org.apache.cassandra.service.pager; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Iterator; +import java.util.*; import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnCounter; @@ -306,13 +304,29 @@ abstract class AbstractQueryPager implements QueryPager { ColumnCounter counter = columnCounter(); - // Discard the first 'toDiscard' live + List<Column> staticColumns = new ArrayList<>(cfm.staticColumns().size()); + + // Discard the first 'toDiscard' live, non-static columns while (iter.hasNext()) { Column c = iter.next(); + + // if it's a static column, don't count it and save it to add to the trimmed results + ColumnDefinition columnDef = cfm.getColumnDefinitionFromColumnName(c.name()); + if (columnDef != null && columnDef.type == ColumnDefinition.Type.STATIC) + { + staticColumns.add(c); + continue; + } + counter.count(c, tester); + + // once we've discarded the required amount, add the rest if (counter.live() > toDiscard) { + for (Column staticColumn : staticColumns) + copy.addColumn(staticColumn); + copy.addColumn(c); while (iter.hasNext()) copy.addColumn(iter.next()); @@ -342,9 +356,21 @@ abstract class AbstractQueryPager implements QueryPager return Math.min(liveCount, toDiscard); } - protected static Column firstColumn(ColumnFamily cf) + /** + * Returns the first 
non-static cell in the ColumnFamily. This is necessary to avoid recording a static column + * as the "last" cell seen in a reversed query. Because we will always query static columns alongside the normal + * data for a page, they are not a good indicator of where paging should resume. When we begin the next page, we + * need to start from the last non-static cell. + */ + protected Column firstNonStaticColumn(ColumnFamily cf) { - return cf.iterator().next(); + for (Column column : cf) + { + ColumnDefinition def = cfm.getColumnDefinitionFromColumnName(column.name()); + if (def == null || def.type != ColumnDefinition.Type.STATIC) + return column; + } + return null; } protected static Column lastColumn(ColumnFamily cf) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java index 0df1d25..3618c56 100644 --- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java @@ -93,7 +93,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager return false; // Same as SliceQueryPager, we ignore a deleted column - Column firstColumn = isReversed() ? lastColumn(first.cf) : firstColumn(first.cf); + Column firstColumn = isReversed() ? lastColumn(first.cf) : firstNonStaticColumn(first.cf); return !first.cf.deletionInfo().isDeleted(firstColumn) && firstColumn.isLive(timestamp()) && lastReturnedName.equals(firstColumn.name()); @@ -102,7 +102,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager protected boolean recordLast(Row last) { lastReturnedKey = last.key; - lastReturnedName = (isReversed() ? 
firstColumn(last.cf) : lastColumn(last.cf)).name(); + lastReturnedName = (isReversed() ? firstNonStaticColumn(last.cf) : lastColumn(last.cf)).name(); return true; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java index cdad0a5..ad5a0bf 100644 --- a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java @@ -78,7 +78,7 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti // more rows than we're supposed to. See CASSANDRA-8108 for more details. SliceQueryFilter filter = command.filter.withUpdatedCount(Math.min(command.filter.count, pageSize)); if (lastReturned != null) - filter = filter.withUpdatedStart(lastReturned, cfm.comparator); + filter = filter.withUpdatedStart(lastReturned, cfm); logger.debug("Querying next page of slice query; new filter: {}", filter); ReadCommand pageCmd = command.withUpdatedFilter(filter); @@ -92,7 +92,7 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti if (lastReturned == null) return false; - Column firstColumn = isReversed() ? lastColumn(first.cf) : firstColumn(first.cf); + Column firstColumn = isReversed() ? lastColumn(first.cf) : firstNonStaticColumn(first.cf); // Note: we only return true if the column is the lastReturned *and* it is live. If it is deleted, it is ignored by the // rest of the paging code (it hasn't been counted as live in particular) and we want to act as if it wasn't there. 
return !first.cf.deletionInfo().isDeleted(firstColumn) @@ -102,7 +102,7 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti protected boolean recordLast(Row last) { - Column lastColumn = isReversed() ? firstColumn(last.cf) : lastColumn(last.cf); + Column lastColumn = isReversed() ? firstNonStaticColumn(last.cf) : lastColumn(last.cf); lastReturned = lastColumn.name(); return true; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java b/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java index e3ccba5..30b7f0f 100644 --- a/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java +++ b/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java @@ -704,6 +704,8 @@ public class MultiColumnRelationTest { for (String tableSuffix : new String[]{"", "_compact"}) { + execute("DELETE FROM %s.multiple_clustering_reversed" + tableSuffix + " WHERE a=0"); + // b and d are reversed in the clustering order execute("INSERT INTO %s.multiple_clustering_reversed" + tableSuffix + " (a, b, c, d) VALUES (0, 1, 0, 0)"); execute("INSERT INTO %s.multiple_clustering_reversed" + tableSuffix + " (a, b, c, d) VALUES (0, 1, 1, 1)"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java b/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java index 5467ec0..273487a 100644 --- a/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java +++ b/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java @@ -125,9 +125,11 @@ public class 
AbstractQueryPagerTest return cf; } - private CFMetaData createMetadata() + private static CFMetaData createMetadata() { - return new CFMetaData("ks", "cf", ColumnFamilyType.Standard, Int32Type.instance); + CFMetaData cfm = new CFMetaData("ks", "cf", ColumnFamilyType.Standard, Int32Type.instance); + cfm.rebuild(); + return cfm; } private static ByteBuffer bb(int i) @@ -147,7 +149,7 @@ public class AbstractQueryPagerTest // We use this to test more thorougly DiscardFirst and DiscardLast (more generic pager behavior is tested in // QueryPagerTest). The only thing those method use is the result of the columnCounter() method. So to keep // it simple, we fake all actual parameters in the ctor below but just override the columnCounter() method. - super(null, 0, false, null, null, 0); + super(null, 0, false, createMetadata(), null, 0); } @Override