Updated Branches:
  refs/heads/trunk 8165af5db -> 7aa3364e0

Use Atomic*FieldUpdater to save memory.

Patch by marcuse, reviewed by belliottsmith for CASSANDRA-6281.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7aa3364e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7aa3364e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7aa3364e

Branch: refs/heads/trunk
Commit: 7aa3364e04b286ac7b41cfadda568df41e4e2821
Parents: 8165af5
Author: Marcus Eriksson <marc...@apache.org>
Authored: Thu Jan 2 21:17:14 2014 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Thu Jan 2 21:17:14 2014 +0100

----------------------------------------------------------------------
 .../cassandra/db/AtomicSortedColumns.java       | 56 ++++++++++----------
 .../service/DatacenterWriteResponseHandler.java |  3 +-
 .../apache/cassandra/service/ReadCallback.java  | 18 ++++---
 .../cassandra/service/WriteResponseHandler.java | 12 +++--
 4 files changed, 47 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa3364e/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index 6e4fd01..b1f1e59 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -18,7 +18,7 @@
 package org.apache.cassandra.db;
 
 import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
@@ -49,7 +49,9 @@ import org.apache.cassandra.utils.Allocator;
  */
 public class AtomicSortedColumns extends ColumnFamily
 {
-    private final AtomicReference<Holder> ref;
+    private volatile Holder ref;
+    private static final AtomicReferenceFieldUpdater<AtomicSortedColumns, Holder> refUpdater
+            = AtomicReferenceFieldUpdater.newUpdater(AtomicSortedColumns.class, Holder.class, "ref");
 
     public static final ColumnFamily.Factory<AtomicSortedColumns> factory = new Factory<AtomicSortedColumns>()
     {
@@ -67,12 +69,12 @@ public class AtomicSortedColumns extends ColumnFamily
     private AtomicSortedColumns(CFMetaData metadata, Holder holder)
     {
         super(metadata);
-        this.ref = new AtomicReference<>(holder);
+        this.ref = holder;
     }
 
     public CellNameType getComparator()
     {
-        return (CellNameType)ref.get().map.comparator();
+        return (CellNameType)ref.map.comparator();
     }
 
     public ColumnFamily.Factory getFactory()
@@ -82,12 +84,12 @@ public class AtomicSortedColumns extends ColumnFamily
 
     public ColumnFamily cloneMe()
     {
-        return new AtomicSortedColumns(metadata, ref.get().cloneMe());
+        return new AtomicSortedColumns(metadata, ref.cloneMe());
     }
 
     public DeletionInfo deletionInfo()
     {
-        return ref.get().deletionInfo;
+        return ref.deletionInfo;
     }
 
     public void delete(DeletionTime delTime)
@@ -108,29 +110,29 @@ public class AtomicSortedColumns extends ColumnFamily
         // Keeping deletion info for max markedForDeleteAt value
         while (true)
         {
-            Holder current = ref.get();
+            Holder current = ref;
             DeletionInfo newDelInfo = current.deletionInfo.copy().add(info);
-            if (ref.compareAndSet(current, current.with(newDelInfo)))
+            if (refUpdater.compareAndSet(this, current, current.with(newDelInfo)))
                 break;
         }
     }
 
     public void setDeletionInfo(DeletionInfo newInfo)
     {
-        ref.set(ref.get().with(newInfo));
+        ref = ref.with(newInfo);
     }
 
     public void purgeTombstones(int gcBefore)
     {
         while (true)
         {
-            Holder current = ref.get();
+            Holder current = ref;
             if (!current.deletionInfo.hasPurgeableTombstones(gcBefore))
                 break;
 
             DeletionInfo purgedInfo = current.deletionInfo.copy();
             purgedInfo.purge(gcBefore);
-            if (ref.compareAndSet(current, current.with(purgedInfo)))
+            if (refUpdater.compareAndSet(this, current, current.with(purgedInfo)))
                 break;
         }
     }
@@ -140,11 +142,11 @@ public class AtomicSortedColumns extends ColumnFamily
         Holder current, modified;
         do
         {
-            current = ref.get();
+            current = ref;
             modified = current.cloneMe();
             modified.addColumn(cell, allocator, SecondaryIndexManager.nullUpdater);
         }
-        while (!ref.compareAndSet(current, modified));
+        while (!refUpdater.compareAndSet(this, current, modified));
     }
 
     public void addAll(ColumnFamily cm, Allocator allocator, Function<Cell, Cell> transformation)
@@ -177,7 +179,7 @@ public class AtomicSortedColumns extends ColumnFamily
         do
         {
             sizeDelta = 0;
-            current = ref.get();
+            current = ref;
             DeletionInfo newDelInfo = current.deletionInfo.copy().add(cm.deletionInfo());
             modified = new Holder(current.map.clone(), newDelInfo);
 
@@ -194,11 +196,11 @@ public class AtomicSortedColumns extends ColumnFamily
             {
                 sizeDelta += modified.addColumn(transformation.apply(cell), allocator, indexer);
                 // bail early if we know we've been beaten
-                if (ref.get() != current)
+                if (ref != current)
                     continue main_loop;
             }
         }
-        while (!ref.compareAndSet(current, modified));
+        while (!refUpdater.compareAndSet(this, current, modified));
 
         indexer.updateRowLevelIndexes();
 
@@ -214,11 +216,11 @@ public class AtomicSortedColumns extends ColumnFamily
         boolean replaced;
         do
         {
-            current = ref.get();
+            current = ref;
             modified = current.cloneMe();
             replaced = modified.map.replace(oldCell.name(), oldCell, newCell);
         }
-        while (!ref.compareAndSet(current, modified));
+        while (!refUpdater.compareAndSet(this, current, modified));
         return replaced;
     }
 
@@ -227,45 +229,45 @@ public class AtomicSortedColumns extends ColumnFamily
         Holder current, modified;
         do
         {
-            current = ref.get();
+            current = ref;
             modified = current.clear();
         }
-        while (!ref.compareAndSet(current, modified));
+        while (!refUpdater.compareAndSet(this, current, modified));
     }
 
     public Cell getColumn(CellName name)
     {
-        return ref.get().map.get(name);
+        return ref.map.get(name);
     }
 
     public SortedSet<CellName> getColumnNames()
     {
-        return ref.get().map.keySet();
+        return ref.map.keySet();
     }
 
     public Collection<Cell> getSortedColumns()
     {
-        return ref.get().map.values();
+        return ref.map.values();
     }
 
     public Collection<Cell> getReverseSortedColumns()
     {
-        return ref.get().map.descendingMap().values();
+        return ref.map.descendingMap().values();
     }
 
     public int getColumnCount()
     {
-        return ref.get().map.size();
+        return ref.map.size();
     }
 
     public Iterator<Cell> iterator(ColumnSlice[] slices)
     {
-        return new ColumnSlice.NavigableMapIterator(ref.get().map, slices);
+        return new ColumnSlice.NavigableMapIterator(ref.map, slices);
     }
 
     public Iterator<Cell> reverseIterator(ColumnSlice[] slices)
     {
-        return new ColumnSlice.NavigableMapIterator(ref.get().map.descendingMap(), slices);
+        return new ColumnSlice.NavigableMapIterator(ref.map.descendingMap(), slices);
     }
 
     public boolean isInsertReversed()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa3364e/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index 5530374..96fc96d 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -50,8 +50,7 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
     {
         if (message == null || DatabaseDescriptor.getLocalDataCenter().equals(snitch.getDatacenter(message.from)))
         {
-            if (responses.decrementAndGet() == 0)
-                signal();
+            super.response(message);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa3364e/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index d665242..ff6a8d4 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -21,7 +21,7 @@ import java.net.InetAddress;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -54,7 +54,9 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
     final List<InetAddress> endpoints;
     private final IReadCommand command;
     private final ConsistencyLevel consistencyLevel;
-    private final AtomicInteger received = new AtomicInteger(0);
+    private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
+            = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
+    private volatile int received = 0;
     private final Keyspace keyspace; // TODO push this into ConsistencyLevel?
 
     /**
@@ -96,10 +98,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
         if (!await(command.getTimeout(), TimeUnit.MILLISECONDS))
         {
             // Same as for writes, see AbstractWriteResponseHandler
-            int acks = received.get();
+            int acks = received;
             if (resolver.isDataPresent() && acks >= blockfor)
                 acks = blockfor - 1;
-            ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received.get(), blockfor, resolver.isDataPresent());
+            ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
             if (logger.isDebugEnabled())
                 logger.debug("Read timeout: {}", ex.toString());
             throw ex;
@@ -112,8 +114,8 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
     {
         resolver.preprocess(message);
         int n = waitingFor(message)
-              ? received.incrementAndGet()
-              : received.get();
+              ? recievedUpdater.incrementAndGet(this)
+              : received;
         if (n >= blockfor && resolver.isDataPresent())
         {
             condition.signalAll();
@@ -136,7 +138,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
      */
     public int getReceivedCount()
     {
-        return received.get();
+        return received;
     }
 
     public void response(TMessage result)
@@ -155,7 +157,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
      */
     protected void maybeResolveForRepair()
     {
-        if (blockfor < endpoints.size() && received.get() == endpoints.size())
+        if (blockfor < endpoints.size() && received == endpoints.size())
         {
             assert resolver.isDataPresent();
             StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa3364e/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 826ae01..df23b19 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -21,7 +21,7 @@ import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +38,9 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
 {
     protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class);
 
-    protected final AtomicInteger responses;
+    protected volatile int responses;
+    private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater
+            = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses");
 
     public WriteResponseHandler(Collection<InetAddress> writeEndpoints,
                                 Collection<InetAddress> pendingEndpoints,
@@ -48,7 +50,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
                                 WriteType writeType)
     {
         super(keyspace, writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
-        responses = new AtomicInteger(totalBlockFor());
+        responses = totalBlockFor();
     }
 
     public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback)
@@ -63,13 +65,13 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
 
     public void response(MessageIn m)
     {
-        if (responses.decrementAndGet() == 0)
+        if (responsesUpdater.decrementAndGet(this) == 0)
             signal();
     }
 
     protected int ackCount()
     {
-        return totalBlockFor() - responses.get();
+        return totalBlockFor() - responses;
     }
 
     public boolean isLatencyForSnitch()

Reply via email to