Repository: cassandra Updated Branches: refs/heads/trunk 7d857b46f -> 1d7466425
Make jmh jar and add RW + Compaction tests Patch by tjake; reviewed by Nitsan Wakart for CASSANDRA-12586 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d746642 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d746642 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d746642 Branch: refs/heads/trunk Commit: 1d7466425b96dcf4e1ebf68eceed4858fbbd6e29 Parents: 7d857b4 Author: T Jake Luciani <j...@apache.org> Authored: Thu Sep 1 10:36:11 2016 -0400 Committer: T Jake Luciani <j...@apache.org> Committed: Tue Sep 6 10:13:33 2016 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + build.xml | 29 +++- .../io/sstable/format/SSTableReader.java | 3 +- .../test/microbench/CompactionBench.java | 167 +++++++++++++++++++ .../test/microbench/ReadWriteTest.java | 115 +++++++++++++ 5 files changed, 309 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d746642/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 28c2d84..2e8fa4e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Add JMH benchmarks.jar (CASSANDRA-12586) * Add row offset support to SASI (CASSANDRA-11990) * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567) * Add keep-alive to streaming (CASSANDRA-11841) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d746642/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index bc9c6b9..858030a 100644 --- a/build.xml +++ b/build.xml @@ -156,12 +156,12 @@ <pathelement location="${build.classes.main}" /> <pathelement location="${build.classes.thrift}" /> <fileset dir="${build.lib}"> - <include name="**/*.jar" /> - <exclude name="**/*-sources.jar"/> + <include name="**/*.jar" /> + <exclude name="**/*-sources.jar"/> </fileset> <fileset dir="${build.dir.lib}"> - <include name="**/*.jar" /> - <exclude name="**/*-sources.jar"/> + <include name="**/*.jar" /> + <exclude name="**/*-sources.jar"/> </fileset> </path> @@ -1152,6 +1152,25 @@ </rat:report> </target> + <target name="build-jmh" depends="build-test" description="Create JMH uber jar"> + <jar jarfile="${build.test.dir}/deps.jar"> + <zipgroupfileset dir="${build.dir.lib}/jars"> + <include name="*jmh*.jar"/> + <include name="jopt*.jar"/> + <include name="commons*.jar"/> + </zipgroupfileset> + <zipgroupfileset dir="${build.lib}" includes="*.jar"/> + </jar> + <jar jarfile="${build.test.dir}/benchmarks.jar"> + <manifest> + <attribute name="Main-Class" value="org.openjdk.jmh.Main"/> + </manifest> + <zipfileset src="${build.test.dir}/deps.jar" excludes="META-INF/*.SF" /> + <fileset dir="${build.classes.main}"/> + <fileset dir="${test.classes}"/> + </jar> + </target> + <target name="build-test" depends="build" description="Compile test classes"> <javac compiler="modern" @@ -1701,7 +1720,7 @@ </target> <!-- run microbenchmarks suite --> - <target name="microbench" depends="build-test"> + <target name="microbench" depends="build-jmh"> <java classname="org.openjdk.jmh.Main" fork="true" failonerror="true"> http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d746642/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 8442ed7..5bfae62 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -42,6 +42,7 @@ import org.apache.cassandra.cache.ChunkCache; import org.apache.cassandra.cache.InstrumentingCache; import org.apache.cassandra.cache.KeyCacheKey; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Config; @@ -145,7 +146,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS // Do NOT start this thread pool in client mode - ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1); + ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("read-hotness-tracker")); // Immediately remove readMeter sync task when cancelled. syncExecutor.setRemoveOnCancelPolicy(true); return syncExecutor; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d746642/test/microbench/org/apache/cassandra/test/microbench/CompactionBench.java ---------------------------------------------------------------------- diff --git a/test/microbench/org/apache/cassandra/test/microbench/CompactionBench.java b/test/microbench/org/apache/cassandra/test/microbench/CompactionBench.java new file mode 100644 index 0000000..d8dfd66 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/CompactionBench.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.*; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; + +import org.apache.cassandra.UpdateBuilder; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.statements.ParsedStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputBufferFixed; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.CassandraDaemon; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.hadoop.util.bloom.Key; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.profile.StackProfiler; +import org.openjdk.jmh.results.Result; +import org.openjdk.jmh.results.RunResult; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Warmup(iterations = 25, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@Threads(1) +@State(Scope.Benchmark) +public class CompactionBench extends CQLTester +{ + static String keyspace; + String table; + String writeStatement; + String readStatement; + ColumnFamilyStore cfs; + List<File> snapshotFiles; + List<Descriptor> liveFiles; + + @Setup(Level.Trial) + public void setup() throws Throwable + { + CQLTester.prepareServer(); + keyspace = createKeyspace("CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = false"); + table = createTable(keyspace, "CREATE TABLE %s ( userid bigint, picid bigint, commentid bigint, PRIMARY KEY(userid, picid))"); + execute("use "+keyspace+";"); + writeStatement = "INSERT INTO "+table+"(userid,picid,commentid)VALUES(?,?,?)"; + readStatement = "SELECT * from "+table+" limit 100"; + + Keyspace.system().forEach(k -> k.getColumnFamilyStores().forEach(c -> c.disableAutoCompaction())); + + cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + cfs.disableAutoCompaction(); + + //Warm up + System.err.println("Writing 50k"); + for (long i = 0; i < 50000; i++) + execute(writeStatement, i, i, i ); + + + cfs.forceBlockingFlush(); + + System.err.println("Writing 50k again..."); + for (long i = 0; i < 50000; i++) + execute(writeStatement, i, i, i ); + + cfs.forceBlockingFlush(); + + cfs.snapshot("originals"); + + snapshotFiles = cfs.getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).snapshots("originals").listFiles(); + } + + @TearDown(Level.Trial) + public void teardown() throws IOException, ExecutionException, InterruptedException + { + int active = Thread.currentThread().getThreadGroup().activeCount(); + Thread[] threads = new Thread[active]; + Thread.currentThread().getThreadGroup().enumerate(threads); + for (Thread t : threads) + { + if (!t.isDaemon()) + System.err.println("Thread "+t.getName()); + } + + CQLTester.cleanup(); + } + + + @TearDown(Level.Invocation) + public void resetSnapshot() + { + cfs.truncateBlocking(); + + List<File> directories = cfs.getDirectories().getCFDirectories(); + + for (File file : directories) + { + for (File f : file.listFiles()) + { + if (f.isDirectory()) + continue; + + FileUtils.delete(f); + } + } + + + for (File file : snapshotFiles) + FileUtils.createHardLink(file, new File(file.toPath().getParent().getParent().getParent().toFile(), file.getName())); + + cfs.loadNewSSTables(); + } + + @Benchmark + public void compactTest() throws Throwable + { + cfs.forceMajorCompaction(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d746642/test/microbench/org/apache/cassandra/test/microbench/ReadWriteTest.java ---------------------------------------------------------------------- diff --git a/test/microbench/org/apache/cassandra/test/microbench/ReadWriteTest.java b/test/microbench/org/apache/cassandra/test/microbench/ReadWriteTest.java new file mode 100644 index 0000000..89973fd --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/ReadWriteTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.*; + +import org.apache.cassandra.UpdateBuilder; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.statements.ParsedStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputBufferFixed; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.CassandraDaemon; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.profile.StackProfiler; +import org.openjdk.jmh.results.Result; +import org.openjdk.jmh.results.RunResult; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@Threads(1) +@State(Scope.Benchmark) +public class ReadWriteTest extends CQLTester +{ + static String keyspace; + String table; + String writeStatement; + String readStatement; + long numRows = 0; + ColumnFamilyStore cfs; + + @Setup(Level.Trial) + public void setup() throws Throwable + { + CQLTester.setUpClass(); + keyspace = createKeyspace("CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = false"); + table = createTable(keyspace, "CREATE TABLE %s ( userid bigint, picid bigint, commentid bigint, PRIMARY KEY(userid, picid))"); + execute("use "+keyspace+";"); + writeStatement = "INSERT INTO "+table+"(userid,picid,commentid)VALUES(?,?,?)"; + readStatement = "SELECT * from "+table+" limit 100"; + + cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + cfs.disableAutoCompaction(); + + //Warm up + System.err.println("Writing 50k"); + for (long i = 0; i < 5000; i++) + execute(writeStatement, i, i, i ); + } + + @TearDown(Level.Trial) + public void teardown() throws IOException, ExecutionException, InterruptedException + { + CQLTester.cleanup(); + } + + @Benchmark + public Object write() throws Throwable + { + numRows++; + return execute(writeStatement, numRows, numRows, numRows ); + } + + + @Benchmark + public Object read() throws Throwable + { + return execute(readStatement); + } +}