This is an automated email from the ASF dual-hosted git repository.

konstantinov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 39d1149c74 Optimize memtable flush logic
39d1149c74 is described below

commit 39d1149c74a850414edd2940b1ad32e25b777d68
Author: Dmitry Konstantinov <[email protected]>
AuthorDate: Thu Dec 18 12:25:56 2025 +0000

    Optimize memtable flush logic
    
    add more flushing stats: partitions/rows, bytes rate, CPU and heap 
allocation for the flushing thread
    avoid columns filtering overheads for unfilteredIterator
    do not re-map columns in serializeRowBody if they haven't changed
    reduce allocations during serialization of NativeClustering
    add fast return for BTreeRow.hasComplexDeletion, avoid 
column.name.bytes.hashCode if not needed, avoid capturing lambda allocation in 
UnfilteredSerializer.serializeRowBody
    check if Guardrails enabled at the beginning of writing, avoid hidden 
auto-boxing for logging of primitive parameters
    split call sites in Cell serialize logic, make isCounterCell cheaper 
(avoid megamorphic call + cache isCounterColumn)
    invoke metadataCollector.updateClusteringValues only for first and last 
clustering key in a partition
    enforce inlining for MinMaxIntTracker/MinMaxLongTracker
    
    Patch by Dmitry Konstantinov; reviewed by Branimir Lambov for 
CASSANDRA-21083
---
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/db/ClusteringPrefix.java  | 39 ++++++++---
 .../org/apache/cassandra/db/NativeClustering.java  | 54 ++++++++++++++-
 .../apache/cassandra/db/SerializationHeader.java   | 37 ++++++++++-
 .../db/compaction/unified/ShardedMultiWriter.java  |  8 +++
 .../apache/cassandra/db/marshal/AbstractType.java  | 27 ++++++++
 .../cassandra/db/marshal/IndexedValueHolder.java   | 32 +++++++++
 .../cassandra/db/marshal/NativeAccessor.java       |  7 ++
 .../apache/cassandra/db/marshal/ValueAccessor.java | 18 +++++
 .../org/apache/cassandra/db/memtable/Flushing.java | 39 ++++++++++-
 .../apache/cassandra/db/memtable/TrieMemtable.java |  2 +-
 .../db/partitions/AbstractBTreePartition.java      |  2 +-
 .../org/apache/cassandra/db/rows/AbstractCell.java |  4 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java     |  5 +-
 src/java/org/apache/cassandra/db/rows/Cell.java    | 76 +++++++++++++++++++---
 .../cassandra/db/rows/SerializationHelper.java     | 17 +++++
 .../cassandra/db/rows/UnfilteredSerializer.java    | 48 +++++++-------
 .../io/sstable/RangeAwareSSTableWriter.java        |  6 ++
 .../cassandra/io/sstable/SSTableMultiWriter.java   |  1 +
 .../io/sstable/SSTableTxnSingleStreamWriter.java   |  6 ++
 .../io/sstable/SSTableZeroCopyWriter.java          |  6 ++
 .../io/sstable/SimpleSSTableMultiWriter.java       |  5 ++
 .../cassandra/io/sstable/format/SSTableWriter.java |  5 ++
 .../io/sstable/format/SortedTableWriter.java       | 54 +++++++++++----
 .../io/sstable/format/big/BigTableWriter.java      |  3 +-
 .../io/sstable/metadata/MetadataCollector.java     | 48 ++++++++++++--
 .../apache/cassandra/schema/ColumnMetadata.java    |  8 +++
 .../org/apache/cassandra/utils/ThreadStats.java    | 60 +++++++++++++++++
 28 files changed, 548 insertions(+), 70 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e78d5bfd6e..d2fc1ba15f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Optimize memtable flush logic (CASSANDRA-21083)
  * No need to evict already prepared statements, as it creates a race 
condition between multiple threads (CASSANDRA-17401)
  * Include Level information for UnifiedCompactionStrategy in nodetool 
tablestats output (CASSANDRA-20820)
  * Support low-overhead async profiling (CASSANDRA-20854)
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java 
b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index a200807045..8f304b0052 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -260,6 +260,33 @@ public interface ClusteringPrefix<V> extends 
IMeasurableMemory, Clusterable<V>
      */
     public V get(int i);
 
+
+    /**
+     * A dedicated method to write ith value of this prefix,
+     * it is introduced for optimization reasons to avoid retrieval (and 
potential allocation) of ith value object.
+     * For the same reason null and empty check are performed inside the 
method.
+     */
+    default void writeValueSkippingNullAndEmpty(AbstractType<?> type, int i, 
DataOutputPlus out) throws IOException
+    {
+        V v = get(i);
+        if (v != null && !isEmpty(i))
+            type.writeValue(v, accessor(), out);
+    }
+
+    /**
+     * A dedicated method to get a written length of ith value of this prefix,
+     * it is introduced for optimization reasons to avoid retrieval (and 
potential allocation) of ith value object.
+     * For the same reason null and empty check are performed inside the 
method.
+     */
+    default long writtenLengthSkippingNullAndEmpty(AbstractType<?> type, int i)
+    {
+        V v = get(i);
+        if (v == null || isEmpty(i))
+            return 0;
+
+        return type.writtenLength(v, accessor());
+    }
+
     /**
      * The method is introduced to allow to avoid a value object 
retrieval/allocation for simple checks
      */
@@ -478,7 +505,6 @@ public interface ClusteringPrefix<V> extends 
IMeasurableMemory, Clusterable<V>
         {
             int offset = 0;
             int clusteringSize = clustering.size();
-            ValueAccessor<V> accessor = clustering.accessor();
             // serialize in batches of 32, to avoid garbage when deserializing 
headers
             while (offset < clusteringSize)
             {
@@ -490,9 +516,7 @@ public interface ClusteringPrefix<V> extends 
IMeasurableMemory, Clusterable<V>
                 out.writeUnsignedVInt(makeHeader(clustering, offset, limit));
                 while (offset < limit)
                 {
-                    V v = clustering.get(offset);
-                    if (v != null && !accessor.isEmpty(v))
-                        types.get(offset).writeValue(v, accessor, out);
+                    
clustering.writeValueSkippingNullAndEmpty(types.get(offset), offset, out);
                     offset++;
                 }
             }
@@ -509,14 +533,9 @@ public interface ClusteringPrefix<V> extends 
IMeasurableMemory, Clusterable<V>
                 result += TypeSizes.sizeofUnsignedVInt(makeHeader(clustering, 
offset, limit));
                 offset = limit;
             }
-            ValueAccessor<V> accessor = clustering.accessor();
             for (int i = 0; i < clusteringSize; i++)
             {
-                V v = clustering.get(i);
-                if (v == null || accessor.isEmpty(v))
-                    continue; // handled in the header
-
-                result += types.get(i).writtenLength(v, accessor);
+                result += 
clustering.writtenLengthSkippingNullAndEmpty(types.get(i), i);
             }
             return result;
         }
diff --git a/src/java/org/apache/cassandra/db/NativeClustering.java 
b/src/java/org/apache/cassandra/db/NativeClustering.java
index 45cfbb3b35..66fa3205a2 100644
--- a/src/java/org/apache/cassandra/db/NativeClustering.java
+++ b/src/java/org/apache/cassandra/db/NativeClustering.java
@@ -18,14 +18,18 @@
 */
 package org.apache.cassandra.db;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.AddressBasedNativeData;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.IndexedValueHolder;
 import org.apache.cassandra.db.marshal.NativeAccessor;
 import org.apache.cassandra.db.marshal.NativeData;
 import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -34,7 +38,7 @@ import org.apache.cassandra.utils.memory.HeapCloner;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 import org.apache.cassandra.utils.memory.NativeEndianMemoryUtil;
 
-public class NativeClustering implements Clustering<NativeData>
+public class NativeClustering implements Clustering<NativeData>, 
IndexedValueHolder<NativeData>
 {
     private static final long EMPTY_SIZE = ObjectSizes.measure(new 
NativeClustering());
 
@@ -118,11 +122,59 @@ public class NativeClustering implements 
Clustering<NativeData>
         return buildDataObject(i, AddressBasedNativeData::new);
     }
 
+    public void writeValueSkippingNullAndEmpty(AbstractType<?> type, int i, 
DataOutputPlus out) throws IOException
+    {
+        if (!isEmpty(i)) // is null is checked as a part of isEmpty
+            type.writeValue(this, i, NativeAccessor.instance, out);
+    }
+
+    public long writtenLengthSkippingNullAndEmpty(AbstractType<?> type, int i)
+    {
+        if (isEmpty(i)) // is null is checked as a part of isEmpty
+            return 0;
+
+        return type.writtenLength(this, i, NativeAccessor.instance);
+    }
+
+    @Override
+    public int size(int i)
+    {
+        int size = size();
+        if (isNull(peer, size, i))
+            return 0;
+
+        int startOffset = NativeEndianMemoryUtil.getUnsignedShort(peer + 2 + i 
* 2);
+        int endOffset = NativeEndianMemoryUtil.getUnsignedShort(peer + 4 + i * 
2);
+        return (endOffset - startOffset);
+    }
+
     public boolean isNull(int i)
     {
         return isNull(peer, size(), i);
     }
 
+    @Override
+    public void write(int i, DataOutputPlus out) throws IOException
+    {
+        int size = size();
+        if (i >= size)
+            throw new IndexOutOfBoundsException();
+
+        int metadataSize = (size * 2) + 4;
+        int bitmapSize = ((size + 7) >>> 3);
+        long bitmapStart = peer + metadataSize;
+        int b = NativeEndianMemoryUtil.getByte(bitmapStart + (i >>> 3));
+        if ((b & (1 << (i & 7))) != 0)
+            return;
+
+        int startOffset = NativeEndianMemoryUtil.getUnsignedShort(peer + 2 + i 
* 2);
+        int endOffset = NativeEndianMemoryUtil.getUnsignedShort(peer + 4 + i * 
2);
+
+        long address = bitmapStart + bitmapSize + startOffset;
+        int length = endOffset - startOffset;
+        out.writeMemory(address, length);
+    }
+
     private static boolean isNull(long peer, int size, int i)
     {
         if (i >= size)
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java 
b/src/java/org/apache/cassandra/db/SerializationHeader.java
index b587470e26..17b2888a89 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -63,12 +63,25 @@ public class SerializationHeader
 
     private final Map<ByteBuffer, AbstractType<?>> typeMap;
 
+    private final boolean columnsMayChanged;
+
     private SerializationHeader(boolean isForSSTable,
                                 AbstractType<?> keyType,
                                 List<AbstractType<?>> clusteringTypes,
                                 RegularAndStaticColumns columns,
                                 EncodingStats stats,
                                 Map<ByteBuffer, AbstractType<?>> typeMap)
+    {
+        this(isForSSTable, keyType, clusteringTypes, columns, stats, typeMap, 
true);
+    }
+
+    private SerializationHeader(boolean isForSSTable,
+                                AbstractType<?> keyType,
+                                List<AbstractType<?>> clusteringTypes,
+                                RegularAndStaticColumns columns,
+                                EncodingStats stats,
+                                Map<ByteBuffer, AbstractType<?>> typeMap,
+                                boolean columnsMayChanged)
     {
         this.isForSSTable = isForSSTable;
         this.keyType = keyType;
@@ -76,6 +89,7 @@ public class SerializationHeader
         this.columns = columns;
         this.stats = stats;
         this.typeMap = typeMap;
+        this.columnsMayChanged = columnsMayChanged;
     }
 
     public static SerializationHeader makeWithoutStats(TableMetadata metadata)
@@ -118,6 +132,21 @@ public class SerializationHeader
         return readers;
     }
 
+    public SerializationHeader(boolean isForSSTable,
+                               TableMetadata metadata,
+                               RegularAndStaticColumns columns,
+                               EncodingStats stats,
+                               boolean columnsMayChanged)
+    {
+        this(isForSSTable,
+             metadata.partitionKeyType,
+             metadata.comparator.subtypes(),
+             columns,
+             stats,
+             null,
+             columnsMayChanged);
+    }
+
     public SerializationHeader(boolean isForSSTable,
                                TableMetadata metadata,
                                RegularAndStaticColumns columns,
@@ -128,7 +157,8 @@ public class SerializationHeader
              metadata.comparator.subtypes(),
              columns,
              stats,
-             null);
+             null,
+             true);
     }
 
     public RegularAndStaticColumns columns()
@@ -146,6 +176,11 @@ public class SerializationHeader
         return isForSSTable;
     }
 
+    public boolean columnsMayChanged()
+    {
+        return columnsMayChanged;
+    }
+
     public EncodingStats stats()
     {
         return stats;
diff --git 
a/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java 
b/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java
index 79efadbfa3..b0d3f6ec9c 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java
@@ -205,6 +205,14 @@ public class ShardedMultiWriter implements 
SSTableMultiWriter
         return bytesWritten;
     }
 
+    public long getTotalRows()
+    {
+        long totalRows = 0;
+        for (int i = 0; i <= currentWriter; ++i)
+            totalRows += writers[i].getTotalRows();
+        return totalRows;
+    }
+
     @Override
     public TableId getTableId()
     {
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java 
b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 62f65f28b6..09545d486e 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -586,6 +586,25 @@ public abstract class AbstractType<T> implements 
Comparator<ByteBuffer>, Assignm
         }
     }
 
+    public  <V> void writeValue(IndexedValueHolder<V> valueHolder, int i, 
ValueAccessor<V> accessor, DataOutputPlus out) throws IOException
+    {
+        assert !valueHolder.isNull(i) : "bytes should not be null for type " + 
this;
+        int expectedValueLength = valueLengthIfFixed();
+        if (expectedValueLength >= 0)
+        {
+            int actualValueLength = valueHolder.size(i);
+            if (actualValueLength == expectedValueLength)
+                accessor.write(valueHolder, i, out);
+            else
+                throw new IOException(String.format("Expected exactly %d 
bytes, but was %d",
+                                                    expectedValueLength, 
actualValueLength));
+        }
+        else
+        {
+            accessor.writeWithVIntLength(valueHolder, i, out);
+        }
+    }
+
     public long writtenLength(ByteBuffer value)
     {
         return writtenLength(value, ByteBufferAccessor.instance);
@@ -599,6 +618,14 @@ public abstract class AbstractType<T> implements 
Comparator<ByteBuffer>, Assignm
                : accessor.sizeWithVIntLength(value);
     }
 
+    public <V> long writtenLength(IndexedValueHolder<V> valueHolder, int i, 
ValueAccessor<V> accessor)
+    {
+        assert !valueHolder.isNull(i) : "bytes should not be null for type " + 
this;
+        return valueLengthIfFixed() >= 0
+               ? valueHolder.size(i) // if the size is wrong, this will be 
detected in writeValue
+               : accessor.sizeWithVIntLength(valueHolder, i);
+    }
+
     public ByteBuffer readBuffer(DataInputPlus in) throws IOException
     {
         return readBuffer(in, Integer.MAX_VALUE);
diff --git a/src/java/org/apache/cassandra/db/marshal/IndexedValueHolder.java 
b/src/java/org/apache/cassandra/db/marshal/IndexedValueHolder.java
new file mode 100644
index 0000000000..0aefaf267a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/IndexedValueHolder.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.marshal;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public interface IndexedValueHolder<V>
+{
+    V get(int i);
+    int size(int i);
+    boolean isEmpty(int i);
+    boolean isNull(int i);
+    void write(int i, DataOutputPlus out) throws IOException;
+}
diff --git a/src/java/org/apache/cassandra/db/marshal/NativeAccessor.java 
b/src/java/org/apache/cassandra/db/marshal/NativeAccessor.java
index 70d73041de..fd5442f82d 100644
--- a/src/java/org/apache/cassandra/db/marshal/NativeAccessor.java
+++ b/src/java/org/apache/cassandra/db/marshal/NativeAccessor.java
@@ -70,6 +70,13 @@ public class NativeAccessor implements 
ValueAccessor<NativeData>
         out.writeMemory(sourceValue.getAddress(), 
sourceValue.nativeDataSize());
     }
 
+    public void writeWithVIntLength(IndexedValueHolder<NativeData> 
valueHolder, int i, DataOutputPlus out) throws IOException
+    {
+        int size = valueHolder.size(i);
+        out.writeUnsignedVInt32(size);
+        valueHolder.write(i, out);
+    }
+
     @Override
     public ByteBuffer toBuffer(NativeData value)
     {
diff --git a/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java 
b/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java
index 1d9591d27b..cc2e5b2151 100644
--- a/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java
+++ b/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java
@@ -125,6 +125,12 @@ public interface ValueAccessor<V>
         return TypeSizes.sizeofUnsignedVInt(size) + size;
     }
 
+    default int sizeWithVIntLength(IndexedValueHolder<V> valueHolder, int i)
+    {
+        int size = valueHolder.size(i);
+        return TypeSizes.sizeofUnsignedVInt(size) + size;
+    }
+
     /** serialized size including a short length prefix */
     default int sizeWithShortLength(V value)
     {
@@ -173,12 +179,24 @@ public interface ValueAccessor<V>
      */
     void write(V value, DataOutputPlus out) throws IOException;
 
+    default void write(IndexedValueHolder<V> valueHolder, int i, 
DataOutputPlus out) throws IOException
+    {
+        write(valueHolder.get(i), out);
+    }
+
+
     default void writeWithVIntLength(V value, DataOutputPlus out) throws 
IOException
     {
         out.writeUnsignedVInt32(size(value));
         write(value, out);
     }
 
+    default void writeWithVIntLength(IndexedValueHolder<V> valueHolder, int i, 
DataOutputPlus out) throws IOException
+    {
+        out.writeUnsignedVInt32(valueHolder.size(i));
+        write(valueHolder.get(i), out);
+    }
+
     /**
      * Write the contents of the given value into the ByteBuffer
      */
diff --git a/src/java/org/apache/cassandra/db/memtable/Flushing.java 
b/src/java/org/apache/cassandra/db/memtable/Flushing.java
index f2264dcf5b..b439438764 100644
--- a/src/java/org/apache/cassandra/db/memtable/Flushing.java
+++ b/src/java/org/apache/cassandra/db/memtable/Flushing.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -45,7 +46,9 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ThreadStats;
 
 public class Flushing
 {
@@ -152,6 +155,9 @@ public class Flushing
         private void writeSortedContents()
         {
             logger.info("Writing {}, flushed range = [{}, {})", 
toFlush.memtable(), toFlush.from(), toFlush.to());
+            long startTimeNs = Clock.Global.nanoTime();
+            long startCpuTime = ThreadStats.getCurrentThreadCpuTimeNano();
+            long startAllocatedBytes = 
ThreadStats.getCurrentThreadAllocatedBytes();
 
             // (we can't clear out the map as-we-go to free up memory,
             //  since the memtable is being used for queries in the "pending 
flush" category)
@@ -176,11 +182,34 @@ public class Flushing
 
             if (logCompletion)
             {
+                long endTimeNs = Clock.Global.nanoTime();
+                long endCpuTime = ThreadStats.getCurrentThreadCpuTimeNano();
+                long endAllocatedBytes = 
ThreadStats.getCurrentThreadAllocatedBytes();
+
+                long durationMs = TimeUnit.NANOSECONDS.toMillis(endTimeNs - 
startTimeNs);
+                long durationSec = TimeUnit.MILLISECONDS.toSeconds(durationMs);
+                durationSec = durationSec == 0 ? 1 : durationSec;
                 long bytesFlushed = writer.getBytesWritten();
-                logger.info("Completed flushing {} ({}) for commitlog position 
{}",
+                long byteFlushedPerSec = bytesFlushed / durationSec;
+                long partitionsPerSec = toFlush.partitionCount() / durationSec;
+                long rowsPerSec = writer.getTotalRows() / durationSec;
+
+                logger.info("Completed flushing {} ({}) for commitlog position 
{}, " +
+                            "time spent: {} ms, " +
+                            "bytes flushed: {} / (rate: {}), " +
+                            "partitions flushed: {} / (rate: {}/s), " +
+                            "rows: {} / (rate: {}/s), " +
+                            "cpu time: {} ms, heap allocated: {}",
                             writer.getFilename(),
                             FBUtilities.prettyPrintMemory(bytesFlushed),
-                            toFlush.memtable().getFinalCommitLogUpperBound());
+                            toFlush.memtable().getFinalCommitLogUpperBound(),
+                            durationMs,
+                            bytesFlushed, 
FBUtilities.prettyPrintMemoryPerSecond(byteFlushedPerSec),
+                            toFlush.partitionCount(), partitionsPerSec,
+                            writer.getTotalRows(), rowsPerSec,
+                            startCpuTime < 0 ? "n/a": 
TimeUnit.NANOSECONDS.toMillis(endCpuTime - startCpuTime),
+                            endAllocatedBytes < 0 ? "n/a" : 
FBUtilities.prettyPrintMemory(endAllocatedBytes - startAllocatedBytes)
+                );
                 // Update the metrics
                 metrics.bytesFlushed.inc(bytesFlushed);
             }
@@ -207,6 +236,9 @@ public class Flushing
                                                        Descriptor descriptor,
                                                        long partitionCount)
     {
+        // column types altering is not allowed after CASSANDRA-12443
+        // removal and re-adding of a column with a different type is limited 
to serialization compatible types only, CASSANDRA-16905
+        boolean columnsMayChanged = false;
         return cfs.createSSTableMultiWriter(descriptor,
                                             partitionCount,
                                             
ActiveRepairService.UNREPAIRED_SSTABLE,
@@ -217,7 +249,8 @@ public class Flushing
                                             new SerializationHeader(true,
                                                                     
flushSet.metadata(),
                                                                     
flushSet.columns(),
-                                                                    
flushSet.encodingStats()),
+                                                                    
flushSet.encodingStats(),
+                                                                    
columnsMayChanged),
                                             txn);
     }
 }
diff --git a/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java 
b/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java
index 1e7f5fd408..29ab18ee6f 100644
--- a/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java
+++ b/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java
@@ -737,7 +737,7 @@ public class TrieMemtable extends AbstractShardedMemtable
         @Override
         public UnfilteredRowIterator unfilteredIterator()
         {
-            return unfilteredIterator(ColumnFilter.selection(super.columns()), 
Slices.ALL, false);
+            return unfilteredIterator(ColumnFilter.all(super.columns()), 
Slices.ALL, false);
         }
 
         @Override
diff --git 
a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java 
b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index e4a653bec9..f8f2a041e3 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -168,7 +168,7 @@ public abstract class AbstractBTreePartition implements 
Partition, Iterable<Row>
 
     public UnfilteredRowIterator unfilteredIterator()
     {
-        return unfilteredIterator(ColumnFilter.selection(columns()), 
Slices.ALL, false);
+        return unfilteredIterator(ColumnFilter.all(columns()), Slices.ALL, 
false);
     }
 
     public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, 
Slices slices, boolean reversed)
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java 
b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 400dede7b2..c398839742 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -51,12 +51,12 @@ public abstract class AbstractCell<V> extends Cell<V>
 
     public boolean isCounterCell()
     {
-        return !isTombstone() && column.isCounterColumn();
+        return column.isCounterColumn() && !isTombstone();
     }
 
     public boolean isLive(long nowInSec)
     {
-        return localDeletionTime() == NO_DELETION_TIME || (ttl() != NO_TTL && 
nowInSec < localDeletionTime());
+        return isLive(nowInSec, localDeletionTime(), ttl());
     }
 
     public boolean isTombstone()
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java 
b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 96da1a1a73..ed610cf0a0 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -354,7 +354,8 @@ public class BTreeRow extends AbstractRow
             if (!inclusionTester.test(column))
                 return null;
 
-            DroppedColumn dropped = droppedColumns.get(column.name.bytes);
+            // we check isEmpty here to avoid bytes.hashCode calculation if it 
is not needed
+            DroppedColumn dropped = droppedColumns.isEmpty() ? null : 
droppedColumns.get(column.name.bytes);
             if (column.isComplex())
                 return ((ComplexColumnData) cd).filter(filter, mayHaveShadowed 
? activeDeletion : DeletionTime.LIVE, dropped, rowLiveness);
 
@@ -402,6 +403,8 @@ public class BTreeRow extends AbstractRow
 
     public boolean hasComplexDeletion()
     {
+        if (minLocalDeletionTime == Cell.MAX_DELETION_TIME || !hasComplex())
+            return false;
         long result = accumulate((cd, v) -> ((ComplexColumnData) 
cd).complexDeletion().isLive() ? 0 : STOP_SENTINEL_VALUE,
                                  COLUMN_COMPARATOR, isStatic() ? 
FIRST_COMPLEX_STATIC : FIRST_COMPLEX_REGULAR, 0L);
         return result == STOP_SENTINEL_VALUE;
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java 
b/src/java/org/apache/cassandra/db/rows/Cell.java
index d4c015b0cc..6dea396993 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -177,6 +177,11 @@ public abstract class Cell<V> extends ColumnData
      */
     public abstract boolean isLive(long nowInSec);
 
+    public final boolean isLive(long nowInSec, long localDeletionTime, int ttl)
+    {
+        return localDeletionTime == NO_DELETION_TIME || ttl != NO_TTL && 
nowInSec < localDeletionTime;
+    }
+
     /**
      * For cells belonging to complex types (non-frozen collection and UDT), 
the
      * path to the cell.
@@ -280,11 +285,64 @@ public abstract class Cell<V> extends ColumnData
         public <T> void serialize(Cell<T> cell, ColumnMetadata column, 
DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) 
throws IOException
         {
             assert cell != null;
-            boolean hasValue = cell.valueSize() > 0;
-            boolean isDeleted = cell.isTombstone();
-            boolean isExpiring = cell.isExpiring();
-            boolean useRowTimestamp = !rowLiveness.isEmpty() && 
cell.timestamp() == rowLiveness.timestamp();
-            boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && 
cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == 
rowLiveness.localExpirationTime();
+            int valueSize;
+            boolean hasValue;
+            boolean isDeleted;
+            boolean isExpiring;
+            long cellTimestamp;
+            long localDeletionTime;
+            int ttl;
+            T value;
+            ValueAccessor<T> accessor;
+            // To avoid megamorphic calls we split call sites.
+            // We have ArrayCell, BufferCell and NativeCell and all of them 
can be used here in different scenarios.
+            // The type check is executed once per 8 Cell method calls, so it 
amortized.
+            // While this is a micro-optimization, given that it is invoked 
per cell,
+            // the invocation frequency is extremely large and the overhead is 
noticeable.
+            Class<?> cellClass = cell.getClass();
+            if (cellClass == NativeCell.class)
+            {
+                valueSize = cell.valueSize();
+                hasValue = valueSize > 0;
+                isDeleted = cell.isTombstone();
+                isExpiring = cell.isExpiring();
+                cellTimestamp = cell.timestamp();
+                localDeletionTime = cell.localDeletionTime();
+                ttl = cell.ttl();
+                value = cell.value();
+                accessor = cell.accessor();
+            }
+            else if (cellClass == ArrayCell.class)
+            {
+                valueSize = cell.valueSize();
+                hasValue = valueSize > 0;
+                isDeleted = cell.isTombstone();
+                isExpiring = cell.isExpiring();
+                cellTimestamp = cell.timestamp();
+                localDeletionTime = cell.localDeletionTime();
+                ttl = cell.ttl();
+                value = cell.value();
+                accessor = cell.accessor();
+            }
+            else
+            {
+                valueSize = cell.valueSize();
+                hasValue = valueSize > 0;
+                isDeleted = cell.isTombstone();
+                isExpiring = cell.isExpiring();
+                cellTimestamp = cell.timestamp();
+                localDeletionTime = cell.localDeletionTime();
+                ttl = cell.ttl();
+                value = cell.value();
+                accessor = cell.accessor();
+            }
+
+
+            boolean useRowTimestamp = !rowLiveness.isEmpty() && cellTimestamp 
== rowLiveness.timestamp();
+            boolean useRowTTL = isExpiring
+                                && rowLiveness.isExpiring()
+                                && ttl == rowLiveness.ttl()
+                                && localDeletionTime == 
rowLiveness.localExpirationTime();
             int flags = 0;
             if (!hasValue)
                 flags |= HAS_EMPTY_VALUE_MASK;
@@ -302,18 +360,18 @@ public abstract class Cell<V> extends ColumnData
             out.writeByte((byte)flags);
 
             if (!useRowTimestamp)
-                header.writeTimestamp(cell.timestamp(), out);
+                header.writeTimestamp(cellTimestamp, out);
 
             if ((isDeleted || isExpiring) && !useRowTTL)
-                header.writeLocalDeletionTime(cell.localDeletionTime(), out);
+                header.writeLocalDeletionTime(localDeletionTime, out);
             if (isExpiring && !useRowTTL)
-                header.writeTTL(cell.ttl(), out);
+                header.writeTTL(ttl, out);
 
             if (column.isComplex())
                 column.cellPathSerializer().serialize(cell.path(), out);
 
             if (hasValue)
-                header.getType(column).writeValue(cell.value(), 
cell.accessor(), out);
+                header.getType(column).writeValue(value, accessor, out);
         }
 
         public <V> Cell<V> deserialize(DataInputPlus in, LivenessInfo 
rowLiveness, ColumnMetadata column, SerializationHeader header, 
DeserializationHelper helper, ValueAccessor<V> accessor) throws IOException
diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java 
b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
index dca4240dd0..ff097111aa 100644
--- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
+++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
@@ -18,17 +18,34 @@
 
 package org.apache.cassandra.db.rows;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.apache.cassandra.db.LivenessInfo;
 import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTreeSearchIterator;
 
+/**
+ * The class has mutable state, such as iterators and reusable fields used to 
avoid extra allocation during cell processing,
+ * so it is not safe to share between multiple threads.
+ */
+@NotThreadSafe
 public class SerializationHelper
 {
     public final SerializationHeader header;
     private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> statics = null;
     private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> regulars = 
null;
 
+    // reusable fields to avoid extra allocation during cells processing
+    // within 
org.apache.cassandra.db.rows.UnfilteredSerializer.serializeRowBody
+    int flags;
+    LivenessInfo pkLiveness;
+
+    DataOutputPlus out;
+    SearchIterator<ColumnMetadata, ColumnMetadata> si;
+
     public SerializationHelper(SerializationHeader header)
     {
         this.header = header;
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index f3b716a2cd..02e235bab9 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -242,31 +242,15 @@ public class UnfilteredSerializer
         if ((flags & HAS_ALL_COLUMNS) == 0)
             Columns.serializer.serializeSubset(row.columns(), headerColumns, 
out);
 
-        SearchIterator<ColumnMetadata, ColumnMetadata> si = 
helper.iterator(isStatic);
+        SearchIterator<ColumnMetadata, ColumnMetadata> si = 
helper.header.columnsMayChanged() ? helper.iterator(isStatic) : null;
 
+        helper.flags = flags;
+        helper.pkLiveness = pkLiveness;
+        helper.out = out;
+        helper.si = si;
         try
         {
-            row.apply(cd -> {
-                // We can obtain the column for data directly from 
data.column(). However, if the cell/complex data
-                // originates from a sstable, the column we'll get will have 
the type used when the sstable was serialized,
-                // and if that type have been recently altered, that may not 
be the type we want to serialize the column
-                // with. So we use the ColumnMetadata from the "header" which 
is "current". Also see #11810 for what
-                // happens if we don't do that.
-                ColumnMetadata column = si.next(cd.column());
-                assert column != null : cd.column.toString();
-
-                try
-                {
-                    if (cd.column.isSimple())
-                        Cell.serializer.serialize((Cell<?>) cd, column, out, 
pkLiveness, header);
-                    else
-                        writeComplexColumn((ComplexColumnData) cd, column, 
hasComplexDeletion(flags), pkLiveness, header, out);
-                }
-                catch (IOException e)
-                {
-                    throw new WrappedException(e);
-                }
-            });
+            row.apply(UnfilteredSerializer::serializeColumnData, helper);
         }
         catch (WrappedException e)
         {
@@ -277,7 +261,25 @@ public class UnfilteredSerializer
         }
     }
 
-    private void writeComplexColumn(ComplexColumnData data, ColumnMetadata 
column, boolean hasComplexDeletion, LivenessInfo rowLiveness, 
SerializationHeader header, DataOutputPlus out)
+    private static void serializeColumnData(SerializationHelper helper, 
ColumnData cd)
+    {
+        ColumnMetadata column = helper.header.columnsMayChanged() ? 
helper.si.next(cd.column()) : cd.column();
+        assert column != null : cd.column.toString();
+
+        try
+        {
+            if (cd.column.isSimple())
+                Cell.serializer.serialize((Cell<?>) cd, column, helper.out, 
helper.pkLiveness, helper.header);
+            else
+                writeComplexColumn((ComplexColumnData) cd, column, 
hasComplexDeletion(helper.flags), helper.pkLiveness, helper.header, helper.out);
+        }
+        catch (IOException e)
+        {
+            throw new WrappedException(e);
+        }
+    }
+
+    private static void writeComplexColumn(ComplexColumnData data, 
ColumnMetadata column, boolean hasComplexDeletion, LivenessInfo rowLiveness, 
SerializationHeader header, DataOutputPlus out)
     throws IOException
     {
         if (hasComplexDeletion)
diff --git 
a/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java
index 1b4fa8e8fe..1db8ef679d 100644
--- a/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java
@@ -154,6 +154,12 @@ public class RangeAwareSSTableWriter implements 
SSTableMultiWriter
        return currentWriter != null ? currentWriter.getBytesWritten() : 0L;
     }
 
+    @Override
+    public long getTotalRows()
+    {
+        return currentWriter != null ? currentWriter.getTotalRows() : 0L;
+    }
+
     @Override
     public long getOnDiskBytesWritten()
     {
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
index 9a7968071b..c5ea26af98 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
@@ -44,6 +44,7 @@ public interface SSTableMultiWriter extends Transactional
     String getFilename();
     long getBytesWritten();
     long getOnDiskBytesWritten();
+    long getTotalRows();
     TableId getTableId();
 
     static void abortOrDie(SSTableMultiWriter writer)
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java
index 390e63858f..35a31ded44 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java
@@ -106,6 +106,12 @@ public class SSTableTxnSingleStreamWriter implements 
SSTableMultiWriter
         return writer.getOnDiskBytesWritten();
     }
 
+    @Override
+    public long getTotalRows()
+    {
+        return writer.getTotalRows();
+    }
+
     @Override
     public TableId getTableId()
     {
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
index 9d191482af..1632c1d209 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
@@ -164,6 +164,12 @@ public class SSTableZeroCopyWriter extends SSTable 
implements SSTableMultiWriter
         return 0;
     }
 
+    @Override
+    public long getTotalRows()
+    {
+        return 0;
+    }
+
     @Override
     public TableId getTableId()
     {
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index fa541d075c..482b4cf071 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -81,6 +81,11 @@ public class SimpleSSTableMultiWriter implements 
SSTableMultiWriter
         return writer.getEstimatedOnDiskBytesWritten();
     }
 
+    public long getTotalRows()
+    {
+        return writer.getTotalRows();
+    }
+
     public TableId getTableId()
     {
         return writer.metadata().id;
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index b542b15bf6..03865c6ff2 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -215,6 +215,11 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
         return getOnDiskFilePointer();
     }
 
+    public long getTotalRows()
+    {
+        return metadataCollector.getTotalRows();
+    }
+
     /**
      * Reset the data file to the marked position (see {@link #mark()}) and 
truncate the rest of the file.
      */
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
index 05924a1b54..0c1ccf8560 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
@@ -89,6 +89,11 @@ public abstract class SortedTableWriter<P extends 
SortedTablePartitionWriter, I
     private long lastEarlyOpenLength;
     private final Supplier<Double> crcCheckChanceSupplier;
 
+    private final boolean isPartitionSizeGuardEnabled;
+    private final boolean isPartitionTombstonesGuardEnabled;
+    private final boolean areCollectionGuardsDisabled;
+
+
     public SortedTableWriter(Builder<P, I, ?, ?> builder, 
ILifecycleTransaction txn, SSTable.Owner owner)
     {
         super(builder, txn, owner);
@@ -119,6 +124,9 @@ public abstract class SortedTableWriter<P extends 
SortedTablePartitionWriter, I
             handleConstructionFailure(ex);
             throw ex;
         }
+        isPartitionSizeGuardEnabled = Guardrails.partitionSize.enabled();
+        isPartitionTombstonesGuardEnabled = 
Guardrails.partitionTombstones.enabled();
+        areCollectionGuardsDisabled = !Guardrails.collectionSize.enabled() && 
!Guardrails.itemsPerCollection.enabled();
     }
 
     /**
@@ -146,8 +154,20 @@ public abstract class SortedTableWriter<P extends 
SortedTablePartitionWriter, I
             if (header.hasStatic())
                 addStaticRow(partition.partitionKey(), partition.staticRow());
 
-            while (partition.hasNext())
-                addUnfiltered(partition.partitionKey(), partition.next());
+            int i = 0;
+            boolean hasNext = false;
+            while (hasNext || partition.hasNext())
+            {
+                Unfiltered current = partition.next();
+                hasNext = partition.hasNext();
+                boolean isRowFirstOrLast;
+                if (i == 0)
+                    isRowFirstOrLast = true;
+                else
+                    isRowFirstOrLast = !hasNext;
+                addUnfiltered(partition.partitionKey(), current, 
isRowFirstOrLast);
+                i++;
+            }
 
             indexEntry = endPartition(partition.partitionKey(), 
partition.partitionLevelDeletion());
 
@@ -198,30 +218,32 @@ public abstract class SortedTableWriter<P extends 
SortedTablePartitionWriter, I
         onStaticRow(row);
     }
 
-    private void addUnfiltered(DecoratedKey key, Unfiltered unfiltered) throws 
IOException
+    private void addUnfiltered(DecoratedKey key, Unfiltered unfiltered, 
boolean isRowFirstOrLast) throws IOException
     {
         if (unfiltered.isRow())
-            addRow(key, (Row) unfiltered);
+            addRow(key, (Row) unfiltered, isRowFirstOrLast);
         else
-            addRangeTomstoneMarker((RangeTombstoneMarker) unfiltered);
+            addRangeTomstoneMarker((RangeTombstoneMarker) unfiltered, 
isRowFirstOrLast);
     }
 
-    private void addRow(DecoratedKey key, Row row) throws IOException
+    private void addRow(DecoratedKey key, Row row, boolean isRowFirstOrLast) 
throws IOException
     {
         guardCollectionSize(key, row);
 
         partitionWriter.addUnfiltered(row);
-        metadataCollector.updateClusteringValues(row.clustering());
+        if (isRowFirstOrLast)
+            metadataCollector.updateClusteringValues(row.clustering());
         Rows.collectStats(row, metadataCollector);
 
         onRow(row);
     }
 
-    private void addRangeTomstoneMarker(RangeTombstoneMarker marker) throws 
IOException
+    private void addRangeTomstoneMarker(RangeTombstoneMarker marker, boolean 
isRowFirstOrLast) throws IOException
     {
         partitionWriter.addUnfiltered(marker);
 
-        
metadataCollector.updateClusteringValuesByBoundOrBoundary(marker.clustering());
+        if (isRowFirstOrLast)
+            
metadataCollector.updateClusteringValuesByBoundOrBoundary(marker.clustering());
         if (marker.isBoundary())
         {
             RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker) 
marker;
@@ -242,10 +264,12 @@ public abstract class SortedTableWriter<P extends 
SortedTablePartitionWriter, I
         long finishResult = partitionWriter.finish();
 
         long endPosition = dataWriter.position();
-        // inclusive of last byte
         long partitionSize = endPosition - 
partitionWriter.getPartitionStartPosition();
-        guardPartitionThreshold(Guardrails.partitionSize, key, partitionSize);
-        guardPartitionThreshold(Guardrails.partitionTombstones, key, 
metadataCollector.totalTombstones);
+        // inclusive of last byte
+        if (isPartitionSizeGuardEnabled)
+            guardPartitionThreshold(Guardrails.partitionSize, key, 
partitionSize);
+        if (isPartitionTombstonesGuardEnabled)
+            guardPartitionThreshold(Guardrails.partitionTombstones, key, 
metadataCollector.totalTombstones);
         metadataCollector.addPartitionSizeInBytes(partitionSize);
         metadataCollector.addKey(key.getKey());
         metadataCollector.addCellPerPartitionCount();
@@ -255,7 +279,8 @@ public abstract class SortedTableWriter<P extends 
SortedTablePartitionWriter, I
         if (first == null)
             first = lastWrittenKey;
 
-        logger.trace("wrote {} at {}", key, endPosition);
+        if (logger.isTraceEnabled())
+            logger.trace("wrote {} at {}", key, endPosition);
 
         return createRowIndexEntry(key, partitionLevelDeletion, finishResult);
     }
@@ -402,6 +427,9 @@ public abstract class SortedTableWriter<P extends 
SortedTablePartitionWriter, I
 
     private void guardCollectionSize(DecoratedKey partitionKey, Row row)
     {
+        if (areCollectionGuardsDisabled)
+            return;
+
         if (!Guardrails.collectionSize.enabled() && 
!Guardrails.itemsPerCollection.enabled())
             return;
 
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 1f706c0ba4..7fc4d2604a 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -295,7 +295,8 @@ public class BigTableWriter extends 
SortedTableWriter<BigFormatPartitionWriter,
             }
             long indexEnd = writer.position();
 
-            logger.trace("wrote index entry: {} at {}", indexEntry, 
indexStart);
+            if (logger.isTraceEnabled())
+                logger.trace("wrote index entry: {} at {}", indexEntry, 
indexStart);
 
             summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd);
         }
diff --git 
a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java 
b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index d60f37fa91..b362a5a2a2 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -26,6 +26,8 @@ import java.util.UUID;
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.clearspring.analytics.stream.cardinality.ICardinality;
 
+import net.nicoulaj.compilecommand.annotations.Inline;
+
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ClusteringBound;
 import org.apache.cassandra.db.ClusteringBoundOrBoundary;
@@ -39,7 +41,9 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
+import org.apache.cassandra.db.rows.ArrayCell;
 import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.NativeCell;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.io.sstable.ClusteringDescriptor;
 import org.apache.cassandra.io.sstable.SSTable;
@@ -246,10 +250,39 @@ public class MetadataCollector implements 
PartitionStatisticsCollector
     public void update(Cell<?> cell)
     {
         ++currentPartitionCells;
-        updateTimestamp(cell.timestamp());
-        updateTTL(cell.ttl());
-        updateLocalDeletionTime(cell.localDeletionTime());
-        if (!cell.isLive(nowInSec))
+        long timestamp;
+        int ttl;
+        long localDeletionTime;
+        // This method may process several implementations of Cell.
+        // To improve inlining of Cell method calls, we split the call sites.
+        // This is a very hot path, invoked for every cell (potentially 
millions of times per second),
+        // so this micro-optimization is justified.
+        // The type check is executed once per 3 Cell method calls, so its 
cost is amortized.
+        Class<?> cellClass = cell.getClass();
+        if (cellClass == NativeCell.class)
+        {
+            timestamp = cell.timestamp();
+            ttl = cell.ttl();
+            localDeletionTime = cell.localDeletionTime();
+        }
+        else if (cellClass == ArrayCell.class)
+        {
+            timestamp = cell.timestamp();
+            ttl = cell.ttl();
+            localDeletionTime = cell.localDeletionTime();
+        }
+        else
+        {
+            timestamp = cell.timestamp();
+            ttl = cell.ttl();
+            localDeletionTime = cell.localDeletionTime();
+        }
+        updateTimestamp(timestamp);
+        updateTTL(ttl);
+        updateLocalDeletionTime(localDeletionTime);
+
+        // isLive(nowInSec) is not used to avoid additional non-monomorphic 
calls of Cell methods
+        if (!cell.isLive(nowInSec, localDeletionTime, ttl))
             updateTombstoneCount();
     }
 
@@ -406,6 +439,11 @@ public class MetadataCollector implements 
PartitionStatisticsCollector
         this.hasLegacyCounterShards = this.hasLegacyCounterShards || 
hasLegacyCounterShards;
     }
 
+    public long getTotalRows()
+    {
+        return totalRows;
+    }
+
     public Map<MetadataType, MetadataComponent> finalizeMetadata(String 
partitioner, double bloomFilterFPChance, long repairedAt, TimeUUID 
pendingRepair, boolean isTransient, SerializationHeader header, ByteBuffer 
firstKey, ByteBuffer lastKey)
     {
         assert minClustering.kind() == ClusteringPrefix.Kind.CLUSTERING || 
minClustering.kind().isStart();
@@ -478,6 +516,7 @@ public class MetadataCollector implements 
PartitionStatisticsCollector
             this.defaultMax = defaultMax;
         }
 
+        @Inline
         public void update(long value)
         {
             if (!isSet)
@@ -525,6 +564,7 @@ public class MetadataCollector implements 
PartitionStatisticsCollector
             this.defaultMax = defaultMax;
         }
 
+        @Inline
         public void update(int value)
         {
             if (!isSet)
diff --git a/src/java/org/apache/cassandra/schema/ColumnMetadata.java 
b/src/java/org/apache/cassandra/schema/ColumnMetadata.java
index d10b6faefd..9c58ae087c 100644
--- a/src/java/org/apache/cassandra/schema/ColumnMetadata.java
+++ b/src/java/org/apache/cassandra/schema/ColumnMetadata.java
@@ -136,6 +136,8 @@ public final class ColumnMetadata extends 
ColumnSpecification implements Selecta
      */
     private final long comparisonOrder;
 
+    private final boolean isCounterColumn;
+
     /**
      * Masking function used to dynamically mask the contents of this column.
      */
@@ -289,6 +291,7 @@ public final class ColumnMetadata extends 
ColumnSpecification implements Selecta
         this.cellComparator = cellPathComparator == null ? 
ColumnData.comparator : (a, b) -> cellPathComparator.compare(a.path(), 
b.path());
         this.asymmetricCellPathComparator = cellPathComparator == null ? null 
: (a, b) -> cellPathComparator.compare(((Cell<?>)a).path(), (CellPath) b);
         this.comparisonOrder = comparisonOrder(kind, isComplex(), Math.max(0, 
position), name);
+        this.isCounterColumn = isCounterColumn(type);
         this.mask = mask;
         this.columnConstraints = columnConstraints;
         this.columnConstraints.setColumnName(name);
@@ -730,6 +733,11 @@ public final class ColumnMetadata extends 
ColumnSpecification implements Selecta
      * Check if column is counter type.
      */
     public boolean isCounterColumn()
+    {
+        return isCounterColumn;
+    }
+
+    private static boolean isCounterColumn(AbstractType<?> type)
     {
         if (type instanceof CollectionType) // Possible with, for example, 
supercolumns
             return ((CollectionType) type).valueComparator().isCounter();
diff --git a/src/java/org/apache/cassandra/utils/ThreadStats.java 
b/src/java/org/apache/cassandra/utils/ThreadStats.java
new file mode 100644
index 0000000000..c19968088a
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/ThreadStats.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.lang.management.ManagementFactory;
+
+import com.sun.management.ThreadMXBean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThreadStats
+{
+    private final static Logger logger = 
LoggerFactory.getLogger(ThreadStats.class);
+
+    private static final ThreadMXBean threadMxBean;
+    static
+    {
+        ThreadMXBean threadMxBeanToInit;
+        try
+        {
+            threadMxBeanToInit = (com.sun.management.ThreadMXBean) 
ManagementFactory.getThreadMXBean();
+            threadMxBeanToInit.setThreadAllocatedMemoryEnabled(true);
+        }
+        catch (Throwable e)
+        {
+            threadMxBeanToInit = null;
+            logger.debug("Per thread stats are not available: {}", 
e.getMessage());
+        }
+        threadMxBean = threadMxBeanToInit;
+    }
+
+    public static long getCurrentThreadCpuTimeNano()
+    {
+        return threadMxBean == null || 
!threadMxBean.isCurrentThreadCpuTimeSupported()
+               ? -1 : threadMxBean.getCurrentThreadCpuTime();
+    }
+
+    public static long getCurrentThreadAllocatedBytes()
+    {
+        return threadMxBean == null || 
!threadMxBean.isThreadAllocatedMemorySupported()
+               ? -1 : threadMxBean.getCurrentThreadAllocatedBytes();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to