Log a warning for large batches. Patch by Lyuben Todorov; reviewed by Benedict Elliott Smith for CASSANDRA-6487
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de720b4a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de720b4a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de720b4a Branch: refs/heads/cassandra-2.1 Commit: de720b4aa31198076abbd76a53644df341577126 Parents: 364282a Author: Jonathan Ellis <jbel...@apache.org> Authored: Tue Apr 22 07:40:32 2014 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue Apr 22 07:40:32 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 5 +++ .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 5 +++ .../apache/cassandra/cql3/QueryProcessor.java | 2 +- .../cql3/statements/BatchStatement.java | 42 +++++++++++++++++++- 6 files changed, 54 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 791586c..fffb2a5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.8 + * Log a warning for large batches (CASSANDRA-6487) * Queries on compact tables can return more rows that requested (CASSANDRA-7052) * USING TIMESTAMP for batches does not work (CASSANDRA-7053) Merged from 1.2: http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 2edd498..2de6753 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -429,6 +429,11 @@ tombstone_failure_threshold: 100000 # that wastefully either. column_index_size_in_kb: 64 + +# Log WARN on any batch size exceeding this value. 5kb per batch by default. 
+# Use caution when increasing this threshold, as large batches can lead to node instability. +batch_size_warn_threshold_in_kb: 5 + # Size limit for rows being compacted in memory. Larger rows will spill # over to disk and use a slower two-pass compaction process. A message # will be logged specifying the row key. http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 5317fb8..7a3185a 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -120,6 +120,7 @@ public class Config /* if the size of columns or super-columns are more than this, indexing will kick in */ public Integer column_index_size_in_kb = 64; + public Integer batch_size_warn_threshold_in_kb = 5; public Integer in_memory_compaction_limit_in_mb = 64; public Integer concurrent_compactors = FBUtilities.getAvailableProcessors(); public volatile Integer compaction_throughput_mb_per_sec = 16; http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 9e06601..6417524 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -658,6 +658,11 @@ public class DatabaseDescriptor return conf.column_index_size_in_kb * 1024; } + public static int getBatchSizeWarnThreshold() + { + return conf.batch_size_warn_threshold_in_kb * 1024; + } + public static Collection<String> getInitialTokens() { return
tokensFromString(System.getProperty("cassandra.initial_token", conf.initial_token)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index ab0ea40..15ee59f 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -30,13 +30,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.cql3.statements.*; -import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.db.*; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.thrift.ThriftClientState; import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MD5Digest; import org.apache.cassandra.utils.SemanticVersion; http://git-wip-us.apache.org/repos/asf/cassandra/blob/de720b4a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 7f26341..8e61ae5 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -20,8 +20,12 @@ package org.apache.cassandra.cql3.statements; import java.nio.ByteBuffer; import java.util.*; -import com.google.common.collect.Iterables; +import com.google.common.base.Function; +import com.google.common.collect.*; +import 
org.apache.cassandra.config.DatabaseDescriptor; import org.github.jamm.MemoryMeter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.cql3.*; import org.apache.cassandra.db.*; @@ -47,6 +51,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache private final List<ModificationStatement> statements; private final Attributes attrs; private final boolean hasConditions; + private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class); /** * Creates a new BatchStatement from a list of statements and a @@ -177,6 +182,29 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache } } + /** + * Sums the data size of the given column families and logs a warning if the total exceeds the configured batch size warn threshold; cfs is traversed a second time only in that case, to list the affected keyspace.table names. + * @param cfs ColumnFamilies that will store the batch's mutations; must be re-iterable. + */ + private void verifyBatchSize(Iterable<ColumnFamily> cfs) + { + long size = 0; + long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold(); + + for (ColumnFamily cf : cfs) + size += cf.dataSize(); + + if (size > warnThreshold) + { + Set<String> ksCfPairs = new HashSet<>(); + for (ColumnFamily cf : cfs) + ksCfPairs.add(cf.metadata().ksName + "." + cf.metadata().cfName); + + String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}."; + logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold); + } + } + public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException { if (options.getConsistency() == null) @@ -207,10 +235,21 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException { + // Extract each IMutation's collection of column families and lazily concatenate them into a single Iterable view (re-iterable, which verifyBatchSize relies on).
+ Iterable<ColumnFamily> cfs = Iterables.concat(Iterables.transform(mutations, new Function<IMutation, Collection<ColumnFamily>>() + { + public Collection<ColumnFamily> apply(IMutation im) + { + return im.getColumnFamilies(); + } + })); + verifyBatchSize(cfs); + boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1); StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic); } + private ResultMessage executeWithConditions(BatchVariables variables, ConsistencyLevel cl, ConsistencyLevel serialCf, long now) throws RequestExecutionException, RequestValidationException { @@ -259,6 +298,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache } } + verifyBatchSize(Collections.singleton(updates)); ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, serialCf, cl); return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true)); }