Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 3615da583 -> 5034f4495 refs/heads/trunk a4fe865d4 -> 74ee6684c
Fix Thrift CAS operations for columns with defined metadata Patch by Sam Tunnicliffe; reviewed by Sylvain Lebresne for CASSANDRA-10576 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5034f449 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5034f449 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5034f449 Branch: refs/heads/cassandra-3.0 Commit: 5034f449575a4648f67b50c1ff535518ce2f00e9 Parents: 3615da5 Author: Sam Tunnicliffe <s...@beobal.com> Authored: Fri Oct 23 13:51:29 2015 +0100 Committer: Sam Tunnicliffe <s...@beobal.com> Committed: Wed Oct 28 17:11:04 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/thrift/CassandraServer.java | 40 +++++++++++++------- 2 files changed, 28 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5034f449/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c51cb51..4ead854 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Fix thrift cas operations with defined columns (CASSANDRA-10576) * Fix PartitionUpdate.operationCount()for updates with static column operations (CASSANDRA-10606) * Fix thrift get() queries with defined columns (CASSANDRA-10586) * Fix marking of indexes as built and removed (CASSANDRA-10601) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5034f449/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 2a3f73d..02c0871 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ 
b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -945,15 +945,11 @@ public class CassandraServer implements Cassandra.Iface // Indexed column values cannot be larger than 64K. See CASSANDRA-3057/4240 for more details Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(partitionUpdates); - FilteredPartition partitionExpected = null; - if (!expected.isEmpty()) - partitionExpected = FilteredPartition.create(LegacyLayout.toRowIterator(metadata, dk, toLegacyCells(metadata, expected, nowInSec).iterator(), nowInSec)); - schedule(DatabaseDescriptor.getWriteRpcTimeout()); try (RowIterator result = StorageProxy.cas(cState.getKeyspace(), column_family, dk, - new ThriftCASRequest(partitionExpected, partitionUpdates), + new ThriftCASRequest(toLegacyCells(metadata, expected, nowInSec), partitionUpdates, nowInSec), ThriftConversion.fromThrift(serial_consistency_level), ThriftConversion.fromThrift(commit_consistency_level), cState)) @@ -2509,21 +2505,22 @@ public class CassandraServer implements Cassandra.Iface { private final CFMetaData metadata; private final DecoratedKey key; - - private final FilteredPartition expected; + private final List<LegacyLayout.LegacyCell> expected; private final PartitionUpdate updates; + private final int nowInSec; - private ThriftCASRequest(FilteredPartition expected, PartitionUpdate updates) + private ThriftCASRequest(List<LegacyLayout.LegacyCell> expected, PartitionUpdate updates, int nowInSec) { this.metadata = updates.metadata(); this.key = updates.partitionKey(); this.expected = expected; this.updates = updates; + this.nowInSec = nowInSec; } public SinglePartitionReadCommand readCommand(int nowInSec) { - if (expected == null || expected.isEmpty()) + if (expected.isEmpty()) { // We want to know if the partition exists, so just fetch a single cell. 
ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(Slices.ALL, false); @@ -2533,9 +2530,13 @@ public class CassandraServer implements Cassandra.Iface // Gather the clustering for the expected values and query those. BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(metadata.comparator); - for (Row row : expected) + FilteredPartition expectedPartition = + FilteredPartition.create(LegacyLayout.toRowIterator(metadata, key, expected.iterator(), nowInSec)); + + for (Row row : expectedPartition) clusterings.add(row.clustering()); - PartitionColumns columns = expected.staticRow().isEmpty() + + PartitionColumns columns = expectedPartition.staticRow().isEmpty() ? metadata.partitionColumns().withoutStatics() : metadata.partitionColumns(); ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false); @@ -2544,13 +2545,21 @@ public class CassandraServer implements Cassandra.Iface public boolean appliesTo(FilteredPartition current) { - if (expected == null || expected.isEmpty()) + if (expected.isEmpty()) return current.isEmpty(); else if (current.isEmpty()) return false; + // Push the expected results through ThriftResultsMerger to translate any static + // columns into clusterings. The current partition is retrieved in the same way, so + // unless they're both handled the same, they won't match. + FilteredPartition expectedPartition = + FilteredPartition.create( + UnfilteredRowIterators.filter( + ThriftResultsMerger.maybeWrap(expectedToUnfilteredRowIterator(), nowInSec), nowInSec)); + // Check that for everything we expected, the fetched values exists and correspond. 
- for (Row e : expected) + for (Row e : expectedPartition) { Row c = current.getRow(e.clustering()); if (c == null) @@ -2588,5 +2597,10 @@ public class CassandraServer implements Cassandra.Iface { return updates; } + + private UnfilteredRowIterator expectedToUnfilteredRowIterator() + { + return LegacyLayout.toUnfilteredRowIterator(metadata, key, LegacyLayout.LegacyDeletionInfo.live(), expected.iterator()); + } } }