This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 446a9d1d011be89e33970def2eb25366d6a24122
Merge: 402e2f2f73 1053e3b475
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Thu May 18 10:34:49 2023 -0700

    Merge branch 'cassandra-4.0' into cassandra-4.1

 CHANGES.txt                                        |   1 +
 .../db/compaction/CompactionController.java        |   2 +-
 .../cassandra/db/compaction/CompactionTask.java    |  25 ++-
 .../test/CompactionOverlappingSSTableTest.java     | 115 ++++++++++++
 .../db/compaction/PartialCompactionsTest.java      | 207 +++++++++++++++++++++
 5 files changed, 341 insertions(+), 9 deletions(-)

diff --cc CHANGES.txt
index baf11ee6c0,6db0e3b084..6167e04416
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
 -4.0.10
 +4.1.2
 + * Allow keystore and truststore passwords to be nullable (CASSANDRA-18124)
 + * Return snapshots with dots in their name in nodetool listsnapshots (CASSANDRA-18371)
 + * Fix NPE when loading snapshots and data directory is one directory from root (CASSANDRA-18359)
 + * Do not submit hints when hinted_handoff_enabled=false (CASSANDRA-18304)
 + * Fix COPY ... TO STDOUT behavior in cqlsh (CASSANDRA-18353)
 + * Remove six and Py2SaferScanner merge cruft (CASSANDRA-18354)
 +Merged from 4.0:
+  * Partial compaction can resurrect deleted data (CASSANDRA-18507)
   * Allow internal address to change with reconnecting snitches (CASSANDRA-16718)
   * Fix quoting in toCqlString methods of UDTs and aggregates (CASSANDRA-17918)
   * NPE when deserializing malformed collections from client (CASSANDRA-18505)
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index dc08f5ae01,90abac3fb2..5fc8031966
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -88,13 -83,14 +88,14 @@@ public class CompactionTask extends Abs
          if (partialCompactionsAcceptable() && transaction.originals().size() 
> 1)
          {
              // Try again w/o the largest one.
-             logger.warn("insufficient space to compact all requested files. 
{}MiB required, {} for compaction {}",
+             SSTableReader removedSSTable = 
cfs.getMaxSizeFile(nonExpiredSSTables);
 -            logger.warn("insufficient space to compact all requested files. 
{}MB required, {} for compaction {} - removing largest SSTable: {}",
++            logger.warn("insufficient space to compact all requested files. 
{}MiB required, {} for compaction {} - removing largest SSTable: {}",
                          (float) expectedSize / 1024 / 1024,
                          StringUtils.join(transaction.originals(), ", "),
-                         transaction.opId());
+                         transaction.opId(),
+                         removedSSTable);
              // Note that we have removed files that are still marked as 
compacting.
              // This suboptimal but ok since the caller will unmark all the 
sstables at the end.
-             SSTableReader removedSSTable = 
cfs.getMaxSizeFile(nonExpiredSSTables);
              transaction.cancel(removedSSTable);
              return true;
          }
diff --cc 
test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java
index 0000000000,54f8ad7a7a..6a65c91437
mode 000000,100644..100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java
@@@ -1,0 -1,114 +1,115 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
++import java.util.concurrent.TimeoutException;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.stream.Collectors;
+ 
+ import com.google.common.collect.Iterables;
+ import com.google.common.collect.Sets;
+ import com.google.common.util.concurrent.Uninterruptibles;
+ import org.junit.Test;
+ 
+ import net.bytebuddy.ByteBuddy;
+ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+ import net.bytebuddy.implementation.MethodDelegation;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Directories;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.lifecycle.SSTableSet;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ 
+ import static net.bytebuddy.matcher.ElementMatchers.named;
+ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+ import static org.junit.Assert.assertEquals;
+ 
+ public class CompactionOverlappingSSTableTest extends TestBaseImpl
+ {
+     @Test
 -    public void partialCompactionOverlappingTest() throws IOException
++    public void partialCompactionOverlappingTest() throws IOException, 
TimeoutException
+     {
+ 
+         try (Cluster cluster = init(builder().withNodes(1)
+                                              .withDataDirCount(1)
+                                              
.withInstanceInitializer(BB::install)
+                                              .start()))
+         {
+             cluster.schemaChange(withKeyspace("alter keyspace %s with 
replication = {'class': 'SimpleStrategy', 'replication_factor':3}"));
+             cluster.schemaChange(withKeyspace("create table %s.tbl (id int 
primary key) with compaction = {'class':'SizeTieredCompactionStrategy', 
'enabled': 'false'} AND gc_grace_seconds=0"));
+             Set<Integer> expected = Sets.newHashSetWithExpectedSize(990);
+             for (int i = 0; i < 1000; i++)
+             {
+                 cluster.coordinator(1).execute(withKeyspace("insert into 
%s.tbl (id) values (?)"), ConsistencyLevel.ONE, i);
+                 if (i >= 10)
+                     expected.add(i);
+             }
+             cluster.get(1).flush(KEYSPACE);
+             for (int i = 0; i < 10; i++)
+             {
+                 cluster.coordinator(1).execute(withKeyspace("delete from 
%s.tbl where id = ?"), ConsistencyLevel.ONE, i);
+                 cluster.get(1).flush(KEYSPACE);
+             }
+             assertEquals(expected, 
Arrays.stream(cluster.coordinator(1).execute(withKeyspace("select * from 
%s.tbl"), ConsistencyLevel.ONE))
+                                          .map(x -> x[0])
+                                          .collect(Collectors.toSet()));
+ 
+             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); // 
make sure tombstones are gc:able
+ 
+             cluster.get(1).runOnInstance(() -> {
+                 BB.enabled.set(true);
+                 ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+                 cfs.forceMajorCompaction();
+                 assertEquals("We should have 2 sstables (not 1) after major 
compaction since we reduced the scope of the compaction",
+                              2, 
Iterables.size(cfs.getSSTables(SSTableSet.CANONICAL)));
+             });
+             assertEquals(expected, 
Arrays.stream(cluster.coordinator(1).execute(withKeyspace("select * from 
%s.tbl"), ConsistencyLevel.ONE))
+                                          .map(x -> x[0])
+                                          .collect(Collectors.toSet()));
+         }
+     }
+ 
+     public static class BB
+     {
+         static AtomicBoolean enabled = new AtomicBoolean();
+         public static void install(ClassLoader cl, Integer i)
+         {
+             new ByteBuddy().rebase(Directories.class)
+                            
.method(named("hasAvailableDiskSpace").and(takesArguments(2)))
+                            .intercept(MethodDelegation.to(BB.class))
+                            .make()
+                            .load(cl, ClassLoadingStrategy.Default.INJECTION);
+         }
+ 
+         public static boolean hasAvailableDiskSpace(long ignore1, long 
ignore2)
+         {
+             if (enabled.get())
+             {
+                 enabled.set(false);
+                 return false;
+             }
+             return true;
+         }
+     }
+ }
diff --cc 
test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java
index 0000000000,9cb3872f80..b922ca8057
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java
@@@ -1,0 -1,207 +1,207 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.util.Iterator;
+ 
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Directories;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.schema.TableMetadataRef;
+ import org.apache.cassandra.utils.CloseableIterator;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static org.hamcrest.MatcherAssert.assertThat;
+ import static org.hamcrest.CoreMatchers.instanceOf;
+ import static org.junit.Assert.assertEquals;
+ 
+ public class PartialCompactionsTest extends SchemaLoader
+ {
+     static final String KEYSPACE = 
PartialCompactionsTest.class.getSimpleName();
+     static final String TABLE = "testtable";
+ 
+     @BeforeClass
+     public static void initSchema()
+     {
+         CompactionManager.instance.disableAutoCompaction();
+ 
+         SchemaLoader.createKeyspace(KEYSPACE,
+                                     KeyspaceParams.simple(1),
+                                     SchemaLoader.standardCFMD(KEYSPACE, 
TABLE));
+ 
+         LimitableDataDirectory.applyTo(KEYSPACE, TABLE);
+     }
+ 
+     @Before
+     public void prepareCFS()
+     {
+         LimitableDataDirectory.setAvailableSpace(cfStore(), null);
+     }
+ 
+     @After
+     public void truncateCF()
+     {
+         cfStore().truncateBlocking();
+         LifecycleTransaction.waitForDeletions();
+     }
+ 
+     private static ColumnFamilyStore cfStore()
+     {
+         return Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
+     }
+ 
+     @Test
+     public void 
shouldNotResurrectDataFromSSTableExcludedDueToInsufficientSpace()
+     {
+         // given
+         ColumnFamilyStore cfs = cfStore();
+         int few = 10, many = 10 * few;
+ 
+         // a large sstable as the oldest
+         createDataSSTable(cfs, 0, many);
+         // more inserts (to have more than one sstable to compact)
+         createDataSSTable(cfs, many, many + few);
+         // delete data that's in both of the prior sstables
+         createTombstonesSSTable(cfs, many - few / 2, many + few / 2);
+ 
+         // emulate there not being enough space to compact all sstables
+         LimitableDataDirectory.setAvailableSpace(cfs, 
enoughSpaceForAllButTheLargestSSTable(cfs));
+ 
+         // when - run a compaction where all tombstones have timed out
+         
FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, 
Integer.MAX_VALUE, false));
+ 
+         // then - the tombstones should not be removed
+         assertEquals("live sstables after compaction", 2, 
cfs.getLiveSSTables().size());
+         assertEquals("remaining live rows after compaction", many, 
liveRows(cfs));
+     }
+ 
+     private static long 
enoughSpaceForAllButTheLargestSSTable(ColumnFamilyStore cfs)
+     {
+         long totalSize = 1, maxSize = 0;
+         for (SSTableReader ssTable : cfs.getLiveSSTables())
+         {
+             long size = ssTable.onDiskLength();
+             if (size > maxSize) maxSize = size;
+             totalSize += size;
+         }
+         return totalSize - maxSize;
+     }
+ 
+     private static int liveRows(ColumnFamilyStore cfs)
+     {
+         return Util.getAll(Util.cmd(cfs, "key1").build()).stream()
+                    .map(partition -> count(partition.rowIterator()))
+                    .reduce(Integer::sum)
+                    .orElse(0);
+     }
+ 
+     private static int count(Iterator<?> iter)
+     {
+         try (CloseableIterator<?> unused = iter instanceof CloseableIterator 
? (CloseableIterator<?>) iter : null)
+         {
+             int count = 0;
+             for (; iter.hasNext(); iter.next())
+             {
+                 count++;
+             }
+             return count;
+         }
+     }
+ 
+     private static void createDataSSTable(ColumnFamilyStore cfs, int 
firstKey, int endKey)
+     {
+         for (int i = firstKey; i < endKey; i++)
+         {
+             new RowUpdateBuilder(cfs.metadata(), 0, "key1")
+             .clustering(String.valueOf(i))
+             .add("val", String.valueOf(i))
+             .build()
+             .applyUnsafe();
+         }
 -        cfs.forceBlockingFlush();
++        cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+     }
+ 
+     private static void createTombstonesSSTable(ColumnFamilyStore cfs, int 
firstKey, int endKey)
+     {
+         for (int i = firstKey; i < endKey; i++)
+         {
+             RowUpdateBuilder.deleteRow(cfs.metadata(), 1, "key1", 
String.valueOf(i)).applyUnsafe();
+         }
 -        cfs.forceBlockingFlush();
++        cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+     }
+ 
+     private static class LimitableDataDirectory extends 
Directories.DataDirectory
+     {
+         private Long availableSpace;
+ 
+         LimitableDataDirectory(Directories.DataDirectory dataDirectory)
+         {
+             super(dataDirectory.location);
+         }
+ 
+         @Override
+         public long getAvailableSpace()
+         {
+             if (availableSpace != null)
+                 return availableSpace;
+             return super.getAvailableSpace();
+         }
+ 
+         public static void setAvailableSpace(ColumnFamilyStore cfs, Long 
availableSpace)
+         {
+             for (Directories.DataDirectory location : 
cfs.getDirectories().getWriteableLocations())
+             {
+                 assertThat("ColumnFamilyStore set up with ability to emulate 
limited disk space",
+                            location, 
instanceOf(LimitableDataDirectory.class));
+                 ((LimitableDataDirectory) location).availableSpace = 
availableSpace;
+             }
+         }
+ 
+         public static void applyTo(String ks, String cf)
+         {
+             Keyspace keyspace = Keyspace.open(ks);
+             ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf);
+             TableMetadataRef metadata = store.metadata;
 -            keyspace.dropCf(metadata.id);
++            keyspace.dropCf(metadata.id, true);
+             ColumnFamilyStore cfs = 
ColumnFamilyStore.createColumnFamilyStore(keyspace, cf, metadata, 
wrapDirectoriesOf(store), false, false, true);
+             keyspace.initCfCustom(cfs);
+         }
+ 
+         private static Directories wrapDirectoriesOf(ColumnFamilyStore cfs)
+         {
+             Directories.DataDirectory[] original = 
cfs.getDirectories().getWriteableLocations();
+             Directories.DataDirectory[] wrapped = new 
Directories.DataDirectory[original.length];
+             for (int i = 0; i < wrapped.length; i++)
+             {
+                 wrapped[i] = new LimitableDataDirectory(original[i]);
+             }
+             return new Directories(cfs.metadata(), wrapped);
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to