This is an automated email from the ASF dual-hosted git repository. adelapena pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 78de6aca166e10524abf7f362efeb65751b78170 Merge: 001deb0 377ceb6 Author: Andrés de la Peña <a.penya.gar...@gmail.com> AuthorDate: Mon Apr 27 16:36:32 2020 +0100 Merge branch 'cassandra-3.11' into trunk # Conflicts: # CHANGES.txt # src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java # test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java # test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java CHANGES.txt | 1 + .../cql3/restrictions/StatementRestrictions.java | 9 +- .../index/internal/CassandraIndexSearcher.java | 7 +- .../internal/composites/CompositesSearcher.java | 2 +- .../service/pager/AbstractQueryPager.java | 28 ++- .../service/pager/PartitionRangeQueryPager.java | 5 + .../cassandra/cql3/DistinctQueryPagingTest.java | 231 +++++++++++++++++++++ .../cassandra/cql3/IndexQueryPagingTest.java | 54 +++++ .../validation/entities/StaticColumnsTest.java | 6 +- .../validation/operations/SelectLimitTest.java | 156 ++++++++++++++ .../index/internal/CassandraIndexTest.java | 25 +++ 11 files changed, 504 insertions(+), 20 deletions(-) diff --cc CHANGES.txt index 929dd70,fd7ad6a..11bc713 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,71 -1,9 +1,72 @@@ -3.11.7 - * Allow sstableloader to use SSL on the native port (CASSANDRA-14904) +4.0-alpha5 + * Avoid race condition when completing stream sessions (CASSANDRA-15666) + * Flush with fast compressors by default (CASSANDRA-15379) + * Fix CqlInputFormat regression from the switch to system.size_estimates (CASSANDRA-15637) + * Allow sending Entire SSTables over SSL (CASSANDRA-15740) + * Fix CQLSH UTF-8 encoding issue for Python 2/3 compatibility (CASSANDRA-15739) + * Fix batch statement preparation when multiple tables and parameters are used (CASSANDRA-15730) + * Fix regression with traceOutgoingMessage printing message size (CASSANDRA-15687) + * Ensure repaired data tracking reads a consistent amount of data across replicas (CASSANDRA-15601) + * Fix CQLSH to avoid 
arguments being evaluated (CASSANDRA-15660) + * Correct Visibility and Improve Safety of Methods in LatencyMetrics (CASSANDRA-15597) + * Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573) + * Improve logging around incremental repair (CASSANDRA-15599) + * Do not check cdc_raw_directory filesystem space if CDC disabled (CASSANDRA-15688) + * Replace array iterators with get by index (CASSANDRA-15394) + * Minimize BTree iterator allocations (CASSANDRA-15389) +Merged from 3.11: Merged from 3.0: + * Allow selecting static column only when querying static index (CASSANDRA-14242) * cqlsh return non-zero status when STDIN CQL fails (CASSANDRA-15623) * Don't skip sstables in slice queries based only on local min/max/deletion timestamp (CASSANDRA-15690) +Merged from 2.2: + * Duplicate results with DISTINCT queries in mixed mode (CASSANDRA-15501) + +4.0-alpha4 + * Add client request size server metrics (CASSANDRA-15704) + * Add additional logging around FileUtils and compaction leftover cleanup (CASSANDRA-15705) + * Mark system_views/system_virtual_schema as non-alterable keyspaces in cqlsh (CASSANDRA-15711) + * Fail incremental repair if an old version sstable is involved (CASSANDRA-15612) + * Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times (CASSANDRA-14773) + * Mark system_views/system_virtual_schema as system keyspaces in cqlsh (CASSANDRA-15706) + * Avoid unnecessary collection/iterator allocations during btree construction (CASSANDRA-15390) + * Repair history tables should have TTL and TWCS (CASSANDRA-12701) + * Fix cqlsh erroring out on Python 3.7 due to webbrowser module being absent (CASSANDRA-15572) + * Fix IMH#acquireCapacity() to return correct Outcome when endpoint reserve runs out (CASSANDRA-15607) + * Fix nodetool describering output (CASSANDRA-15682) + * Only track ideal CL failure when request CL met (CASSANDRA-15696) + * Fix flaky CoordinatorMessagingTest and docstring in OutboundSink and 
ConsistentSession (CASSANDRA-15672) + * Fix force compaction of wrapping ranges (CASSANDRA-15664) + * Expose repair streaming metrics (CASSANDRA-15656) + * Set now in seconds in the future for validation repairs (CASSANDRA-15655) + * Emit metric on preview repair failure (CASSANDRA-15654) + * Use more appropriate logging levels (CASSANDRA-15661) + * Fixed empty check in TrieMemIndex due to potential state inconsistency in ConcurrentSkipListMap (CASSANDRA-15526) + * Added UnleveledSSTables global and table level metric (CASSANDRA-15620) + * Added Virtual Table exposing Cassandra relevant system properties (CASSANDRA-15616, CASSANDRA-15643) + * Improve the algorithmic token allocation in case racks = RF (CASSANDRA-15600) + * Fix ConnectionTest.testAcquireReleaseOutbound (CASSANDRA-15308) + * Include finalized pending sstables in preview repair (CASSANDRA-15553) + * Reverted to the original behavior of CLUSTERING ORDER on CREATE TABLE (CASSANDRA-15271) + * Correct inaccurate logging message (CASSANDRA-15549) + * Unset GREP_OPTIONS (CASSANDRA-14487) + * Update to Python driver 3.21 for cqlsh (CASSANDRA-14872) + * Fix missing Keyspaces in cqlsh describe output (CASSANDRA-15576) + * Fix multi DC nodetool status output (CASSANDRA-15305) + * updateCoordinatorWriteLatencyTableMetric can produce misleading metrics (CASSANDRA-15569) + * Make cqlsh and cqlshlib Python 2 & 3 compatible (CASSANDRA-10190) + * Improve the description of nodetool listsnapshots command (CASSANDRA-14587) + * allow embedded cassandra launched from a one-jar or uno-jar (CASSANDRA-15494) + * Update hppc library to version 0.8.1 (CASSANDRA-12995) + * Limit the dependencies used by UDFs/UDAs (CASSANDRA-14737) + * Make native_transport_max_concurrent_requests_in_bytes updatable (CASSANDRA-15519) + * Cleanup and improvements to IndexInfo/ColumnIndex (CASSANDRA-15469) + * Potential Overflow in DatabaseDescriptor Functions That Convert Between KB/MB & Bytes (CASSANDRA-15470) +Merged from 3.11: + * Allow 
sstableloader to use SSL on the native port (CASSANDRA-14904) +Merged from 3.0: + * cqlsh return non-zero status when STDIN CQL fails (CASSANDRA-15623) + * Don't skip sstables in slice queries based only on local min/max/deletion timestamp (CASSANDRA-15690) * Memtable memory allocations may deadlock (CASSANDRA-15367) * Run evictFromMembership in GossipStage (CASSANDRA-15592) Merged from 2.2: diff --cc src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index da64a0c,fa3f262..3faa253 --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@@ -65,8 -65,14 +65,14 @@@ abstract class AbstractQueryPager<T ext return EmptyIterators.partition(); pageSize = Math.min(pageSize, remaining); - Pager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec()); - ReadCommand readCommand = nextPageReadCommand(pageSize); - if (readCommand == null) + Pager pager = new RowPager(limits.forPaging(pageSize), query.nowInSec()); - return Transformation.apply(nextPageReadQuery(pageSize).execute(consistency, clientState, queryStartNanoTime), pager); ++ ReadQuery readQuery = nextPageReadQuery(pageSize); ++ if (readQuery == null) + { + exhausted = true; + return EmptyIterators.partition(); + } - return Transformation.apply(readCommand.execute(consistency, clientState, queryStartNanoTime), pager); ++ return Transformation.apply(readQuery.execute(consistency, clientState, queryStartNanoTime), pager); } public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) @@@ -75,19 -81,30 +81,30 @@@ return EmptyIterators.partition(); pageSize = Math.min(pageSize, remaining); - RowPager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec()); - ReadCommand readCommand = nextPageReadCommand(pageSize); - if (readCommand == null) + RowPager pager = new RowPager(limits.forPaging(pageSize), query.nowInSec()); - return 
Transformation.apply(nextPageReadQuery(pageSize).executeInternal(executionController), pager); ++ ReadQuery readQuery = nextPageReadQuery(pageSize); ++ if (readQuery == null) + { + exhausted = true; + return EmptyIterators.partition(); + } - return Transformation.apply(readCommand.executeInternal(executionController), pager); ++ return Transformation.apply(readQuery.executeInternal(executionController), pager); } - public UnfilteredPartitionIterator fetchPageUnfiltered(CFMetaData cfm, int pageSize, ReadExecutionController executionController) + public UnfilteredPartitionIterator fetchPageUnfiltered(TableMetadata metadata, int pageSize, ReadExecutionController executionController) { if (isExhausted()) - return EmptyIterators.unfilteredPartition(cfm, false); + return EmptyIterators.unfilteredPartition(metadata); pageSize = Math.min(pageSize, remaining); - UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), command.nowInSec()); - ReadCommand readCommand = nextPageReadCommand(pageSize); - if (readCommand == null) + UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), query.nowInSec()); - - return Transformation.apply(nextPageReadQuery(pageSize).executeLocally(executionController), pager); ++ ReadQuery readQuery = nextPageReadQuery(pageSize); ++ if (readQuery == null) + { + exhausted = true; - return EmptyIterators.unfilteredPartition(cfm, false); ++ return EmptyIterators.unfilteredPartition(metadata); + } - return Transformation.apply(readCommand.executeLocally(executionController), pager); ++ return Transformation.apply(readQuery.executeLocally(executionController), pager); } private class UnfilteredPager extends Pager<Unfiltered> diff --cc src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java index cebf3c6,75f76cb..4f1e0e7 --- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java @@@ -82,8 -86,13 +82,13 @@@ 
public class PartitionRangeQueryPager e if (lastReturnedKey == null) { pageRange = fullRange; - limits = command.limits().forPaging(pageSize); + limits = query.limits().forPaging(pageSize); } + // if the last key was the one of the end of the range we know that we are done + else if (lastReturnedKey.equals(fullRange.keyRange().right) && remainingInPartition() == 0 && lastReturnedRow == null) + { + return null; + } else { // We want to include the last returned key only if we haven't achieved our per-partition limit, otherwise, don't bother. diff --cc test/unit/org/apache/cassandra/cql3/DistinctQueryPagingTest.java index 0000000,6f0477d..61ac66b mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/cql3/DistinctQueryPagingTest.java +++ b/test/unit/org/apache/cassandra/cql3/DistinctQueryPagingTest.java @@@ -1,0 -1,262 +1,231 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.cql3; + + import org.junit.Assert; + import org.junit.Test; + + public class DistinctQueryPagingTest extends CQLTester + { + /** + * Migrated from cql_tests.py:TestCQL.test_select_distinct() + */ + @Test + public void testSelectDistinct() throws Throwable + { - // Test a regular(CQL3) table. 
++ // Test a regular (CQL3) table. + createTable("CREATE TABLE %s (pk0 int, pk1 int, ck0 int, val int, PRIMARY KEY((pk0, pk1), ck0))"); + + for (int i = 0; i < 3; i++) + { + execute("INSERT INTO %s (pk0, pk1, ck0, val) VALUES (?, ?, 0, 0)", i, i); + execute("INSERT INTO %s (pk0, pk1, ck0, val) VALUES (?, ?, 1, 1)", i, i); + } + + assertRows(execute("SELECT DISTINCT pk0, pk1 FROM %s LIMIT 1"), + row(0, 0)); + + assertRows(execute("SELECT DISTINCT pk0, pk1 FROM %s LIMIT 3"), + row(0, 0), + row(2, 2), + row(1, 1)); + + // Test selection validation. + assertInvalidMessage("queries must request all the partition key columns", "SELECT DISTINCT pk0 FROM %s"); + assertInvalidMessage("queries must only request partition key columns", "SELECT DISTINCT pk0, pk1, ck0 FROM %s"); - - //Test a 'compact storage' table. - createTable("CREATE TABLE %s (pk0 int, pk1 int, val int, PRIMARY KEY((pk0, pk1))) WITH COMPACT STORAGE"); - - for (int i = 0; i < 3; i++) - execute("INSERT INTO %s (pk0, pk1, val) VALUES (?, ?, ?)", i, i, i); - - assertRows(execute("SELECT DISTINCT pk0, pk1 FROM %s LIMIT 1"), - row(0, 0)); - - assertRows(execute("SELECT DISTINCT pk0, pk1 FROM %s LIMIT 3"), - row(0, 0), - row(2, 2), - row(1, 1)); - - // Test a 'wide row' thrift table. 
- createTable("CREATE TABLE %s (pk int, name text, val int, PRIMARY KEY(pk, name)) WITH COMPACT STORAGE"); - - for (int i = 0; i < 3; i++) - { - execute("INSERT INTO %s (pk, name, val) VALUES (?, 'name0', 0)", i); - execute("INSERT INTO %s (pk, name, val) VALUES (?, 'name1', 1)", i); - } - - assertRows(execute("SELECT DISTINCT pk FROM %s LIMIT 1"), - row(1)); - - assertRows(execute("SELECT DISTINCT pk FROM %s LIMIT 3"), - row(1), - row(0), - row(2)); + } + + /** + * Migrated from cql_tests.py:TestCQL.test_select_distinct_with_deletions() + */ + @Test + public void testSelectDistinctWithDeletions() throws Throwable + { + createTable("CREATE TABLE %s (k int PRIMARY KEY, c int, v int)"); + + for (int i = 0; i < 10; i++) + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", i, i, i); + + Object[][] rows = getRows(execute("SELECT DISTINCT k FROM %s")); + Assert.assertEquals(10, rows.length); + Object key_to_delete = rows[3][0]; + + execute("DELETE FROM %s WHERE k=?", key_to_delete); + + rows = getRows(execute("SELECT DISTINCT k FROM %s")); + Assert.assertEquals(9, rows.length); + + rows = getRows(execute("SELECT DISTINCT k FROM %s LIMIT 5")); + Assert.assertEquals(5, rows.length); + + rows = getRows(execute("SELECT DISTINCT k FROM %s")); + Assert.assertEquals(9, rows.length); + } + + @Test + public void testSelectDistinctWithWhereClause() throws Throwable { + createTable("CREATE TABLE %s (k int, a int, b int, PRIMARY KEY (k, a))"); + createIndex("CREATE INDEX ON %s (b)"); + + for (int i = 0; i < 10; i++) + { + execute("INSERT INTO %s (k, a, b) VALUES (?, ?, ?)", i, i, i); + execute("INSERT INTO %s (k, a, b) VALUES (?, ?, ?)", i, i * 10, i * 10); + } + + String distinctQueryErrorMsg = "SELECT DISTINCT with WHERE clause only supports restriction by partition key and/or static columns."; + assertInvalidMessage(distinctQueryErrorMsg, + "SELECT DISTINCT k FROM %s WHERE a >= 80 ALLOW FILTERING"); + + assertInvalidMessage(distinctQueryErrorMsg, + "SELECT DISTINCT k FROM %s 
WHERE k IN (1, 2, 3) AND a = 10"); + + assertInvalidMessage(distinctQueryErrorMsg, + "SELECT DISTINCT k FROM %s WHERE b = 5"); + + assertRows(execute("SELECT DISTINCT k FROM %s WHERE k = 1"), + row(1)); + assertRows(execute("SELECT DISTINCT k FROM %s WHERE k IN (5, 6, 7)"), + row(5), + row(6), + row(7)); + + // With static columns + createTable("CREATE TABLE %s (k int, a int, s int static, b int, PRIMARY KEY (k, a))"); + createIndex("CREATE INDEX ON %s (b)"); + for (int i = 0; i < 10; i++) + { + execute("INSERT INTO %s (k, a, b, s) VALUES (?, ?, ?, ?)", i, i, i, i); + execute("INSERT INTO %s (k, a, b, s) VALUES (?, ?, ?, ?)", i, i * 10, i * 10, i * 10); + } + + assertRows(execute("SELECT DISTINCT s FROM %s WHERE k = 5"), + row(50)); + assertRows(execute("SELECT DISTINCT s FROM %s WHERE k IN (5, 6, 7)"), + row(50), + row(60), + row(70)); + } + + @Test + public void testSelectDistinctWithWhereClauseOnStaticColumn() throws Throwable + { + createTable("CREATE TABLE %s (k int, a int, s int static, s1 int static, b int, PRIMARY KEY (k, a))"); + + for (int i = 0; i < 10; i++) + { + execute("INSERT INTO %s (k, a, b, s, s1) VALUES (?, ?, ?, ?, ?)", i, i, i, i, i); + execute("INSERT INTO %s (k, a, b, s, s1) VALUES (?, ?, ?, ?, ?)", i, i * 10, i * 10, i * 10, i * 10); + } + + execute("INSERT INTO %s (k, a, b, s, s1) VALUES (?, ?, ?, ?, ?)", 2, 10, 10, 10, 10); + + beforeAndAfterFlush(() -> { + assertRows(execute("SELECT DISTINCT k, s, s1 FROM %s WHERE s = 90 AND s1 = 90 ALLOW FILTERING"), + row(9, 90, 90)); + + assertRows(execute("SELECT DISTINCT k, s, s1 FROM %s WHERE s = 90 AND s1 = 90 ALLOW FILTERING"), + row(9, 90, 90)); + + assertRows(execute("SELECT DISTINCT k, s, s1 FROM %s WHERE s = 10 AND s1 = 10 ALLOW FILTERING"), + row(1, 10, 10), + row(2, 10, 10)); + + assertRows(execute("SELECT DISTINCT k, s, s1 FROM %s WHERE k = 1 AND s = 10 AND s1 = 10 ALLOW FILTERING"), + row(1, 10, 10)); + }); + } + + @Test + public void testSelectDistinctWithStaticColumnsAndPaging() throws 
Throwable + { + createTable("CREATE TABLE %s (a int, b int, s int static, c int, d int, primary key (a, b));"); + + // Test with only static data + for (int i = 0; i < 5; i++) + execute("INSERT INTO %s (a, s) VALUES (?, ?)", i, i); + + testSelectDistinctWithPaging(); + + // Test with a mix of partition with rows and partitions without rows + for (int i = 0; i < 5; i++) + { + if (i % 2 == 0) + { + for (int j = 1; j < 4; j++) + { + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", i, j, j, i + j); + } + } + } + + testSelectDistinctWithPaging(); + + // Test with all partition with rows + for (int i = 0; i < 5; i++) + { + for (int j = 1; j < 4; j++) + { + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", i, j, j, i + j); + } + } + + testSelectDistinctWithPaging(); + } + + private void testSelectDistinctWithPaging() throws Throwable + { + for (int pageSize = 1; pageSize < 7; pageSize++) + { + // Range query + assertRowsNet(executeNetWithPaging("SELECT DISTINCT a, s FROM %s", pageSize), + row(1, 1), + row(0, 0), + row(2, 2), + row(4, 4), + row(3, 3)); + + assertRowsNet(executeNetWithPaging("SELECT DISTINCT a, s FROM %s LIMIT 3", pageSize), + row(1, 1), + row(0, 0), + row(2, 2)); + + assertRowsNet(executeNetWithPaging("SELECT DISTINCT a, s FROM %s WHERE s >= 2 ALLOW FILTERING", pageSize), + row(2, 2), + row(4, 4), + row(3, 3)); + + // Multi partition query + assertRowsNet(executeNetWithPaging("SELECT DISTINCT a, s FROM %s WHERE a IN (1, 2, 3, 4);", pageSize), + row(1, 1), + row(2, 2), + row(3, 3), + row(4, 4)); + + assertRowsNet(executeNetWithPaging("SELECT DISTINCT a, s FROM %s WHERE a IN (1, 2, 3, 4) LIMIT 3;", pageSize), + row(1, 1), + row(2, 2), + row(3, 3)); + + assertRowsNet(executeNetWithPaging("SELECT DISTINCT a, s FROM %s WHERE a IN (1, 2, 3, 4) AND s >= 2 ALLOW FILTERING;", pageSize), + row(2, 2), + row(3, 3), + row(4, 4)); + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org