Author: jbellis
Date: Wed Feb 16 22:40:51 2011
New Revision: 1071428

URL: http://svn.apache.org/viewvc?rev=1071428&view=rev
Log:
intern column names to save old-gen heap space
patch by jbellis; reviewed by stuhood for CASSANDRA-1255

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Wed Feb 16 
22:40:51 2011
@@ -215,9 +215,9 @@ public class Column implements IColumn
         return result;
     }
 
-    public IColumn deepCopy()
+    public IColumn localCopy(ColumnFamilyStore cfs)
     {
-        return new Column(ByteBufferUtil.clone(name), 
ByteBufferUtil.clone(value), timestamp);
+        return new Column(cfs.internOrCopy(name), ByteBufferUtil.clone(value), 
timestamp);
     }
     
     public String getString(AbstractType comparator)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Wed Feb 
16 22:40:51 2011
@@ -37,6 +37,7 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractCommutativeType;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.util.IIterableColumns;
 import org.apache.cassandra.utils.FBUtilities;
@@ -72,7 +73,7 @@ public class ColumnFamily implements ICo
     private final Integer cfid;
     private final ColumnFamilyType type;
 
-    private transient ICompactSerializer2<IColumn> columnSerializer;
+    private transient IColumnSerializer columnSerializer;
     final AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
     final AtomicInteger localDeletionTime = new 
AtomicInteger(Integer.MIN_VALUE);
     private ConcurrentSkipListMap<ByteBuffer, IColumn> columns;
@@ -137,7 +138,7 @@ public class ColumnFamily implements ICo
     /**
      * FIXME: Gross.
      */
-    public ICompactSerializer2<IColumn> getColumnSerializer()
+    public IColumnSerializer getColumnSerializer()
     {
         return columnSerializer;
     }
@@ -434,14 +435,4 @@ public class ColumnFamily implements ICo
     {
         return columns.values().iterator();
     }
-
-    /**
-     * Used to force copy an existing column
-     * @param column column to copy
-     */
-    public void deepCopyColumn(IColumn column)
-    {
-        remove(column.name());
-        addColumn(column.deepCopy());
-    }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java 
Wed Feb 16 22:40:51 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.db;
 
 
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collection;
@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.utils.Pair;
 
 public class ColumnFamilySerializer implements 
ICompactSerializer2<ColumnFamily>
 {
@@ -108,6 +110,11 @@ public class ColumnFamilySerializer impl
 
     public ColumnFamily deserialize(DataInput dis) throws IOException
     {
+        return deserialize(dis, false);
+    }
+
+    public ColumnFamily deserialize(DataInput dis, boolean intern) throws 
IOException
+    {
         if (!dis.readBoolean())
             return null;
 
@@ -117,16 +124,17 @@ public class ColumnFamilySerializer impl
             throw new UnserializableColumnFamilyException("Couldn't find 
cfId=" + cfId, cfId);
         ColumnFamily cf = ColumnFamily.create(cfId);
         deserializeFromSSTableNoColumns(cf, dis);
-        deserializeColumns(dis, cf);
+        deserializeColumns(dis, cf, intern);
         return cf;
     }
 
-    public void deserializeColumns(DataInput dis, ColumnFamily cf) throws 
IOException
+    public void deserializeColumns(DataInput dis, ColumnFamily cf, boolean 
intern) throws IOException
     {
         int size = dis.readInt();
+        ColumnFamilyStore interner = intern ? 
Table.open(CFMetaData.getCF(cf.id()).left).getColumnFamilyStore(cf.id()) : null;
         for (int i = 0; i < size; ++i)
         {
-            IColumn column = cf.getColumnSerializer().deserialize(dis);
+            IColumn column = cf.getColumnSerializer().deserialize(dis, 
interner);
             cf.addColumn(column);
         }
     }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed 
Feb 16 22:40:51 2011
@@ -55,6 +55,7 @@ import org.apache.cassandra.thrift.Index
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.utils.*;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -126,6 +127,9 @@ public class ColumnFamilyStore implement
 
     public final CFMetaData metadata;
 
+    private static final int INTERN_CUTOFF = 256;
+    public final ConcurrentMap<ByteBuffer, ByteBuffer> internedNames = new 
NonBlockingHashMap<ByteBuffer, ByteBuffer>();
+
     /* These are locally held copies to be changed from the config during 
runtime */
     private volatile DefaultInteger minCompactionThreshold;
     private volatile DefaultInteger maxCompactionThreshold;
@@ -1152,47 +1156,15 @@ public class ColumnFamilyStore implement
         if ((cached = ssTables.getRowCache().get(key)) == null)
         {
             cached = getTopLevelColumns(QueryFilter.getIdentityFilter(key, new 
QueryPath(columnFamily)), Integer.MIN_VALUE);
-
             if (cached == null)
-            {
                 return null;
-            }
 
-            /**
-             *  checking if name or value of the column don't have backing 
array
-             *  if found then removing column and storing deep copy instead
-             *  because we don't want to put such columns to the cache
-             */
+            // make a deep copy of column data so we don't keep references to 
direct buffers, which
+            // would prevent munmap post-compaction.
             for (IColumn column : cached.getSortedColumns())
             {
-                // for Super CF checking only name
-                if (cached.isSuper())
-                {
-                    // if name of the super column is DirectBuffer then 
copying whole column
-                    if (!column.name().hasArray())
-                    {
-                        cached.deepCopyColumn(column);
-                    }
-                    // checking if sub-columns also have DirectBuffer as name 
or value
-                    else
-                    {
-                        SuperColumn superColumn = (SuperColumn) column;
-
-                        for (IColumn subColumn : column.getSubColumns())
-                        {
-                            if (!subColumn.name().hasArray() || 
!subColumn.value().hasArray())
-                            {
-                                superColumn.remove(subColumn.name());
-                                superColumn.addColumn(subColumn.deepCopy());
-                            }
-                        }
-                    }
-                }
-                // for Standard checking name and value
-                else if (!column.name().hasArray() || 
!column.value().hasArray())
-                {
-                    cached.deepCopyColumn(column);
-                }
+                cached.remove(column.name());
+                cached.addColumn(column.localCopy(this));
             }
 
             // avoid keeping a permanent reference to the original key buffer
@@ -2150,4 +2122,33 @@ public class ColumnFamilyStore implement
             ssTables.getKeyCache().setCapacity(newCapacity);
         }
     }
+
+    private ByteBuffer intern(ByteBuffer name)
+    {
+        ByteBuffer internedName = internedNames.get(name);
+        if (internedName == null)
+        {
+            internedName = ByteBufferUtil.clone(name);
+            ByteBuffer concurrentName = 
internedNames.putIfAbsent(internedName, internedName);
+            if (concurrentName != null)
+                internedName = concurrentName;
+        }
+        return internedName;
+    }
+
+    public ByteBuffer internOrCopy(ByteBuffer name)
+    {
+        if (internedNames.size() >= INTERN_CUTOFF)
+            return ByteBufferUtil.clone(name);
+
+        return intern(name);
+    }
+
+    public ByteBuffer maybeIntern(ByteBuffer name)
+    {
+        if (internedNames.size() >= INTERN_CUTOFF)
+            return name;
+
+        return intern(name);
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java Wed 
Feb 16 22:40:51 2011
@@ -30,10 +30,11 @@ import java.util.Arrays;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class ColumnSerializer implements ICompactSerializer2<IColumn>
+public class ColumnSerializer implements IColumnSerializer
 {
     private static final Logger logger = 
LoggerFactory.getLogger(ColumnSerializer.class);
     
@@ -69,9 +70,16 @@ public class ColumnSerializer implements
 
     public Column deserialize(DataInput dis) throws IOException
     {
+        return deserialize(dis, null);
+    }
+
+    public Column deserialize(DataInput dis, ColumnFamilyStore interner) 
throws IOException
+    {
         ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
         if (name.remaining() <= 0)
             throw new CorruptColumnException("invalid column name length " + 
name.remaining());
+        if (interner != null)
+            name = interner.maybeIntern(name);
 
         int b = dis.readUnsignedByte();
         if ((b & COUNTER_MASK) != 0)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java Wed Feb 
16 22:40:51 2011
@@ -154,13 +154,9 @@ public class CounterColumn extends Colum
     }
 
     @Override
-    public IColumn deepCopy()
+    public IColumn localCopy(ColumnFamilyStore cfs)
     {
-        return new CounterColumn(
-            ByteBufferUtil.clone(name),
-            ByteBufferUtil.clone(value),
-            timestamp,
-            timestampOfLastDelete);
+        return new CounterColumn(cfs.internOrCopy(name), 
ByteBufferUtil.clone(value), timestamp, timestampOfLastDelete);
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Wed 
Feb 16 22:40:51 2011
@@ -171,30 +171,9 @@ public class CounterMutation implements 
         for (ColumnFamily cf_ : rowMutation.getColumnFamilies())
         {
             ColumnFamily cf = cf_.cloneMeShallow();
-            if (cf_.isSuper())
+            for (IColumn column : cf_.getColumnsMap().values())
             {
-                for (IColumn column : cf_.getSortedColumns())
-                {
-                    IColumn sc = ((SuperColumn)column).shallowCopy();
-                    for (IColumn c : column.getSubColumns())
-                    {
-                        if (c instanceof CounterUpdateColumn)
-                            sc.addColumn(((CounterUpdateColumn) 
c).asCounterColumn());
-                        else
-                            sc.addColumn(c.deepCopy());
-                    }
-                    cf.addColumn(sc);
-                }
-            }
-            else
-            {
-                for (IColumn column : cf_.getSortedColumns())
-                {
-                    if (column instanceof CounterUpdateColumn)
-                        cf.addColumn(((CounterUpdateColumn) 
column).asCounterColumn());
-                    else
-                        cf.addColumn(column.deepCopy());
-                }
+                cf.addColumn(column.localCopy(null)); // TODO fix this
             }
             rm.add(cf);
         }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterUpdateColumn.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterUpdateColumn.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterUpdateColumn.java 
Wed Feb 16 22:40:51 2011
@@ -42,15 +42,6 @@ public class CounterUpdateColumn extends
         super(name, value, timestamp);
     }
 
-    public CounterColumn asCounterColumn()
-    {
-        return new CounterColumn(
-                ByteBufferUtil.clone(name()),
-                CounterContext.instance().create(delta()),
-                timestamp(),
-                Long.MIN_VALUE);
-    }
-
     public long delta()
     {
         return value().getLong(value().position());
@@ -87,8 +78,11 @@ public class CounterUpdateColumn extends
     }
 
     @Override
-    public IColumn deepCopy()
+    public CounterColumn localCopy(ColumnFamilyStore cfs)
     {
-        return new CounterUpdateColumn(ByteBufferUtil.clone(name), 
ByteBufferUtil.clone(value), timestamp);
+        return new CounterColumn(cfs.internOrCopy(name),
+                                 CounterContext.instance().create(delta()),
+                                 timestamp(),
+                                 Long.MIN_VALUE);
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java Wed Feb 
16 22:40:51 2011
@@ -66,9 +66,9 @@ public class DeletedColumn extends Colum
     }
     
     @Override
-    public IColumn deepCopy()
+    public IColumn localCopy(ColumnFamilyStore cfs)
     {
-        return new DeletedColumn(ByteBufferUtil.clone(name), 
ByteBufferUtil.clone(value), timestamp);
+        return new DeletedColumn(cfs.internOrCopy(name), 
ByteBufferUtil.clone(value), timestamp);
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java Wed 
Feb 16 22:40:51 2011
@@ -108,9 +108,9 @@ public class ExpiringColumn extends Colu
     }
 
     @Override
-    public IColumn deepCopy()
+    public IColumn localCopy(ColumnFamilyStore cfs)
     {
-        return new ExpiringColumn(ByteBufferUtil.clone(name), 
ByteBufferUtil.clone(value), timestamp, timeToLive, localExpirationTime);
+        return new ExpiringColumn(cfs.internOrCopy(name), 
ByteBufferUtil.clone(value), timestamp, timeToLive, localExpirationTime);
     }
     
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Wed Feb 16 
22:40:51 2011
@@ -47,8 +47,9 @@ public interface IColumn
     public int getLocalDeletionTime(); // for tombstone GC, so int is 
sufficient granularity
     public String getString(AbstractType comparator);
 
-    /** clones the column, making copies of any underlying byte buffers */
-    IColumn deepCopy();
+    /** clones the column, interning column names and making copies of other 
underlying byte buffers
+     * @param cfs*/
+    IColumn localCopy(ColumnFamilyStore cfs);
 
     /**
      * For a simple column, live == !isMarkedForDelete.

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Feb 
16 22:40:51 2011
@@ -315,16 +315,18 @@ public class RowMutation implements IMut
         return rm;
     }
 
-    public RowMutation deepCopy()
+    public RowMutation localCopy()
     {
         RowMutation rm = new RowMutation(table_, ByteBufferUtil.clone(key_));
 
-        for (Map.Entry<Integer, ColumnFamily> e : modifications_.entrySet())
+        Table table = Table.open(table_);
+        for (Map.Entry<Integer, ColumnFamily> entry : 
modifications_.entrySet())
         {
-            ColumnFamily cf = e.getValue().cloneMeShallow();
-            for (Map.Entry<ByteBuffer, IColumn> ce : 
e.getValue().getColumnsMap().entrySet())
-                cf.addColumn(ce.getValue().deepCopy());
-            rm.modifications_.put(e.getKey(), cf);
+            ColumnFamily cf = entry.getValue().cloneMeShallow();
+            ColumnFamilyStore cfs = table.getColumnFamilyStore(cf.id());
+            for (Map.Entry<ByteBuffer, IColumn> ce : 
entry.getValue().getColumnsMap().entrySet())
+                cf.addColumn(ce.getValue().localCopy(cfs));
+            rm.modifications_.put(entry.getKey(), cf);
         }
 
         return rm;
@@ -359,7 +361,7 @@ public class RowMutation implements IMut
             for (int i = 0; i < size; ++i)
             {
                 Integer cfid = Integer.valueOf(dis.readInt());
-                ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
+                ColumnFamily cf = ColumnFamily.serializer().deserialize(dis, 
true);
                 modifications.put(cfid, cf);
             }
             return new RowMutation(table, key, modifications);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Wed Feb 
16 22:40:51 2011
@@ -33,7 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.ColumnSortedMap;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -304,17 +304,19 @@ public class SuperColumn implements ICol
         return sc;
     }
     
-    public IColumn deepCopy()
+    public IColumn localCopy(ColumnFamilyStore cfs)
     {
+        // we don't try to intern supercolumn names, because if we're using 
Cassandra correctly it's almost
+        // certainly just going to pollute our interning map with unique, 
dynamic values
         SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name_), 
this.getComparator());
         sc.localDeletionTime = localDeletionTime;
         sc.markedForDeleteAt = markedForDeleteAt;
         
         for(Map.Entry<ByteBuffer, IColumn> c : columns_.entrySet())
         {
-            sc.addColumn(c.getValue().deepCopy());
+            sc.addColumn(c.getValue().localCopy(cfs));
         }
-        
+
         return sc;
     }
 
@@ -329,7 +331,7 @@ public class SuperColumn implements ICol
     }
 }
 
-class SuperColumnSerializer implements ICompactSerializer2<IColumn>
+class SuperColumnSerializer implements IColumnSerializer
 {
     private static Logger logger = 
LoggerFactory.getLogger(SuperColumnSerializer.class);
 
@@ -369,6 +371,11 @@ class SuperColumnSerializer implements I
 
     public IColumn deserialize(DataInput dis) throws IOException
     {
+        return deserialize(dis, null);
+    }
+
+    public IColumn deserialize(DataInput dis, ColumnFamilyStore interner) 
throws IOException
+    {
         ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
         int localDeleteTime = dis.readInt();
         if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
@@ -380,7 +387,7 @@ class SuperColumnSerializer implements I
         /* read the number of columns */
         int size = dis.readInt();
         ColumnSerializer serializer = Column.serializer();
-        ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, 
serializer, dis, size);
+        ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, 
serializer, dis, interner, size);
         SuperColumn superColumn = new SuperColumn(name, new 
ConcurrentSkipListMap<ByteBuffer,IColumn>(preSortedMap));
         if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Feb 16 
22:40:51 2011
@@ -148,7 +148,15 @@ public class Table
         Integer id = CFMetaData.getId(name, cfName);
         if (id == null)
             throw new IllegalArgumentException(String.format("Unknown table/cf 
pair (%s.%s)", name, cfName));
-        return columnFamilyStores.get(id);
+        return getColumnFamilyStore(id);
+    }
+
+    public ColumnFamilyStore getColumnFamilyStore(Integer id)
+    {
+        ColumnFamilyStore cfs = columnFamilyStores.get(id);
+        if (cfs == null)
+            throw new IllegalArgumentException("Unknown CF " + id);
+        return cfs;
     }
 
     /**
@@ -255,6 +263,7 @@ public class Table
     {
         name = table;
         KSMetaData ksm = DatabaseDescriptor.getKSMetaData(table);
+        assert ksm != null : "Unknown keyspace " + table;
         try
         {
             createReplicationStrategy(ksm);

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
 Wed Feb 16 22:40:51 2011
@@ -134,7 +134,7 @@ public class SSTableIdentityIterator imp
     {
         file.seek(columnPosition - 4); // seek to before column count int
         ColumnFamily cf = columnFamily.cloneMeShallow();
-        ColumnFamily.serializer().deserializeColumns(file, cf);
+        ColumnFamily.serializer().deserializeColumns(file, cf, false);
         return cf;
     }
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java 
Wed Feb 16 22:40:51 2011
@@ -420,7 +420,7 @@ public class SSTableWriter extends SSTab
                 // deserialize CF
                 ColumnFamily cf = ColumnFamily.create(desc.ksname, 
desc.cfname);
                 ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, 
dfile);
-                ColumnFamily.serializer().deserializeColumns(dfile, cf);
+                ColumnFamily.serializer().deserializeColumns(dfile, cf, false);
                 rowSizes.add(dataSize);
                 columnCounts.add(cf.getEstimatedColumnCount());
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java 
Wed Feb 16 22:40:51 2011
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.Map.Entry;
 
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ColumnSerializer;
 import org.apache.cassandra.db.IColumn;
 
@@ -42,11 +43,13 @@ public class ColumnSortedMap implements 
     private DataInput dis;
     private Comparator<ByteBuffer> comparator;
     private int length;
+    private ColumnFamilyStore interner;
 
-    public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer 
serializer, DataInput dis, int length)
+    public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer 
serializer, DataInput dis, ColumnFamilyStore interner, int length)
     {
         this.comparator = comparator;
         this.serializer = serializer;
+        this.interner = interner;
         this.dis = dis;
         this.length = length;
     }
@@ -138,7 +141,7 @@ public class ColumnSortedMap implements 
 
     public Set<Map.Entry<ByteBuffer, IColumn>> entrySet()
     {
-        return new ColumnSet(serializer, dis, length);
+        return new ColumnSet(serializer, dis, interner, length);
     }
 }
 
@@ -147,11 +150,13 @@ class ColumnSet implements Set<Map.Entry
     private ColumnSerializer serializer;
     private DataInput dis;
     private int length;
+    private ColumnFamilyStore interner;
 
-    public ColumnSet(ColumnSerializer serializer, DataInput dis, int length)
+    public ColumnSet(ColumnSerializer serializer, DataInput dis, 
ColumnFamilyStore interner, int length)
     {
         this.serializer = serializer;
         this.dis = dis;
+        this.interner = interner;
         this.length = length;
     }
 
@@ -172,7 +177,7 @@ class ColumnSet implements Set<Map.Entry
 
     public Iterator<Entry<ByteBuffer, IColumn>> iterator()
     {
-        return new ColumnIterator(serializer, dis, length);
+        return new ColumnIterator(serializer, dis, interner, length);
     }
 
     public Object[] toArray()
@@ -226,11 +231,13 @@ class ColumnIterator implements Iterator
     private DataInput dis;
     private int length;
     private int count = 0;
+    private ColumnFamilyStore interner;
 
-    public ColumnIterator(ColumnSerializer serializer, DataInput dis, int 
length)
+    public ColumnIterator(ColumnSerializer serializer, DataInput dis, 
ColumnFamilyStore interner, int length)
     {
         this.dis = dis;
         this.serializer = serializer;
+        this.interner = interner;
         this.length = length;
     }
 
@@ -239,7 +246,7 @@ class ColumnIterator implements Iterator
         try
         {
             count++;
-            return serializer.deserialize(dis);
+            return serializer.deserialize(dis, interner);
         }
         catch (IOException e)
         {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed 
Feb 16 22:40:51 2011
@@ -330,7 +330,7 @@ public class StorageProxy implements Sto
         {
             public void runMayThrow() throws IOException
             {
-                rm.deepCopy().apply();
+                rm.localCopy().apply();
                 responseHandler.response(null);
             }
         };

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java?rev=1071428&r1=1071427&r2=1071428&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java 
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java 
Wed Feb 16 22:40:51 2011
@@ -32,6 +32,7 @@ import org.apache.commons.lang.ArrayUtil
 
 import org.junit.Test;
 
+import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.*;
@@ -39,7 +40,7 @@ import org.apache.cassandra.io.util.Data
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class CounterColumnTest
+public class CounterColumnTest extends SchemaLoader
 {
     private static final CounterContext cc = new CounterContext();
 
@@ -67,7 +68,7 @@ public class CounterColumnTest
             ByteBufferUtil.bytes("x"),
             ByteBufferUtil.bytes(delta),
             1L);
-        CounterColumn column = cuc.asCounterColumn();
+        CounterColumn column = 
cuc.localCopy(Table.open("Keyspace5").getColumnFamilyStore("Counter1"));
 
         assert delta == column.total();
         assert Arrays.equals(FBUtilities.getLocalAddress().getAddress(), 
ArrayUtils.subarray(column.value().array(), 0, idLength));


Reply via email to