Repository: cassandra
Updated Branches:
  refs/heads/trunk 691d5308a -> b86705027


Introduce new append-only concurrent collection, Accumulator, and use for 
AbstractRowResolver.replies

patch by benedict; reviewed by mishail for CASSANDRA-7873


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b8670502
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b8670502
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b8670502

Branch: refs/heads/trunk
Commit: b867050270408ed6cc77c03d78d75cce9799e38e
Parents: 691d530
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Tue Sep 9 09:47:17 2014 +0700
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Tue Sep 9 09:47:17 2014 +0700

----------------------------------------------------------------------
 .../cassandra/service/AbstractReadExecutor.java |   2 +-
 .../cassandra/service/AbstractRowResolver.java  |  11 +-
 .../apache/cassandra/service/ReadCallback.java  |   2 +-
 .../cassandra/service/RowDataResolver.java      |   4 +-
 .../cassandra/service/RowDigestResolver.java    |   4 +-
 .../apache/cassandra/service/StorageProxy.java  |   2 +-
 .../cassandra/utils/concurrent/Accumulator.java | 133 +++++++++++++++++++
 7 files changed, 145 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8670502/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java 
b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 2c3261f..d08c63e 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -66,7 +66,7 @@ public abstract class AbstractReadExecutor
     {
         this.command = command;
         this.targetReplicas = targetReplicas;
-        resolver = new RowDigestResolver(command.ksName, command.key);
+        resolver = new RowDigestResolver(command.ksName, command.key, 
targetReplicas.size());
         handler = new ReadCallback<>(resolver, consistencyLevel, command, 
targetReplicas);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8670502/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java 
b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
index 6db2569..f362047 100644
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
@@ -18,9 +18,6 @@
 package org.apache.cassandra.service;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,20 +26,22 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.utils.concurrent.Accumulator;
 
 public abstract class AbstractRowResolver implements 
IResponseResolver<ReadResponse, Row>
 {
     protected static final Logger logger = 
LoggerFactory.getLogger(AbstractRowResolver.class);
 
     protected final String keyspaceName;
-    // synchronizedList gives us thread-safety without the overhead of 
guaranteeing uniqueness like a Set would
-    protected final List<MessageIn<ReadResponse>> replies = 
Collections.synchronizedList(new ArrayList<MessageIn<ReadResponse>>());
+    // Accumulator gives us non-blocking thread-safety with optimal 
algorithmic constraints
+    protected final Accumulator<MessageIn<ReadResponse>> replies;
     protected final DecoratedKey key;
 
-    public AbstractRowResolver(ByteBuffer key, String keyspaceName)
+    public AbstractRowResolver(ByteBuffer key, String keyspaceName, int 
maxResponseCount)
     {
         this.key = StorageService.getPartitioner().decorateKey(key);
         this.keyspaceName = keyspaceName;
+        this.replies = new Accumulator<>(maxResponseCount);
     }
 
     public void preprocess(MessageIn<ReadResponse> message)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8670502/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java 
b/src/java/org/apache/cassandra/service/ReadCallback.java
index 29eaadf..51e1818 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -185,7 +185,7 @@ public class ReadCallback<TMessage, TResolved> implements 
IAsyncCallback<TMessag
                 ReadRepairMetrics.repairedBackground.mark();
                 
                 ReadCommand readCommand = (ReadCommand) command;
-                final RowDataResolver repairResolver = new 
RowDataResolver(readCommand.ksName, readCommand.key, readCommand.filter(), 
readCommand.timestamp);
+                final RowDataResolver repairResolver = new 
RowDataResolver(readCommand.ksName, readCommand.key, readCommand.filter(), 
readCommand.timestamp, endpoints.size());
                 AsyncRepairCallback repairHandler = new 
AsyncRepairCallback(repairResolver, endpoints.size());
 
                 MessageOut<ReadCommand> message = ((ReadCommand) 
command).createMessage();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8670502/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java 
b/src/java/org/apache/cassandra/service/RowDataResolver.java
index 9b3684b..6c222d5 100644
--- a/src/java/org/apache/cassandra/service/RowDataResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDataResolver.java
@@ -41,9 +41,9 @@ public class RowDataResolver extends AbstractRowResolver
     private final IDiskAtomFilter filter;
     private final long timestamp;
 
-    public RowDataResolver(String keyspaceName, ByteBuffer key, 
IDiskAtomFilter qFilter, long timestamp)
+    public RowDataResolver(String keyspaceName, ByteBuffer key, 
IDiskAtomFilter qFilter, long timestamp, int maxResponseCount)
     {
-        super(key, keyspaceName);
+        super(key, keyspaceName, maxResponseCount);
         this.filter = qFilter;
         this.timestamp = timestamp;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8670502/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java 
b/src/java/org/apache/cassandra/service/RowDigestResolver.java
index 21b16bf..82ccc1a 100644
--- a/src/java/org/apache/cassandra/service/RowDigestResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDigestResolver.java
@@ -27,9 +27,9 @@ import org.apache.cassandra.net.MessageIn;
 
 public class RowDigestResolver extends AbstractRowResolver
 {
-    public RowDigestResolver(String keyspaceName, ByteBuffer key)
+    public RowDigestResolver(String keyspaceName, ByteBuffer key, int 
maxResponseCount)
     {
-        super(key, keyspaceName);
+        super(key, keyspaceName, maxResponseCount);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8670502/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index ff6d89c..52910e8 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1261,7 +1261,7 @@ public class StorageProxy implements StorageProxyMBean
                     ReadRepairMetrics.repairedBlocking.mark();
 
                     // Do a full data read to resolve the correct response 
(and repair node that need be)
-                    RowDataResolver resolver = new 
RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), 
exec.command.timestamp);
+                    RowDataResolver resolver = new 
RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), 
exec.command.timestamp, exec.handler.endpoints.size());
                     ReadCallback<ReadResponse, Row> repairHandler = new 
ReadCallback<>(resolver,
                                                                                
        ConsistencyLevel.ALL,
                                                                                
        exec.getContactedReplicas().size(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8670502/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java 
b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
new file mode 100644
index 0000000..3b5e5c9
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
@@ -0,0 +1,133 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * A simple append-only collection supporting an unbounded number of concurrent readers/writers,
+ * but a bounded number of items.
+ *
+ * Items are stored in a fixed-size array allocated at construction. {@code nextIndex} hands out
+ * array slots to writers, while {@code presentCount} is the length of the leading portion of the
+ * array that {@link #size()}, {@link #get(int)} and {@link #iterator()} expose to readers.
+ *
+ * @param <E> the type of item held by the collection
+ */
+public class Accumulator<E> implements Iterable<E>
+{
+    // index of the next free slot in values; claimed by writers via compare-and-set
+    private volatile int nextIndex;
+    // number of leading slots of values that are exposed to readers
+    private volatile int presentCount;
+    private final Object[] values;
+    private static final AtomicIntegerFieldUpdater<Accumulator> nextIndexUpdater = AtomicIntegerFieldUpdater.newUpdater(Accumulator.class, "nextIndex");
+    private static final AtomicIntegerFieldUpdater<Accumulator> presentCountUpdater = AtomicIntegerFieldUpdater.newUpdater(Accumulator.class, "presentCount");
+
+    /**
+     * @param size the maximum number of items that may ever be added
+     */
+    public Accumulator(int size)
+    {
+        values = new Object[size];
+    }
+
+    /**
+     * Adds an item to the collection.
+     *
+     * Note it is not guaranteed to be visible on exiting the method, if another add was happening concurrently;
+     * it will be visible once all concurrent adds (which are non-blocking) complete, but it is not guaranteed
+     * that any size change occurs during the execution of any specific call.
+     *
+     * @param item add to collection
+     * @throws IllegalStateException if every slot of the collection has already been claimed
+     */
+    public void add(E item)
+    {
+        // claim a slot: retry the CAS until we own an index, or discover the collection is full
+        int insertPos;
+        while (true)
+        {
+            insertPos = nextIndex;
+            if (insertPos >= values.length)
+                throw new IllegalStateException();
+            if (nextIndexUpdater.compareAndSet(this, insertPos, insertPos + 1))
+                break;
+        }
+        values[insertPos] = item;
+        // we then try to increase presentCount for each consecutive value that is visible after the current size;
+        // this should hopefully extend past us, but if it doesn't this behaviour means the lagging write will fix up
+        // our state for us.
+        //
+        // we piggyback off presentCountUpdater to get volatile write semantics for our update to values
+        boolean volatileWrite = false;
+        while (true)
+        {
+            int cur = presentCount;
+            if (cur != insertPos && (cur == values.length || values[cur] == null))
+            {
+                // ensure our item has been made visible before aborting
+                if (!volatileWrite && cur < insertPos && !presentCountUpdater.compareAndSet(this, cur, cur))
+                {
+                    // if we fail to CAS it means an older write has completed, and may have not fixed us up
+                    // due to our write not being visible
+                    volatileWrite = true;
+                    continue;
+                }
+                return;
+            }
+            presentCountUpdater.compareAndSet(this, cur, cur + 1);
+            volatileWrite = true;
+        }
+    }
+
+    /**
+     * @return true if the guaranteed-to-be-visible portion of the list contains no items
+     */
+    public boolean isEmpty()
+    {
+        // fixed: the comparison was inverted (!= 0), reporting a populated collection as empty
+        return presentCount == 0;
+    }
+
+    /**
+     * @return the size of guaranteed-to-be-visible portion of the list
+     */
+    public int size()
+    {
+        return presentCount;
+    }
+
+    /**
+     * Iterates over the items in index order. {@code hasNext()} re-reads {@code presentCount} on every
+     * call, so items that become visible while iterating are also returned. Removal is not supported.
+     *
+     * @return an iterator over the visible portion of the collection
+     */
+    public Iterator<E> iterator()
+    {
+        return new Iterator<E>()
+        {
+            // index of the next item to hand out
+            int p = 0;
+
+            public boolean hasNext()
+            {
+                return p < presentCount;
+            }
+
+            public E next()
+            {
+                return (E) values[p++];
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    /**
+     * @param i index of the item to fetch
+     * @return the item stored at index i
+     * @throws IndexOutOfBoundsException if i is not below the current {@link #size()}
+     */
+    public E get(int i)
+    {
+        // we read presentCount to guarantee a volatile read of values
+        if (i >= presentCount)
+            throw new IndexOutOfBoundsException();
+        return (E) values[i];
+    }
+}

Reply via email to