This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 0767c834162d74bdc857f47615b9e8a5c7e76d5b
Merge: baa9d0327f 730b898b74
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Fri Dec 23 14:41:50 2022 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |  1 +
 .../compaction/TimeWindowCompactionStrategy.java   | 14 ++++++++++++
 test/unit/org/apache/cassandra/MockSchema.java     | 19 ++++++++++++----
 .../TimeWindowCompactionStrategyTest.java          | 25 +++++++++++++++++++++-
 4 files changed, 54 insertions(+), 5 deletions(-)

diff --cc CHANGES.txt
index ad25802839,3900ab5b58..ee21b34ebc
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
 -3.0.29
 +3.11.15
 + * Fix Splitter sometimes creating more splits than requested (CASSANDRA-18013)
 +
 +Merged from 3.0:
+  * Avoid anticompaction mixing data from two different time windows with TWCS (CASSANDRA-17970)
   * Do not spam the logs with MigrationCoordinator not being able to pull schemas (CASSANDRA-18096)
   * Fix incorrect resource name in LIST PERMISSION output (CASSANDRA-17848)
   * Suppress CVE-2022-41854 and similar (CASSANDRA-18083)
diff --cc src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index 74e5f9d95a,5ae1cc784c..bbc9cdf8ee
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@@ -376,9 -333,23 +376,23 @@@ public class TimeWindowCompactionStrate
          LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
          if (txn == null)
              return null;
 -        return Collections.singleton(new CompactionTask(cfs, txn, gcBefore));
 +        return Collections.singleton(new TimeWindowCompactionTask(cfs, txn, gcBefore, options.ignoreOverlaps));
      }
  
+     /**
+      * TWCS should not group sstables for anticompaction - this can mix new and old data
+      */
+     @Override
+     public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
+     {
+         Collection<Collection<SSTableReader>> groups = new ArrayList<>(sstablesToGroup.size());
+         for (SSTableReader sstable : sstablesToGroup)
+         {
+             groups.add(Collections.singleton(sstable));
+         }
+         return groups;
+     }
+ 
      @Override
      @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
      public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
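
For context: with the override added above, groupSSTablesForAntiCompaction hands anticompaction one singleton group per sstable, so rows from two different TWCS time windows are never rewritten into the same sstable. A minimal sketch of that contract, assuming an already-constructed TimeWindowCompactionStrategy (twcs) and its tracked sstables (both names are placeholders, mirroring the test added further down in this commit):

    // Each sstable comes back in its own group; anticompaction therefore
    // never mixes data from two different time windows.
    Collection<Collection<SSTableReader>> groups = twcs.groupSSTablesForAntiCompaction(sstables);
    for (Collection<SSTableReader> group : groups)
        assert group.size() == 1;
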
diff --cc test/unit/org/apache/cassandra/MockSchema.java
index 2b480d8e3c,5f3198dad0..90c8e4c705
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@@ -85,27 -86,18 +85,37 @@@ public class MockSchem
          return sstable(generation, size, false, cfs);
      }
  
 +    public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs)
 +    {
 +        return sstable(generation, size, keepRef, generation, generation, cfs);
 +    }
 +
 +    public static SSTableReader sstableWithLevel(int generation, long firstToken, long lastToken, int level, ColumnFamilyStore cfs)
 +    {
 +        return sstable(generation, 0, false, firstToken, lastToken, level, cfs);
 +    }
 +
 +    public static SSTableReader sstableWithLevel(int generation, int size, int level, ColumnFamilyStore cfs)
 +    {
 +        return sstable(generation, size, false, generation, generation, level, cfs);
 +    }
 +
+     public static SSTableReader sstableWithTimestamp(int generation, long timestamp, ColumnFamilyStore cfs)
+     {
 -        return sstable(generation, 0, false, timestamp, cfs);
++        return sstable(generation, 0, false, 0, 1000, 0, Integer.MAX_VALUE, timestamp, cfs);
+     }
+ 
 -    public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs)
 +    public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, ColumnFamilyStore cfs)
      {
 -        return sstable(generation, size, keepRef, System.currentTimeMillis() * 1000, cfs);
 +        return sstable(generation, size, keepRef, firstToken, lastToken, 0, cfs);
      }
  
 +    public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs)
++    {
++        return sstable(generation, size, keepRef, firstToken, lastToken, level, Integer.MAX_VALUE, System.currentTimeMillis() * 1000, cfs);
++    }
+ 
 -    public static SSTableReader sstable(int generation, int size, boolean keepRef, long timestamp, ColumnFamilyStore cfs)
++    public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, int minLocalDeletionTime, long timestamp, ColumnFamilyStore cfs)
      {
          Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(),
                                                 cfs.keyspace.getName(),
@@@ -123,40 -115,37 +133,41 @@@
              {
              }
          }
 -        SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(tempFile), RandomAccessReader.DEFAULT_BUFFER_SIZE, size);
 -        if (size > 0)
 +        // .complete() with size to make sstable.onDiskLength work
 +        try (FileHandle.Builder builder = new FileHandle.Builder(new ChannelProxy(tempFile)).bufferSize(size);
 +             FileHandle fileHandle = builder.complete(size))
          {
 -            try
 +            if (size > 0)
              {
 -                File file = new File(descriptor.filenameFor(Component.DATA));
 -                try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
 +                try
                  {
 -                    raf.setLength(size);
 +                    File file = new File(descriptor.filenameFor(Component.DATA));
 +                    try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
 +                    {
 +                        raf.setLength(size);
 +                    }
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new RuntimeException(e);
                  }
              }
 -            catch (IOException e)
 -            {
 -                throw new RuntimeException(e);
 -            }
 +            SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList());
-             StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
-                     .sstableLevel(level)
-                     .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, UNREPAIRED_SSTABLE, header)
-                     .get(MetadataType.STATS);
++            MetadataCollector collector = new MetadataCollector(cfs.metadata.comparator);
++            collector.update(new DeletionTime(timestamp, minLocalDeletionTime));
++            StatsMetadata metadata = (StatsMetadata) collector.sstableLevel(level)
++                                                              .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, UNREPAIRED_SSTABLE, header)
++                                                              .get(MetadataType.STATS);
 +            SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
 +                    fileHandle.sharedCopy(), fileHandle.sharedCopy(), indexSummary.sharedCopy(),
 +                    new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header);
 +            reader.first = readerBounds(firstToken);
 +            reader.last = readerBounds(lastToken);
 +            if (!keepRef)
 +                reader.selfRef().release();
 +            return reader;
          }
 -        SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList());
 -        MetadataCollector collector = new MetadataCollector(cfs.metadata.comparator);
 -        collector.update(new DeletionTime(timestamp, (int) (System.currentTimeMillis() / 1000)));
 -        StatsMetadata metadata = (StatsMetadata) collector.finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(),
 -                                                                            0.01f,
 -                                                                            -1,
 -                                                                            header).get(MetadataType.STATS);
 -
 -        SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
 -                                                          segmentedFile.sharedCopy(), segmentedFile.sharedCopy(), indexSummary.sharedCopy(),
 -                                                          new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header);
 -        reader.first = reader.last = readerBounds(generation);
 -        if (!keepRef)
 -            reader.selfRef().release();
 -        return reader;
 +
      }
  
      public static ColumnFamilyStore newCFS()
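
Taken together, the MockSchema additions above let a test construct mock sstables with explicit token ranges, levels and write timestamps. A small sketch of the new helpers in use (the generation numbers, token values and timestamp below are arbitrary illustrations, not part of the commit):

    // Mock sstables built with the helpers added in this hunk.
    ColumnFamilyStore cfs = MockSchema.newCFS();
    SSTableReader byTime  = MockSchema.sstableWithTimestamp(1, 1000L, cfs);   // fixed write timestamp
    SSTableReader byLevel = MockSchema.sstableWithLevel(2, 0L, 100L, 3, cfs); // tokens 0..100, level 3
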
diff --cc test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
index ee7952bde4,9bed7c1c94..15d2a2e705
--- a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
@@@ -25,8 -26,10 +26,9 @@@ import java.util.Map
  import java.util.concurrent.TimeUnit;
  
  import com.google.common.collect.HashMultimap;
+ import com.google.common.collect.ImmutableMap;
  import com.google.common.collect.Iterables;
  
 -
  import org.junit.BeforeClass;
  import org.junit.Test;
  import static org.junit.Assert.assertEquals;
@@@ -46,7 -48,8 +47,8 @@@ import org.apache.cassandra.db.RowUpdat
  import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.schema.KeyspaceParams;
 -import org.apache.cassandra.utils.Pair;
+ import org.apache.cassandra.MockSchema;
 +import org.apache.cassandra.utils.Pair;
  
  import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.getWindowBoundsInMillis;
  import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.newestBucket;
@@@ -307,67 -282,24 +309,88 @@@ public class TimeWindowCompactionStrate
          t.transaction.abort();
      }
  
 +    @Test
 +    public void testDropOverlappingExpiredSSTables() throws InterruptedException
 +    {
 +        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
 +        cfs.truncateBlocking();
 +        cfs.disableAutoCompaction();
 +
 +        long timestamp = System.currentTimeMillis();
 +        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
 +
 +        // Create an expiring sstable with a TTL
 +        DecoratedKey key = Util.dk("expired");
 +        new RowUpdateBuilder(cfs.metadata, timestamp, TTL_SECONDS, key.getKey())
 +            .clustering("column")
 +            .add("val", value).build().applyUnsafe();
 +
 +        cfs.forceBlockingFlush();
 +        SSTableReader expiredSSTable = cfs.getLiveSSTables().iterator().next();
 +        Thread.sleep(10);
 +
 +        // Create a second sstable without TTL and with a row superseded by the expiring row
 +        new RowUpdateBuilder(cfs.metadata, timestamp - 1000, key.getKey())
 +            .clustering("column")
 +            .add("val", value).build().applyUnsafe();
 +        key = Util.dk("nonexpired");
 +        new RowUpdateBuilder(cfs.metadata, timestamp, key.getKey())
 +            .clustering("column")
 +            .add("val", value).build().applyUnsafe();
 +
 +        cfs.forceBlockingFlush();
 +        assertEquals(cfs.getLiveSSTables().size(), 2);
 +
 +        Map<String, String> options = new HashMap<>();
 +        options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30");
 +        options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "SECONDS");
 +        options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS");
 +        options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0");
 +        TimeWindowCompactionStrategy twcs = new TimeWindowCompactionStrategy(cfs, options);
 +        for (SSTableReader sstable : cfs.getLiveSSTables())
 +            twcs.addSSTable(sstable);
 +
 +        twcs.startup();
 +        assertNull(twcs.getNextBackgroundTask(nowInSeconds()));
 +
 +        // Wait for the expiration of the first sstable
 +        Thread.sleep(TimeUnit.SECONDS.toMillis(TTL_SECONDS + 1));
 +        assertNull(twcs.getNextBackgroundTask(nowInSeconds()));
 +
 +        options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY, "true");
 +        twcs = new TimeWindowCompactionStrategy(cfs, options);
 +        for (SSTableReader sstable : cfs.getLiveSSTables())
 +            twcs.addSSTable(sstable);
 +
 +        twcs.startup();
 +        AbstractCompactionTask t = twcs.getNextBackgroundTask(nowInSeconds());
 +        assertNotNull(t);
 +        assertEquals(1, Iterables.size(t.transaction.originals()));
 +        SSTableReader sstable = t.transaction.originals().iterator().next();
 +        assertEquals(sstable, expiredSSTable);
 +        twcs.shutdown();
 +        t.transaction.abort();
 +    }
++
+     @Test
+     public void testGroupForAntiCompaction()
+     {
+         ColumnFamilyStore cfs = MockSchema.newCFS("test_group_for_anticompaction");
+         cfs.setCompactionParameters(ImmutableMap.of("class", "TimeWindowCompactionStrategy",
+                                                     "timestamp_resolution", "MILLISECONDS",
+                                                     "compaction_window_size", "1",
+                                                     "compaction_window_unit", "MINUTES"));
+ 
+         List<SSTableReader> sstables = new ArrayList<>(10);
+         long curr = System.currentTimeMillis();
+         for (int i = 0; i < 10; i++)
+             sstables.add(MockSchema.sstableWithTimestamp(i, curr + TimeUnit.MILLISECONDS.convert(i, TimeUnit.MINUTES), cfs));
+ 
+         cfs.addSSTables(sstables);
 -        Collection<Collection<SSTableReader>> groups = cfs.getCompactionStrategyManager().getStrategies().get(1).groupSSTablesForAntiCompaction(sstables);
++        Collection<Collection<SSTableReader>> groups = cfs.getCompactionStrategyManager().getCompactionStrategyFor(sstables.get(0)).groupSSTablesForAntiCompaction(sstables);
+         assertTrue(groups.size() > 0);
+         for (Collection<SSTableReader> group : groups)
+             assertEquals(1, group.size());
+     }
  }

