This is an automated email from the ASF dual-hosted git repository. jonmeredith pushed a commit to branch cassandra-4.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push: new 40f9ca60f1 Improve memtable allocator accounting when updating AtomicBTreePartition 40f9ca60f1 is described below commit 40f9ca60f103783aa481bc9a91b92fd55b4ea625 Author: Benedict Elliott Smith <https://bened...@apache.org> AuthorDate: Wed Mar 1 19:08:20 2023 -0700 Improve memtable allocator accounting when updating AtomicBTreePartition patch by Benedict Elliott Smith; reviewed by Benjamin Lerer, Jon Meredith for CASSANDRA-18125 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Memtable.java | 16 +- .../db/partitions/AtomicBTreePartition.java | 13 +- .../org/apache/cassandra/db/rows/ArrayCell.java | 2 +- .../org/apache/cassandra/db/rows/BTreeRow.java | 7 +- .../org/apache/cassandra/db/rows/BufferCell.java | 2 +- .../org/apache/cassandra/db/rows/ColumnData.java | 49 ++- .../cassandra/db/rows/ComplexColumnData.java | 2 +- .../org/apache/cassandra/db/rows/NativeCell.java | 18 +- src/java/org/apache/cassandra/db/rows/Row.java | 8 + .../org/apache/cassandra/utils/btree/BTree.java | 15 +- .../cassandra/utils/memory/MemtablePool.java | 2 +- ...AtomicBTreePartitionMemtableAccountingTest.java | 422 +++++++++++++++++++++ 13 files changed, 522 insertions(+), 35 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 6dd05a34a1..f2064c098e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0.9 + * Improve memtable allocator accounting when updating AtomicBTreePartition (CASSANDRA-18125) * Update zstd-jni to version 1.5.4-1 (CASSANDRA-18259) * Split and order IDEA workspace template VM_PARAMETERS (CASSANDRA-18242) Merged from 3.11: diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index c6eb68ea57..b74ac5f63c 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -37,6 +37,7 @@ import com.google.common.base.Throwables; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLogPosition; @@ -80,16 +81,25 @@ public class Memtable implements Comparable<Memtable> { private static final Logger logger = LoggerFactory.getLogger(Memtable.class); - public static final MemtablePool MEMORY_POOL = createMemtableAllocatorPool(); + public static final MemtablePool MEMORY_POOL = createMemtableAllocatorPoolInternal(); public static final long NO_MIN_TIMESTAMP = -1; - private static MemtablePool createMemtableAllocatorPool() + private static MemtablePool createMemtableAllocatorPoolInternal() { + Config.MemtableAllocationType allocationType = DatabaseDescriptor.getMemtableAllocationType(); long heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMb() << 20; long offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMb() << 20; final float cleaningThreshold = DatabaseDescriptor.getMemtableCleanupThreshold(); final MemtableCleaner cleaner = ColumnFamilyStore::flushLargestMemtable; - switch (DatabaseDescriptor.getMemtableAllocationType()) + return createMemtableAllocatorPoolInternal(allocationType, heapLimit, offHeapLimit, cleaningThreshold, cleaner); + } + + @VisibleForTesting + public static MemtablePool createMemtableAllocatorPoolInternal(Config.MemtableAllocationType allocationType, + long heapLimit, long offHeapLimit, + float cleaningThreshold, MemtableCleaner cleaner) + { + switch (allocationType) { case unslabbed_heap_buffers: return new HeapPool(heapLimit, cleaningThreshold, cleaner); diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java index 1c0ad60cc6..7b275d0fca 100644 --- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java 
@@ -375,7 +375,7 @@ public final class AtomicBTreePartition extends AbstractBTreePartition indexer.onInserted(insert); this.dataSize += data.dataSize(); - onAllocatedOnHeap(data.unsharedHeapSizeExcludingData()); + this.heapSize += data.unsharedHeapSizeExcludingData(); if (inserted == null) inserted = new ArrayList<>(); inserted.add(data); @@ -409,12 +409,11 @@ public final class AtomicBTreePartition extends AbstractBTreePartition public Cell<?> merge(Cell<?> previous, Cell<?> insert) { - if (insert != previous) - { - long timeDelta = Math.abs(insert.timestamp() - previous.timestamp()); - if (timeDelta < colUpdateTimeDelta) - colUpdateTimeDelta = timeDelta; - } + if (insert == previous) + return insert; + long timeDelta = Math.abs(insert.timestamp() - previous.timestamp()); + if (timeDelta < colUpdateTimeDelta) + colUpdateTimeDelta = timeDelta; if (cloner != null) insert = cloner.clone(insert); dataSize += insert.dataSize() - previous.dataSize(); diff --git a/src/java/org/apache/cassandra/db/rows/ArrayCell.java b/src/java/org/apache/cassandra/db/rows/ArrayCell.java index 2d82a1268c..48a97a78c9 100644 --- a/src/java/org/apache/cassandra/db/rows/ArrayCell.java +++ b/src/java/org/apache/cassandra/db/rows/ArrayCell.java @@ -105,7 +105,7 @@ public class ArrayCell extends AbstractCell<byte[]> @Override public Cell<?> clone(ByteBufferCloner cloner) { - if (value.length == 0) + if (value.length == 0 && path == null) return this; return super.clone(cloner); diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java index 73c0991f07..5a10f3799c 100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@ -280,7 +280,12 @@ public class BTreeRow extends AbstractRow public ComplexColumnData getComplexColumnData(ColumnMetadata c) { assert c.isComplex(); - return (ComplexColumnData) BTree.<Object>find(btree, ColumnMetadata.asymmetricColumnDataComparator, c); 
+ return (ComplexColumnData) getColumnData(c); + } + + public ColumnData getColumnData(ColumnMetadata c) + { + return (ColumnData) BTree.<Object>find(btree, ColumnMetadata.asymmetricColumnDataComparator, c); } @Override diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java index 391be4872d..fc85b3973a 100644 --- a/src/java/org/apache/cassandra/db/rows/BufferCell.java +++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java @@ -137,7 +137,7 @@ public class BufferCell extends AbstractCell<ByteBuffer> @Override public Cell<?> clone(ByteBufferCloner cloner) { - if (!value.hasRemaining()) + if (!value.hasRemaining() && path == null) return this; return super.clone(cloner); diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java index 8e4957a742..e4fa83c396 100644 --- a/src/java/org/apache/cassandra/db/rows/ColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java @@ -98,13 +98,13 @@ public abstract class ColumnData public static class Reconciler implements UpdateFunction<ColumnData, ColumnData>, AutoCloseable { private static final TinyThreadLocalPool<Reconciler> POOL = new TinyThreadLocalPool<>(); - private PostReconciliationFunction modifier; + private PostReconciliationFunction postReconcile; private DeletionTime activeDeletion; private TinyThreadLocalPool.TinyPool<Reconciler> pool; - private void init(PostReconciliationFunction modifier, DeletionTime activeDeletion) + private void init(PostReconciliationFunction postReconcile, DeletionTime activeDeletion) { - this.modifier = modifier; + this.postReconcile = postReconcile; this.activeDeletion = activeDeletion; } @@ -112,11 +112,10 @@ public abstract class ColumnData { if (!(existing instanceof ComplexColumnData)) { - Cell<?> existingCell = (Cell) existing, updateCell = (Cell) update; - + Cell<?> existingCell = (Cell<?>) existing, updateCell = (Cell<?>) update; 
Cell<?> result = Cells.reconcile(existingCell, updateCell); - return modifier.merge(existingCell, result); + return postReconcile.merge(existingCell, result); } else { @@ -132,17 +131,22 @@ public abstract class ColumnData Object[] cells; - try (Reconciler reconciler = reconciler(modifier, maxComplexDeletion)) + try (Reconciler reconciler = reconciler(postReconcile, maxComplexDeletion)) { if (!maxComplexDeletion.isLive()) { if (maxComplexDeletion == existingDeletion) { - updateTree = BTree.transformAndFilter(updateTree, reconciler::retain); + updateTree = BTree.<ColumnData, ColumnData>transformAndFilter(updateTree, reconciler::removeShadowed); } else { - existingTree = BTree.transformAndFilter(existingTree, reconciler::retain); + Object[] retained = BTree.transformAndFilter(existingTree, reconciler::retain); + if (existingTree != retained) + { + onAllocatedOnHeap(BTree.sizeOnHeapOf(retained) - BTree.sizeOnHeapOf(existingTree)); + existingTree = retained; + } } } cells = BTree.update(existingTree, updateTree, existingComplex.column.cellComparator(), (UpdateFunction) reconciler); @@ -154,13 +158,13 @@ public abstract class ColumnData @Override public void onAllocatedOnHeap(long heapSize) { - modifier.onAllocatedOnHeap(heapSize); + postReconcile.onAllocatedOnHeap(heapSize); } @Override public ColumnData insert(ColumnData insert) { - return modifier.insert(insert); + return postReconcile.insert(insert); } /** @@ -170,22 +174,37 @@ public abstract class ColumnData * @return {@code null} if the value should be removed from the BTree or the existing value if it should not. */ public ColumnData retain(ColumnData existing) + { + return removeShadowed(existing, postReconcile); + } + + private ColumnData removeShadowed(ColumnData existing) + { + return removeShadowed(existing, ColumnData.noOp); + } + + /** + * Checks if the specified value should be deleted or not. 
+ * + * @param existing the existing value to check + * @return {@code null} if the value should be removed from the BTree or the existing value if it should not. + */ + private ColumnData removeShadowed(ColumnData existing, PostReconciliationFunction recordDeletion) { if (!(existing instanceof ComplexColumnData)) { if (activeDeletion.deletes((Cell) existing)) { - modifier.delete(existing); + recordDeletion.delete(existing); return null; } } else { ComplexColumnData existingComplex = (ComplexColumnData) existing; - if (activeDeletion.supersedes(existingComplex.complexDeletion())) { - Object[] cells = BTree.transformAndFilter(existingComplex.tree(), this::retain); + Object[] cells = BTree.transformAndFilter(existingComplex.tree(), (ColumnData cd) -> removeShadowed(cd, recordDeletion)); return BTree.isEmpty(cells) ? null : new ComplexColumnData(existingComplex.column, cells, DeletionTime.LIVE); } } @@ -196,7 +215,7 @@ public abstract class ColumnData public void close() { activeDeletion = null; - modifier = null; + postReconcile = null; TinyThreadLocalPool.TinyPool<Reconciler> tmp = pool; pool = null; diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java index 7ff59f9e29..ee86e5d6c0 100644 --- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java @@ -138,7 +138,7 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell<?>> public long unsharedHeapSizeExcludingData() { - long heapSize = EMPTY_SIZE + ObjectSizes.sizeOfArray(cells); + long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells); // TODO: this can be turned into a simple multiplication, at least while we have only one Cell implementation for (Cell<?> cell : this) heapSize += cell.unsharedHeapSizeExcludingData(); diff --git a/src/java/org/apache/cassandra/db/rows/NativeCell.java b/src/java/org/apache/cassandra/db/rows/NativeCell.java 
index 02e000823a..d538504415 100644 --- a/src/java/org/apache/cassandra/db/rows/NativeCell.java +++ b/src/java/org/apache/cassandra/db/rows/NativeCell.java @@ -72,7 +72,7 @@ public class NativeCell extends AbstractCell<ByteBuffer> CellPath path) { super(column); - long size = simpleSize(value.remaining()); + long size = offHeapSizeWithoutPath(value.remaining()); assert value.order() == ByteOrder.BIG_ENDIAN; assert column.isComplex() == (path != null); @@ -105,7 +105,7 @@ public class NativeCell extends AbstractCell<ByteBuffer> } } - private static long simpleSize(int length) + private static long offHeapSizeWithoutPath(int length) { return VALUE + length; } @@ -138,7 +138,7 @@ public class NativeCell extends AbstractCell<ByteBuffer> public CellPath path() { - if (MemoryUtil.getByte(peer+ HAS_CELLPATH) == 0) + if (!hasPath()) return null; long offset = peer + VALUE + MemoryUtil.getInt(peer + LENGTH); @@ -171,4 +171,16 @@ public class NativeCell extends AbstractCell<ByteBuffer> return EMPTY_SIZE; } + public long offHeapSize() + { + long size = offHeapSizeWithoutPath(MemoryUtil.getInt(peer + LENGTH)); + if (hasPath()) + size += 4 + MemoryUtil.getInt(peer + size); + return size; + } + + private boolean hasPath() + { + return MemoryUtil.getByte(peer+ HAS_CELLPATH) != 0; + } } diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java index a51f5afebe..40d1467a9b 100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -148,6 +148,14 @@ public interface Row extends Unfiltered, Iterable<ColumnData> */ public ComplexColumnData getComplexColumnData(ColumnMetadata c); + /** + * The data for a regular or complex column. + * + * @param c the column for which to return the complex data. + * @return the data for {@code c} or {@code null} if the row has no data for this column. 
+ */ + public ColumnData getColumnData(ColumnMetadata c); + /** * An iterable over the cells of this row. * <p> diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java index caf0699e75..97f5cc0fb5 100644 --- a/src/java/org/apache/cassandra/utils/btree/BTree.java +++ b/src/java/org/apache/cassandra/utils/btree/BTree.java @@ -365,7 +365,9 @@ public class BTree toUpdate = insert; insert = tmp; } - return updateLeaves(toUpdate, insert, comparator, updateF); + Object[] merged = updateLeaves(toUpdate, insert, comparator, updateF); + updateF.onAllocatedOnHeap(sizeOnHeapOf(merged) - sizeOnHeapOf(toUpdate)); + return merged; } if (!isLeaf(insert) && isSimple(updateF)) @@ -2195,6 +2197,8 @@ public class BTree public static long sizeOnHeapOf(Object[] tree) { + if (isEmpty(tree)) + return 0; long size = ObjectSizes.sizeOfArray(tree); if (isLeaf(tree)) return size; @@ -2204,6 +2208,13 @@ public class BTree return size; } + private static long sizeOnHeapOfLeaf(Object[] tree) + { + if (isEmpty(tree)) + return 0; + return ObjectSizes.sizeOfArray(tree); + } + // Arbitrary boundaries private static Object POSITIVE_INFINITY = new Object(); private static Object NEGATIVE_INFINITY = new Object(); @@ -2751,7 +2762,7 @@ public class BTree sizeOfLeaf = count; leaf = drain(); if (allocated >= 0 && sizeOfLeaf > 0) - allocated += ObjectSizes.sizeOfReferenceArray(sizeOfLeaf | 1) - (unode == null ? 0 : ObjectSizes.sizeOfArray(unode)); + allocated += ObjectSizes.sizeOfReferenceArray(sizeOfLeaf | 1) - (unode == null ? 
0 : sizeOnHeapOfLeaf(unode)); } count = 0; diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java index 58b2910972..966c560f5f 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java @@ -190,7 +190,7 @@ public abstract class MemtablePool void released(long size) { - assert size >= 0; + assert size >= 0 : "Negative released: " + size; adjustAllocated(-size); hasRoom.signalAll(); } diff --git a/test/unit/org/apache/cassandra/db/partitions/AtomicBTreePartitionMemtableAccountingTest.java b/test/unit/org/apache/cassandra/db/partitions/AtomicBTreePartitionMemtableAccountingTest.java new file mode 100644 index 0000000000..125015fbf6 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/partitions/AtomicBTreePartitionMemtableAccountingTest.java @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.partitions; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; + +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; + +import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.SetType; +import org.apache.cassandra.db.rows.BTreeRow; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.CellPath; +import org.apache.cassandra.db.rows.Cells; +import org.apache.cassandra.db.rows.ColumnData; +import org.apache.cassandra.db.rows.ComplexColumnData; +import org.apache.cassandra.db.rows.NativeCell; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.btree.BTree; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.Cloner; +import org.apache.cassandra.utils.memory.MemtableAllocator; +import org.apache.cassandra.utils.memory.MemtableCleaner; +import 
org.apache.cassandra.utils.memory.MemtablePool; + +import static org.assertj.core.api.Assertions.assertThat; + +/* Test memory pool accounting when updating atomic btree partitions. CASSANDRA-18125 hit an issue + * where cells were double-counted on release, causing negative allocator onHeap ownership, which + * crashed memtable flushing. + * + * The aim of the test is to exhaustively test updates to simple and complex cells in all possible + * states and check that the accounting is reasonable. It generates an initial row, then an update row, + * checks that the allocator ownership is reasonable, and then compares usage to a freshly recreated + * instance of the partition. + * + * Replacing existing values does not free up memory; this is accounted for when comparing against + * the fresh build. + */ +@RunWith(Parameterized.class) +public class AtomicBTreePartitionMemtableAccountingTest +{ + public static final int INITIAL_TS = 2000; + public static final int EARLIER_TS = 1000; + public static final int LATER_TS = 3000; + + public static final int NOW_LDT = FBUtilities.nowInSeconds(); + public static final int LATER_LDT = NOW_LDT + 1000; + public static final int EARLIER_LDT = NOW_LDT - 1000; + + public static final int EXPIRED_TTL = 1; + public static final int EXPIRING_TTL = 10000; + + public static final long HEAP_LIMIT = 1 << 20; + public static final long OFF_HEAP_LIMIT = 1 << 20; + public static final float MEMTABLE_CLEANUP_THRESHOLD = 0.25f; + public static final MemtableCleaner DUMMY_CLEANER = () -> CompletableFuture.completedFuture(null); + + @Parameterized.Parameters(name="allocationType={0}") + public static Iterable<?
extends Object> data() + { + return Arrays.asList(Config.MemtableAllocationType.values()); + } + + @Parameterized.Parameter + public Config.MemtableAllocationType allocationType; + + static TableMetadata metadata; + static DecoratedKey partitionKey; + static ColumnMetadata r1md; + static ColumnMetadata c2md; + static ColumnMetadata s3md; + static ColumnMetadata c4md; + + @BeforeClass + public static void setUp() + { + DatabaseDescriptor.daemonInitialization(); + metadata = TableMetadata.builder("dummy_ks", "dummy_tbl") + .addPartitionKeyColumn("pk", Int32Type.instance) + .addRegularColumn("r1", Int32Type.instance) + .addRegularColumn("c2", SetType.getInstance(Int32Type.instance, true)) + .addStaticColumn("s3", Int32Type.instance) + .addStaticColumn("c4", SetType.getInstance(Int32Type.instance, true)) + .build(); + partitionKey = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(0)); + r1md = metadata.getColumn(new ColumnIdentifier("r1", false)); + c2md = metadata.getColumn(new ColumnIdentifier("c2", false)); + s3md = metadata.getColumn(new ColumnIdentifier("s3", false)); + c4md = metadata.getColumn(new ColumnIdentifier("c4", false)); + } + + @Ignore + @Test + public void repro() // For running in the IDE; update with the failing TestCase parameters to reproduce + { + new TestCase(INITIAL_TS, Cell.NO_TTL, Cell.NO_DELETION_TIME, new DeletionTime(EARLIER_TS, EARLIER_LDT), 1, + EARLIER_TS, Cell.NO_TTL, Cell.NO_DELETION_TIME, DeletionTime.LIVE, 3).execute(); + } + + @Test + public void exhaustiveTest() + { + // TTLs for initial and updated cells + List<Integer> ttls = Arrays.asList(Cell.NO_TTL, EXPIRING_TTL, EXPIRED_TTL); + + // Initial local deletion times - a live cell, or a tombstone created now + List<Integer> initialLDTs = Arrays.asList(Cell.NO_DELETION_TIME, NOW_LDT); + + // Initial complex deletion time for c2 - no deletion, earlier than c2 elements, or concurrent with c2 elements + List<DeletionTime> initialComplexDeletionTimes =
Arrays.asList(DeletionTime.LIVE, + new DeletionTime(EARLIER_TS, EARLIER_LDT), + new DeletionTime(INITIAL_TS, NOW_LDT)); + + // Update timestamps - earlier (update is ignored), same as initial, or later than initial (update supersedes) + List<Integer> updateTimestamps = Arrays.asList(EARLIER_TS, INITIAL_TS, LATER_TS); + + // Update local deletion times - live cell, earlier tombstone, concurrent tombstone, or future deletion + List<Integer> updateLDTs = Arrays.asList(Cell.NO_DELETION_TIME, EARLIER_LDT, NOW_LDT, LATER_LDT); + + // Update complex deletion time for c2 - no deletion, earlier than c2 elements, + // concurrent with c2 elements, or after c2 elements + List<DeletionTime> updateComplexDeletionTimes = Arrays.asList(DeletionTime.LIVE, + new DeletionTime(EARLIER_TS, EARLIER_LDT), + new DeletionTime(INITIAL_TS, NOW_LDT), + new DeletionTime(LATER_TS, LATER_LDT)); + + // Number of cells to put in the initial and update collections - the two overlap by one cell + List<Integer> initialComplexCellCount = Arrays.asList(3, 1); + List<Integer> updateComplexCellCount = Arrays.asList(3, 1); + + ttls.forEach(initialTTL -> { + initialLDTs.forEach(initialLDT -> { + initialComplexDeletionTimes.forEach(initialCDT -> { + initialComplexCellCount.forEach(numC2InitialCells -> { + updateTimestamps.forEach(updateTS -> { + ttls.forEach(updateTTL -> { + updateLDTs.forEach(updateLDT -> { + updateComplexDeletionTimes.forEach(updateCDT -> { + updateComplexCellCount.forEach(numC2UpdateCells -> { + new TestCase(INITIAL_TS, initialTTL, initialLDT, initialCDT, numC2InitialCells, + updateTS, updateTTL, updateLDT, updateCDT, numC2UpdateCells).execute(); + }); + }); + }); + }); + }); + }); + }); + }); + }); + } + + class TestCase + { + int initialTS; + int initialTTL; + int initialLDT; + DeletionTime initialCDT; + int numC2InitialCells; + int updateTS; + int updateTTL; + int updateLDT; + DeletionTime updateCDT; + Integer numC2UpdateCells; + + public TestCase(int initialTS, int initialTTL, int initialLDT, DeletionTime initialCDT,
int numC2InitialCells, + int updateTS, int updateTTL, int updateLDT, DeletionTime updateCDT, Integer numC2UpdateCells) + { + this.initialTS = initialTS; + this.initialTTL = initialTTL; + this.initialLDT = initialLDT; + this.initialCDT = initialCDT; + this.numC2InitialCells = numC2InitialCells; + this.updateTS = updateTS; + this.updateTTL = updateTTL; + this.updateLDT = updateLDT; + this.updateCDT = updateCDT; + this.numC2UpdateCells = numC2UpdateCells; + } + + void execute() + { + // Test regular row updates + Pair<Row, Row> regularRows = makeInitialAndUpdate(r1md, c2md); + PartitionUpdate initial = PartitionUpdate.singleRowUpdate(metadata, partitionKey, regularRows.left, null); + PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, partitionKey, regularRows.right, null); + validateUpdates(metadata, partitionKey, Arrays.asList(initial, update)); + + // Test static row updates + Pair<Row, Row> staticRows = makeInitialAndUpdate(s3md, c4md); + PartitionUpdate staticInitial = PartitionUpdate.singleRowUpdate(metadata, partitionKey, null, staticRows.left); + PartitionUpdate staticUpdate = PartitionUpdate.singleRowUpdate(metadata, partitionKey, null, staticRows.right); + validateUpdates(metadata, partitionKey, Arrays.asList(staticInitial, staticUpdate)); + } + + private Pair<Row, Row> makeInitialAndUpdate(ColumnMetadata regular, ColumnMetadata complex) + { + final ByteBuffer initialValueBB = ByteBufferUtil.bytes(111); + final ByteBuffer updateValueBB = ByteBufferUtil.bytes(222); + + // Create the initial row to populate the partition with + Row.Builder initialRowBuilder = BTreeRow.unsortedBuilder(); + initialRowBuilder.newRow(regular.isStatic() ? 
Clustering.STATIC_CLUSTERING : Clustering.EMPTY); + + initialRowBuilder.addCell(makeCell(regular, initialTS, initialTTL, initialLDT, initialValueBB, null)); + if (initialCDT != DeletionTime.LIVE) + initialRowBuilder.addComplexDeletion(complex, initialCDT); + int cellPath = 1000; + for (int i = 0; i < numC2InitialCells; i++) + initialRowBuilder.addCell(makeCell(complex, initialTS, initialTTL, initialLDT, + ByteBufferUtil.EMPTY_BYTE_BUFFER, + CellPath.create(ByteBufferUtil.bytes(cellPath--)))); + Row initialRow = initialRowBuilder.build(); + + // Create the update row to modify the partition with + Row.Builder updateRowBuilder = BTreeRow.unsortedBuilder(); + updateRowBuilder.newRow(regular.isStatic() ? Clustering.STATIC_CLUSTERING : Clustering.EMPTY); + + updateRowBuilder.addCell(makeCell(regular, updateTS, updateTTL, updateLDT, updateValueBB, null)); + if (updateCDT != DeletionTime.LIVE) + updateRowBuilder.addComplexDeletion(complex, updateCDT); + + // Make multiple update cells to make any issues more pronounced + cellPath = 1000; + for (int i = 0; i < numC2UpdateCells; i++) + updateRowBuilder.addCell(makeCell(complex, updateTS, updateTTL, updateLDT, + ByteBufferUtil.EMPTY_BYTE_BUFFER, + CellPath.create(ByteBufferUtil.bytes(cellPath++)))); + Row updateRow = updateRowBuilder.build(); + return Pair.create(initialRow, updateRow); + } + + Cell<?> makeCell(ColumnMetadata column, long timestamp, int ttl, int localDeletionTime, ByteBuffer value, CellPath path) + { + if (localDeletionTime != Cell.NO_DELETION_TIME) // never a ttl for a tombstone + { + ttl = Cell.NO_TTL; + value = ByteBufferUtil.EMPTY_BYTE_BUFFER; + } + return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path); + } + } + + void validateUpdates(TableMetadata metadata, DecoratedKey partitionKey, List<PartitionUpdate> updates) + { + TableMetadataRef metadataRef = TableMetadataRef.forOfflineTools(metadata); + + OpOrder opOrder = new OpOrder(); + opOrder.start(); + UpdateTransaction indexer = 
UpdateTransaction.NO_OP; + + MemtablePool memtablePool = Memtable.createMemtableAllocatorPoolInternal(allocationType, + HEAP_LIMIT, OFF_HEAP_LIMIT, MEMTABLE_CLEANUP_THRESHOLD, DUMMY_CLEANER); + MemtableAllocator allocator = memtablePool.newAllocator(); + MemtableAllocator recreatedAllocator = memtablePool.newAllocator(); + try + { + // Prepare a partition to receive updates + AtomicBTreePartition partition = new AtomicBTreePartition(metadataRef, partitionKey, allocator); + + // For each update, apply it and verify the allocator's ownership never goes negative + long unreleasable = updates.stream().mapToLong(update -> { + DeletionTime exsDeletion = partition.deletionInfo().getPartitionDeletion(); + DeletionTime updDeletion = update.deletionInfo().getPartitionDeletion(); + long updateUnreleasable = 0; + if (!BTree.isEmpty(partition.unsafeGetHolder().tree)) + { + for (Row updRow : BTree.<Row>iterable(update.holder().tree)) + { + Row exsRow = BTree.find(partition.unsafeGetHolder().tree, partition.metadata().comparator, updRow); + updateUnreleasable += getUnreleasableSize(updRow, exsRow, exsDeletion, updDeletion); + } + } + if (partition.staticRow() != null) + { + updateUnreleasable += getUnreleasableSize(update.staticRow(), partition.unsafeGetHolder().staticRow, exsDeletion, updDeletion); + } + + OpOrder.Group writeOp = opOrder.getCurrent(); + Cloner cloner = allocator.cloner(writeOp); + partition.addAllWithSizeDelta(update, cloner, writeOp, indexer); + opOrder.newBarrier().issue(); + + assertThat(allocator.onHeap().owns()).isGreaterThanOrEqualTo(0L); + assertThat(allocator.offHeap().owns()).isGreaterThanOrEqualTo(0L); + return updateUnreleasable; + }).sum(); + + // Now recreate the partition to see if there's a leak in the accounting + + AtomicBTreePartition recreated = new AtomicBTreePartition(metadataRef, partitionKey, recreatedAllocator); + try (UnfilteredRowIterator iter = partition.unfilteredIterator()) + { + PartitionUpdate update = PartitionUpdate.fromIterator(iter,
ColumnFilter.NONE); + opOrder.newBarrier().issue(); + OpOrder.Group writeOp = opOrder.getCurrent(); + Cloner cloner = recreatedAllocator.cloner(writeOp); + recreated.addAllWithSizeDelta(update, cloner, writeOp, indexer); + } + + // offheap allocators don't release on heap memory, so expect the same + long unreleasableOnHeap = 0, unreleasableOffHeap = 0; + if (allocator.offHeap().owns() > 0) unreleasableOffHeap = unreleasable; + else unreleasableOnHeap = unreleasable; + + assertThat(recreatedAllocator.offHeap().owns()).isEqualTo(allocator.offHeap().owns() - unreleasableOffHeap); + assertThat(recreatedAllocator.onHeap().owns()).isEqualTo(allocator.onHeap().owns() - unreleasableOnHeap); + } + finally + { + // Release test resources + recreatedAllocator.setDiscarding(); + recreatedAllocator.setDiscarded(); + allocator.setDiscarding(); + allocator.setDiscarded(); + try + { + memtablePool.shutdownAndWait(1, TimeUnit.SECONDS); + } + catch (Throwable tr) + { + // too bad + } + } + } + + private long getUnreleasableSize(Row updRow, Row exsRow, DeletionTime exsDeletion, DeletionTime updDeletion) + { + if (exsRow.deletion().supersedes(exsDeletion)) + exsDeletion = exsRow.deletion().time(); + if (updRow.deletion().supersedes(updDeletion)) + updDeletion = updRow.deletion().time(); + + long size = 0; + for (ColumnData exsCd : exsRow.columnData()) + { + ColumnData updCd = updRow.getColumnData(exsCd.column()); + if (exsCd instanceof Cell) + { + Cell exsCell = (Cell) exsCd, updCell = (Cell) updCd; + if (updDeletion.deletes(exsCell)) + size += sizeOf(exsCell); + else if (updCell != null && Cells.reconcile(exsCell, updCell) != exsCell && !exsDeletion.deletes(updCell)) + size += sizeOf(exsCell); + } + else + { + ComplexColumnData exsCcd = (ComplexColumnData) exsCd; + ComplexColumnData updCcd = (ComplexColumnData) updCd; + + DeletionTime activeExsDeletion = exsDeletion; + DeletionTime activeUpdDeletion = updDeletion; + if (exsCcd.complexDeletion().supersedes(exsDeletion)) + 
activeExsDeletion = exsCcd.complexDeletion(); + if (updCcd != null && updCcd.complexDeletion().supersedes(updDeletion)) + activeUpdDeletion = updCcd.complexDeletion(); + + for (Cell exsCell : exsCcd) + { + Cell updCell = updCcd == null ? null : updCcd.getCell(exsCell.path()); + + if (activeUpdDeletion.deletes(exsCell)) + size += sizeOf(exsCell); + else if (updCell != null && (Cells.reconcile(exsCell, updCell) != exsCell && !activeExsDeletion.deletes(updCell))) + size += sizeOf(exsCell); + } + } + } + return size; + } + + private static long sizeOf(Cell cell) + { + if (cell instanceof NativeCell) + return ((NativeCell) cell).offHeapSize(); + return cell.valueSize() + (cell.path() == null ? 0 : cell.path().dataSize()); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org