This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cassandra-2.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.1 by this push: new dd228d4 Special case page handling for DISTINCT queries dd228d4 is described below commit dd228d4581b020fb2fb788858481c81357d7fa72 Author: Sam Tunnicliffe <s...@beobal.com> AuthorDate: Thu Jan 3 19:03:11 2019 +0000 Special case page handling for DISTINCT queries Simplify the removal of the first row in a paged range slice page with DISTINCT if that row was already returned at the end of the previous page Patch by Sam Tunnicliffe and Marcus Eriksson; reviewed by Sam Tunnicliffe and Marcus Eriksson for CASSANDRA-14956 Co-authored-by: Sam Tunnicliffe <s...@beobal.com> Co-authored-by: Marcus Eriksson <marc...@apache.org> --- CHANGES.txt | 1 + .../service/pager/AbstractQueryPager.java | 2 +- .../service/pager/RangeSliceQueryPager.java | 23 +++ .../unit/org/apache/cassandra/cql3/PagingTest.java | 160 +++++++++++++++++++++ 4 files changed, 185 insertions(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index 7b88686..3582d4f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.21 + * Paged Range Slice queries with DISTINCT can drop rows from results (CASSANDRA-14956) * Update release checksum algorithms to SHA-256, SHA-512 (CASSANDRA-14970) * Check checksum before decompressing data (CASSANDRA-14284) * CVE-2017-5929 Security vulnerability in Logback warning in NEWS.txt (CASSANDRA-14183) diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index 8bbf6d6..445a507 100644 --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@ -207,7 +207,7 @@ abstract class AbstractQueryPager implements QueryPager protected abstract boolean isReversed(); - private List<Row> discardFirst(List<Row> rows) + protected List<Row> discardFirst(List<Row> rows) { return discardFirst(rows, 1); } diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java index 3ac777e..a37db02 100644 --- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.service.pager; +import java.util.ArrayList; import java.util.List; import org.apache.cassandra.config.CFMetaData; @@ -114,6 +115,28 @@ public class RangeSliceQueryPager extends AbstractQueryPager && firstCell.name().isSameCQL3RowAs(metadata.comparator, lastReturnedName); } + protected List<Row> discardFirst(List<Row> rows) + { + if (rows.isEmpty()) + return rows; + + // Special case for distinct queries because the superclass' discardFirst keeps dropping cells + // until it has removed the first *live* row. In a distinct query we only fetch the first row + // from a given partition, which may be entirely non-live. In the case where such a non-live + // row is the last in page N & the first in page N+1, we would also end up discarding an + // additional live row from page N+1. + // The simplest solution is to just remove whichever row is first in the page, without bothering + // to do liveness checks etc. + if (isDistinct()) + { + List<Row> newRows = new ArrayList<>(Math.max(1, rows.size() - 1)); + newRows.addAll(rows.subList(1, rows.size())); + return newRows; + } + + return super.discardFirst(rows); + } + private boolean isDistinct() { // As this pager is never used for Thrift queries, checking the countCQL3Rows is enough. diff --git a/test/unit/org/apache/cassandra/cql3/PagingTest.java b/test/unit/org/apache/cassandra/cql3/PagingTest.java new file mode 100644 index 0000000..531ddde --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/PagingTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3; + +import java.net.InetAddress; +import java.util.Iterator; +import java.util.List; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; +import org.apache.cassandra.config.DatabaseDescriptor; + +import org.apache.cassandra.dht.LongToken; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.locator.AbstractEndpointSnitch; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.service.EmbeddedCassandraService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +import static junit.framework.Assert.assertFalse; +import static org.junit.Assert.assertEquals; + + +public class PagingTest +{ + private static Cluster cluster; + private static Session session; + + private static final String KEYSPACE = "paging_test"; + private static final String createKsStatement = "CREATE KEYSPACE " + KEYSPACE + + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };"; + + private static final String dropKsStatement = "DROP KEYSPACE IF EXISTS " + KEYSPACE; + + @BeforeClass + public static void setup() throws Exception + { + DatabaseDescriptor.setPartitioner(new Murmur3Partitioner()); + EmbeddedCassandraService cassandra = new EmbeddedCassandraService(); + cassandra.start(); + + // Currently the native server start method return before the server is fully binded to the socket, so we need + // to wait slightly before trying to connect to it. We should fix this but in the meantime using a sleep. + Thread.sleep(500); + + cluster = Cluster.builder().addContactPoint("127.0.0.1") + .withPort(DatabaseDescriptor.getNativeTransportPort()) + .build(); + session = cluster.connect(); + + session.execute(dropKsStatement); + session.execute(createKsStatement); + } + + @AfterClass + public static void tearDown() + { + cluster.close(); + } + + /** + * Makes sure that we don't drop any live rows when paging with DISTINCT queries + * + * * We need to have more rows than fetch_size + * * The node must have a token within the first page (so that the range gets split up in StorageProxy#getRestrictedRanges) + * - This means that the second read in the second range will read back too many rows + * * The extra rows are dropped (so that we only return fetch_size rows to client) + * * This means that the last row recorded in AbstractQueryPager#recordLast is a non-live one + * * For the next page, the first row returned will be the same non-live row as above + * * The bug in CASSANDRA-14956 caused us to drop that non-live row + the first live row in the next page + */ + @Test + public void testPaging() throws InterruptedException + { + String table = KEYSPACE + ".paging"; + String createTableStatement = "CREATE TABLE IF NOT EXISTS " + table + " (id int, id2 int, id3 int, val text, PRIMARY KEY ((id, id2), id3));"; + String dropTableStatement = "DROP TABLE IF EXISTS " + table + ';'; + + // custom snitch to avoid merging ranges back together after StorageProxy#getRestrictedRanges splits them up + IEndpointSnitch snitch = new AbstractEndpointSnitch() + { + private IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch(); + public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) + { + return oldSnitch.compareEndpoints(target, a1, a2); + } + + public String getRack(InetAddress endpoint) + { + return oldSnitch.getRack(endpoint); + } + + public String getDatacenter(InetAddress endpoint) + { + return oldSnitch.getDatacenter(endpoint); + } + + @Override + public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2) + { + return false; + } + }; + DatabaseDescriptor.setEndpointSnitch(snitch); + StorageService.instance.getTokenMetadata().clearUnsafe(); + StorageService.instance.getTokenMetadata().updateNormalToken(new LongToken(5097162189738624638L), FBUtilities.getBroadcastAddress()); + session.execute(createTableStatement); + + for (int i = 0; i < 110; i++) + { + // removing row with idx 10 causes the last row in the first page read to be empty + String ttlClause = i == 10 ? "USING TTL 1" : ""; + session.execute(String.format("INSERT INTO %s (id, id2, id3, val) VALUES (%d, %d, %d, '%d') %s", table, i, i, i, i, ttlClause)); + } + Thread.sleep(1500); + + Statement stmt = new SimpleStatement(String.format("SELECT DISTINCT token(id, id2), id, id2 FROM %s", table)); + stmt.setFetchSize(100); + ResultSet res = session.execute(stmt); + stmt.setFetchSize(200); + ResultSet res2 = session.execute(stmt); + + Iterator<Row> iter1 = res.iterator(); + Iterator<Row> iter2 = res2.iterator(); + + while (iter1.hasNext() && iter2.hasNext()) + { + Row row1 = iter1.next(); + Row row2 = iter2.next(); + assertEquals(row1.getInt("id"), row2.getInt("id")); + } + assertFalse(iter1.hasNext()); + assertFalse(iter2.hasNext()); + session.execute(dropTableStatement); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org