Updated Branches: refs/heads/trunk 8165af5db -> 7aa3364e0
Use Atomic*FieldUpdater to save memory. Patch by marcuse, reviewed by belliottsmith for CASSANDRA-6281. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7aa3364e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7aa3364e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7aa3364e Branch: refs/heads/trunk Commit: 7aa3364e04b286ac7b41cfadda568df41e4e2821 Parents: 8165af5 Author: Marcus Eriksson <marc...@apache.org> Authored: Thu Jan 2 21:17:14 2014 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Thu Jan 2 21:17:14 2014 +0100 ---------------------------------------------------------------------- .../cassandra/db/AtomicSortedColumns.java | 56 ++++++++++---------- .../service/DatacenterWriteResponseHandler.java | 3 +- .../apache/cassandra/service/ReadCallback.java | 18 ++++--- .../cassandra/service/WriteResponseHandler.java | 12 +++-- 4 files changed, 47 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa3364e/src/java/org/apache/cassandra/db/AtomicSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java index 6e4fd01..b1f1e59 100644 --- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java @@ -18,7 +18,7 @@ package org.apache.cassandra.db; import java.util.*; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import com.google.common.base.Function; import com.google.common.collect.Iterables; @@ -49,7 +49,9 @@ import org.apache.cassandra.utils.Allocator; */ public class AtomicSortedColumns extends ColumnFamily { - private final AtomicReference<Holder> ref; + private volatile Holder ref; + private static final AtomicReferenceFieldUpdater<AtomicSortedColumns, Holder> refUpdater + = AtomicReferenceFieldUpdater.newUpdater(AtomicSortedColumns.class, Holder.class, "ref"); public static final ColumnFamily.Factory<AtomicSortedColumns> factory = new Factory<AtomicSortedColumns>() { @@ -67,12 +69,12 @@ public class AtomicSortedColumns extends ColumnFamily private AtomicSortedColumns(CFMetaData metadata, Holder holder) { super(metadata); - this.ref = new AtomicReference<>(holder); + this.ref = holder; } public CellNameType getComparator() { - return (CellNameType)ref.get().map.comparator(); + return (CellNameType)ref.map.comparator(); } public ColumnFamily.Factory getFactory() @@ -82,12 +84,12 @@ public class AtomicSortedColumns extends ColumnFamily public ColumnFamily cloneMe() { - return new AtomicSortedColumns(metadata, ref.get().cloneMe()); + return new AtomicSortedColumns(metadata, ref.cloneMe()); } public DeletionInfo deletionInfo() { - return ref.get().deletionInfo; + return ref.deletionInfo; } public void delete(DeletionTime delTime) @@ -108,29 +110,29 @@ public class AtomicSortedColumns extends ColumnFamily // Keeping deletion info for max markedForDeleteAt value while (true) { - Holder current = ref.get(); + Holder current = ref; DeletionInfo newDelInfo = current.deletionInfo.copy().add(info); - if (ref.compareAndSet(current, current.with(newDelInfo))) + if (refUpdater.compareAndSet(this, current, current.with(newDelInfo))) break; } } public void setDeletionInfo(DeletionInfo newInfo) { - ref.set(ref.get().with(newInfo)); + ref = ref.with(newInfo); } public void purgeTombstones(int gcBefore) { while (true) { - Holder current = ref.get(); + Holder current = ref; if (!current.deletionInfo.hasPurgeableTombstones(gcBefore)) break; DeletionInfo purgedInfo = current.deletionInfo.copy(); purgedInfo.purge(gcBefore); - if (ref.compareAndSet(current, current.with(purgedInfo))) + if (refUpdater.compareAndSet(this, current, current.with(purgedInfo))) break; } } @@ -140,11 +142,11 @@ public class AtomicSortedColumns extends ColumnFamily Holder current, modified; do { - current = ref.get(); + current = ref; modified = current.cloneMe(); modified.addColumn(cell, allocator, SecondaryIndexManager.nullUpdater); } - while (!ref.compareAndSet(current, modified)); + while (!refUpdater.compareAndSet(this, current, modified)); } public void addAll(ColumnFamily cm, Allocator allocator, Function<Cell, Cell> transformation) @@ -177,7 +179,7 @@ public class AtomicSortedColumns extends ColumnFamily do { sizeDelta = 0; - current = ref.get(); + current = ref; DeletionInfo newDelInfo = current.deletionInfo.copy().add(cm.deletionInfo()); modified = new Holder(current.map.clone(), newDelInfo); @@ -194,11 +196,11 @@ public class AtomicSortedColumns extends ColumnFamily { sizeDelta += modified.addColumn(transformation.apply(cell), allocator, indexer); // bail early if we know we've been beaten - if (ref.get() != current) + if (ref != current) continue main_loop; } } - while (!ref.compareAndSet(current, modified)); + while (!refUpdater.compareAndSet(this, current, modified)); indexer.updateRowLevelIndexes(); @@ -214,11 +216,11 @@ public class AtomicSortedColumns extends ColumnFamily boolean replaced; do { - current = ref.get(); + current = ref; modified = current.cloneMe(); replaced = modified.map.replace(oldCell.name(), oldCell, newCell); } - while (!ref.compareAndSet(current, modified)); + while (!refUpdater.compareAndSet(this, current, modified)); return replaced; } @@ -227,45 +229,45 @@ public class AtomicSortedColumns extends ColumnFamily Holder current, modified; do { - current = ref.get(); + current = ref; modified = current.clear(); } - while (!ref.compareAndSet(current, modified)); + while (!refUpdater.compareAndSet(this, current, modified)); } public Cell getColumn(CellName name) { - return ref.get().map.get(name); + return ref.map.get(name); } public SortedSet<CellName> getColumnNames() { - return ref.get().map.keySet(); + return ref.map.keySet(); } public Collection<Cell> getSortedColumns() { - return ref.get().map.values(); + return ref.map.values(); } public Collection<Cell> getReverseSortedColumns() { - return ref.get().map.descendingMap().values(); + return ref.map.descendingMap().values(); } public int getColumnCount() { - return ref.get().map.size(); + return ref.map.size(); } public Iterator<Cell> iterator(ColumnSlice[] slices) { - return new ColumnSlice.NavigableMapIterator(ref.get().map, slices); + return new ColumnSlice.NavigableMapIterator(ref.map, slices); } public Iterator<Cell> reverseIterator(ColumnSlice[] slices) { - return new ColumnSlice.NavigableMapIterator(ref.get().map.descendingMap(), slices); + return new ColumnSlice.NavigableMapIterator(ref.map.descendingMap(), slices); } public boolean isInsertReversed() http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa3364e/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java index 5530374..96fc96d 100644 --- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @@ -50,8 +50,7 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler { if (message == null || DatabaseDescriptor.getLocalDataCenter().equals(snitch.getDatacenter(message.from))) { - if (responses.decrementAndGet() == 0) - signal(); + super.response(message); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa3364e/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index d665242..ff6a8d4 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -21,7 +21,7 @@ import java.net.InetAddress; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -54,7 +54,9 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag final List<InetAddress> endpoints; private final IReadCommand command; private final ConsistencyLevel consistencyLevel; - private final AtomicInteger received = new AtomicInteger(0); + private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater + = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received"); + private volatile int received = 0; private final Keyspace keyspace; // TODO push this into ConsistencyLevel? /** @@ -96,10 +98,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag if (!await(command.getTimeout(), TimeUnit.MILLISECONDS)) { // Same as for writes, see AbstractWriteResponseHandler - int acks = received.get(); + int acks = received; if (resolver.isDataPresent() && acks >= blockfor) acks = blockfor - 1; - ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received.get(), blockfor, resolver.isDataPresent()); + ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent()); if (logger.isDebugEnabled()) logger.debug("Read timeout: {}", ex.toString()); throw ex; @@ -112,8 +114,8 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag { resolver.preprocess(message); int n = waitingFor(message) - ? received.incrementAndGet() - : received.get(); + ? recievedUpdater.incrementAndGet(this) + : received; if (n >= blockfor && resolver.isDataPresent()) { condition.signalAll(); @@ -136,7 +138,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag */ public int getReceivedCount() { - return received.get(); + return received; } public void response(TMessage result) @@ -155,7 +157,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag */ protected void maybeResolveForRepair() { - if (blockfor < endpoints.size() && received.get() == endpoints.size()) + if (blockfor < endpoints.size() && received == endpoints.size()) { assert resolver.isDataPresent(); StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa3364e/src/java/org/apache/cassandra/service/WriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java index 826ae01..df23b19 100644 --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@ -21,7 +21,7 @@ import java.net.InetAddress; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +38,9 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler { protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class); - protected final AtomicInteger responses; + protected volatile int responses; + private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater + = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses"); public WriteResponseHandler(Collection<InetAddress> writeEndpoints, Collection<InetAddress> pendingEndpoints, @@ -48,7 +50,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler WriteType writeType) { super(keyspace, writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType); - responses = new AtomicInteger(totalBlockFor()); + responses = totalBlockFor(); } public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback) @@ -63,13 +65,13 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler public void response(MessageIn m) { - if (responses.decrementAndGet() == 0) + if (responsesUpdater.decrementAndGet(this) == 0) signal(); } protected int ackCount() { - return totalBlockFor() - responses.get(); + return totalBlockFor() - responses; } public boolean isLatencyForSnitch()