Author: jbellis
Date: Wed Sep 22 13:41:25 2010
New Revision: 999940

URL: http://svn.apache.org/viewvc?rev=999940&view=rev
Log:
avoid allocating HashMap for each mutation
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1517

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=999940&r1=999939&r2=999940&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Sep 22 13:41:25 2010
@@ -56,7 +56,7 @@ public class Memtable implements Compara
     private final long creationTime;
     private final ConcurrentNavigableMap<DecoratedKey, ColumnFamily> 
columnFamilies = new ConcurrentSkipListMap<DecoratedKey, ColumnFamily>();
     private final IPartitioner partitioner;
-    private final ColumnFamilyStore cfs;
+    public final ColumnFamilyStore cfs;
 
     public Memtable(ColumnFamilyStore cfs, IPartitioner partitioner)
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=999940&r1=999939&r2=999940&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 22 13:41:25 2010
@@ -18,37 +18,36 @@
 
 package org.apache.cassandra.db;
 
+import java.io.File;
 import java.io.IOError;
-import java.util.*;
 import java.io.IOException;
-import java.io.File;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.clock.AbstractReconciler;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.LocalToken;
 import org.apache.cassandra.io.ICompactionInfo;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableDeletingReference;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
-
-import org.apache.commons.lang.ArrayUtils;
-
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class Table
 {
     public static final String SYSTEM_TABLE = "system";
@@ -330,7 +329,7 @@ public class Table
     */
     public void apply(RowMutation mutation, Object serializedMutation, boolean 
writeCommitLog) throws IOException
     {
-        HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new 
HashMap<ColumnFamilyStore, Memtable>(2);
+        List<Memtable> memtablesToFlush = Collections.emptyList();
 
         // write the mutation to the commitlog and memtables
         flusherLock.readLock().lock();
@@ -363,7 +362,9 @@ public class Table
                 if (mutatedIndexedColumns == null)
                 {
                     // just update the actual value, no extra synchronization
-                    applyCF(cfs, key, cf, memtablesToFlush);
+                    Memtable fullMemtable = cfs.apply(key, cf);
+                    if (fullMemtable != null)
+                        memtablesToFlush = addFullMemtable(memtablesToFlush, 
fullMemtable);
                 }
                 else
                 {
@@ -372,8 +373,11 @@ public class Table
                         ColumnFamily oldIndexedColumns = 
readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns);
                         ignoreObsoleteMutations(cf, cfs.metadata.reconciler, 
mutatedIndexedColumns, oldIndexedColumns);
 
-                        applyCF(cfs, key, cf, memtablesToFlush);
-                        applyIndexUpdates(mutation.key(), memtablesToFlush, 
cf, cfs, mutatedIndexedColumns, oldIndexedColumns);
+                        Memtable fullMemtable = cfs.apply(key, cf);
+                        if (fullMemtable != null)
+                            memtablesToFlush = 
addFullMemtable(memtablesToFlush, fullMemtable);
+                        // ignore full index memtables -- we flush those when 
the "master" one is full
+                        applyIndexUpdates(mutation.key(), cf, cfs, 
mutatedIndexedColumns, oldIndexedColumns);
                     }
                 }
 
@@ -387,9 +391,18 @@ public class Table
             flusherLock.readLock().unlock();
         }
 
-        // flush memtables that got filled up.  usually mTF will be empty and 
this will be a no-op
-        for (Map.Entry<ColumnFamilyStore, Memtable> entry : 
memtablesToFlush.entrySet())
-            entry.getKey().maybeSwitchMemtable(entry.getValue(), 
writeCommitLog);
+        // flush memtables that got filled up outside the readlock 
(maybeSwitchMemtable acquires writeLock).
+        // usually mTF will be empty and this will be a no-op.
+        for (Memtable memtable : memtablesToFlush)
+            memtable.cfs.maybeSwitchMemtable(memtable, writeCommitLog);
+    }
+
+    private static List<Memtable> addFullMemtable(List<Memtable> 
memtablesToFlush, Memtable fullMemtable)
+    {
+        if (memtablesToFlush.isEmpty())
+            memtablesToFlush = new ArrayList<Memtable>(2);
+        memtablesToFlush.add(fullMemtable);
+        return memtablesToFlush;
     }
 
     private static void ignoreObsoleteMutations(ColumnFamily cf, 
AbstractReconciler reconciler, SortedSet<byte[]> mutatedIndexedColumns, 
ColumnFamily oldIndexedColumns)
@@ -414,13 +427,14 @@ public class Table
         return cfs.getColumnFamily(filter);
     }
 
-    private static void applyIndexUpdates(byte[] key,
-                                          HashMap<ColumnFamilyStore, Memtable> 
memtablesToFlush,
-                                          ColumnFamily cf,
-                                          ColumnFamilyStore cfs,
-                                          SortedSet<byte[]> 
mutatedIndexedColumns,
-                                          ColumnFamily oldIndexedColumns)
+    private static List<Memtable> applyIndexUpdates(byte[] key,
+                                                    ColumnFamily cf,
+                                                    ColumnFamilyStore cfs,
+                                                    SortedSet<byte[]> 
mutatedIndexedColumns,
+                                                    ColumnFamily 
oldIndexedColumns)
     {
+        List<Memtable> fullMemtables = Collections.emptyList();
+
         // add new index entries
         for (byte[] columnName : mutatedIndexedColumns)
         {
@@ -428,7 +442,9 @@ public class Table
             DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName, 
column.value());
             ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName);
             cfi.addColumn(new Column(key, ArrayUtils.EMPTY_BYTE_ARRAY, 
column.clock()));
-            applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, 
cfi, memtablesToFlush);
+            Memtable fullMemtable = 
cfs.getIndexedColumnFamilyStore(columnName).apply(valueKey, cfi);
+            if (fullMemtable != null)
+                fullMemtables = addFullMemtable(fullMemtables, fullMemtable);
         }
 
         // remove the old index entries
@@ -442,9 +458,13 @@ public class Table
                 DecoratedKey<LocalToken> valueKey = 
cfs.getIndexKeyFor(columnName, column.value());
                 ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName);
                 cfi.deleteColumn(key, localDeletionTime, column.clock());
-                applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, 
cfi, memtablesToFlush);
+                Memtable fullMemtable = 
cfs.getIndexedColumnFamilyStore(columnName).apply(valueKey, cfi);
+                if (fullMemtable != null)
+                    fullMemtables = addFullMemtable(fullMemtables, 
fullMemtable);
             }
         }
+
+        return fullMemtables;
     }
 
     public IndexBuilder createIndexBuilder(ColumnFamilyStore cfs, 
SortedSet<byte[]> columns, ReducingKeyIterator iter)
@@ -471,7 +491,7 @@ public class Table
             {
                 DecoratedKey key = iter.next();
                 logger.debug("Indexing row {} ", key);
-                HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new 
HashMap<ColumnFamilyStore, Memtable>(2);
+                List<Memtable> memtablesToFlush = Collections.emptyList();
                 flusherLock.readLock().lock();
                 try
                 {
@@ -479,7 +499,7 @@ public class Table
                     {
                         ColumnFamily cf = readCurrentIndexedColumns(key, cfs, 
columns);
                         if (cf != null)
-                            applyIndexUpdates(key.key, memtablesToFlush, cf, 
cfs, cf.getColumnNames(), null);
+                            memtablesToFlush = applyIndexUpdates(key.key, cf, 
cfs, cf.getColumnNames(), null);
                     }
                 }
                 finally
@@ -487,8 +507,9 @@ public class Table
                     flusherLock.readLock().unlock();
                 }
 
-                for (Map.Entry<ColumnFamilyStore, Memtable> entry : 
memtablesToFlush.entrySet())
-                    entry.getKey().maybeSwitchMemtable(entry.getValue(), 
false);
+                // during index build, we do flush index memtables separately 
from master; otherwise we could OOM
+                for (Memtable memtable : memtablesToFlush)
+                    memtable.cfs.maybeSwitchMemtable(memtable, false);
             }
 
             try
@@ -522,13 +543,6 @@ public class Table
         return indexLocks[Math.abs(Arrays.hashCode(key) % indexLocks.length)];
     }
 
-    private static void applyCF(ColumnFamilyStore cfs, DecoratedKey key, 
ColumnFamily columnFamily, HashMap<ColumnFamilyStore, Memtable> 
memtablesToFlush)
-    {
-        Memtable memtableToFlush = cfs.apply(key, columnFamily);
-        if (memtableToFlush != null)
-            memtablesToFlush.put(cfs, memtableToFlush);
-    }
-
     public List<Future<?>> flush() throws IOException
     {
         List<Future<?>> futures = new ArrayList<Future<?>>();


Reply via email to