Repository: cassandra Updated Branches: refs/heads/trunk bb4c5c3c4 -> 6f647aaa0
Make it possible to monitor an ideal consistency level separate from actual consistency level Patch by Ariel Weisberg; Reviewed by Jason Brown for CASSANDRA-13289 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f647aaa Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f647aaa Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f647aaa Branch: refs/heads/trunk Commit: 6f647aaa0df6f90ee298d372e624c9e3c1ae937e Parents: bb4c5c3 Author: Ariel Weisberg <aweisb...@apple.com> Authored: Thu Mar 2 16:46:13 2017 -0500 Committer: Ariel Weisberg <aweisb...@apple.com> Committed: Thu Mar 30 17:01:20 2017 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 5 + .../org/apache/cassandra/config/Config.java | 8 + .../cassandra/config/DatabaseDescriptor.java | 11 + .../locator/AbstractReplicationStrategy.java | 49 +++- .../cassandra/metrics/KeyspaceMetrics.java | 9 + .../service/AbstractWriteResponseHandler.java | 105 ++++++++- .../DatacenterSyncWriteResponseHandler.java | 30 ++- .../service/DatacenterWriteResponseHandler.java | 8 + .../apache/cassandra/service/StorageProxy.java | 26 ++- .../cassandra/service/StorageProxyMBean.java | 5 + .../cassandra/service/WriteResponseHandler.java | 4 + .../config/DatabaseDescriptorRefTest.java | 1 + .../service/WriteResponseHandlerTest.java | 234 +++++++++++++++++++ 14 files changed, 474 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6a164ee..d4b53d0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289) * Outbound TCP 
connections ignore internode authenticator (CASSANDRA-13324) * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360) * Cleanup ParentRepairSession after repairs (CASSANDRA-13359) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index d8392a0..f2c4c84 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -1120,3 +1120,8 @@ back_pressure_strategy: # Do not try to coalesce messages if we already got that many messages. This should be more than 2 and less than 128. # otc_coalescing_enough_coalesced_messages: 8 + +# Track a metric per keyspace indicating whether replication achieved the ideal consistency +# level for writes without timing out. This is different from the consistency level requested by +# each write which may be lower in order to facilitate availability. +# ideal_consistency_level: EACH_QUORUM http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 36ce576..1461cd4 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -32,6 +32,8 @@ import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.ConsistencyLevel; + /** * A class that contains configuration properties for the cassandra node it runs within. 
* @@ -271,6 +273,12 @@ public class Config public int tracetype_query_ttl = (int) TimeUnit.DAYS.toSeconds(1); public int tracetype_repair_ttl = (int) TimeUnit.DAYS.toSeconds(7); + /** + * Maintain statistics on whether writes achieve the ideal consistency level + * before expiring and becoming hints + */ + public volatile ConsistencyLevel ideal_consistency_level = null; + /* * Strategy to use for coalescing messages in OutboundTcpConnection. * Can be fixed, movingaverage, timehorizon, disabled. Setting is case and leading/trailing http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 465cd8a..debf161 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -44,6 +44,7 @@ import org.apache.cassandra.auth.IAuthorizer; import org.apache.cassandra.auth.IInternodeAuthenticator; import org.apache.cassandra.auth.IRoleManager; import org.apache.cassandra.config.Config.CommitLogSync; +import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.FSWriteError; @@ -2269,4 +2270,14 @@ public class DatabaseDescriptor { return backPressureStrategy; } + + public static ConsistencyLevel getIdealConsistencyLevel() + { + return conf.ideal_consistency_level; + } + + public static void setIdealConsistencyLevel(ConsistencyLevel cl) + { + conf.ideal_consistency_level = cl; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index 9c43486..c3498d9 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -28,6 +28,7 @@ import com.google.common.collect.Multimap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.WriteType; @@ -40,6 +41,7 @@ import org.apache.cassandra.service.DatacenterSyncWriteResponseHandler; import org.apache.cassandra.service.DatacenterWriteResponseHandler; import org.apache.cassandra.service.WriteResponseHandler; import org.apache.cassandra.utils.FBUtilities; + import org.cliffc.high_scale_lib.NonBlockingHashMap; /** @@ -135,16 +137,57 @@ public abstract class AbstractReplicationStrategy WriteType writeType, long queryStartNanoTime) { + return getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel()); + } + + public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddress> naturalEndpoints, + Collection<InetAddress> pendingEndpoints, + ConsistencyLevel consistency_level, + Runnable callback, + WriteType writeType, + long queryStartNanoTime, + ConsistencyLevel idealConsistencyLevel) + { + AbstractWriteResponseHandler resultResponseHandler; if (consistency_level.isDatacenterLocal()) { // block for in this context will be localnodes block. 
- return new DatacenterWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); + resultResponseHandler = new DatacenterWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); } else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy)) { - return new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); + resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); } - return new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); + else + { + resultResponseHandler = new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); + } + + //Check if tracking the ideal consistency level is configured + if (idealConsistencyLevel != null) + { + //If ideal and requested are the same just use this handler to track the ideal consistency level + //This is also used so that the ideal consistency level handler when constructed knows it is the ideal + //one for tracking purposes + if (idealConsistencyLevel == consistency_level) + { + resultResponseHandler.setIdealCLResponseHandler(resultResponseHandler); + } + else + { + //Construct a delegate response handler to use to track the ideal consistency level + AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(naturalEndpoints, + pendingEndpoints, + idealConsistencyLevel, + callback, + writeType, + queryStartNanoTime, + idealConsistencyLevel); + resultResponseHandler.setIdealCLResponseHandler(idealHandler); + } + } + + return 
resultResponseHandler; } private Keyspace getKeyspace() http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index 2e1c384..63f8dd0 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -19,8 +19,10 @@ package org.apache.cassandra.metrics; import java.util.Set; +import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; @@ -91,6 +93,10 @@ public class KeyspaceMetrics public final LatencyMetrics casPropose; /** CAS Commit metrics */ public final LatencyMetrics casCommit; + /** Writes failed ideal consistency **/ + public final Counter writeFailedIdealCL; + /** Ideal CL write latency metrics */ + public final LatencyMetrics idealCLWriteLatency; public final MetricNameFactory factory; private Keyspace keyspace; @@ -236,6 +242,8 @@ public class KeyspaceMetrics casPrepare = new LatencyMetrics(factory, "CasPrepare"); casPropose = new LatencyMetrics(factory, "CasPropose"); casCommit = new LatencyMetrics(factory, "CasCommit"); + writeFailedIdealCL = Metrics.counter(factory.createMetricName("WriteFailedIdealCL")); + idealCLWriteLatency = new LatencyMetrics(factory, "IdealCLWrite"); } /** @@ -251,6 +259,7 @@ public class KeyspaceMetrics readLatency.release(); writeLatency.release(); rangeLatency.release(); + idealCLWriteLatency.release(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index 8c30b89..b5eaadb 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import com.google.common.collect.Iterables; @@ -40,8 +41,10 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition; public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackWithFailure<T> { - protected static final Logger logger = LoggerFactory.getLogger( AbstractWriteResponseHandler.class ); + protected static final Logger logger = LoggerFactory.getLogger(AbstractWriteResponseHandler.class); + //Count down until all responses and expirations have occurred before deciding whether the ideal CL was reached. 
+ private AtomicInteger responsesAndExpirations; private final SimpleCondition condition = new SimpleCondition(); protected final Keyspace keyspace; protected final Collection<InetAddress> naturalEndpoints; @@ -50,14 +53,22 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW protected final Collection<InetAddress> pendingEndpoints; protected final WriteType writeType; private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater - = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures"); + = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures"); private volatile int failures = 0; private final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint; private final long queryStartNanoTime; private volatile boolean supportsBackPressure = true; /** - * @param callback A callback to be called when the write is successful. + * Delegate to another WriteResponseHandler or possibly this one to track if the ideal consistency level was reached. + * Will be set to null if ideal CL was not configured + * Will be set to an AWRH delegate if ideal CL was configured + * Will be same as "this" if this AWRH is the ideal consistency level + */ + private AbstractWriteResponseHandler idealCLDelegate; + + /** + * @param callback A callback to be called when the write is successful. * @param queryStartNanoTime */ protected AbstractWriteResponseHandler(Keyspace keyspace, @@ -119,6 +130,64 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW } /** + * Set a delegate ideal CL write response handler. Note that this could be the same as this + * if the ideal CL and requested CL are the same. 
+ */ + public void setIdealCLResponseHandler(AbstractWriteResponseHandler handler) + { + this.idealCLDelegate = handler; + idealCLDelegate.responsesAndExpirations = new AtomicInteger(naturalEndpoints.size() + pendingEndpoints.size()); + } + + /** + * This logs the response but doesn't do any further processing related to this write response handler + * on whether the CL was achieved. Only call this after the subclass has completed all its processing + * since the subclass instance may be queried to find out if the CL was achieved. + */ + protected final void logResponseToIdealCLDelegate(MessageIn<T> m) + { + //Tracking ideal CL was not configured + if (idealCLDelegate == null) + { + return; + } + + if (idealCLDelegate == this) + { + //Processing of the message was already done since this is the handler for the + //ideal consistency level. Just decrement the counter. + decrementResponseOrExpired(); + } + else + { + //Let the delegate do full processing, this will loop back into the branch above + //with idealCLDelegate == this, because the ideal write handler idealCLDelegate will always + //be set to this in the delegate. + idealCLDelegate.response(m); + } + } + + public final void expired() + { + //Tracking ideal CL was not configured + if (idealCLDelegate == null) + { + return; + } + + //The requested CL matched ideal CL so reuse this object + if (idealCLDelegate == this) + { + decrementResponseOrExpired(); + } + else + { + //Have the delegate track the expired response + idealCLDelegate.decrementResponseOrExpired(); + } + } + + /** * @return the minimum number of endpoints that must reply. 
*/ protected int totalBlockFor() @@ -149,7 +218,9 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW */ protected abstract int ackCount(); - /** null message means "response from local write" */ + /** + * null message means "response from local write" + */ public abstract void response(MessageIn<T> msg); public void assureSufficientLiveNodes() throws UnavailableException @@ -170,8 +241,8 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW logger.trace("Got failure from {}", from); int n = waitingFor(from) - ? failuresUpdater.incrementAndGet(this) - : failures; + ? failuresUpdater.incrementAndGet(this) + : failures; failureReasonByEndpoint.put(from, failureReason); @@ -189,4 +260,26 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW { this.supportsBackPressure = supportsBackPressure; } + + /** + * Decrement the counter for all responses/expirations and if the counter + * hits 0 check to see if the ideal consistency level (this write response handler) + * was reached using the signal. 
+ */ + private final void decrementResponseOrExpired() + { + int decrementedValue = responsesAndExpirations.decrementAndGet(); + if (decrementedValue == 0) + { + //The condition being signaled is a valid proxy for the CL being achieved + if (!condition.isSignaled()) + { + keyspace.metric.writeFailedIdealCL.inc(); + } + else + { + keyspace.metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime); + } + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java index 9584611..4137e3a 100644 --- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java @@ -71,21 +71,29 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse public void response(MessageIn<T> message) { - String dataCenter = message == null - ? DatabaseDescriptor.getLocalDataCenter() - : snitch.getDatacenter(message.from); + try + { + String dataCenter = message == null + ? 
DatabaseDescriptor.getLocalDataCenter() + : snitch.getDatacenter(message.from); + + responses.get(dataCenter).getAndDecrement(); + acks.incrementAndGet(); - responses.get(dataCenter).getAndDecrement(); - acks.incrementAndGet(); + for (AtomicInteger i : responses.values()) + { + if (i.get() > 0) + return; + } - for (AtomicInteger i : responses.values()) + // all the quorum conditions are met + signal(); + } + finally { - if (i.get() > 0) - return; + //Must be last after all subclass processing + logResponseToIdealCLDelegate(message); } - - // all the quorum conditions are met - signal(); } protected int ackCount() http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java index 2309e87..83dddcf 100644 --- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @@ -46,7 +46,15 @@ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T> public void response(MessageIn<T> message) { if (message == null || waitingFor(message.from)) + { super.response(message); + } + else + { + //WriteResponseHandler.response will call logResponseToIdealCLDelegate so only do it if not calling WriteResponseHandler.response. 
+ //Must be last after all subclass processing + logResponseToIdealCLDelegate(message); + } } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 0585717..6be5286 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -561,9 +561,16 @@ public class StorageProxy implements StorageProxyMBean MessagingService.instance().sendOneWay(message, destination); } } - else if (shouldHint) + else { - submitHint(proposal.makeMutation(), destination, null); + if (responseHandler != null) + { + responseHandler.expired(); + } + if (shouldHint) + { + submitHint(proposal.makeMutation(), destination, null); + } } } @@ -1257,6 +1264,8 @@ public class StorageProxy implements StorageProxyMBean } else { + //Immediately mark the response as expired since the request will not be sent + responseHandler.expired(); if (shouldHint(destination)) { if (endpointsToHint == null) @@ -2774,4 +2783,17 @@ public class StorageProxy implements StorageProxyMBean { return Schema.instance.getNumberOfTables(); } + + public String getIdealConsistencyLevel() + { + return DatabaseDescriptor.getIdealConsistencyLevel().toString(); + } + + public String setIdealConsistencyLevel(String cl) + { + ConsistencyLevel original = DatabaseDescriptor.getIdealConsistencyLevel(); + ConsistencyLevel newCL = ConsistencyLevel.valueOf(cl.trim().toUpperCase()); + DatabaseDescriptor.setIdealConsistencyLevel(newCL); + return String.format("Updating ideal consistency level new value: %s old value %s", newCL, original.toString()); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/service/StorageProxyMBean.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java index 0a4ba19..97f7615 100644 --- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java +++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java @@ -21,6 +21,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.cassandra.db.ConsistencyLevel; + public interface StorageProxyMBean { public long getTotalHints(); @@ -63,4 +65,7 @@ public interface StorageProxyMBean public Map<String, List<String>> getSchemaVersions(); public int getNumberOfTables(); + + public String getIdealConsistencyLevel(); + public String setIdealConsistencyLevel(String cl); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/src/java/org/apache/cassandra/service/WriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java index 46e4e93..55ca5aa 100644 --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@ -68,6 +68,10 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T> { if (responsesUpdater.decrementAndGet(this) == 0) signal(); + //Must be last after all subclass processing + //The two current subclasses both assume logResponseToIdealCLDelegate is called + //here. 
+ logResponseToIdealCLDelegate(m); } protected int ackCount() http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java index c8f8bc1..b915854 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java @@ -81,6 +81,7 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.config.YamlConfigurationLoader$PropertiesChecker$1", "org.apache.cassandra.config.YamlConfigurationLoader$CustomConstructor", "org.apache.cassandra.config.TransparentDataEncryptionOptions", + "org.apache.cassandra.db.ConsistencyLevel", "org.apache.cassandra.dht.IPartitioner", "org.apache.cassandra.exceptions.ConfigurationException", "org.apache.cassandra.exceptions.RequestValidationException", http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f647aaa/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java new file mode 100644 index 0000000..815dbf6 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + + +import java.net.InetAddress; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.WriteType; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.schema.KeyspaceParams; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class WriteResponseHandlerTest +{ + static Keyspace ks; + static ColumnFamilyStore cfs; + static List<InetAddress> targets; + + @BeforeClass + public static void setUpClass() throws Throwable + { + SchemaLoader.loadSchema(); + // Register peers with expected DC for NetworkTopologyStrategy. 
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata(); + metadata.clearUnsafe(); + metadata.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.1.0.255")); + metadata.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.2.0.255")); + + DatabaseDescriptor.setEndpointSnitch(new IEndpointSnitch() + { + public String getRack(InetAddress endpoint) + { + return null; + } + + public String getDatacenter(InetAddress endpoint) + { + byte[] address = endpoint.getAddress(); + if (address[1] == 1) + return "datacenter1"; + else + return "datacenter2"; + } + + public List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress) + { + return null; + } + + public void sortByProximity(InetAddress address, List<InetAddress> addresses) + { + + } + + public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) + { + return 0; + } + + public void gossiperStarting() + { + + } + + public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2) + { + return false; + } + }); + DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.1.0.1")); + SchemaLoader.createKeyspace("Foo", KeyspaceParams.nts("datacenter1", 3, "datacenter2", 3), SchemaLoader.standardCFMD("Foo", "Bar")); + ks = Keyspace.open("Foo"); + cfs = ks.getColumnFamilyStore("Bar"); + targets = ImmutableList.of(InetAddress.getByName("127.1.0.255"), InetAddress.getByName("127.1.0.254"), InetAddress.getByName("127.1.0.253"), + InetAddress.getByName("127.2.0.255"), InetAddress.getByName("127.2.0.254"), InetAddress.getByName("127.2.0.253")); + } + + + @Before + public void resetCounters() + { + ks.metric.writeFailedIdealCL.dec(ks.metric.writeFailedIdealCL.getCount()); + } + + /** + * Validate that failing to achieve ideal CL increments the failure counter + * @throws Throwable + */ + @Test + public void failedIdealCLIncrementsStat() throws Throwable + { + AbstractWriteResponseHandler 
awr = createWriteResponseHandler(ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.EACH_QUORUM); + + //Succeed in local DC + awr.response(createDummyMessage(0)); + awr.response(createDummyMessage(1)); + awr.response(createDummyMessage(2)); + + //Fail in remote DC + awr.expired(); + awr.expired(); + awr.expired(); + assertEquals(1, ks.metric.writeFailedIdealCL.getCount()); + assertEquals(0, ks.metric.idealCLWriteLatency.totalLatency.getCount()); + } + + /** + * Validate that a successful write at ideal CL logs latency information. Also validates + * DatacenterSyncWriteResponseHandler + * @throws Throwable + */ + @Test + public void idealCLLatencyTracked() throws Throwable + { + long startingCount = ks.metric.idealCLWriteLatency.latency.getCount(); + //Specify query start time in past to ensure minimum latency measurement + AbstractWriteResponseHandler awr = createWriteResponseHandler(ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.EACH_QUORUM, System.nanoTime() - TimeUnit.DAYS.toNanos(1)); + + //dc1 + awr.response(createDummyMessage(0)); + awr.response(createDummyMessage(1)); + //dc2 + awr.response(createDummyMessage(4)); + awr.response(createDummyMessage(5)); + + //Don't need the others + awr.expired(); + awr.expired(); + + assertEquals(0, ks.metric.writeFailedIdealCL.getCount()); + assertTrue( TimeUnit.DAYS.toMicros(1) < ks.metric.idealCLWriteLatency.totalLatency.getCount()); + assertEquals(startingCount + 1, ks.metric.idealCLWriteLatency.latency.getCount()); + } + + /** + * Validate that WriteResponseHandler does the right thing on success. 
+ * @throws Throwable + */ + @Test + public void idealCLWriteResponeHandlerWorks() throws Throwable + { + long startingCount = ks.metric.idealCLWriteLatency.latency.getCount(); + AbstractWriteResponseHandler awr = createWriteResponseHandler(ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.ALL); + + //dc1 + awr.response(createDummyMessage(0)); + awr.response(createDummyMessage(1)); + awr.response(createDummyMessage(2)); + //dc2 + awr.response(createDummyMessage(3)); + awr.response(createDummyMessage(4)); + awr.response(createDummyMessage(5)); + + assertEquals(0, ks.metric.writeFailedIdealCL.getCount()); + assertEquals(startingCount + 1, ks.metric.idealCLWriteLatency.latency.getCount()); + } + + /** + * Validate that DatacenterWriteResponseHandler does the right thing on success. + * @throws Throwable + */ + @Test + public void idealCLDatacenterWriteResponeHandlerWorks() throws Throwable + { + long startingCount = ks.metric.idealCLWriteLatency.latency.getCount(); + AbstractWriteResponseHandler awr = createWriteResponseHandler(ConsistencyLevel.ONE, ConsistencyLevel.LOCAL_QUORUM); + + //dc1 + awr.response(createDummyMessage(0)); + awr.response(createDummyMessage(1)); + awr.response(createDummyMessage(2)); + //dc2 + awr.response(createDummyMessage(3)); + awr.response(createDummyMessage(4)); + awr.response(createDummyMessage(5)); + + assertEquals(0, ks.metric.writeFailedIdealCL.getCount()); + assertEquals(startingCount + 1, ks.metric.idealCLWriteLatency.latency.getCount()); + } + + private static AbstractWriteResponseHandler createWriteResponseHandler(ConsistencyLevel cl, ConsistencyLevel ideal) + { + return createWriteResponseHandler(cl, ideal, System.nanoTime()); + } + + private static AbstractWriteResponseHandler createWriteResponseHandler(ConsistencyLevel cl, ConsistencyLevel ideal, long queryStartTime) + { + return ks.getReplicationStrategy().getWriteResponseHandler(targets, ImmutableList.of(), cl, new Runnable() { + public void run() + { + + } + }, WriteType.SIMPLE, 
queryStartTime, ideal); + } + + private static MessageIn createDummyMessage(int target) + { + return MessageIn.create(targets.get(target), null, null, null, 0, 0L); + } +}