Repository: cassandra Updated Branches: refs/heads/trunk 5250d7ffd -> 6c14c3af7
Fix NullPointerException creating digest patch by Stefania Alborghetti; review by Benjamin Lerer for CASSANDRA-9460 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b16d8c9c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b16d8c9c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b16d8c9c Branch: refs/heads/trunk Commit: b16d8c9caa8afa0c03cd6e4077d272b0c575404b Parents: 9d44186 Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Wed Jul 15 11:55:26 2015 +0200 Committer: blerer <benjamin.le...@datastax.com> Committed: Wed Jul 15 11:59:23 2015 +0200 ---------------------------------------------------------------------- .../org/apache/cassandra/db/ReadResponse.java | 26 ++++++++++++++++---- .../service/RangeSliceResponseResolver.java | 14 +++++------ .../apache/cassandra/service/ReadCallback.java | 1 + .../cassandra/service/RowDigestResolver.java | 10 +++++++- 4 files changed, 38 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b16d8c9c/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 39022a4..6a8aa8c 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db; import java.io.*; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; @@ -32,22 +33,27 @@ import org.apache.cassandra.utils.ByteBufferUtil; public class ReadResponse { public static final IVersionedSerializer<ReadResponse> serializer = new ReadResponseSerializer(); + private static final AtomicReferenceFieldUpdater<ReadResponse, ByteBuffer> digestUpdater = AtomicReferenceFieldUpdater.newUpdater(ReadResponse.class, ByteBuffer.class, "digest"); private final Row row; - private final ByteBuffer digest; + private volatile ByteBuffer digest; public ReadResponse(ByteBuffer digest) { + this(null, digest); assert digest != null; - this.digest= digest; - this.row = null; } public ReadResponse(Row row) { + this(row, null); assert row != null; + } + + public ReadResponse(Row row, ByteBuffer digest) + { this.row = row; - this.digest = null; + this.digest = digest; } public Row row() @@ -60,9 +66,19 @@ public class ReadResponse return digest; } + public void setDigest(ByteBuffer digest) + { + ByteBuffer curr = this.digest; + if (!digestUpdater.compareAndSet(this, curr, digest)) + { + assert digest.equals(this.digest) : + String.format("Digest mismatch : %s vs %s", digest.array(), this.digest.array()); + } + } + public boolean isDigestQuery() { - return digest != null; + return digest != null && row == null; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b16d8c9c/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java index 640681b..4242481 100644 --- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java +++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java @@ -50,8 +50,8 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR private final String keyspaceName; private final long timestamp; private List<InetAddress> sources; - protected final Collection<MessageIn<RangeSliceReply>> responses = new ConcurrentLinkedQueue<MessageIn<RangeSliceReply>>(); - public final List<AsyncOneResponse> repairResults = new ArrayList<AsyncOneResponse>(); + protected final Queue<MessageIn<RangeSliceReply>> responses = new ConcurrentLinkedQueue<>(); + public final List<AsyncOneResponse> repairResults = new ArrayList<>(); public RangeSliceResponseResolver(String keyspaceName, long timestamp) { @@ -66,15 +66,15 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR public List<Row> getData() { - MessageIn<RangeSliceReply> response = responses.iterator().next(); - return response.payload.rows; + assert !responses.isEmpty(); + return responses.peek().payload.rows; } // Note: this would deserialize the response a 2nd time if getData was called first. // (this is not currently an issue since we don't do read repair for range queries.) public Iterable<Row> resolve() { - ArrayList<RowIterator> iters = new ArrayList<RowIterator>(responses.size()); + ArrayList<RowIterator> iters = new ArrayList<>(responses.size()); int n = 0; for (MessageIn<RangeSliceReply> response : responses) { @@ -86,7 +86,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR // TODO do we need to call close? CloseableIterator<Row> iter = MergeIterator.get(iters, pairComparator, new Reducer()); - List<Row> resolvedRows = new ArrayList<Row>(n); + List<Row> resolvedRows = new ArrayList<>(n); while (iter.hasNext()) resolvedRows.add(iter.next()); @@ -129,7 +129,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR private class Reducer extends MergeIterator.Reducer<Pair<Row,InetAddress>, Row> { - List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size()); + List<ColumnFamily> versions = new ArrayList<>(sources.size()); List<InetAddress> versionSources = new ArrayList<InetAddress>(sources.size()); DecoratedKey key; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b16d8c9c/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index cf9be55..e0646a9 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -121,6 +121,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag if (n >= blockfor && resolver.isDataPresent()) { condition.signalAll(); + // kick off a background digest comparison if this is a result that (may have) arrived after // the original resolve that get() kicks off as soon as the condition is signaled if (blockfor < endpoints.size() && n == endpoints.size()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b16d8c9c/src/java/org/apache/cassandra/service/RowDigestResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java index 21b16bf..7f2e17d 100644 --- a/src/java/org/apache/cassandra/service/RowDigestResolver.java +++ b/src/java/org/apache/cassandra/service/RowDigestResolver.java @@ -41,7 +41,12 @@ public class RowDigestResolver extends AbstractRowResolver { ReadResponse result = message.payload; if (!result.isDigestQuery()) + { + if (result.digest() == null) + result.setDigest(ColumnFamily.digest(result.row().cf)); + return result.row(); + } } return null; } @@ -81,7 +86,10 @@ public class RowDigestResolver extends AbstractRowResolver { // note that this allows for multiple data replies, post-CASSANDRA-5932 data = response.row().cf; - newDigest = ColumnFamily.digest(data); + if (response.digest() == null) + message.payload.setDigest(ColumnFamily.digest(data)); + + newDigest = response.digest(); } if (digest == null)