This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit d95aebe0a3583179605b63dba0a4edc33cb2f97f Merge: da95e4b 18736b4 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Thu May 7 11:00:46 2020 +0200 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 1 + .../db/lifecycle/LifecycleTransaction.java | 45 ++++++- .../io/sstable/IndexSummaryRedistribution.java | 28 ++++- .../cassandra/io/sstable/format/SSTableReader.java | 6 - .../apache/cassandra/io/DiskSpaceMetricsTest.java | 138 +++++++++++++++++++++ 5 files changed, 209 insertions(+), 9 deletions(-) diff --cc CHANGES.txt index 0c50b0a,c326801..7f1930d --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,27 -1,7 +1,28 @@@ -3.11.7 - * Allow sstableloader to use SSL on the native port (CASSANDRA-14904) +4.0-alpha5 + * Add tunable initial size and growth factor to RangeTombstoneList (CASSANDRA-15763) + * Improve debug logging in SSTableReader for index summary (CASSANDRA-15755) + * bin/sstableverify should support user provided token ranges (CASSANDRA-15753) + * Improve logging when mutation passed to commit log is too large (CASSANDRA-14781) + * replace LZ4FastDecompressor with LZ4SafeDecompressor (CASSANDRA-15560) + * Fix buffer pool NPE with concurrent release due to in-progress tiny pool eviction (CASSANDRA-15726) + * Avoid race condition when completing stream sessions (CASSANDRA-15666) + * Flush with fast compressors by default (CASSANDRA-15379) + * Fix CqlInputFormat regression from the switch to system.size_estimates (CASSANDRA-15637) + * Allow sending Entire SSTables over SSL (CASSANDRA-15740) + * Fix CQLSH UTF-8 encoding issue for Python 2/3 compatibility (CASSANDRA-15739) + * Fix batch statement preparation when multiple tables and parameters are used (CASSANDRA-15730) + * Fix regression with traceOutgoingMessage printing message size (CASSANDRA-15687) + * Ensure repaired data tracking reads a consistent amount of data across replicas (CASSANDRA-15601) + * Fix CQLSH to avoid arguments being evaluated (CASSANDRA-15660) + * Correct Visibility and Improve 
Safety of Methods in LatencyMetrics (CASSANDRA-15597) + * Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573) + * Improve logging around incremental repair (CASSANDRA-15599) + * Do not check cdc_raw_directory filesystem space if CDC disabled (CASSANDRA-15688) + * Replace array iterators with get by index (CASSANDRA-15394) + * Minimize BTree iterator allocations (CASSANDRA-15389) +Merged from 3.11: Merged from 3.0: + * liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted (CASSANDRA-15674) * Fix Debian init start/stop (CASSANDRA-15770) * Fix infinite loop on index query paging in tables with clustering (CASSANDRA-14242) * Fix chunk index overflow due to large sstable with small chunk length (CASSANDRA-15595) diff --cc src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java index a129e41,5994707..574c6a4 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java @@@ -20,8 -20,8 +20,7 @@@ package org.apache.cassandra.db.lifecyc import java.io.File; import java.nio.file.Path; import java.util.*; --import java.util.function.BiFunction; - +import java.util.function.BiPredicate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.*; diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java index 1300c99,b914963..90a8621 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java @@@ -28,7 -28,8 +28,6 @@@ import java.util.Map import java.util.UUID; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; --import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -41,7 -41,7 +39,8 @@@ import 
org.apache.cassandra.db.compacti import org.apache.cassandra.db.compaction.CompactionInfo.Unit; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.Refs; @@@ -256,15 -262,40 +255,40 @@@ public class IndexSummaryRedistributio logger.trace("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries", sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL, entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL); - ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId); + ColumnFamilyStore cfs = Keyspace.open(sstable.metadata().keyspace).getColumnFamilyStore(sstable.metadata().id); + long oldSize = sstable.bytesOnDisk(); SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel); + long newSize = replacement.bytesOnDisk(); newSSTables.add(replacement); - transactions.get(sstable.metadata.cfId).update(replacement, true); + transactions.get(sstable.metadata().id).update(replacement, true); + addHooks(cfs, transactions, oldSize, newSize); } return newSSTables; } + /** + * Add hooks to correctly update the storage load metrics once the transaction is closed/aborted + */ + @SuppressWarnings("resource") // Transactions are closed in finally outside of this method - private void addHooks(ColumnFamilyStore cfs, Map<UUID, LifecycleTransaction> transactions, long oldSize, long newSize) ++ private void addHooks(ColumnFamilyStore cfs, Map<TableId, LifecycleTransaction> transactions, long oldSize, long newSize) + { - LifecycleTransaction txn = transactions.get(cfs.metadata.cfId); ++ LifecycleTransaction txn = 
transactions.get(cfs.metadata.id); + txn.runOnCommit(() -> { + // The new size will be added in Transactional.commit() as an updated SSTable, more details: CASSANDRA-13738 + StorageMetrics.load.dec(oldSize); + cfs.metric.liveDiskSpaceUsed.dec(oldSize); + cfs.metric.totalDiskSpaceUsed.dec(oldSize); + }); + txn.runOnAbort(() -> { + // the local disk was modified but book keeping couldn't be commited, apply the delta + long delta = oldSize - newSize; // if new is larger this will be negative, so dec will become a inc + StorageMetrics.load.dec(delta); + cfs.metric.liveDiskSpaceUsed.dec(delta); + cfs.metric.totalDiskSpaceUsed.dec(delta); + }); + } + @VisibleForTesting static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace) { diff --cc test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java index 0000000,ddacc6b..73d5e22 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java +++ b/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java @@@ -1,0 -1,139 +1,138 @@@ + package org.apache.cassandra.io; + + import java.io.IOException; -import java.util.Collections; + import java.util.List; + import java.util.Map; -import java.util.UUID; + import java.util.concurrent.atomic.AtomicInteger; + + import com.google.common.collect.ImmutableMap; + import com.google.common.collect.Lists; + import org.junit.Assert; + import org.junit.Test; + + import org.apache.cassandra.cql3.CQLTester; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.compaction.CompactionInterruptedException; + import org.apache.cassandra.db.compaction.OperationType; + import org.apache.cassandra.db.lifecycle.LifecycleTransaction; + import org.apache.cassandra.db.lifecycle.SSTableSet; + import org.apache.cassandra.io.sstable.IndexSummaryManager; + import org.apache.cassandra.io.sstable.IndexSummaryRedistribution; + import 
org.apache.cassandra.io.sstable.format.SSTableReader; ++import org.apache.cassandra.schema.TableId; + import org.apache.cassandra.utils.FBUtilities; + + public class DiskSpaceMetricsTest extends CQLTester + { + /** + * This test runs the system with normal operations and makes sure the disk metrics match reality + */ + @Test + public void baseline() throws Throwable + { + createTable("CREATE TABLE %s (pk bigint, PRIMARY KEY (pk)) WITH min_index_interval=1"); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + + // disable compaction so nothing changes between calculations + cfs.disableAutoCompaction(); + + // create 100 sstables + for (int i = 0; i < 100; i++) + insert(cfs, i); + assertDiskSpaceEqual(cfs); + } + + /** + * If index summary downsampling is interrupted in the middle, the metrics still reflect the real data + */ + @Test + public void summaryRedistribution() throws Throwable + { + createTable("CREATE TABLE %s (pk bigint, PRIMARY KEY (pk)) WITH min_index_interval=1"); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + + // disable compaction so nothing changes between calculations + cfs.disableAutoCompaction(); + + // create 100 sstables, make sure they have more than 1 value, else sampling can't happen + for (int i = 0; i < 100; i++) + insertN(cfs, 10, i); + assertDiskSpaceEqual(cfs); + + // summary downsample + for (int i = 0; i < 100; i++) + { + indexDownsampleCancelLastSSTable(cfs); + assertDiskSpaceEqual(cfs); + } + } + + private void insert(ColumnFamilyStore cfs, long value) throws Throwable + { + insertN(cfs, 1, value); + } + + private void insertN(ColumnFamilyStore cfs, int n, long base) throws Throwable + { + for (int i = 0; i < n; i++) + execute("INSERT INTO %s (pk) VALUES (?)", base + i); + + // flush to write the sstable + cfs.forceBlockingFlush(); + } + + private void assertDiskSpaceEqual(ColumnFamilyStore cfs) + { + long liveDiskSpaceUsed = cfs.metric.liveDiskSpaceUsed.getCount(); + long actual = 0; + for (SSTableReader 
sstable : cfs.getTracker().getView().liveSSTables()) + actual += sstable.bytesOnDisk(); + + Assert.assertEquals("bytes on disk does not match current metric liveDiskSpaceUsed", actual, liveDiskSpaceUsed); + + // totalDiskSpaceUsed is based off SStable delete, which is async: LogTransaction's tidy enqueues in ScheduledExecutors.nonPeriodicTasks + // wait for there to be no more pending sstable releases + LifecycleTransaction.waitForDeletions(); + long totalDiskSpaceUsed = cfs.metric.totalDiskSpaceUsed.getCount(); + Assert.assertEquals("bytes on disk does not match current metric totalDiskSpaceUsed", actual, totalDiskSpaceUsed); + } + + private static void indexDownsampleCancelLastSSTable(ColumnFamilyStore cfs) + { + List<SSTableReader> sstables = Lists.newArrayList(cfs.getSSTables(SSTableSet.CANONICAL)); + LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); - Map<UUID, LifecycleTransaction> txns = ImmutableMap.of(cfs.metadata.cfId, txn); ++ Map<TableId, LifecycleTransaction> txns = ImmutableMap.of(cfs.metadata.id, txn); + // fail on the last file (* 3 because we call isStopRequested 3 times for each sstable, and we should fail on the last) + AtomicInteger countdown = new AtomicInteger(3 * sstables.size() - 1); - IndexSummaryRedistribution redistribution = new IndexSummaryRedistribution(Collections.emptyList(), txns, 0) { ++ IndexSummaryRedistribution redistribution = new IndexSummaryRedistribution(txns, 0, 0) { + public boolean isStopRequested() + { + return countdown.decrementAndGet() == 0; + } + }; + try + { + IndexSummaryManager.redistributeSummaries(redistribution); + Assert.fail("Should throw CompactionInterruptedException"); + } + catch (CompactionInterruptedException e) + { + // trying to get this to happen + } + catch (IOException e) + { + throw new RuntimeException(e); + } + finally + { + try + { + FBUtilities.closeAll(txns.values()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + } + } 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org