This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d3b3dcb  Separate exceptions for CAS write timeout exceptions caused 
by contention and unknown result
d3b3dcb is described below

commit d3b3dcbb353de97220a11f55391babf149410905
Author: yifan-c <yc25c...@gmail.com>
AuthorDate: Sun Oct 27 21:01:31 2019 -0700

    Separate exceptions for CAS write timeout exceptions caused by contention 
and unknown result
    
    Patch by Yifan Cai; reviewed by Alex Petrov and Dinesh Joshi for 
CASSANDRA-15350
---
 doc/native_protocol_v5.spec                        |  40 ++-
 ...xception.java => CasWriteTimeoutException.java} |  10 +-
 ...on.java => CasWriteUnknownResultException.java} |   7 +-
 .../apache/cassandra/exceptions/ExceptionCode.java |   3 +-
 .../exceptions/RequestTimeoutException.java        |   8 +
 .../exceptions/WriteTimeoutException.java          |   6 +
 .../cassandra/metrics/CASClientRequestMetrics.java |   7 +-
 .../org/apache/cassandra/service/StorageProxy.java |  32 ++-
 .../cassandra/transport/messages/ErrorMessage.java | 110 ++++++--
 .../cassandra/distributed/test/CasWriteTest.java   | 276 +++++++++++++++++++++
 .../distributed/test/DistributedTestBase.java      |   4 +-
 .../cassandra/transport/ErrorMessageTest.java      |  80 ++++++
 12 files changed, 525 insertions(+), 58 deletions(-)

diff --git a/doc/native_protocol_v5.spec b/doc/native_protocol_v5.spec
index d1b3915..d279453 100644
--- a/doc/native_protocol_v5.spec
+++ b/doc/native_protocol_v5.spec
@@ -1120,7 +1120,7 @@ Table of Contents
     0x1003    Truncate_error: error during a truncation error.
     0x1100    Write_timeout: Timeout exception during a write request. The rest
               of the ERROR message body will be
-                <cl><received><blockfor><writeType>
+                <cl><received><blockfor><writeType><contentions>
               where:
                 <cl> is the [consistency] level of the query having triggered
                      the exception.
@@ -1144,12 +1144,14 @@ Table of Contents
                              - "BATCH_LOG": the timeout occurred during the
                                write to the batch log when a (logged) batch
                                write was requested.
-                            - "CAS": the timeout occured during the Compare 
And Set write/update.
-                            - "VIEW": the timeout occured when a write involves
-                              VIEW update and failure to acqiure local view(MV)
-                              lock for key within timeout
-                            - "CDC": the timeout occured when 
cdc_total_space_in_mb is
-                              exceeded when doing a write to data tracked by 
cdc.
+                             - "CAS": the timeout occurred during the Compare 
And Set write/update.
+                             - "VIEW": the timeout occurred when a write 
involves
+                               a VIEW update and the local view (MV) lock for 
the key could not be acquired within the timeout.
+                             - "CDC": the timeout occurred when 
cdc_total_space_in_mb is
+                               exceeded when doing a write to data tracked by 
cdc.
+                <contentions> is a [short] giving the number of 
contentions that occurred during the CAS operation.
+                              The field is only present when the <writeType> is 
"CAS".
     0x1200    Read_timeout: Timeout exception during a read request. The rest
               of the ERROR message body will be
                 <cl><received><blockfor><data_present>
@@ -1225,12 +1227,24 @@ Table of Contents
                              - "BATCH_LOG": the failure occured during the
                                write to the batch log when a (logged) batch
                                write was requested.
-                            - "CAS": the failure occured during the Compare 
And Set write/update.
-                            - "VIEW": the failure occured when a write involves
-                              VIEW update and failure to acqiure local view(MV)
-                              lock for key within timeout
-                            - "CDC": the failure occured when 
cdc_total_space_in_mb is
-                              exceeded when doing a write to data tracked by 
cdc.
+                             - "CAS": the failure occurred during the Compare 
And Set write/update.
+                             - "VIEW": the failure occurred when a write 
involves
+                               a VIEW update and the local view (MV) lock for 
the key could not be acquired within the timeout.
+                             - "CDC": the failure occurred when 
cdc_total_space_in_mb is
+                               exceeded when doing a write to data tracked by 
cdc.
+    0x1600    CDC_WRITE_FAILURE: // todo
+    0x1700    CAS_WRITE_UNKNOWN: An exception occurred due to a contended Compare 
And Set write/update.
+              The CAS operation was only partially completed, and the operation 
may or may not be completed by
+              the contending CAS write or SERIAL/LOCAL_SERIAL read. The rest 
of the ERROR message body will be
+                <cl><received><blockfor>
+              where:
+                <cl> is the [consistency] level of the query having triggered
+                     the exception.
+                <received> is an [int] representing the number of nodes having
+                           acknowledged the request.
+                <blockfor> is an [int] representing the number of replicas 
whose
+                           acknowledgement is required to achieve <cl>.
 
     0x2000    Syntax_error: The submitted query has a syntax error.
     0x2100    Unauthorized: The logged user doesn't have the right to perform
diff --git 
a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java 
b/src/java/org/apache/cassandra/exceptions/CasWriteTimeoutException.java
similarity index 69%
copy from src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
copy to src/java/org/apache/cassandra/exceptions/CasWriteTimeoutException.java
index af8d42b..b134764 100644
--- a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
+++ b/src/java/org/apache/cassandra/exceptions/CasWriteTimeoutException.java
@@ -20,13 +20,13 @@ package org.apache.cassandra.exceptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
 
-public class WriteTimeoutException extends RequestTimeoutException
+public class CasWriteTimeoutException extends WriteTimeoutException
 {
-    public final WriteType writeType;
+    public final int contentions;
 
-    public WriteTimeoutException(WriteType writeType, ConsistencyLevel 
consistency, int received, int blockFor)
+    public CasWriteTimeoutException(WriteType writeType, ConsistencyLevel 
consistency, int received, int blockFor, int contentions)
     {
-        super(ExceptionCode.WRITE_TIMEOUT, consistency, received, blockFor);
-        this.writeType = writeType;
+        super(writeType, consistency, received, blockFor, String.format("CAS 
operation timed out - encountered contentions: %d", contentions));
+        this.contentions = contentions;
     }
 }
diff --git 
a/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java 
b/src/java/org/apache/cassandra/exceptions/CasWriteUnknownResultException.java
similarity index 77%
copy from src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java
copy to 
src/java/org/apache/cassandra/exceptions/CasWriteUnknownResultException.java
index 156dce7..d5dda84 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java
+++ 
b/src/java/org/apache/cassandra/exceptions/CasWriteUnknownResultException.java
@@ -15,19 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.cassandra.exceptions;
 
 import org.apache.cassandra.db.ConsistencyLevel;
 
-public class RequestTimeoutException extends RequestExecutionException
+public class CasWriteUnknownResultException extends RequestExecutionException
 {
     public final ConsistencyLevel consistency;
     public final int received;
     public final int blockFor;
 
-    protected RequestTimeoutException(ExceptionCode code, ConsistencyLevel 
consistency, int received, int blockFor)
+    public CasWriteUnknownResultException(ConsistencyLevel consistency, int 
received, int blockFor)
     {
-        super(code, String.format("Operation timed out - received only %d 
responses.", received));
+        super(ExceptionCode.CAS_WRITE_UNKNOWN, String.format("CAS operation 
result is unknown - proposal accepted by %d but not a quorum.", received));
         this.consistency = consistency;
         this.received = received;
         this.blockFor = blockFor;
diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java 
b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
index 9324110..1766951 100644
--- a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
+++ b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
@@ -43,6 +43,7 @@ public enum ExceptionCode
     FUNCTION_FAILURE    (0x1400),
     WRITE_FAILURE       (0x1500),
     CDC_WRITE_FAILURE   (0x1600),
+    CAS_WRITE_UNKNOWN   (0x1700),
 
     // 2xx: problem validating the request
     SYNTAX_ERROR    (0x2000),
@@ -60,7 +61,7 @@ public enum ExceptionCode
             valueToCode.put(code.value, code);
     }
 
-    private ExceptionCode(int value)
+    ExceptionCode(int value)
     {
         this.value = value;
     }
diff --git 
a/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java 
b/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java
index 156dce7..853ba2f 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java
@@ -32,4 +32,12 @@ public class RequestTimeoutException extends 
RequestExecutionException
         this.received = received;
         this.blockFor = blockFor;
     }
+
+    protected RequestTimeoutException(ExceptionCode code, ConsistencyLevel 
consistency, int received, int blockFor, String msg)
+    {
+        super(code, msg);
+        this.consistency = consistency;
+        this.received = received;
+        this.blockFor = blockFor;
+    }
 }
diff --git 
a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java 
b/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
index af8d42b..4b4ce38 100644
--- a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
+++ b/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
@@ -29,4 +29,10 @@ public class WriteTimeoutException extends 
RequestTimeoutException
         super(ExceptionCode.WRITE_TIMEOUT, consistency, received, blockFor);
         this.writeType = writeType;
     }
+
+    public WriteTimeoutException(WriteType writeType, ConsistencyLevel 
consistency, int received, int blockFor, String msg)
+    {
+        super(ExceptionCode.WRITE_TIMEOUT, consistency, received, blockFor, 
msg);
+        this.writeType = writeType;
+    }
 }
diff --git a/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java 
b/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java
index 9884ff1..c6d3921 100644
--- a/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java
@@ -20,20 +20,22 @@ package org.apache.cassandra.metrics;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
 public class CASClientRequestMetrics extends ClientRequestMetrics
 {
     public final Histogram contention;
-
     public final Counter unfinishedCommit;
+    public final Meter unknownResult;
 
     public CASClientRequestMetrics(String scope) 
     {
         super(scope);
         contention = 
Metrics.histogram(factory.createMetricName("ContentionHistogram"), false);
-        unfinishedCommit =  
Metrics.counter(factory.createMetricName("UnfinishedCommit"));
+        unfinishedCommit = 
Metrics.counter(factory.createMetricName("UnfinishedCommit"));
+        unknownResult = 
Metrics.meter(factory.createMetricName("UnknownResult"));
     }
 
     public void release()
@@ -41,5 +43,6 @@ public class CASClientRequestMetrics extends 
ClientRequestMetrics
         super.release();
         Metrics.remove(factory.createMetricName("ContentionHistogram"));
         Metrics.remove(factory.createMetricName("UnfinishedCommit"));
+        Metrics.remove(factory.createMetricName("UnknownResult"));
     }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 11c72ec..9fc6b52 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -214,7 +214,7 @@ public class StorageProxy implements StorageProxyMBean
                                   ClientState state,
                                   int nowInSeconds,
                                   long queryStartNanoTime)
-    throws UnavailableException, IsBootstrappingException, 
RequestFailureException, RequestTimeoutException, InvalidRequestException
+    throws UnavailableException, IsBootstrappingException, 
RequestFailureException, RequestTimeoutException, InvalidRequestException, 
CasWriteUnknownResultException
     {
         final long startTimeForMetrics = System.nanoTime();
         TableMetadata metadata = 
Schema.instance.getTableMetadata(keyspaceName, cfName);
@@ -287,7 +287,18 @@ public class StorageProxy implements StorageProxyMBean
 
             throw new WriteTimeoutException(WriteType.CAS, 
consistencyForPaxos, 0, 
consistencyForPaxos.blockFor(Keyspace.open(keyspaceName)));
         }
-        catch (WriteTimeoutException|ReadTimeoutException e)
+        catch (CasWriteUnknownResultException e)
+        {
+            casWriteMetrics.unknownResult.mark();
+            throw e;
+        }
+        catch (WriteTimeoutException wte)
+        {
+            casWriteMetrics.timeouts.mark();
+            writeMetricsMap.get(consistencyForPaxos).timeouts.mark();
+            throw new CasWriteTimeoutException(wte.writeType, wte.consistency, 
wte.received, wte.blockFor, contentions);
+        }
+        catch (ReadTimeoutException e)
         {
             casWriteMetrics.timeouts.mark();
             writeMetricsMap.get(consistencyForPaxos).timeouts.mark();
@@ -299,7 +310,7 @@ public class StorageProxy implements StorageProxyMBean
             writeMetricsMap.get(consistencyForPaxos).failures.mark();
             throw e;
         }
-        catch(UnavailableException e)
+        catch (UnavailableException e)
         {
             casWriteMetrics.unavailables.mark();
             writeMetricsMap.get(consistencyForPaxos).unavailables.mark();
@@ -465,10 +476,15 @@ public class StorageProxy implements StorageProxyMBean
         return callback;
     }
 
-    private static boolean proposePaxos(Commit proposal, 
ReplicaPlan.ForPaxosWrite replicaPlan, boolean timeoutIfPartial, long 
queryStartNanoTime)
-    throws WriteTimeoutException
+    /**
+     * Propose the {@code proposal} according to the {@code replicaPlan}.
+     * When {@code backoffIfPartial} is true, the proposer backs off (throws {@link CasWriteUnknownResultException}) when 
seeing the proposal accepted by some replicas but not by a quorum.
+     * The result of the corresponding CAS is uncertain, as the accepted 
proposal may or may not be spread to other nodes in later rounds.
+     */
+    private static boolean proposePaxos(Commit proposal, 
ReplicaPlan.ForPaxosWrite replicaPlan, boolean backoffIfPartial, long 
queryStartNanoTime)
+    throws WriteTimeoutException, CasWriteUnknownResultException
     {
-        ProposeCallback callback = new 
ProposeCallback(replicaPlan.contacts().size(), 
replicaPlan.requiredParticipants(), !timeoutIfPartial, 
replicaPlan.consistencyLevel(), queryStartNanoTime);
+        ProposeCallback callback = new 
ProposeCallback(replicaPlan.contacts().size(), 
replicaPlan.requiredParticipants(), !backoffIfPartial, 
replicaPlan.consistencyLevel(), queryStartNanoTime);
         Message<Commit> message = Message.out(PAXOS_PROPOSE_REQ, proposal);
         for (Replica replica : replicaPlan.contacts())
         {
@@ -496,8 +512,8 @@ public class StorageProxy implements StorageProxyMBean
         if (callback.isSuccessful())
             return true;
 
-        if (timeoutIfPartial && !callback.isFullyRefused())
-            throw new WriteTimeoutException(WriteType.CAS, 
replicaPlan.consistencyLevel(), callback.getAcceptCount(), 
replicaPlan.requiredParticipants());
+        if (backoffIfPartial && !callback.isFullyRefused())
+            throw new 
CasWriteUnknownResultException(replicaPlan.consistencyLevel(), 
callback.getAcceptCount(), replicaPlan.requiredParticipants());
 
         return false;
     }
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 7b97be4..cd2af54 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -114,20 +114,30 @@ public class ErrorMessage extends Message.Response
                     break;
                 case WRITE_TIMEOUT:
                 case READ_TIMEOUT:
-                    ConsistencyLevel cl = CBUtil.readConsistencyLevel(body);
-                    int received = body.readInt();
-                    int blockFor = body.readInt();
-                    if (code == ExceptionCode.WRITE_TIMEOUT)
                     {
-                        WriteType writeType = Enum.valueOf(WriteType.class, 
CBUtil.readString(body));
-                        te = new WriteTimeoutException(writeType, cl, 
received, blockFor);
-                    }
-                    else
-                    {
-                        byte dataPresent = body.readByte();
-                        te = new ReadTimeoutException(cl, received, blockFor, 
dataPresent != 0);
+                        ConsistencyLevel cl = 
CBUtil.readConsistencyLevel(body);
+                        int received = body.readInt();
+                        int blockFor = body.readInt();
+                        if (code == ExceptionCode.WRITE_TIMEOUT)
+                        {
+                            WriteType writeType = 
Enum.valueOf(WriteType.class, CBUtil.readString(body));
+                            if (version.isGreaterOrEqualTo(ProtocolVersion.V5) 
&& writeType == WriteType.CAS)
+                            {
+                                int contentions = body.readShort();
+                                te = new CasWriteTimeoutException(writeType, 
cl, received, blockFor, contentions);
+                            }
+                            else
+                            {
+                                te = new WriteTimeoutException(writeType, cl, 
received, blockFor);
+                            }
+                        }
+                        else
+                        {
+                            byte dataPresent = body.readByte();
+                            te = new ReadTimeoutException(cl, received, 
blockFor, dataPresent != 0);
+                        }
+                        break;
                     }
-                    break;
                 case FUNCTION_FAILURE:
                     String fKeyspace = CBUtil.readString(body);
                     String fName = CBUtil.readString(body);
@@ -163,6 +173,14 @@ public class ErrorMessage extends Message.Response
                     else
                         te = new AlreadyExistsException(ksName, cfName);
                     break;
+                case CAS_WRITE_UNKNOWN:
+                    assert version.isGreaterOrEqualTo(ProtocolVersion.V5);
+
+                    ConsistencyLevel cl = CBUtil.readConsistencyLevel(body);
+                    int received = body.readInt();
+                    int blockFor = body.readInt();
+                    te = new CasWriteUnknownResultException(cl, received, 
blockFor);
+                    break;
             }
             return new ErrorMessage(te);
         }
@@ -185,8 +203,7 @@ public class ErrorMessage extends Message.Response
                 case WRITE_FAILURE:
                 case READ_FAILURE:
                     {
-                        RequestFailureException rfe = 
(RequestFailureException)err;
-                        boolean isWrite = err.code() == 
ExceptionCode.WRITE_FAILURE;
+                        RequestFailureException rfe = 
(RequestFailureException) err;
 
                         CBUtil.writeConsistencyLevel(rfe.consistency, dest);
                         dest.writeInt(rfe.received);
@@ -203,24 +220,30 @@ public class ErrorMessage extends Message.Response
                             }
                         }
 
-                        if (isWrite)
-                            
CBUtil.writeAsciiString(((WriteFailureException)rfe).writeType.toString(), 
dest);
+                        if (err.code() == ExceptionCode.WRITE_FAILURE)
+                            CBUtil.writeAsciiString(((WriteFailureException) 
rfe).writeType.toString(), dest);
                         else
-                            
dest.writeByte((byte)(((ReadFailureException)rfe).dataPresent ? 1 : 0));
+                            dest.writeByte((byte) (((ReadFailureException) 
rfe).dataPresent ? 1 : 0));
+                        break;
                     }
-                    break;
                 case WRITE_TIMEOUT:
                 case READ_TIMEOUT:
                     RequestTimeoutException rte = (RequestTimeoutException)err;
-                    boolean isWrite = err.code() == 
ExceptionCode.WRITE_TIMEOUT;
 
                     CBUtil.writeConsistencyLevel(rte.consistency, dest);
                     dest.writeInt(rte.received);
                     dest.writeInt(rte.blockFor);
-                    if (isWrite)
+                    if (err.code() == ExceptionCode.WRITE_TIMEOUT)
+                    {
                         
CBUtil.writeAsciiString(((WriteTimeoutException)rte).writeType.toString(), 
dest);
+                        // CasWriteTimeoutException already implies protocol 
V5, but double check to be safe.
+                        if (version.isGreaterOrEqualTo(ProtocolVersion.V5) && 
rte instanceof CasWriteTimeoutException)
+                            
dest.writeShort(((CasWriteTimeoutException)rte).contentions);
+                    }
                     else
+                    {
                         
dest.writeByte((byte)(((ReadTimeoutException)rte).dataPresent ? 1 : 0));
+                    }
                     break;
                 case FUNCTION_FAILURE:
                     FunctionExecutionException fee = 
(FunctionExecutionException)msg.error;
@@ -237,12 +260,18 @@ public class ErrorMessage extends Message.Response
                     CBUtil.writeAsciiString(aee.ksName, dest);
                     CBUtil.writeAsciiString(aee.cfName, dest);
                     break;
+                case CAS_WRITE_UNKNOWN:
+                    assert version.isGreaterOrEqualTo(ProtocolVersion.V5);
+                    CasWriteUnknownResultException cwue = 
(CasWriteUnknownResultException)err;
+                    CBUtil.writeConsistencyLevel(cwue.consistency, dest);
+                    dest.writeInt(cwue.received);
+                    dest.writeInt(cwue.blockFor);
             }
         }
 
         public int encodedSize(ErrorMessage msg, ProtocolVersion version)
         {
-            final TransportException err = 
getBackwardsCompatibleException(msg, version);
+            TransportException err = getBackwardsCompatibleException(msg, 
version);
             String errorString = err.getMessage() == null ? "" : 
err.getMessage();
             int size = 4 + CBUtil.sizeOfString(errorString);
             switch (err.code())
@@ -255,9 +284,12 @@ public class ErrorMessage extends Message.Response
                 case READ_FAILURE:
                     {
                         RequestFailureException rfe = 
(RequestFailureException)err;
-                        boolean isWrite = err.code() == 
ExceptionCode.WRITE_FAILURE;
+
                         size += CBUtil.sizeOfConsistencyLevel(rfe.consistency) 
+ 4 + 4 + 4;
-                        size += isWrite ? 
CBUtil.sizeOfAsciiString(((WriteFailureException)rfe).writeType.toString()) : 1;
+                        if (err.code() == ExceptionCode.WRITE_FAILURE)
+                            size += 
CBUtil.sizeOfAsciiString(((WriteFailureException)rfe).writeType.toString());
+                        else
+                            size += 1;
 
                         if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
                         {
@@ -274,7 +306,14 @@ public class ErrorMessage extends Message.Response
                     RequestTimeoutException rte = (RequestTimeoutException)err;
                     boolean isWrite = err.code() == 
ExceptionCode.WRITE_TIMEOUT;
                     size += CBUtil.sizeOfConsistencyLevel(rte.consistency) + 8;
-                    size += isWrite ? 
CBUtil.sizeOfAsciiString(((WriteTimeoutException)rte).writeType.toString()) : 1;
+                    if (isWrite)
+                        size += 
CBUtil.sizeOfAsciiString(((WriteTimeoutException)rte).writeType.toString());
+                    else
+                        size += 1;
+
+                    // CasWriteTimeoutException already implies protocol V5, 
but double check to be safe.
+                    if (isWrite && 
version.isGreaterOrEqualTo(ProtocolVersion.V5) && rte instanceof 
CasWriteTimeoutException)
+                        size += 2; // CasWriteTimeoutException appends a short 
for contentions occured.
                     break;
                 case FUNCTION_FAILURE:
                     FunctionExecutionException fee = 
(FunctionExecutionException)msg.error;
@@ -291,6 +330,11 @@ public class ErrorMessage extends Message.Response
                     size += CBUtil.sizeOfAsciiString(aee.ksName);
                     size += CBUtil.sizeOfAsciiString(aee.cfName);
                     break;
+                case CAS_WRITE_UNKNOWN:
+                    assert version.isGreaterOrEqualTo(ProtocolVersion.V5);
+                    CasWriteUnknownResultException cwue = 
(CasWriteUnknownResultException)err;
+                    size += CBUtil.sizeOfConsistencyLevel(cwue.consistency) + 
4 + 4; // receivedFor: 4, blockFor: 4
+                    break;
             }
             return size;
         }
@@ -309,12 +353,28 @@ public class ErrorMessage extends Message.Response
                     WriteFailureException wfe = (WriteFailureException) 
msg.error;
                     return new WriteTimeoutException(wfe.writeType, 
wfe.consistency, wfe.received, wfe.blockFor);
                 case FUNCTION_FAILURE:
-                    return new InvalidRequestException(msg.toString());
                 case CDC_WRITE_FAILURE:
                     return new InvalidRequestException(msg.toString());
             }
         }
 
+        if (version.isSmallerThan(ProtocolVersion.V5))
+        {
+            switch (msg.error.code())
+            {
+                case WRITE_TIMEOUT:
+                    if (msg.error instanceof CasWriteTimeoutException)
+                    {
+                        CasWriteTimeoutException cwte = 
(CasWriteTimeoutException) msg.error;
+                        return new WriteTimeoutException(WriteType.CAS, 
cwte.consistency, cwte.received, cwte.blockFor);
+                    }
+                    break;
+                case CAS_WRITE_UNKNOWN:
+                    CasWriteUnknownResultException cwue = 
(CasWriteUnknownResultException) msg.error;
+                    return new WriteTimeoutException(WriteType.CAS, 
cwue.consistency, cwue.received, cwue.blockFor);
+            }
+        }
+
         return msg.error;
     }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/CasWriteTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/CasWriteTest.java
new file mode 100644
index 0000000..7e7c629
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/CasWriteTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.impl.InstanceClassLoader;
+import org.apache.cassandra.exceptions.CasWriteTimeoutException;
+import org.apache.cassandra.exceptions.CasWriteUnknownResultException;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.utils.FBUtilities;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.fail;
+
+/**
+ * In-JVM distributed tests for CAS writes on a shared 3-node cluster: paxos messages are dropped in each
+ * phase to provoke {@link CasWriteTimeoutException}, and a competing CAS is injected between the prepare and
+ * propose phases to provoke {@link CasWriteUnknownResultException}.
+ */
+public class CasWriteTest extends DistributedTestBase
+{
+    // Sharing the same cluster to boost test speed. Using pkGen to make sure queries have distinct pk values for paxos instances.
+    private static Cluster cluster;
+    private static final AtomicInteger pkGen = new AtomicInteger(1_000); // preserve any pk values less than 1000 for manual queries.
+    private static final Logger logger = LoggerFactory.getLogger(CasWriteTest.class);
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @BeforeClass
+    public static void setupCluster() throws Throwable
+    {
+        cluster = init(Cluster.create(3));
+        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+    }
+
+    @AfterClass
+    public static void close()
+    {
+        cluster.close();
+        cluster = null;
+    }
+
+    // Reset message filters both before and after every test so a drop rule installed by one test cannot leak into another.
+    @Before @After
+    public void resetFilters()
+    {
+        cluster.filters().reset();
+    }
+
+    @Test
+    public void testCasWriteSuccessWithNoContention()
+    {
+        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS",
+                                       ConsistencyLevel.QUORUM);
+        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                  ConsistencyLevel.QUORUM),
+                   row(1, 1, 1));
+
+        cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 AND ck = 1 IF v = 1",
+                                       ConsistencyLevel.QUORUM);
+        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                  ConsistencyLevel.QUORUM),
+                   row(1, 1, 2));
+    }
+
+    @Test
+    public void testCasWriteTimeoutAtPreparePhase_ReqLost()
+    {
+        expectCasWriteTimeout();
+        cluster.verbs(Verb.PAXOS_PREPARE_REQ).from(1).to(2, 3).drop().on(); // drop the prepare requests from the coordinator to the other acceptors
+        cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM);
+    }
+
+    @Test
+    public void testCasWriteTimeoutAtPreparePhase_RspLost()
+    {
+        expectCasWriteTimeout();
+        cluster.verbs(Verb.PAXOS_PREPARE_RSP).from(2, 3).to(1).drop().on(); // drop the prepare responses from the other acceptors to the coordinator
+        cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM);
+    }
+
+    @Test
+    public void testCasWriteTimeoutAtProposePhase_ReqLost()
+    {
+        expectCasWriteTimeout();
+        cluster.verbs(Verb.PAXOS_PROPOSE_REQ).from(1).to(2, 3).drop().on();
+        cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM);
+    }
+
+    @Test
+    public void testCasWriteTimeoutAtProposePhase_RspLost()
+    {
+        expectCasWriteTimeout();
+        cluster.verbs(Verb.PAXOS_PROPOSE_RSP).from(2, 3).to(1).drop().on();
+        cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM);
+    }
+
+    @Test
+    public void testCasWriteTimeoutAtCommitPhase_ReqLost()
+    {
+        expectCasWriteTimeout();
+        cluster.verbs(Verb.PAXOS_COMMIT_REQ).from(1).to(2, 3).drop().on();
+        cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM);
+    }
+
+    @Test
+    public void testCasWriteTimeoutAtCommitPhase_RspLost()
+    {
+        expectCasWriteTimeout();
+        cluster.verbs(Verb.PAXOS_COMMIT_RSP).from(2, 3).to(1).drop().on();
+        cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM);
+    }
+
+    @Test
+    public void casWriteContentionTimeoutTest() throws InterruptedException
+    {
+        testWithContention(101,
+                           Arrays.asList(1, 3),
+                           c -> {
+                               c.filters().reset();
+                               c.verbs(Verb.PAXOS_PREPARE_REQ).from(1).to(3).drop();
+                               c.verbs(Verb.PAXOS_PROPOSE_REQ).from(1).to(2).drop();
+                           },
+                           failure ->
+                               failure.get() != null &&
+                               failure.get()
+                                      .getMessage()
+                                      .contains(CasWriteTimeoutException.class.getCanonicalName()),
+                           "Expecting cause to be CasWriteTimeoutException");
+    }
+
+    /**
+     * Runs rounds of concurrent CAS inserts into one partition, one insert per contending coordinator,
+     * until {@code expectedException} accepts the failure recorded during a round.
+     *
+     * @param testUid           clustering key used for the inserted rows
+     * @param contendingNodes   ids of the (exactly two) coordinators issuing the competing CAS writes
+     * @param setupForEachRound invoked with the cluster before every round, e.g. to (re)install message filters
+     * @param expectedException predicate over the last recorded failure; the rounds stop once it returns true
+     * @param assertHintMessage message of the final assertion
+     */
+    private void testWithContention(int testUid,
+                                    List<Integer> contendingNodes,
+                                    Consumer<Cluster> setupForEachRound,
+                                    Function<AtomicReference<Throwable>, Boolean> expectedException,
+                                    String assertHintMessage) throws InterruptedException
+    {
+        assert contendingNodes.size() == 2;
+        ExecutorService es = Executors.newFixedThreadPool(contendingNodes.size());
+        AtomicReference<Throwable> failure = new AtomicReference<>();
+        Supplier<Boolean> hasExpectedException = () -> expectedException.apply(failure);
+        try
+        {
+            while (!hasExpectedException.get())
+            {
+                failure.set(null);
+                setupForEachRound.accept(cluster);
+
+                // Take each round's pk from pkGen so these rows never land in a partition that other tests
+                // address with hand-picked pk values below 1000 (e.g. pk = 1 in testCasWriteSuccessWithNoContention).
+                int pk = pkGen.getAndIncrement();
+                List<Future<?>> futures = new ArrayList<>();
+                // One count per submitted task; a larger count never reaches zero, leaving every thread to sit
+                // out the whole await timeout instead of being released together.
+                CountDownLatch latch = new CountDownLatch(contendingNodes.size());
+                contendingNodes.forEach(nodeId -> {
+                    String query = mkCasInsertQuery((a) -> pk, testUid, nodeId);
+                    futures.add(es.submit(() -> {
+                        try
+                        {
+                            latch.countDown();
+                            latch.await(1, TimeUnit.SECONDS); // help threads start at approximately same time
+                            cluster.coordinator(nodeId).execute(query, ConsistencyLevel.QUORUM);
+                        }
+                        catch (Throwable t)
+                        {
+                            failure.set(t);
+                        }
+                    }));
+                });
+
+                FBUtilities.waitOnFutures(futures);
+            }
+        }
+        finally
+        {
+            // Release the worker threads even if a round threw.
+            es.shutdownNow();
+            es.awaitTermination(1, TimeUnit.MINUTES);
+        }
+        Assert.assertTrue(assertHintMessage, hasExpectedException.get());
+    }
+
+    /**
+     * Configures {@link #thrown} to expect a RuntimeException whose cause was loaded by an
+     * InstanceClassLoader and whose message names CasWriteTimeoutException and contains "CAS operation timed out".
+     */
+    private void expectCasWriteTimeout()
+    {
+        thrown.expect(RuntimeException.class);
+        thrown.expectCause(new BaseMatcher<Throwable>()
+        {
+            public boolean matches(Object item)
+            {
+                return InstanceClassLoader.wasLoadedByAnInstanceClassLoader(item.getClass());
+            }
+
+            public void describeTo(Description description)
+            {
+                description.appendText("Cause should be loaded by InstanceClassLoader");
+            }
+        });
+        // Unable to assert on the class because the exception thrown was loaded by a different classloader, InstanceClassLoader;
+        // therefore assert that the FQCN is present in the message as a workaround.
+        thrown.expectMessage(containsString(CasWriteTimeoutException.class.getCanonicalName()));
+        thrown.expectMessage(containsString("CAS operation timed out"));
+    }
+
+    /**
+     * Whenever node 1 sends a propose request to node 3, first runs a competing CAS from node 2 on the same pk,
+     * i.e. in between node 1's prepare and propose phases. Retries with a fresh pk until node 1's CAS throws,
+     * then asserts the failure names CasWriteUnknownResultException.
+     */
+    @Test
+    public void testWriteUnknownResult()
+    {
+        while (true)
+        {
+            cluster.filters().reset();
+            int pk = pkGen.getAndIncrement();
+            cluster.filters().verbs(Verb.PAXOS_PROPOSE_REQ.id).from(1).to(3).messagesMatching((from, to, msg) -> {
+                // Inject a single CAS request in-between prepare and propose phases
+                cluster.coordinator(2).execute(mkCasInsertQuery((a) -> pk, 1, 2),
+                                               ConsistencyLevel.QUORUM);
+                return false;
+            }).drop();
+
+            try
+            {
+                cluster.coordinator(1).execute(mkCasInsertQuery((a) -> pk, 1, 1), ConsistencyLevel.QUORUM);
+            }
+            catch (Throwable t)
+            {
+                Assert.assertTrue("Expecting cause to be CasWriteUnknownResultException",
+                                  t.getMessage().contains(CasWriteUnknownResultException.class.getCanonicalName()));
+                return;
+            }
+        }
+    }
+
+    // every invocation returns a query with a unique pk
+    private String mkUniqueCasInsertQuery(int v)
+    {
+        return mkCasInsertQuery(AtomicInteger::getAndIncrement, 1, v);
+    }
+
+    /**
+     * Builds an {@code INSERT ... IF NOT EXISTS} query; {@code pkFunc} receives {@link #pkGen} and returns the pk to use.
+     */
+    private String mkCasInsertQuery(Function<AtomicInteger, Integer> pkFunc, int ck, int v)
+    {
+        String query = String.format("INSERT INTO %s.tbl (pk, ck, v) VALUES (%d, %d, %d) IF NOT EXISTS", KEYSPACE, pkFunc.apply(pkGen), ck, v);
+        logger.info("Generated query: {}", query);
+        return query;
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index ece369a..64dee64 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@ -29,6 +29,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 
 import com.datastax.driver.core.ResultSet;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
 import org.apache.cassandra.distributed.impl.IsolatedExecutor;
 import org.apache.cassandra.distributed.impl.RowUtil;
@@ -67,6 +68,7 @@ public class DistributedTestBase
         System.setProperty("org.apache.cassandra.disable_mbean_registration", 
"true");
         nativeLibraryWorkaround();
         processReaperWorkaround();
+        DatabaseDescriptor.clientInitialization();
     }
 
     static String withKeyspace(String replaceIn)
@@ -87,7 +89,7 @@ public class DistributedTestBase
 
     public static void assertRows(Object[][] actual, Object[]... expected)
     {
-        Assert.assertEquals(rowsNotEqualErrorMessage(expected, actual),
+        Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected),
                             expected.length, actual.length);
 
         for (int i = 0; i < expected.length; i++)
diff --git a/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java 
b/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java
index 8497005..cfeddba 100644
--- a/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java
+++ b/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java
@@ -27,14 +27,19 @@ import org.junit.Test;
 
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
+import org.apache.cassandra.exceptions.CasWriteTimeoutException;
+import org.apache.cassandra.exceptions.CasWriteUnknownResultException;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.transport.messages.EncodeAndDecodeTestBase;
 import org.apache.cassandra.transport.messages.ErrorMessage;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class ErrorMessageTest extends EncodeAndDecodeTestBase<ErrorMessage>
 {
@@ -90,6 +95,81 @@ public class ErrorMessageTest extends 
EncodeAndDecodeTestBase<ErrorMessage>
         assertEquals(writeType, deserializedWfe.writeType);
     }
 
+    @Test
+    public void testV5CasWriteTimeoutSerDeser()
+    {
+        int contentions = 1;
+        int receivedBlockFor = 3;
+        ConsistencyLevel consistencyLevel = ConsistencyLevel.SERIAL;
+        CasWriteTimeoutException ex = new 
CasWriteTimeoutException(WriteType.CAS, consistencyLevel, receivedBlockFor, 
receivedBlockFor, contentions);
+
+        ErrorMessage deserialized = 
encodeThenDecode(ErrorMessage.fromException(ex), ProtocolVersion.V5);
+        assertTrue(deserialized.error instanceof CasWriteTimeoutException);
+        CasWriteTimeoutException deserializedEx = (CasWriteTimeoutException) 
deserialized.error;
+
+        assertEquals(WriteType.CAS, deserializedEx.writeType);
+        assertEquals(contentions, deserializedEx.contentions);
+        assertEquals(consistencyLevel, deserializedEx.consistency);
+        assertEquals(receivedBlockFor, deserializedEx.received);
+        assertEquals(receivedBlockFor, deserializedEx.blockFor);
+        assertEquals(ex.getMessage(), deserializedEx.getMessage());
+        assertTrue(deserializedEx.getMessage().contains("CAS operation timed 
out - encountered contentions"));
+    }
+
+    @Test
+    public void testV4CasWriteTimeoutSerDeser()
+    {
+        int contentions = 1;
+        int receivedBlockFor = 3;
+        ConsistencyLevel consistencyLevel = ConsistencyLevel.SERIAL;
+        CasWriteTimeoutException ex = new 
CasWriteTimeoutException(WriteType.CAS, consistencyLevel, receivedBlockFor, 
receivedBlockFor, contentions);
+
+        ErrorMessage deserialized = 
encodeThenDecode(ErrorMessage.fromException(ex), ProtocolVersion.V4);
+        assertTrue(deserialized.error instanceof WriteTimeoutException);
+        assertFalse(deserialized.error instanceof CasWriteTimeoutException);
+        WriteTimeoutException deserializedEx = (WriteTimeoutException) 
deserialized.error;
+
+        assertEquals(WriteType.CAS, deserializedEx.writeType);
+        assertEquals(consistencyLevel, deserializedEx.consistency);
+        assertEquals(receivedBlockFor, deserializedEx.received);
+        assertEquals(receivedBlockFor, deserializedEx.blockFor);
+    }
+
+    @Test
+    public void testV5CasWriteResultUnknownSerDeser()
+    {
+        int receivedBlockFor = 3;
+        ConsistencyLevel consistencyLevel = ConsistencyLevel.SERIAL;
+        CasWriteUnknownResultException ex = new 
CasWriteUnknownResultException(consistencyLevel, receivedBlockFor, 
receivedBlockFor);
+
+        ErrorMessage deserialized = 
encodeThenDecode(ErrorMessage.fromException(ex), ProtocolVersion.V5);
+        assertTrue(deserialized.error instanceof 
CasWriteUnknownResultException);
+        CasWriteUnknownResultException deserializedEx = 
(CasWriteUnknownResultException) deserialized.error;
+
+        assertEquals(consistencyLevel, deserializedEx.consistency);
+        assertEquals(receivedBlockFor, deserializedEx.received);
+        assertEquals(receivedBlockFor, deserializedEx.blockFor);
+        assertEquals(ex.getMessage(), deserializedEx.getMessage());
+        assertTrue(deserializedEx.getMessage().contains("CAS operation result 
is unknown"));
+    }
+
+    @Test
+    public void testV4CasWriteResultUnknownSerDeser()
+    {
+        int receivedBlockFor = 3;
+        ConsistencyLevel consistencyLevel = ConsistencyLevel.SERIAL;
+        CasWriteUnknownResultException ex = new 
CasWriteUnknownResultException(consistencyLevel, receivedBlockFor, 
receivedBlockFor);
+
+        ErrorMessage deserialized = 
encodeThenDecode(ErrorMessage.fromException(ex), ProtocolVersion.V4);
+        assertTrue(deserialized.error instanceof WriteTimeoutException);
+        assertFalse(deserialized.error instanceof 
CasWriteUnknownResultException);
+        WriteTimeoutException deserializedEx = (WriteTimeoutException) 
deserialized.error;
+
+        assertEquals(consistencyLevel, deserializedEx.consistency);
+        assertEquals(receivedBlockFor, deserializedEx.received);
+        assertEquals(receivedBlockFor, deserializedEx.blockFor);
+    }
+
     /**
      * Make sure that the map passed in to create a Read/WriteFailureException 
is copied
      * so later modifications to the map passed in don't affect the map in the 
exception.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to