Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed May 26 04:45:44 2010 @@ -36,6 +36,7 @@ import org.apache.cassandra.service.*; import org.apache.cassandra.thrift.ColumnOrSuperColumn; import org.apache.cassandra.thrift.Deletion; import org.apache.cassandra.thrift.Mutation; +import org.apache.cassandra.thrift.Clock; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.config.CFMetaData; @@ -100,7 +101,7 @@ public class RowMutation void addHints(byte[] key, byte[] host) throws IOException { QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, key, host); - add(path, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis()); + add(path, ArrayUtils.EMPTY_BYTE_ARRAY, new TimestampClock(System.currentTimeMillis())); } /* @@ -133,10 +134,10 @@ public class RowMutation * * param @ cf - column name as <column family>:<column> * param @ value - value associated with the column - * param @ timestamp - timestamp associated with this data. + * param @ clock - clock associated with this data. 
* param @ timeToLive - ttl for the column, 0 for standard (non expiring) columns */ - public void add(QueryPath path, byte[] value, long timestamp, int timeToLive) + public void add(QueryPath path, byte[] value, IClock clock, int timeToLive) { Integer id = CFMetaData.getId(table_, path.columnFamilyName); ColumnFamily columnFamily = modifications_.get(id); @@ -145,15 +146,15 @@ public class RowMutation columnFamily = ColumnFamily.create(table_, path.columnFamilyName); modifications_.put(id, columnFamily); } - columnFamily.addColumn(path, value, timestamp, timeToLive); + columnFamily.addColumn(path, value, clock, timeToLive); } - public void add(QueryPath path, byte[] value, long timestamp) + public void add(QueryPath path, byte[] value, IClock clock) { - add(path, value, timestamp, 0); + add(path, value, clock, 0); } - public void delete(QueryPath path, long timestamp) + public void delete(QueryPath path, IClock clock) { Integer id = CFMetaData.getId(table_, path.columnFamilyName); @@ -168,17 +169,18 @@ public class RowMutation if (path.superColumnName == null && path.columnName == null) { - columnFamily.delete(localDeleteTime, timestamp); + columnFamily.delete(localDeleteTime, clock); } else if (path.columnName == null) { - SuperColumn sc = new SuperColumn(path.superColumnName, columnFamily.getSubComparator()); - sc.markForDeleteAt(localDeleteTime, timestamp); + SuperColumn sc = new SuperColumn(path.superColumnName, columnFamily.getSubComparator(), + columnFamily.getClockType()); + sc.markForDeleteAt(localDeleteTime, clock); columnFamily.addColumn(sc); } else { - columnFamily.deleteColumn(path, localDeleteTime, timestamp); + columnFamily.deleteColumn(path, localDeleteTime, clock); } } @@ -247,13 +249,13 @@ public class RowMutation assert cosc.super_column != null; for (org.apache.cassandra.thrift.Column column : cosc.super_column.columns) { - rm.add(new QueryPath(cfName, cosc.super_column.name, column.name), column.value, column.timestamp, column.ttl); + rm.add(new 
QueryPath(cfName, cosc.super_column.name, column.name), column.value, unthriftifyClock(column.clock), column.ttl); } } else { assert cosc.super_column == null; - rm.add(new QueryPath(cfName, null, cosc.column.name), cosc.column.value, cosc.column.timestamp, cosc.column.ttl); + rm.add(new QueryPath(cfName, null, cosc.column.name), cosc.column.value, unthriftifyClock(cosc.column.clock), cosc.column.ttl); } } } @@ -299,12 +301,12 @@ public class RowMutation { for (org.apache.cassandra.thrift.Column column : cosc.super_column.columns) { - rm.add(new QueryPath(cfName, cosc.super_column.name, column.name), column.value, column.timestamp, column.ttl); + rm.add(new QueryPath(cfName, cosc.super_column.name, column.name), column.value, unthriftifyClock(column.clock), column.ttl); } } else { - rm.add(new QueryPath(cfName, null, cosc.column.name), cosc.column.value, cosc.column.timestamp, cosc.column.ttl); + rm.add(new QueryPath(cfName, null, cosc.column.name), cosc.column.value, unthriftifyClock(cosc.column.clock), cosc.column.ttl); } } @@ -315,16 +317,21 @@ public class RowMutation for(byte[] c : del.predicate.column_names) { if (del.super_column == null && DatabaseDescriptor.getColumnFamilyType(rm.table_, cfName) == ColumnFamilyType.Super) - rm.delete(new QueryPath(cfName, c), del.timestamp); + rm.delete(new QueryPath(cfName, c), unthriftifyClock(del.clock)); else - rm.delete(new QueryPath(cfName, del.super_column, c), del.timestamp); + rm.delete(new QueryPath(cfName, del.super_column, c), unthriftifyClock(del.clock)); } } else { - rm.delete(new QueryPath(cfName, del.super_column), del.timestamp); + rm.delete(new QueryPath(cfName, del.super_column), unthriftifyClock(del.clock)); } } + + private static IClock unthriftifyClock(Clock clock) + { + return new TimestampClock(clock.getTimestamp()); + } } class RowMutationSerializer implements ICompactSerializer<RowMutation>
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Wed May 26 04:45:44 2010 @@ -20,10 +20,12 @@ package org.apache.cassandra.db; import java.io.*; import java.util.Collection; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.ConcurrentSkipListMap; import java.security.MessageDigest; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +33,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.io.ICompactSerializer2; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.IClock.ClockRelationship; import org.apache.cassandra.utils.FBUtilities; @@ -38,27 +41,28 @@ public class SuperColumn implements ICol { private static Logger logger_ = LoggerFactory.getLogger(SuperColumn.class); - public static SuperColumnSerializer serializer(AbstractType comparator) + public static SuperColumnSerializer serializer(AbstractType comparator, ClockType clockType) { - return new SuperColumnSerializer(comparator); + return new SuperColumnSerializer(comparator, clockType); } private byte[] name_; private ConcurrentSkipListMap<byte[], IColumn> columns_; private AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE); - private AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE); + private AtomicReference<IClock> markedForDeleteAt; - public SuperColumn(byte[] name, AbstractType comparator) + 
public SuperColumn(byte[] name, AbstractType comparator, ClockType clockType) { - this(name, new ConcurrentSkipListMap<byte[], IColumn>(comparator)); + this(name, new ConcurrentSkipListMap<byte[], IColumn>(comparator), clockType); } - private SuperColumn(byte[] name, ConcurrentSkipListMap<byte[], IColumn> columns) + private SuperColumn(byte[] name, ConcurrentSkipListMap<byte[], IColumn> columns, ClockType clockType) { assert name != null; assert name.length <= IColumn.MAX_NAME_LENGTH; name_ = name; columns_ = columns; + markedForDeleteAt = new AtomicReference<IClock>(clockType.minClock()); } public AbstractType getComparator() @@ -68,21 +72,24 @@ public class SuperColumn implements ICol public SuperColumn cloneMeShallow() { - SuperColumn sc = new SuperColumn(name_, getComparator()); - sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get()); + IClock _markedForDeleteAt = markedForDeleteAt.get(); + SuperColumn sc = new SuperColumn(name_, getComparator(), _markedForDeleteAt.type()); + sc.markForDeleteAt(localDeletionTime.get(), _markedForDeleteAt); return sc; } public IColumn cloneMe() { - SuperColumn sc = new SuperColumn(name_, new ConcurrentSkipListMap<byte[], IColumn>(columns_)); - sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get()); + IClock _markedForDeleteAt = markedForDeleteAt.get(); + SuperColumn sc = new SuperColumn(name_, new ConcurrentSkipListMap<byte[], IColumn>(columns_), _markedForDeleteAt.type()); + sc.markForDeleteAt(localDeletionTime.get(), _markedForDeleteAt); return sc; } public boolean isMarkedForDelete() { - return markedForDeleteAt.get() > Long.MIN_VALUE; + IClock _markedForDeleteAt = markedForDeleteAt.get(); + return _markedForDeleteAt.compare(_markedForDeleteAt.type().minClock()) == ClockRelationship.GREATER_THAN; } public byte[] name() @@ -125,7 +132,8 @@ public class SuperColumn implements ICol * We need to keep the way we are calculating the column size in sync with the * way we are calculating the size for the 
column family serializer. */ - return DBConstants.shortSize_ + name_.length + DBConstants.intSize_ + DBConstants.longSize_ + DBConstants.intSize_ + size(); + IClock _markedForDeleteAt = markedForDeleteAt.get(); + return DBConstants.shortSize_ + name_.length + DBConstants.intSize_ + _markedForDeleteAt.size() + DBConstants.intSize_ + size(); } public void remove(byte[] columnName) @@ -133,22 +141,22 @@ public class SuperColumn implements ICol columns_.remove(columnName); } - public long timestamp() + public IClock clock() { throw new UnsupportedOperationException("This operation is not supported for Super Columns."); } - public long mostRecentLiveChangeAt() + public IClock mostRecentLiveChangeAt() { - long max = Long.MIN_VALUE; + List<IClock> clocks = new LinkedList<IClock>(); for (IColumn column : columns_.values()) { - if (!column.isMarkedForDelete() && column.timestamp() > max) + if (!column.isMarkedForDelete()) { - max = column.timestamp(); + clocks.add(column.clock()); } } - return max; + return markedForDeleteAt.get().type().minClock().getSuperset(clocks); } public byte[] value() @@ -163,11 +171,13 @@ public class SuperColumn implements ICol IColumn oldColumn = columns_.putIfAbsent(name, column); if (oldColumn != null) { - while (((Column)oldColumn).comparePriority((Column)column) <= 0) + ClockRelationship rel = ((Column)oldColumn).comparePriority((Column)column); + while (ClockRelationship.GREATER_THAN != rel) { - if (columns_.replace(name, oldColumn, column)) + if (columns_.replace(name, oldColumn, column)) break; oldColumn = columns_.get(name); + rel = ((Column)oldColumn).comparePriority((Column)column); } } } @@ -193,15 +203,17 @@ public class SuperColumn implements ICol return 1 + columns_.size(); } - public long getMarkedForDeleteAt() + public IClock getMarkedForDeleteAt() { return markedForDeleteAt.get(); } public IColumn diff(IColumn columnNew) { - IColumn columnDiff = new SuperColumn(columnNew.name(), ((SuperColumn)columnNew).getComparator()); - if 
(columnNew.getMarkedForDeleteAt() > getMarkedForDeleteAt()) + IClock _markedForDeleteAt = markedForDeleteAt.get(); + IColumn columnDiff = new SuperColumn(columnNew.name(), ((SuperColumn)columnNew).getComparator(), _markedForDeleteAt.type()); + ClockRelationship rel = columnNew.getMarkedForDeleteAt().compare(_markedForDeleteAt); + if (ClockRelationship.GREATER_THAN == rel) { ((SuperColumn)columnDiff).markForDeleteAt(columnNew.getLocalDeletionTime(), columnNew.getMarkedForDeleteAt()); } @@ -239,7 +251,8 @@ public class SuperColumn implements ICol DataOutputBuffer buffer = new DataOutputBuffer(); try { - buffer.writeLong(markedForDeleteAt.get()); + IClock _markedForDeleteAt = markedForDeleteAt.get(); + _markedForDeleteAt.serialize(buffer); } catch (IOException e) { @@ -259,7 +272,7 @@ public class SuperColumn implements ICol sb.append(comparator.getString(name_)); if (isMarkedForDelete()) { - sb.append(" -delete at ").append(getMarkedForDeleteAt()).append("-"); + sb.append(" -delete at ").append(getMarkedForDeleteAt().toString()).append("-"); } sb.append(" ["); @@ -275,20 +288,22 @@ public class SuperColumn implements ICol } @Deprecated // TODO this is a hack to set initial value outside constructor - public void markForDeleteAt(int localDeleteTime, long timestamp) + public void markForDeleteAt(int localDeleteTime, IClock clock) { this.localDeletionTime.set(localDeleteTime); - this.markedForDeleteAt.set(timestamp); + this.markedForDeleteAt.set(clock); } } class SuperColumnSerializer implements ICompactSerializer2<IColumn> { private AbstractType comparator; + private ClockType clockType; - public SuperColumnSerializer(AbstractType comparator) + public SuperColumnSerializer(AbstractType comparator, ClockType clockType) { this.comparator = comparator; + this.clockType = clockType; } public AbstractType getComparator() @@ -303,13 +318,14 @@ class SuperColumnSerializer implements I try { dos.writeInt(superColumn.getLocalDeletionTime()); - 
dos.writeLong(superColumn.getMarkedForDeleteAt()); + IClock _markedForDeleteAt = superColumn.getMarkedForDeleteAt(); + clockType.serializer().serialize(_markedForDeleteAt, dos); Collection<IColumn> columns = column.getSubColumns(); dos.writeInt(columns.size()); for (IColumn subColumn : columns) { - Column.serializer().serialize(subColumn, dos); + Column.serializer(clockType).serialize(subColumn, dos); } } catch (IOException e) @@ -321,19 +337,19 @@ class SuperColumnSerializer implements I public IColumn deserialize(DataInput dis) throws IOException { byte[] name = FBUtilities.readShortByteArray(dis); - SuperColumn superColumn = new SuperColumn(name, comparator); + SuperColumn superColumn = new SuperColumn(name, comparator, clockType); int localDeleteTime = dis.readInt(); if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0) { throw new IOException("Invalid localDeleteTime read: " + localDeleteTime); } - superColumn.markForDeleteAt(localDeleteTime, dis.readLong()); + superColumn.markForDeleteAt(localDeleteTime, clockType.serializer().deserialize(dis)); /* read the number of columns */ int size = dis.readInt(); for ( int i = 0; i < size; ++i ) { - IColumn subColumn = Column.serializer().deserialize(dis); + IColumn subColumn = Column.serializer(clockType).deserialize(dis); superColumn.addColumn(subColumn); } return superColumn; Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Wed May 26 04:45:44 2010 @@ -68,7 +68,7 @@ public class SystemTable { IPartitioner p = StorageService.getPartitioner(); ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF); - 
cf.addColumn(new Column(ep.getAddress(), p.getTokenFactory().toByteArray(token), System.currentTimeMillis())); + cf.addColumn(new Column(ep.getAddress(), p.getTokenFactory().toByteArray(token), new TimestampClock(System.currentTimeMillis()))); RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY); rm.add(cf); try @@ -89,7 +89,7 @@ public class SystemTable assert metadata != null; IPartitioner p = StorageService.getPartitioner(); ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF); - cf.addColumn(new Column(SystemTable.TOKEN, p.getTokenFactory().toByteArray(token), System.currentTimeMillis())); + cf.addColumn(new Column(SystemTable.TOKEN, p.getTokenFactory().toByteArray(token), new TimestampClock(System.currentTimeMillis()))); RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY); rm.add(cf); try @@ -145,9 +145,9 @@ public class SystemTable RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY); cf = ColumnFamily.create(Table.SYSTEM_TABLE, SystemTable.STATUS_CF); - cf.addColumn(new Column(TOKEN, p.getTokenFactory().toByteArray(token))); - cf.addColumn(new Column(GENERATION, FBUtilities.toByteArray(generation))); - cf.addColumn(new Column(CLUSTERNAME, DatabaseDescriptor.getClusterName().getBytes())); + cf.addColumn(new Column(TOKEN, p.getTokenFactory().toByteArray(token), TimestampClock.ZERO_VALUE)); + cf.addColumn(new Column(GENERATION, FBUtilities.toByteArray(generation), TimestampClock.ZERO_VALUE)); + cf.addColumn(new Column(CLUSTERNAME, DatabaseDescriptor.getClusterName().getBytes(), TimestampClock.ZERO_VALUE)); rm.add(cf); rm.apply(); metadata = new StorageMetadata(token, generation, DatabaseDescriptor.getClusterName().getBytes()); @@ -170,7 +170,8 @@ public class SystemTable RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY); cf = ColumnFamily.create(Table.SYSTEM_TABLE, SystemTable.STATUS_CF); - Column generation2 = new Column(GENERATION, FBUtilities.toByteArray(gen), generation.timestamp() 
+ 1); + TimestampClock genClock = new TimestampClock(((TimestampClock)generation.clock()).timestamp() + 1); + Column generation2 = new Column(GENERATION, FBUtilities.toByteArray(gen), genClock); cf.addColumn(generation2); byte[] cname; if (cluster != null) @@ -180,7 +181,7 @@ public class SystemTable } else { - Column clustername = new Column(CLUSTERNAME, DatabaseDescriptor.getClusterName().getBytes()); + Column clustername = new Column(CLUSTERNAME, DatabaseDescriptor.getClusterName().getBytes(), TimestampClock.ZERO_VALUE); cf.addColumn(clustername); cname = DatabaseDescriptor.getClusterName().getBytes(); logger.info("Saved ClusterName not found. Using " + DatabaseDescriptor.getClusterName()); @@ -204,7 +205,7 @@ public class SystemTable public static void setBootstrapped(boolean isBootstrapped) { ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF); - cf.addColumn(new Column(BOOTSTRAP, new byte[] { (byte) (isBootstrapped ? 1 : 0) }, System.currentTimeMillis())); + cf.addColumn(new Column(BOOTSTRAP, new byte[] { (byte) (isBootstrapped ? 
1 : 0) }, new TimestampClock(System.currentTimeMillis()))); RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, BOOTSTRAP_KEY); rm.add(cf); try @@ -228,7 +229,7 @@ public class SystemTable RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, GRAVEYARD_KEY); long now = System.currentTimeMillis(); for (IColumn col : cols) - rm.delete(new QueryPath(STATUS_CF, null, col.name()), now); + rm.delete(new QueryPath(STATUS_CF, null, col.name()), new TimestampClock(now)); rm.apply(); } @@ -236,7 +237,7 @@ public class SystemTable public static void markForRemoval(CFMetaData cfm) { ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF); - cf.addColumn(new Column((cfm.tableName + "-" + cfm.cfName + "-" + cfm.cfId).getBytes(), ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis())); + cf.addColumn(new Column((cfm.tableName + "-" + cfm.cfName + "-" + cfm.cfId).getBytes(), ArrayUtils.EMPTY_BYTE_ARRAY, new TimestampClock(System.currentTimeMillis()))); RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, GRAVEYARD_KEY); rm.add(cf); try Added: cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java?rev=948316&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java Wed May 26 04:45:44 2010 @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +import org.apache.log4j.Logger; +import org.apache.commons.lang.ArrayUtils; + +import org.apache.cassandra.io.ICompactSerializer2; + +/** + * A simple clock composed of a timestamp. + * The comparison is the timestamp comparison. + */ +public class TimestampClock implements IClock +{ + private static Logger logger_ = Logger.getLogger(TimestampClock.class); + public static TimestampClock MIN_VALUE = new TimestampClock(Long.MIN_VALUE); + public static TimestampClock ZERO_VALUE = new TimestampClock(0); + public static ICompactSerializer2<IClock> SERIALIZER = new TimestampClockSerializer(); + + private final long timestamp; + + public TimestampClock(long timestamp) + { + this.timestamp = timestamp; + } + + public long timestamp() + { + return timestamp; + } + + public ClockRelationship compare(IClock other) + { + assert other instanceof TimestampClock : "Wrong class type."; + + long otherTimestamp = ((TimestampClock)other).timestamp(); + if (timestamp > otherTimestamp) + { + return ClockRelationship.GREATER_THAN; + } + else if (timestamp == otherTimestamp) + { + return ClockRelationship.EQUAL; + } + // timestamp < otherTimestamp + return ClockRelationship.LESS_THAN; + } + + public IClock getSuperset(List<IClock> otherClocks) + { + IClock max = this; + + for (IClock clock : otherClocks) + { + if (clock.compare(max) == ClockRelationship.GREATER_THAN) + { + max = clock; + } + } + + return max; + } + + public 
int size() + { + return DBConstants.tsSize_; + } + + public ClockType type() + { + return ClockType.Timestamp; + } + + public void serialize(DataOutput out) throws IOException + { + SERIALIZER.serialize(this, out); + } + + public String toString() + { + return Long.toString(timestamp); + } +} + +class TimestampClockSerializer implements ICompactSerializer2<IClock> +{ + public void serialize(IClock tc, DataOutput out) throws IOException + { + out.writeLong(((TimestampClock)tc).timestamp()); + } + + public IClock deserialize(DataInput in) throws IOException + { + return new TimestampClock(in.readLong()); + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Wed May 26 04:45:44 2010 @@ -28,6 +28,7 @@ import org.apache.cassandra.io.util.File import org.apache.cassandra.utils.ReducingIterator; import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.IClock.ClockRelationship; public class QueryFilter { @@ -112,8 +113,8 @@ public class QueryFilter // filterSuperColumn only looks at immediate parent (the supercolumn) when determining if a subcolumn // is still live, i.e., not shadowed by the parent's tombstone. so, bump it up temporarily to the tombstone // time of the cf, if that is greater. 
- long deletedAt = c.getMarkedForDeleteAt(); - if (returnCF.getMarkedForDeleteAt() > deletedAt) + IClock deletedAt = c.getMarkedForDeleteAt(); + if (returnCF.getMarkedForDeleteAt().compare(deletedAt) == ClockRelationship.GREATER_THAN) ((SuperColumn)c).markForDeleteAt(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt()); c = filter.filterSuperColumn((SuperColumn)c, gcBefore); @@ -137,9 +138,9 @@ public class QueryFilter // the column itself must be not gc-able (it is live, or a still relevant tombstone, or has live subcolumns), (1) // and if its container is deleted, the column must be changed more recently than the container tombstone (2) // (since otherwise, the only thing repair cares about is the container tombstone) - long maxChange = column.mostRecentLiveChangeAt(); - return (!column.isMarkedForDelete() || column.getLocalDeletionTime() > gcBefore || maxChange > column.getMarkedForDeleteAt()) // (1) - && (!container.isMarkedForDelete() || maxChange > container.getMarkedForDeleteAt()); // (2) + IClock maxChange = column.mostRecentLiveChangeAt(); + return (!column.isMarkedForDelete() || column.getLocalDeletionTime() > gcBefore || (ClockRelationship.GREATER_THAN == maxChange.compare(column.getMarkedForDeleteAt()))) // (1) + && (!container.isMarkedForDelete() || (ClockRelationship.GREATER_THAN == maxChange.compare(container.getMarkedForDeleteAt()))); // (2) } /** Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java Wed May 26 04:45:44 2010 @@ -33,6 +33,7 @@ import com.google.common.collect.Collect import 
org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.IClock.ClockRelationship; import org.apache.cassandra.db.marshal.AbstractType; import com.google.common.base.Predicate; @@ -147,7 +148,7 @@ public class SliceQueryFilter implements // only count live columns towards the `count` criteria if (!column.isMarkedForDelete() && (!container.isMarkedForDelete() - || column.mostRecentLiveChangeAt() > container.getMarkedForDeleteAt())) + || (ClockRelationship.GREATER_THAN == column.mostRecentLiveChangeAt().compare(container.getMarkedForDeleteAt())))) { liveColumns++; } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Wed May 26 04:45:44 2010 @@ -26,6 +26,7 @@ import org.apache.cassandra.db.ColumnFam import org.apache.cassandra.db.CompactionManager; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.db.TimestampClock; import org.apache.cassandra.db.RowMutation; import org.apache.cassandra.db.Table; import org.apache.cassandra.db.filter.QueryFilter; @@ -117,13 +118,13 @@ public abstract class Migration long now = System.currentTimeMillis(); byte[] buf = getBytes(); RowMutation migration = new RowMutation(Table.SYSTEM_TABLE, MIGRATIONS_KEY); - migration.add(new QueryPath(MIGRATIONS_CF, null, UUIDGen.decompose(newVersion)), buf, now); + migration.add(new QueryPath(MIGRATIONS_CF, null, UUIDGen.decompose(newVersion)), buf, new TimestampClock(now)); migration.apply(); // note that we 
storing this in the system table, which is not replicated, instead of the definitions table, which is. logger.debug("Applying migration " + newVersion.toString()); migration = new RowMutation(Table.SYSTEM_TABLE, LAST_MIGRATION_KEY); - migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY), UUIDGen.decompose(newVersion), now); + migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY), UUIDGen.decompose(newVersion), new TimestampClock(now)); migration.apply(); // if we fail here, there will be schema changes in the CL that will get replayed *AFTER* the schema is loaded. @@ -208,9 +209,9 @@ public abstract class Migration final long now = System.currentTimeMillis(); RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, toBytes(versionId)); if (remove != null) - rm.delete(new QueryPath(SCHEMA_CF, null, remove.name.getBytes()), System.currentTimeMillis()); + rm.delete(new QueryPath(SCHEMA_CF, null, remove.name.getBytes()), new TimestampClock(System.currentTimeMillis())); if (add != null) - rm.add(new QueryPath(SCHEMA_CF, null, add.name.getBytes()), KSMetaData.serialize(add), now); + rm.add(new QueryPath(SCHEMA_CF, null, add.name.getBytes()), KSMetaData.serialize(add), new TimestampClock(now)); // include all other key spaces. 
for (String tableName : DatabaseDescriptor.getNonSystemTables()) @@ -218,7 +219,7 @@ public abstract class Migration if (add != null && add.name.equals(tableName) || remove != null && remove.name.equals(tableName)) continue; KSMetaData ksm = DatabaseDescriptor.getTableDefinition(tableName); - rm.add(new QueryPath(SCHEMA_CF, null, ksm.name.getBytes()), KSMetaData.serialize(ksm), now); + rm.add(new QueryPath(SCHEMA_CF, null, ksm.name.getBytes()), KSMetaData.serialize(ksm), new TimestampClock(now)); } return rm; } Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Wed May 26 04:45:44 2010 @@ -289,7 +289,8 @@ public class ColumnFamilyRecordReader ex private IColumn unthriftifySuper(SuperColumn super_column) { AbstractType subComparator = DatabaseDescriptor.getSubComparator(keyspace, cfName); - org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator); + ClockType clockType = DatabaseDescriptor.getClockType(keyspace, cfName); + org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator, clockType); for (Column column : super_column.columns) { sc.addColumn(unthriftifySimple(column)); @@ -299,6 +300,11 @@ public class ColumnFamilyRecordReader ex private IColumn unthriftifySimple(Column column) { - return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp); + return new org.apache.cassandra.db.Column(column.name, column.value, unthriftifyClock(column.clock)); + } + + private 
static IClock unthriftifyClock(Clock clock) + { + return new org.apache.cassandra.db.TimestampClock(clock.getTimestamp()); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Wed May 26 04:45:44 2010 @@ -277,9 +277,10 @@ public abstract class SSTableReader exte public ICompactSerializer2<IColumn> getColumnSerializer() { ColumnFamilyType cfType = DatabaseDescriptor.getColumnFamilyType(getTableName(), getColumnFamilyName()); + ClockType clockType = DatabaseDescriptor.getClockType(getTableName(), getColumnFamilyName()); return cfType == ColumnFamilyType.Standard - ? Column.serializer() - : SuperColumn.serializer(getColumnComparator()); + ? 
Column.serializer(clockType) + : SuperColumn.serializer(getColumnComparator(), clockType); } /** Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Wed May 26 04:45:44 2010 @@ -137,7 +137,7 @@ public class CassandraServer implements { continue; } - Column thrift_column = new Column(column.name(), column.value(), column.timestamp()); + Column thrift_column = new Column(column.name(), column.value(), thriftifyIClock(column.clock())); if (column instanceof ExpiringColumn) { thrift_column.setTtl(((ExpiringColumn) column).getTimeToLive()); @@ -157,7 +157,7 @@ public class CassandraServer implements { continue; } - Column thrift_column = new Column(column.name(), column.value(), column.timestamp()); + Column thrift_column = new Column(column.name(), column.value(), thriftifyIClock(column.clock())); if (column instanceof ExpiringColumn) { thrift_column.setTtl(((ExpiringColumn) column).getTimeToLive()); @@ -193,6 +193,16 @@ public class CassandraServer implements return thriftSuperColumns; } + private static Clock thriftifyIClock(IClock clock) + { + Clock thrift_clock = new Clock(); + if (clock instanceof TimestampClock) + { + thrift_clock.setTimestamp(((TimestampClock)clock).timestamp()); + } + return thrift_clock; + } + private Map<byte[], List<ColumnOrSuperColumn>> getSlice(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException { @@ -344,11 +354,12 @@ public class CassandraServer implements ThriftValidation.validateKey(key); 
ThriftValidation.validateColumnParent(keySpace.get(), column_parent); ThriftValidation.validateColumn(keySpace.get(), column_parent, column); + IClock cassandra_clock = ThriftValidation.validateClock(column.clock); RowMutation rm = new RowMutation(keySpace.get(), key); try { - rm.add(new QueryPath(column_parent.column_family, column_parent.super_column, column.name), column.value, column.timestamp, column.ttl); + rm.add(new QueryPath(column_parent.column_family, column_parent.super_column, column.name), column.value, cassandra_clock, column.ttl); } catch (MarshalException e) { @@ -418,7 +429,7 @@ public class CassandraServer implements } } - public void remove(byte[] key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level) + public void remove(byte[] key, ColumnPath column_path, Clock clock, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException { if (logger.isDebugEnabled()) @@ -428,9 +439,11 @@ public class CassandraServer implements ThriftValidation.validateKey(key); ThriftValidation.validateColumnPathOrParent(keySpace.get(), column_path); - + + IClock cassandra_clock = ThriftValidation.validateClock(clock); + RowMutation rm = new RowMutation(keySpace.get(), key); - rm.delete(new QueryPath(column_path), timestamp); + rm.delete(new QueryPath(column_path), cassandra_clock); doInsert(consistency_level, rm); } @@ -469,6 +482,7 @@ public class CassandraServer implements Map<String, String> columnMap = new HashMap<String, String>(); columnMap.put("Type", columnFamilyMetaData.cfType.name()); + columnMap.put("ClockType", columnFamilyMetaData.clockType.name()); columnMap.put("Desc", columnFamilyMetaData.comment == null ? 
columnFamilyMetaData.pretty() : columnFamilyMetaData.comment); columnMap.put("CompareWith", columnFamilyMetaData.comparator.getClass().getName()); if (columnFamilyMetaData.cfType == ColumnFamilyType.Super) @@ -624,6 +638,7 @@ public class CassandraServer implements cf_def.table, cf_def.name, cfType, + ClockType.Timestamp, DatabaseDescriptor.getComparator(cf_def.comparator_type), cf_def.subcomparator_type.length() == 0 ? null : DatabaseDescriptor.getComparator(cf_def.subcomparator_type), cf_def.comment, @@ -733,6 +748,7 @@ public class CassandraServer implements cfDef.table, cfDef.name, cfType, + ClockType.Timestamp, DatabaseDescriptor.getComparator(cfDef.comparator_type), cfDef.subcomparator_type.length() == 0 ? null : DatabaseDescriptor.getComparator(cfDef.subcomparator_type), cfDef.comment, Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java Wed May 26 04:45:44 2010 @@ -22,11 +22,14 @@ package org.apache.cassandra.thrift; import java.util.Comparator; import java.util.Arrays; +import org.apache.commons.lang.ArrayUtils; import org.apache.cassandra.db.KeyspaceNotDefinedException; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.ColumnFamilyType; +import org.apache.cassandra.db.IClock; +import org.apache.cassandra.db.TimestampClock; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.MarshalException; @@ -210,6 +213,7 @@ public class ThriftValidation if (cosc.column != null) { validateTtl(cosc.column); + validateClock(cosc.column.clock); 
ThriftValidation.validateColumnPath(keyspace, new ColumnPath(cfName).setSuper_column(null).setColumn(cosc.column.name)); } @@ -218,6 +222,7 @@ public class ThriftValidation for (Column c : cosc.super_column.columns) { validateTtl(c); + validateClock(c.clock); ThriftValidation.validateColumnPath(keyspace, new ColumnPath(cfName).setSuper_column(cosc.super_column.name).setColumn(c.name)); } } @@ -236,6 +241,15 @@ public class ThriftValidation assert column.isSetTtl() || column.ttl == 0; } + public static IClock validateClock(Clock clock) throws InvalidRequestException + { + if (clock.isSetTimestamp()) + { + return new TimestampClock(clock.getTimestamp()); + } + throw new InvalidRequestException("Clock must have one a timestamp"); + } + public static void validateMutation(String keyspace, String cfName, Mutation mut) throws InvalidRequestException { Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Wed May 26 04:45:44 2010 @@ -27,6 +27,7 @@ import org.apache.cassandra.config.Datab import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.db.TimestampClock; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.SSTable; @@ -95,7 +96,7 @@ public class SSTableExport json.append(", "); json.append(quote(bytesToHex(column.value()))); json.append(", "); - json.append(column.timestamp()); + json.append(((TimestampClock) column.clock()).timestamp()); json.append(", "); 
json.append(column.isMarkedForDelete()); json.append("]"); @@ -125,7 +126,7 @@ public class SSTableExport json.append(asKey(bytesToHex(column.name()))); json.append("{"); json.append(asKey("deletedAt")); - json.append(column.getMarkedForDeleteAt()); + json.append(((TimestampClock) column.getMarkedForDeleteAt()).timestamp()); json.append(", "); json.append(asKey("subColumns")); json.append(serializeColumns(column.getSubColumns(), comparator)); Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java Wed May 26 04:45:44 2010 @@ -27,6 +27,7 @@ import org.apache.cassandra.db.ColumnFam import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.SuperColumn; import org.apache.cassandra.db.ColumnFamilyType; +import org.apache.cassandra.db.TimestampClock; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.DataOutputBuffer; @@ -64,6 +65,7 @@ public class SSTableImport { private String name; private String value; + // TODO: fix when we add other clock types private long timestamp; private boolean isDeleted; @@ -73,6 +75,7 @@ public class SSTableImport assert colSpec.size() == 4; name = (String)colSpec.get(0); value = (String)colSpec.get(1); + // TODO: fix when we add other clock types timestamp = (Long)colSpec.get(2); isDeleted = (Boolean)colSpec.get(3); } @@ -93,9 +96,9 @@ public class SSTableImport JsonColumn col = new JsonColumn(c); QueryPath path = new QueryPath(cfm.cfName, null, hexToBytes(col.name)); if (col.isDeleted) { - cfamily.addColumn(path, hexToBytes(col.value), 
col.timestamp); + cfamily.addColumn(path, hexToBytes(col.value), new TimestampClock(col.timestamp)); } else { - cfamily.addTombstone(path, hexToBytes(col.value), col.timestamp); + cfamily.addTombstone(path, hexToBytes(col.value), new TimestampClock(col.timestamp)); } } } @@ -123,14 +126,14 @@ public class SSTableImport JsonColumn col = new JsonColumn(c); QueryPath path = new QueryPath(cfm.cfName, superName, hexToBytes(col.name)); if (col.isDeleted) { - cfamily.addColumn(path, hexToBytes(col.value), col.timestamp); + cfamily.addColumn(path, hexToBytes(col.value), new TimestampClock(col.timestamp)); } else { - cfamily.addTombstone(path, hexToBytes(col.value), col.timestamp); + cfamily.addTombstone(path, hexToBytes(col.value), new TimestampClock(col.timestamp)); } } SuperColumn superColumn = (SuperColumn)cfamily.getColumn(superName); - superColumn.markForDeleteAt((int)(System.currentTimeMillis()/1000), deletedAt); + superColumn.markForDeleteAt((int)(System.currentTimeMillis()/1000), new TimestampClock(deletedAt)); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed May 26 04:45:44 2010 @@ -28,6 +28,7 @@ import java.security.MessageDigest; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.zip.DataFormatException; import java.util.zip.Deflater; import java.util.zip.Inflater; @@ -39,6 +40,8 @@ import org.apache.commons.collections.it import org.apache.cassandra.config.DatabaseDescriptor; import 
org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.IClock; +import org.apache.cassandra.db.IClock.ClockRelationship; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.thrift.TBase; @@ -399,23 +402,48 @@ public class FBUtilities public static void atomicSetMax(AtomicInteger atomic, int i) { - int j; while (true) { - if ((j = atomic.getAndSet(i)) <= i) + int j = atomic.get(); + if (j >= i || atomic.compareAndSet(j, i)) break; - i = j; } } public static void atomicSetMax(AtomicLong atomic, long i) { - long j; while (true) { - if ((j = atomic.getAndSet(i)) <= i) + long j = atomic.get(); + if (j >= i || atomic.compareAndSet(j, i)) + break; + } + } + + /** + * Sets an atomic clock reference to the maximum of its current value and + * a new value. + * + * The function is not synchronized and does not guarantee that the resulting + * reference will hold either the old or new value, but it does guarantee + * that it will hold a value, v, such that: v = max(oldValue, newValue, v). 
+ * + * @param atomic the atomic reference to set + * @param newClock the new provided value + */ + public static void atomicSetMax(AtomicReference<IClock> atomic, IClock newClock) + { + while (true) + { + IClock oldClock = atomic.get(); + ClockRelationship rel = oldClock.compare(newClock); + if (rel == ClockRelationship.DISJOINT) + { + newClock = oldClock.getSuperset(Arrays.asList(newClock)); + } + if (rel == ClockRelationship.GREATER_THAN || rel == ClockRelationship.EQUAL + || atomic.compareAndSet(oldClock, newClock)) break; - i = j; } } Modified: cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java (original) +++ cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java Wed May 26 04:45:44 2010 @@ -85,9 +85,9 @@ public class LongCompactionSpeedTest ext for (int i = 0; i < colsPerRow; i++) { // last sstable has highest timestamps - cols[i] = Util.column(String.valueOf(i), String.valueOf(i), k); + cols[i] = Util.column(String.valueOf(i), String.valueOf(i), new TimestampClock(k)); } - rows.put(key, SSTableUtils.createCF(Long.MIN_VALUE, Integer.MIN_VALUE, cols)); + rows.put(key, SSTableUtils.createCF(ClockType.Timestamp.minClock(), Integer.MIN_VALUE, cols)); } SSTableReader sstable = SSTableUtils.writeSSTable(rows); sstables.add(sstable); Modified: cassandra/trunk/test/system/test_avro_server.py URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_avro_server.py?rev=948316&r1=948315&r2=948316&view=diff ============================================================================== --- cassandra/trunk/test/system/test_avro_server.py (original) +++ cassandra/trunk/test/system/test_avro_server.py Wed 
May 26 04:45:44 2010 @@ -46,7 +46,7 @@ class TestRpcOperations(AvroTester): params['column'] = dict() params['column']['name'] = 'c1' params['column']['value'] = 'v1' - params['column']['timestamp'] = 0 + params['column']['clock'] = { 'timestamp' : 0 } params['consistency_level'] = 'ONE' self.client.request('insert', params) @@ -74,7 +74,7 @@ class TestRpcOperations(AvroTester): params['column'] = dict() params['column']['name'] = i64(1) params['column']['value'] = 'v1' - params['column']['timestamp'] = 0 + params['column']['clock'] = { 'timestamp' : 0 } params['consistency_level'] = 'ONE' self.client.request('insert', params)