Repository: cassandra Updated Branches: refs/heads/trunk 691d5308a -> b86705027
Introduce new append-only concurrent collection, Accumulator, and use for AbstractRowResolver.replies patch by benedict; reviewed by mishail for CASSANDRA-7873 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b8670502 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b8670502 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b8670502 Branch: refs/heads/trunk Commit: b867050270408ed6cc77c03d78d75cce9799e38e Parents: 691d530 Author: Benedict Elliott Smith <bened...@apache.org> Authored: Tue Sep 9 09:47:17 2014 +0700 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Tue Sep 9 09:47:17 2014 +0700 ---------------------------------------------------------------------- .../cassandra/service/AbstractReadExecutor.java | 2 +- .../cassandra/service/AbstractRowResolver.java | 11 +- .../apache/cassandra/service/ReadCallback.java | 2 +- .../cassandra/service/RowDataResolver.java | 4 +- .../cassandra/service/RowDigestResolver.java | 4 +- .../apache/cassandra/service/StorageProxy.java | 2 +- .../cassandra/utils/concurrent/Accumulator.java | 133 +++++++++++++++++++ 7 files changed, 145 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8670502/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index 2c3261f..d08c63e 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -66,7 +66,7 @@ public abstract class AbstractReadExecutor { this.command = command; this.targetReplicas = targetReplicas; - resolver = new RowDigestResolver(command.ksName, 
command.key); + resolver = new RowDigestResolver(command.ksName, command.key, targetReplicas.size()); handler = new ReadCallback<>(resolver, consistencyLevel, command, targetReplicas); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8670502/src/java/org/apache/cassandra/service/AbstractRowResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java index 6db2569..f362047 100644 --- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java +++ b/src/java/org/apache/cassandra/service/AbstractRowResolver.java @@ -18,9 +18,6 @@ package org.apache.cassandra.service; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,20 +26,22 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.db.Row; import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.utils.concurrent.Accumulator; public abstract class AbstractRowResolver implements IResponseResolver<ReadResponse, Row> { protected static final Logger logger = LoggerFactory.getLogger(AbstractRowResolver.class); protected final String keyspaceName; - // synchronizedList gives us thread-safety without the overhead of guaranteeing uniqueness like a Set would - protected final List<MessageIn<ReadResponse>> replies = Collections.synchronizedList(new ArrayList<MessageIn<ReadResponse>>()); + // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints + protected final Accumulator<MessageIn<ReadResponse>> replies; protected final DecoratedKey key; - public AbstractRowResolver(ByteBuffer key, String keyspaceName) + public AbstractRowResolver(ByteBuffer key, String keyspaceName, int maxResponseCount) { this.key = 
StorageService.getPartitioner().decorateKey(key); this.keyspaceName = keyspaceName; + this.replies = new Accumulator<>(maxResponseCount); } public void preprocess(MessageIn<ReadResponse> message) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8670502/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 29eaadf..51e1818 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -185,7 +185,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag ReadRepairMetrics.repairedBackground.mark(); ReadCommand readCommand = (ReadCommand) command; - final RowDataResolver repairResolver = new RowDataResolver(readCommand.ksName, readCommand.key, readCommand.filter(), readCommand.timestamp); + final RowDataResolver repairResolver = new RowDataResolver(readCommand.ksName, readCommand.key, readCommand.filter(), readCommand.timestamp, endpoints.size()); AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size()); MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8670502/src/java/org/apache/cassandra/service/RowDataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java index 9b3684b..6c222d5 100644 --- a/src/java/org/apache/cassandra/service/RowDataResolver.java +++ b/src/java/org/apache/cassandra/service/RowDataResolver.java @@ -41,9 +41,9 @@ public class RowDataResolver extends AbstractRowResolver private final IDiskAtomFilter filter; private final long timestamp; - public RowDataResolver(String 
keyspaceName, ByteBuffer key, IDiskAtomFilter qFilter, long timestamp) + public RowDataResolver(String keyspaceName, ByteBuffer key, IDiskAtomFilter qFilter, long timestamp, int maxResponseCount) { - super(key, keyspaceName); + super(key, keyspaceName, maxResponseCount); this.filter = qFilter; this.timestamp = timestamp; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8670502/src/java/org/apache/cassandra/service/RowDigestResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java index 21b16bf..82ccc1a 100644 --- a/src/java/org/apache/cassandra/service/RowDigestResolver.java +++ b/src/java/org/apache/cassandra/service/RowDigestResolver.java @@ -27,9 +27,9 @@ import org.apache.cassandra.net.MessageIn; public class RowDigestResolver extends AbstractRowResolver { - public RowDigestResolver(String keyspaceName, ByteBuffer key) + public RowDigestResolver(String keyspaceName, ByteBuffer key, int maxResponseCount) { - super(key, keyspaceName); + super(key, keyspaceName, maxResponseCount); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8670502/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index ff6d89c..52910e8 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1261,7 +1261,7 @@ public class StorageProxy implements StorageProxyMBean ReadRepairMetrics.repairedBlocking.mark(); // Do a full data read to resolve the correct response (and repair node that need be) - RowDataResolver resolver = new RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), exec.command.timestamp); + 
RowDataResolver resolver = new RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), exec.command.timestamp, exec.handler.endpoints.size()); ReadCallback<ReadResponse, Row> repairHandler = new ReadCallback<>(resolver, ConsistencyLevel.ALL, exec.getContactedReplicas().size(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8670502/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java new file mode 100644 index 0000000..3b5e5c9 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java @@ -0,0 +1,133 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.utils.concurrent; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +/** + * A simple append-only collection supporting an unbounded number of concurrent readers/writers, + * but a bounded number of items. 
/**
 * A simple append-only collection supporting an unbounded number of concurrent readers/writers,
 * but a bounded number of items.
 *
 * Items live in a fixed-size array allocated at construction. A writer claims a slot by CAS-ing
 * {@code nextIndex}, stores its item, and then advances {@code presentCount} over every
 * consecutive slot it can see populated. Readers only ever look at the first {@code presentCount}
 * slots, and always read {@code presentCount} (a volatile) before touching the array.
 *
 * A {@code null} slot is interpreted by {@link #add} as "not yet written"; adding {@code null} is
 * not rejected, but under concurrent adds it can stop the visible size from advancing past it.
 *
 * @param <E> the type of item accumulated
 */
public class Accumulator<E> implements Iterable<E>
{
    // index of the next slot a writer may claim; only modified through nextIndexUpdater
    private volatile int nextIndex;
    // number of leading slots published to readers; only modified through presentCountUpdater
    private volatile int presentCount;
    private final Object[] values;
    private static final AtomicIntegerFieldUpdater<Accumulator> nextIndexUpdater = AtomicIntegerFieldUpdater.newUpdater(Accumulator.class, "nextIndex");
    private static final AtomicIntegerFieldUpdater<Accumulator> presentCountUpdater = AtomicIntegerFieldUpdater.newUpdater(Accumulator.class, "presentCount");

    /**
     * @param size the maximum number of items that may ever be added
     */
    public Accumulator(int size)
    {
        values = new Object[size];
    }

    /**
     * Adds an item to the collection.
     *
     * Note it is not guaranteed to be visible on exiting the method, if another add was happening concurrently;
     * it will be visible once all concurrent adds (which are non-blocking) complete, but it is not guaranteed
     * that any size change occurs during the execution of any specific call.
     *
     * @param item add to collection
     * @throws IllegalStateException if the collection already holds as many items as it was sized for
     */
    public void add(E item)
    {
        // claim a slot: spin until we win the CAS on nextIndex, or discover there is no room left
        int insertPos;
        while (true)
        {
            insertPos = nextIndex;
            if (insertPos >= values.length)
                throw new IllegalStateException("Accumulator is full; capacity " + values.length);
            if (nextIndexUpdater.compareAndSet(this, insertPos, insertPos + 1))
                break;
        }
        values[insertPos] = item;
        // we then try to increase presentCount for each consecutive value that is visible after the current size;
        // this should hopefully extend past us, but if it doesn't this behaviour means the lagging write will fix up
        // our state for us.
        //
        // we piggyback off presentCountUpdater to get volatile write semantics for our update to values
        boolean volatileWrite = false;
        while (true)
        {
            int cur = presentCount;
            if (cur != insertPos && (cur == values.length || values[cur] == null))
            {
                // ensure our item has been made visible before aborting
                if (!volatileWrite && cur < insertPos && !presentCountUpdater.compareAndSet(this, cur, cur))
                {
                    // if we fail to CAS it means an older write has completed, and may have not fixed us up
                    // due to our write not being visible
                    volatileWrite = true;
                    continue;
                }
                return;
            }
            // the slot at cur is populated (or is our own): publish it; a failed CAS just means
            // another writer published it first, and we re-read presentCount on the next pass
            presentCountUpdater.compareAndSet(this, cur, cur + 1);
            volatileWrite = true;
        }
    }

    /**
     * @return true if no item is yet guaranteed to be visible, i.e. {@link #size()} is zero
     */
    public boolean isEmpty()
    {
        return presentCount == 0;
    }

    /**
     * @return the size of guaranteed-to-be-visible portion of the list
     */
    public int size()
    {
        return presentCount;
    }

    /**
     * Returns an iterator over the visible items in insertion-slot order. The bound is re-read from
     * {@code presentCount} on every call, so items published after the iterator was created are
     * also returned. Removal is not supported.
     */
    @Override
    public Iterator<E> iterator()
    {
        return new Iterator<E>()
        {
            // index of the next slot to hand out
            private int p = 0;

            @Override
            public boolean hasNext()
            {
                return p < presentCount;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next()
            {
                // honour the Iterator contract rather than leaking null or an unpublished slot;
                // presentCount never decreases, so this cannot fail after hasNext() returned true
                if (p >= presentCount)
                    throw new java.util.NoSuchElementException();
                return (E) values[p++];
            }

            @Override
            public void remove()
            {
                throw new UnsupportedOperationException();
            }
        };
    }

    /**
     * @param i index of the item to fetch
     * @return the item stored at index {@code i}
     * @throws IndexOutOfBoundsException if {@code i} is not below the currently visible size
     */
    @SuppressWarnings("unchecked")
    public E get(int i)
    {
        // we read presentCount to guarantee a volatile read of values
        if (i >= presentCount)
            throw new IndexOutOfBoundsException();
        return (E) values[i];
    }
}