Updated Branches: refs/heads/trunk 341d6c48c -> 025b84efd
Fix paging discardFirst IllgalArgumentException patch by slebresne; reviewed by thobbs for CASSANDRA-6555 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/55211bca Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/55211bca Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/55211bca Branch: refs/heads/trunk Commit: 55211bca799cba0b9bb443c5111102216d416591 Parents: 479bf8a Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Wed Jan 29 10:08:08 2014 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Wed Jan 29 10:08:08 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../service/pager/AbstractQueryPager.java | 43 +++-- .../service/pager/AbstractQueryPagerTest.java | 184 +++++++++++++++++++ 3 files changed, 216 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/55211bca/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ac75587..c626d37 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ * Add support for 2.1 global counter shards (CASSANDRA-6505) * Fix NPE when streaming connection is not yet established (CASSANDRA-6210) * Avoid rare duplicate read repair triggering (CASSANDRA-6606) + * Fix paging discardFirst (CASSANDRA-6555) Merged from 1.2: * fsync compression metadata (CASSANDRA-6531) * Validate CF existence on execution for prepared statement (CASSANDRA-6535) http://git-wip-us.apache.org/repos/asf/cassandra/blob/55211bca/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index ba4d8f2..297a85f 100644 --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@ -23,6 +23,8 @@ import java.util.Collections; import java.util.List; import java.util.Iterator; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; @@ -52,16 +54,27 @@ abstract class AbstractQueryPager implements QueryPager IDiskAtomFilter columnFilter, long timestamp) { + this(consistencyLevel, toFetch, localQuery, Schema.instance.getCFMetaData(keyspace, columnFamily), columnFilter, timestamp); + } + + protected AbstractQueryPager(ConsistencyLevel consistencyLevel, + int toFetch, + boolean localQuery, + CFMetaData cfm, + IDiskAtomFilter columnFilter, + long timestamp) + { this.consistencyLevel = consistencyLevel; this.localQuery = localQuery; - this.cfm = Schema.instance.getCFMetaData(keyspace, columnFamily); + this.cfm = cfm; this.columnFilter = columnFilter; this.timestamp = timestamp; this.remaining = toFetch; } + public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException { if (isExhausted()) @@ -178,9 +191,10 @@ abstract class AbstractQueryPager implements QueryPager return discardFirst(rows, 1); } - private List<Row> discardFirst(List<Row> rows, int toDiscard) + @VisibleForTesting + List<Row> discardFirst(List<Row> rows, int toDiscard) { - if (toDiscard == 0) + if (toDiscard == 0 || rows.isEmpty()) return rows; int i = 0; @@ -197,12 +211,14 @@ abstract class AbstractQueryPager implements QueryPager } // If there is less live data than to discard, all is discarded - if (toDiscard > 0 && i >= rows.size()) + if (toDiscard > 0) return Collections.<Row>emptyList(); + // i is the index of the first row that we are sure to keep. On top of that, + // we also keep firstCf is it hasn't been fully emptied by the last iteration above. int count = firstCf.getColumnCount(); - int newSize = rows.size() - i; - List<Row> newRows = new ArrayList<Row>(count == 0 ? newSize-1 : newSize); + int newSize = rows.size() - (count == 0 ? i : i - 1); + List<Row> newRows = new ArrayList<Row>(newSize); if (count != 0) newRows.add(new Row(firstKey, firstCf)); newRows.addAll(rows.subList(i, rows.size())); @@ -215,9 +231,10 @@ abstract class AbstractQueryPager implements QueryPager return discardLast(rows, 1); } - private List<Row> discardLast(List<Row> rows, int toDiscard) + @VisibleForTesting + List<Row> discardLast(List<Row> rows, int toDiscard) { - if (toDiscard == 0) + if (toDiscard == 0 || rows.isEmpty()) return rows; int i = rows.size()-1; @@ -234,13 +251,15 @@ abstract class AbstractQueryPager implements QueryPager } // If there is less live data than to discard, all is discarded - if (toDiscard > 0 && i < 0) + if (toDiscard > 0) return Collections.<Row>emptyList(); + // i is the index of the last row that we are sure to keep. On top of that, + // we also keep lastCf is it hasn't been fully emptied by the last iteration above. int count = lastCf.getColumnCount(); - int newSize = i+1; - List<Row> newRows = new ArrayList<Row>(count == 0 ? newSize-1 : newSize); - newRows.addAll(rows.subList(0, i)); + int newSize = count == 0 ? i+1 : i+2; + List<Row> newRows = new ArrayList<Row>(newSize); + newRows.addAll(rows.subList(0, i+1)); if (count != 0) newRows.add(new Row(lastKey, lastCf)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/55211bca/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java b/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java new file mode 100644 index 0000000..5467ec0 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service.pager; + +import java.nio.ByteBuffer; +import java.util.*; + +import org.junit.Test; +import static org.junit.Assert.*; + +import org.apache.cassandra.Util; +import org.apache.cassandra.config.*; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnCounter; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class AbstractQueryPagerTest +{ + @Test + public void DiscardFirstTest() + { + TestPager pager = new TestPager(); + List<Row> rows = Arrays.asList(createRow("r1", 1), + createRow("r2", 3), + createRow("r3", 2)); + + assertEquals(3, rows.size()); + assertRow(rows.get(0), "r1", 0); + assertRow(rows.get(1), "r2", 0, 1, 2); + assertRow(rows.get(2), "r3", 0, 1); + + rows = pager.discardFirst(rows, 1); + + assertEquals(2, rows.size()); + assertRow(rows.get(0), "r2", 0, 1, 2); + assertRow(rows.get(1), "r3", 0, 1); + + rows = pager.discardFirst(rows, 1); + + assertEquals(2, rows.size()); + assertRow(rows.get(0), "r2", 1, 2); + assertRow(rows.get(1), "r3", 0, 1); + + rows = pager.discardFirst(rows, 3); + + assertEquals(1, rows.size()); + assertRow(rows.get(0), "r3", 1); + + rows = pager.discardFirst(rows, 1); + + assertTrue(rows.isEmpty()); + } + + @Test + public void DiscardLastTest() + { + TestPager pager = new TestPager(); + List<Row> rows = Arrays.asList(createRow("r1", 2), + createRow("r2", 3), + createRow("r3", 1)); + + assertEquals(3, rows.size()); + assertRow(rows.get(0), "r1", 0, 1); + assertRow(rows.get(1), "r2", 0, 1, 2); + assertRow(rows.get(2), "r3", 0); + + rows = pager.discardLast(rows, 1); + + assertEquals(2, rows.size()); + assertRow(rows.get(0), "r1", 0, 1); + assertRow(rows.get(1), "r2", 0, 1, 2); + + rows = pager.discardLast(rows, 1); + + assertEquals(2, rows.size()); + assertRow(rows.get(0), "r1", 0, 1); + assertRow(rows.get(1), "r2", 0, 1); + + rows = pager.discardLast(rows, 3); + + assertEquals(1, rows.size()); + assertRow(rows.get(0), "r1", 0); + + rows = pager.discardLast(rows, 1); + + assertTrue(rows.isEmpty()); + } + + private void assertRow(Row row, String name, int... values) + { + assertEquals(row.key.key, ByteBufferUtil.bytes(name)); + assertEquals(values.length, row.cf.getColumnCount()); + + int i = 0; + for (Column c : row.cf) + assertEquals(values[i++], i(c.name())); + } + + private Row createRow(String name, int nbCol) + { + return new Row(Util.dk(name), createCF(nbCol)); + } + + private ColumnFamily createCF(int nbCol) + { + ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(createMetadata()); + for (int i = 0; i < nbCol; i++) + cf.addColumn(bb(i), bb(i), 0); + return cf; + } + + private CFMetaData createMetadata() + { + return new CFMetaData("ks", "cf", ColumnFamilyType.Standard, Int32Type.instance); + } + + private static ByteBuffer bb(int i) + { + return ByteBufferUtil.bytes(i); + } + + private static int i(ByteBuffer bb) + { + return ByteBufferUtil.toInt(bb); + } + + private static class TestPager extends AbstractQueryPager + { + public TestPager() + { + // We use this to test more thorougly DiscardFirst and DiscardLast (more generic pager behavior is tested in + // QueryPagerTest). The only thing those method use is the result of the columnCounter() method. So to keep + // it simple, we fake all actual parameters in the ctor below but just override the columnCounter() method. + super(null, 0, false, null, null, 0); + } + + @Override + public ColumnCounter columnCounter() + { + return new ColumnCounter(0); + } + + public PagingState state() + { + return null; + } + + protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency, boolean localQuery) + { + return null; + } + + protected boolean containsPreviousLast(Row first) + { + return false; + } + + protected boolean recordLast(Row last) + { + return false; + } + + protected boolean isReversed() + { + return false; + } + } +}