Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
        NEWS.txt
        src/java/org/apache/cassandra/db/BatchlogManager.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/95f1b5f2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/95f1b5f2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/95f1b5f2

Branch: refs/heads/cassandra-2.0
Commit: 95f1b5f29822fb2893dc7a100fb026729030a70e
Parents: be9a70e 2a7c20e
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Sun Jan 5 03:18:49 2014 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Sun Jan 5 03:18:49 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        | 10 +++++++
 conf/cassandra.yaml                             |  4 +++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 ++++
 .../apache/cassandra/db/BatchlogManager.java    | 29 +++++++++++++-------
 6 files changed, 40 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7946927,5a85977..df564dd
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,31 -1,16 +1,32 @@@
 -1.2.14
 - * Allow executing CREATE statements multiple times (CASSANDRA-6471)
 - * Don't send confusing info with timeouts (CASSANDRA-6491)
 - * Don't resubmit counter mutation runnables internally (CASSANDRA-6427)
 - * Don't drop local mutations without a trace (CASSANDRA-6510)
 - * Don't allow null max_hint_window_in_ms (CASSANDRA-6419)
 - * Validate SliceRange start and finish lengths (CASSANDRA-6521)
 +2.0.5
 +* Delete unfinished compaction incrementally (CASSANDRA-6086)
 +Merged from 1.2:
   * fsync compression metadata (CASSANDRA-6531)
   * Validate CF existence on execution for prepared statement (CASSANDRA-6535)
+  * Add ability to throttle batchlog replay (CASSANDRA-6550)
  
  
 -1.2.13
 +2.0.4
 + * Allow removing snapshots of no-longer-existing CFs (CASSANDRA-6418)
 + * add StorageService.stopDaemon() (CASSANDRA-4268)
 + * add IRE for invalid CF supplied to get_count (CASSANDRA-5701)
 + * add client encryption support to sstableloader (CASSANDRA-6378)
 + * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468)
 + * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496)
 + * Fix assertion failure in filterColdSSTables (CASSANDRA-6483)
 + * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008)
 + * Fix cleanup ClassCastException (CASSANDRA-6462)
 + * Reduce gossip memory use by interning VersionedValue strings (CASSANDRA-6410)
 + * Allow specifying datacenters to participate in a repair (CASSANDRA-6218)
 + * Fix divide-by-zero in PCI (CASSANDRA-6403)
 + * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284)
 + * Add millisecond precision formats to the timestamp parser (CASSANDRA-6395)
 + * Expose a total memtable size metric for a CF (CASSANDRA-6391)
 + * cqlsh: handle symlinks properly (CASSANDRA-6425)
 + * Fix potential infinite loop when paging query with IN (CASSANDRA-6464)
 + * Fix assertion error in AbstractQueryPager.discardFirst (CASSANDRA-6447)
 + * Fix streaming older SSTable yields unnecessary tombstones (CASSANDRA-6527)
 +Merged from 1.2:
   * Improved error message on bad properties in DDL queries (CASSANDRA-6453)
   * Randomize batchlog candidates selection (CASSANDRA-6481)
   * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345, 6485)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index fdc29ad,214fd05..2e40e9c
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,13 -13,27 +13,23 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
+ 
 -1.2.14
 -======
++2.0.5
++=====
+ 
 -Features
++New features
+ --------
+     - Batchlog replay can be, and is throttled by default now.
+       See batchlog_replay_throttle_in_kb setting in cassandra.yaml.
+ 
+ 
 -1.2.13
 -======
 -
 -Upgrading
 ----------
 -    - Nothing specific to this release, but please see 1.2.12 if you are upgrading
 -      from a previous version.
 +2.0.3
 +=====
  
 -
 -1.2.12
 -======
 +New features
 +------------
 +    - It's now possible to configure the maximum allowed size of the native
 +      protocol frames (native_transport_max_frame_size_in_mb in the yaml file).
  
  Upgrading
  ---------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index 3a7407e,1c19a85..a4e4e92
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -146,10 -145,13 +146,11 @@@ public class Confi
  
      public InternodeCompression internode_compression = InternodeCompression.none;
  
 -    public Integer index_interval = 128;
 +    @Deprecated
 +    public Integer index_interval = null;
  
 -    public Double flush_largest_memtables_at = 1.0;
 -    public Double reduce_cache_sizes_at = 1.0;
 -    public double reduce_cache_capacity_to = 0.6;
      public int hinted_handoff_throttle_in_kb = 1024;
+     public int batchlog_replay_throttle_in_kb = 1024;
      public int max_hints_delivery_threads = 1;
      public boolean compaction_preheat_key_cache = true;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index 0968df2,1af4909..cfa049a
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -161,11 -164,16 +162,16 @@@ public class BatchlogManager implement
  
          logger.debug("Started replayAllFailedBatches");
  
+         // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+         // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
+         int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
+         RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+ 
          try
          {
 -            for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF))
 +            for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF))
                  if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT)
-                     replayBatch(row.getUUID("id"));
+                     replayBatch(row.getUUID("id"), rateLimiter);
              cleanup();
          }
          finally
@@@ -186,8 -194,7 +192,8 @@@
  
          try
          {
 -            replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"), rateLimiter);
 +            UntypedResultSet.Row batch = result.one();
-             replaySerializedMutations(batch.getBytes("data"), batch.getLong("written_at"));
++            replaySerializedMutations(batch.getBytes("data"), batch.getLong("written_at"), rateLimiter);
          }
          catch (IOException e)
          {
@@@ -211,15 -218,17 +217,17 @@@
      * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
      * when a replica is down or a write request times out.
      */
-     private void replaySerializedMutation(RowMutation mutation, long writtenAt)
 -    private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter) throws IOException
++    private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter)
      {
          int ttl = calculateHintTTL(mutation, writtenAt);
          if (ttl <= 0)
              return; // the mutation isn't safe to replay.
  
--        Set<InetAddress> liveEndpoints = new HashSet<InetAddress>();
 -        String ks = mutation.getTable();
 -        Token tk = StorageService.getPartitioner().getToken(mutation.key());
++        Set<InetAddress> liveEndpoints = new HashSet<>();
 +        String ks = mutation.getKeyspaceName();
 +        Token<?> tk = StorageService.getPartitioner().getToken(mutation.key());
+         int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION);
+ 
          for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
                                                       StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
          {
@@@ -235,10 -245,10 +244,10 @@@
              attemptDirectDelivery(mutation, writtenAt, liveEndpoints);
      }
  
 -    private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints) throws IOException
 +    private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints)
      {
          List<WriteResponseHandler> handlers = Lists.newArrayList();
--        final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<InetAddress>(endpoints);
++        final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<>(endpoints);
          for (final InetAddress ep : endpoints)
          {
              Runnable callback = new Runnable()
@@@ -290,9 -300,9 +299,9 @@@
      // force flush + compaction to reclaim space from the replayed batches
      private void cleanup() throws ExecutionException, InterruptedException
      {
 -        ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
 +        ColumnFamilyStore cfs = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.BATCHLOG_CF);
          cfs.forceBlockingFlush();
--        Collection<Descriptor> descriptors = new ArrayList<Descriptor>();
++        Collection<Descriptor> descriptors = new ArrayList<>();
          for (SSTableReader sstr : cfs.getSSTables())
              descriptors.add(sstr.descriptor);
          if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact.
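
The change above follows Guava's RateLimiter pattern: the configured KB/s budget is divided by the number of nodes in the cluster, converted to bytes per second (0 meaning effectively unlimited), and each replayed mutation acquires permits equal to its serialized size before it is sent. Below is a minimal standalone sketch of that pattern; the config value, cluster size, and mutation size are hypothetical stand-ins for the DatabaseDescriptor and StorageService lookups used in the patch.

import com.google.common.util.concurrent.RateLimiter;

public class BatchlogThrottleSketch
{
    public static void main(String[] args)
    {
        // Hypothetical stand-ins for DatabaseDescriptor.getBatchlogReplayThrottleInKB()
        // (e.g. batchlog_replay_throttle_in_kb: 1024 in cassandra.yaml) and for the
        // cluster size from TokenMetadata.getAllEndpoints().size().
        int configuredThrottleInKB = 1024;
        int clusterSize = 3;

        // Split the total budget across nodes, as the patch does (same scaling as hinted handoff).
        int throttleInKB = configuredThrottleInKB / clusterSize;

        // 0 disables throttling: fall back to an effectively unlimited rate.
        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);

        // Block until enough byte-permits are available before sending a replayed mutation.
        int mutationSize = 4096; // hypothetical serialized mutation size in bytes
        rateLimiter.acquire(mutationSize);

        System.out.println("acquired " + mutationSize + " bytes of replay budget");
    }
}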
