Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 c33ebcd2f -> af0cd32be


Fix NullPointerException creating digest

patch by Stefania Alborghetti; review by Benjamin Lerer for CASSANDRA-9460


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b16d8c9c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b16d8c9c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b16d8c9c

Branch: refs/heads/cassandra-2.2
Commit: b16d8c9caa8afa0c03cd6e4077d272b0c575404b
Parents: 9d44186
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Wed Jul 15 11:55:26 2015 +0200
Committer: blerer <benjamin.le...@datastax.com>
Committed: Wed Jul 15 11:59:23 2015 +0200

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ReadResponse.java   | 26 ++++++++++++++++----
 .../service/RangeSliceResponseResolver.java     | 14 +++++------
 .../apache/cassandra/service/ReadCallback.java  |  1 +
 .../cassandra/service/RowDigestResolver.java    | 10 +++++++-
 4 files changed, 38 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b16d8c9c/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 39022a4..6a8aa8c 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db;
 
 import java.io.*;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -32,22 +33,27 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 public class ReadResponse
 {
     public static final IVersionedSerializer<ReadResponse> serializer = new 
ReadResponseSerializer();
+    private static final AtomicReferenceFieldUpdater<ReadResponse, ByteBuffer> 
digestUpdater = AtomicReferenceFieldUpdater.newUpdater(ReadResponse.class, 
ByteBuffer.class, "digest");
 
     private final Row row;
-    private final ByteBuffer digest;
+    private volatile ByteBuffer digest;
 
     public ReadResponse(ByteBuffer digest)
     {
+        this(null, digest);
         assert digest != null;
-        this.digest= digest;
-        this.row = null;
     }
 
     public ReadResponse(Row row)
     {
+        this(row, null);
         assert row != null;
+    }
+
+    public ReadResponse(Row row, ByteBuffer digest)
+    {
         this.row = row;
-        this.digest = null;
+        this.digest = digest;
     }
 
     public Row row()
@@ -60,9 +66,19 @@ public class ReadResponse
         return digest;
     }
 
+    public void setDigest(ByteBuffer digest)
+    {
+        ByteBuffer curr = this.digest;
+        if (!digestUpdater.compareAndSet(this, curr, digest))
+        {
+            assert digest.equals(this.digest) :
+                String.format("Digest mismatch : %s vs %s", digest.array(), 
this.digest.array());
+        }
+    }
+
     public boolean isDigestQuery()
     {
-        return digest != null;
+        return digest != null && row == null;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b16d8c9c/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
index 640681b..4242481 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
@@ -50,8 +50,8 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
     private final String keyspaceName;
     private final long timestamp;
     private List<InetAddress> sources;
-    protected final Collection<MessageIn<RangeSliceReply>> responses = new 
ConcurrentLinkedQueue<MessageIn<RangeSliceReply>>();
-    public final List<AsyncOneResponse> repairResults = new 
ArrayList<AsyncOneResponse>();
+    protected final Queue<MessageIn<RangeSliceReply>> responses = new 
ConcurrentLinkedQueue<>();
+    public final List<AsyncOneResponse> repairResults = new ArrayList<>();
 
     public RangeSliceResponseResolver(String keyspaceName, long timestamp)
     {
@@ -66,15 +66,15 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
 
     public List<Row> getData()
     {
-        MessageIn<RangeSliceReply> response = responses.iterator().next();
-        return response.payload.rows;
+        assert !responses.isEmpty();
+        return responses.peek().payload.rows;
     }
 
     // Note: this would deserialize the response a 2nd time if getData was called first.
     // (this is not currently an issue since we don't do read repair for range queries.)
     public Iterable<Row> resolve()
     {
-        ArrayList<RowIterator> iters = new 
ArrayList<RowIterator>(responses.size());
+        ArrayList<RowIterator> iters = new ArrayList<>(responses.size());
         int n = 0;
         for (MessageIn<RangeSliceReply> response : responses)
         {
@@ -86,7 +86,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
         // TODO do we need to call close?
         CloseableIterator<Row> iter = MergeIterator.get(iters, pairComparator, 
new Reducer());
 
-        List<Row> resolvedRows = new ArrayList<Row>(n);
+        List<Row> resolvedRows = new ArrayList<>(n);
         while (iter.hasNext())
             resolvedRows.add(iter.next());
 
@@ -129,7 +129,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
 
     private class Reducer extends MergeIterator.Reducer<Pair<Row,InetAddress>, 
Row>
     {
-        List<ColumnFamily> versions = new 
ArrayList<ColumnFamily>(sources.size());
+        List<ColumnFamily> versions = new ArrayList<>(sources.size());
         List<InetAddress> versionSources = new 
ArrayList<InetAddress>(sources.size());
         DecoratedKey key;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b16d8c9c/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index cf9be55..e0646a9 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -121,6 +121,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
         if (n >= blockfor && resolver.isDataPresent())
         {
             condition.signalAll();
+
             // kick off a background digest comparison if this is a result that (may have) arrived after
             // the original resolve that get() kicks off as soon as the condition is signaled
             if (blockfor < endpoints.size() && n == endpoints.size())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b16d8c9c/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java
index 21b16bf..7f2e17d 100644
--- a/src/java/org/apache/cassandra/service/RowDigestResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDigestResolver.java
@@ -41,7 +41,12 @@ public class RowDigestResolver extends AbstractRowResolver
         {
             ReadResponse result = message.payload;
             if (!result.isDigestQuery())
+            {
+                if (result.digest() == null)
+                    result.setDigest(ColumnFamily.digest(result.row().cf));
+
                 return result.row();
+            }
         }
         return null;
     }
@@ -81,7 +86,10 @@ public class RowDigestResolver extends AbstractRowResolver
             {
                 // note that this allows for multiple data replies, post-CASSANDRA-5932
                 data = response.row().cf;
-                newDigest = ColumnFamily.digest(data);
+                if (response.digest() == null)
+                    message.payload.setDigest(ColumnFamily.digest(data));
+
+                newDigest = response.digest();
             }
 
             if (digest == null)

Reply via email to