Repository: cassandra Updated Branches: refs/heads/trunk 2c0edce09 -> dc9ed4634
Improve write path performance Patch by tjake; reviewed by Stefania Alborghetti for CASSANDRA-12269 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc9ed463 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc9ed463 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc9ed463 Branch: refs/heads/trunk Commit: dc9ed463417aa8028e77e91718e4f3d6ea563210 Parents: 2c0edce Author: T Jake Luciani <j...@apache.org> Authored: Tue Jun 21 21:53:43 2016 -0400 Committer: T Jake Luciani <j...@apache.org> Committed: Tue Jul 26 14:55:54 2016 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/jvm.options | 1 + .../org/apache/cassandra/config/CFMetaData.java | 18 ++- src/java/org/apache/cassandra/db/Columns.java | 11 ++ .../cassandra/db/SerializationHeader.java | 12 +- .../org/apache/cassandra/db/SystemKeyspace.java | 2 +- .../cassandra/db/commitlog/CommitLog.java | 68 +++++---- .../db/partitions/AbstractBTreePartition.java | 3 +- .../org/apache/cassandra/db/rows/BTreeRow.java | 49 +++--- src/java/org/apache/cassandra/db/rows/Row.java | 13 ++ src/java/org/apache/cassandra/db/rows/Rows.java | 24 +-- .../rows/UnfilteredRowIteratorSerializer.java | 3 + .../cassandra/db/rows/UnfilteredSerializer.java | 75 +++++++--- .../org/apache/cassandra/hints/HintsWriter.java | 2 +- .../io/sstable/SSTableSimpleUnsortedWriter.java | 3 +- .../cassandra/io/util/DataOutputBuffer.java | 42 +++--- .../io/util/DataOutputBufferFixed.java | 4 +- .../cassandra/io/util/SafeMemoryWriter.java | 2 +- .../cassandra/net/IncomingTcpConnection.java | 3 +- .../org/apache/cassandra/net/MessageIn.java | 2 +- .../org/apache/cassandra/net/MessageOut.java | 24 ++- .../apache/cassandra/net/MessagingService.java | 2 + .../cassandra/net/OutboundTcpConnection.java | 3 +- .../apache/cassandra/service/ClientWarn.java | 3 +- .../org/apache/cassandra/tracing/Tracing.java | 3 +- 
.../apache/cassandra/utils/ChecksumType.java | 13 +- .../org/apache/cassandra/utils/Wrapped.java | 48 ++++++ .../apache/cassandra/utils/WrappedBoolean.java | 42 ++++++ .../cassandra/utils/WrappedException.java | 30 ++++ .../org/apache/cassandra/utils/WrappedInt.java | 52 +++++++ .../org/apache/cassandra/utils/btree/BTree.java | 131 +++++++++++++++- .../db/commitlog/CommitLogStressTest.java | 4 +- .../test/microbench/FastThreadExecutor.java | 96 ++++++++++++ .../test/microbench/MutationBench.java | 148 +++++++++++++++++++ .../org/apache/cassandra/utils/BTreeTest.java | 25 ++++ 35 files changed, 826 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c586d10..efbbb4d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Faster write path (CASSANDRA-12269) * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424) * Support json/yaml output in nodetool tpstats (CASSANDRA-12035) * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/conf/jvm.options ---------------------------------------------------------------------- diff --git a/conf/jvm.options b/conf/jvm.options index 692d06b..9e13e0e 100644 --- a/conf/jvm.options +++ b/conf/jvm.options @@ -118,6 +118,7 @@ # resize them at runtime. 
-XX:+UseTLAB -XX:+ResizeTLAB +-XX:+UseNUMA # http://www.evanjones.ca/jvm-mmap-pause.html -XX:+PerfDisableSharedMem http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index b175ef1c..beb9d1a 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -47,6 +47,7 @@ import org.apache.cassandra.cql3.statements.CFStatement; import org.apache.cassandra.cql3.statements.CreateTableStatement; import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.exceptions.ConfigurationException; @@ -122,6 +123,9 @@ public final class CFMetaData public final DataResource resource; + //For hot path serialization it's often easier to store this info here + private volatile ColumnFilter allColumnFilter; + /* * All of these methods will go away once CFMetaData becomes completely immutable. 
*/ @@ -294,9 +298,12 @@ public final class CFMetaData this.clusteringColumns = clusteringColumns; this.partitionColumns = partitionColumns; - this.serializers = new Serializers(this); - this.resource = DataResource.table(ksName, cfName); + //This needs to happen before serializers are set + //because they use comparator.subtypes() rebuild(); + + this.resource = DataResource.table(ksName, cfName); + this.serializers = new Serializers(this); } // This rebuild informations that are intrinsically duplicate of the table definition but @@ -323,6 +330,8 @@ public final class CFMetaData if (isCompactTable()) this.compactValueColumn = CompactTables.getCompactValueColumn(partitionColumns, isSuper()); + + this.allColumnFilter = ColumnFilter.all(this); } public Indexes getIndexes() @@ -330,6 +339,11 @@ public final class CFMetaData return indexes; } + public ColumnFilter getAllColumnFilter() + { + return allColumnFilter; + } + public static CFMetaData create(String ksName, String name, UUID cfId, http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/Columns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java index e3c30fa..e9e3abf 100644 --- a/src/java/org/apache/cassandra/db/Columns.java +++ b/src/java/org/apache/cassandra/db/Columns.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db; import java.io.IOException; import java.util.*; +import java.util.function.Consumer; import java.util.function.Predicate; import java.nio.ByteBuffer; import java.security.MessageDigest; @@ -366,6 +367,16 @@ public class Columns extends AbstractCollection<ColumnDefinition> implements Col digest.update(c.name.bytes.duplicate()); } + /** + * Apply a function to each column definition in forwards or reversed order. 
+ * @param function + * @param reversed + */ + public void apply(Consumer<ColumnDefinition> function, boolean reversed) + { + BTree.apply(columns, function, reversed); + } + @Override public boolean equals(Object other) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/SerializationHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java index af2d434..9c443c7 100644 --- a/src/java/org/apache/cassandra/db/SerializationHeader.java +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@ -21,9 +21,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.filter.ColumnFilter; @@ -108,17 +105,12 @@ public class SerializationHeader { this(isForSSTable, metadata.getKeyValidator(), - typesOf(metadata.clusteringColumns()), + metadata.comparator.subtypes(), columns, stats, null); } - private static List<AbstractType<?>> typesOf(List<ColumnDefinition> columns) - { - return ImmutableList.copyOf(Lists.transform(columns, column -> column.type)); - } - public PartitionColumns columns() { return columns; @@ -398,7 +390,7 @@ public class SerializationHeader EncodingStats stats = EncodingStats.serializer.deserialize(in); AbstractType<?> keyType = metadata.getKeyValidator(); - List<AbstractType<?>> clusteringTypes = typesOf(metadata.clusteringColumns()); + List<AbstractType<?>> clusteringTypes = metadata.comparator.subtypes(); Columns statics, regulars; if (selection == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/SystemKeyspace.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 120125f..6451c83 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -652,7 +652,7 @@ public final class SystemKeyspace private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, CommitLogPosition position) { DataOutputBuffer out = null; - try (DataOutputBuffer ignored = out = DataOutputBuffer.RECYCLER.get()) + try (DataOutputBuffer ignored = out = DataOutputBuffer.scratchBuffer.get()) { CommitLogPosition.serializer.serialize(position, out); out.writeLong(truncatedAt); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index d76b9cb..0bb913d 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -38,6 +38,7 @@ import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputBufferFixed; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.CommitLogMetrics; @@ -243,43 +244,56 @@ public class CommitLog implements CommitLogMBean { assert mutation != null; - int size = (int) Mutation.serializer.serializedSize(mutation, MessagingService.current_version); - - int totalSize = size + ENTRY_OVERHEAD_SIZE; - if (totalSize > 
MAX_MUTATION_SIZE) + DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get(); + try { - throw new IllegalArgumentException(String.format("Mutation of %s is too large for the maximum size of %s", - FBUtilities.prettyPrintMemory(totalSize), - FBUtilities.prettyPrintMemory(MAX_MUTATION_SIZE))); - } + Mutation.serializer.serialize(mutation, dob, MessagingService.current_version); + int size = dob.getLength(); + + int totalSize = size + ENTRY_OVERHEAD_SIZE; + if (totalSize > MAX_MUTATION_SIZE) + { + throw new IllegalArgumentException(String.format("Mutation of %s is too large for the maximum size of %s", + FBUtilities.prettyPrintMemory(totalSize), + FBUtilities.prettyPrintMemory(MAX_MUTATION_SIZE))); + } - Allocation alloc = segmentManager.allocate(mutation, totalSize); + Allocation alloc = segmentManager.allocate(mutation, totalSize); - CRC32 checksum = new CRC32(); - final ByteBuffer buffer = alloc.getBuffer(); - try (BufferedDataOutputStreamPlus dos = new DataOutputBufferFixed(buffer)) - { - // checksummed length - dos.writeInt(size); - updateChecksumInt(checksum, size); - buffer.putInt((int) checksum.getValue()); - - // checksummed mutation - Mutation.serializer.serialize(mutation, dos, MessagingService.current_version); - updateChecksum(checksum, buffer, buffer.position() - size, size); - buffer.putInt((int) checksum.getValue()); + CRC32 checksum = new CRC32(); + final ByteBuffer buffer = alloc.getBuffer(); + try (BufferedDataOutputStreamPlus dos = new DataOutputBufferFixed(buffer)) + { + // checksummed length + dos.writeInt(size); + updateChecksumInt(checksum, size); + buffer.putInt((int) checksum.getValue()); + + // checksummed mutation + dos.write(dob.getData(), 0, size); + updateChecksum(checksum, buffer, buffer.position() - size, size); + buffer.putInt((int) checksum.getValue()); + } + catch (IOException e) + { + throw new FSWriteError(e, alloc.getSegment().getPath()); + } + finally + { + alloc.markWritten(); + } + + executor.finishWriteFor(alloc); + 
return alloc.getCommitLogPosition(); } catch (IOException e) { - throw new FSWriteError(e, alloc.getSegment().getPath()); + throw new FSWriteError(e, segmentManager.allocatingFrom().getPath()); } finally { - alloc.markWritten(); + dob.recycle(); } - - executor.finishWriteFor(alloc); - return alloc.getCommitLogPosition(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java index 0400402..9549c0d 100644 --- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java @@ -169,7 +169,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row> public UnfilteredRowIterator unfilteredIterator() { - return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, false); + return unfilteredIterator(metadata().getAllColumnFilter(), Slices.ALL, false); } public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed) @@ -197,7 +197,6 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row> ClusteringBound end = slice.end() == ClusteringBound.TOP ? 
null : slice.end(); Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, desc(reversed)); Iterator<RangeTombstone> deleteIter = current.deletionInfo.rangeIterator(slice, reversed); - return merge(rowIter, deleteIter, selection, reversed, current, staticRow); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/rows/BTreeRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java index c699634..0eccb6e 100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.rows; import java.nio.ByteBuffer; import java.util.*; +import java.util.function.Consumer; import java.util.function.Predicate; import com.google.common.base.Function; @@ -32,9 +33,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.utils.AbstractIterator; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.btree.BTree; import org.apache.cassandra.utils.btree.BTreeSearchIterator; import org.apache.cassandra.utils.btree.UpdateFunction; @@ -168,16 +167,23 @@ public class BTreeRow extends AbstractRow return cd.column().isSimple() ? 
minDeletionTime((Cell) cd) : minDeletionTime((ComplexColumnData)cd); } + public void apply(Consumer<ColumnData> function, boolean reversed) + { + BTree.apply(btree, function, reversed); + } + + public void apply(Consumer<ColumnData> funtion, com.google.common.base.Predicate<ColumnData> stopCondition, boolean reversed) + { + BTree.apply(btree, funtion, stopCondition, reversed); + } + private static int minDeletionTime(Object[] btree, LivenessInfo info, DeletionTime rowDeletion) { - int min = Math.min(minDeletionTime(info), minDeletionTime(rowDeletion)); - for (ColumnData cd : BTree.<ColumnData>iterable(btree)) - { - min = Math.min(min, minDeletionTime(cd)); - if (min == Integer.MIN_VALUE) - break; - } - return min; + //we have to wrap this for the lambda + final WrappedInt min = new WrappedInt(Math.min(minDeletionTime(info), minDeletionTime(rowDeletion))); + + BTree.<ColumnData>apply(btree, cd -> min.set( Math.min(min.get(), minDeletionTime(cd)) ), cd -> min.get() == Integer.MIN_VALUE, false); + return min.get(); } public Clustering clustering() @@ -324,17 +330,26 @@ public class BTreeRow extends AbstractRow public boolean hasComplexDeletion() { + final WrappedBoolean result = new WrappedBoolean(false); + // We start by the end cause we know complex columns sort before simple ones - for (ColumnData cd : BTree.<ColumnData>iterable(btree, BTree.Dir.DESC)) - { - if (cd.column().isSimple()) - return false; + apply(c -> {}, cd -> { + if (cd.column.isSimple()) + { + result.set(false); + return true; + } if (!((ComplexColumnData) cd).complexDeletion().isLive()) + { + result.set(true); return true; - } + } + + return false; + }, true); - return false; + return result.get(); } public Row markCounterLocalToBeCleared() http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/rows/Row.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Row.java 
b/src/java/org/apache/cassandra/db/rows/Row.java index 53b0eb3..51cf435 100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -19,6 +19,9 @@ package org.apache.cassandra.db.rows; import java.util.*; import java.security.MessageDigest; +import java.util.function.Consumer; + +import com.google.common.base.Predicate; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; @@ -251,6 +254,16 @@ public interface Row extends Unfiltered, Collection<ColumnData> public String toString(CFMetaData metadata, boolean fullDetails); /** + * Apply a function to every column in a row + */ + public void apply(Consumer<ColumnData> function, boolean reverse); + + /** + * Apply a function to every column in a row until a stop condition is reached + */ + public void apply(Consumer<ColumnData> function, Predicate<ColumnData> stopCondition, boolean reverse); + + /** * A row deletion/tombstone. * <p> * A row deletion mostly consists of the time of said deletion, but there is 2 variants: shadowable http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/rows/Rows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java index e325091..4f6c8d2 100644 --- a/src/java/org/apache/cassandra/db/rows/Rows.java +++ b/src/java/org/apache/cassandra/db/rows/Rows.java @@ -26,6 +26,7 @@ import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; import org.apache.cassandra.utils.MergeIterator; +import org.apache.cassandra.utils.WrappedInt; /** * Static utilities to work on Row objects.
@@ -72,14 +73,15 @@ public abstract class Rows collector.update(row.primaryKeyLivenessInfo()); collector.update(row.deletion().time()); - int columnCount = 0; - int cellCount = 0; - for (ColumnData cd : row) - { + //we have to wrap these for the lambda + final WrappedInt columnCount = new WrappedInt(0); + final WrappedInt cellCount = new WrappedInt(0); + + row.apply(cd -> { if (cd.column().isSimple()) { - ++columnCount; - ++cellCount; + columnCount.increment(); + cellCount.increment(); Cells.collectStats((Cell) cd, collector); } else @@ -88,18 +90,18 @@ public abstract class Rows collector.update(complexData.complexDeletion()); if (complexData.hasCells()) { - ++columnCount; + columnCount.increment(); for (Cell cell : complexData) { - ++cellCount; + cellCount.increment(); Cells.collectStats(cell, collector); } } } + }, false); - } - collector.updateColumnSetPerRow(columnCount); - return cellCount; + collector.updateColumnSetPerRow(columnCount.get()); + return cellCount.get(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java index 542f0a2..45c026f 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java @@ -80,12 +80,15 @@ public class UnfilteredRowIteratorSerializer } // Should only be used for the on-wire format. 
+ public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException { + SerializationHeader header = new SerializationHeader(false, iterator.metadata(), iterator.columns(), iterator.stats()); + serialize(iterator, header, selection, out, version, rowEstimate); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index 5ca7e03..ed6bd12 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -28,6 +28,7 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.SearchIterator; +import org.apache.cassandra.utils.WrappedException; /** * Serialize/deserialize a single Unfiltered (both on-wire and on-disk). @@ -181,7 +182,7 @@ public class UnfilteredSerializer if (header.isForSSTable()) { - DataOutputBuffer dob = DataOutputBuffer.RECYCLER.get(); + DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get(); try { serializeRowBody(row, flags, header, dob); @@ -190,7 +191,7 @@ public class UnfilteredSerializer // We write the size of the previous unfiltered to make reverse queries more efficient (and simpler). // This is currently not used however and using it is tbd. 
out.writeUnsignedVInt(previousUnfilteredSize); - out.write(dob.buffer()); + out.write(dob.getData(), 0, dob.getLength()); } finally { @@ -227,20 +228,37 @@ public class UnfilteredSerializer Columns.serializer.serializeSubset(Collections2.transform(row, ColumnData::column), headerColumns, out); SearchIterator<ColumnDefinition, ColumnDefinition> si = headerColumns.iterator(); - for (ColumnData data : row) + + try { - // We can obtain the column for data directly from data.column(). However, if the cell/complex data - // originates from a sstable, the column we'll get will have the type used when the sstable was serialized, - // and if that type have been recently altered, that may not be the type we want to serialize the column - // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what - // happens if we don't do that. - ColumnDefinition column = si.next(data.column()); - assert column != null; + row.apply(cd -> { + // We can obtain the column for data directly from data.column(). However, if the cell/complex data + // originates from a sstable, the column we'll get will have the type used when the sstable was serialized, + // and if that type have been recently altered, that may not be the type we want to serialize the column + // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what + // happens if we don't do that. 
+ ColumnDefinition column = si.next(cd.column()); + assert column != null : cd.column.toString(); + + try + { + if (cd.column.isSimple()) + Cell.serializer.serialize((Cell) cd, column, out, pkLiveness, header); + else + writeComplexColumn((ComplexColumnData) cd, column, (flags & HAS_COMPLEX_DELETION) != 0, pkLiveness, header, out); + } + catch (IOException e) + { + throw new WrappedException(e); + } + }, false); + } + catch (WrappedException e) + { + if (e.getCause() instanceof IOException) + throw (IOException) e.getCause(); - if (data.column.isSimple()) - Cell.serializer.serialize((Cell) data, column, out, pkLiveness, header); - else - writeComplexColumn((ComplexColumnData) data, column, (flags & HAS_COMPLEX_DELETION) != 0, pkLiveness, header, out); + throw e; } } @@ -496,12 +514,31 @@ public class UnfilteredSerializer builder.addRowDeletion(hasDeletion ? new Row.Deletion(header.readDeletionTime(in), deletionIsShadowable) : Row.Deletion.LIVE); Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in); - for (ColumnDefinition column : columns) + + final LivenessInfo livenessInfo = rowLiveness; + + try { - if (column.isSimple()) - readSimpleColumn(column, in, header, helper, builder, rowLiveness); - else - readComplexColumn(column, in, header, helper, hasComplexDeletion, builder, rowLiveness); + columns.apply(column -> { + try + { + if (column.isSimple()) + readSimpleColumn(column, in, header, helper, builder, livenessInfo); + else + readComplexColumn(column, in, header, helper, hasComplexDeletion, builder, livenessInfo); + } + catch (IOException e) + { + throw new WrappedException(e); + } + }, false); + } + catch (WrappedException e) + { + if (e.getCause() instanceof IOException) + throw (IOException) e.getCause(); + + throw e; } return builder.build(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/hints/HintsWriter.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java index ae9e05a..a081451 100644 --- a/src/java/org/apache/cassandra/hints/HintsWriter.java +++ b/src/java/org/apache/cassandra/hints/HintsWriter.java @@ -75,7 +75,7 @@ class HintsWriter implements AutoCloseable CRC32 crc = new CRC32(); DataOutputBuffer dob = null; - try (DataOutputBuffer ignored = dob = DataOutputBuffer.RECYCLER.get()) + try (DataOutputBuffer ignored = dob = DataOutputBuffer.scratchBuffer.get()) { // write the descriptor descriptor.serialize(dob); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index 6d3a714..fa88817 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Throwables; +import com.datastax.shaded.netty.util.concurrent.FastThreadLocalThread; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.Row; @@ -189,7 +190,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter //// typedef static class Buffer extends TreeMap<DecoratedKey, PartitionUpdate> {} - private class DiskWriter extends Thread + private class DiskWriter extends FastThreadLocalThread { volatile Throwable exception = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java index cc42c66..0d2423c 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java @@ -24,7 +24,7 @@ import java.nio.channels.WritableByteChannel; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import io.netty.util.Recycler; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.config.Config; /** @@ -43,25 +43,22 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus /* * Only recycle OutputBuffers up to 1Mb. Larger buffers will be trimmed back to this size. */ - private static final int MAX_RECYCLE_BUFFER_SIZE = 1024 * 1024; + private static final int MAX_RECYCLE_BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + "dob_max_recycle_bytes", 1024 * 1024); private static final int DEFAULT_INITIAL_BUFFER_SIZE = 128; - public static final Recycler<DataOutputBuffer> RECYCLER = new Recycler<DataOutputBuffer>() + /** + * Scratch buffers used mostly for serializing in memory. It's important to call #recycle() when finished + * to keep the memory overhead from being too large in the system. 
+ */ + public static final FastThreadLocal<DataOutputBuffer> scratchBuffer = new FastThreadLocal<DataOutputBuffer>() { - protected DataOutputBuffer newObject(Handle handle) + protected DataOutputBuffer initialValue() throws Exception { - return new DataOutputBuffer(handle); + return new DataOutputBuffer(); } }; - private final Recycler.Handle handle; - - private DataOutputBuffer(Recycler.Handle handle) - { - this(DEFAULT_INITIAL_BUFFER_SIZE, handle); - } - public DataOutputBuffer() { this(DEFAULT_INITIAL_BUFFER_SIZE); @@ -69,28 +66,23 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus public DataOutputBuffer(int size) { - this(size, null); - } - - protected DataOutputBuffer(int size, Recycler.Handle handle) - { - this(ByteBuffer.allocate(size), handle); + super(ByteBuffer.allocate(size)); } - protected DataOutputBuffer(ByteBuffer buffer, Recycler.Handle handle) + public DataOutputBuffer(ByteBuffer buffer) { super(buffer); - this.handle = handle; } public void recycle() { - assert handle != null; - - if (buffer().capacity() <= MAX_RECYCLE_BUFFER_SIZE) + if (buffer.capacity() <= MAX_RECYCLE_BUFFER_SIZE) + { + buffer.clear(); + } + else { - buffer.rewind(); - RECYCLER.recycle(this, handle); + buffer = ByteBuffer.allocate(DEFAULT_INITIAL_BUFFER_SIZE); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java index 5193401..8beb7a9 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java @@ -38,12 +38,12 @@ public class DataOutputBufferFixed extends DataOutputBuffer public DataOutputBufferFixed(int size) { - super(size, null); + super(size); } public 
DataOutputBufferFixed(ByteBuffer buffer) { - super(buffer, null); + super(buffer); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java index 88912f9..24eb93c 100644 --- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java +++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java @@ -33,7 +33,7 @@ public class SafeMemoryWriter extends DataOutputBuffer private SafeMemoryWriter(SafeMemory memory) { - super(tailBuffer(memory).order(ByteOrder.BIG_ENDIAN), null); + super(tailBuffer(memory).order(ByteOrder.BIG_ENDIAN)); this.memory = memory; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index 9e8e2e1..02147ef 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -29,6 +29,7 @@ import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.FastThreadLocalThread; import net.jpountz.lz4.LZ4BlockInputStream; import net.jpountz.lz4.LZ4FastDecompressor; import net.jpountz.lz4.LZ4Factory; @@ -42,7 +43,7 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.NIODataInputStream; -public class IncomingTcpConnection extends Thread implements Closeable +public class IncomingTcpConnection extends FastThreadLocalThread implements Closeable { private static final 
Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/net/MessageIn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java index fe85595..df1b4e1 100644 --- a/src/java/org/apache/cassandra/net/MessageIn.java +++ b/src/java/org/apache/cassandra/net/MessageIn.java @@ -73,7 +73,7 @@ public class MessageIn<T> { InetAddress from = CompactEndpointSerializationHelper.deserialize(in); - MessagingService.Verb verb = MessagingService.Verb.values()[in.readInt()]; + MessagingService.Verb verb = MessagingService.verbValues[in.readInt()]; int parameterCount = in.readInt(); Map<String, byte[]> parameters; if (parameterCount == 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/net/MessageOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java index bc5c41b..08d858b 100644 --- a/src/java/org/apache/cassandra/net/MessageOut.java +++ b/src/java/org/apache/cassandra/net/MessageOut.java @@ -30,6 +30,7 @@ import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; @@ -113,11 +114,26 @@ public class MessageOut<T> out.write(entry.getValue()); } - long longSize = payloadSize(version); - assert longSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages - out.writeInt((int) longSize); if (payload != null) - 
serializer.serialize(payload, out, version); + { + DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get(); + try + { + serializer.serialize(payload, dob, version); + + int size = dob.getLength(); + out.writeInt(size); + out.write(dob.getData(), 0, size); + } + finally + { + dob.recycle(); + } + } + else + { + out.writeInt(0); + } } public int serializedSize(int version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 08c08f3..b1f88ee 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -153,6 +153,8 @@ public final class MessagingService implements MessagingServiceMBean ; } + public static final Verb[] verbValues = Verb.values(); + public static final EnumMap<MessagingService.Verb, Stage> verbStages = new EnumMap<MessagingService.Verb, Stage>(MessagingService.Verb.class) {{ put(Verb.MUTATION, Stage.MUTATION); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index c2d10fd..76b2854 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -40,6 +40,7 @@ import javax.net.ssl.SSLHandshakeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.FastThreadLocalThread; import net.jpountz.lz4.LZ4BlockOutputStream; import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Factory; @@ -63,7 +64,7 @@ import 
org.apache.cassandra.config.DatabaseDescriptor; import com.google.common.util.concurrent.Uninterruptibles; -public class OutboundTcpConnection extends Thread +public class OutboundTcpConnection extends FastThreadLocalThread { private static final Logger logger = LoggerFactory.getLogger(OutboundTcpConnection.class); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/service/ClientWarn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientWarn.java b/src/java/org/apache/cassandra/service/ClientWarn.java index ddad197..878b5e9 100644 --- a/src/java/org/apache/cassandra/service/ClientWarn.java +++ b/src/java/org/apache/cassandra/service/ClientWarn.java @@ -20,13 +20,14 @@ package org.apache.cassandra.service; import java.util.ArrayList; import java.util.List; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.concurrent.ExecutorLocal; import org.apache.cassandra.utils.FBUtilities; public class ClientWarn implements ExecutorLocal<ClientWarn.State> { private static final String TRUNCATED = " [truncated]"; - private static final ThreadLocal<ClientWarn.State> warnLocal = new ThreadLocal<>(); + private static final FastThreadLocal<State> warnLocal = new FastThreadLocal<>(); public static ClientWarn instance = new ClientWarn(); private ClientWarn() http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/tracing/Tracing.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java index e69645f..adf5ed9 100644 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.concurrent.ExecutorLocal; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.marshal.TimeUUIDType; @@ -84,7 +85,7 @@ public abstract class Tracing implements ExecutorLocal<TraceState> private final InetAddress localAddress = FBUtilities.getLocalAddress(); - private final ThreadLocal<TraceState> state = new ThreadLocal<>(); + private final FastThreadLocal<TraceState> state = new FastThreadLocal<>(); protected final ConcurrentMap<UUID, TraceState> sessions = new ConcurrentHashMap<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/utils/ChecksumType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ChecksumType.java b/src/java/org/apache/cassandra/utils/ChecksumType.java index 3fa245b..413a171 100644 --- a/src/java/org/apache/cassandra/utils/ChecksumType.java +++ b/src/java/org/apache/cassandra/utils/ChecksumType.java @@ -22,6 +22,8 @@ import java.util.zip.Checksum; import java.util.zip.CRC32; import java.util.zip.Adler32; +import io.netty.util.concurrent.FastThreadLocal; + public enum ChecksumType { Adler32 @@ -60,12 +62,13 @@ public enum ChecksumType public abstract Checksum newInstance(); public abstract void update(Checksum checksum, ByteBuffer buf); - private ThreadLocal<Checksum> instances = ThreadLocal.withInitial(this::newInstance); - - public Checksum threadLocalInstance() + private FastThreadLocal<Checksum> instances = new FastThreadLocal<Checksum>() { - return instances.get(); - } + protected Checksum initialValue() throws Exception + { + return newInstance(); + } + }; public long of(ByteBuffer buf) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/utils/Wrapped.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/utils/Wrapped.java b/src/java/org/apache/cassandra/utils/Wrapped.java new file mode 100644 index 0000000..1996a86 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/Wrapped.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +/** + * Simple wrapper class to be used when a lambda function + * needs to modify a variable outside it's scope. 
+ */ +public class Wrapped<T> +{ + private T value; + + public static <V> Wrapped<V> create(V initial) + { + return new Wrapped<>(initial); + } + + private Wrapped(T initial) + { + this.value = initial; + } + + public T get() + { + return value; + } + + public void set(T value) + { + this.value = value; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/utils/WrappedBoolean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/WrappedBoolean.java b/src/java/org/apache/cassandra/utils/WrappedBoolean.java new file mode 100644 index 0000000..4b1443e --- /dev/null +++ b/src/java/org/apache/cassandra/utils/WrappedBoolean.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.utils; + +/** + * Simple wrapper for native boolean type + */ +public class WrappedBoolean +{ + private boolean value; + + public WrappedBoolean(boolean initial) + { + this.value = initial; + } + + public boolean get() + { + return value; + } + + public void set(boolean value) + { + this.value = value; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/utils/WrappedException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/WrappedException.java b/src/java/org/apache/cassandra/utils/WrappedException.java new file mode 100644 index 0000000..3cf56bc --- /dev/null +++ b/src/java/org/apache/cassandra/utils/WrappedException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.utils; + +/** + * Wrapped runtime exception for lambda functions + */ +public class WrappedException extends RuntimeException +{ + public WrappedException(Exception cause) + { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/utils/WrappedInt.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/WrappedInt.java b/src/java/org/apache/cassandra/utils/WrappedInt.java new file mode 100644 index 0000000..a106575 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/WrappedInt.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.utils; + +/** + * Simple wrapper for native int type + */ +public class WrappedInt +{ + private int value; + + public WrappedInt(int initial) + { + this.value = initial; + } + + public int get() + { + return value; + } + + public void set(int value) + { + this.value = value; + } + + public void increment() + { + ++value; + } + + public void decrement() + { + --value; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/src/java/org/apache/cassandra/utils/btree/BTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java index 33f4152..5a6fffe 100644 --- a/src/java/org/apache/cassandra/utils/btree/BTree.java +++ b/src/java/org/apache/cassandra/utils/btree/BTree.java @@ -19,8 +19,10 @@ package org.apache.cassandra.utils.btree; import java.util.*; +import java.util.function.Consumer; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.Ordering; @@ -136,7 +138,8 @@ public class BTree for (K k : source) values[i++] = updateF.apply(k); } - updateF.allocated(ObjectSizes.sizeOfArray(values)); + if (updateF != UpdateFunction.noOp()) + updateF.allocated(ObjectSizes.sizeOfArray(values)); return values; } @@ -188,7 +191,7 @@ public class BTree tree1 = tree2; tree2 = tmp; } - return update(tree1, comparator, new BTreeSet<K>(tree2, comparator), updateF); + return update(tree1, comparator, new BTreeSet<>(tree2, comparator), updateF); } public static <V> Iterator<V> iterator(Object[] btree) @@ -1159,4 +1162,128 @@ public class BTree } return compare(cmp, previous, max) < 0; } + + /** + * Simple method to walk the btree forwards or reversed and apply a function to each element + * + * Public method + * + */ + public static <V> void apply(Object[] btree, Consumer<V> function, boolean 
reversed) + { + if (reversed) + applyReverse(btree, function, null); + else + applyForwards(btree, function, null); + } + + /** + * Simple method to walk the btree forwards or reversed and apply a function till a stop condition is reached + * + * Public method + * + */ + public static <V> void apply(Object[] btree, Consumer<V> function, Predicate<V> stopCondition, boolean reversed) + { + if (reversed) + applyReverse(btree, function, stopCondition); + else + applyForwards(btree, function, stopCondition); + } + + + + + /** + * Simple method to walk the btree forwards and apply a function till a stop condition is reached + * + * Private method + * + * @param btree + * @param function + * @param stopCondition + */ + private static <V> boolean applyForwards(Object[] btree, Consumer<V> function, Predicate<V> stopCondition) + { + boolean isLeaf = isLeaf(btree); + int childOffset = isLeaf ? Integer.MAX_VALUE : getChildStart(btree); + int limit = isLeaf ? getLeafKeyEnd(btree) : btree.length - 1; + for (int i = 0 ; i < limit ; i++) + { + // we want to visit in iteration order, so we visit our key nodes inbetween our children + int idx = isLeaf ? i : (i / 2) + (i % 2 == 0 ? childOffset : 0); + Object current = btree[idx]; + if (idx < childOffset) + { + V castedCurrent = (V) current; + if (stopCondition != null && stopCondition.apply(castedCurrent)) + return true; + + function.accept(castedCurrent); + } + else + { + if (applyForwards((Object[]) current, function, stopCondition)) + return true; + } + } + + return false; + } + + /** + * Simple method to walk the btree in reverse and apply a function till a stop condition is reached + * + * Private method + * + * @param btree + * @param function + * @param stopCondition + */ + private static <V> boolean applyReverse(Object[] btree, Consumer<V> function, Predicate<V> stopCondition) + { + boolean isLeaf = isLeaf(btree); + int childOffset = isLeaf ? 0 : getChildStart(btree); + int limit = isLeaf ? 
getLeafKeyEnd(btree) : btree.length - 1; + for (int i = limit - 1, visited = 0; i >= 0 ; i--, visited++) + { + int idx = i; + + // we want to visit in reverse iteration order, so we visit our children nodes inbetween our keys + if (!isLeaf) + { + int typeOffset = visited / 2; + + if (i % 2 == 0) + { + // This is a child branch. Since children are in the second half of the array, we must + // adjust for the key's we've visited along the way + idx += typeOffset; + } + else + { + // This is a key. Since the keys are in the first half of the array and we are iterating + // in reverse we subtract the childOffset and adjust for children we've walked so far + idx = i - childOffset + typeOffset; + } + } + + Object current = btree[idx]; + if (isLeaf || idx < childOffset) + { + V castedCurrent = (V) current; + if (stopCondition != null && stopCondition.apply(castedCurrent)) + return true; + + function.accept(castedCurrent); + } + else + { + if (applyReverse((Object[]) current, function, stopCondition)) + return true; + } + } + + return false; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index b86a15b..babd571 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -35,6 +35,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import io.netty.util.concurrent.FastThreadLocalThread; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.UpdateBuilder; @@ -408,7 +409,8 @@ public class CommitLogStressTest return slice; } - public class CommitlogThread extends Thread { + public class 
CommitlogThread extends FastThreadLocalThread + { final AtomicLong counter = new AtomicLong(); int hash = 0; int cells = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java ---------------------------------------------------------------------- diff --git a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java new file mode 100644 index 0000000..d0b4442 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.test.microbench; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.netty.util.concurrent.FastThreadLocalThread; + +/** + * Created to test perf of FastThreadLocal + * + * Used in MutationBench via: + * jvmArgsAppend = {"-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"} + */ +public class FastThreadExecutor extends AbstractExecutorService +{ + final FastThreadLocalThread thread; + final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); + final CountDownLatch shutdown = new CountDownLatch(1); + + public FastThreadExecutor(int size, String name) + { + assert size == 1; + + thread = new FastThreadLocalThread(() -> { + Runnable work = null; + try + { + while ((work = queue.take()) != null) + work.run(); + } + catch (InterruptedException e) + { + shutdown.countDown(); + } + }); + + thread.setName(name + "-1"); + thread.setDaemon(true); + + thread.start(); + } + + + public void shutdown() + { + thread.interrupt(); + } + + public List<Runnable> shutdownNow() + { + thread.interrupt(); + return Collections.emptyList(); + } + + public boolean isShutdown() + { + return shutdown.getCount() == 0; + } + + public boolean isTerminated() + { + return shutdown.getCount() == 0; + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException + { + return shutdown.await(timeout, unit); + } + + public void execute(Runnable command) + { + while(!queue.add(command)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java ---------------------------------------------------------------------- diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java b/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java new file mode 100644 index 0000000..89dcb0f --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.test.microbench; + + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.concurrent.*; + +import org.apache.cassandra.UpdateBuilder; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputBufferFixed; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.profile.StackProfiler; +import org.openjdk.jmh.results.Result; +import org.openjdk.jmh.results.RunResult; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1 + , jvmArgsAppend = {"-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor" + //,"-XX:+UnlockCommercialFeatures", "-XX:+FlightRecorder","-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints", + // "-XX:StartFlightRecording=duration=60s,filename=./profiling-data.jfr,name=profile,settings=profile", + // "-XX:FlightRecorderOptions=settings=/home/jake/workspace/cassandra/profiling-advanced.jfc,samplethreads=true" + } +) +@Threads(1) +@State(Scope.Benchmark) +public class 
MutationBench +{ + static + { + Config.setClientMode(true); + // Partitioner is not set in client mode. + if (DatabaseDescriptor.getPartitioner() == null) + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + } + + static String keyspace = "keyspace1"; + + private Mutation mutation; + private MessageOut<Mutation> messageOut; + + private ByteBuffer buffer; + private DataOutputBuffer outputBuffer; + private DataInputBuffer inputBuffer; + + + @State(Scope.Thread) + public static class ThreadState + { + MessageIn<Mutation> in; + int counter = 0; + } + + @Setup + public void setup() throws IOException + { + Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1))); + KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace); + CFMetaData metadata = CFMetaData.compile("CREATE TABLE userpics " + + "( userid bigint," + + "picid bigint," + + "commentid bigint, " + + "PRIMARY KEY(userid, picid))", keyspace); + + Schema.instance.load(metadata); + Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(metadata))); + + + mutation = (Mutation)UpdateBuilder.create(metadata, 1L).newRow(1L).add("commentid", 32L).makeMutation(); + messageOut = mutation.createMessage(); + buffer = ByteBuffer.allocate(messageOut.serializedSize(MessagingService.current_version)); + outputBuffer = new DataOutputBufferFixed(buffer); + inputBuffer = new DataInputBuffer(buffer, false); + + messageOut.serialize(outputBuffer, MessagingService.current_version); + } + + @Benchmark + public void serialize(ThreadState state) throws IOException + { + buffer.rewind(); + messageOut.serialize(outputBuffer, MessagingService.current_version); + state.counter++; + } + + @Benchmark + public void deserialize(ThreadState state) throws IOException + { + buffer.rewind(); + state.in = MessageIn.read(inputBuffer, MessagingService.current_version, 0); + state.counter++; + } + + public static void main(String... 
args) throws Exception { + Options opts = new OptionsBuilder() + .include(".*"+MutationBench.class.getSimpleName()+".*") + .jvmArgs("-server") + .forks(1) + .mode(Mode.Throughput) + .addProfiler(StackProfiler.class) + .build(); + + Collection<RunResult> records = new Runner(opts).run(); + for ( RunResult result : records) { + Result r = result.getPrimaryResult(); + System.out.println("API replied benchmark score: " + + r.getScore() + " " + + r.getScoreUnit() + " over " + + r.getStatistics().getN() + " iterations"); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc9ed463/test/unit/org/apache/cassandra/utils/BTreeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/BTreeTest.java b/test/unit/org/apache/cassandra/utils/BTreeTest.java index a01ad2e..ec4cdb8 100644 --- a/test/unit/org/apache/cassandra/utils/BTreeTest.java +++ b/test/unit/org/apache/cassandra/utils/BTreeTest.java @@ -21,6 +21,7 @@ import java.util.*; import java.util.concurrent.ThreadLocalRandom; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import org.junit.Test; import junit.framework.Assert; @@ -128,6 +129,30 @@ public class BTreeTest checkResult(i, BTree.update(BTree.build(seq(i), noOp), CMP, seq(i), updateF)); } + @Test + public void testApplyForwards() + { + List<Integer> input = seq(71); + Object[] btree = BTree.build(input, noOp); + + final List<Integer> result = new ArrayList<>(); + BTree.<Integer>apply(btree, i -> result.add(i), false); + + org.junit.Assert.assertArrayEquals(input.toArray(),result.toArray()); + } + + @Test + public void testApplyReverse() + { + List<Integer> input = seq(71); + Object[] btree = BTree.build(input, noOp); + + final List<Integer> result = new ArrayList<>(); + BTree.<Integer>apply(btree, i -> result.add(i), true); + + org.junit.Assert.assertArrayEquals(Lists.reverse(input).toArray(),result.toArray()); + } + /** * Tests that the apply 
method of the <code>UpdateFunction</code> is only called once with each key update. * (see CASSANDRA-8018).