Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
        CHANGES.txt
        src/java/org/apache/cassandra/db/BatchlogManager.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/440824c1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/440824c1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/440824c1

Branch: refs/heads/trunk
Commit: 440824c1a60a344bc3e8a5ad35ae2fac879bd61d
Parents: 014d328 e916dff
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Fri Oct 17 03:40:17 2014 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Fri Oct 17 03:40:17 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/BatchlogManager.java    | 64 ++++++++++----------
 .../cassandra/service/StorageService.java       | 25 ++++++--
 .../cassandra/db/BatchlogManagerTest.java       |  8 +--
 4 files changed, 57 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/440824c1/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b40e14b,73aaab0..d7a8904
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,89 -1,5 +1,90 @@@
 -2.0.11:
 +2.1.1
 + * Fix IllegalArgumentException when a list of IN values containing tuples
 +   is passed as a single arg to a prepared statement with the v1 or v2
 +   protocol (CASSANDRA-8062)
 + * Fix ClassCastException in DISTINCT query on static columns with
 +   query paging (CASSANDRA-8108)
 + * Fix NPE on null nested UDT inside a set (CASSANDRA-8105)
 + * Fix exception when querying secondary index on set items or map keys
 +   when some clustering columns are specified (CASSANDRA-8073)
 + * Send proper error response when there is an error during native
 +   protocol message decode (CASSANDRA-8118)
 + * Gossip should ignore generation numbers too far in the future (CASSANDRA-8113)
 + * Fix NPE when creating a table with frozen sets, lists (CASSANDRA-8104)
 + * Fix high memory use due to tracking reads on incrementally opened sstable
 +   readers (CASSANDRA-8066)
 + * Fix EXECUTE request with skipMetadata=false returning no metadata
 +   (CASSANDRA-8054)
 + * Allow concurrent use of CQLBulkOutputFormat (CASSANDRA-7776)
 + * Shutdown JVM on OOM (CASSANDRA-7507)
 + * Upgrade netty version and enable epoll event loop (CASSANDRA-7761)
 + * Don't duplicate sstables smaller than split size when using
 +   the sstablesplitter tool (CASSANDRA-7616)
 + * Avoid re-parsing already prepared statements (CASSANDRA-7923)
 + * Fix some Thrift slice deletions and updates of COMPACT STORAGE
 +   tables with some clustering columns omitted (CASSANDRA-7990)
 + * Fix filtering for CONTAINS on sets (CASSANDRA-8033)
 + * Properly track added size (CASSANDRA-7239)
 + * Allow compilation in java 8 (CASSANDRA-7208)
 + * Fix Assertion error on RangeTombstoneList diff (CASSANDRA-8013)
 + * Release references to overlapping sstables during compaction (CASSANDRA-7819)
 + * Send notification when opening compaction results early (CASSANDRA-8034)
 + * Make native server start block until properly bound (CASSANDRA-7885)
 + * (cqlsh) Fix IPv6 support (CASSANDRA-7988)
 + * Ignore fat clients when checking for endpoint collision (CASSANDRA-7939)
 + * Make sstablerepairedset take a list of files (CASSANDRA-7995)
 + * (cqlsh) Tab completion for indexes on map keys (CASSANDRA-7972)
 + * (cqlsh) Fix UDT field selection in select clause (CASSANDRA-7891)
 + * Fix resource leak in event of corrupt sstable
 + * (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
 + * Provide visibility into prepared statements churn (CASSANDRA-7921, CASSANDRA-7930)
 + * Invalidate prepared statements when their keyspace or table is
 +   dropped (CASSANDRA-7566)
 + * cassandra-stress: fix support for NetworkTopologyStrategy (CASSANDRA-7945)
 + * Fix saving caches when a table is dropped (CASSANDRA-7784)
 + * Add better error checking of new stress profile (CASSANDRA-7716)
 + * Use ThreadLocalRandom and remove FBUtilities.threadLocalRandom (CASSANDRA-7934)
 + * Prevent operator mistakes due to simultaneous bootstrap (CASSANDRA-7069)
 + * cassandra-stress supports whitelist mode for node config (CASSANDRA-7658)
 + * GCInspector more closely tracks GC; cassandra-stress and nodetool report it (CASSANDRA-7916)
 + * nodetool won't output bogus ownership info without a keyspace (CASSANDRA-7173)
 + * Add human readable option to nodetool commands (CASSANDRA-5433)
 + * Don't try to set repairedAt on old sstables (CASSANDRA-7913)
 + * Add metrics for tracking PreparedStatement use (CASSANDRA-7719)
 + * (cqlsh) tab-completion for triggers (CASSANDRA-7824)
 + * (cqlsh) Support for query paging (CASSANDRA-7514)
 + * (cqlsh) Show progress of COPY operations (CASSANDRA-7789)
 + * Add syntax to remove multiple elements from a map (CASSANDRA-6599)
 + * Support non-equals conditions in lightweight transactions (CASSANDRA-6839)
 + * Add IF [NOT] EXISTS to create/drop triggers (CASSANDRA-7606)
 + * (cqlsh) Display the current logged-in user (CASSANDRA-7785)
 + * (cqlsh) Don't ignore CTRL-C during COPY FROM execution (CASSANDRA-7815)
 + * (cqlsh) Order UDTs according to cross-type dependencies in DESCRIBE
 +   output (CASSANDRA-7659)
 + * (cqlsh) Fix handling of CAS statement results (CASSANDRA-7671)
 + * (cqlsh) COPY TO/FROM improvements (CASSANDRA-7405)
 + * Support list index operations with conditions (CASSANDRA-7499)
 + * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731)
 + * Validate IPv6 wildcard addresses properly (CASSANDRA-7680)
 + * (cqlsh) Error when tracing query (CASSANDRA-7613)
 + * Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569)
 + * SSTableExport uses correct validator to create string representation of partition
 +   keys (CASSANDRA-7498)
 + * Avoid NPEs when receiving type changes for an unknown keyspace (CASSANDRA-7689)
 + * Add support for custom 2i validation (CASSANDRA-7575)
 + * Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
 + * Add listen_interface and rpc_interface options (CASSANDRA-7417)
 + * Improve schema merge performance (CASSANDRA-7444)
 + * Adjust MT depth based on # of partition validating (CASSANDRA-5263)
 + * Optimise NativeCell comparisons (CASSANDRA-6755)
 + * Configurable client timeout for cqlsh (CASSANDRA-7516)
 + * Include snippet of CQL query near syntax error in messages (CASSANDRA-7111)
 + * Make repair -pr work with -local (CASSANDRA-7450)
 + * Fix error in sstableloader with -cph > 1 (CASSANDRA-8007)
 + * Fix snapshot repair error on indexed tables (CASSANDRA-8020)
 + * Do not exit nodetool repair when receiving JMX NOTIF_LOST (CASSANDRA-7909)
 +Merged from 2.0:
+  * Force batchlog replay before decommissioning a node (CASSANDRA-7446)
   * Fix hint replay with many accumulated expired hints (CASSANDRA-6998)
   * Fix duplicate results in DISTINCT queries on static columns with query
     paging (CASSANDRA-8108)

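Note on the BatchlogManager hunk below: the replay entry point now uses submit() instead of a fire-and-forget execute(), so callers get back a Future they can block on. A minimal sketch of how a caller (for example, the decommission path touched in StorageService.java, whose hunk is not included in this message) might use that; the method name startBatchlogReplay() and the instance field are assumptions here, not confirmed by the hunks shown:

    import java.util.concurrent.Future;
    import org.apache.cassandra.db.BatchlogManager;

    public final class ForcedReplaySketch
    {
        // Hypothetical: force a batchlog replay and wait for it to finish before
        // proceeding (e.g. before decommissioning the node, per CASSANDRA-7446).
        // If a replay is already running, the submitted task simply queues behind
        // it on the single-threaded batchlog executor.
        public static void drainBatchlog() throws Exception
        {
            Future<?> replay = BatchlogManager.instance.startBatchlogReplay(); // assumed public entry point
            replay.get(); // blocks until the forced replay completes
        }
    }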
http://git-wip-us.apache.org/repos/asf/cassandra/blob/440824c1/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index 7f8d355,48f4c3c..18d9a17
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -38,9 -38,11 +37,8 @@@ import org.slf4j.LoggerFactory
  import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.DatabaseDescriptor;
--import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.db.compaction.CompactionManager;
 -import org.apache.cassandra.db.marshal.LongType;
 -import org.apache.cassandra.db.marshal.UTF8Type;
  import org.apache.cassandra.db.marshal.UUIDType;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.WriteTimeoutException;
@@@ -117,12 -119,13 +120,13 @@@ public class BatchlogManager implement
                  replayAllFailedBatches();
              }
          };
-         batchlogTasks.execute(runnable);
+         // If a replay is already in progress this request will be executed after it completes.
+         return batchlogTasks.submit(runnable);
      }
  
 -    public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
 +    public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
      {
 -        return getBatchlogMutationFor(mutations, uuid, FBUtilities.timestampMicros());
 +        return getBatchlogMutationFor(mutations, uuid, version, FBUtilities.timestampMicros());
      }
  
      @VisibleForTesting
@@@ -151,15 -158,11 +155,11 @@@
              throw new AssertionError(); // cannot happen.
          }
  
 -        return ByteBuffer.wrap(bos.toByteArray());
 +        return buf.asByteBuffer();
      }
  
-     @VisibleForTesting
-     void replayAllFailedBatches() throws ExecutionException, InterruptedException
+     private void replayAllFailedBatches() throws ExecutionException, InterruptedException
      {
-         if (!isReplaying.compareAndSet(false, true))
-             return;
- 
          logger.debug("Started replayAllFailedBatches");
  
          // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
@@@ -167,34 -170,27 +167,27 @@@
          int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
          RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
  
-         try
-         {
-             UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
-                                                                   Keyspace.SYSTEM_KS,
-                                                                   SystemKeyspace.BATCHLOG_CF,
-                                                                   PAGE_SIZE));
- 
-             while (!page.isEmpty())
-             {
-                 UUID id = processBatchlogPage(page, rateLimiter);
 -        UntypedResultSet page = process("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
 -                                        Keyspace.SYSTEM_KS,
 -                                        SystemKeyspace.BATCHLOG_CF,
 -                                        PAGE_SIZE);
++        UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
++                                                              Keyspace.SYSTEM_KS,
++                                                              SystemKeyspace.BATCHLOG_CF,
++                                                              PAGE_SIZE));
  
-                 if (page.size() < PAGE_SIZE)
-                     break; // we've exhausted the batchlog, next query would be empty.
+         while (!page.isEmpty())
+         {
+             UUID id = processBatchlogPage(page, rateLimiter);
  
-                 page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d",
-                                                      Keyspace.SYSTEM_KS,
-                                                      SystemKeyspace.BATCHLOG_CF,
-                                                      PAGE_SIZE),
-                                        id);
-             }
+             if (page.size() < PAGE_SIZE)
+                 break; // we've exhausted the batchlog, next query would be empty.
  
-             cleanup();
-         }
-         finally
-         {
-             isReplaying.set(false);
 -            page = process("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(%s) LIMIT %d",
 -                           Keyspace.SYSTEM_KS,
 -                           SystemKeyspace.BATCHLOG_CF,
 -                           id,
 -                           PAGE_SIZE);
++            page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d",
++                                                 Keyspace.SYSTEM_KS,
++                                                 SystemKeyspace.BATCHLOG_CF,
++                                                 PAGE_SIZE),
++                                   id);
          }
  
+         cleanup();
+ 
          logger.debug("Finished replayAllFailedBatches");
      }
  
@@@ -215,172 -202,89 +208,177 @@@
          {
              id = row.getUUID("id");
              long writtenAt = row.getLong("written_at");
 -            int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
              // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
-             long timeout = DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
+             long timeout = getBatchlogTimeout();
              if (System.currentTimeMillis() < writtenAt + timeout)
                  continue; // not ready to replay yet, might still get a deletion.
 -            replayBatch(id, row.getBytes("data"), writtenAt, version, rateLimiter);
 +
 +            int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
 +            Batch batch = new Batch(id, writtenAt, row.getBytes("data"), version);
 +            try
 +            {
 +                if (batch.replay(rateLimiter) > 0)
 +                {
 +                    batches.add(batch);
 +                }
 +                else
 +                {
 +                    deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated).
 +                    totalBatchesReplayed.incrementAndGet();
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                logger.warn("Skipped batch replay of {} due to {}", id, e);
 +                deleteBatch(id);
 +            }
          }
 +
 +        // now waiting for all batches to complete their processing
 +        // schedule hints for timed out deliveries
 +        for (Batch batch : batches)
 +        {
 +            batch.finish();
 +            deleteBatch(batch.id);
 +        }
 +
 +        totalBatchesReplayed.addAndGet(batches.size());
 +
          return id;
      }
  
+     public long getBatchlogTimeout()
+     {
+         return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
+     }
+ 
 -    private void replayBatch(UUID id, ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter)
 +    private static class Batch
      {
 -        logger.debug("Replaying batch {}", id);
 +        private final UUID id;
 +        private final long writtenAt;
 +        private final ByteBuffer data;
 +        private final int version;
  
 -        try
 +        private List<ReplayWriteResponseHandler> replayHandlers;
 +
 +        public Batch(UUID id, long writtenAt, ByteBuffer data, int version)
          {
 -            replaySerializedMutations(data, writtenAt, version, rateLimiter);
 +            this.id = id;
 +            this.writtenAt = writtenAt;
 +            this.data = data;
 +            this.version = version;
          }
 -        catch (IOException e)
 +
 +        public int replay(RateLimiter rateLimiter) throws IOException
          {
 -            logger.warn("Skipped batch replay of {} due to {}", id, e);
 -        }
 +            logger.debug("Replaying batch {}", id);
  
 -        deleteBatch(id);
 +            List<Mutation> mutations = replayingMutations();
  
 -        totalBatchesReplayed.incrementAndGet();
 -    }
 +            if (mutations.isEmpty())
 +                return 0;
  
 -    private void deleteBatch(UUID id)
 -    {
 -        RowMutation mutation = new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(id));
 -        mutation.delete(SystemKeyspace.BATCHLOG_CF, FBUtilities.timestampMicros());
 -        mutation.apply();
 -    }
 +            int ttl = calculateHintTTL(mutations);
 +            if (ttl <= 0)
 +                return 0;
  
 -    private void replaySerializedMutations(ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
 -    {
 -        DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
 -        int size = in.readInt();
 -        List<RowMutation> mutations = new ArrayList<>(size);
 +            replayHandlers = sendReplays(mutations, writtenAt, ttl);
 +
 +            rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation.
 +
 +            return replayHandlers.size();
 +        }
 +
 +        public void finish()
 +        {
 +            for (int i = 0; i < replayHandlers.size(); i++)
 +            {
 +                ReplayWriteResponseHandler handler = replayHandlers.get(i);
 +                try
 +                {
 +                    handler.get();
 +                }
 +                catch (WriteTimeoutException e)
 +                {
 +                    logger.debug("Timed out replaying a batched mutation to a node, will write a hint");
 +                    // writing hints for the rest to hints, starting from i
 +                    writeHintsForUndeliveredEndpoints(i);
 +                    return;
 +                }
 +            }
 +        }
  
 -        for (int i = 0; i < size; i++)
 +        private List<Mutation> replayingMutations() throws IOException
          {
 -            RowMutation mutation = RowMutation.serializer.deserialize(in, version);
 +            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
 +            int size = in.readInt();
 +            List<Mutation> mutations = new ArrayList<>(size);
 +            for (int i = 0; i < size; i++)
 +            {
 +                Mutation mutation = Mutation.serializer.deserialize(in, version);
  
 -            // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
 -            // We don't abort the replay entirely b/c this can be considered a succes (truncated is same as delivered then
 -            // truncated.
 -            for (UUID cfId : mutation.getColumnFamilyIds())
 -                if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
 -                    mutation = mutation.without(cfId);
 +                // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
 +                // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then
 +                // truncated.
 +                for (UUID cfId : mutation.getColumnFamilyIds())
 +                    if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
 +                        mutation = mutation.without(cfId);
  
 -            if (!mutation.isEmpty())
 -                mutations.add(mutation);
 +                if (!mutation.isEmpty())
 +                    mutations.add(mutation);
 +            }
 +            return mutations;
          }
  
 -        if (!mutations.isEmpty())
 -            replayMutations(mutations, writtenAt, version, rateLimiter);
 -    }
 +        private void writeHintsForUndeliveredEndpoints(int startFrom)
 +        {
 +            try
 +            {
 +                // Here we deserialize mutations 2nd time from byte buffer.
 +                // but this is ok, because timeout on batch direct delivery is rare
 +                // (it can happen only several seconds until node is marked dead)
 +                // so trading some cpu to keep less objects
 +                List<Mutation> replayingMutations = replayingMutations();
 +                for (int i = startFrom; i < replayHandlers.size(); i++)
 +                {
 +                    Mutation undeliveredMutation = replayingMutations.get(i);
 +                    int ttl = calculateHintTTL(replayingMutations);
 +                    ReplayWriteResponseHandler handler = replayHandlers.get(i);
  
 -    /*
 -     * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
 -     * when a replica is down or a write request times out.
 -     */
 -    private void replayMutations(List<RowMutation> mutations, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
 -    {
 -        int ttl = calculateHintTTL(mutations, writtenAt);
 -        if (ttl <= 0)
 -            return; // this batchlog entry has 'expired'
 -
 -        List<InetAddress> liveEndpoints = new ArrayList<>();
 -        List<InetAddress> hintEndpoints = new ArrayList<>();
 -        
 -        for (RowMutation mutation : mutations)
 +                    if (ttl > 0 && handler != null)
 +                        for (InetAddress endpoint : handler.undelivered)
 +                            StorageProxy.writeHintForMutation(undeliveredMutation, writtenAt, ttl, endpoint);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                logger.error("Cannot schedule hints for undelivered batch", e);
 +            }
 +        }
 +
 +        private List<ReplayWriteResponseHandler> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
          {
 +            List<ReplayWriteResponseHandler> handlers = new ArrayList<>(mutations.size());
 +            for (Mutation mutation : mutations)
 +            {
 +                ReplayWriteResponseHandler handler = sendSingleReplayMutation(mutation, writtenAt, ttl);
 +                if (handler != null)
 +                    handlers.add(handler);
 +            }
 +            return handlers;
 +        }
 +
 +        /**
 +         * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
 +         * when a replica is down or a write request times out.
 +         *
 +         * @return direct delivery handler to wait on or null, if no live nodes found
 +         */
 +        private ReplayWriteResponseHandler sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
 +        {
 +            Set<InetAddress> liveEndpoints = new HashSet<>();
              String ks = mutation.getKeyspaceName();
 -            Token tk = StorageService.getPartitioner().getToken(mutation.key());
 -            int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, version);
 +            Token<?> tk = StorageService.getPartitioner().getToken(mutation.key());
  
              for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
                                                           StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))

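For readability, here is a sketch of the two-phase replay flow the new Batch class introduces in the hunk above (names paraphrased, not a drop-in implementation): phase one sends each batch's mutations and collects the batches that actually dispatched something; phase two waits on their handlers and falls back to hints for endpoints that timed out, so deliveries across a page overlap instead of being awaited one batch at a time.

    import java.util.ArrayList;
    import java.util.List;

    final class ReplayPageSketch
    {
        // Hypothetical stand-in for the Batch inner class shown in the diff above.
        interface ReplayableBatch
        {
            int replay() throws Exception; // send mutations; return how many handlers are outstanding
            void finish();                 // wait for acks; write hints for undelivered endpoints
            void delete();                 // remove the batchlog row
        }

        static void processPage(List<ReplayableBatch> page)
        {
            List<ReplayableBatch> inFlight = new ArrayList<>();
            for (ReplayableBatch batch : page)
            {
                try
                {
                    if (batch.replay() > 0)
                        inFlight.add(batch); // sent something; await responses later
                    else
                        batch.delete();      // expired or fully truncated: nothing to wait for
                }
                catch (Exception e)
                {
                    batch.delete();          // unreadable batch: skip it rather than abort the page
                }
            }
            // Only after the whole page has been dispatched do we block on responses.
            for (ReplayableBatch batch : inFlight)
            {
                batch.finish();
                batch.delete();
            }
        }
    }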
http://git-wip-us.apache.org/repos/asf/cassandra/blob/440824c1/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/440824c1/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
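
The test hunk is not included in this message, but the newly public getBatchlogTimeout() (twice the write RPC timeout) is the value both the replay loop and tests can share for the freshness check. An illustrative fragment only, not the actual test change:

    import org.apache.cassandra.db.BatchlogManager;

    final class BatchlogTimeoutSketch
    {
        // Mirrors the check in processBatchlogPage(): a batch younger than the
        // timeout is skipped, since its coordinator may still delete it after a
        // successful write.
        static boolean readyToReplay(long writtenAtMillis)
        {
            long timeout = BatchlogManager.instance.getBatchlogTimeout();
            return System.currentTimeMillis() >= writtenAtMillis + timeout;
        }
    }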
