Updated Branches: refs/heads/trunk 25777e1f4 -> 59c996212
Add ability to throttle batchlog replay patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for CASSANDRA-6550 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2a7c20ea Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2a7c20ea Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2a7c20ea Branch: refs/heads/trunk Commit: 2a7c20ea9111c05964400fe0a30e7b75ff719277 Parents: 7171b7a Author: Aleksey Yeschenko <alek...@apache.org> Authored: Sun Jan 5 03:06:47 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Sun Jan 5 03:06:47 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 9 ++++++++ conf/cassandra.yaml | 4 ++++ .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 5 +++++ .../apache/cassandra/db/BatchlogManager.java | 23 ++++++++++++++------ 6 files changed, 36 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 64146c1..5a85977 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ * Validate SliceRange start and finish lengths (CASSANDRA-6521) * fsync compression metadata (CASSANDRA-6531) * Validate CF existence on execution for prepared statement (CASSANDRA-6535) + * Add ability to throttle batchlog replay (CASSANDRA-6550) 1.2.13 http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 6293448..214fd05 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -14,6 +14,15 @@ restore snapshots created with the previous major version using the using the provided 'sstableupgrade' tool. +1.2.14 +====== + +Features +-------- + - Batchlog replay can be, and is throttled by default now. + See batchlog_replay_throttle_in_kb setting in cassandra.yaml. + + 1.2.13 ====== http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 712f134..d038cde 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -53,6 +53,10 @@ hinted_handoff_throttle_in_kb: 1024 # cross-dc handoff tends to be slower max_hints_delivery_threads: 2 +# Maximum throttle in KBs per second, total. This will be +# reduced proportionally to the number of nodes in the cluster. +batchlog_replay_throttle_in_kb: 1024 + # The following setting populates the page cache on memtable flush and compaction # WARNING: Enable this setting only when the whole node's data fits in memory. # Defaults to: false http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 292161b..1c19a85 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -151,6 +151,7 @@ public class Config public Double reduce_cache_sizes_at = 1.0; public double reduce_cache_capacity_to = 0.6; public int hinted_handoff_throttle_in_kb = 1024; + public int batchlog_replay_throttle_in_kb = 1024; public int max_hints_delivery_threads = 1; public boolean compaction_preheat_key_cache = true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 0db2f85..3ed82f5 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1231,6 +1231,11 @@ public class DatabaseDescriptor return conf.hinted_handoff_throttle_in_kb; } + public static int getBatchlogReplayThrottleInKB() + { + return conf.batchlog_replay_throttle_in_kb; + } + public static int getMaxHintsThread() { return conf.max_hints_delivery_threads; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index 5fd55a3..1af4909 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -35,6 +35,7 @@ import javax.management.ObjectName; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +76,7 @@ public class BatchlogManager implements BatchlogManagerMBean private final AtomicBoolean isReplaying = new AtomicBoolean(); private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks"); - + public void start() { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); @@ -163,11 +164,16 @@ public class BatchlogManager implements BatchlogManagerMBean logger.debug("Started replayAllFailedBatches"); + // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). + // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272). + int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size(); + RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024); + try { for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF)) if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT) - replayBatch(row.getUUID("id")); + replayBatch(row.getUUID("id"), rateLimiter); cleanup(); } finally @@ -178,7 +184,7 @@ public class BatchlogManager implements BatchlogManagerMBean logger.debug("Finished replayAllFailedBatches"); } - private void replayBatch(UUID id) + private void replayBatch(UUID id, RateLimiter rateLimiter) { logger.debug("Replaying batch {}", id); @@ -188,7 +194,7 @@ public class BatchlogManager implements BatchlogManagerMBean try { - replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at")); + replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"), rateLimiter); } catch (IOException e) { @@ -200,19 +206,19 @@ public class BatchlogManager implements BatchlogManagerMBean totalBatchesReplayed.incrementAndGet(); } - private void replaySerializedMutations(ByteBuffer data, long writtenAt) throws IOException + private void replaySerializedMutations(ByteBuffer data, long writtenAt, RateLimiter rateLimiter) throws IOException { DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data)); int size = in.readInt(); for (int i = 0; i < size; i++) - replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt); + replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt, rateLimiter); } /* * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints * when a replica is down or a write request times out. */ - private void replaySerializedMutation(RowMutation mutation, long writtenAt) throws IOException + private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter) throws IOException { int ttl = calculateHintTTL(mutation, writtenAt); if (ttl <= 0) @@ -221,9 +227,12 @@ public class BatchlogManager implements BatchlogManagerMBean Set<InetAddress> liveEndpoints = new HashSet<InetAddress>(); String ks = mutation.getTable(); Token tk = StorageService.getPartitioner().getToken(mutation.key()); + int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION); + for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks))) { + rateLimiter.acquire(mutationSize); if (endpoint.equals(FBUtilities.getBroadcastAddress())) mutation.apply(); else if (FailureDetector.instance.isAlive(endpoint))