Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed May 
26 04:45:44 2010
@@ -36,6 +36,7 @@ import org.apache.cassandra.service.*;
 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 import org.apache.cassandra.thrift.Deletion;
 import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.Clock;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.config.CFMetaData;
@@ -100,7 +101,7 @@ public class RowMutation
     void addHints(byte[] key, byte[] host) throws IOException
     {
         QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, key, 
host);
-        add(path, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis());
+        add(path, ArrayUtils.EMPTY_BYTE_ARRAY, new 
TimestampClock(System.currentTimeMillis()));
     }
 
     /*
@@ -133,10 +134,10 @@ public class RowMutation
      *
      * param @ cf - column name as <column family>:<column>
      * param @ value - value associated with the column
-     * param @ timestamp - timestamp associated with this data.
+     * param @ clock - clock associated with this data.
      * param @ timeToLive - ttl for the column, 0 for standard (non expiring) 
columns
     */
-    public void add(QueryPath path, byte[] value, long timestamp, int 
timeToLive)
+    public void add(QueryPath path, byte[] value, IClock clock, int timeToLive)
     {
         Integer id = CFMetaData.getId(table_, path.columnFamilyName);
         ColumnFamily columnFamily = modifications_.get(id);
@@ -145,15 +146,15 @@ public class RowMutation
             columnFamily = ColumnFamily.create(table_, path.columnFamilyName);
             modifications_.put(id, columnFamily);
         }
-        columnFamily.addColumn(path, value, timestamp, timeToLive);
+        columnFamily.addColumn(path, value, clock, timeToLive);
     }
 
-    public void add(QueryPath path, byte[] value, long timestamp)
+    public void add(QueryPath path, byte[] value, IClock clock)
     {
-        add(path, value, timestamp, 0);
+        add(path, value, clock, 0);
     }
 
-    public void delete(QueryPath path, long timestamp)
+    public void delete(QueryPath path, IClock clock)
     {
         Integer id = CFMetaData.getId(table_, path.columnFamilyName);
 
@@ -168,17 +169,18 @@ public class RowMutation
 
         if (path.superColumnName == null && path.columnName == null)
         {
-            columnFamily.delete(localDeleteTime, timestamp);
+            columnFamily.delete(localDeleteTime, clock);
         }
         else if (path.columnName == null)
         {
-            SuperColumn sc = new SuperColumn(path.superColumnName, 
columnFamily.getSubComparator());
-            sc.markForDeleteAt(localDeleteTime, timestamp);
+            SuperColumn sc = new SuperColumn(path.superColumnName, 
columnFamily.getSubComparator(), 
+                    columnFamily.getClockType());
+            sc.markForDeleteAt(localDeleteTime, clock);
             columnFamily.addColumn(sc);
         }
         else
         {
-            columnFamily.deleteColumn(path, localDeleteTime, timestamp);
+            columnFamily.deleteColumn(path, localDeleteTime, clock);
         }
     }
 
@@ -247,13 +249,13 @@ public class RowMutation
                     assert cosc.super_column != null;
                     for (org.apache.cassandra.thrift.Column column : 
cosc.super_column.columns)
                     {
-                        rm.add(new QueryPath(cfName, cosc.super_column.name, 
column.name), column.value, column.timestamp, column.ttl);
+                        rm.add(new QueryPath(cfName, cosc.super_column.name, 
column.name), column.value, unthriftifyClock(column.clock), column.ttl);
                     }
                 }
                 else
                 {
                     assert cosc.super_column == null;
-                    rm.add(new QueryPath(cfName, null, cosc.column.name), 
cosc.column.value, cosc.column.timestamp, cosc.column.ttl);
+                    rm.add(new QueryPath(cfName, null, cosc.column.name), 
cosc.column.value, unthriftifyClock(cosc.column.clock), cosc.column.ttl);
                 }
             }
         }
@@ -299,12 +301,12 @@ public class RowMutation
         {
             for (org.apache.cassandra.thrift.Column column : 
cosc.super_column.columns)
             {
-                rm.add(new QueryPath(cfName, cosc.super_column.name, 
column.name), column.value, column.timestamp, column.ttl);
+                rm.add(new QueryPath(cfName, cosc.super_column.name, 
column.name), column.value, unthriftifyClock(column.clock), column.ttl);
             }
         }
         else
         {
-            rm.add(new QueryPath(cfName, null, cosc.column.name), 
cosc.column.value, cosc.column.timestamp, cosc.column.ttl);
+            rm.add(new QueryPath(cfName, null, cosc.column.name), 
cosc.column.value, unthriftifyClock(cosc.column.clock), cosc.column.ttl);
         }
     }
 
@@ -315,16 +317,21 @@ public class RowMutation
             for(byte[] c : del.predicate.column_names)
             {
                 if (del.super_column == null && 
DatabaseDescriptor.getColumnFamilyType(rm.table_, cfName) == 
ColumnFamilyType.Super)
-                    rm.delete(new QueryPath(cfName, c), del.timestamp);
+                    rm.delete(new QueryPath(cfName, c), 
unthriftifyClock(del.clock));
                 else
-                    rm.delete(new QueryPath(cfName, del.super_column, c), 
del.timestamp);
+                    rm.delete(new QueryPath(cfName, del.super_column, c), 
unthriftifyClock(del.clock));
             }
         }
         else
         {
-            rm.delete(new QueryPath(cfName, del.super_column), del.timestamp);
+            rm.delete(new QueryPath(cfName, del.super_column), 
unthriftifyClock(del.clock));
         }
     }
+
+    private static IClock unthriftifyClock(Clock clock)
+    {
+        return new TimestampClock(clock.getTimestamp());
+    }
 }
 
 class RowMutationSerializer implements ICompactSerializer<RowMutation>

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Wed May 
26 04:45:44 2010
@@ -20,10 +20,12 @@ package org.apache.cassandra.db;
 
 import java.io.*;
 import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.security.MessageDigest;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.IClock.ClockRelationship;
 import org.apache.cassandra.utils.FBUtilities;
 
 
@@ -38,27 +41,28 @@ public class SuperColumn implements ICol
 {
        private static Logger logger_ = 
LoggerFactory.getLogger(SuperColumn.class);
 
-    public static SuperColumnSerializer serializer(AbstractType comparator)
+    public static SuperColumnSerializer serializer(AbstractType comparator, 
ClockType clockType)
     {
-        return new SuperColumnSerializer(comparator);
+        return new SuperColumnSerializer(comparator, clockType);
     }
 
     private byte[] name_;
     private ConcurrentSkipListMap<byte[], IColumn> columns_;
     private AtomicInteger localDeletionTime = new 
AtomicInteger(Integer.MIN_VALUE);
-       private AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
+    private AtomicReference<IClock> markedForDeleteAt;
 
-    public SuperColumn(byte[] name, AbstractType comparator)
+    public SuperColumn(byte[] name, AbstractType comparator, ClockType 
clockType)
     {
-        this(name, new ConcurrentSkipListMap<byte[], IColumn>(comparator));
+        this(name, new ConcurrentSkipListMap<byte[], IColumn>(comparator), 
clockType);
     }
 
-    private SuperColumn(byte[] name, ConcurrentSkipListMap<byte[], IColumn> 
columns)
+    private SuperColumn(byte[] name, ConcurrentSkipListMap<byte[], IColumn> 
columns, ClockType clockType)
     {
         assert name != null;
         assert name.length <= IColumn.MAX_NAME_LENGTH;
        name_ = name;
         columns_ = columns;
+        markedForDeleteAt = new AtomicReference<IClock>(clockType.minClock());
     }
 
     public AbstractType getComparator()
@@ -68,21 +72,24 @@ public class SuperColumn implements ICol
 
     public SuperColumn cloneMeShallow()
     {
-        SuperColumn sc = new SuperColumn(name_, getComparator());
-        sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get());
+        IClock _markedForDeleteAt = markedForDeleteAt.get();
+        SuperColumn sc = new SuperColumn(name_, getComparator(), 
_markedForDeleteAt.type());
+        sc.markForDeleteAt(localDeletionTime.get(), _markedForDeleteAt);
         return sc;
     }
 
     public IColumn cloneMe()
     {
-        SuperColumn sc = new SuperColumn(name_, new 
ConcurrentSkipListMap<byte[], IColumn>(columns_));
-        sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get());
+        IClock _markedForDeleteAt = markedForDeleteAt.get();
+        SuperColumn sc = new SuperColumn(name_, new 
ConcurrentSkipListMap<byte[], IColumn>(columns_), _markedForDeleteAt.type());
+        sc.markForDeleteAt(localDeletionTime.get(), _markedForDeleteAt);
         return sc;
     }
 
        public boolean isMarkedForDelete()
        {
-               return markedForDeleteAt.get() > Long.MIN_VALUE;
+        IClock _markedForDeleteAt = markedForDeleteAt.get();
+        return 
_markedForDeleteAt.compare(_markedForDeleteAt.type().minClock()) == 
ClockRelationship.GREATER_THAN;
        }
 
     public byte[] name()
@@ -125,7 +132,8 @@ public class SuperColumn implements ICol
         * We need to keep the way we are calculating the column size in sync 
with the
         * way we are calculating the size for the column family serializer.
         */
-       return DBConstants.shortSize_ + name_.length + DBConstants.intSize_ + 
DBConstants.longSize_ + DBConstants.intSize_ + size();
+      IClock _markedForDeleteAt = markedForDeleteAt.get();
+      return DBConstants.shortSize_ + name_.length + DBConstants.intSize_ + 
_markedForDeleteAt.size() + DBConstants.intSize_ + size();
     }
 
     public void remove(byte[] columnName)
@@ -133,22 +141,22 @@ public class SuperColumn implements ICol
        columns_.remove(columnName);
     }
 
-    public long timestamp()
+    public IClock clock()
     {
        throw new UnsupportedOperationException("This operation is not 
supported for Super Columns.");
     }
 
-    public long mostRecentLiveChangeAt()
+    public IClock mostRecentLiveChangeAt()
     {
-        long max = Long.MIN_VALUE;
+        List<IClock> clocks = new LinkedList<IClock>();
         for (IColumn column : columns_.values())
         {
-            if (!column.isMarkedForDelete() && column.timestamp() > max)
+            if (!column.isMarkedForDelete())
             {
-                max = column.timestamp();
+                clocks.add(column.clock());
             }
         }
-        return max;
+        return markedForDeleteAt.get().type().minClock().getSuperset(clocks);
     }
 
     public byte[] value()
@@ -163,11 +171,13 @@ public class SuperColumn implements ICol
         IColumn oldColumn = columns_.putIfAbsent(name, column);
        if (oldColumn != null)
         {
-               while (((Column)oldColumn).comparePriority((Column)column) <= 0)
+            ClockRelationship rel = 
((Column)oldColumn).comparePriority((Column)column);
+            while (ClockRelationship.GREATER_THAN != rel)
             {
-                       if (columns_.replace(name, oldColumn, column))
+                if (columns_.replace(name, oldColumn, column))
                     break;
                 oldColumn = columns_.get(name);
+                rel = ((Column)oldColumn).comparePriority((Column)column);
             }
        }
     }
@@ -193,15 +203,17 @@ public class SuperColumn implements ICol
        return 1 + columns_.size();
     }
 
-    public long getMarkedForDeleteAt()
+    public IClock getMarkedForDeleteAt()
     {
         return markedForDeleteAt.get();
     }
 
     public IColumn diff(IColumn columnNew)
     {
-       IColumn columnDiff = new SuperColumn(columnNew.name(), 
((SuperColumn)columnNew).getComparator());
-        if (columnNew.getMarkedForDeleteAt() > getMarkedForDeleteAt())
+        IClock _markedForDeleteAt = markedForDeleteAt.get();
+        IColumn columnDiff = new SuperColumn(columnNew.name(), 
((SuperColumn)columnNew).getComparator(), _markedForDeleteAt.type());
+        ClockRelationship rel = 
columnNew.getMarkedForDeleteAt().compare(_markedForDeleteAt);
+        if (ClockRelationship.GREATER_THAN == rel)
         {
             
((SuperColumn)columnDiff).markForDeleteAt(columnNew.getLocalDeletionTime(), 
columnNew.getMarkedForDeleteAt());
         }
@@ -239,7 +251,8 @@ public class SuperColumn implements ICol
         DataOutputBuffer buffer = new DataOutputBuffer();
         try
         {
-            buffer.writeLong(markedForDeleteAt.get());
+            IClock _markedForDeleteAt = markedForDeleteAt.get();
+            _markedForDeleteAt.serialize(buffer);
         }
         catch (IOException e)
         {
@@ -259,7 +272,7 @@ public class SuperColumn implements ICol
        sb.append(comparator.getString(name_));
 
         if (isMarkedForDelete()) {
-            sb.append(" -delete at 
").append(getMarkedForDeleteAt()).append("-");
+            sb.append(" -delete at 
").append(getMarkedForDeleteAt().toString()).append("-");
         }
 
         sb.append(" [");
@@ -275,20 +288,22 @@ public class SuperColumn implements ICol
     }
 
     @Deprecated // TODO this is a hack to set initial value outside constructor
-    public void markForDeleteAt(int localDeleteTime, long timestamp)
+    public void markForDeleteAt(int localDeleteTime, IClock clock)
     {
         this.localDeletionTime.set(localDeleteTime);
-        this.markedForDeleteAt.set(timestamp);
+        this.markedForDeleteAt.set(clock);
     }
 }
 
 class SuperColumnSerializer implements ICompactSerializer2<IColumn>
 {
     private AbstractType comparator;
+    private ClockType clockType;
 
-    public SuperColumnSerializer(AbstractType comparator)
+    public SuperColumnSerializer(AbstractType comparator, ClockType clockType)
     {
         this.comparator = comparator;
+        this.clockType = clockType;
     }
 
     public AbstractType getComparator()
@@ -303,13 +318,14 @@ class SuperColumnSerializer implements I
         try
         {
             dos.writeInt(superColumn.getLocalDeletionTime());
-            dos.writeLong(superColumn.getMarkedForDeleteAt());
+            IClock _markedForDeleteAt = superColumn.getMarkedForDeleteAt();
+            clockType.serializer().serialize(_markedForDeleteAt, dos);
 
             Collection<IColumn> columns = column.getSubColumns();
             dos.writeInt(columns.size());
             for (IColumn subColumn : columns)
             {
-                Column.serializer().serialize(subColumn, dos);
+                Column.serializer(clockType).serialize(subColumn, dos);
             }
         }
         catch (IOException e)
@@ -321,19 +337,19 @@ class SuperColumnSerializer implements I
     public IColumn deserialize(DataInput dis) throws IOException
     {
         byte[] name = FBUtilities.readShortByteArray(dis);
-        SuperColumn superColumn = new SuperColumn(name, comparator);
+        SuperColumn superColumn = new SuperColumn(name, comparator, clockType);
         int localDeleteTime = dis.readInt();
         if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
         {
             throw new IOException("Invalid localDeleteTime read: " + 
localDeleteTime);
         }
-        superColumn.markForDeleteAt(localDeleteTime, dis.readLong());
+        superColumn.markForDeleteAt(localDeleteTime, 
clockType.serializer().deserialize(dis));
 
         /* read the number of columns */
         int size = dis.readInt();
         for ( int i = 0; i < size; ++i )
         {
-            IColumn subColumn = Column.serializer().deserialize(dis);
+            IColumn subColumn = Column.serializer(clockType).deserialize(dis);
             superColumn.addColumn(subColumn);
         }
         return superColumn;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Wed May 
26 04:45:44 2010
@@ -68,7 +68,7 @@ public class SystemTable
     {
         IPartitioner p = StorageService.getPartitioner();
         ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF);
-        cf.addColumn(new Column(ep.getAddress(), 
p.getTokenFactory().toByteArray(token), System.currentTimeMillis()));
+        cf.addColumn(new Column(ep.getAddress(), 
p.getTokenFactory().toByteArray(token), new 
TimestampClock(System.currentTimeMillis())));
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
         rm.add(cf);
         try
@@ -89,7 +89,7 @@ public class SystemTable
         assert metadata != null;
         IPartitioner p = StorageService.getPartitioner();
         ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF);
-        cf.addColumn(new Column(SystemTable.TOKEN, 
p.getTokenFactory().toByteArray(token), System.currentTimeMillis()));
+        cf.addColumn(new Column(SystemTable.TOKEN, 
p.getTokenFactory().toByteArray(token), new 
TimestampClock(System.currentTimeMillis())));
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
         rm.add(cf);
         try
@@ -145,9 +145,9 @@ public class SystemTable
 
             RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
             cf = ColumnFamily.create(Table.SYSTEM_TABLE, 
SystemTable.STATUS_CF);
-            cf.addColumn(new Column(TOKEN, 
p.getTokenFactory().toByteArray(token)));
-            cf.addColumn(new Column(GENERATION, 
FBUtilities.toByteArray(generation)));
-            cf.addColumn(new Column(CLUSTERNAME, 
DatabaseDescriptor.getClusterName().getBytes()));
+            cf.addColumn(new Column(TOKEN, 
p.getTokenFactory().toByteArray(token), TimestampClock.ZERO_VALUE));
+            cf.addColumn(new Column(GENERATION, 
FBUtilities.toByteArray(generation), TimestampClock.ZERO_VALUE));
+            cf.addColumn(new Column(CLUSTERNAME, 
DatabaseDescriptor.getClusterName().getBytes(), TimestampClock.ZERO_VALUE));
             rm.add(cf);
             rm.apply();
             metadata = new StorageMetadata(token, generation, 
DatabaseDescriptor.getClusterName().getBytes());
@@ -170,7 +170,8 @@ public class SystemTable
 
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
         cf = ColumnFamily.create(Table.SYSTEM_TABLE, SystemTable.STATUS_CF);
-        Column generation2 = new Column(GENERATION, 
FBUtilities.toByteArray(gen), generation.timestamp() + 1);
+        TimestampClock genClock = new 
TimestampClock(((TimestampClock)generation.clock()).timestamp() + 1);
+        Column generation2 = new Column(GENERATION, 
FBUtilities.toByteArray(gen), genClock);
         cf.addColumn(generation2);
         byte[] cname;
         if (cluster != null)
@@ -180,7 +181,7 @@ public class SystemTable
         }
         else
         {
-            Column clustername = new Column(CLUSTERNAME, 
DatabaseDescriptor.getClusterName().getBytes());
+            Column clustername = new Column(CLUSTERNAME, 
DatabaseDescriptor.getClusterName().getBytes(), TimestampClock.ZERO_VALUE);
             cf.addColumn(clustername);
             cname = DatabaseDescriptor.getClusterName().getBytes();
             logger.info("Saved ClusterName not found. Using " + 
DatabaseDescriptor.getClusterName());
@@ -204,7 +205,7 @@ public class SystemTable
     public static void setBootstrapped(boolean isBootstrapped)
     {
         ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF);
-        cf.addColumn(new Column(BOOTSTRAP, new byte[] { (byte) (isBootstrapped 
? 1 : 0) }, System.currentTimeMillis()));
+        cf.addColumn(new Column(BOOTSTRAP, new byte[] { (byte) (isBootstrapped 
? 1 : 0) }, new TimestampClock(System.currentTimeMillis())));
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, BOOTSTRAP_KEY);
         rm.add(cf);
         try
@@ -228,7 +229,7 @@ public class SystemTable
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, GRAVEYARD_KEY);
         long now = System.currentTimeMillis();
         for (IColumn col : cols)
-            rm.delete(new QueryPath(STATUS_CF, null, col.name()), now);
+            rm.delete(new QueryPath(STATUS_CF, null, col.name()), new 
TimestampClock(now));
         rm.apply();
     }
     
@@ -236,7 +237,7 @@ public class SystemTable
     public static void markForRemoval(CFMetaData cfm)
     {
         ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF);
-        cf.addColumn(new Column((cfm.tableName + "-" + cfm.cfName + "-" + 
cfm.cfId).getBytes(), ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis()));
+        cf.addColumn(new Column((cfm.tableName + "-" + cfm.cfName + "-" + 
cfm.cfId).getBytes(), ArrayUtils.EMPTY_BYTE_ARRAY, new 
TimestampClock(System.currentTimeMillis())));
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, GRAVEYARD_KEY);
         rm.add(cf);
         try

Added: cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java?rev=948316&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java Wed 
May 26 04:45:44 2010
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.ArrayUtils;
+
+import org.apache.cassandra.io.ICompactSerializer2;
+
+/**
+ * A simple clock composed of a timestamp.
+ * The comparison is the timestamp comparison.
+ */
+public class TimestampClock implements IClock
+{
+    private static Logger logger_ = Logger.getLogger(TimestampClock.class);
+    public static TimestampClock MIN_VALUE = new 
TimestampClock(Long.MIN_VALUE);
+    public static TimestampClock ZERO_VALUE = new TimestampClock(0);
+    public static ICompactSerializer2<IClock> SERIALIZER = new 
TimestampClockSerializer();
+
+    private final long timestamp;
+
+    public TimestampClock(long timestamp)
+    {
+        this.timestamp = timestamp;
+    }
+
+    public long timestamp()
+    {
+        return timestamp;
+    }
+
+    public ClockRelationship compare(IClock other)
+    {
+        assert other instanceof TimestampClock : "Wrong class type.";
+
+        long otherTimestamp = ((TimestampClock)other).timestamp();
+        if (timestamp > otherTimestamp)
+        {
+            return ClockRelationship.GREATER_THAN;
+        }
+        else if (timestamp == otherTimestamp)
+        {
+            return ClockRelationship.EQUAL;
+        }
+        // timestamp < otherTimestamp
+        return ClockRelationship.LESS_THAN;
+    }
+
+    public IClock getSuperset(List<IClock> otherClocks)
+    {
+        IClock max = this;
+
+        for (IClock clock : otherClocks)
+        {
+            if (clock.compare(max) == ClockRelationship.GREATER_THAN)
+            {
+                max = clock;
+            }
+        }
+
+        return max;
+    }
+
+    public int size()
+    {
+        return DBConstants.tsSize_;
+    }
+
+    public ClockType type()
+    {
+        return ClockType.Timestamp;
+    }
+
+    public void serialize(DataOutput out) throws IOException
+    {
+        SERIALIZER.serialize(this, out);
+    }
+
+    public String toString()
+    {
+        return Long.toString(timestamp);
+    }
+}
+
+class TimestampClockSerializer implements ICompactSerializer2<IClock>
+{
+    public void serialize(IClock tc, DataOutput out) throws IOException
+    {
+        out.writeLong(((TimestampClock)tc).timestamp());
+    }
+
+    public IClock deserialize(DataInput in) throws IOException
+    {
+        return new TimestampClock(in.readLong());
+    }
+}

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java 
Wed May 26 04:45:44 2010
@@ -28,6 +28,7 @@ import org.apache.cassandra.io.util.File
 import org.apache.cassandra.utils.ReducingIterator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.IClock.ClockRelationship;
 
 public class QueryFilter
 {
@@ -112,8 +113,8 @@ public class QueryFilter
                     // filterSuperColumn only looks at immediate parent (the 
supercolumn) when determining if a subcolumn
                     // is still live, i.e., not shadowed by the parent's 
tombstone.  so, bump it up temporarily to the tombstone
                     // time of the cf, if that is greater.
-                    long deletedAt = c.getMarkedForDeleteAt();
-                    if (returnCF.getMarkedForDeleteAt() > deletedAt)
+                    IClock deletedAt = c.getMarkedForDeleteAt();
+                    if (returnCF.getMarkedForDeleteAt().compare(deletedAt) == 
ClockRelationship.GREATER_THAN)
                         
((SuperColumn)c).markForDeleteAt(c.getLocalDeletionTime(), 
returnCF.getMarkedForDeleteAt());
 
                     c = filter.filterSuperColumn((SuperColumn)c, gcBefore);
@@ -137,9 +138,9 @@ public class QueryFilter
         // the column itself must be not gc-able (it is live, or a still 
relevant tombstone, or has live subcolumns), (1)
         // and if its container is deleted, the column must be changed more 
recently than the container tombstone (2)
         // (since otherwise, the only thing repair cares about is the 
container tombstone)
-        long maxChange = column.mostRecentLiveChangeAt();
-        return (!column.isMarkedForDelete() || column.getLocalDeletionTime() > 
gcBefore || maxChange > column.getMarkedForDeleteAt()) // (1)
-               && (!container.isMarkedForDelete() || maxChange > 
container.getMarkedForDeleteAt()); // (2)
+        IClock maxChange = column.mostRecentLiveChangeAt();
+        return (!column.isMarkedForDelete() || column.getLocalDeletionTime() > 
gcBefore || (ClockRelationship.GREATER_THAN == 
maxChange.compare(column.getMarkedForDeleteAt()))) // (1)
+               && (!container.isMarkedForDelete() || 
(ClockRelationship.GREATER_THAN == 
maxChange.compare(container.getMarkedForDeleteAt()))); // (2)
     }
 
     /**

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java 
Wed May 26 04:45:44 2010
@@ -33,6 +33,7 @@ import com.google.common.collect.Collect
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.IClock.ClockRelationship;
 import org.apache.cassandra.db.marshal.AbstractType;
 
 import com.google.common.base.Predicate;
@@ -147,7 +148,7 @@ public class SliceQueryFilter implements
             // only count live columns towards the `count` criteria
             if (!column.isMarkedForDelete()
                 && (!container.isMarkedForDelete()
-                    || column.mostRecentLiveChangeAt() > 
container.getMarkedForDeleteAt()))
+                    || (ClockRelationship.GREATER_THAN == 
column.mostRecentLiveChangeAt().compare(container.getMarkedForDeleteAt()))))
             {
                 liveColumns++;
             }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java 
Wed May 26 04:45:44 2010
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.ColumnFam
 import org.apache.cassandra.db.CompactionManager;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.TimestampClock;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -117,13 +118,13 @@ public abstract class Migration
             long now = System.currentTimeMillis();
             byte[] buf = getBytes();
             RowMutation migration = new RowMutation(Table.SYSTEM_TABLE, 
MIGRATIONS_KEY);
-            migration.add(new QueryPath(MIGRATIONS_CF, null, 
UUIDGen.decompose(newVersion)), buf, now);
+            migration.add(new QueryPath(MIGRATIONS_CF, null, 
UUIDGen.decompose(newVersion)), buf, new TimestampClock(now));
             migration.apply();
             
             // note that we storing this in the system table, which is not 
replicated, instead of the definitions table, which is.
             logger.debug("Applying migration " + newVersion.toString());
             migration = new RowMutation(Table.SYSTEM_TABLE, 
LAST_MIGRATION_KEY);
-            migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY), 
UUIDGen.decompose(newVersion), now);
+            migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY), 
UUIDGen.decompose(newVersion), new TimestampClock(now));
             migration.apply();
             
             // if we fail here, there will be schema changes in the CL that 
will get replayed *AFTER* the schema is loaded.
@@ -208,9 +209,9 @@ public abstract class Migration
         final long now = System.currentTimeMillis();
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, 
toBytes(versionId));
         if (remove != null)
-            rm.delete(new QueryPath(SCHEMA_CF, null, remove.name.getBytes()), 
System.currentTimeMillis());
+            rm.delete(new QueryPath(SCHEMA_CF, null, remove.name.getBytes()), 
new TimestampClock(System.currentTimeMillis()));
         if (add != null)
-            rm.add(new QueryPath(SCHEMA_CF, null, add.name.getBytes()), 
KSMetaData.serialize(add), now);
+            rm.add(new QueryPath(SCHEMA_CF, null, add.name.getBytes()), 
KSMetaData.serialize(add), new TimestampClock(now));
         
         // include all other key spaces.
         for (String tableName : DatabaseDescriptor.getNonSystemTables())
@@ -218,7 +219,7 @@ public abstract class Migration
             if (add != null && add.name.equals(tableName) || remove != null && 
remove.name.equals(tableName))
                 continue;
             KSMetaData ksm = DatabaseDescriptor.getTableDefinition(tableName);
-            rm.add(new QueryPath(SCHEMA_CF, null, ksm.name.getBytes()), 
KSMetaData.serialize(ksm), now);
+            rm.add(new QueryPath(SCHEMA_CF, null, ksm.name.getBytes()), 
KSMetaData.serialize(ksm), new TimestampClock(now));
         }
         return rm;
     }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
 Wed May 26 04:45:44 2010
@@ -289,7 +289,8 @@ public class ColumnFamilyRecordReader ex
     private IColumn unthriftifySuper(SuperColumn super_column)
     {
         AbstractType subComparator = 
DatabaseDescriptor.getSubComparator(keyspace, cfName);
-        org.apache.cassandra.db.SuperColumn sc = new 
org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
+        ClockType clockType = DatabaseDescriptor.getClockType(keyspace, 
cfName);
+        org.apache.cassandra.db.SuperColumn sc = new 
org.apache.cassandra.db.SuperColumn(super_column.name, subComparator, 
clockType);
         for (Column column : super_column.columns)
         {
             sc.addColumn(unthriftifySimple(column));
@@ -299,6 +300,11 @@ public class ColumnFamilyRecordReader ex
 
     private IColumn unthriftifySimple(Column column)
     {
-        return new org.apache.cassandra.db.Column(column.name, column.value, 
column.timestamp);
+        return new org.apache.cassandra.db.Column(column.name, column.value, 
unthriftifyClock(column.clock));
+    }
+
+    private static IClock unthriftifyClock(Clock clock)
+    {
+        return new 
org.apache.cassandra.db.TimestampClock(clock.getTimestamp());
     }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
Wed May 26 04:45:44 2010
@@ -277,9 +277,10 @@ public abstract class SSTableReader exte
     public ICompactSerializer2<IColumn> getColumnSerializer()
     {
         ColumnFamilyType cfType = 
DatabaseDescriptor.getColumnFamilyType(getTableName(), getColumnFamilyName());
+        ClockType clockType = DatabaseDescriptor.getClockType(getTableName(), 
getColumnFamilyName());
         return cfType == ColumnFamilyType.Standard
-               ? Column.serializer()
-               : SuperColumn.serializer(getColumnComparator());
+               ? Column.serializer(clockType)
+               : SuperColumn.serializer(getColumnComparator(), clockType);
     }
 
     /**

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java 
Wed May 26 04:45:44 2010
@@ -137,7 +137,7 @@ public class CassandraServer implements 
             {
                 continue;
             }
-            Column thrift_column = new Column(column.name(), column.value(), 
column.timestamp());
+            Column thrift_column = new Column(column.name(), column.value(), 
thriftifyIClock(column.clock()));
             if (column instanceof ExpiringColumn)
             {
                 thrift_column.setTtl(((ExpiringColumn) 
column).getTimeToLive());
@@ -157,7 +157,7 @@ public class CassandraServer implements 
             {
                 continue;
             }
-            Column thrift_column = new Column(column.name(), column.value(), 
column.timestamp());
+            Column thrift_column = new Column(column.name(), column.value(), 
thriftifyIClock(column.clock()));
             if (column instanceof ExpiringColumn)
             {
                 thrift_column.setTtl(((ExpiringColumn) 
column).getTimeToLive());
@@ -193,6 +193,16 @@ public class CassandraServer implements 
         return thriftSuperColumns;
     }
 
+    private static Clock thriftifyIClock(IClock clock)
+    {
+        Clock thrift_clock = new Clock();
+        if (clock instanceof TimestampClock)
+        {
+            thrift_clock.setTimestamp(((TimestampClock)clock).timestamp());
+        }
+        return thrift_clock;
+    }
+
     private Map<byte[], List<ColumnOrSuperColumn>> getSlice(List<ReadCommand> 
commands, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
@@ -344,11 +354,12 @@ public class CassandraServer implements 
         ThriftValidation.validateKey(key);
         ThriftValidation.validateColumnParent(keySpace.get(), column_parent);
         ThriftValidation.validateColumn(keySpace.get(), column_parent, column);
+        IClock cassandra_clock = ThriftValidation.validateClock(column.clock);
 
         RowMutation rm = new RowMutation(keySpace.get(), key);
         try
         {
-            rm.add(new QueryPath(column_parent.column_family, 
column_parent.super_column, column.name), column.value, column.timestamp, 
column.ttl);
+            rm.add(new QueryPath(column_parent.column_family, 
column_parent.super_column, column.name), column.value, cassandra_clock, 
column.ttl);
         }
         catch (MarshalException e)
         {
@@ -418,7 +429,7 @@ public class CassandraServer implements 
         }
     }
 
-    public void remove(byte[] key, ColumnPath column_path, long timestamp, 
ConsistencyLevel consistency_level)
+    public void remove(byte[] key, ColumnPath column_path, Clock clock, 
ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
         if (logger.isDebugEnabled())
@@ -428,9 +439,11 @@ public class CassandraServer implements 
 
         ThriftValidation.validateKey(key);
         ThriftValidation.validateColumnPathOrParent(keySpace.get(), 
column_path);
-        
+
+        IClock cassandra_clock = ThriftValidation.validateClock(clock);
+
         RowMutation rm = new RowMutation(keySpace.get(), key);
-        rm.delete(new QueryPath(column_path), timestamp);
+        rm.delete(new QueryPath(column_path), cassandra_clock);
 
         doInsert(consistency_level, rm);
     }
@@ -469,6 +482,7 @@ public class CassandraServer implements 
 
             Map<String, String> columnMap = new HashMap<String, String>();
             columnMap.put("Type", columnFamilyMetaData.cfType.name());
+            columnMap.put("ClockType", columnFamilyMetaData.clockType.name());
             columnMap.put("Desc", columnFamilyMetaData.comment == null ? 
columnFamilyMetaData.pretty() : columnFamilyMetaData.comment);
             columnMap.put("CompareWith", 
columnFamilyMetaData.comparator.getClass().getName());
             if (columnFamilyMetaData.cfType == ColumnFamilyType.Super)
@@ -624,6 +638,7 @@ public class CassandraServer implements 
                         cf_def.table,
                         cf_def.name,
                         cfType,
+                        ClockType.Timestamp,
                         
DatabaseDescriptor.getComparator(cf_def.comparator_type),
                         cf_def.subcomparator_type.length() == 0 ? null : 
DatabaseDescriptor.getComparator(cf_def.subcomparator_type),
                         cf_def.comment,
@@ -733,6 +748,7 @@ public class CassandraServer implements 
                         cfDef.table,
                         cfDef.name,
                         cfType,
+                        ClockType.Timestamp,
                         
DatabaseDescriptor.getComparator(cfDef.comparator_type),
                         cfDef.subcomparator_type.length() == 0 ? null : 
DatabaseDescriptor.getComparator(cfDef.subcomparator_type),
                         cfDef.comment,

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java 
Wed May 26 04:45:44 2010
@@ -22,11 +22,14 @@ package org.apache.cassandra.thrift;
 
 import java.util.Comparator;
 import java.util.Arrays;
+import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.cassandra.db.KeyspaceNotDefinedException;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.IClock;
+import org.apache.cassandra.db.TimestampClock;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 
@@ -210,6 +213,7 @@ public class ThriftValidation
         if (cosc.column != null)
         {
             validateTtl(cosc.column);
+            validateClock(cosc.column.clock);
             ThriftValidation.validateColumnPath(keyspace, new 
ColumnPath(cfName).setSuper_column(null).setColumn(cosc.column.name));
         }
 
@@ -218,6 +222,7 @@ public class ThriftValidation
             for (Column c : cosc.super_column.columns)
             {
                 validateTtl(c);
+                validateClock(c.clock);
                 ThriftValidation.validateColumnPath(keyspace, new 
ColumnPath(cfName).setSuper_column(cosc.super_column.name).setColumn(c.name));
             }
         }
@@ -236,6 +241,15 @@ public class ThriftValidation
         assert column.isSetTtl() || column.ttl == 0;
     }
 
+    public static IClock validateClock(Clock clock) throws 
InvalidRequestException
+    {
+        if (clock.isSetTimestamp())
+        {
+            return new TimestampClock(clock.getTimestamp());
+        }
+        throw new InvalidRequestException("Clock must have one a timestamp");
+    }
+
     public static void validateMutation(String keyspace, String cfName, 
Mutation mut)
             throws InvalidRequestException
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Wed 
May 26 04:45:44 2010
@@ -27,6 +27,7 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.TimestampClock;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.SSTable;
@@ -95,7 +96,7 @@ public class SSTableExport
             json.append(", ");
             json.append(quote(bytesToHex(column.value())));
             json.append(", ");
-            json.append(column.timestamp());
+            json.append(((TimestampClock) column.clock()).timestamp());
             json.append(", ");
             json.append(column.isMarkedForDelete());
             json.append("]");
@@ -125,7 +126,7 @@ public class SSTableExport
                 json.append(asKey(bytesToHex(column.name())));
                 json.append("{");
                 json.append(asKey("deletedAt"));
-                json.append(column.getMarkedForDeleteAt());
+                json.append(((TimestampClock) 
column.getMarkedForDeleteAt()).timestamp());
                 json.append(", ");
                 json.append(asKey("subColumns"));
                 json.append(serializeColumns(column.getSubColumns(), 
comparator));

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java Wed 
May 26 04:45:44 2010
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.ColumnFam
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.SuperColumn;
 import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.TimestampClock;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -64,6 +65,7 @@ public class SSTableImport
     {
         private String name;
         private String value;
+        // TODO: fix when we adding other clock type
         private long timestamp;
         private boolean isDeleted;
         
@@ -73,6 +75,7 @@ public class SSTableImport
             assert colSpec.size() == 4;
             name = (String)colSpec.get(0);
             value = (String)colSpec.get(1);
+            // TODO: fix when we adding other clock type
             timestamp = (Long)colSpec.get(2);
             isDeleted = (Boolean)colSpec.get(3);
         }
@@ -93,9 +96,9 @@ public class SSTableImport
             JsonColumn col = new JsonColumn(c);
             QueryPath path = new QueryPath(cfm.cfName, null, 
hexToBytes(col.name));
             if (col.isDeleted) {
-                cfamily.addColumn(path, hexToBytes(col.value), col.timestamp);
+                cfamily.addColumn(path, hexToBytes(col.value), new 
TimestampClock(col.timestamp));
             } else {
-                cfamily.addTombstone(path, hexToBytes(col.value), 
col.timestamp);
+                cfamily.addTombstone(path, hexToBytes(col.value), new 
TimestampClock(col.timestamp));
             }
         }
     }
@@ -123,14 +126,14 @@ public class SSTableImport
                 JsonColumn col = new JsonColumn(c);
                 QueryPath path = new QueryPath(cfm.cfName, superName, 
hexToBytes(col.name));
                 if (col.isDeleted) {
-                    cfamily.addColumn(path, hexToBytes(col.value), 
col.timestamp);
+                    cfamily.addColumn(path, hexToBytes(col.value), new 
TimestampClock(col.timestamp));
                 } else {
-                    cfamily.addTombstone(path, hexToBytes(col.value), 
col.timestamp);
+                    cfamily.addTombstone(path, hexToBytes(col.value), new 
TimestampClock(col.timestamp));
                 }
             }
             
             SuperColumn superColumn = 
(SuperColumn)cfamily.getColumn(superName);
-            
superColumn.markForDeleteAt((int)(System.currentTimeMillis()/1000), deletedAt);
+            
superColumn.markForDeleteAt((int)(System.currentTimeMillis()/1000), new 
TimestampClock(deletedAt));
         }
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed 
May 26 04:45:44 2010
@@ -28,6 +28,7 @@ import java.security.MessageDigest;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.DataFormatException;
 import java.util.zip.Deflater;
 import java.util.zip.Inflater;
@@ -39,6 +40,8 @@ import org.apache.commons.collections.it
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IClock;
+import org.apache.cassandra.db.IClock.ClockRelationship;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.thrift.TBase;
@@ -399,23 +402,48 @@ public class FBUtilities
 
     public static void atomicSetMax(AtomicInteger atomic, int i)
     {
-        int j;
         while (true)
         {
-            if ((j = atomic.getAndSet(i)) <= i)
+            int j = atomic.get();
+            if (j >= i || atomic.compareAndSet(j, i))
                 break;
-            i = j;
         }
     }
 
     public static void atomicSetMax(AtomicLong atomic, long i)
     {
-        long j;
         while (true)
         {
-            if ((j = atomic.getAndSet(i)) <= i)
+            long j = atomic.get();
+            if (j >= i || atomic.compareAndSet(j, i))
+                break;
+        }
+    }
+
+    /** 
+     * Sets an atomic clock reference to the maximum of its current value and
+     * a new value.
+     *
+     * The function is not synchronized and does not guarantee that the 
resulting
+     * reference will hold either the old or new value, but it does guarantee
+     * that it will hold a value, v, such that: v = max(oldValue, newValue, v).
+     *
+     * @param atomic the atomic reference to set
+     * @param newClock the new provided value
+     */
+    public static void atomicSetMax(AtomicReference<IClock> atomic, IClock 
newClock)
+    {
+        while (true)
+        {
+            IClock oldClock = atomic.get();
+            ClockRelationship rel = oldClock.compare(newClock);
+            if (rel == ClockRelationship.DISJOINT)
+            {
+                newClock = oldClock.getSuperset(Arrays.asList(newClock));
+            }
+            if (rel == ClockRelationship.GREATER_THAN || rel == 
ClockRelationship.EQUAL 
+                || atomic.compareAndSet(oldClock, newClock))
                 break;
-            i = j;
         }
     }
 

Modified: 
cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- 
cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java 
(original)
+++ 
cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java 
Wed May 26 04:45:44 2010
@@ -85,9 +85,9 @@ public class LongCompactionSpeedTest ext
                 for (int i = 0; i < colsPerRow; i++)
                 {
                     // last sstable has highest timestamps
-                    cols[i] = Util.column(String.valueOf(i), 
String.valueOf(i), k);
+                    cols[i] = Util.column(String.valueOf(i), 
String.valueOf(i), new TimestampClock(k));
                 }
-                rows.put(key, SSTableUtils.createCF(Long.MIN_VALUE, 
Integer.MIN_VALUE, cols));
+                rows.put(key, 
SSTableUtils.createCF(ClockType.Timestamp.minClock(), Integer.MIN_VALUE, cols));
             }
             SSTableReader sstable = SSTableUtils.writeSSTable(rows);
             sstables.add(sstable);

Modified: cassandra/trunk/test/system/test_avro_server.py
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_avro_server.py?rev=948316&r1=948315&r2=948316&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_avro_server.py (original)
+++ cassandra/trunk/test/system/test_avro_server.py Wed May 26 04:45:44 2010
@@ -46,7 +46,7 @@ class TestRpcOperations(AvroTester):
         params['column'] = dict()
         params['column']['name'] = 'c1'
         params['column']['value'] = 'v1'
-        params['column']['timestamp'] = 0
+        params['column']['clock'] = { 'timestamp' : 0 }
         params['consistency_level'] = 'ONE'
         self.client.request('insert', params)
 
@@ -74,7 +74,7 @@ class TestRpcOperations(AvroTester):
         params['column'] = dict()
         params['column']['name'] = i64(1)
         params['column']['value'] = 'v1'
-        params['column']['timestamp'] = 0
+        params['column']['clock'] = { 'timestamp' : 0 }
         params['consistency_level'] = 'ONE'
         self.client.request('insert', params)
 


Reply via email to