This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new d3b3dcb Separate exceptions for CAS write timeout exceptions caused by contention and unkown result d3b3dcb is described below commit d3b3dcbb353de97220a11f55391babf149410905 Author: yifan-c <yc25c...@gmail.com> AuthorDate: Sun Oct 27 21:01:31 2019 -0700 Separate exceptions for CAS write timeout exceptions caused by contention and unkown result Patch by Yifan Cai; reviewed by Alex Petrov and Dinesh Joshi for CASSANDRA-15350 --- doc/native_protocol_v5.spec | 40 ++- ...xception.java => CasWriteTimeoutException.java} | 10 +- ...on.java => CasWriteUnknownResultException.java} | 7 +- .../apache/cassandra/exceptions/ExceptionCode.java | 3 +- .../exceptions/RequestTimeoutException.java | 8 + .../exceptions/WriteTimeoutException.java | 6 + .../cassandra/metrics/CASClientRequestMetrics.java | 7 +- .../org/apache/cassandra/service/StorageProxy.java | 32 ++- .../cassandra/transport/messages/ErrorMessage.java | 110 ++++++-- .../cassandra/distributed/test/CasWriteTest.java | 276 +++++++++++++++++++++ .../distributed/test/DistributedTestBase.java | 4 +- .../cassandra/transport/ErrorMessageTest.java | 80 ++++++ 12 files changed, 525 insertions(+), 58 deletions(-) diff --git a/doc/native_protocol_v5.spec b/doc/native_protocol_v5.spec index d1b3915..d279453 100644 --- a/doc/native_protocol_v5.spec +++ b/doc/native_protocol_v5.spec @@ -1120,7 +1120,7 @@ Table of Contents 0x1003 Truncate_error: error during a truncation error. 0x1100 Write_timeout: Timeout exception during a write request. The rest of the ERROR message body will be - <cl><received><blockfor><writeType> + <cl><received><blockfor><writeType><contentions> where: <cl> is the [consistency] level of the query having triggered the exception. @@ -1144,12 +1144,14 @@ Table of Contents - "BATCH_LOG": the timeout occurred during the write to the batch log when a (logged) batch write was requested. - - "CAS": the timeout occured during the Compare And Set write/update. - - "VIEW": the timeout occured when a write involves - VIEW update and failure to acqiure local view(MV) - lock for key within timeout - - "CDC": the timeout occured when cdc_total_space_in_mb is - exceeded when doing a write to data tracked by cdc. + - "CAS": the timeout occured during the Compare And Set write/update. + - "VIEW": the timeout occured when a write involves + VIEW update and failure to acqiure local view(MV) + lock for key within timeout + - "CDC": the timeout occured when cdc_total_space_in_mb is + exceeded when doing a write to data tracked by cdc. + <contentions> is a [short] that describes the number of contentions occured during the CAS operation. + The field only presents when the <writeType> is "CAS". 0x1200 Read_timeout: Timeout exception during a read request. The rest of the ERROR message body will be <cl><received><blockfor><data_present> @@ -1225,12 +1227,24 @@ Table of Contents - "BATCH_LOG": the failure occured during the write to the batch log when a (logged) batch write was requested. - - "CAS": the failure occured during the Compare And Set write/update. - - "VIEW": the failure occured when a write involves - VIEW update and failure to acqiure local view(MV) - lock for key within timeout - - "CDC": the failure occured when cdc_total_space_in_mb is - exceeded when doing a write to data tracked by cdc. + - "CAS": the failure occured during the Compare And Set write/update. + - "VIEW": the failure occured when a write involves + VIEW update and failure to acqiure local view(MV) + lock for key within timeout + - "CDC": the failure occured when cdc_total_space_in_mb is + exceeded when doing a write to data tracked by cdc. + 0x1600 CDC_WRITE_FAILURE: // todo + 0x1700 CAS_WRITE_UNKNOWN: An exception occured due to contended Compare And Set write/update. + The CAS operation was only partially completed and the operation may or may not get completed by + the contending CAS write or SERIAL/LOCAL_SERIAL read. The rest of the ERROR message body will be + <cl><received><blockfor> + where: + <cl> is the [consistency] level of the query having triggered + the exception. + <received> is an [int] representing the number of nodes having + acknowledged the request. + <blockfor> is an [int] representing the number of replicas whose + acknowledgement is required to achieve <cl>. 0x2000 Syntax_error: The submitted query has a syntax error. 0x2100 Unauthorized: The logged user doesn't have the right to perform diff --git a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java b/src/java/org/apache/cassandra/exceptions/CasWriteTimeoutException.java similarity index 69% copy from src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java copy to src/java/org/apache/cassandra/exceptions/CasWriteTimeoutException.java index af8d42b..b134764 100644 --- a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java +++ b/src/java/org/apache/cassandra/exceptions/CasWriteTimeoutException.java @@ -20,13 +20,13 @@ package org.apache.cassandra.exceptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.WriteType; -public class WriteTimeoutException extends RequestTimeoutException +public class CasWriteTimeoutException extends WriteTimeoutException { - public final WriteType writeType; + public final int contentions; - public WriteTimeoutException(WriteType writeType, ConsistencyLevel consistency, int received, int blockFor) + public CasWriteTimeoutException(WriteType writeType, ConsistencyLevel consistency, int received, int blockFor, int contentions) { - super(ExceptionCode.WRITE_TIMEOUT, consistency, received, blockFor); - this.writeType = writeType; + super(writeType, consistency, received, blockFor, String.format("CAS operation timed out - encountered contentions: %d", contentions)); + this.contentions = contentions; } } diff --git a/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java b/src/java/org/apache/cassandra/exceptions/CasWriteUnknownResultException.java similarity index 77% copy from src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java copy to src/java/org/apache/cassandra/exceptions/CasWriteUnknownResultException.java index 156dce7..d5dda84 100644 --- a/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java +++ b/src/java/org/apache/cassandra/exceptions/CasWriteUnknownResultException.java @@ -15,19 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.cassandra.exceptions; import org.apache.cassandra.db.ConsistencyLevel; -public class RequestTimeoutException extends RequestExecutionException +public class CasWriteUnknownResultException extends RequestExecutionException { public final ConsistencyLevel consistency; public final int received; public final int blockFor; - protected RequestTimeoutException(ExceptionCode code, ConsistencyLevel consistency, int received, int blockFor) + public CasWriteUnknownResultException(ConsistencyLevel consistency, int received, int blockFor) { - super(code, String.format("Operation timed out - received only %d responses.", received)); + super(ExceptionCode.CAS_WRITE_UNKNOWN, String.format("CAS operation result is unknown - proposal accepted by %d but not a quorum.", received)); this.consistency = consistency; this.received = received; this.blockFor = blockFor; diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java index 9324110..1766951 100644 --- a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java +++ b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java @@ -43,6 +43,7 @@ public enum ExceptionCode FUNCTION_FAILURE (0x1400), WRITE_FAILURE (0x1500), CDC_WRITE_FAILURE (0x1600), + CAS_WRITE_UNKNOWN (0x1700), // 2xx: problem validating the request SYNTAX_ERROR (0x2000), @@ -60,7 +61,7 @@ public enum ExceptionCode valueToCode.put(code.value, code); } - private ExceptionCode(int value) + ExceptionCode(int value) { this.value = value; } diff --git a/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java b/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java index 156dce7..853ba2f 100644 --- a/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java +++ b/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java @@ -32,4 +32,12 @@ public class RequestTimeoutException extends RequestExecutionException this.received = received; this.blockFor = blockFor; } + + protected RequestTimeoutException(ExceptionCode code, ConsistencyLevel consistency, int received, int blockFor, String msg) + { + super(code, msg); + this.consistency = consistency; + this.received = received; + this.blockFor = blockFor; + } } diff --git a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java b/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java index af8d42b..4b4ce38 100644 --- a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java +++ b/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java @@ -29,4 +29,10 @@ public class WriteTimeoutException extends RequestTimeoutException super(ExceptionCode.WRITE_TIMEOUT, consistency, received, blockFor); this.writeType = writeType; } + + public WriteTimeoutException(WriteType writeType, ConsistencyLevel consistency, int received, int blockFor, String msg) + { + super(ExceptionCode.WRITE_TIMEOUT, consistency, received, blockFor, msg); + this.writeType = writeType; + } } diff --git a/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java index 9884ff1..c6d3921 100644 --- a/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java @@ -20,20 +20,22 @@ package org.apache.cassandra.metrics; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; public class CASClientRequestMetrics extends ClientRequestMetrics { public final Histogram contention; - public final Counter unfinishedCommit; + public final Meter unknownResult; public CASClientRequestMetrics(String scope) { super(scope); contention = Metrics.histogram(factory.createMetricName("ContentionHistogram"), false); - unfinishedCommit = Metrics.counter(factory.createMetricName("UnfinishedCommit")); + unfinishedCommit = Metrics.counter(factory.createMetricName("UnfinishedCommit")); + unknownResult = Metrics.meter(factory.createMetricName("UnknownResult")); } public void release() @@ -41,5 +43,6 @@ public class CASClientRequestMetrics extends ClientRequestMetrics super.release(); Metrics.remove(factory.createMetricName("ContentionHistogram")); Metrics.remove(factory.createMetricName("UnfinishedCommit")); + Metrics.remove(factory.createMetricName("UnknownResult")); } } diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 11c72ec..9fc6b52 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -214,7 +214,7 @@ public class StorageProxy implements StorageProxyMBean ClientState state, int nowInSeconds, long queryStartNanoTime) - throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException + throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException, CasWriteUnknownResultException { final long startTimeForMetrics = System.nanoTime(); TableMetadata metadata = Schema.instance.getTableMetadata(keyspaceName, cfName); @@ -287,7 +287,18 @@ public class StorageProxy implements StorageProxyMBean throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(keyspaceName))); } - catch (WriteTimeoutException|ReadTimeoutException e) + catch (CasWriteUnknownResultException e) + { + casWriteMetrics.unknownResult.mark(); + throw e; + } + catch (WriteTimeoutException wte) + { + casWriteMetrics.timeouts.mark(); + writeMetricsMap.get(consistencyForPaxos).timeouts.mark(); + throw new CasWriteTimeoutException(wte.writeType, wte.consistency, wte.received, wte.blockFor, contentions); + } + catch (ReadTimeoutException e) { casWriteMetrics.timeouts.mark(); writeMetricsMap.get(consistencyForPaxos).timeouts.mark(); @@ -299,7 +310,7 @@ public class StorageProxy implements StorageProxyMBean writeMetricsMap.get(consistencyForPaxos).failures.mark(); throw e; } - catch(UnavailableException e) + catch (UnavailableException e) { casWriteMetrics.unavailables.mark(); writeMetricsMap.get(consistencyForPaxos).unavailables.mark(); @@ -465,10 +476,15 @@ public class StorageProxy implements StorageProxyMBean return callback; } - private static boolean proposePaxos(Commit proposal, ReplicaPlan.ForPaxosWrite replicaPlan, boolean timeoutIfPartial, long queryStartNanoTime) - throws WriteTimeoutException + /** + * Propose the {@param proposal} accoding to the {@param replicaPlan}. + * When {@param backoffIfPartial} is true, the proposer backs off when seeing the proposal being accepted by some but not a quorum. + * The result of the cooresponding CAS in uncertain as the accepted proposal may or may not be spread to other nodes in later rounds. + */ + private static boolean proposePaxos(Commit proposal, ReplicaPlan.ForPaxosWrite replicaPlan, boolean backoffIfPartial, long queryStartNanoTime) + throws WriteTimeoutException, CasWriteUnknownResultException { - ProposeCallback callback = new ProposeCallback(replicaPlan.contacts().size(), replicaPlan.requiredParticipants(), !timeoutIfPartial, replicaPlan.consistencyLevel(), queryStartNanoTime); + ProposeCallback callback = new ProposeCallback(replicaPlan.contacts().size(), replicaPlan.requiredParticipants(), !backoffIfPartial, replicaPlan.consistencyLevel(), queryStartNanoTime); Message<Commit> message = Message.out(PAXOS_PROPOSE_REQ, proposal); for (Replica replica : replicaPlan.contacts()) { @@ -496,8 +512,8 @@ public class StorageProxy implements StorageProxyMBean if (callback.isSuccessful()) return true; - if (timeoutIfPartial && !callback.isFullyRefused()) - throw new WriteTimeoutException(WriteType.CAS, replicaPlan.consistencyLevel(), callback.getAcceptCount(), replicaPlan.requiredParticipants()); + if (backoffIfPartial && !callback.isFullyRefused()) + throw new CasWriteUnknownResultException(replicaPlan.consistencyLevel(), callback.getAcceptCount(), replicaPlan.requiredParticipants()); return false; } diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index 7b97be4..cd2af54 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -114,20 +114,30 @@ public class ErrorMessage extends Message.Response break; case WRITE_TIMEOUT: case READ_TIMEOUT: - ConsistencyLevel cl = CBUtil.readConsistencyLevel(body); - int received = body.readInt(); - int blockFor = body.readInt(); - if (code == ExceptionCode.WRITE_TIMEOUT) { - WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body)); - te = new WriteTimeoutException(writeType, cl, received, blockFor); - } - else - { - byte dataPresent = body.readByte(); - te = new ReadTimeoutException(cl, received, blockFor, dataPresent != 0); + ConsistencyLevel cl = CBUtil.readConsistencyLevel(body); + int received = body.readInt(); + int blockFor = body.readInt(); + if (code == ExceptionCode.WRITE_TIMEOUT) + { + WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body)); + if (version.isGreaterOrEqualTo(ProtocolVersion.V5) && writeType == WriteType.CAS) + { + int contentions = body.readShort(); + te = new CasWriteTimeoutException(writeType, cl, received, blockFor, contentions); + } + else + { + te = new WriteTimeoutException(writeType, cl, received, blockFor); + } + } + else + { + byte dataPresent = body.readByte(); + te = new ReadTimeoutException(cl, received, blockFor, dataPresent != 0); + } + break; } - break; case FUNCTION_FAILURE: String fKeyspace = CBUtil.readString(body); String fName = CBUtil.readString(body); @@ -163,6 +173,14 @@ public class ErrorMessage extends Message.Response else te = new AlreadyExistsException(ksName, cfName); break; + case CAS_WRITE_UNKNOWN: + assert version.isGreaterOrEqualTo(ProtocolVersion.V5); + + ConsistencyLevel cl = CBUtil.readConsistencyLevel(body); + int received = body.readInt(); + int blockFor = body.readInt(); + te = new CasWriteUnknownResultException(cl, received, blockFor); + break; } return new ErrorMessage(te); } @@ -185,8 +203,7 @@ public class ErrorMessage extends Message.Response case WRITE_FAILURE: case READ_FAILURE: { - RequestFailureException rfe = (RequestFailureException)err; - boolean isWrite = err.code() == ExceptionCode.WRITE_FAILURE; + RequestFailureException rfe = (RequestFailureException) err; CBUtil.writeConsistencyLevel(rfe.consistency, dest); dest.writeInt(rfe.received); @@ -203,24 +220,30 @@ public class ErrorMessage extends Message.Response } } - if (isWrite) - CBUtil.writeAsciiString(((WriteFailureException)rfe).writeType.toString(), dest); + if (err.code() == ExceptionCode.WRITE_FAILURE) + CBUtil.writeAsciiString(((WriteFailureException) rfe).writeType.toString(), dest); else - dest.writeByte((byte)(((ReadFailureException)rfe).dataPresent ? 1 : 0)); + dest.writeByte((byte) (((ReadFailureException) rfe).dataPresent ? 1 : 0)); + break; } - break; case WRITE_TIMEOUT: case READ_TIMEOUT: RequestTimeoutException rte = (RequestTimeoutException)err; - boolean isWrite = err.code() == ExceptionCode.WRITE_TIMEOUT; CBUtil.writeConsistencyLevel(rte.consistency, dest); dest.writeInt(rte.received); dest.writeInt(rte.blockFor); - if (isWrite) + if (err.code() == ExceptionCode.WRITE_TIMEOUT) + { CBUtil.writeAsciiString(((WriteTimeoutException)rte).writeType.toString(), dest); + // CasWriteTimeoutException already implies protocol V5, but double check to be safe. + if (version.isGreaterOrEqualTo(ProtocolVersion.V5) && rte instanceof CasWriteTimeoutException) + dest.writeShort(((CasWriteTimeoutException)rte).contentions); + } else + { dest.writeByte((byte)(((ReadTimeoutException)rte).dataPresent ? 1 : 0)); + } break; case FUNCTION_FAILURE: FunctionExecutionException fee = (FunctionExecutionException)msg.error; @@ -237,12 +260,18 @@ public class ErrorMessage extends Message.Response CBUtil.writeAsciiString(aee.ksName, dest); CBUtil.writeAsciiString(aee.cfName, dest); break; + case CAS_WRITE_UNKNOWN: + assert version.isGreaterOrEqualTo(ProtocolVersion.V5); + CasWriteUnknownResultException cwue = (CasWriteUnknownResultException)err; + CBUtil.writeConsistencyLevel(cwue.consistency, dest); + dest.writeInt(cwue.received); + dest.writeInt(cwue.blockFor); } } public int encodedSize(ErrorMessage msg, ProtocolVersion version) { - final TransportException err = getBackwardsCompatibleException(msg, version); + TransportException err = getBackwardsCompatibleException(msg, version); String errorString = err.getMessage() == null ? "" : err.getMessage(); int size = 4 + CBUtil.sizeOfString(errorString); switch (err.code()) @@ -255,9 +284,12 @@ public class ErrorMessage extends Message.Response case READ_FAILURE: { RequestFailureException rfe = (RequestFailureException)err; - boolean isWrite = err.code() == ExceptionCode.WRITE_FAILURE; + size += CBUtil.sizeOfConsistencyLevel(rfe.consistency) + 4 + 4 + 4; - size += isWrite ? CBUtil.sizeOfAsciiString(((WriteFailureException)rfe).writeType.toString()) : 1; + if (err.code() == ExceptionCode.WRITE_FAILURE) + size += CBUtil.sizeOfAsciiString(((WriteFailureException)rfe).writeType.toString()); + else + size += 1; if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) { @@ -274,7 +306,14 @@ public class ErrorMessage extends Message.Response RequestTimeoutException rte = (RequestTimeoutException)err; boolean isWrite = err.code() == ExceptionCode.WRITE_TIMEOUT; size += CBUtil.sizeOfConsistencyLevel(rte.consistency) + 8; - size += isWrite ? CBUtil.sizeOfAsciiString(((WriteTimeoutException)rte).writeType.toString()) : 1; + if (isWrite) + size += CBUtil.sizeOfAsciiString(((WriteTimeoutException)rte).writeType.toString()); + else + size += 1; + + // CasWriteTimeoutException already implies protocol V5, but double check to be safe. + if (isWrite && version.isGreaterOrEqualTo(ProtocolVersion.V5) && rte instanceof CasWriteTimeoutException) + size += 2; // CasWriteTimeoutException appends a short for contentions occured. break; case FUNCTION_FAILURE: FunctionExecutionException fee = (FunctionExecutionException)msg.error; @@ -291,6 +330,11 @@ public class ErrorMessage extends Message.Response size += CBUtil.sizeOfAsciiString(aee.ksName); size += CBUtil.sizeOfAsciiString(aee.cfName); break; + case CAS_WRITE_UNKNOWN: + assert version.isGreaterOrEqualTo(ProtocolVersion.V5); + CasWriteUnknownResultException cwue = (CasWriteUnknownResultException)err; + size += CBUtil.sizeOfConsistencyLevel(cwue.consistency) + 4 + 4; // receivedFor: 4, blockFor: 4 + break; } return size; } @@ -309,12 +353,28 @@ public class ErrorMessage extends Message.Response WriteFailureException wfe = (WriteFailureException) msg.error; return new WriteTimeoutException(wfe.writeType, wfe.consistency, wfe.received, wfe.blockFor); case FUNCTION_FAILURE: - return new InvalidRequestException(msg.toString()); case CDC_WRITE_FAILURE: return new InvalidRequestException(msg.toString()); } } + if (version.isSmallerThan(ProtocolVersion.V5)) + { + switch (msg.error.code()) + { + case WRITE_TIMEOUT: + if (msg.error instanceof CasWriteTimeoutException) + { + CasWriteTimeoutException cwte = (CasWriteTimeoutException) msg.error; + return new WriteTimeoutException(WriteType.CAS, cwte.consistency, cwte.received, cwte.blockFor); + } + break; + case CAS_WRITE_UNKNOWN: + CasWriteUnknownResultException cwue = (CasWriteUnknownResultException) msg.error; + return new WriteTimeoutException(WriteType.CAS, cwue.consistency, cwue.received, cwue.blockFor); + } + } + return msg.error; } diff --git a/test/distributed/org/apache/cassandra/distributed/test/CasWriteTest.java b/test/distributed/org/apache/cassandra/distributed/test/CasWriteTest.java new file mode 100644 index 0000000..7e7c629 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/CasWriteTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.impl.InstanceClassLoader; +import org.apache.cassandra.exceptions.CasWriteTimeoutException; +import org.apache.cassandra.exceptions.CasWriteUnknownResultException; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.utils.FBUtilities; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.fail; + +public class CasWriteTest extends DistributedTestBase +{ + // Sharing the same cluster to boost test speed. Using a pkGen to make sure queries has distinct pk value for paxos instances. + private static Cluster cluster; + private static final AtomicInteger pkGen = new AtomicInteger(1_000); // preserve any pk values less than 1000 for manual queries. + private static final Logger logger = LoggerFactory.getLogger(CasWriteTest.class); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @BeforeClass + public static void setupCluster() throws Throwable + { + cluster = init(Cluster.create(3)); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + } + + @AfterClass + public static void close() + { + cluster.close(); + cluster = null; + } + + @Before @After + public void resetFilters() + { + cluster.filters().reset(); + } + + @Test + public void testCasWriteSuccessWithNoContention() + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", + ConsistencyLevel.QUORUM); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.QUORUM), + row(1, 1, 1)); + + cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 AND ck = 1 IF v = 1", + ConsistencyLevel.QUORUM); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.QUORUM), + row(1, 1, 2)); + } + + @Test + public void testCasWriteTimeoutAtPreparePhase_ReqLost() + { + expectCasWriteTimeout(); + cluster.verbs(Verb.PAXOS_PREPARE_REQ).from(1).to(2, 3).drop().on(); // drop the internode messages to acceptors + cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM); + } + + @Test + public void testCasWriteTimeoutAtPreparePhase_RspLost() + { + expectCasWriteTimeout(); + cluster.verbs(Verb.PAXOS_PREPARE_RSP).from(2, 3).to(1).drop().on(); // drop the internode messages to acceptors + cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM); + } + + @Test + public void testCasWriteTimeoutAtProposePhase_ReqLost() + { + expectCasWriteTimeout(); + cluster.verbs(Verb.PAXOS_PROPOSE_REQ).from(1).to(2, 3).drop().on(); + cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM); + } + + @Test + public void testCasWriteTimeoutAtProposePhase_RspLost() + { + expectCasWriteTimeout(); + cluster.verbs(Verb.PAXOS_PROPOSE_RSP).from(2, 3).to(1).drop().on(); + cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM); + } + + @Test + public void testCasWriteTimeoutAtCommitPhase_ReqLost() + { + expectCasWriteTimeout(); + cluster.verbs(Verb.PAXOS_COMMIT_REQ).from(1).to(2, 3).drop().on(); + cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM); + } + + @Test + public void testCasWriteTimeoutAtCommitPhase_RspLost() + { + expectCasWriteTimeout(); + cluster.verbs(Verb.PAXOS_COMMIT_RSP).from(2, 3).to(1).drop().on(); + cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM); + } + + + + @Test + public void casWriteContentionTimeoutTest() throws InterruptedException + { + testWithContention(101, + Arrays.asList(1, 3), + c -> { + c.filters().reset(); + c.verbs(Verb.PAXOS_PREPARE_REQ).from(1).to(3).drop(); + c.verbs(Verb.PAXOS_PROPOSE_REQ).from(1).to(2).drop(); + }, + failure -> + failure.get() != null && + failure.get() + .getMessage() + .contains(CasWriteTimeoutException.class.getCanonicalName()), + "Expecting cause to be CasWriteTimeoutException"); + } + + private void testWithContention(int testUid, + List<Integer> contendingNodes, + Consumer<Cluster> setupForEachRound, + Function<AtomicReference<Throwable>, Boolean> expectedException, + String assertHintMessage) throws InterruptedException + { + assert contendingNodes.size() == 2; + AtomicInteger curPk = new AtomicInteger(1); + ExecutorService es = Executors.newFixedThreadPool(3); + AtomicReference<Throwable> failure = new AtomicReference<>(); + Supplier<Boolean> hasExpectedException = () -> expectedException.apply(failure); + while (!hasExpectedException.get()) + { + failure.set(null); + setupForEachRound.accept(cluster); + + List<Future<?>> futures = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(3); + contendingNodes.forEach(nodeId -> { + String query = mkCasInsertQuery((a) -> curPk.get(), testUid, nodeId); + futures.add(es.submit(() -> { + try + { + latch.countDown(); + latch.await(1, TimeUnit.SECONDS); // help threads start at approximately same time + cluster.coordinator(nodeId).execute(query, ConsistencyLevel.QUORUM); + } + catch (Throwable t) + { + failure.set(t); + } + })); + }); + + FBUtilities.waitOnFutures(futures); + curPk.incrementAndGet(); + } + + es.shutdownNow(); + es.awaitTermination(1, TimeUnit.MINUTES); + Assert.assertTrue(assertHintMessage, hasExpectedException.get()); + } + + private void expectCasWriteTimeout() + { + thrown.expect(RuntimeException.class); + thrown.expectCause(new BaseMatcher<Throwable>() + { + public boolean matches(Object item) + { + return InstanceClassLoader.wasLoadedByAnInstanceClassLoader(item.getClass()); + } + + public void describeTo(Description description) + { + description.appendText("Cause should be loaded by InstanceClassLoader"); + } + }); + // unable to assert on class becuase the exception thrown was loaded by a differnet classloader, InstanceClassLoader + // therefor asserts the FQCN name present in the message as a workaround + thrown.expectMessage(containsString(CasWriteTimeoutException.class.getCanonicalName())); + thrown.expectMessage(containsString("CAS operation timed out")); + } + + @Test + public void testWriteUnknownResult() + { + while (true) + { + cluster.filters().reset(); + int pk = pkGen.getAndIncrement(); + cluster.filters().verbs(Verb.PAXOS_PROPOSE_REQ.id).from(1).to(3).messagesMatching((from, to, msg) -> { + // Inject a single CAS request in-between prepare and propose phases + cluster.coordinator(2).execute(mkCasInsertQuery((a) -> pk, 1, 2), + ConsistencyLevel.QUORUM); + return false; + }).drop(); + + try + { + cluster.coordinator(1).execute(mkCasInsertQuery((a) -> pk, 1, 1), ConsistencyLevel.QUORUM); + } + catch (Throwable t) + { + Assert.assertTrue("Expecting cause to be CasWriteUncertainException", + t.getMessage().contains(CasWriteUnknownResultException.class.getCanonicalName())); + return; + } + } + } + + // every invokation returns a query with an unique pk + private String mkUniqueCasInsertQuery(int v) + { + return mkCasInsertQuery(AtomicInteger::getAndIncrement, 1, v); + } + + private String mkCasInsertQuery(Function<AtomicInteger, Integer> pkFunc, int ck, int v) + { + String query = String.format("INSERT INTO %s.tbl (pk, ck, v) VALUES (%d, %d, %d) IF NOT EXISTS", KEYSPACE, pkFunc.apply(pkGen), ck, v); + logger.info("Generated query: " + query); + return query; + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java index ece369a..64dee64 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java @@ -29,6 +29,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import com.datastax.driver.core.ResultSet; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.distributed.impl.AbstractCluster; import org.apache.cassandra.distributed.impl.IsolatedExecutor; import org.apache.cassandra.distributed.impl.RowUtil; @@ -67,6 +68,7 @@ public class DistributedTestBase System.setProperty("org.apache.cassandra.disable_mbean_registration", "true"); nativeLibraryWorkaround(); processReaperWorkaround(); + DatabaseDescriptor.clientInitialization(); } static String withKeyspace(String replaceIn) @@ -87,7 +89,7 @@ public class DistributedTestBase public static void assertRows(Object[][] actual, Object[]... expected) { - Assert.assertEquals(rowsNotEqualErrorMessage(expected, actual), + Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected), expected.length, actual.length); for (int i = 0; i < expected.length; i++) diff --git a/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java b/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java index 8497005..cfeddba 100644 --- a/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java +++ b/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java @@ -27,14 +27,19 @@ import org.junit.Test; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.WriteType; +import org.apache.cassandra.exceptions.CasWriteTimeoutException; +import org.apache.cassandra.exceptions.CasWriteUnknownResultException; import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.exceptions.WriteFailureException; +import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.transport.messages.EncodeAndDecodeTestBase; import org.apache.cassandra.transport.messages.ErrorMessage; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class ErrorMessageTest extends EncodeAndDecodeTestBase<ErrorMessage> { @@ -90,6 +95,81 @@ public class ErrorMessageTest extends EncodeAndDecodeTestBase<ErrorMessage> assertEquals(writeType, deserializedWfe.writeType); } + @Test + public void testV5CasWriteTimeoutSerDeser() + { + int contentions = 1; + int receivedBlockFor = 3; + ConsistencyLevel consistencyLevel = ConsistencyLevel.SERIAL; + CasWriteTimeoutException ex = new CasWriteTimeoutException(WriteType.CAS, consistencyLevel, receivedBlockFor, receivedBlockFor, contentions); + + ErrorMessage deserialized = encodeThenDecode(ErrorMessage.fromException(ex), ProtocolVersion.V5); + assertTrue(deserialized.error instanceof CasWriteTimeoutException); + CasWriteTimeoutException deserializedEx = (CasWriteTimeoutException) deserialized.error; + + assertEquals(WriteType.CAS, deserializedEx.writeType); + assertEquals(contentions, deserializedEx.contentions); + assertEquals(consistencyLevel, deserializedEx.consistency); + assertEquals(receivedBlockFor, deserializedEx.received); + assertEquals(receivedBlockFor, deserializedEx.blockFor); + assertEquals(ex.getMessage(), deserializedEx.getMessage()); + assertTrue(deserializedEx.getMessage().contains("CAS operation timed out - encountered contentions")); + } + + @Test + public void testV4CasWriteTimeoutSerDeser() + { + int contentions = 1; + int receivedBlockFor = 3; + ConsistencyLevel consistencyLevel = ConsistencyLevel.SERIAL; + CasWriteTimeoutException ex = new CasWriteTimeoutException(WriteType.CAS, consistencyLevel, receivedBlockFor, receivedBlockFor, contentions); + + ErrorMessage deserialized = encodeThenDecode(ErrorMessage.fromException(ex), ProtocolVersion.V4); + assertTrue(deserialized.error instanceof WriteTimeoutException); + assertFalse(deserialized.error instanceof CasWriteTimeoutException); + WriteTimeoutException deserializedEx = (WriteTimeoutException) deserialized.error; + + assertEquals(WriteType.CAS, deserializedEx.writeType); + assertEquals(consistencyLevel, deserializedEx.consistency); + assertEquals(receivedBlockFor, deserializedEx.received); + assertEquals(receivedBlockFor, deserializedEx.blockFor); + } + + @Test + public void testV5CasWriteResultUnknownSerDeser() + { + int receivedBlockFor = 3; + ConsistencyLevel consistencyLevel = ConsistencyLevel.SERIAL; + CasWriteUnknownResultException ex = new CasWriteUnknownResultException(consistencyLevel, receivedBlockFor, receivedBlockFor); + + ErrorMessage deserialized = encodeThenDecode(ErrorMessage.fromException(ex), ProtocolVersion.V5); + assertTrue(deserialized.error instanceof CasWriteUnknownResultException); + CasWriteUnknownResultException deserializedEx = (CasWriteUnknownResultException) deserialized.error; + + assertEquals(consistencyLevel, deserializedEx.consistency); + assertEquals(receivedBlockFor, deserializedEx.received); + assertEquals(receivedBlockFor, deserializedEx.blockFor); + assertEquals(ex.getMessage(), deserializedEx.getMessage()); + assertTrue(deserializedEx.getMessage().contains("CAS operation result is unknown")); + } + + @Test + public void testV4CasWriteResultUnknownSerDeser() + { + int receivedBlockFor = 3; + ConsistencyLevel consistencyLevel = ConsistencyLevel.SERIAL; + CasWriteUnknownResultException ex = new CasWriteUnknownResultException(consistencyLevel, receivedBlockFor, receivedBlockFor); + + ErrorMessage deserialized = encodeThenDecode(ErrorMessage.fromException(ex), ProtocolVersion.V4); + assertTrue(deserialized.error instanceof WriteTimeoutException); + assertFalse(deserialized.error instanceof CasWriteUnknownResultException); + WriteTimeoutException deserializedEx = (WriteTimeoutException) deserialized.error; + + assertEquals(consistencyLevel, deserializedEx.consistency); + assertEquals(receivedBlockFor, deserializedEx.received); + assertEquals(receivedBlockFor, deserializedEx.blockFor); + } + /** * Make sure that the map passed in to create a Read/WriteFailureException is copied * so later modifications to the map passed in don't affect the map in the exception. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org