This is an automated email from the ASF dual-hosted git repository.

jwest pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 873e024a32 CASSANDRA-17711: Add nodetool forcecompact
873e024a32 is described below

commit 873e024a32d37de08550c8106a8d7fd52bda588b
Author: Cheng Wang <che...@netflix.com>
AuthorDate: Fri Jun 17 15:53:34 2022 -0700

    CASSANDRA-17711: Add nodetool forcecompact
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  29 +++
 .../db/compaction/CompactionIterator.java          |  12 +
 .../cassandra/db/compaction/CompactionManager.java |  29 +++
 .../cassandra/db/partitions/PurgeFunction.java     |  13 +-
 .../apache/cassandra/service/StorageService.java   |  18 ++
 .../cassandra/service/StorageServiceMBean.java     |   7 +
 src/java/org/apache/cassandra/tools/NodeProbe.java |   5 +
 src/java/org/apache/cassandra/tools/NodeTool.java  |   8 +-
 .../cassandra/tools/nodetool/ForceCompact.java     |  58 +++++
 .../tools/nodetool/ForceCompactionTest.java        | 285 +++++++++++++++++++++
 11 files changed, 463 insertions(+), 2 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 74055ab158..3fabee8b0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Add nodetool forcecompact to remove tombstoned or ttl'd data ignoring GC 
grace for given table and partition keys (CASSANDRA-17711)
  * Offer IF (NOT) EXISTS in cqlsh completion for CREATE TYPE, DROP TYPE, 
CREATE ROLE and DROP ROLE (CASSANDRA-16640)
  * Nodetool bootstrap resume will now return an error if the operation fails 
(CASSANDRA-16491)
  * Disable resumable bootstrap by default (CASSANDRA-17679)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index ebe4aeba8a..9fc0f775e1 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -320,6 +321,9 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean, Memtable.Owner
 
     private volatile boolean compactionSpaceCheck = true;
 
+    // Tombstone partitions that ignore the gc_grace_seconds during compaction
+    private final Set<DecoratedKey> partitionKeySetIgnoreGcGrace = 
ConcurrentHashMap.newKeySet();
+
     @VisibleForTesting
     final DiskBoundaryManager diskBoundaryManager = new DiskBoundaryManager();
     private volatile ShardBoundaries cachedShardBoundaries = null;
@@ -2416,6 +2420,31 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean, Memtable.Owner
         CompactionManager.instance.forceCompactionForKey(this, key);
     }
 
+    public void forceCompactionKeysIgnoringGcGrace(String... 
partitionKeysIgnoreGcGrace)
+    {
+        List<DecoratedKey> decoratedKeys = new ArrayList<>();
+        try
+        {
+            partitionKeySetIgnoreGcGrace.clear();
+
+            for (String key : partitionKeysIgnoreGcGrace) {
+                DecoratedKey dk = 
decorateKey(metadata().partitionKeyType.fromString(key));
+                partitionKeySetIgnoreGcGrace.add(dk);
+                decoratedKeys.add(dk);
+            }
+
+            CompactionManager.instance.forceCompactionForKeys(this, 
decoratedKeys);
+        } finally
+        {
+            partitionKeySetIgnoreGcGrace.clear();
+        }
+    }
+
+    public boolean shouldIgnoreGcGraceForKey(DecoratedKey dk)
+    {
+        return partitionKeySetIgnoreGcGrace.contains(dk);
+    }
+
     public static Iterable<ColumnFamilyStore> all()
     {
         List<Iterable<ColumnFamilyStore>> stores = new 
ArrayList<>(Schema.instance.getKeyspaces().size());
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 2f79f92b84..bceb2b6a97 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -349,6 +349,18 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
                 updateBytesRead();
         }
 
+        /*
+         * Called at the beginning of each new partition
+         * Return true if the current partitionKey ignores the 
gc_grace_seconds during compaction.
+         * Note that this method should be called after the onNewPartition 
because it depends on the currentKey
+         * which is set in the onNewPartition
+         */
+        @Override
+        protected boolean shouldIgnoreGcGrace()
+        {
+            return controller.cfs.shouldIgnoreGcGraceForKey(currentKey);
+        }
+
         /*
          * Evaluates whether a tombstone with the given deletion timestamp can 
be purged. This is the minimum
          * timestamp for any sstable containing `currentKey` outside of the 
set of sstables involved in this compaction.
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index dc22b6712a..2e1d94e807 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1047,6 +1047,23 @@ public class CompactionManager implements 
CompactionManagerMBean
         forceCompaction(cfStore, () -> sstablesWithKey(cfStore, key), sstable 
-> sstable.maybePresent(key));
     }
 
+    public void forceCompactionForKeys(ColumnFamilyStore cfStore, 
Collection<DecoratedKey> keys)
+    {
+        com.google.common.base.Predicate<SSTableReader> predicate = sstable -> 
{
+            for (DecoratedKey key : keys)
+            {
+                if(sstable.maybePresent(key))
+                {
+                    return true;
+                }
+            }
+
+            return false;
+        };
+
+        forceCompaction(cfStore, () -> sstablesWithKeys(cfStore, keys), 
predicate);
+    }
+
     private static Collection<SSTableReader> sstablesWithKey(ColumnFamilyStore 
cfs, DecoratedKey key)
     {
         final Set<SSTableReader> sstables = new HashSet<>();
@@ -1060,6 +1077,18 @@ public class CompactionManager implements 
CompactionManagerMBean
         return sstables.isEmpty() ? Collections.emptyList() : sstables;
     }
 
+    private static Collection<SSTableReader> 
sstablesWithKeys(ColumnFamilyStore cfs, Collection<DecoratedKey> decoratedKeys)
+    {
+        final Set<SSTableReader> sstables = new HashSet<>();
+
+        for (DecoratedKey decoratedKey : decoratedKeys)
+        {
+            sstables.addAll(sstablesWithKey(cfs, decoratedKey));
+        }
+
+        return sstables;
+    }
+
     public void forceUserDefinedCompaction(String dataFiles)
     {
         String[] filenames = dataFiles.split(",");
diff --git a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java 
b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
index 09f3ae3bbf..5d97fd36b1 100644
--- a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
+++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
@@ -31,13 +31,15 @@ public abstract class PurgeFunction extends 
Transformation<UnfilteredRowIterator
     private final boolean enforceStrictLiveness;
     private boolean isReverseOrder;
 
+    private boolean ignoreGcGraceSeconds;
+
     public PurgeFunction(int nowInSec, int gcBefore, int 
oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones,
                          boolean enforceStrictLiveness)
     {
         this.nowInSec = nowInSec;
         this.purger = (timestamp, localDeletionTime) ->
                       !(onlyPurgeRepairedTombstones && localDeletionTime >= 
oldestUnrepairedTombstone)
-                      && localDeletionTime < gcBefore
+                      && (localDeletionTime < gcBefore || ignoreGcGraceSeconds)
                       && getPurgeEvaluator().test(timestamp);
         this.enforceStrictLiveness = enforceStrictLiveness;
     }
@@ -59,6 +61,13 @@ public abstract class PurgeFunction extends 
Transformation<UnfilteredRowIterator
     {
     }
 
+    // Called at the beginning of each new partition
+    // Return true if the current partitionKey ignores the gc_grace_seconds 
during compaction.
+    protected boolean shouldIgnoreGcGrace()
+    {
+        return false;
+    }
+
     protected void setReverseOrder(boolean isReverseOrder)
     {
         this.isReverseOrder = isReverseOrder;
@@ -70,6 +79,8 @@ public abstract class PurgeFunction extends 
Transformation<UnfilteredRowIterator
     {
         onNewPartition(partition.partitionKey());
 
+        ignoreGcGraceSeconds = shouldIgnoreGcGrace();
+
         setReverseOrder(partition.isReverseOrder());
         UnfilteredRowIterator purged = Transformation.apply(partition, this);
         if (purged.isEmpty())
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index d616233104..da130c60a0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4072,6 +4072,24 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         }
     }
 
+    /***
+     * Forces compaction for a list of partition keys in a table
+     * The method will ignore the gc_grace_seconds for the 
partitionKeysIgnoreGcGrace during the compaction,
+     * in order to purge the tombstones and free up space quicker.
+     * @param keyspaceName keyspace name
+     * @param tableName table name
+     * @param partitionKeysIgnoreGcGrace partition keys ignoring the 
gc_grace_seconds
+     * @throws IOException on any I/O operation error
+     * @throws ExecutionException when attempting to retrieve the result of a 
task that aborted by throwing an exception
+     * @throws InterruptedException when a thread is waiting, sleeping, or 
otherwise occupied, and the thread is interrupted, either before or during the 
activity
+     */
+    public void forceCompactionKeysIgnoringGcGrace(String keyspaceName,
+                                                   String tableName, String... 
partitionKeysIgnoreGcGrace) throws IOException, ExecutionException, 
InterruptedException
+    {
+        ColumnFamilyStore cfStore = 
getValidKeyspace(keyspaceName).getColumnFamilyStore(tableName);
+        cfStore.forceCompactionKeysIgnoringGcGrace(partitionKeysIgnoreGcGrace);
+    }
+
     /**
      * Takes the snapshot for the given keyspaces. A snapshot name must be 
specified.
      *
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 101c9a3ebf..c92ea72bd6 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -364,6 +364,13 @@ public interface StorageServiceMBean extends 
NotificationEmitter
      */
     public void forceKeyspaceCompactionForPartitionKey(String keyspaceName, 
String partitionKey, String... tableNames) throws IOException, 
ExecutionException, InterruptedException;
 
+    /**
+     * Forces compaction for a list of partition keys on a table.
+     * The method will ignore the gc_grace_seconds for the 
partitionKeysIgnoreGcGrace during the compaction,
+     * in order to purge the tombstones and free up space quicker.
+     */
+    public void forceCompactionKeysIgnoringGcGrace(String keyspaceName, String 
tableName, String... partitionKeysIgnoreGcGrace) throws IOException, 
ExecutionException, InterruptedException;
+
     /**
      * Trigger a cleanup of keys on a single keyspace
      */
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 7abede2ca8..e296d39723 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -460,6 +460,11 @@ public class NodeProbe implements AutoCloseable
         ssProxy.forceKeyspaceCompactionForPartitionKey(keyspaceName, 
partitionKey, tableNames);
     }
 
+    public void forceCompactionKeysIgnoringGcGrace(String keyspaceName, String 
tableName, String... partitionKeysIgnoreGcGrace) throws IOException, 
ExecutionException, InterruptedException
+    {
+        ssProxy.forceCompactionKeysIgnoringGcGrace(keyspaceName, tableName, 
partitionKeysIgnoreGcGrace);
+    }
+
     public void forceKeyspaceFlush(String keyspaceName, String... tableNames) 
throws IOException, ExecutionException, InterruptedException
     {
         ssProxy.forceKeyspaceFlush(keyspaceName, tableNames);
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java 
b/src/java/org/apache/cassandra/tools/NodeTool.java
index 8d87c88906..5dca8eda73 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -225,7 +225,8 @@ public class NodeTool
                 UpgradeSSTable.class,
                 Verify.class,
                 Version.class,
-                ViewBuildStatus.class
+                ViewBuildStatus.class,
+                ForceCompact.class
         );
 
         Cli.CliBuilder<NodeToolCmdRunnable> builder = Cli.builder("nodetool");
@@ -484,6 +485,11 @@ public class NodeTool
         {
             return cmdArgs.size() <= 1 ? EMPTY_STRING_ARRAY : 
toArray(cmdArgs.subList(1, cmdArgs.size()), String.class);
         }
+
+        protected String[] parsePartitionKeys(List<String> cmdArgs)
+        {
+            return cmdArgs.size() <= 2 ? EMPTY_STRING_ARRAY : 
toArray(cmdArgs.subList(2, cmdArgs.size()), String.class);
+        }
     }
 
     public static SortedMap<String, SetHostStatWithPort> 
getOwnershipByDcWithPort(NodeProbe probe, boolean resolveIp,
diff --git a/src/java/org/apache/cassandra/tools/nodetool/ForceCompact.java 
b/src/java/org/apache/cassandra/tools/nodetool/ForceCompact.java
new file mode 100644
index 0000000000..99265e7bf0
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/ForceCompact.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Command(name = "forcecompact", description = "Force a (major) compaction on a 
table")
+public class ForceCompact extends NodeToolCmd
+{
+    @Arguments(usage = "[<keyspace> <table> <keys>]", description = "The 
keyspace, table, and a list of partition keys ignoring the gc_grace_seconds")
+    private List<String> args = new ArrayList<>();
+
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        // Check if the input has valid size
+        checkArgument(args.size() >= 3, "forcecompact requires keyspace, table 
and keys args");
+
+        // We rely on lower-level APIs to check and throw exceptions if the 
input keyspace or table name are invalid
+        String keyspaceName = args.get(0);
+        String tableName = args.get(1);
+        String[] partitionKeysIgnoreGcGrace = parsePartitionKeys(args);
+
+        try
+        {
+            probe.forceCompactionKeysIgnoringGcGrace(keyspaceName, tableName, 
partitionKeysIgnoreGcGrace);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException("Error occurred during compaction 
keys", e);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/test/unit/org/apache/cassandra/tools/nodetool/ForceCompactionTest.java 
b/test/unit/org/apache/cassandra/tools/nodetool/ForceCompactionTest.java
new file mode 100644
index 0000000000..04d369ec6d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tools/nodetool/ForceCompactionTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+public class ForceCompactionTest extends CQLTester
+{
+    private final static int NUM_PARTITIONS = 10;
+    private final static int NUM_ROWS = 100;
+
+    @Before
+    public void setup() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key text, c1 text, c2 text, c3 text, 
PRIMARY KEY (key, c1))");
+
+        for (int partitionCount = 0; partitionCount < NUM_PARTITIONS; 
partitionCount++)
+        {
+            for (int rowCount = 0; rowCount < NUM_ROWS; rowCount++)
+            {
+                execute("INSERT INTO %s (key, c1, c2, c3) VALUES (?, ?, ?, ?)",
+                        "k" + partitionCount, "c1_" + rowCount, "c2_" + 
rowCount, "c3_" + rowCount);
+            }
+        }
+
+        // Disable auto compaction
+        // NOTE: We can only disable the auto compaction once the table is 
created because the setting is on
+        // the table level. And we don't need to re-enable it back because the 
table will be dropped after the test.
+        disableCompaction();
+    }
+
+    @Test
+    public void forceCompactPartitionTombstoneTest() throws Throwable
+    {
+        String keyToPurge = "k0";
+
+        testHelper("DELETE FROM %s WHERE key = ?", keyToPurge);
+    }
+
+    @Test
+    public void forceCompactMultiplePartitionsTombstoneTest() throws Throwable
+    {
+        List<String> keysToPurge = new ArrayList<>();
+        Random rand = new Random();
+
+        int numPartitionsToPurge = 1 + rand.nextInt(NUM_PARTITIONS);
+        for (int count = 0; count < numPartitionsToPurge; count++)
+        {
+            String key = "k" + rand.nextInt(NUM_PARTITIONS);
+
+            execute("DELETE FROM %s WHERE key = ?", key);
+            keysToPurge.add(key);
+        }
+
+        flush();
+
+        String[] keys = new String[keysToPurge.size()];
+        keys = keysToPurge.toArray(keys);
+        forceCompact(keys);
+
+        verifyNotContainsTombstones();
+    }
+
+    @Test
+    public void forceCompactRowTombstoneTest() throws Throwable
+    {
+        String keyToPurge = "k0";
+
+        testHelper("DELETE FROM %s WHERE key = ? AND c1 = 'c1_0'", keyToPurge);
+    }
+
+    @Test
+    public void forceCompactMultipleRowsTombstoneTest() throws Throwable
+    {
+        List<String> keysToPurge = new ArrayList<>();
+
+        Random randPartition = new Random();
+        Random randRow = new Random();
+
+        for (int count = 0; count < 10; count++)
+        {
+            String partitionKey = "k" + randPartition.nextInt(NUM_PARTITIONS);
+            String clusteringKey = "c1_" + randRow.nextInt(NUM_ROWS);
+
+            execute("DELETE FROM %s WHERE key = ? AND c1 = ?", partitionKey, 
clusteringKey);
+            keysToPurge.add(partitionKey);
+        }
+
+        flush();
+
+        String[] keys = new String[keysToPurge.size()];
+        keys = keysToPurge.toArray(keys);
+        forceCompact(keys);
+
+        verifyNotContainsTombstones();
+    }
+
+    @Test
+    public void forceCompactCellTombstoneTest() throws Throwable
+    {
+        String keyToPurge = "k0";
+        testHelper("DELETE c2 FROM %s WHERE key = ? AND c1 = 'c1_0'", 
keyToPurge);
+    }
+
+    @Test
+    public void forceCompactMultipleCellsTombstoneTest() throws Throwable
+    {
+        List<String> keysToPurge = new ArrayList<>();
+
+        Random randPartition = new Random();
+        Random randRow = new Random();
+
+        for (int count = 0; count < 10; count++)
+        {
+            String partitionKey = "k" + randPartition.nextInt(NUM_PARTITIONS);
+            String clusteringKey = "c1_" + randRow.nextInt(NUM_ROWS);
+
+            execute("DELETE c2, c3 FROM %s WHERE key = ? AND c1 = ?", 
partitionKey, clusteringKey);
+            keysToPurge.add(partitionKey);
+        }
+
+        flush();
+
+        String[] keys = new String[keysToPurge.size()];
+        keys = keysToPurge.toArray(keys);
+        forceCompact(keys);
+
+        verifyNotContainsTombstones();
+    }
+
+    @Test
+    public void forceCompactUpdateCellTombstoneTest() throws Throwable
+    {
+        String keyToPurge = "k0";
+        testHelper("UPDATE %s SET c2 = null WHERE key = ? AND c1 = 'c1_0'", 
keyToPurge);
+    }
+
+    @Test
+    public void forceCompactTTLExpiryTest() throws Throwable
+    {
+        int ttlSec = 2;
+        String keyToPurge = "k0";
+
+        execute("UPDATE %s USING TTL ? SET c2 = 'bbb' WHERE key = ? AND c1 = 
'c1_0'", ttlSec, keyToPurge);
+
+        flush();
+
+        // Wait until the TTL has expired
+        // NOTE: we double the wait time of the ttl to be on the safer side 
and avoid the flakiness of the test
+        Thread.sleep(ttlSec * 1000 * 2);
+
+        String[] keysToPurge = new String[]{keyToPurge};
+        forceCompact(keysToPurge);
+
+        verifyNotContainsTombstones();
+    }
+
+    @Test
+    public void forceCompactCompositePartitionKeysTest() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key1 text, key2 text, c1 text, c2 text, 
PRIMARY KEY ((key1, key2), c1))");
+
+        for (int partitionCount = 0; partitionCount < NUM_PARTITIONS; 
partitionCount++)
+        {
+            for (int rowCount = 0; rowCount < NUM_ROWS; rowCount++)
+            {
+                execute("INSERT INTO %s (key1, key2, c1, c2) VALUES (?, ?, ?, 
?)",
+                        "k1_" + partitionCount, "k2_" + partitionCount, "c1_" 
+ rowCount, "c2_" + rowCount);
+            }
+        }
+
+        // Disable auto compaction
+        // NOTE: We can only disable the auto compaction once the table is 
created because the setting is on
+        // the table level. And we don't need to re-enable it back because the 
table will be dropped after the test.
+        disableCompaction();
+
+        String keyToPurge = "k1_0:k2_0";
+
+        execute("DELETE FROM %s WHERE key1 = 'k1_0' and key2 = 'k2_0'");
+
+        flush();
+
+        String[] keysToPurge = new String[]{keyToPurge};
+        forceCompact(keysToPurge);
+
+        verifyNotContainsTombstones();
+    }
+
+    private void testHelper(String cqlStatement, String keyToPurge) throws 
Throwable
+    {
+        execute(cqlStatement, keyToPurge);
+
+        flush();
+
+        String[] keysToPurge = new String[]{keyToPurge};
+        forceCompact(keysToPurge);
+
+        verifyNotContainsTombstones();
+    }
+
+    private void forceCompact(String[] partitionKeysIgnoreGcGrace)
+    {
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        if (cfs != null)
+        {
+            cfs.forceCompactionKeysIgnoringGcGrace(partitionKeysIgnoreGcGrace);
+        }
+    }
+
+    private void verifyNotContainsTombstones()
+    {
+        // Get sstables
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(currentTable());
+        Collection<SSTableReader> sstables = cfs.getLiveSSTables();
+
+        // always run a major compaction before calling this
+        assertTrue(sstables.size() == 1);
+
+        SSTableReader sstable = sstables.iterator().next();
+        int actualPurgedTombstoneCount = 0;
+        try (ISSTableScanner scanner = sstable.getScanner())
+        {
+            while (scanner.hasNext())
+            {
+                try (UnfilteredRowIterator iter = scanner.next())
+                {
+                    // Partition should be all alive
+                    assertTrue(iter.partitionLevelDeletion().isLive());
+
+                    while (iter.hasNext())
+                    {
+                        Unfiltered atom = iter.next();
+                        if (atom.isRow())
+                        {
+                            Row r = (Row)atom;
+
+                            // Row should be all alive
+                            assertTrue(r.deletion().isLive());
+
+                            // Cell should be alive as well
+                            for (Cell c : r.cells())
+                            {
+                                assertFalse(c.isTombstone());
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to