Author: jbellis
Date: Wed Sep 22 13:41:25 2010
New Revision: 999940

URL: http://svn.apache.org/viewvc?rev=999940&view=rev
Log:
avoid allocating HashMap for each mutation.
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1517
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=999940&r1=999939&r2=999940&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Sep 22 13:41:25 2010 @@ -56,7 +56,7 @@ public class Memtable implements Compara private final long creationTime; private final ConcurrentNavigableMap<DecoratedKey, ColumnFamily> columnFamilies = new ConcurrentSkipListMap<DecoratedKey, ColumnFamily>(); private final IPartitioner partitioner; - private final ColumnFamilyStore cfs; + public final ColumnFamilyStore cfs; public Memtable(ColumnFamilyStore cfs, IPartitioner partitioner) { Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=999940&r1=999939&r2=999940&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 22 13:41:25 2010 @@ -18,37 +18,36 @@ package org.apache.cassandra.db; +import java.io.File; import java.io.IOError; -import java.util.*; import java.io.IOException; -import java.io.File; +import java.util.*; import java.util.concurrent.ExecutionException; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.Future; +import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.base.Function; import com.google.common.collect.Iterables; +import org.apache.commons.lang.ArrayUtils; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.*; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.clock.AbstractReconciler; import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.dht.LocalToken; import org.apache.cassandra.io.ICompactionInfo; import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.SSTableDeletingReference; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileUtils; - -import org.apache.commons.lang.ArrayUtils; - import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.db.filter.*; import org.apache.cassandra.utils.FBUtilities; import org.cliffc.high_scale_lib.NonBlockingHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class Table { public static final String SYSTEM_TABLE = "system"; @@ -330,7 +329,7 @@ public class Table */ public void apply(RowMutation mutation, Object serializedMutation, boolean writeCommitLog) throws IOException { - HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new HashMap<ColumnFamilyStore, Memtable>(2); + List<Memtable> memtablesToFlush = Collections.emptyList(); // write the mutation to the commitlog and memtables flusherLock.readLock().lock(); @@ -363,7 +362,9 @@ public class Table if (mutatedIndexedColumns == null) { // just update the actual value, no extra synchronization - applyCF(cfs, key, cf, memtablesToFlush); + Memtable fullMemtable = cfs.apply(key, cf); + if (fullMemtable != null) + memtablesToFlush = addFullMemtable(memtablesToFlush, fullMemtable); } else { @@ -372,8 +373,11 @@ public class Table ColumnFamily oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns); ignoreObsoleteMutations(cf, 
cfs.metadata.reconciler, mutatedIndexedColumns, oldIndexedColumns); - applyCF(cfs, key, cf, memtablesToFlush); - applyIndexUpdates(mutation.key(), memtablesToFlush, cf, cfs, mutatedIndexedColumns, oldIndexedColumns); + Memtable fullMemtable = cfs.apply(key, cf); + if (fullMemtable != null) + memtablesToFlush = addFullMemtable(memtablesToFlush, fullMemtable); + // ignore full index memtables -- we flush those when the "master" one is full + applyIndexUpdates(mutation.key(), cf, cfs, mutatedIndexedColumns, oldIndexedColumns); } } @@ -387,9 +391,18 @@ public class Table flusherLock.readLock().unlock(); } - // flush memtables that got filled up. usually mTF will be empty and this will be a no-op - for (Map.Entry<ColumnFamilyStore, Memtable> entry : memtablesToFlush.entrySet()) - entry.getKey().maybeSwitchMemtable(entry.getValue(), writeCommitLog); + // flush memtables that got filled up outside the readlock (maybeSwitchMemtable acquires writeLock). + // usually mTF will be empty and this will be a no-op. 
+ for (Memtable memtable : memtablesToFlush) + memtable.cfs.maybeSwitchMemtable(memtable, writeCommitLog); + } + + private static List<Memtable> addFullMemtable(List<Memtable> memtablesToFlush, Memtable fullMemtable) + { + if (memtablesToFlush.isEmpty()) + memtablesToFlush = new ArrayList<Memtable>(2); + memtablesToFlush.add(fullMemtable); + return memtablesToFlush; } private static void ignoreObsoleteMutations(ColumnFamily cf, AbstractReconciler reconciler, SortedSet<byte[]> mutatedIndexedColumns, ColumnFamily oldIndexedColumns) @@ -414,13 +427,14 @@ public class Table return cfs.getColumnFamily(filter); } - private static void applyIndexUpdates(byte[] key, - HashMap<ColumnFamilyStore, Memtable> memtablesToFlush, - ColumnFamily cf, - ColumnFamilyStore cfs, - SortedSet<byte[]> mutatedIndexedColumns, - ColumnFamily oldIndexedColumns) + private static List<Memtable> applyIndexUpdates(byte[] key, + ColumnFamily cf, + ColumnFamilyStore cfs, + SortedSet<byte[]> mutatedIndexedColumns, + ColumnFamily oldIndexedColumns) { + List<Memtable> fullMemtables = Collections.emptyList(); + // add new index entries for (byte[] columnName : mutatedIndexedColumns) { @@ -428,7 +442,9 @@ public class Table DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName, column.value()); ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName); cfi.addColumn(new Column(key, ArrayUtils.EMPTY_BYTE_ARRAY, column.clock())); - applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cfi, memtablesToFlush); + Memtable fullMemtable = cfs.getIndexedColumnFamilyStore(columnName).apply(valueKey, cfi); + if (fullMemtable != null) + fullMemtables = addFullMemtable(fullMemtables, fullMemtable); } // remove the old index entries @@ -442,9 +458,13 @@ public class Table DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName, column.value()); ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName); cfi.deleteColumn(key, localDeletionTime, column.clock()); - 
applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cfi, memtablesToFlush); + Memtable fullMemtable = cfs.getIndexedColumnFamilyStore(columnName).apply(valueKey, cfi); + if (fullMemtable != null) + fullMemtables = addFullMemtable(fullMemtables, fullMemtable); } } + + return fullMemtables; } public IndexBuilder createIndexBuilder(ColumnFamilyStore cfs, SortedSet<byte[]> columns, ReducingKeyIterator iter) @@ -471,7 +491,7 @@ public class Table { DecoratedKey key = iter.next(); logger.debug("Indexing row {} ", key); - HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new HashMap<ColumnFamilyStore, Memtable>(2); + List<Memtable> memtablesToFlush = Collections.emptyList(); flusherLock.readLock().lock(); try { @@ -479,7 +499,7 @@ public class Table { ColumnFamily cf = readCurrentIndexedColumns(key, cfs, columns); if (cf != null) - applyIndexUpdates(key.key, memtablesToFlush, cf, cfs, cf.getColumnNames(), null); + memtablesToFlush = applyIndexUpdates(key.key, cf, cfs, cf.getColumnNames(), null); } } finally @@ -487,8 +507,9 @@ public class Table flusherLock.readLock().unlock(); } - for (Map.Entry<ColumnFamilyStore, Memtable> entry : memtablesToFlush.entrySet()) - entry.getKey().maybeSwitchMemtable(entry.getValue(), false); + // during index build, we do flush index memtables separately from master; otherwise we could OOM + for (Memtable memtable : memtablesToFlush) + memtable.cfs.maybeSwitchMemtable(memtable, false); } try @@ -522,13 +543,6 @@ public class Table return indexLocks[Math.abs(Arrays.hashCode(key) % indexLocks.length)]; } - private static void applyCF(ColumnFamilyStore cfs, DecoratedKey key, ColumnFamily columnFamily, HashMap<ColumnFamilyStore, Memtable> memtablesToFlush) - { - Memtable memtableToFlush = cfs.apply(key, columnFamily); - if (memtableToFlush != null) - memtablesToFlush.put(cfs, memtableToFlush); - } - public List<Future<?>> flush() throws IOException { List<Future<?>> futures = new ArrayList<Future<?>>();