Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/90476352 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/90476352 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/90476352 Branch: refs/heads/trunk Commit: 90476352a56568fc47418905dd31a16eb00a7981 Parents: 0656924 1f6bf36 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Thu Apr 30 19:04:35 2015 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Thu Apr 30 19:04:35 2015 +0300 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../db/commitlog/CommitLogReplayer.java | 49 +++++++++++++++++--- .../db/commitlog/CommitLogStressTest.java | 6 +-- 3 files changed, 48 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/90476352/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 4af4f67,a01e8ed..0a69930 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,104 -1,5 +1,105 @@@ +3.0 + * Disable memory mapping of hsperfdata file for JVM statistics (CASSANDRA-9242) + * Add pre-startup checks to detect potential incompatibilities (CASSANDRA-8049) + * Distinguish between null and unset in protocol v4 (CASSANDRA-7304) + * Add user/role permissions for user-defined functions (CASSANDRA-7557) + * Allow cassandra config to be updated to restart daemon without unloading classes (CASSANDRA-9046) + * Don't initialize compaction writer before checking if iter is empty (CASSANDRA-9117) + * Remove line number generation from default logback.xml + * Don't execute any functions at prepare-time (CASSANDRA-9037) + * Share file handles between all instances of a SegmentedFile (CASSANDRA-8893) + * Make it possible to major compact LCS (CASSANDRA-7272) + * Make FunctionExecutionException extend RequestExecutionException + (CASSANDRA-9055) + * Add support for SELECT JSON, INSERT JSON syntax and new toJson(), fromJson() + functions (CASSANDRA-7970) + * Optimise max purgeable timestamp calculation in compaction (CASSANDRA-8920) + * Constrain internode message buffer sizes, and improve IO class hierarchy (CASSANDRA-8670) + * New tool added to validate all sstables in a node (CASSANDRA-5791) + * Push notification when tracing completes for an operation (CASSANDRA-7807) + * Delay "node up" and "node added" notifications until native protocol server is started (CASSANDRA-8236) + * Compressed Commit Log (CASSANDRA-6809) + * Optimise IntervalTree (CASSANDRA-8988) + * Add a key-value payload for third party usage (CASSANDRA-8553, 9212) + * Bump metrics-reporter-config dependency for metrics 3.0 (CASSANDRA-8149) + * Partition intra-cluster message streams by size, not type (CASSANDRA-8789) + * Add WriteFailureException to native protocol, notify coordinator of + write failures (CASSANDRA-8592) + * Convert SequentialWriter to nio (CASSANDRA-8709) + * Add role based access control (CASSANDRA-7653, 8650, 7216, 8760, 8849, 8761, 8850) + * Record client ip address in tracing sessions (CASSANDRA-8162) + * Indicate partition key columns in response metadata for prepared + statements (CASSANDRA-7660) + * Merge UUIDType and TimeUUIDType parse logic (CASSANDRA-8759) + * Avoid memory allocation when searching index summary (CASSANDRA-8793) + * Optimise (Time)?UUIDType Comparisons (CASSANDRA-8730) + * Make CRC32Ex into a separate maven dependency (CASSANDRA-8836) + * Use preloaded jemalloc w/ Unsafe (CASSANDRA-8714) + * Avoid accessing partitioner through StorageProxy (CASSANDRA-8244, 8268) + * Upgrade Metrics library and remove depricated metrics (CASSANDRA-5657) + * Serializing Row cache alternative, fully off heap (CASSANDRA-7438) + * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707) + * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560) + * Support direct buffer decompression for reads (CASSANDRA-8464) + * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039) + * Group sstables for anticompaction correctly (CASSANDRA-8578) + * Add ReadFailureException to native protocol, respond + immediately when replicas encounter errors while handling + a read request (CASSANDRA-7886) + * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308) + * Allow mixing token and partition key restrictions (CASSANDRA-7016) + * Support index key/value entries on map collections (CASSANDRA-8473) + * Modernize schema tables (CASSANDRA-8261) + * Support for user-defined aggregation functions (CASSANDRA-8053) + * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419) + * Refactor SelectStatement, return IN results in natural order instead + of IN value list order and ignore duplicate values in partition key IN restrictions (CASSANDRA-7981) + * Support UDTs, tuples, and collections in user-defined + functions (CASSANDRA-7563) + * Fix aggregate fn results on empty selection, result column name, + and cqlsh parsing (CASSANDRA-8229) + * Mark sstables as repaired after full repair (CASSANDRA-7586) + * Extend Descriptor to include a format value and refactor reader/writer + APIs (CASSANDRA-7443) + * Integrate JMH for microbenchmarks (CASSANDRA-8151) + * Keep sstable levels when bootstrapping (CASSANDRA-7460) + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838) + * Support for aggregation functions (CASSANDRA-4914) + * Remove cassandra-cli (CASSANDRA-7920) + * Accept dollar quoted strings in CQL (CASSANDRA-7769) + * Make assassinate a first class command (CASSANDRA-7935) + * Support IN clause on any partition key column (CASSANDRA-7855) + * Support IN clause on any clustering column (CASSANDRA-4762) + * Improve compaction logging (CASSANDRA-7818) + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917) + * Do anticompaction in groups (CASSANDRA-6851) + * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929, + 7924, 7812, 8063, 7813, 7708) + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416) + * Move sstable RandomAccessReader to nio2, which allows using the + FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050) + * Remove CQL2 (CASSANDRA-5918) + * Add Thrift get_multi_slice call (CASSANDRA-6757) + * Optimize fetching multiple cells by name (CASSANDRA-6933) + * Allow compilation in java 8 (CASSANDRA-7028) + * Make incremental repair default (CASSANDRA-7250) + * Enable code coverage thru JaCoCo (CASSANDRA-7226) + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) + * Shorten SSTable path (CASSANDRA-6962) + * Use unsafe mutations for most unit tests (CASSANDRA-6969) + * Fix race condition during calculation of pending ranges (CASSANDRA-7390) + * Fail on very large batch sizes (CASSANDRA-8011) + * Improve concurrency of repair (CASSANDRA-6455, 8208) + * Select optimal CRC32 implementation at runtime (CASSANDRA-8614) + * Evaluate MurmurHash of Token once per query (CASSANDRA-7096) + * Generalize progress reporting (CASSANDRA-8901) + * Resumable bootstrap streaming (CASSANDRA-8838, CASSANDRA-8942) + * Allow scrub for secondary index (CASSANDRA-5174) + * Save repair data to system table (CASSANDRA-5839) + * fix nodetool names that reference column families (CASSANDRA-8872) + 2.1.6 + * Fix PITR commitlog replay (CASSANDRA-9195) * GCInspector logs very different times (CASSANDRA-9124) * Fix deleting from an empty list (CASSANDRA-9198) * Update tuple and collection types that use a user-defined type when that UDT http://git-wip-us.apache.org/repos/asf/cassandra/blob/90476352/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 52d1251,57f4b90..f6d1cc4 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@@ -41,25 -32,15 +41,26 @@@ import org.apache.commons.lang3.StringU import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.tjake.ICRC32; + import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.compress.CompressionParameters; +import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.io.util.ByteBufferDataInput; import org.apache.cassandra.io.util.FastByteArrayInputStream; +import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.CRC32Factory; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.WrappedRunnable; import org.cliffc.high_scale_lib.NonBlockingHashSet; public class CommitLogReplayer @@@ -74,11 -55,12 +75,13 @@@ private final AtomicInteger replayedCount; private final Map<UUID, ReplayPosition> cfPositions; private final ReplayPosition globalPosition; - private final PureJavaCrc32 checksum; + private final ICRC32 checksum; private byte[] buffer; + private byte[] uncompressedBuffer; - CommitLogReplayer(ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions) + private final ReplayFilter replayFilter; + - public CommitLogReplayer() ++ CommitLogReplayer(ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter) { this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>(); this.futures = new ArrayList<Future<?>>(); @@@ -87,16 -68,13 +90,18 @@@ this.invalidMutations = new HashMap<UUID, AtomicInteger>(); // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference. this.replayedCount = new AtomicInteger(); - this.checksum = new PureJavaCrc32(); - - replayFilter = ReplayFilter.create(); + this.checksum = CRC32Factory.instance.create(); + this.cfPositions = cfPositions; + this.globalPosition = globalPosition; ++ this.replayFilter = replayFilter; + } + public static CommitLogReplayer create() + { // compute per-CF and global replay positions - cfPositions = new HashMap<UUID, ReplayPosition>(); + Map<UUID, ReplayPosition> cfPositions = new HashMap<UUID, ReplayPosition>(); Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator); ++ ReplayFilter replayFilter = ReplayFilter.create(); for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call @@@ -104,16 -82,35 +109,36 @@@ // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct). ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables()); -- // but, if we've truncted the cf in question, then we need to need to start replay after the truncation ++ // but, if we've truncated the cf in question, then we need to need to start replay after the truncation ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId); if (truncatedAt != null) - rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt)); + { + // Point in time restore is taken to mean that the tables need to be recovered even if they were + // deleted at a later point in time. Any truncation record after that point must thus be cleared prior + // to recovery (CASSANDRA-9195). + long restoreTime = CommitLog.instance.archiver.restorePointInTime; + long truncatedTime = SystemKeyspace.getTruncatedAt(cfs.metadata.cfId); + if (truncatedTime > restoreTime) + { + if (replayFilter.includes(cfs.metadata)) + { + logger.info("Restore point in time is before latest truncation of table {}.{}. Clearing truncation record.", + cfs.metadata.ksName, + cfs.metadata.cfName); + SystemKeyspace.removeTruncationRecord(cfs.metadata.cfId); + } + } + else + { + rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt)); + } + } cfPositions.put(cfs.metadata.cfId, rp); } - globalPosition = replayPositionOrdering.min(cfPositions.values()); + ReplayPosition globalPosition = replayPositionOrdering.min(cfPositions.values()); logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPositions)); - return new CommitLogReplayer(globalPosition, cfPositions); ++ return new CommitLogReplayer(globalPosition, cfPositions, replayFilter); } public void recover(File[] clogs) throws IOException @@@ -233,18 -264,22 +271,17 @@@ public void recover(File file) throws IOException { - final ReplayFilter replayFilter = ReplayFilter.create(); - logger.info("Replaying {}", file.getPath()); CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); - final long segmentId = desc.id; - logger.info("Replaying {} (CL version {}, messaging version {})", - file.getPath(), - desc.version, - desc.getMessagingVersion()); RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath())); - try { - assert reader.length() <= Integer.MAX_VALUE; - int offset = getStartOffset(segmentId, desc.version); - if (offset < 0) + if (desc.version < CommitLogDescriptor.VERSION_21) { - logger.debug("skipping replay of fully-flushed {}", file); + if (logAndCheckIfShouldSkip(file, desc)) + return; + if (globalPosition.segment == desc.id) + reader.seek(globalPosition.position); + replaySyncSection(reader, -1, desc, replayFilter); return; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/90476352/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java ---------------------------------------------------------------------- diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index 758d2f9,0000000..644e2c2 mode 100644,000000..100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@@ -1,412 -1,0 +1,412 @@@ +package org.apache.cassandra.db.commitlog; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import junit.framework.Assert; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.RateLimiter; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.Config.CommitLogSync; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.Cell; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ColumnSerializer; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.io.util.FastByteArrayInputStream; + +public class CommitLogStressTest +{ + + public static ByteBuffer dataSource; + + public static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1; + + public static int numCells = 1; + + public static int cellSize = 1024; + + public static int rateLimit = 0; + + public static int runTimeMs = 10000; + + public static String location = DatabaseDescriptor.getCommitLogLocation() + "/stress"; + + public static int hash(int hash, ByteBuffer bytes) + { + int shift = 0; + for (int i=0; i<bytes.limit(); i++) { + hash += (bytes.get(i) & 0xFF) << shift; + shift = (shift + 8) & 0x1F; + } + return hash; + } + + public static void main(String[] args) throws Exception { + try { + if (args.length >= 1) { + NUM_THREADS = Integer.parseInt(args[0]); + System.out.println("Setting num threads to: " + NUM_THREADS); + } + + if (args.length >= 2) { + numCells = Integer.parseInt(args[1]); + System.out.println("Setting num cells to: " + numCells); + } + + if (args.length >= 3) { + cellSize = Integer.parseInt(args[1]); + System.out.println("Setting cell size to: " + cellSize + " be aware the source corpus may be small"); + } + + if (args.length >= 4) { + rateLimit = Integer.parseInt(args[1]); + System.out.println("Setting per thread rate limit to: " + rateLimit); + } + initialize(); + + CommitLogStressTest tester = new CommitLogStressTest(); + tester.testFixedSize(); + } + catch (Exception e) + { + e.printStackTrace(System.err); + } + finally { + System.exit(0); + } + } + + boolean failed = false; + volatile boolean stop = false; + boolean randomSize = false; + boolean discardedRun = false; + ReplayPosition discardedPos; + + @BeforeClass + static public void initialize() throws FileNotFoundException, IOException, InterruptedException + { + try (FileInputStream fis = new FileInputStream("CHANGES.txt")) + { + dataSource = ByteBuffer.allocateDirect((int)fis.getChannel().size()); + while (dataSource.hasRemaining()) { + fis.getChannel().read(dataSource); + } + dataSource.flip(); + } + + SchemaLoader.loadSchema(); + SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour + + File dir = new File(location); + if (dir.isDirectory()) + { + File[] files = dir.listFiles(); + + for (File f : files) + if (!f.delete()) + Assert.fail("Failed to delete " + f); + } else { + dir.mkdir(); + } + } + + @Test + public void testRandomSize() throws Exception + { + randomSize = false; + discardedRun = false; + testAllLogConfigs(); + } + + @Test + public void testFixedSize() throws Exception + { + randomSize = false; + discardedRun = false; + + testAllLogConfigs(); + } + + @Test + public void testDiscardedRun() throws Exception + { + discardedRun = true; + randomSize = true; + + testAllLogConfigs(); + } + + public void testAllLogConfigs() throws IOException, InterruptedException + { + failed = false; + DatabaseDescriptor.setCommitLogSyncBatchWindow(1); + DatabaseDescriptor.setCommitLogSyncPeriod(30); + DatabaseDescriptor.setCommitLogSegmentSize(32); + for (ParameterizedClass compressor : new ParameterizedClass[] { + null, + new ParameterizedClass("LZ4Compressor", null), + new ParameterizedClass("SnappyCompressor", null), + new ParameterizedClass("DeflateCompressor", null)}) { + DatabaseDescriptor.setCommitLogCompression(compressor); + for (CommitLogSync sync : CommitLogSync.values()) + { + DatabaseDescriptor.setCommitLogSync(sync); + CommitLog commitLog = new CommitLog(location, CommitLog.instance.archiver); + testLog(commitLog); + } + } + assert !failed; + } + + public void testLog(CommitLog commitLog) throws IOException, InterruptedException { + System.out.format("\nTesting commit log size %dmb, compressor %s, sync %s%s%s\n", + mb(DatabaseDescriptor.getCommitLogSegmentSize()), + commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", + commitLog.executor.getClass().getSimpleName(), + randomSize ? " random size" : "", + discardedRun ? " with discarded run" : ""); + commitLog.allocator.enableReserveSegmentCreation(); + + final List<CommitlogExecutor> threads = new ArrayList<>(); + ScheduledExecutorService scheduled = startThreads(commitLog, threads); + + discardedPos = ReplayPosition.NONE; + if (discardedRun) { + // Makes sure post-break data is not deleted, and that replayer correctly rejects earlier mutations. + Thread.sleep(runTimeMs / 3); + stop = true; + scheduled.shutdown(); + scheduled.awaitTermination(2, TimeUnit.SECONDS); + + for (CommitlogExecutor t: threads) + { + t.join(); + CommitLog.instance.discardCompletedSegments( Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, t.rp); + if (t.rp.compareTo(discardedPos) > 0) + discardedPos = t.rp; + } + threads.clear(); + System.out.format("Discarded at %s\n", discardedPos); + + scheduled = startThreads(commitLog, threads); + } + + + Thread.sleep(runTimeMs); + stop = true; + scheduled.shutdown(); + scheduled.awaitTermination(2, TimeUnit.SECONDS); + + int hash = 0; + int cells = 0; + for (CommitlogExecutor t: threads) { + t.join(); + hash += t.hash; + cells += t.cells; + } + + commitLog.shutdownBlocking(); + + System.out.print("Stopped. Replaying... "); System.out.flush(); + Replayer repl = new Replayer(); + File[] files = new File(location).listFiles(); + repl.recover(files); + + for (File f : files) + if (!f.delete()) + Assert.fail("Failed to delete " + f); + + if (hash == repl.hash && cells == repl.cells) + System.out.println("Test success."); + else + { + System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n", repl.cells, cells, repl.hash, hash); + failed = true; + } + } + + public ScheduledExecutorService startThreads(CommitLog commitLog, final List<CommitlogExecutor> threads) + { + stop = false; + for (int ii = 0; ii < NUM_THREADS; ii++) { + final CommitlogExecutor t = new CommitlogExecutor(commitLog); + threads.add(t); + t.start(); + } + + final long start = System.currentTimeMillis(); + Runnable printRunnable = new Runnable() { + long lastUpdate = 0; + + public void run() { + Runtime runtime = Runtime.getRuntime(); + long maxMemory = mb(runtime.maxMemory()); + long allocatedMemory = mb(runtime.totalMemory()); + long freeMemory = mb(runtime.freeMemory()); + long temp = 0; + long sz = 0; + for (CommitlogExecutor cle : threads) { + temp += cle.counter.get(); + sz += cle.dataSize; + } + double time = (System.currentTimeMillis() - start) / 1000.0; + double avg = (temp / time); + System.out.println(String.format("second %d mem max %dmb allocated %dmb free %dmb mutations %d since start %d avg %.3f transfer %.3fmb", + ((System.currentTimeMillis() - start) / 1000), + maxMemory, allocatedMemory, freeMemory, (temp - lastUpdate), lastUpdate, avg, mb(sz / time))); + lastUpdate = temp; + } + }; + ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1); + scheduled.scheduleAtFixedRate(printRunnable, 1, 1, TimeUnit.SECONDS); + return scheduled; + } + + private static long mb(long maxMemory) { + return maxMemory / (1024 * 1024); + } + + private static double mb(double maxMemory) { + return maxMemory / (1024 * 1024); + } + + public static ByteBuffer randomBytes(int quantity, ThreadLocalRandom tlr) { + ByteBuffer slice = ByteBuffer.allocate(quantity); + ByteBuffer source = dataSource.duplicate(); + source.position(tlr.nextInt(source.capacity() - quantity)); + source.limit(source.position() + quantity); + slice.put(source); + slice.flip(); + return slice; + } + + public class CommitlogExecutor extends Thread { + final AtomicLong counter = new AtomicLong(); + int hash = 0; + int cells = 0; + int dataSize = 0; + final CommitLog commitLog; + + volatile ReplayPosition rp; + + public CommitlogExecutor(CommitLog commitLog) + { + this.commitLog = commitLog; + } + + public void run() { + RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null; + final ThreadLocalRandom tlr = ThreadLocalRandom.current(); + while (!stop) { + if (rl != null) + rl.acquire(); + String ks = "Keyspace1"; + ByteBuffer key = randomBytes(16, tlr); + Mutation mutation = new Mutation(ks, key); + + for (int ii = 0; ii < numCells; ii++) { + int sz = randomSize ? tlr.nextInt(cellSize) : cellSize; + ByteBuffer bytes = randomBytes(sz, tlr); + mutation.add("Standard1", Util.cellname("name" + ii), bytes, + System.currentTimeMillis()); + hash = hash(hash, bytes); + ++cells; + dataSize += sz; + } + rp = commitLog.add(mutation); + counter.incrementAndGet(); + } + } + } + - class Replayer extends CommitLogReplayer { - ++ class Replayer extends CommitLogReplayer ++ { + Replayer() + { - super(discardedPos, null); ++ super(discardedPos, null, ReplayFilter.create()); + } + + int hash = 0; + int cells = 0; + + @Override + void replayMutation(byte[] inputBuffer, int size, + final long entryLocation, final CommitLogDescriptor desc, final ReplayFilter replayFilter) + { + if (desc.id < discardedPos.segment) { + System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation); + return; + } else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position) + // Skip over this mutation. + return; + + FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size); + Mutation mutation; + try + { + mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn), + desc.getMessagingVersion(), + ColumnSerializer.Flag.LOCAL); + } + catch (IOException e) + { + // Test fails. + throw new AssertionError(e); + } + + for (ColumnFamily cf : mutation.getColumnFamilies()) { + for (Cell c : cf.getSortedColumns()) { + if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith("name")) + { + hash = hash(hash, c.value()); + ++cells; + } + } + } + } + + } +}