Repository: cassandra Updated Branches: refs/heads/trunk 45afe1d93 -> 5bd5e25fb
Fix memtable copy bug causing range tombstones to be missed patch by tjake; reviewed by benedict for CASSANDRA-7371 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a14a01c9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a14a01c9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a14a01c9 Branch: refs/heads/trunk Commit: a14a01c924e4db8e084ba313149d35149dc53825 Parents: a9da3fd Author: Jake Luciani <j...@apache.org> Authored: Mon Jun 16 16:10:06 2014 -0400 Committer: Jake Luciani <j...@apache.org> Committed: Mon Jun 16 16:10:06 2014 -0400 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../cql3/statements/BatchStatement.java | 17 ++- .../apache/cassandra/db/AtomicBTreeColumns.java | 8 +- .../org/apache/cassandra/db/DeletionInfo.java | 12 ++ .../apache/cassandra/db/RangeTombstoneList.java | 21 +++ test/conf/cassandra.yaml | 2 + .../org/apache/cassandra/cql3/DeleteTest.java | 148 +++++++++++++++++++ 7 files changed, 201 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a14a01c9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 35c02f9..fd7c62b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -25,7 +25,7 @@ Merged from 1.2: * Check internal addresses for seeds (CASSANDRA-6523) * Fix potential / by 0 in HHOM page size calculation (CASSANDRA-7354) * Use LOCAL_ONE for non-superuser auth queries (CASSANDRA-7328) - + * Fix RangeTombstone copy bug (CASSANDRA-7371) 2.1.0-rc1 * Revert flush directory (CASSANDRA-6357) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a14a01c9/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index e513aef..3cec81b 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -23,6 +23,7 @@ import java.util.*; import com.google.common.base.Function; import com.google.common.collect.*; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.transport.Frame; import org.github.jamm.MemoryMeter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -122,7 +123,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache return statements; } - private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now) + private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now, Frame sourceFrame) throws RequestExecutionException, RequestValidationException { Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>(); @@ -131,7 +132,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache ModificationStatement statement = statements.get(i); QueryOptions statementOptions = options.forStatement(i); long timestamp = attrs.getTimestamp(now, statementOptions); - addStatementMutations(statement, statementOptions, local, timestamp, mutations); + addStatementMutations(statement, statementOptions, local, timestamp, mutations, sourceFrame); } return unzipMutations(mutations); } @@ -152,7 +153,8 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache QueryOptions options, boolean local, long now, - Map<String, Map<ByteBuffer, IMutation>> mutations) + Map<String, Map<ByteBuffer, IMutation>> mutations, + Frame sourceFrame) throws RequestExecutionException, RequestValidationException { String ksName = statement.keyspace(); @@ -177,6 +179,7 @@ public class 
BatchStatement implements CQLStatement, MeasurableForPreparedCache if (mutation == null) { mut = new Mutation(ksName, key); + mut.setSourceFrame(sourceFrame); mutation = type == Type.COUNTER ? new CounterMutation(mut, options.getConsistency()) : mut; ksMap.put(key, mutation); } @@ -219,10 +222,10 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache public ResultMessage execute(QueryState queryState, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException { - return execute(options, false, options.getTimestamp(queryState)); + return execute(queryState, options, false, options.getTimestamp(queryState)); } - public ResultMessage execute(BatchQueryOptions options, boolean local, long now) + private ResultMessage execute(QueryState queryState, BatchQueryOptions options, boolean local, long now) throws RequestExecutionException, RequestValidationException { if (options.getConsistency() == null) @@ -233,7 +236,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache if (hasConditions) return executeWithConditions(options, now); - executeWithoutConditions(getMutations(options, local, now), options.getConsistency()); + executeWithoutConditions(getMutations(options, local, now, queryState.getSourceFrame()), options.getConsistency()); return new ResultMessage.Void(); } @@ -309,7 +312,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException { assert !hasConditions; - for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp())) + for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp(), queryState.getSourceFrame())) { // We don't use counters internally. 
assert mutation instanceof Mutation; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a14a01c9/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java index 27eb46d..0e38784 100644 --- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java @@ -38,6 +38,7 @@ import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.btree.BTree; import org.apache.cassandra.utils.btree.UpdateFunction; import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.HeapAllocator; import org.apache.cassandra.utils.memory.MemtableAllocator; import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater; @@ -163,6 +164,8 @@ public class AtomicBTreeColumns extends ColumnFamily public long addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer) { ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer); + DeletionInfo inputDeletionInfoCopy = null; + while (true) { Holder current = ref; @@ -172,7 +175,10 @@ public class AtomicBTreeColumns extends ColumnFamily DeletionInfo deletionInfo; if (cm.deletionInfo().mayModify(current.deletionInfo)) { - deletionInfo = current.deletionInfo.copy().add(cm.deletionInfo()); + if (inputDeletionInfoCopy == null) + inputDeletionInfoCopy = cm.deletionInfo().copy(HeapAllocator.instance); + + deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy); updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize()); } else http://git-wip-us.apache.org/repos/asf/cassandra/blob/a14a01c9/src/java/org/apache/cassandra/db/DeletionInfo.java ---------------------------------------------------------------------- 
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java index a167b85..b8988ec 100644 --- a/src/java/org/apache/cassandra/db/DeletionInfo.java +++ b/src/java/org/apache/cassandra/db/DeletionInfo.java @@ -33,6 +33,8 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.memory.AbstractAllocator; +import org.apache.cassandra.utils.memory.MemtableAllocator; /** * A combination of a top-level (or row) tombstone and range tombstones describing the deletions @@ -102,6 +104,16 @@ public class DeletionInfo implements IMeasurableMemory return new DeletionInfo(topLevel, ranges == null ? null : ranges.copy()); } + public DeletionInfo copy(AbstractAllocator allocator) + { + + RangeTombstoneList rangesCopy = null; + if (ranges != null) + rangesCopy = ranges.copy(allocator); + + return new DeletionInfo(topLevel, rangesCopy); + } + /** * Returns whether this DeletionInfo is live, that is deletes no columns. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a14a01c9/src/java/org/apache/cassandra/db/RangeTombstoneList.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java index b06c520..757a1d0 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java +++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java @@ -37,6 +37,8 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.memory.AbstractAllocator; +import org.apache.cassandra.utils.memory.HeapPool; /** * Data structure holding the range tombstones of a ColumnFamily. @@ -114,6 +116,25 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable boundaryHeapSize, size); } + public RangeTombstoneList copy(AbstractAllocator allocator) + { + RangeTombstoneList copy = new RangeTombstoneList(comparator, + new Composite[size], + new Composite[size], + Arrays.copyOf(markedAts, size), + Arrays.copyOf(delTimes, size), + boundaryHeapSize, size); + + + for (int i = 0; i < size; i++) + { + copy.starts[i] = starts[i].copy(null, allocator); + copy.ends[i] = ends[i].copy(null, allocator); + } + + return copy; + } + public void add(RangeTombstone tombstone) { add(tombstone.min, tombstone.max, tombstone.data.markedForDeleteAt, tombstone.data.localDeletionTime); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a14a01c9/test/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index b766a64..72a357d 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -11,6 +11,8 @@ partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner 
listen_address: 127.0.0.1 storage_port: 7010 rpc_port: 9170 +start_native_transport: true +native_transport_port: 9042 column_index_size_in_kb: 4 commitlog_directory: build/test/cassandra/commitlog saved_caches_directory: build/test/cassandra/saved_caches http://git-wip-us.apache.org/repos/asf/cassandra/blob/a14a01c9/test/unit/org/apache/cassandra/cql3/DeleteTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/DeleteTest.java b/test/unit/org/apache/cassandra/cql3/DeleteTest.java new file mode 100644 index 0000000..3395dcc --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/DeleteTest.java @@ -0,0 +1,148 @@ +package org.apache.cassandra.cql3; + + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.service.EmbeddedCassandraService; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +public class DeleteTest extends SchemaLoader +{ + + private static EmbeddedCassandraService cassandra; + + private static Cluster cluster; + private static Session session; + private static PreparedStatement pstmtI; + private static PreparedStatement pstmtU; + private static PreparedStatement pstmtD; + private static PreparedStatement pstmt1; + private static PreparedStatement pstmt2; + private static PreparedStatement pstmt3; + private static PreparedStatement pstmt4; + private static PreparedStatement pstmt5; + + @BeforeClass() + public static void setup() throws ConfigurationException, IOException + { + + Schema.instance.clear(); // Schema are now written on disk and will be 
reloaded + cassandra = new EmbeddedCassandraService(); + cassandra.start(); + + cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build(); + session = cluster.connect(); + + session.execute("drop keyspace if exists junit;"); + session.execute("create keyspace junit WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };"); + session.execute("CREATE TABLE junit.tpc_base (\n" + + " id int ,\n" + + " cid int ,\n" + + " val text ,\n" + + " PRIMARY KEY ( ( id ), cid )\n" + + ");"); + session.execute("CREATE TABLE junit.tpc_inherit_a (\n" + + " id int ,\n" + + " cid int ,\n" + + " inh_a text ,\n" + + " val text ,\n" + + " PRIMARY KEY ( ( id ), cid )\n" + + ");"); + session.execute("CREATE TABLE junit.tpc_inherit_b (\n" + + " id int ,\n" + + " cid int ,\n" + + " inh_b text ,\n" + + " val text ,\n" + + " PRIMARY KEY ( ( id ), cid )\n" + + ");"); + session.execute("CREATE TABLE junit.tpc_inherit_b2 (\n" + + " id int ,\n" + + " cid int ,\n" + + " inh_b text ,\n" + + " inh_b2 text ,\n" + + " val text ,\n" + + " PRIMARY KEY ( ( id ), cid )\n" + + ");"); + session.execute("CREATE TABLE junit.tpc_inherit_c (\n" + + " id int ,\n" + + " cid int ,\n" + + " inh_c text ,\n" + + " val text ,\n" + + " PRIMARY KEY ( ( id ), cid )\n" + + ");"); + try { + Thread.sleep(2000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + pstmtI = session.prepare("insert into junit.tpc_inherit_b ( id, cid, inh_b, val) values (?, ?, ?, ?)"); + pstmtU = session.prepare("update junit.tpc_inherit_b set inh_b=?, val=? where id=? and cid=?"); + pstmtD = session.prepare("delete from junit.tpc_inherit_b where id=? and cid=?"); + pstmt1 = session.prepare("select id, cid, val from junit.tpc_base where id=? and cid=?"); + pstmt2 = session.prepare("select id, cid, inh_a, val from junit.tpc_inherit_a where id=? 
and cid=?"); + pstmt3 = session.prepare("select id, cid, inh_b, val from junit.tpc_inherit_b where id=? and cid=?"); + pstmt4 = session.prepare("select id, cid, inh_b, inh_b2, val from junit.tpc_inherit_b2 where id=? and cid=?"); + pstmt5 = session.prepare("select id, cid, inh_c, val from junit.tpc_inherit_c where id=? and cid=?"); + } + + + + @Test + public void lostDeletesTest() + { + + for (int i = 0; i < 500; i++) + { + session.execute(pstmtI.bind(1, 1, "inhB", "valB")); + + ResultSetFuture[] futures = load(); + + Assert.assertTrue(futures[0].getUninterruptibly().isExhausted()); + Assert.assertTrue(futures[1].getUninterruptibly().isExhausted()); + Assert.assertNotNull(futures[2].getUninterruptibly().one()); + Assert.assertTrue(futures[3].getUninterruptibly().isExhausted()); + Assert.assertTrue(futures[4].getUninterruptibly().isExhausted()); + + session.execute(pstmtU.bind("inhBu", "valBu", 1, 1)); + + futures = load(); + + Assert.assertTrue(futures[0].getUninterruptibly().isExhausted()); + Assert.assertTrue(futures[1].getUninterruptibly().isExhausted()); + Assert.assertNotNull(futures[2].getUninterruptibly().one()); + Assert.assertTrue(futures[3].getUninterruptibly().isExhausted()); + Assert.assertTrue(futures[4].getUninterruptibly().isExhausted()); + + session.execute(pstmtD.bind(1, 1)); + + futures = load(); + + Assert.assertTrue(futures[0].getUninterruptibly().isExhausted()); + Assert.assertTrue(futures[1].getUninterruptibly().isExhausted()); + Assert.assertTrue(futures[2].getUninterruptibly().isExhausted()); + Assert.assertTrue(futures[3].getUninterruptibly().isExhausted()); + Assert.assertTrue(futures[4].getUninterruptibly().isExhausted()); + } + } + + private ResultSetFuture[] load() { + return new ResultSetFuture[]{ + session.executeAsync(pstmt1.bind(1, 1)), + session.executeAsync(pstmt2.bind(1, 1)), + session.executeAsync(pstmt3.bind(1, 1)), + session.executeAsync(pstmt4.bind(1, 1)), + session.executeAsync(pstmt5.bind(1, 1)) + }; + } +}