This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new 40f9ca60f1 Improve memtable allocator accounting when updating AtomicBTreePartition
40f9ca60f1 is described below

commit 40f9ca60f103783aa481bc9a91b92fd55b4ea625
Author: Benedict Elliott Smith <https://bened...@apache.org>
AuthorDate: Wed Mar 1 19:08:20 2023 -0700

    Improve memtable allocator accounting when updating AtomicBTreePartition
    
    patch by Benedict Elliott Smith; reviewed by Benjamin Lerer, Jon Meredith for CASSANDRA-18125
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/db/Memtable.java     |  16 +-
 .../db/partitions/AtomicBTreePartition.java        |  13 +-
 .../org/apache/cassandra/db/rows/ArrayCell.java    |   2 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java     |   7 +-
 .../org/apache/cassandra/db/rows/BufferCell.java   |   2 +-
 .../org/apache/cassandra/db/rows/ColumnData.java   |  49 ++-
 .../cassandra/db/rows/ComplexColumnData.java       |   2 +-
 .../org/apache/cassandra/db/rows/NativeCell.java   |  18 +-
 src/java/org/apache/cassandra/db/rows/Row.java     |   8 +
 .../org/apache/cassandra/utils/btree/BTree.java    |  15 +-
 .../cassandra/utils/memory/MemtablePool.java       |   2 +-
 ...AtomicBTreePartitionMemtableAccountingTest.java | 422 +++++++++++++++++++++
 13 files changed, 522 insertions(+), 35 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6dd05a34a1..f2064c098e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.9
+ * Improve memtable allocator accounting when updating AtomicBTreePartition (CASSANDRA-18125)
  * Update zstd-jni to version 1.5.4-1 (CASSANDRA-18259)
  * Split and order IDEA workspace template VM_PARAMETERS (CASSANDRA-18242)
 Merged from 3.11:
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index c6eb68ea57..b74ac5f63c 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -37,6 +37,7 @@ import com.google.common.base.Throwables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
@@ -80,16 +81,25 @@ public class Memtable implements Comparable<Memtable>
 {
     private static final Logger logger = 
LoggerFactory.getLogger(Memtable.class);
 
-    public static final MemtablePool MEMORY_POOL = 
createMemtableAllocatorPool();
+    public static final MemtablePool MEMORY_POOL = 
createMemtableAllocatorPoolInternal();
     public static final long NO_MIN_TIMESTAMP = -1;
 
-    private static MemtablePool createMemtableAllocatorPool()
+    private static MemtablePool createMemtableAllocatorPoolInternal()
     {
+        Config.MemtableAllocationType allocationType = 
DatabaseDescriptor.getMemtableAllocationType();
         long heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMb() << 20;
         long offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMb() 
<< 20;
         final float cleaningThreshold = 
DatabaseDescriptor.getMemtableCleanupThreshold();
         final MemtableCleaner cleaner = 
ColumnFamilyStore::flushLargestMemtable;
-        switch (DatabaseDescriptor.getMemtableAllocationType())
+        return createMemtableAllocatorPoolInternal(allocationType, heapLimit, 
offHeapLimit, cleaningThreshold, cleaner);
+    }
+
+    @VisibleForTesting
+    public static MemtablePool 
createMemtableAllocatorPoolInternal(Config.MemtableAllocationType 
allocationType,
+                                                                   long 
heapLimit, long offHeapLimit,
+                                                                   float 
cleaningThreshold, MemtableCleaner cleaner)
+    {
+        switch (allocationType)
         {
             case unslabbed_heap_buffers:
                 return new HeapPool(heapLimit, cleaningThreshold, cleaner);
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index 1c0ad60cc6..7b275d0fca 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -375,7 +375,7 @@ public final class AtomicBTreePartition extends AbstractBTreePartition
             indexer.onInserted(insert);
 
             this.dataSize += data.dataSize();
-            onAllocatedOnHeap(data.unsharedHeapSizeExcludingData());
+            this.heapSize += data.unsharedHeapSizeExcludingData();
             if (inserted == null)
                 inserted = new ArrayList<>();
             inserted.add(data);
@@ -409,12 +409,11 @@ public final class AtomicBTreePartition extends AbstractBTreePartition
 
         public Cell<?> merge(Cell<?> previous, Cell<?> insert)
         {
-            if (insert != previous)
-            {
-                long timeDelta = Math.abs(insert.timestamp() - 
previous.timestamp());
-                if (timeDelta < colUpdateTimeDelta)
-                    colUpdateTimeDelta = timeDelta;
-            }
+            if (insert == previous)
+                return insert;
+            long timeDelta = Math.abs(insert.timestamp() - 
previous.timestamp());
+            if (timeDelta < colUpdateTimeDelta)
+                colUpdateTimeDelta = timeDelta;
             if (cloner != null)
                 insert = cloner.clone(insert);
             dataSize += insert.dataSize() - previous.dataSize();
diff --git a/src/java/org/apache/cassandra/db/rows/ArrayCell.java b/src/java/org/apache/cassandra/db/rows/ArrayCell.java
index 2d82a1268c..48a97a78c9 100644
--- a/src/java/org/apache/cassandra/db/rows/ArrayCell.java
+++ b/src/java/org/apache/cassandra/db/rows/ArrayCell.java
@@ -105,7 +105,7 @@ public class ArrayCell extends AbstractCell<byte[]>
     @Override
     public Cell<?> clone(ByteBufferCloner cloner)
     {
-        if (value.length == 0)
+        if (value.length == 0 && path == null)
             return this;
 
         return super.clone(cloner);
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 73c0991f07..5a10f3799c 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -280,7 +280,12 @@ public class BTreeRow extends AbstractRow
     public ComplexColumnData getComplexColumnData(ColumnMetadata c)
     {
         assert c.isComplex();
-        return (ComplexColumnData) BTree.<Object>find(btree, 
ColumnMetadata.asymmetricColumnDataComparator, c);
+        return (ComplexColumnData) getColumnData(c);
+    }
+
+    public ColumnData getColumnData(ColumnMetadata c)
+    {
+        return (ColumnData) BTree.<Object>find(btree, 
ColumnMetadata.asymmetricColumnDataComparator, c);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index 391be4872d..fc85b3973a 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -137,7 +137,7 @@ public class BufferCell extends AbstractCell<ByteBuffer>
     @Override
     public Cell<?> clone(ByteBufferCloner cloner)
     {
-        if (!value.hasRemaining())
+        if (!value.hasRemaining() && path == null)
             return this;
 
         return super.clone(cloner);
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index 8e4957a742..e4fa83c396 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -98,13 +98,13 @@ public abstract class ColumnData
     public static class Reconciler implements UpdateFunction<ColumnData, 
ColumnData>, AutoCloseable
     {
         private static final TinyThreadLocalPool<Reconciler> POOL = new 
TinyThreadLocalPool<>();
-        private PostReconciliationFunction modifier;
+        private PostReconciliationFunction postReconcile;
         private DeletionTime activeDeletion;
         private TinyThreadLocalPool.TinyPool<Reconciler> pool;
 
-        private void init(PostReconciliationFunction modifier, DeletionTime 
activeDeletion)
+        private void init(PostReconciliationFunction postReconcile, 
DeletionTime activeDeletion)
         {
-            this.modifier = modifier;
+            this.postReconcile = postReconcile;
             this.activeDeletion = activeDeletion;
         }
 
@@ -112,11 +112,10 @@ public abstract class ColumnData
         {
             if (!(existing instanceof ComplexColumnData))
             {
-                Cell<?> existingCell = (Cell) existing, updateCell = (Cell) 
update;
-
+                Cell<?> existingCell = (Cell<?>) existing, updateCell = 
(Cell<?>) update;
                 Cell<?> result = Cells.reconcile(existingCell, updateCell);
 
-                return modifier.merge(existingCell, result);
+                return postReconcile.merge(existingCell, result);
             }
             else
             {
@@ -132,17 +131,22 @@ public abstract class ColumnData
 
                 Object[] cells;
 
-                try (Reconciler reconciler = reconciler(modifier, 
maxComplexDeletion))
+                try (Reconciler reconciler = reconciler(postReconcile, 
maxComplexDeletion))
                 {
                     if (!maxComplexDeletion.isLive())
                     {
                         if (maxComplexDeletion == existingDeletion)
                         {
-                            updateTree = BTree.transformAndFilter(updateTree, 
reconciler::retain);
+                            updateTree = BTree.<ColumnData, 
ColumnData>transformAndFilter(updateTree, reconciler::removeShadowed);
                         }
                         else
                         {
-                            existingTree = 
BTree.transformAndFilter(existingTree, reconciler::retain);
+                            Object[] retained = 
BTree.transformAndFilter(existingTree, reconciler::retain);
+                            if (existingTree != retained)
+                            {
+                                onAllocatedOnHeap(BTree.sizeOnHeapOf(retained) 
- BTree.sizeOnHeapOf(existingTree));
+                                existingTree = retained;
+                            }
                         }
                     }
                     cells = BTree.update(existingTree, updateTree, 
existingComplex.column.cellComparator(), (UpdateFunction) reconciler);
@@ -154,13 +158,13 @@ public abstract class ColumnData
         @Override
         public void onAllocatedOnHeap(long heapSize)
         {
-            modifier.onAllocatedOnHeap(heapSize);
+            postReconcile.onAllocatedOnHeap(heapSize);
         }
 
         @Override
         public ColumnData insert(ColumnData insert)
         {
-            return modifier.insert(insert);
+            return postReconcile.insert(insert);
         }
 
         /**
@@ -170,22 +174,37 @@ public abstract class ColumnData
          * @return {@code null} if the value should be removed from the BTree 
or the existing value if it should not.
          */
         public ColumnData retain(ColumnData existing)
+        {
+            return removeShadowed(existing, postReconcile);
+        }
+
+        private ColumnData removeShadowed(ColumnData existing)
+        {
+            return removeShadowed(existing, ColumnData.noOp);
+        }
+
+        /**
+         * Checks if the specified value should be deleted or not, reporting each deleted cell to {@code recordDeletion}.
+         *
+         * @param existing the existing value to check
+         * @return {@code null} if the value should be removed from the BTree or the existing value if it should not.
+         */
+        private ColumnData removeShadowed(ColumnData existing, 
PostReconciliationFunction recordDeletion)
         {
             if (!(existing instanceof ComplexColumnData))
             {
                 if (activeDeletion.deletes((Cell) existing))
                 {
-                    modifier.delete(existing);
+                    recordDeletion.delete(existing);
                     return null;
                 }
             }
             else
             {
                 ComplexColumnData existingComplex = (ComplexColumnData) 
existing;
-
                 if 
(activeDeletion.supersedes(existingComplex.complexDeletion()))
                 {
-                    Object[] cells = 
BTree.transformAndFilter(existingComplex.tree(), this::retain);
+                    Object[] cells = 
BTree.transformAndFilter(existingComplex.tree(), (ColumnData cd) -> 
removeShadowed(cd, recordDeletion));
                     return BTree.isEmpty(cells) ? null : new 
ComplexColumnData(existingComplex.column, cells, DeletionTime.LIVE);
                 }
             }
@@ -196,7 +215,7 @@ public abstract class ColumnData
         public void close()
         {
             activeDeletion = null;
-            modifier = null;
+            postReconcile = null;
 
             TinyThreadLocalPool.TinyPool<Reconciler> tmp = pool;
             pool = null;
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index 7ff59f9e29..ee86e5d6c0 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -138,7 +138,7 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell<?>>
 
     public long unsharedHeapSizeExcludingData()
     {
-        long heapSize = EMPTY_SIZE + ObjectSizes.sizeOfArray(cells);
+        long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells);
         // TODO: this can be turned into a simple multiplication, at least 
while we have only one Cell implementation
         for (Cell<?> cell : this)
             heapSize += cell.unsharedHeapSizeExcludingData();
diff --git a/src/java/org/apache/cassandra/db/rows/NativeCell.java b/src/java/org/apache/cassandra/db/rows/NativeCell.java
index 02e000823a..d538504415 100644
--- a/src/java/org/apache/cassandra/db/rows/NativeCell.java
+++ b/src/java/org/apache/cassandra/db/rows/NativeCell.java
@@ -72,7 +72,7 @@ public class NativeCell extends AbstractCell<ByteBuffer>
                       CellPath path)
     {
         super(column);
-        long size = simpleSize(value.remaining());
+        long size = offHeapSizeWithoutPath(value.remaining());
 
         assert value.order() == ByteOrder.BIG_ENDIAN;
         assert column.isComplex() == (path != null);
@@ -105,7 +105,7 @@ public class NativeCell extends AbstractCell<ByteBuffer>
         }
     }
 
-    private static long simpleSize(int length)
+    private static long offHeapSizeWithoutPath(int length)
     {
         return VALUE + length;
     }
@@ -138,7 +138,7 @@ public class NativeCell extends AbstractCell<ByteBuffer>
 
     public CellPath path()
     {
-        if (MemoryUtil.getByte(peer+ HAS_CELLPATH) == 0)
+        if (!hasPath())
             return null;
 
         long offset = peer + VALUE + MemoryUtil.getInt(peer + LENGTH);
@@ -171,4 +171,16 @@ public class NativeCell extends AbstractCell<ByteBuffer>
         return EMPTY_SIZE;
     }
 
+    public long offHeapSize()
+    {
+        long size = offHeapSizeWithoutPath(MemoryUtil.getInt(peer + LENGTH));
+        if (hasPath())
+            size += 4 + MemoryUtil.getInt(peer + size);
+        return size;
+    }
+
+    private boolean hasPath()
+    {
+        return MemoryUtil.getByte(peer+ HAS_CELLPATH) != 0;
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index a51f5afebe..40d1467a9b 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -148,6 +148,14 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
      */
     public ComplexColumnData getComplexColumnData(ColumnMetadata c);
 
+    /**
+     * The data for a regular or complex column.
+     *
+     * @param c the column for which to return the data.
+     * @return the data for {@code c} or {@code null} if the row has no data for this column.
+     */
+    public ColumnData getColumnData(ColumnMetadata c);
+
     /**
      * An iterable over the cells of this row.
      * <p>
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java
index caf0699e75..97f5cc0fb5 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -365,7 +365,9 @@ public class BTree
                 toUpdate = insert;
                 insert = tmp;
             }
-            return updateLeaves(toUpdate, insert, comparator, updateF);
+            Object[] merged = updateLeaves(toUpdate, insert, comparator, 
updateF);
+            updateF.onAllocatedOnHeap(sizeOnHeapOf(merged) - 
sizeOnHeapOf(toUpdate));
+            return merged;
         }
 
         if (!isLeaf(insert) && isSimple(updateF))
@@ -2195,6 +2197,8 @@ public class BTree
 
     public static long sizeOnHeapOf(Object[] tree)
     {
+        if (isEmpty(tree))
+            return 0;
         long size = ObjectSizes.sizeOfArray(tree);
         if (isLeaf(tree))
             return size;
@@ -2204,6 +2208,13 @@ public class BTree
         return size;
     }
 
+    private static long sizeOnHeapOfLeaf(Object[] tree)
+    {
+        if (isEmpty(tree))
+            return 0;
+        return ObjectSizes.sizeOfArray(tree);
+    }
+
     // Arbitrary boundaries
     private static Object POSITIVE_INFINITY = new Object();
     private static Object NEGATIVE_INFINITY = new Object();
@@ -2751,7 +2762,7 @@ public class BTree
                 sizeOfLeaf = count;
                 leaf = drain();
                 if (allocated >= 0 && sizeOfLeaf > 0)
-                    allocated += ObjectSizes.sizeOfReferenceArray(sizeOfLeaf | 
1) - (unode == null ? 0 : ObjectSizes.sizeOfArray(unode));
+                    allocated += ObjectSizes.sizeOfReferenceArray(sizeOfLeaf | 
1) - (unode == null ? 0 : sizeOnHeapOfLeaf(unode));
             }
 
             count = 0;
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index 58b2910972..966c560f5f 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@ -190,7 +190,7 @@ public abstract class MemtablePool
 
         void released(long size)
         {
-            assert size >= 0;
+            assert size >= 0 : "Negative released: " + size;
             adjustAllocated(-size);
             hasRoom.signalAll();
         }
diff --git a/test/unit/org/apache/cassandra/db/partitions/AtomicBTreePartitionMemtableAccountingTest.java b/test/unit/org/apache/cassandra/db/partitions/AtomicBTreePartitionMemtableAccountingTest.java
new file mode 100644
index 0000000000..125015fbf6
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/partitions/AtomicBTreePartitionMemtableAccountingTest.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Cells;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.NativeCell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.Cloner;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+import org.apache.cassandra.utils.memory.MemtableCleaner;
+import org.apache.cassandra.utils.memory.MemtablePool;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/* Test memory pool accounting when updating atomic btree partitions. CASSANDRA-18125 hit an issue
+ * where cells were double-counted when released, causing negative allocator on-heap ownership, which
+ * crashed memtable flushing.
+ *
+ * The aim of the test is to exhaustively test updates to simple and complex cells in all possible
+ * states and check the accounting is reasonable. It generates an initial row, then an update row,
+ * and checks the allocator ownership is reasonable, then compares usage to a freshly recreated
+ * instance of the partition.
+ *
+ * Replacing existing values does not free up memory; this is accounted for when comparing against
+ * the fresh build.
+ */
+@RunWith(Parameterized.class)
+public class AtomicBTreePartitionMemtableAccountingTest
+{
+    public static final int INITIAL_TS = 2000;
+    public static final int EARLIER_TS = 1000;
+    public static final int LATER_TS = 3000;
+
+    public static final int NOW_LDT = FBUtilities.nowInSeconds();
+    public static final int LATER_LDT = NOW_LDT + 1000;
+    public static final int EARLIER_LDT = NOW_LDT - 1000;
+
+    public static final int EXPIRED_TTL = 1;
+    public static final int EXPIRING_TTL = 10000;
+
+    public static final long HEAP_LIMIT = 1 << 20;
+    public static final long OFF_HEAP_LIMIT = 1 << 20;
+    public static final float MEMTABLE_CLEANUP_THRESHOLD = 0.25f;
+    public static final MemtableCleaner DUMMY_CLEANER = () -> 
CompletableFuture.completedFuture(null);
+
+    @Parameterized.Parameters(name="allocationType={0}")
+    public static Iterable<? extends Object> data()
+    {
+        return Arrays.asList(Config.MemtableAllocationType.values());
+    }
+
+    @Parameterized.Parameter
+    public Config.MemtableAllocationType allocationType;
+
+    static TableMetadata metadata;
+    static DecoratedKey partitionKey;
+    static  ColumnMetadata r1md;
+    static  ColumnMetadata c2md;
+    static  ColumnMetadata s3md;
+    static  ColumnMetadata c4md;
+
+    @BeforeClass
+    public static void setUp()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        metadata = TableMetadata.builder("dummy_ks", "dummy_tbl")
+                                                           
.addPartitionKeyColumn("pk", Int32Type.instance)
+                                                           
.addRegularColumn("r1", Int32Type.instance)
+                                                           
.addRegularColumn("c2", SetType.getInstance(Int32Type.instance, true))
+                                                           
.addStaticColumn("s3", Int32Type.instance)
+                                                           
.addStaticColumn("c4", SetType.getInstance(Int32Type.instance, true))
+                                                           .build();
+        partitionKey = 
DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(0));
+        r1md = metadata.getColumn(new ColumnIdentifier("r1", false));
+        c2md = metadata.getColumn(new ColumnIdentifier("c2", false));
+        s3md = metadata.getColumn(new ColumnIdentifier("s3", false));
+        c4md = metadata.getColumn(new ColumnIdentifier("c4", false));
+    }
+
+    @Ignore
+    @Test
+    public void repro() // For running in the IDE, update with failing 
testCase parameters to run
+    {
+        new TestCase(INITIAL_TS, Cell.NO_TTL, Cell.NO_DELETION_TIME, new 
DeletionTime(EARLIER_TS, EARLIER_LDT), 1,
+                     EARLIER_TS, Cell.NO_TTL, Cell.NO_DELETION_TIME, 
DeletionTime.LIVE, 3).execute();
+    }
+
+    @Test
+    public void exhaustiveTest()
+    {
+        // TTLs for initial and updated cells
+        List<Integer> ttls = Arrays.asList(Cell.NO_TTL, EXPIRING_TTL, 
EXPIRED_TTL);
+
+        // Initial local deletion times - a live cell, and a tombstone deleted now
+        List<Integer> initialLDTs = Arrays.asList(Cell.NO_DELETION_TIME, 
NOW_LDT);
+
+        // Initial complex deletion time for c2 - no deletion, earlier than c2 elements, or concurrent with c2 elements
+        List<DeletionTime> initialComplexDeletionTimes = 
Arrays.asList(DeletionTime.LIVE,
+                                                                       new 
DeletionTime(EARLIER_TS, EARLIER_LDT),
+                                                                       new 
DeletionTime(INITIAL_TS, NOW_LDT));
+
+        // Update timestamps - earlier than initial (update ignored), same as initial, or later than initial (update supersedes)
+        List<Integer> updateTimestamps = Arrays.asList(EARLIER_TS, INITIAL_TS, 
LATER_TS);
+
+        // Update local deletion times - live cell, earlier tombstone, concurrent tombstone, or future deletion
+        List<Integer> updateLDTs = Arrays.asList(Cell.NO_DELETION_TIME, 
EARLIER_LDT, NOW_LDT, LATER_LDT);
+
+        // Update complex deletion time for c2 - no deletion, earlier than c2 elements,
+        // concurrent with c2 elements, or after c2 elements
+        List<DeletionTime> updateComplexDeletionTimes = 
Arrays.asList(DeletionTime.LIVE,
+                                                                      new 
DeletionTime(EARLIER_TS, EARLIER_LDT),
+                                                                      new 
DeletionTime(INITIAL_TS, NOW_LDT),
+                                                                      new 
DeletionTime(LATER_TS, LATER_LDT));
+
+        // Number of cells to put in the initial and update collections - the two collections overlap by one cell
+        List<Integer> initialComplexCellCount = Arrays.asList(3, 1);
+        List<Integer> updateComplexCellCount = Arrays.asList(3, 1);
+
+        ttls.forEach(initialTTL -> {
+            initialLDTs.forEach(initialLDT -> {
+                initialComplexDeletionTimes.forEach(initialCDT -> {
+                    initialComplexCellCount.forEach(numC2InitialCells -> {
+                        updateTimestamps.forEach(updateTS -> {
+                            ttls.forEach(updateTTL -> {
+                                updateLDTs.forEach(updateLDT -> {
+                                    
updateComplexDeletionTimes.forEach(updateCDT -> {
+                                        
updateComplexCellCount.forEach(numC2UpdateCells -> {
+                                            new TestCase(INITIAL_TS, 
initialTTL, initialLDT, initialCDT, numC2InitialCells,
+                                                         updateTS, updateTTL, 
updateLDT, updateCDT, numC2UpdateCells).execute();
+                                        });
+                                    });
+                                });
+                            });
+                        });
+                    });
+                });
+            });
+        });
+    }
+
+    class TestCase
+    {
+        int initialTS;
+        int initialTTL;
+        int initialLDT;
+        DeletionTime initialCDT;
+        int numC2InitialCells;
+        int updateTS;
+        int updateTTL;
+        int updateLDT;
+        DeletionTime updateCDT;
+        Integer numC2UpdateCells;
+
+        public TestCase(int initialTS, int initialTTL, int initialLDT, 
DeletionTime initialCDT, int numC2InitialCells,
+                        int updateTS, int updateTTL, int updateLDT, 
DeletionTime updateCDT, Integer numC2UpdateCells)
+        {
+            this.initialTS = initialTS;
+            this.initialTTL = initialTTL;
+            this.initialLDT = initialLDT;
+            this.initialCDT = initialCDT;
+            this.numC2InitialCells = numC2InitialCells;
+            this.updateTS = updateTS;
+            this.updateTTL = updateTTL;
+            this.updateLDT = updateLDT;
+            this.updateCDT = updateCDT;
+            this.numC2UpdateCells = numC2UpdateCells;
+        }
+
+        void execute()
+        {
+            // Test regular row updates
+            Pair<Row, Row> regularRows = makeInitialAndUpdate(r1md, c2md);
+            PartitionUpdate initial = 
PartitionUpdate.singleRowUpdate(metadata, partitionKey, regularRows.left, null);
+            PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, 
partitionKey, regularRows.right, null);
+            validateUpdates(metadata, partitionKey, Arrays.asList(initial, 
update));
+
+            // Test static row updates
+            Pair<Row, Row> staticRows = makeInitialAndUpdate(s3md, c4md);
+            PartitionUpdate staticInitial = 
PartitionUpdate.singleRowUpdate(metadata, partitionKey, null, staticRows.left);
+            PartitionUpdate staticUpdate = 
PartitionUpdate.singleRowUpdate(metadata, partitionKey, null, staticRows.right);
+            validateUpdates(metadata, partitionKey, 
Arrays.asList(staticInitial, staticUpdate));
+        }
+
+        /**
+         * Builds the pair (initial row, update row) used to populate and then modify the partition.
+         * The row is static when {@code regular} is a static column, otherwise it uses the empty clustering.
+         *
+         * @param regular simple column that receives one cell in each row
+         * @param complex complex column that receives the complex deletion (if any) and the path-keyed cells
+         * @return the initial row on the left, the update row on the right
+         */
+        private Pair<Row, Row> makeInitialAndUpdate(ColumnMetadata regular, ColumnMetadata complex)
+        {
+            // Initial complex cell paths count down from 1000 and update paths count up from 1000,
+            // so the two rows share at most one cell path (1000).
+            Row initialRow = buildRow(regular, complex, initialTS, initialTTL, initialLDT, initialCDT,
+                                      numC2InitialCells, ByteBufferUtil.bytes(111), -1);
+            // Multiple update cells make any accounting issue more pronounced
+            Row updateRow = buildRow(regular, complex, updateTS, updateTTL, updateLDT, updateCDT,
+                                     numC2UpdateCells, ByteBufferUtil.bytes(222), 1);
+            return Pair.create(initialRow, updateRow);
+        }
+
+        /**
+         * Builds one row containing:
+         * <ul>
+         * <li>a single cell for {@code regular} holding {@code value};</li>
+         * <li>a complex deletion on {@code complex} unless {@code complexDeletion} is LIVE;</li>
+         * <li>{@code numComplexCells} empty-valued cells of {@code complex}, with paths starting at 1000 and
+         * advancing by {@code pathStep}.</li>
+         * </ul>
+         */
+        private Row buildRow(ColumnMetadata regular, ColumnMetadata complex, long timestamp, int ttl,
+                             int localDeletionTime, DeletionTime complexDeletion, int numComplexCells,
+                             ByteBuffer value, int pathStep)
+        {
+            Row.Builder builder = BTreeRow.unsortedBuilder();
+            builder.newRow(regular.isStatic() ? Clustering.STATIC_CLUSTERING : Clustering.EMPTY);
+
+            builder.addCell(makeCell(regular, timestamp, ttl, localDeletionTime, value, null));
+            if (complexDeletion != DeletionTime.LIVE)
+                builder.addComplexDeletion(complex, complexDeletion);
+
+            for (int i = 0, path = 1000; i < numComplexCells; i++, path += pathStep)
+                builder.addCell(makeCell(complex, timestamp, ttl, localDeletionTime,
+                                         ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                         CellPath.create(ByteBufferUtil.bytes(path))));
+            return builder.build();
+        }
+
+        /**
+         * Creates a BufferCell for {@code column}. Any cell given a local deletion time other than
+         * {@code Cell.NO_DELETION_TIME} is built as a tombstone. Its TTL is forced to {@code Cell.NO_TTL} and its
+         * value to the empty buffer, regardless of the {@code ttl} and {@code value} passed in.
+         */
+        Cell<?> makeCell(ColumnMetadata column, long timestamp, int ttl, int localDeletionTime, ByteBuffer value, CellPath path)
+        {
+            boolean tombstone = localDeletionTime != Cell.NO_DELETION_TIME;
+            // A tombstone never carries a TTL or a value
+            int effectiveTtl = tombstone ? Cell.NO_TTL : ttl;
+            ByteBuffer effectiveValue = tombstone ? ByteBufferUtil.EMPTY_BYTE_BUFFER : value;
+            return new BufferCell(column, timestamp, effectiveTtl, localDeletionTime, effectiveValue, path);
+        }
+    }
+
+    /**
+     * Applies {@code updates}, in order, to a fresh AtomicBTreePartition and checks the memtable allocator
+     * accounting.
+     * <p>
+     * After each update, the allocator's on-heap and off-heap ownership must both be non-negative. After all
+     * updates, the partition's contents are copied into a second partition backed by a fresh allocator from the
+     * same pool. The second allocator must own exactly what the first owns, minus the bytes of existing cells that
+     * the updates deleted or superseded (as computed by {@link #getUnreleasableSize}).
+     *
+     * @param metadata     table the partition belongs to
+     * @param partitionKey key of the partition under test
+     * @param updates      updates to apply, in order
+     */
+    void validateUpdates(TableMetadata metadata, DecoratedKey partitionKey, List<PartitionUpdate> updates)
+    {
+        TableMetadataRef metadataRef = TableMetadataRef.forOfflineTools(metadata);
+
+        OpOrder opOrder = new OpOrder();
+        opOrder.start();
+        UpdateTransaction indexer = UpdateTransaction.NO_OP;
+
+        MemtablePool memtablePool = Memtable.createMemtableAllocatorPoolInternal(allocationType,
+                                                                                  HEAP_LIMIT, OFF_HEAP_LIMIT, MEMTABLE_CLEANUP_THRESHOLD, DUMMY_CLEANER);
+        MemtableAllocator allocator = memtablePool.newAllocator();
+        MemtableAllocator recreatedAllocator = memtablePool.newAllocator();
+        try
+        {
+            // Prepare a partition to receive updates
+            AtomicBTreePartition partition = new AtomicBTreePartition(metadataRef, partitionKey, allocator);
+
+            // Apply each update and verify the allocator never owns a negative amount. A plain loop is used
+            // (rather than a stream) because every step mutates the partition and the allocator.
+            long unreleasable = 0;
+            for (PartitionUpdate update : updates)
+            {
+                // Must be computed before applying the update, while the existing rows are still in place
+                DeletionTime exsDeletion = partition.deletionInfo().getPartitionDeletion();
+                DeletionTime updDeletion = update.deletionInfo().getPartitionDeletion();
+                if (!BTree.isEmpty(partition.unsafeGetHolder().tree))
+                {
+                    for (Row updRow : BTree.<Row>iterable(update.holder().tree))
+                    {
+                        Row exsRow = BTree.find(partition.unsafeGetHolder().tree, partition.metadata().comparator, updRow);
+                        unreleasable += getUnreleasableSize(updRow, exsRow, exsDeletion, updDeletion);
+                    }
+                }
+                if (partition.staticRow() != null)
+                    unreleasable += getUnreleasableSize(update.staticRow(), partition.unsafeGetHolder().staticRow, exsDeletion, updDeletion);
+
+                OpOrder.Group writeOp = opOrder.getCurrent();
+                Cloner cloner = allocator.cloner(writeOp);
+                partition.addAllWithSizeDelta(update, cloner, writeOp, indexer);
+                opOrder.newBarrier().issue();
+
+                assertThat(allocator.onHeap().owns()).isGreaterThanOrEqualTo(0L);
+                assertThat(allocator.offHeap().owns()).isGreaterThanOrEqualTo(0L);
+            }
+
+            // Now recreate the partition to see if there's a leak in the accounting
+            AtomicBTreePartition recreated = new AtomicBTreePartition(metadataRef, partitionKey, recreatedAllocator);
+            try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+            {
+                PartitionUpdate update = PartitionUpdate.fromIterator(iter, ColumnFilter.NONE);
+                opOrder.newBarrier().issue();
+                OpOrder.Group writeOp = opOrder.getCurrent();
+                Cloner cloner = recreatedAllocator.cloner(writeOp);
+                recreated.addAllWithSizeDelta(update, cloner, writeOp, indexer);
+            }
+
+            // Superseded cell data is attributed to the pool that holds cell data: off-heap when the allocator
+            // owns any off-heap memory, on-heap otherwise. The other pool must match the recreated copy exactly.
+            long unreleasableOnHeap = 0;
+            long unreleasableOffHeap = 0;
+            if (allocator.offHeap().owns() > 0)
+                unreleasableOffHeap = unreleasable;
+            else
+                unreleasableOnHeap = unreleasable;
+
+            assertThat(recreatedAllocator.offHeap().owns()).isEqualTo(allocator.offHeap().owns() - unreleasableOffHeap);
+            assertThat(recreatedAllocator.onHeap().owns()).isEqualTo(allocator.onHeap().owns() - unreleasableOnHeap);
+        }
+        finally
+        {
+            // Release test resources
+            recreatedAllocator.setDiscarding();
+            recreatedAllocator.setDiscarded();
+            allocator.setDiscarding();
+            allocator.setDiscarded();
+            try
+            {
+                memtablePool.shutdownAndWait(1, TimeUnit.SECONDS);
+            }
+            catch (Throwable t)
+            {
+                // Best-effort cleanup: a slow or failed pool shutdown must not mask the test outcome,
+                // but an interruption is not silently lost.
+                if (t instanceof InterruptedException)
+                    Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Sums the sizes of the cells in {@code exsRow} that merging {@code updRow} into it will delete or supersede.
+     * The original allocator still accounts for these bytes, but a freshly rebuilt copy of the partition would not.
+     * <p>
+     * An existing cell counts when either:
+     * <ul>
+     * <li>the update's effective deletion deletes it; or</li>
+     * <li>a matching update cell wins reconciliation and is not shadowed by the existing effective deletion.</li>
+     * </ul>
+     *
+     * @param updRow      the incoming row
+     * @param exsRow      the existing row with the same clustering, or {@code null} if the partition has none
+     * @param exsDeletion partition-level deletion of the existing partition
+     * @param updDeletion partition-level deletion of the update
+     * @return the summed size in bytes; {@code 0} when there is no existing row
+     */
+    private long getUnreleasableSize(Row updRow, Row exsRow, DeletionTime exsDeletion, DeletionTime updDeletion)
+    {
+        // BTree.find returns null when the update introduces a clustering the partition does not yet contain;
+        // nothing pre-existing is replaced in that case.
+        if (exsRow == null)
+            return 0;
+
+        // A row deletion newer than the partition deletion becomes the effective deletion for that row
+        if (exsRow.deletion().supersedes(exsDeletion))
+            exsDeletion = exsRow.deletion().time();
+        if (updRow.deletion().supersedes(updDeletion))
+            updDeletion = updRow.deletion().time();
+
+        long size = 0;
+        for (ColumnData exsCd : exsRow.columnData())
+        {
+            ColumnData updCd = updRow.getColumnData(exsCd.column());
+            if (exsCd instanceof Cell)
+            {
+                Cell<?> exsCell = (Cell<?>) exsCd;
+                Cell<?> updCell = (Cell<?>) updCd;
+                if (isUnreleasable(exsCell, updCell, exsDeletion, updDeletion))
+                    size += sizeOf(exsCell);
+            }
+            else
+            {
+                ComplexColumnData exsCcd = (ComplexColumnData) exsCd;
+                ComplexColumnData updCcd = (ComplexColumnData) updCd;
+
+                // A complex deletion newer than the row-level effective deletion applies to that column's cells
+                DeletionTime activeExsDeletion = exsDeletion;
+                DeletionTime activeUpdDeletion = updDeletion;
+                if (exsCcd.complexDeletion().supersedes(exsDeletion))
+                    activeExsDeletion = exsCcd.complexDeletion();
+                if (updCcd != null && updCcd.complexDeletion().supersedes(updDeletion))
+                    activeUpdDeletion = updCcd.complexDeletion();
+
+                for (Cell<?> exsCell : exsCcd)
+                {
+                    Cell<?> updCell = updCcd == null ? null : updCcd.getCell(exsCell.path());
+                    if (isUnreleasable(exsCell, updCell, activeExsDeletion, activeUpdDeletion))
+                        size += sizeOf(exsCell);
+                }
+            }
+        }
+        return size;
+    }
+
+    /**
+     * Returns true when {@code exsCell} is deleted by {@code updDeletion}, or is replaced by a non-null
+     * {@code updCell} that wins reconciliation and is not itself deleted by {@code exsDeletion}.
+     */
+    private static boolean isUnreleasable(Cell<?> exsCell, Cell<?> updCell, DeletionTime exsDeletion, DeletionTime updDeletion)
+    {
+        if (updDeletion.deletes(exsCell))
+            return true;
+        return updCell != null && Cells.reconcile(exsCell, updCell) != exsCell && !exsDeletion.deletes(updCell);
+    }
+
+    /**
+     * Size counted for one cell. A NativeCell reports its off-heap footprint. Any other cell reports its value size
+     * plus the data size of its path, if it has one.
+     */
+    private static long sizeOf(Cell cell)
+    {
+        if (!(cell instanceof NativeCell))
+            return cell.valueSize() + (cell.path() != null ? cell.path().dataSize() : 0);
+        NativeCell nativeCell = (NativeCell) cell;
+        return nativeCell.offHeapSize();
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to