Repository: cassandra
Updated Branches:
  refs/heads/trunk 204442452 -> 9aace4836


Explicitly use Long.MAX_VALUE timestamp for counter deletions

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-7346


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5fe75576
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5fe75576
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5fe75576

Branch: refs/heads/trunk
Commit: 5fe7557627fac6ace2554a4f8ef552c9d9512490
Parents: deaf5ba
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Wed Jun 11 09:47:22 2014 -0500
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Wed Jun 11 09:47:22 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../cassandra/cql/AbstractModification.java     |  9 ++++--
 .../apache/cassandra/cql/DeleteStatement.java   | 27 ++++++++++--------
 .../apache/cassandra/cql/UpdateStatement.java   |  7 -----
 .../apache/cassandra/cql3/UpdateParameters.java | 19 ++++++++++---
 .../cql3/statements/DeleteStatement.java        |  2 +-
 .../apache/cassandra/db/CounterMutation.java    |  3 ++
 .../cassandra/thrift/CassandraServer.java       | 29 ++++++++++++--------
 8 files changed, 59 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b70782b..a8a84d8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.1.0
+ * Explicitly use Long.MAX_VALUE timestamp for counter deletions
+   (CASSANDRA-7346)
  * Fix native protocol CAS batches (CASSANDRA-7337)
  * Reduce likelihood of contention on local paxos locking (CASSANDRA-7359)
  * Upgrade to Pig 0.12.1 (CASSANDRA-6556)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/src/java/org/apache/cassandra/cql/AbstractModification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AbstractModification.java 
b/src/java/org/apache/cassandra/cql/AbstractModification.java
index 8da2611..9b88b5e 100644
--- a/src/java/org/apache/cassandra/cql/AbstractModification.java
+++ b/src/java/org/apache/cassandra/cql/AbstractModification.java
@@ -107,8 +107,11 @@ public abstract class AbstractModification
      *
      * @throws InvalidRequestException on the wrong request
      */
-    public abstract List<IMutation> prepareRowMutations(String keyspace, 
ThriftClientState clientState, List<ByteBuffer> variables)
-    throws InvalidRequestException, UnauthorizedException;
+    public List<IMutation> prepareRowMutations(String keyspace, 
ThriftClientState clientState, List<ByteBuffer> variables)
+    throws InvalidRequestException, UnauthorizedException
+    {
+        return prepareRowMutations(keyspace, clientState, null, variables);
+    }
 
     /**
      * Convert statement into a list of mutations to apply on the server
@@ -121,6 +124,6 @@ public abstract class AbstractModification
      *
      * @throws InvalidRequestException on the wrong request
      */
-    public abstract List<IMutation> prepareRowMutations(String keyspace, 
ThriftClientState clientState, Long timestamp, List<ByteBuffer> variables)
+    public abstract List<IMutation> prepareRowMutations(String keyspace, 
ThriftClientState clientState, Long batchTimestamp, List<ByteBuffer> variables)
     throws InvalidRequestException, UnauthorizedException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/src/java/org/apache/cassandra/cql/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/DeleteStatement.java 
b/src/java/org/apache/cassandra/cql/DeleteStatement.java
index 71942e4..e00ffc7 100644
--- a/src/java/org/apache/cassandra/cql/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql/DeleteStatement.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.CounterMutation;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.IMutation;
@@ -62,13 +63,7 @@ public class DeleteStatement extends AbstractModification
         return keys;
     }
 
-    public List<IMutation> prepareRowMutations(String keyspace, 
ThriftClientState clientState, List<ByteBuffer> variables)
-    throws InvalidRequestException, UnauthorizedException
-    {
-        return prepareRowMutations(keyspace, clientState, null, variables);
-    }
-
-    public List<IMutation> prepareRowMutations(String keyspace, 
ThriftClientState clientState, Long timestamp, List<ByteBuffer> variables)
+    public List<IMutation> prepareRowMutations(String keyspace, 
ThriftClientState clientState, Long batchTimestamp, List<ByteBuffer> variables)
     throws InvalidRequestException, UnauthorizedException
     {
         CFMetaData metadata = validateColumnFamily(keyspace, columnFamily);
@@ -79,22 +74,22 @@ public class DeleteStatement extends AbstractModification
         List<IMutation> mutations = new ArrayList<IMutation>(keys.size());
 
         for (Term key : keys)
-            mutations.add(mutationForKey(key.getByteBuffer(keyType, 
variables), keyspace, timestamp, clientState, variables, metadata));
+            mutations.add(mutationForKey(key.getByteBuffer(keyType, 
variables), keyspace, batchTimestamp, clientState, variables, metadata));
 
         return mutations;
     }
 
-    public Mutation mutationForKey(ByteBuffer key, String keyspace, Long 
timestamp, ThriftClientState clientState, List<ByteBuffer> variables, 
CFMetaData metadata)
+    public Mutation mutationForKey(ByteBuffer key, String keyspace, Long 
batchTimestamp, ThriftClientState clientState, List<ByteBuffer> variables, 
CFMetaData metadata)
     throws InvalidRequestException
     {
         Mutation mutation = new Mutation(keyspace, key);
 
         QueryProcessor.validateKeyAlias(metadata, keyName);
 
-        if (columns.size() < 1)
+        if (columns.isEmpty())
         {
             // No columns, delete the partition
-            mutation.delete(columnFamily, (timestamp == null) ? 
getTimestamp(clientState) : timestamp);
+            mutation.delete(columnFamily, batchTimestamp == null ? 
getTimestamp(clientState) : batchTimestamp);
         }
         else
         {
@@ -104,13 +99,21 @@ public class DeleteStatement extends AbstractModification
             {
                 CellName columnName = 
metadata.comparator.cellFromByteBuffer(column.getByteBuffer(at, variables));
                 validateColumnName(columnName);
-                mutation.delete(columnFamily, columnName, (timestamp == null) 
? getTimestamp(clientState) : timestamp);
+                mutation.delete(columnFamily, columnName, batchTimestamp == 
null ? getTimestamp(clientState) : batchTimestamp);
             }
         }
 
         return mutation;
     }
 
+    @Override
+    public long getTimestamp(ThriftClientState clientState)
+    {
+        return Schema.instance.getCFMetaData(keyspace, 
columnFamily).isCounter()
+             ? CounterMutation.TOMBSTONE_TIMESTAMP
+             : super.getTimestamp(clientState);
+    }
+
     public String toString()
     {
         return String.format("DeleteStatement(columns=%s, keyspace=%s, 
columnFamily=%s, consistency=%s keys=%s)",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/src/java/org/apache/cassandra/cql/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/UpdateStatement.java 
b/src/java/org/apache/cassandra/cql/UpdateStatement.java
index 16a0d76..8a995d2 100644
--- a/src/java/org/apache/cassandra/cql/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql/UpdateStatement.java
@@ -119,13 +119,6 @@ public class UpdateStatement extends AbstractModification
     }
 
     /** {@inheritDoc} */
-    public List<IMutation> prepareRowMutations(String keyspace, 
ThriftClientState clientState, List<ByteBuffer> variables)
-    throws InvalidRequestException, UnauthorizedException
-    {
-        return prepareRowMutations(keyspace, clientState, null, variables);
-    }
-
-    /** {@inheritDoc} */
     public List<IMutation> prepareRowMutations(String keyspace, 
ThriftClientState clientState, Long timestamp, List<ByteBuffer> variables)
     throws InvalidRequestException, UnauthorizedException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java 
b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 8a47536..3f0c2b9 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 
 /**
- * A simple container that simplify passing parameters for collections methods.
+ * A simple container that simplifies passing parameters for (mostly) 
collections methods.
  */
 public class UpdateParameters
 {
@@ -61,21 +61,27 @@ public class UpdateParameters
     public Cell makeTombstone(CellName name) throws InvalidRequestException
     {
         QueryProcessor.validateCellName(name, metadata.comparator);
-        return new BufferDeletedCell(name, localDeletionTime, timestamp);
+        return new BufferDeletedCell(name, localDeletionTime, 
tombstoneTimestamp());
     }
 
     public RangeTombstone makeRangeTombstone(ColumnSlice slice) throws 
InvalidRequestException
     {
         QueryProcessor.validateComposite(slice.start, metadata.comparator);
         QueryProcessor.validateComposite(slice.finish, metadata.comparator);
-        return new RangeTombstone(slice.start, slice.finish, timestamp, 
localDeletionTime);
+        return new RangeTombstone(slice.start, slice.finish, 
tombstoneTimestamp(), localDeletionTime);
     }
 
     public RangeTombstone makeTombstoneForOverwrite(ColumnSlice slice) throws 
InvalidRequestException
     {
         QueryProcessor.validateComposite(slice.start, metadata.comparator);
         QueryProcessor.validateComposite(slice.finish, metadata.comparator);
-        return new RangeTombstone(slice.start, slice.finish, timestamp - 1, 
localDeletionTime);
+        // As of 2.1, will never be called for a counter table. However, in 
3.0, CASSANDRA-6506 might change that, so play safe.
+        return new RangeTombstone(slice.start, slice.finish, 
tombstoneTimestamp() - 1, localDeletionTime);
+    }
+
+    public DeletionInfo makeDeletionInfo()
+    {
+        return new DeletionInfo(tombstoneTimestamp(), localDeletionTime);
     }
 
     public List<Cell> getPrefetchedList(ByteBuffer rowKey, ColumnIdentifier 
cql3ColumnName)
@@ -86,4 +92,9 @@ public class UpdateParameters
         CQL3Row row = prefetchedLists.get(rowKey);
         return row == null ? Collections.<Cell>emptyList() : 
row.getCollection(cql3ColumnName);
     }
+
+    private long tombstoneTimestamp()
+    {
+        return metadata.isCounter() ? CounterMutation.TOMBSTONE_TIMESTAMP : 
timestamp;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 49fdfc2..1569ae4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -66,7 +66,7 @@ public class DeleteStatement extends ModificationStatement
             if (prefix.isEmpty())
             {
                 // No columns specified, delete the row
-                cf.delete(new DeletionInfo(params.timestamp, 
params.localDeletionTime));
+                cf.delete(params.makeDeletionInfo());
             }
             else if (cfm.comparator.isDense() && prefix.size() == 
cfm.clusteringColumns().size())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java 
b/src/java/org/apache/cassandra/db/CounterMutation.java
index 2bfdd4e..848e4db 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -44,6 +44,9 @@ import org.apache.cassandra.utils.*;
 
 public class CounterMutation implements IMutation
 {
+    // Counter deletions are final in C*, because there is no way to provide 
reliable deletion otherwise.
+    public static final long TOMBSTONE_TIMESTAMP = Long.MAX_VALUE;
+
     public static final CounterMutationSerializer serializer = new 
CounterMutationSerializer();
 
     private static final Striped<Lock> LOCKS = 
Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentCounterWriters() * 1024);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java 
b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 1a77ffa..49466bd 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -907,16 +907,18 @@ public class CassandraServer implements Cassandra.Iface
 
     private void deleteColumnOrSuperColumn(org.apache.cassandra.db.Mutation 
mutation, CFMetaData cfm, Deletion del)
     {
+        long timestamp = cfm.isCounter() ? CounterMutation.TOMBSTONE_TIMESTAMP 
: del.timestamp;
+
         if (del.predicate != null && del.predicate.column_names != null)
         {
             for (ByteBuffer c : del.predicate.column_names)
             {
                 if (del.super_column == null && cfm.isSuper())
-                    mutation.deleteRange(cfm.cfName, SuperColumns.startOf(c), 
SuperColumns.endOf(c), del.timestamp);
+                    mutation.deleteRange(cfm.cfName, SuperColumns.startOf(c), 
SuperColumns.endOf(c), timestamp);
                 else if (del.super_column != null)
-                    mutation.delete(cfm.cfName, 
cfm.comparator.makeCellName(del.super_column, c), del.timestamp);
+                    mutation.delete(cfm.cfName, 
cfm.comparator.makeCellName(del.super_column, c), timestamp);
                 else
-                    mutation.delete(cfm.cfName, 
cfm.comparator.cellFromByteBuffer(c), del.timestamp);
+                    mutation.delete(cfm.cfName, 
cfm.comparator.cellFromByteBuffer(c), timestamp);
             }
         }
         else if (del.predicate != null && del.predicate.slice_range != null)
@@ -925,24 +927,27 @@ public class CassandraServer implements Cassandra.Iface
                 mutation.deleteRange(cfm.cfName,
                                      
SuperColumns.startOf(del.predicate.getSlice_range().start),
                                      
SuperColumns.startOf(del.predicate.getSlice_range().finish),
-                                     del.timestamp);
+                                     timestamp);
             else if (del.super_column != null)
-                mutation.deleteRange(cfm.cfName,
-                                     
cfm.comparator.makeCellName(del.super_column, 
del.predicate.getSlice_range().start),
-                                     
cfm.comparator.makeCellName(del.super_column, 
del.predicate.getSlice_range().finish),
-                                     del.timestamp);
+                    mutation.deleteRange(cfm.cfName,
+                                         
cfm.comparator.makeCellName(del.super_column, 
del.predicate.getSlice_range().start),
+                                         
cfm.comparator.makeCellName(del.super_column, 
del.predicate.getSlice_range().finish),
+                                         timestamp);
             else
                 mutation.deleteRange(cfm.cfName,
                                      
cfm.comparator.cellFromByteBuffer(del.predicate.getSlice_range().start),
                                      
cfm.comparator.cellFromByteBuffer(del.predicate.getSlice_range().finish),
-                                     del.timestamp);
+                                     timestamp);
         }
         else
         {
             if (del.super_column != null)
-                mutation.deleteRange(cfm.cfName, 
SuperColumns.startOf(del.super_column), SuperColumns.endOf(del.super_column), 
del.timestamp);
+                mutation.deleteRange(cfm.cfName,
+                                     SuperColumns.startOf(del.super_column),
+                                     SuperColumns.endOf(del.super_column),
+                                     timestamp);
             else
-                mutation.delete(cfm.cfName, del.timestamp);
+                mutation.delete(cfm.cfName, timestamp);
         }
     }
 
@@ -1830,7 +1835,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            internal_remove(key, path, System.currentTimeMillis(), 
consistency_level, true);
+            internal_remove(key, path, CounterMutation.TOMBSTONE_TIMESTAMP, 
consistency_level, true);
         }
         catch (RequestValidationException e)
         {

Reply via email to