This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit d95aebe0a3583179605b63dba0a4edc33cb2f97f
Merge: da95e4b 18736b4
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Thu May 7 11:00:46 2020 +0200

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 .../db/lifecycle/LifecycleTransaction.java         |  45 ++++++-
 .../io/sstable/IndexSummaryRedistribution.java     |  28 ++++-
 .../cassandra/io/sstable/format/SSTableReader.java |   6 -
 .../apache/cassandra/io/DiskSpaceMetricsTest.java  | 138 +++++++++++++++++++++
 5 files changed, 209 insertions(+), 9 deletions(-)

diff --cc CHANGES.txt
index 0c50b0a,c326801..7f1930d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,27 -1,7 +1,28 @@@
 -3.11.7
 - * Allow sstableloader to use SSL on the native port (CASSANDRA-14904)
 +4.0-alpha5
 + * Add tunable initial size and growth factor to RangeTombstoneList (CASSANDRA-15763)
 + * Improve debug logging in SSTableReader for index summary (CASSANDRA-15755)
 + * bin/sstableverify should support user provided token ranges (CASSANDRA-15753)
 + * Improve logging when mutation passed to commit log is too large (CASSANDRA-14781)
 + * replace LZ4FastDecompressor with LZ4SafeDecompressor (CASSANDRA-15560)
 + * Fix buffer pool NPE with concurrent release due to in-progress tiny pool eviction (CASSANDRA-15726)
 + * Avoid race condition when completing stream sessions (CASSANDRA-15666)
 + * Flush with fast compressors by default (CASSANDRA-15379)
 + * Fix CqlInputFormat regression from the switch to system.size_estimates (CASSANDRA-15637)
 + * Allow sending Entire SSTables over SSL (CASSANDRA-15740)
 + * Fix CQLSH UTF-8 encoding issue for Python 2/3 compatibility (CASSANDRA-15739)
 + * Fix batch statement preparation when multiple tables and parameters are used (CASSANDRA-15730)
 + * Fix regression with traceOutgoingMessage printing message size (CASSANDRA-15687)
 + * Ensure repaired data tracking reads a consistent amount of data across replicas (CASSANDRA-15601)
 + * Fix CQLSH to avoid arguments being evaluated (CASSANDRA-15660)
 + * Correct Visibility and Improve Safety of Methods in LatencyMetrics (CASSANDRA-15597)
 + * Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573)
 + * Improve logging around incremental repair (CASSANDRA-15599)
 + * Do not check cdc_raw_directory filesystem space if CDC disabled (CASSANDRA-15688)
 + * Replace array iterators with get by index (CASSANDRA-15394)
 + * Minimize BTree iterator allocations (CASSANDRA-15389)
 +Merged from 3.11:
  Merged from 3.0:
+  * liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted (CASSANDRA-15674)
   * Fix Debian init start/stop (CASSANDRA-15770)
   * Fix infinite loop on index query paging in tables with clustering (CASSANDRA-14242)
   * Fix chunk index overflow due to large sstable with small chunk length (CASSANDRA-15595)
diff --cc src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index a129e41,5994707..574c6a4
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@@ -20,8 -20,8 +20,7 @@@ package org.apache.cassandra.db.lifecyc
  import java.io.File;
  import java.nio.file.Path;
  import java.util.*;
--import java.util.function.BiFunction;
 -
 +import java.util.function.BiPredicate;
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Predicate;
  import com.google.common.collect.*;
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 1300c99,b914963..90a8621
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@@ -28,7 -28,8 +28,6 @@@ import java.util.Map
  import java.util.UUID;
  
  import com.google.common.annotations.VisibleForTesting;
 -import com.google.common.collect.ImmutableList;
--import com.google.common.collect.Iterables;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -41,7 -41,7 +39,8 @@@ import org.apache.cassandra.db.compacti
  import org.apache.cassandra.db.compaction.CompactionInfo.Unit;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.schema.TableId;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.concurrent.Refs;
@@@ -256,15 -262,40 +255,40 @@@ public class IndexSummaryRedistributio
              logger.trace("Re-sampling index summary for {} from {}/{} to 
{}/{} of the original number of entries",
                           sstable, sstable.getIndexSummarySamplingLevel(), 
Downsampling.BASE_SAMPLING_LEVEL,
                           entry.newSamplingLevel, 
Downsampling.BASE_SAMPLING_LEVEL);
 -            ColumnFamilyStore cfs = 
Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
 +            ColumnFamilyStore cfs = 
Keyspace.open(sstable.metadata().keyspace).getColumnFamilyStore(sstable.metadata().id);
+             long oldSize = sstable.bytesOnDisk();
              SSTableReader replacement = 
sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
+             long newSize = replacement.bytesOnDisk();
              newSSTables.add(replacement);
 -            transactions.get(sstable.metadata.cfId).update(replacement, true);
 +            transactions.get(sstable.metadata().id).update(replacement, true);
+             addHooks(cfs, transactions, oldSize, newSize);
          }
  
          return newSSTables;
      }
  
+     /**
+      * Add hooks to correctly update the storage load metrics once the transaction is closed/aborted
+      */
+     @SuppressWarnings("resource") // Transactions are closed in finally outside of this method
 -    private void addHooks(ColumnFamilyStore cfs, Map<UUID, LifecycleTransaction> transactions, long oldSize, long newSize)
++    private void addHooks(ColumnFamilyStore cfs, Map<TableId, LifecycleTransaction> transactions, long oldSize, long newSize)
+     {
 -        LifecycleTransaction txn = transactions.get(cfs.metadata.cfId);
++        LifecycleTransaction txn = transactions.get(cfs.metadata.id);
+         txn.runOnCommit(() -> {
+             // The new size will be added in Transactional.commit() as an updated SSTable, more details: CASSANDRA-13738
+             StorageMetrics.load.dec(oldSize);
+             cfs.metric.liveDiskSpaceUsed.dec(oldSize);
+             cfs.metric.totalDiskSpaceUsed.dec(oldSize);
+         });
+         txn.runOnAbort(() -> {
+             // the local disk was modified but bookkeeping couldn't be committed, apply the delta
+             long delta = oldSize - newSize; // if new is larger this will be negative, so dec will become an inc
+             StorageMetrics.load.dec(delta);
+             cfs.metric.liveDiskSpaceUsed.dec(delta);
+             cfs.metric.totalDiskSpaceUsed.dec(delta);
+         });
+     }
+ 
      @VisibleForTesting
      static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
      {
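
For readers following the mechanism: addHooks registers callbacks on the transaction so the disk-space counters are corrected whichever way it resolves. Below is a minimal, self-contained sketch of that commit/abort pattern; the Txn class, the AtomicLong counter, and the sizes are illustrative stand-ins, not Cassandra's API.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    public class MetricsHookSketch
    {
        // Stand-in for LifecycleTransaction's runOnCommit/runOnAbort hooks
        static class Txn
        {
            private final List<Runnable> onCommit = new ArrayList<>();
            private final List<Runnable> onAbort = new ArrayList<>();
            void runOnCommit(Runnable r) { onCommit.add(r); }
            void runOnAbort(Runnable r)  { onAbort.add(r); }
            void commit() { onCommit.forEach(Runnable::run); }
            void abort()  { onAbort.forEach(Runnable::run); }
        }

        public static void main(String[] args)
        {
            AtomicLong load = new AtomicLong(1000); // stand-in for StorageMetrics.load
            long oldSize = 100;                     // hypothetical size before resampling
            long newSize = 120;                     // hypothetical size after resampling

            Txn txn = new Txn();
            // Commit: the transaction machinery re-adds the replacement's size
            // itself (see CASSANDRA-13738), so the hook only removes the old size.
            txn.runOnCommit(() -> load.addAndGet(-oldSize));
            // Abort: the file on disk already changed, so apply just the delta;
            // here oldSize - newSize is negative, so the "dec" becomes an inc.
            txn.runOnAbort(() -> load.addAndGet(-(oldSize - newSize)));

            txn.abort();
            System.out.println(load.get()); // 1000 - (100 - 120) = 1020
        }
    }

The abort path deliberately applies only the delta: the replacement summary is already on disk, so adjusting by oldSize - newSize reconciles the counter without the double-counting described in CASSANDRA-15674.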
diff --cc test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
index 0000000,ddacc6b..73d5e22
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
+++ b/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
@@@ -1,0 -1,139 +1,138 @@@
+ package org.apache.cassandra.io;
+ 
+ import java.io.IOException;
 -import java.util.Collections;
+ import java.util.List;
+ import java.util.Map;
 -import java.util.UUID;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
+ import com.google.common.collect.ImmutableMap;
+ import com.google.common.collect.Lists;
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+ import org.apache.cassandra.db.compaction.OperationType;
+ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.SSTableSet;
+ import org.apache.cassandra.io.sstable.IndexSummaryManager;
+ import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
++import org.apache.cassandra.schema.TableId;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ public class DiskSpaceMetricsTest extends CQLTester
+ {
+     /**
+      * This test runs the system with normal operations and makes sure the disk metrics match reality
+      */
+     @Test
+     public void baseline() throws Throwable
+     {
+         createTable("CREATE TABLE %s (pk bigint, PRIMARY KEY (pk)) WITH 
min_index_interval=1");
+         ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ 
+         // disable compaction so nothing changes between calculations
+         cfs.disableAutoCompaction();
+ 
+         // create 100 sstables
+         for (int i = 0; i < 100; i++)
+             insert(cfs, i);
+         assertDiskSpaceEqual(cfs);
+     }
+ 
+     /**
+      * If index summary downsampling is interrupted in the middle, the metrics still reflect the real data
+      */
+     @Test
+     public void summaryRedistribution() throws Throwable
+     {
+         createTable("CREATE TABLE %s (pk bigint, PRIMARY KEY (pk)) WITH 
min_index_interval=1");
+         ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ 
+         // disable compaction so nothing changes between calculations
+         cfs.disableAutoCompaction();
+ 
+         // create 100 sstables, make sure they have more than 1 value, else sampling can't happen
+         for (int i = 0; i < 100; i++)
+             insertN(cfs, 10, i);
+         assertDiskSpaceEqual(cfs);
+ 
+         // summary downsample
+         for (int i = 0; i < 100; i++)
+         {
+             indexDownsampleCancelLastSSTable(cfs);
+             assertDiskSpaceEqual(cfs);
+         }
+     }
+ 
+     private void insert(ColumnFamilyStore cfs, long value) throws Throwable
+     {
+         insertN(cfs, 1, value);
+     }
+ 
+     private void insertN(ColumnFamilyStore cfs, int n, long base) throws Throwable
+     {
+         for (int i = 0; i < n; i++)
+             execute("INSERT INTO %s (pk) VALUES (?)", base + i);
+ 
+         // flush to write the sstable
+         cfs.forceBlockingFlush();
+     }
+ 
+     private void assertDiskSpaceEqual(ColumnFamilyStore cfs)
+     {
+         long liveDiskSpaceUsed = cfs.metric.liveDiskSpaceUsed.getCount();
+         long actual = 0;
+         for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables())
+             actual += sstable.bytesOnDisk();
+ 
+         Assert.assertEquals("bytes on disk does not match current metric 
liveDiskSpaceUsed", actual, liveDiskSpaceUsed);
+ 
+         // totalDiskSpaceUsed is based off SStable delete, which is async: LogTransaction's tidy enqueues in ScheduledExecutors.nonPeriodicTasks
+         // wait for there to be no more pending sstable releases
+         LifecycleTransaction.waitForDeletions();
+         long totalDiskSpaceUsed = cfs.metric.totalDiskSpaceUsed.getCount();
+         Assert.assertEquals("bytes on disk does not match current metric 
totalDiskSpaceUsed", actual, totalDiskSpaceUsed);
+     }
+ 
+     private static void indexDownsampleCancelLastSSTable(ColumnFamilyStore cfs)
+     {
+         List<SSTableReader> sstables = Lists.newArrayList(cfs.getSSTables(SSTableSet.CANONICAL));
+         LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
 -        Map<UUID, LifecycleTransaction> txns = ImmutableMap.of(cfs.metadata.cfId, txn);
++        Map<TableId, LifecycleTransaction> txns = ImmutableMap.of(cfs.metadata.id, txn);
+         // fail on the last file (* 3 because we call isStopRequested 3 times for each sstable, and we should fail on the last)
+         AtomicInteger countdown = new AtomicInteger(3 * sstables.size() - 1);
 -        IndexSummaryRedistribution redistribution = new IndexSummaryRedistribution(Collections.emptyList(), txns, 0) {
++        IndexSummaryRedistribution redistribution = new IndexSummaryRedistribution(txns, 0, 0) {
+             public boolean isStopRequested()
+             {
+                 return countdown.decrementAndGet() == 0;
+             }
+         };
+         try
+         {
+             IndexSummaryManager.redistributeSummaries(redistribution);
+             Assert.fail("Should throw CompactionInterruptedException");
+         }
+         catch (CompactionInterruptedException e)
+         {
+             // trying to get this to happen
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException(e);
+         }
+         finally
+         {
+             try
+             {
+                 FBUtilities.closeAll(txns.values());
+             }
+             catch (Exception e)
+             {
+                 throw new RuntimeException(e);
+             }
+         }
+     }
+ }
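
A note on the interruption technique in indexDownsampleCancelLastSSTable: overriding isStopRequested() with an AtomicInteger countdown lets the test land the cancellation on a chosen poll. A minimal sketch of the same trick, using stand-in names (Task, run) rather than Cassandra's classes:

    import java.util.concurrent.atomic.AtomicInteger;

    public class CountdownStopSketch
    {
        // Stand-in for a cancellable task that polls isStopRequested() once per step
        static class Task
        {
            public boolean isStopRequested() { return false; }

            void run(int steps)
            {
                for (int i = 0; i < steps; i++)
                {
                    if (isStopRequested())
                        throw new RuntimeException("interrupted at step " + i);
                    // ... one unit of work per step ...
                }
            }
        }

        public static void main(String[] args)
        {
            int steps = 5;
            // decrementAndGet() reaches zero on the final poll, so the stop
            // lands on the last step; the test's "3 * sstables.size() - 1" is
            // the same idea with three polls per sstable
            AtomicInteger countdown = new AtomicInteger(steps);
            Task task = new Task()
            {
                @Override
                public boolean isStopRequested()
                {
                    return countdown.decrementAndGet() == 0;
                }
            };
            try
            {
                task.run(steps);
            }
            catch (RuntimeException e)
            {
                System.out.println(e.getMessage()); // prints: interrupted at step 4
            }
        }
    }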

