Merge branch 'cassandra-1.2' into cassandra-2.0 Conflicts: NEWS.txt src/java/org/apache/cassandra/db/BatchlogManager.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/95f1b5f2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/95f1b5f2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/95f1b5f2 Branch: refs/heads/cassandra-2.0 Commit: 95f1b5f29822fb2893dc7a100fb026729030a70e Parents: be9a70e 2a7c20e Author: Aleksey Yeschenko <alek...@apache.org> Authored: Sun Jan 5 03:18:49 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Sun Jan 5 03:18:49 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 10 +++++++ conf/cassandra.yaml | 4 +++ .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 5 ++++ .../apache/cassandra/db/BatchlogManager.java | 29 +++++++++++++------- 6 files changed, 40 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 7946927,5a85977..df564dd --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,31 -1,16 +1,32 @@@ -1.2.14 - * Allow executing CREATE statements multiple times (CASSANDRA-6471) - * Don't send confusing info with timeouts (CASSANDRA-6491) - * Don't resubmit counter mutation runnables internally (CASSANDRA-6427) - * Don't drop local mutations without a trace (CASSANDRA-6510) - * Don't allow null max_hint_window_in_ms (CASSANDRA-6419) - * Validate SliceRange start and finish lengths (CASSANDRA-6521) +2.0.5 +* Delete unfinished compaction incrementally (CASSANDRA-6086) +Merged from 1.2: * fsync compression metadata (CASSANDRA-6531) * Validate CF existence on execution for prepared statement (CASSANDRA-6535) + * Add ability to throttle batchlog replay (CASSANDRA-6550) -1.2.13 +2.0.4 + * Allow removing snapshots of no-longer-existing CFs (CASSANDRA-6418) + * add StorageService.stopDaemon() (CASSANDRA-4268) + * add IRE for invalid CF supplied to get_count (CASSANDRA-5701) + * add client encryption support to sstableloader (CASSANDRA-6378) + * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468) + * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496) + * Fix assertion failure in filterColdSSTables (CASSANDRA-6483) + * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008) + * Fix cleanup ClassCastException (CASSANDRA-6462) + * Reduce gossip memory use by interning VersionedValue strings (CASSANDRA-6410) + * Allow specifying datacenters to participate in a repair (CASSANDRA-6218) + * Fix divide-by-zero in PCI (CASSANDRA-6403) + * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284) + * Add millisecond precision formats to the timestamp parser (CASSANDRA-6395) + * Expose a total memtable size metric for a CF (CASSANDRA-6391) + * cqlsh: handle symlinks properly (CASSANDRA-6425) + * Fix potential infinite loop when paging query with IN (CASSANDRA-6464) + * Fix assertion error in AbstractQueryPager.discardFirst (CASSANDRA-6447) + * Fix streaming older SSTable yields unnecessary tombstones (CASSANDRA-6527) +Merged from 1.2: * Improved error message on bad properties in DDL queries (CASSANDRA-6453) * Randomize batchlog candidates selection (CASSANDRA-6481) * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345, 6485) http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/NEWS.txt ---------------------------------------------------------------------- diff --cc NEWS.txt index fdc29ad,214fd05..2e40e9c --- a/NEWS.txt +++ b/NEWS.txt @@@ -13,13 -13,27 +13,23 @@@ restore snapshots created with the prev 'sstableloader' tool. You can upgrade the file format of your snapshots using the provided 'sstableupgrade' tool. + -1.2.14 -====== ++2.0.5 ++===== + -Features ++New features + -------- + - Batchlog replay can be, and is throttled by default now. + See batchlog_replay_throttle_in_kb setting in cassandra.yaml. + + -1.2.13 -====== - -Upgrading ---------- - - Nothing specific to this release, but please see 1.2.12 if you are upgrading - from a previous version. +2.0.3 +===== - -1.2.12 -====== +New features +------------ + - It's now possible to configure the maximum allowed size of the native + protocol frames (native_transport_max_frame_size_in_mb in the yaml file). Upgrading --------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/conf/cassandra.yaml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/Config.java index 3a7407e,1c19a85..a4e4e92 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@@ -146,10 -145,13 +146,11 @@@ public class Confi public InternodeCompression internode_compression = InternodeCompression.none; - public Integer index_interval = 128; + @Deprecated + public Integer index_interval = null; - public Double flush_largest_memtables_at = 1.0; - public Double reduce_cache_sizes_at = 1.0; - public double reduce_cache_capacity_to = 0.6; public int hinted_handoff_throttle_in_kb = 1024; + public int batchlog_replay_throttle_in_kb = 1024; public int max_hints_delivery_threads = 1; public boolean compaction_preheat_key_cache = true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java index 0968df2,1af4909..cfa049a --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@@ -161,11 -164,16 +162,16 @@@ public class BatchlogManager implement logger.debug("Started replayAllFailedBatches"); + // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). + // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272). + int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size(); + RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024); + try { - for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF)) + for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF)) if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT) - replayBatch(row.getUUID("id")); + replayBatch(row.getUUID("id"), rateLimiter); cleanup(); } finally @@@ -186,8 -194,7 +192,8 @@@ try { - replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"), rateLimiter); + UntypedResultSet.Row batch = result.one(); - replaySerializedMutations(batch.getBytes("data"), batch.getLong("written_at")); ++ replaySerializedMutations(batch.getBytes("data"), batch.getLong("written_at"), rateLimiter); } catch (IOException e) { @@@ -211,15 -218,17 +217,17 @@@ * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints * when a replica is down or a write request times out. */ - private void replaySerializedMutation(RowMutation mutation, long writtenAt) - private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter) throws IOException ++ private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter) { int ttl = calculateHintTTL(mutation, writtenAt); if (ttl <= 0) return; // the mutation isn't safe to replay. -- Set<InetAddress> liveEndpoints = new HashSet<InetAddress>(); - String ks = mutation.getTable(); - Token tk = StorageService.getPartitioner().getToken(mutation.key()); ++ Set<InetAddress> liveEndpoints = new HashSet<>(); + String ks = mutation.getKeyspaceName(); + Token<?> tk = StorageService.getPartitioner().getToken(mutation.key()); + int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION); + for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks))) { @@@ -235,10 -245,10 +244,10 @@@ attemptDirectDelivery(mutation, writtenAt, liveEndpoints); } - private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints) throws IOException + private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints) { List<WriteResponseHandler> handlers = Lists.newArrayList(); -- final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<InetAddress>(endpoints); ++ final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<>(endpoints); for (final InetAddress ep : endpoints) { Runnable callback = new Runnable() @@@ -290,9 -300,9 +299,9 @@@ // force flush + compaction to reclaim space from the replayed batches private void cleanup() throws ExecutionException, InterruptedException { - ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF); + ColumnFamilyStore cfs = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.BATCHLOG_CF); cfs.forceBlockingFlush(); -- Collection<Descriptor> descriptors = new ArrayList<Descriptor>(); ++ Collection<Descriptor> descriptors = new ArrayList<>(); for (SSTableReader sstr : cfs.getSSTables()) descriptors.add(sstr.descriptor); if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact.