Repository: cassandra Updated Branches: refs/heads/trunk bc9b0be32 -> 3f0dd5dff
Expand upgrade testing for commitlog changes Patch by blambov; reviewed by jmckenzie for CASSANDRA-9346 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a384faaa Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a384faaa Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a384faaa Branch: refs/heads/trunk Commit: a384faaa8aa2c5f0f313011a30ef64e7e795ab1e Parents: 01115f7 Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Wed Jun 24 12:47:59 2015 -0400 Committer: Josh McKenzie <josh.mcken...@datastax.com> Committed: Wed Jun 24 12:47:59 2015 -0400 ---------------------------------------------------------------------- .../db/commitlog/CommitLogReplayer.java | 2 +- .../2.0/CommitLog-3-1431528750790.log | Bin 0 -> 2097152 bytes .../2.0/CommitLog-3-1431528750791.log | Bin 0 -> 2097152 bytes .../2.0/CommitLog-3-1431528750792.log | Bin 0 -> 2097152 bytes .../2.0/CommitLog-3-1431528750793.log | Bin 0 -> 2097152 bytes test/data/legacy-commitlog/2.0/hash.txt | 3 + .../2.1/CommitLog-4-1431529069529.log | Bin 0 -> 2097152 bytes .../2.1/CommitLog-4-1431529069530.log | Bin 0 -> 2097152 bytes test/data/legacy-commitlog/2.1/hash.txt | 3 + .../db/commitlog/CommitLogStressTest.java | 217 +++++++++------- .../db/commitlog/CommitLogUpgradeTest.java | 143 +++++++++++ .../db/commitlog/CommitLogUpgradeTestMaker.java | 250 +++++++++++++++++++ 12 files changed, 527 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index a59e70e..176f64b 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -281,7 +281,7 @@ public class CommitLogReplayer return; if (globalPosition.segment == desc.id) reader.seek(globalPosition.position); - replaySyncSection(reader, -1, desc); + replaySyncSection(reader, (int) reader.getPositionLimit(), desc); return; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750790.log ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750790.log b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750790.log new file mode 100644 index 0000000..3301331 Binary files /dev/null and b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750790.log differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750791.log ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750791.log b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750791.log new file mode 100644 index 0000000..04314d6 Binary files /dev/null and b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750791.log differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750792.log ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750792.log b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750792.log new file mode 100644 index 0000000..a9af9e4 Binary files /dev/null and b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750792.log differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750793.log ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750793.log b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750793.log new file mode 100644 index 0000000..3301331 Binary files /dev/null and b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750793.log differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/hash.txt ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.0/hash.txt b/test/data/legacy-commitlog/2.0/hash.txt new file mode 100644 index 0000000..4bbec02 --- /dev/null +++ b/test/data/legacy-commitlog/2.0/hash.txt @@ -0,0 +1,3 @@ +cfid = 4d331c44-f018-302b-91c2-2dcf94c4bfad +cells = 9724 +hash = -682777064 http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069529.log ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069529.log b/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069529.log new file mode 100644 index 0000000..60064ee Binary files /dev/null and b/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069529.log differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069530.log ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069530.log b/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069530.log new file mode 100644 index 0000000..fdf7071 Binary files /dev/null and b/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069530.log differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.1/hash.txt ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.1/hash.txt b/test/data/legacy-commitlog/2.1/hash.txt new file mode 100644 index 0000000..f05cf97 --- /dev/null +++ b/test/data/legacy-commitlog/2.1/hash.txt @@ -0,0 +1,3 @@ +cfid = 6c622920-f980-11e4-b8a0-e7d448d5e26d +cells = 5165 +hash = -1915888171 http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index f5fd2cf..5897dec 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -1,4 +1,5 @@ package org.apache.cassandra.db.commitlog; + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,7 +21,6 @@ package org.apache.cassandra.db.commitlog; * */ - import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; @@ -60,54 +60,56 @@ import org.apache.cassandra.io.util.FastByteArrayInputStream; public class CommitLogStressTest { - public static ByteBuffer dataSource; - - public static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1; + public static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1; public static int numCells = 1; - public static int cellSize = 1024; - public static int rateLimit = 0; - public static int runTimeMs = 10000; - + public static String location = DatabaseDescriptor.getCommitLogLocation() + "/stress"; - + public static int hash(int hash, ByteBuffer bytes) { int shift = 0; - for (int i=0; i<bytes.limit(); i++) { + for (int i = 0; i < bytes.limit(); i++) + { hash += (bytes.get(i) & 0xFF) << shift; shift = (shift + 8) & 0x1F; } return hash; } - - public static void main(String[] args) throws Exception { - try { - if (args.length >= 1) { + + public static void main(String[] args) throws Exception + { + try + { + if (args.length >= 1) + { NUM_THREADS = Integer.parseInt(args[0]); System.out.println("Setting num threads to: " + NUM_THREADS); } - - if (args.length >= 2) { + + if (args.length >= 2) + { numCells = Integer.parseInt(args[1]); System.out.println("Setting num cells to: " + numCells); } - - if (args.length >= 3) { + + if (args.length >= 3) + { cellSize = Integer.parseInt(args[1]); System.out.println("Setting cell size to: " + cellSize + " be aware the source corpus may be small"); } - - if (args.length >= 4) { + + if (args.length >= 4) + { rateLimit = Integer.parseInt(args[1]); System.out.println("Setting per thread rate limit to: " + rateLimit); } initialize(); - + CommitLogStressTest tester = new CommitLogStressTest(); tester.testFixedSize(); } @@ -115,24 +117,26 @@ public class CommitLogStressTest { e.printStackTrace(System.err); } - finally { + finally + { System.exit(0); } } - + boolean failed = false; volatile boolean stop = false; boolean randomSize = false; boolean discardedRun = false; ReplayPosition discardedPos; - + @BeforeClass - static public void initialize() throws FileNotFoundException, IOException, InterruptedException + static public void initialize() throws IOException { try (FileInputStream fis = new FileInputStream("CHANGES.txt")) { - dataSource = ByteBuffer.allocateDirect((int)fis.getChannel().size()); - while (dataSource.hasRemaining()) { + dataSource = ByteBuffer.allocateDirect((int) fis.getChannel().size()); + while (dataSource.hasRemaining()) + { fis.getChannel().read(dataSource); } dataSource.flip(); @@ -141,7 +145,7 @@ public class CommitLogStressTest SchemaLoader.loadSchema(); SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour } - + @Before public void cleanDir() { @@ -149,11 +153,13 @@ public class CommitLogStressTest if (dir.isDirectory()) { File[] files = dir.listFiles(); - + for (File f : files) if (!f.delete()) Assert.fail("Failed to delete " + f); - } else { + } + else + { dir.mkdir(); } } @@ -194,7 +200,8 @@ public class CommitLogStressTest null, new ParameterizedClass("LZ4Compressor", null), new ParameterizedClass("SnappyCompressor", null), - new ParameterizedClass("DeflateCompressor", null)}) { + new ParameterizedClass("DeflateCompressor", null) }) + { DatabaseDescriptor.setCommitLogCompression(compressor); for (CommitLogSync sync : CommitLogSync.values()) { @@ -206,27 +213,29 @@ public class CommitLogStressTest assert !failed; } - public void testLog(CommitLog commitLog) throws IOException, InterruptedException { + public void testLog(CommitLog commitLog) throws IOException, InterruptedException + { System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n", - mb(DatabaseDescriptor.getCommitLogSegmentSize()), - commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", - commitLog.executor.getClass().getSimpleName(), - randomSize ? " random size" : "", - discardedRun ? " with discarded run" : ""); + mb(DatabaseDescriptor.getCommitLogSegmentSize()), + commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", + commitLog.executor.getClass().getSimpleName(), + randomSize ? " random size" : "", + discardedRun ? " with discarded run" : ""); commitLog.allocator.enableReserveSegmentCreation(); - + final List<CommitlogExecutor> threads = new ArrayList<>(); ScheduledExecutorService scheduled = startThreads(commitLog, threads); discardedPos = ReplayPosition.NONE; - if (discardedRun) { + if (discardedRun) + { // Makes sure post-break data is not deleted, and that replayer correctly rejects earlier mutations. Thread.sleep(runTimeMs / 3); stop = true; scheduled.shutdown(); scheduled.awaitTermination(2, TimeUnit.SECONDS); - for (CommitlogExecutor t: threads) + for (CommitlogExecutor t : threads) { t.join(); if (t.rp.compareTo(discardedPos) > 0) @@ -234,15 +243,15 @@ public class CommitLogStressTest } verifySizes(commitLog); - commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, discardedPos); + commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, + discardedPos); threads.clear(); System.out.format("Discarded at %s\n", discardedPos); verifySizes(commitLog); - + scheduled = startThreads(commitLog, threads); } - Thread.sleep(runTimeMs); stop = true; scheduled.shutdown(); @@ -250,16 +259,18 @@ public class CommitLogStressTest int hash = 0; int cells = 0; - for (CommitlogExecutor t: threads) { + for (CommitlogExecutor t : threads) + { t.join(); hash += t.hash; cells += t.cells; } verifySizes(commitLog); - + commitLog.shutdownBlocking(); - System.out.print("Stopped. Replaying... "); System.out.flush(); + System.out.print("Stopped. Replaying... "); + System.out.flush(); Replayer repl = new Replayer(); File[] files = new File(location).listFiles(); repl.recover(files); @@ -267,12 +278,16 @@ public class CommitLogStressTest for (File f : files) if (!f.delete()) Assert.fail("Failed to delete " + f); - + if (hash == repl.hash && cells == repl.cells) System.out.println("Test success."); else { - System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n", repl.cells, cells, repl.hash, hash); + System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n", + repl.cells, + cells, + repl.hash, + hash); failed = true; } } @@ -287,7 +302,7 @@ public class CommitLogStressTest commitLog.executor.requestExtraSync().awaitUninterruptibly(); // Wait for any pending deletes or segment allocations to complete. commitLog.allocator.awaitManagementTasksCompletion(); - + long combinedSize = 0; for (File f : new File(commitLog.location).listFiles()) combinedSize += f.length(); @@ -297,11 +312,11 @@ public class CommitLogStressTest Map<String, Double> ratios = commitLog.getActiveSegmentCompressionRatios(); Collection<CommitLogSegment> segments = commitLog.allocator.getActiveSegments(); - for (CommitLogSegment segment: segments) + for (CommitLogSegment segment : segments) { Assert.assertTrue(logFileNames.remove(segment.getName())); Double ratio = ratios.remove(segment.getName()); - + Assert.assertEquals(segment.logFile.length(), segment.onDiskSize()); Assert.assertEquals(segment.onDiskSize() * 1.0 / segment.contentSize(), ratio, 0.01); } @@ -312,35 +327,47 @@ public class CommitLogStressTest public ScheduledExecutorService startThreads(final CommitLog commitLog, final List<CommitlogExecutor> threads) { stop = false; - for (int ii = 0; ii < NUM_THREADS; ii++) { + for (int ii = 0; ii < NUM_THREADS; ii++) + { final CommitlogExecutor t = new CommitlogExecutor(commitLog, new Random(ii)); threads.add(t); t.start(); } final long start = System.currentTimeMillis(); - Runnable printRunnable = new Runnable() { + Runnable printRunnable = new Runnable() + { long lastUpdate = 0; - public void run() { - Runtime runtime = Runtime.getRuntime(); - long maxMemory = runtime.maxMemory(); - long allocatedMemory = runtime.totalMemory(); - long freeMemory = runtime.freeMemory(); - long temp = 0; - long sz = 0; - for (CommitlogExecutor cle : threads) { - temp += cle.counter.get(); - sz += cle.dataSize; - } - double time = (System.currentTimeMillis() - start) / 1000.0; - double avg = (temp / time); - System.out.println( - String.format("second %d mem max %.0fmb allocated %.0fmb free %.0fmb mutations %d since start %d avg %.3f content %.1fmb ondisk %.1fmb transfer %.3fmb", - ((System.currentTimeMillis() - start) / 1000), - mb(maxMemory), mb(allocatedMemory), mb(freeMemory), (temp - lastUpdate), lastUpdate, avg, - mb(commitLog.getActiveContentSize()), mb(commitLog.getActiveOnDiskSize()), mb(sz / time))); - lastUpdate = temp; + public void run() + { + Runtime runtime = Runtime.getRuntime(); + long maxMemory = runtime.maxMemory(); + long allocatedMemory = runtime.totalMemory(); + long freeMemory = runtime.freeMemory(); + long temp = 0; + long sz = 0; + for (CommitlogExecutor cle : threads) + { + temp += cle.counter.get(); + sz += cle.dataSize; + } + double time = (System.currentTimeMillis() - start) / 1000.0; + double avg = (temp / time); + System.out + .println( + String.format("second %d mem max %.0fmb allocated %.0fmb free %.0fmb mutations %d since start %d avg %.3f content %.1fmb ondisk %.1fmb transfer %.3fmb", + ((System.currentTimeMillis() - start) / 1000), + mb(maxMemory), + mb(allocatedMemory), + mb(freeMemory), + (temp - lastUpdate), + lastUpdate, + avg, + mb(commitLog.getActiveContentSize()), + mb(commitLog.getActiveOnDiskSize()), + mb(sz / time))); + lastUpdate = temp; } }; ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1); @@ -348,15 +375,18 @@ public class CommitLogStressTest return scheduled; } - private static double mb(long maxMemory) { + private static double mb(long maxMemory) + { return maxMemory / (1024.0 * 1024); } - private static double mb(double maxMemory) { + private static double mb(double maxMemory) + { return maxMemory / (1024 * 1024); } - public static ByteBuffer randomBytes(int quantity, Random tlr) { + public static ByteBuffer randomBytes(int quantity, Random tlr) + { ByteBuffer slice = ByteBuffer.allocate(quantity); ByteBuffer source = dataSource.duplicate(); source.position(tlr.nextInt(source.capacity() - quantity)); @@ -366,7 +396,8 @@ public class CommitLogStressTest return slice; } - public class CommitlogExecutor extends Thread { + public class CommitlogExecutor extends Thread + { final AtomicLong counter = new AtomicLong(); int hash = 0; int cells = 0; @@ -382,21 +413,23 @@ public class CommitLogStressTest this.random = rand; } - public void run() { + public void run() + { RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null; final Random rand = random != null ? random : ThreadLocalRandom.current(); - while (!stop) { + while (!stop) + { if (rl != null) rl.acquire(); String ks = "Keyspace1"; ByteBuffer key = randomBytes(16, rand); Mutation mutation = new Mutation(ks, key); - for (int ii = 0; ii < numCells; ii++) { + for (int ii = 0; ii < numCells; ii++) + { int sz = randomSize ? rand.nextInt(cellSize) : cellSize; ByteBuffer bytes = randomBytes(sz, rand); - mutation.add("Standard1", Util.cellname("name" + ii), bytes, - System.currentTimeMillis()); + mutation.add("Standard1", Util.cellname("name" + ii), bytes, System.currentTimeMillis()); hash = hash(hash, bytes); ++cells; dataSize += sz; @@ -406,7 +439,7 @@ public class CommitLogStressTest } } } - + class Replayer extends CommitLogReplayer { Replayer() @@ -420,20 +453,22 @@ public class CommitLogStressTest @Override void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc) { - if (desc.id < discardedPos.segment) { + if (desc.id < discardedPos.segment) + { System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation); return; - } else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position) + } + else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position) // Skip over this mutation. return; - + FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size); Mutation mutation; try { mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn), - desc.getMessagingVersion(), - ColumnSerializer.Flag.LOCAL); + desc.getMessagingVersion(), + ColumnSerializer.Flag.LOCAL); } catch (IOException e) { @@ -441,8 +476,10 @@ public class CommitLogStressTest throw new AssertionError(e); } - for (ColumnFamily cf : mutation.getColumnFamilies()) { - for (Cell c : cf.getSortedColumns()) { + for (ColumnFamily cf : mutation.getColumnFamilies()) + { + for (Cell c : cf.getSortedColumns()) + { if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith("name")) { hash = hash(hash, c.value()); @@ -451,6 +488,6 @@ public class CommitLogStressTest } } } - + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java new file mode 100644 index 0000000..1655078 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java @@ -0,0 +1,143 @@ +package org.apache.cassandra.db.commitlog; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.io.*; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Properties; +import java.util.UUID; + +import junit.framework.Assert; + +import com.google.common.base.Predicate; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.Cell; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.Mutation; + +public class CommitLogUpgradeTest +{ + static final String DATA_DIR = "test/data/legacy-commitlog/"; + static final String PROPERTIES_FILE = "hash.txt"; + static final String CFID_PROPERTY = "cfid"; + static final String CELLS_PROPERTY = "cells"; + static final String HASH_PROPERTY = "hash"; + + static final String TABLE = "Standard1"; + static final String KEYSPACE = "Keyspace1"; + static final String CELLNAME = "name"; + + @Test + public void test20() throws Exception + { + testRestore(DATA_DIR + "2.0"); + } + + @Test + public void test21() throws Exception + { + testRestore(DATA_DIR + "2.1"); + } + + @BeforeClass + static public void initialize() throws FileNotFoundException, IOException, InterruptedException + { + SchemaLoader.loadSchema(); + SchemaLoader.schemaDefinition(""); + } + + public void testRestore(String location) throws IOException, InterruptedException + { + Properties prop = new Properties(); + prop.load(new FileInputStream(new File(location + File.separatorChar + PROPERTIES_FILE))); + int hash = Integer.parseInt(prop.getProperty(HASH_PROPERTY)); + int cells = Integer.parseInt(prop.getProperty(CELLS_PROPERTY)); + + String cfidString = prop.getProperty(CFID_PROPERTY); + if (cfidString != null) + { + UUID cfid = UUID.fromString(cfidString); + if (Schema.instance.getCF(cfid) == null) + { + CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE, TABLE); + Schema.instance.purge(cfm); + Schema.instance.load(cfm.copy(cfid)); + } + } + + Hasher hasher = new Hasher(); + CommitLogTestReplayer replayer = new CommitLogTestReplayer(hasher); + File[] files = new File(location).listFiles(new FilenameFilter() + { + @Override + public boolean accept(File dir, String name) + { + return name.endsWith(".log"); + } + }); + replayer.recover(files); + + Assert.assertEquals(cells, hasher.cells); + Assert.assertEquals(hash, hasher.hash); + } + + public static int hash(int hash, ByteBuffer bytes) + { + int shift = 0; + for (int i = 0; i < bytes.limit(); i++) + { + hash += (bytes.get(i) & 0xFF) << shift; + shift = (shift + 8) & 0x1F; + } + return hash; + } + + class Hasher implements Predicate<Mutation> + { + int hash = 0; + int cells = 0; + + @Override + public boolean apply(Mutation mutation) + { + for (ColumnFamily cf : mutation.getColumnFamilies()) + { + for (Cell c : cf.getSortedColumns()) + { + if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith(CELLNAME)) + { + hash = hash(hash, c.value()); + ++cells; + } + } + } + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java new file mode 100644 index 0000000..7b07c8e --- /dev/null +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java @@ -0,0 +1,250 @@ +package org.apache.cassandra.db.commitlog; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.io.*; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.util.concurrent.RateLimiter; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.db.commitlog.CommitLogUpgradeTest.*; + +public class CommitLogUpgradeTestMaker +{ + public static ByteBuffer dataSource; + + private static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1; + public static int numCells = 1; + public static int cellSize = 256; + public static int rateLimit = 0; + public static int runTimeMs = 1000; + + public static void main(String[] args) throws Exception + { + try + { + initialize(); + + CommitLogUpgradeTestMaker tester = new CommitLogUpgradeTestMaker(); + tester.makeLog(); + } + catch (Throwable e) + { + e.printStackTrace(System.err); + } + finally + { + System.exit(0); + } + } + + volatile boolean stop = false; + boolean randomSize = true; + + static public void initialize() throws IOException, ConfigurationException + { + try (FileInputStream fis = new FileInputStream("CHANGES.txt")) + { + dataSource = ByteBuffer.allocateDirect((int) fis.getChannel().size()); + while (dataSource.hasRemaining()) + { + fis.getChannel().read(dataSource); + } + dataSource.flip(); + } + + SchemaLoader.loadSchema(); + SchemaLoader.schemaDefinition(""); + } + + public void makeLog() throws IOException, InterruptedException + { + CommitLog commitLog = CommitLog.instance; + System.out.format("\nUsing commit log size %dmb, compressor %s, sync %s%s\n", + mb(DatabaseDescriptor.getCommitLogSegmentSize()), + commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", + commitLog.executor.getClass().getSimpleName(), + randomSize ? " random size" : ""); + final List<CommitlogExecutor> threads = new ArrayList<>(); + ScheduledExecutorService scheduled = startThreads(commitLog, threads); + + Thread.sleep(runTimeMs); + stop = true; + scheduled.shutdown(); + scheduled.awaitTermination(2, TimeUnit.SECONDS); + + int hash = 0; + int cells = 0; + for (CommitlogExecutor t : threads) + { + t.join(); + hash += t.hash; + cells += t.cells; + } + commitLog.shutdownBlocking(); + + File dataDir = new File(CommitLogUpgradeTest.DATA_DIR + FBUtilities.getReleaseVersionString()); + System.out.format("Data will be stored in %s\n", dataDir); + if (dataDir.exists()) + FileUtils.deleteRecursive(dataDir); + + dataDir.mkdirs(); + for (File f : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles()) + FileUtils.createHardLink(f, new File(dataDir, f.getName())); + + Properties prop = new Properties(); + prop.setProperty(CFID_PROPERTY, Schema.instance.getId(KEYSPACE, TABLE).toString()); + prop.setProperty(CELLS_PROPERTY, Integer.toString(cells)); + prop.setProperty(HASH_PROPERTY, Integer.toString(hash)); + prop.store(new FileOutputStream(new File(dataDir, PROPERTIES_FILE)), + "CommitLog upgrade test, version " + FBUtilities.getReleaseVersionString()); + System.out.println("Done"); + } + + public ScheduledExecutorService startThreads(CommitLog commitLog, final List<CommitlogExecutor> threads) + { + stop = false; + for (int ii = 0; ii < NUM_THREADS; ii++) + { + final CommitlogExecutor t = new CommitlogExecutor(commitLog); + threads.add(t); + t.start(); + } + + final long start = System.currentTimeMillis(); + Runnable printRunnable = new Runnable() + { + long lastUpdate = 0; + + public void run() + { + Runtime runtime = Runtime.getRuntime(); + long maxMemory = mb(runtime.maxMemory()); + long allocatedMemory = mb(runtime.totalMemory()); + long freeMemory = mb(runtime.freeMemory()); + long temp = 0; + long sz = 0; + for (CommitlogExecutor cle : threads) + { + temp += cle.counter.get(); + sz += cle.dataSize; + } + double time = (System.currentTimeMillis() - start) / 1000.0; + double avg = (temp / time); + System.out.println( + String.format("second %d mem max %dmb allocated %dmb free %dmb mutations %d since start %d avg %.3f transfer %.3fmb", + ((System.currentTimeMillis() - start) / 1000), + maxMemory, + allocatedMemory, + freeMemory, + (temp - lastUpdate), + lastUpdate, + avg, + mb(sz / time))); + lastUpdate = temp; + } + }; + ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1); + scheduled.scheduleAtFixedRate(printRunnable, 1, 1, TimeUnit.SECONDS); + return scheduled; + } + + private static long mb(long maxMemory) + { + return maxMemory / (1024 * 1024); + } + + private static double mb(double maxMemory) + { + return maxMemory / (1024 * 1024); + } + + public static ByteBuffer randomBytes(int quantity, ThreadLocalRandom tlr) + { + ByteBuffer slice = ByteBuffer.allocate(quantity); + ByteBuffer source = dataSource.duplicate(); + source.position(tlr.nextInt(source.capacity() - quantity)); + source.limit(source.position() + quantity); + slice.put(source); + slice.flip(); + return slice; + } + + public class CommitlogExecutor extends Thread + { + final AtomicLong counter = new AtomicLong(); + int hash = 0; + int cells = 0; + int dataSize = 0; + final CommitLog commitLog; + + volatile ReplayPosition rp; + + public CommitlogExecutor(CommitLog commitLog) + { + this.commitLog = commitLog; + } + + public void run() + { + RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null; + final ThreadLocalRandom tlr = ThreadLocalRandom.current(); + while (!stop) + { + if (rl != null) + rl.acquire(); + String ks = KEYSPACE; + ByteBuffer key = randomBytes(16, tlr); + Mutation mutation = new Mutation(ks, key); + + for (int ii = 0; ii < numCells; ii++) + { + int sz = randomSize ? tlr.nextInt(cellSize) : cellSize; + ByteBuffer bytes = randomBytes(sz, tlr); + mutation.add(TABLE, Util.cellname(CELLNAME + ii), bytes, System.currentTimeMillis()); + hash = hash(hash, bytes); + ++cells; + dataSize += sz; + } + rp = commitLog.add(mutation); + counter.incrementAndGet(); + } + } + } +}