Merge branch 'cassandra-2.1' into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/90476352
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/90476352
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/90476352

Branch: refs/heads/trunk
Commit: 90476352a56568fc47418905dd31a16eb00a7981
Parents: 0656924 1f6bf36
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Thu Apr 30 19:04:35 2015 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Thu Apr 30 19:04:35 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../db/commitlog/CommitLogReplayer.java         | 49 +++++++++++++++++---
 .../db/commitlog/CommitLogStressTest.java       |  6 +--
 3 files changed, 48 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/90476352/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4af4f67,a01e8ed..0a69930
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,104 -1,5 +1,105 @@@
 +3.0
 + * Disable memory mapping of hsperfdata file for JVM statistics (CASSANDRA-9242)
 + * Add pre-startup checks to detect potential incompatibilities (CASSANDRA-8049)
 + * Distinguish between null and unset in protocol v4 (CASSANDRA-7304)
 + * Add user/role permissions for user-defined functions (CASSANDRA-7557)
 + * Allow cassandra config to be updated to restart daemon without unloading classes (CASSANDRA-9046)
 + * Don't initialize compaction writer before checking if iter is empty (CASSANDRA-9117)
 + * Remove line number generation from default logback.xml
 + * Don't execute any functions at prepare-time (CASSANDRA-9037)
 + * Share file handles between all instances of a SegmentedFile (CASSANDRA-8893)
 + * Make it possible to major compact LCS (CASSANDRA-7272)
 + * Make FunctionExecutionException extend RequestExecutionException
 +   (CASSANDRA-9055)
 + * Add support for SELECT JSON, INSERT JSON syntax and new toJson(), fromJson()
 +   functions (CASSANDRA-7970)
 + * Optimise max purgeable timestamp calculation in compaction (CASSANDRA-8920)
 + * Constrain internode message buffer sizes, and improve IO class hierarchy (CASSANDRA-8670)
 + * New tool added to validate all sstables in a node (CASSANDRA-5791)
 + * Push notification when tracing completes for an operation (CASSANDRA-7807)
 + * Delay "node up" and "node added" notifications until native protocol server is started (CASSANDRA-8236)
 + * Compressed Commit Log (CASSANDRA-6809)
 + * Optimise IntervalTree (CASSANDRA-8988)
 + * Add a key-value payload for third party usage (CASSANDRA-8553, 9212)
 + * Bump metrics-reporter-config dependency for metrics 3.0 (CASSANDRA-8149)
 + * Partition intra-cluster message streams by size, not type (CASSANDRA-8789)
 + * Add WriteFailureException to native protocol, notify coordinator of
 +   write failures (CASSANDRA-8592)
 + * Convert SequentialWriter to nio (CASSANDRA-8709)
 + * Add role based access control (CASSANDRA-7653, 8650, 7216, 8760, 8849, 8761, 8850)
 + * Record client ip address in tracing sessions (CASSANDRA-8162)
 + * Indicate partition key columns in response metadata for prepared
 +   statements (CASSANDRA-7660)
 + * Merge UUIDType and TimeUUIDType parse logic (CASSANDRA-8759)
 + * Avoid memory allocation when searching index summary (CASSANDRA-8793)
 + * Optimise (Time)?UUIDType Comparisons (CASSANDRA-8730)
 + * Make CRC32Ex into a separate maven dependency (CASSANDRA-8836)
 + * Use preloaded jemalloc w/ Unsafe (CASSANDRA-8714)
 + * Avoid accessing partitioner through StorageProxy (CASSANDRA-8244, 8268)
 + * Upgrade Metrics library and remove deprecated metrics (CASSANDRA-5657)
 + * Serializing Row cache alternative, fully off heap (CASSANDRA-7438)
 + * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707)
 + * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560)
 + * Support direct buffer decompression for reads (CASSANDRA-8464)
 + * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039)
 + * Group sstables for anticompaction correctly (CASSANDRA-8578)
 + * Add ReadFailureException to native protocol, respond
 +   immediately when replicas encounter errors while handling
 +   a read request (CASSANDRA-7886)
 + * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308)
 + * Allow mixing token and partition key restrictions (CASSANDRA-7016)
 + * Support index key/value entries on map collections (CASSANDRA-8473)
 + * Modernize schema tables (CASSANDRA-8261)
 + * Support for user-defined aggregation functions (CASSANDRA-8053)
 + * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
 + * Refactor SelectStatement, return IN results in natural order instead
 +   of IN value list order and ignore duplicate values in partition key IN restrictions (CASSANDRA-7981)
 + * Support UDTs, tuples, and collections in user-defined
 +   functions (CASSANDRA-7563)
 + * Fix aggregate fn results on empty selection, result column name,
 +   and cqlsh parsing (CASSANDRA-8229)
 + * Mark sstables as repaired after full repair (CASSANDRA-7586)
 + * Extend Descriptor to include a format value and refactor reader/writer
 +   APIs (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any partition key column (CASSANDRA-7855)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063, 7813, 7708)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage through JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * Improve concurrency of repair (CASSANDRA-6455, 8208)
 + * Select optimal CRC32 implementation at runtime (CASSANDRA-8614)
 + * Evaluate MurmurHash of Token once per query (CASSANDRA-7096)
 + * Generalize progress reporting (CASSANDRA-8901)
 + * Resumable bootstrap streaming (CASSANDRA-8838, CASSANDRA-8942)
 + * Allow scrub for secondary index (CASSANDRA-5174)
 + * Save repair data to system table (CASSANDRA-5839)
 + * Fix nodetool names that reference column families (CASSANDRA-8872)
 +
  2.1.6
+  * Fix PITR commitlog replay (CASSANDRA-9195)
   * GCInspector logs very different times (CASSANDRA-9124)
   * Fix deleting from an empty list (CASSANDRA-9198)
   * Update tuple and collection types that use a user-defined type when that UDT

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90476352/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 52d1251,57f4b90..f6d1cc4
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -41,25 -32,15 +41,26 @@@ import org.apache.commons.lang3.StringU
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 +import com.github.tjake.ICRC32;
 +
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
+ import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.compress.CompressionParameters;
 +import org.apache.cassandra.io.compress.ICompressor;
 +import org.apache.cassandra.io.util.ByteBufferDataInput;
  import org.apache.cassandra.io.util.FastByteArrayInputStream;
 +import org.apache.cassandra.io.util.FileDataInput;
  import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.io.util.RandomAccessReader;
 -import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.CRC32Factory;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.JVMStabilityInspector;
 +import org.apache.cassandra.utils.WrappedRunnable;
  import org.cliffc.high_scale_lib.NonBlockingHashSet;
  
  public class CommitLogReplayer
@@@ -74,11 -55,12 +75,13 @@@
      private final AtomicInteger replayedCount;
      private final Map<UUID, ReplayPosition> cfPositions;
      private final ReplayPosition globalPosition;
 -    private final PureJavaCrc32 checksum;
 +    private final ICRC32 checksum;
      private byte[] buffer;
 +    private byte[] uncompressedBuffer;
  
-     CommitLogReplayer(ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions)
+     private final ReplayFilter replayFilter;
+ 
 -    public CommitLogReplayer()
++    CommitLogReplayer(ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter)
      {
          this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
          this.futures = new ArrayList<Future<?>>();
@@@ -87,16 -68,13 +90,18 @@@
          this.invalidMutations = new HashMap<UUID, AtomicInteger>();
          // count the number of replayed mutations. We don't really care about atomicity, but we need it to be a reference.
          this.replayedCount = new AtomicInteger();
 -        this.checksum = new PureJavaCrc32();
 -
 -        replayFilter = ReplayFilter.create();
 +        this.checksum = CRC32Factory.instance.create();
 +        this.cfPositions = cfPositions;
 +        this.globalPosition = globalPosition;
++        this.replayFilter = replayFilter;
 +    }
  
 +    public static CommitLogReplayer create()
 +    {
          // compute per-CF and global replay positions
 -        cfPositions = new HashMap<UUID, ReplayPosition>();
 +        Map<UUID, ReplayPosition> cfPositions = new HashMap<UUID, ReplayPosition>();
          Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
++        ReplayFilter replayFilter = ReplayFilter.create();
          for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
          {
              // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
@@@ -104,16 -82,35 +109,36 @@@
              // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct).
              ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
  
--            // but, if we've truncted the cf in question, then we need to need to start replay after the truncation
++            // but, if we've truncated the cf in question, then we need to start replay after the truncation
              ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
              if (truncatedAt != null)
-                 rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt));
+             {
+                 // Point in time restore is taken to mean that the tables need to be recovered even if they were
+                 // deleted at a later point in time. Any truncation record after that point must thus be cleared prior
+                 // to recovery (CASSANDRA-9195).
+                 long restoreTime = CommitLog.instance.archiver.restorePointInTime;
+                 long truncatedTime = SystemKeyspace.getTruncatedAt(cfs.metadata.cfId);
+                 if (truncatedTime > restoreTime)
+                 {
+                     if (replayFilter.includes(cfs.metadata))
+                     {
+                         logger.info("Restore point in time is before latest truncation of table {}.{}. Clearing truncation record.",
+                                     cfs.metadata.ksName,
+                                     cfs.metadata.cfName);
+                         SystemKeyspace.removeTruncationRecord(cfs.metadata.cfId);
+                     }
+                 }
+                 else
+                 {
+                     rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt));
+                 }
+             }
  
              cfPositions.put(cfs.metadata.cfId, rp);
          }
 -        globalPosition = replayPositionOrdering.min(cfPositions.values());
 +        ReplayPosition globalPosition = replayPositionOrdering.min(cfPositions.values());
          logger.debug("Global replay position is {} from columnfamilies {}", 
globalPosition, FBUtilities.toString(cfPositions));
-         return new CommitLogReplayer(globalPosition, cfPositions);
++        return new CommitLogReplayer(globalPosition, cfPositions, replayFilter);
      }
  
      public void recover(File[] clogs) throws IOException
@@@ -233,18 -264,22 +271,17 @@@
  
      public void recover(File file) throws IOException
      {
-         final ReplayFilter replayFilter = ReplayFilter.create();
 -        logger.info("Replaying {}", file.getPath());
          CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
 -        final long segmentId = desc.id;
 -        logger.info("Replaying {} (CL version {}, messaging version {})",
 -                    file.getPath(),
 -                    desc.version,
 -                    desc.getMessagingVersion());
          RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
 -
          try
          {
 -            assert reader.length() <= Integer.MAX_VALUE;
 -            int offset = getStartOffset(segmentId, desc.version);
 -            if (offset < 0)
 +            if (desc.version < CommitLogDescriptor.VERSION_21)
              {
 -                logger.debug("skipping replay of fully-flushed {}", file);
 +                if (logAndCheckIfShouldSkip(file, desc))
 +                    return;
 +                if (globalPosition.segment == desc.id)
 +                    reader.seek(globalPosition.position);
 +                replaySyncSection(reader, -1, desc, replayFilter);
                  return;
              }
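
For readers tracing the CASSANDRA-9195 hunks above: replay must ignore a truncation record when the restore target precedes the truncation (the truncated data has to be recovered), and otherwise advance replay past the truncation position. Below is a minimal standalone sketch of that decision; the class and parameter names (TruncationPolicy, clearTruncationRecord, includedInReplay) are hypothetical illustrations, not the project's API.

    import java.util.Arrays;
    import com.google.common.collect.Ordering;

    // Sketch only: mirrors the truncation decision in CommitLogReplayer.create() above.
    final class TruncationPolicy
    {
        static <T extends Comparable<T>> T adjust(T replayPosition,
                                                  T truncatedAt,
                                                  long truncatedTime,
                                                  long restorePointInTime,
                                                  boolean includedInReplay,
                                                  Runnable clearTruncationRecord)
        {
            if (truncatedAt == null)
                return replayPosition;            // table was never truncated
            if (truncatedTime > restorePointInTime)
            {
                // Truncation happened after the restore target, so the record is stale
                // for this restore: clear it and replay everything for this table.
                if (includedInReplay)
                    clearTruncationRecord.run();
                return replayPosition;
            }
            // Truncation precedes the target: skip replay up to the truncation position.
            return Ordering.<T>natural().max(Arrays.asList(replayPosition, truncatedAt));
        }
    }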
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90476352/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 758d2f9,0000000..644e2c2
mode 100644,000000..100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@@ -1,412 -1,0 +1,412 @@@
 +package org.apache.cassandra.db.commitlog;
 +/*
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + *
 + */
 +
 +
 +import java.io.DataInputStream;
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.nio.charset.StandardCharsets;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.ThreadLocalRandom;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import junit.framework.Assert;
 +
 +import com.google.common.util.concurrent.Futures;
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.Config.CommitLogSync;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.ParameterizedClass;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.Cell;
 +import org.apache.cassandra.db.ColumnFamily;
 +import org.apache.cassandra.db.ColumnSerializer;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.io.util.FastByteArrayInputStream;
 +
 +public class CommitLogStressTest
 +{
 +
 +    public static ByteBuffer dataSource;
 +    
 +    public static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1;
 +
 +    public static int numCells = 1;
 +
 +    public static int cellSize = 1024;
 +    
 +    public static int rateLimit = 0;
 +    
 +    public static int runTimeMs = 10000;
 +    
 +    public static String location = DatabaseDescriptor.getCommitLogLocation() + "/stress";
 +    
 +    public static int hash(int hash, ByteBuffer bytes)
 +    {
 +        int shift = 0;
 +        for (int i=0; i<bytes.limit(); i++) {
 +            hash += (bytes.get(i) & 0xFF) << shift;
 +            shift = (shift + 8) & 0x1F;
 +        }
 +        return hash;
 +    }
 +    
 +    public static void main(String[] args) throws Exception {
 +        try {
 +            if (args.length >= 1) {
 +                NUM_THREADS = Integer.parseInt(args[0]);
 +                System.out.println("Setting num threads to: " + NUM_THREADS);
 +            }
 +    
 +            if (args.length >= 2) {
 +                numCells = Integer.parseInt(args[1]);
 +                System.out.println("Setting num cells to: " + numCells);
 +            }
 +    
 +            if (args.length >= 3) {
 +                cellSize = Integer.parseInt(args[2]);
 +                System.out.println("Setting cell size to: " + cellSize + " be aware the source corpus may be small");
 +            }
 +    
 +            if (args.length >= 4) {
 +                rateLimit = Integer.parseInt(args[3]);
 +                System.out.println("Setting per thread rate limit to: " + rateLimit);
 +            }
 +            initialize();
 +            
 +            CommitLogStressTest tester = new CommitLogStressTest();
 +            tester.testFixedSize();
 +        }
 +        catch (Exception e)
 +        {
 +            e.printStackTrace(System.err);
 +        }
 +        finally {
 +            System.exit(0);
 +        }
 +    }
 +    
 +    boolean failed = false;
 +    volatile boolean stop = false;
 +    boolean randomSize = false;
 +    boolean discardedRun = false;
 +    ReplayPosition discardedPos;
 +    
 +    @BeforeClass
 +    static public void initialize() throws FileNotFoundException, IOException, InterruptedException
 +    {
 +        try (FileInputStream fis = new FileInputStream("CHANGES.txt"))
 +        {
 +            dataSource = ByteBuffer.allocateDirect((int)fis.getChannel().size());
 +            while (dataSource.hasRemaining()) {
 +                fis.getChannel().read(dataSource);
 +            }
 +            dataSource.flip();
 +        }
 +
 +        SchemaLoader.loadSchema();
 +        SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour
 +
 +        File dir = new File(location);
 +        if (dir.isDirectory())
 +        {
 +            File[] files = dir.listFiles();
 +    
 +            for (File f : files)
 +                if (!f.delete())
 +                    Assert.fail("Failed to delete " + f);
 +        } else {
 +            dir.mkdir();
 +        }
 +    }
 +
 +    @Test
 +    public void testRandomSize() throws Exception
 +    {
 +        randomSize = true;
 +        discardedRun = false;
 +        testAllLogConfigs();
 +    }
 +
 +    @Test
 +    public void testFixedSize() throws Exception
 +    {
 +        randomSize = false;
 +        discardedRun = false;
 +
 +        testAllLogConfigs();
 +    }
 +
 +    @Test
 +    public void testDiscardedRun() throws Exception
 +    {
 +        discardedRun = true;
 +        randomSize = true;
 +
 +        testAllLogConfigs();
 +    }
 +
 +    public void testAllLogConfigs() throws IOException, InterruptedException
 +    {
 +        failed = false;
 +        DatabaseDescriptor.setCommitLogSyncBatchWindow(1);
 +        DatabaseDescriptor.setCommitLogSyncPeriod(30);
 +        DatabaseDescriptor.setCommitLogSegmentSize(32);
 +        for (ParameterizedClass compressor : new ParameterizedClass[] {
 +                null,
 +                new ParameterizedClass("LZ4Compressor", null),
 +                new ParameterizedClass("SnappyCompressor", null),
 +                new ParameterizedClass("DeflateCompressor", null)}) {
 +            DatabaseDescriptor.setCommitLogCompression(compressor);
 +            for (CommitLogSync sync : CommitLogSync.values())
 +            {
 +                DatabaseDescriptor.setCommitLogSync(sync);
 +                CommitLog commitLog = new CommitLog(location, CommitLog.instance.archiver);
 +                testLog(commitLog);
 +            }
 +        }
 +        assert !failed;
 +    }
 +
 +    public void testLog(CommitLog commitLog) throws IOException, InterruptedException {
 +        System.out.format("\nTesting commit log size %dmb, compressor %s, sync %s%s%s\n",
 +                           mb(DatabaseDescriptor.getCommitLogSegmentSize()),
 +                           commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
 +                           commitLog.executor.getClass().getSimpleName(),
 +                           randomSize ? " random size" : "",
 +                           discardedRun ? " with discarded run" : "");
 +        commitLog.allocator.enableReserveSegmentCreation();
 +        
 +        final List<CommitlogExecutor> threads = new ArrayList<>();
 +        ScheduledExecutorService scheduled = startThreads(commitLog, threads);
 +
 +        discardedPos = ReplayPosition.NONE;
 +        if (discardedRun) {
 +            // Makes sure post-break data is not deleted, and that replayer correctly rejects earlier mutations.
 +            Thread.sleep(runTimeMs / 3);
 +            stop = true;
 +            scheduled.shutdown();
 +            scheduled.awaitTermination(2, TimeUnit.SECONDS);
 +
 +            for (CommitlogExecutor t: threads)
 +            {
 +                t.join();
 +                CommitLog.instance.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, t.rp);
 +                if (t.rp.compareTo(discardedPos) > 0)
 +                    discardedPos = t.rp;
 +            }
 +            threads.clear();
 +            System.out.format("Discarded at %s\n", discardedPos);
 +
 +            scheduled = startThreads(commitLog, threads);
 +        }
 +
 +        
 +        Thread.sleep(runTimeMs);
 +        stop = true;
 +        scheduled.shutdown();
 +        scheduled.awaitTermination(2, TimeUnit.SECONDS);
 +
 +        int hash = 0;
 +        int cells = 0;
 +        for (CommitlogExecutor t: threads) {
 +            t.join();
 +            hash += t.hash;
 +            cells += t.cells;
 +        }
 +        
 +        commitLog.shutdownBlocking();
 +
 +        System.out.print("Stopped. Replaying... "); System.out.flush();
 +        Replayer repl = new Replayer();
 +        File[] files = new File(location).listFiles();
 +        repl.recover(files);
 +
 +        for (File f : files)
 +            if (!f.delete())
 +                Assert.fail("Failed to delete " + f);
 +        
 +        if (hash == repl.hash && cells == repl.cells)
 +            System.out.println("Test success.");
 +        else
 +        {
 +            System.out.format("Test failed. Cells %d expected %d, hash %d 
expected %d.\n", repl.cells, cells, repl.hash, hash);
 +            failed = true;
 +        }
 +    }
 +
 +    public ScheduledExecutorService startThreads(CommitLog commitLog, final List<CommitlogExecutor> threads)
 +    {
 +        stop = false;
 +        for (int ii = 0; ii < NUM_THREADS; ii++) {
 +            final CommitlogExecutor t = new CommitlogExecutor(commitLog);
 +            threads.add(t);
 +            t.start();
 +        }
 +
 +        final long start = System.currentTimeMillis();
 +        Runnable printRunnable = new Runnable() {
 +            long lastUpdate = 0;
 +
 +            public void run() {
 +              Runtime runtime = Runtime.getRuntime();
 +              long maxMemory = mb(runtime.maxMemory());
 +              long allocatedMemory = mb(runtime.totalMemory());
 +              long freeMemory = mb(runtime.freeMemory());
 +              long temp = 0;
 +              long sz = 0;
 +              for (CommitlogExecutor cle : threads) {
 +                  temp += cle.counter.get();
 +                  sz += cle.dataSize;
 +              }
 +              double time = (System.currentTimeMillis() - start) / 1000.0;
 +              double avg = (temp / time);
 +              System.out.println(String.format("second %d mem max %dmb 
allocated %dmb free %dmb mutations %d since start %d avg %.3f transfer %.3fmb",
 +                      ((System.currentTimeMillis() - start) / 1000),
 +                      maxMemory, allocatedMemory, freeMemory, (temp - lastUpdate), lastUpdate, avg, mb(sz / time)));
 +              lastUpdate = temp;
 +            }
 +        };
 +        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
 +        scheduled.scheduleAtFixedRate(printRunnable, 1, 1, TimeUnit.SECONDS);
 +        return scheduled;
 +    }
 +
 +    private static long mb(long maxMemory) {
 +        return maxMemory / (1024 * 1024);
 +    }
 +
 +    private static double mb(double maxMemory) {
 +        return maxMemory / (1024 * 1024);
 +    }
 +
 +    public static ByteBuffer randomBytes(int quantity, ThreadLocalRandom tlr) {
 +        ByteBuffer slice = ByteBuffer.allocate(quantity);
 +        ByteBuffer source = dataSource.duplicate();
 +        source.position(tlr.nextInt(source.capacity() - quantity));
 +        source.limit(source.position() + quantity);
 +        slice.put(source);
 +        slice.flip();
 +        return slice;
 +    }
 +
 +    public class CommitlogExecutor extends Thread {
 +        final AtomicLong counter = new AtomicLong();
 +        int hash = 0;
 +        int cells = 0;
 +        int dataSize = 0;
 +        final CommitLog commitLog;
 +
 +        volatile ReplayPosition rp;
 +
 +        public CommitlogExecutor(CommitLog commitLog)
 +        {
 +            this.commitLog = commitLog;
 +        }
 +
 +        public void run() {
 +            RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null;
 +            final ThreadLocalRandom tlr = ThreadLocalRandom.current();
 +            while (!stop) {
 +                if (rl != null)
 +                    rl.acquire();
 +                String ks = "Keyspace1";
 +                ByteBuffer key = randomBytes(16, tlr);
 +                Mutation mutation = new Mutation(ks, key);
 +
 +                for (int ii = 0; ii < numCells; ii++) {
 +                    int sz = randomSize ? tlr.nextInt(cellSize) : cellSize;
 +                    ByteBuffer bytes = randomBytes(sz, tlr);
 +                    mutation.add("Standard1", Util.cellname("name" + ii), 
bytes,
 +                            System.currentTimeMillis());
 +                    hash = hash(hash, bytes);
 +                    ++cells;
 +                    dataSize += sz;
 +                }
 +                rp = commitLog.add(mutation);
 +                counter.incrementAndGet();
 +            }
 +        }
 +    }
 +    
-     class Replayer extends CommitLogReplayer {
- 
++    class Replayer extends CommitLogReplayer
++    {
 +        Replayer()
 +        {
-             super(discardedPos, null);
++            super(discardedPos, null, ReplayFilter.create());
 +        }
 +
 +        int hash = 0;
 +        int cells = 0;
 +
 +        @Override
 +        void replayMutation(byte[] inputBuffer, int size,
 +                final long entryLocation, final CommitLogDescriptor desc, final ReplayFilter replayFilter)
 +        {
 +            if (desc.id < discardedPos.segment) {
 +                System.out.format("Mutation from discarded segment, segment 
%d pos %d\n", desc.id, entryLocation);
 +                return;
 +            } else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position)
 +                // Skip over this mutation.
 +                return;
 +                
 +            FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
 +            Mutation mutation;
 +            try
 +            {
 +                mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
 +                                                           desc.getMessagingVersion(),
 +                                                           ColumnSerializer.Flag.LOCAL);
 +            }
 +            catch (IOException e)
 +            {
 +                // Test fails.
 +                throw new AssertionError(e);
 +            }
 +
 +            for (ColumnFamily cf : mutation.getColumnFamilies()) {
 +                for (Cell c : cf.getSortedColumns()) {
 +                    if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith("name"))
 +                    {
 +                        hash = hash(hash, c.value());
 +                        ++cells;
 +                    }
 +                }
 +            }
 +        }
 +        
 +    }
 +}
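
A note on the verification scheme in the test above: hash(int, ByteBuffer) resets its shift to zero on every call, so each cell contributes a value independent of the running total, and totals are accumulated by plain integer addition. The aggregate hash is therefore insensitive to the order in which cells are written or replayed, which is why summing the per-thread hashes and comparing against the Replayer's hash is a valid check. A small self-contained illustration of that property (class name hypothetical):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    // Demonstrates that the stress test's checksum is order-insensitive across cells.
    public class HashCheckSketch
    {
        // Same additive checksum as CommitLogStressTest.hash() above.
        static int hash(int hash, ByteBuffer bytes)
        {
            int shift = 0;
            for (int i = 0; i < bytes.limit(); i++)
            {
                hash += (bytes.get(i) & 0xFF) << shift;
                shift = (shift + 8) & 0x1F;
            }
            return hash;
        }

        public static void main(String[] args)
        {
            List<ByteBuffer> cells = Arrays.asList(
                    ByteBuffer.wrap("cell-one".getBytes(StandardCharsets.UTF_8)),
                    ByteBuffer.wrap("cell-two".getBytes(StandardCharsets.UTF_8)),
                    ByteBuffer.wrap("cell-three".getBytes(StandardCharsets.UTF_8)));

            int written = 0;
            for (ByteBuffer b : cells)            // "writer" order
                written = hash(written, b);

            List<ByteBuffer> shuffled = new ArrayList<>(cells);
            Collections.reverse(shuffled);        // replay may see a different order...
            int replayed = 0;
            for (ByteBuffer b : shuffled)
                replayed = hash(replayed, b);

            // ...but integer addition commutes, so the totals still agree.
            System.out.println(written == replayed ? "totals match" : "MISMATCH");
        }
    }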
