This is an automated email from the ASF dual-hosted git repository.

bereng pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 6cac24f581e9f6b719f3c0f9bc5c9df2e03c682a
Merge: 5c44922a5a 3ba6de70c7
Author: Bereng <berenguerbl...@gmail.com>
AuthorDate: Mon Nov 20 07:35:51 2023 +0100

    Merge branch 'cassandra-4.0' into cassandra-4.1
    
    * cassandra-4.0:
      Test failure: 
org.apache.cassandra.db.commitlog.CommitLogSegmentManagerCDCTest

 .../cassandra/db/commitlog/CommitLogSegment.java   |  2 +-
 .../commitlog/CommitLogSegmentManagerCDCTest.java  | 54 ++++++++++++----------
 2 files changed, 30 insertions(+), 26 deletions(-)

diff --cc 
test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
index 3789b51714,21962c8b7b..9ad386ecbe
--- 
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
+++ 
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
@@@ -78,17 -94,13 +77,16 @@@ public class CommitLogSegmentManagerCDC
              execute("INSERT INTO %s (idx, data) VALUES (1, '1');");
  
              // Confirm that, on flush+recyle, we see files show up in cdc_raw
 -            
Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()).forceBlockingFlush();
 +            CommitLogSegmentManagerCDC cdcMgr = 
(CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
 +            Keyspace.open(keyspace())
 +                    .getColumnFamilyStore(currentTable())
 +                    
.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
              CommitLog.instance.forceRecycleAllSegments();
              cdcMgr.awaitManagementTasksCompletion();
-             Assert.assertTrue("Expected files to be moved to overflow.", 
getCDCRawCount() > 0);
+             Assert.assertTrue("Expected files to be moved to overflow.", 
getCDCRawFiles().length > 0);
  
              // Simulate a CDC consumer reading files then deleting them
-             for (File f : new 
File(DatabaseDescriptor.getCDCLogLocation()).tryList())
-                 FileUtils.deleteWithConfirm(f);
+             deleteCDCRawFiles();
  
              // Update size tracker to reflect deleted files. Should flip flag 
on current allocatingFrom to allow.
              cdcMgr.updateCDCTotalSize();
@@@ -336,9 -375,9 +337,9 @@@
          List<CDCIndexData> results = new ArrayList<>();
          try
          {
-             for (File f : new 
File(DatabaseDescriptor.getCDCLogLocation()).tryList())
+             for (File f : getCDCRawFiles())
              {
 -                if (f.getName().contains("_cdc.idx"))
 +                if (f.name().contains("_cdc.idx"))
                      results.add(new CDCIndexData(f));
              }
          }
@@@ -356,16 -395,13 +357,12 @@@
  
          CDCIndexData(File f) throws IOException
          {
-             String line = "";
+             String line;
 -            try (BufferedReader br = new BufferedReader(new 
InputStreamReader(new FileInputStream(f))))
 +            try (BufferedReader br = new BufferedReader(new FileReader(f)))
              {
                  line = br.readLine();
              }
-             catch (Exception e)
-             {
-                 throw e;
-             }
 -
 -            fileName = f.getName();
 +            fileName = f.name();
              offset = Integer.parseInt(line);
          }
  
@@@ -407,107 -440,16 +401,117 @@@
          }
      }
  
 +    private void testWithNonblockingMode(Testable test) throws Throwable
 +    {
 +        boolean original = DatabaseDescriptor.getCDCBlockWrites();
 +        CommitLog.instance.setCDCBlockWrites(false);
 +        try
 +        {
 +            test.run();
 +        }
 +        finally
 +        {
 +            CommitLog.instance.setCDCBlockWrites(original);
 +        }
 +    }
 +
 +    private void testWithCDCSpaceInMb(int size, Testable test) throws 
Throwable
 +    {
 +        int origSize = (int) DatabaseDescriptor.getCDCTotalSpace() / 1024 / 
1024;
 +        DatabaseDescriptor.setCDCTotalSpaceInMiB(size);
 +        try
 +        {
 +            test.run();
 +        }
 +        finally
 +        {
 +            DatabaseDescriptor.setCDCTotalSpaceInMiB(origSize);
 +        }
 +    }
 +
 +    private String createTableAndBulkWrite() throws Throwable
 +    {
 +        return 
createTableAndBulkWrite(DatabaseDescriptor.getCommitLogSegmentSize() / 3);
 +    }
 +
 +    private String createTableAndBulkWrite(int mutationSize) throws Throwable
 +    {
 +        String tableName = createTable("CREATE TABLE %s (idx int, data text, 
primary key(idx)) WITH cdc=true;");
 +        bulkWrite(tableName, mutationSize);
 +        return tableName;
 +    }
 +
 +    private void bulkWrite(String tableName) throws Throwable
 +    {
 +        bulkWrite(tableName, DatabaseDescriptor.getCommitLogSegmentSize() / 
3);
 +    }
 +
 +    private void bulkWrite(String tableName, int mutationSize) throws 
Throwable
 +    {
 +        TableMetadata ccfm = 
Keyspace.open(keyspace()).getColumnFamilyStore(tableName).metadata();
 +        boolean blockWrites = DatabaseDescriptor.getCDCBlockWrites();
 +        // Spin to make sure we hit CDC capacity
 +        try
 +        {
 +            for (int i = 0; i < 1000; i++)
 +            {
 +                new RowUpdateBuilder(ccfm, 0, i)
 +                .add("data", randomizeBuffer(mutationSize))
 +                .build().applyFuture().get();
 +            }
 +            if (blockWrites)
 +                Assert.fail("Expected CDCWriteException from full CDC but did 
not receive it.");
 +        }
 +        catch (CDCWriteException e)
 +        {
 +            if (!blockWrites)
 +                Assert.fail("Excepted no CDCWriteException when not blocking 
writes but received it.");
 +        }
 +    }
 +
 +    private void testSegmentFlaggingOnCreation0() throws Throwable
 +    {
 +        testWithCDCSpaceInMb(16, () -> {
 +            boolean blockWrites = DatabaseDescriptor.getCDCBlockWrites();
 +
 +            createTableAndBulkWrite();
 +
 +            CommitLogSegmentManagerCDC cdcMgr = 
(CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
 +            expectCurrentCDCState(blockWrites? CDCState.FORBIDDEN : 
CDCState.CONTAINS);
 +
 +            // When block writes, releasing CDC commit logs should update the 
CDC state to PERMITTED
 +            if (blockWrites)
 +            {
 +                CommitLog.instance.forceRecycleAllSegments();
 +
 +                cdcMgr.awaitManagementTasksCompletion();
 +                // Delete all files in cdc_raw
-                 for (File f : new 
File(DatabaseDescriptor.getCDCLogLocation()).tryList())
-                     f.delete();
++                deleteCDCRawFiles();
 +                cdcMgr.updateCDCTotalSize();
 +                // Confirm cdc update process changes flag on active segment
 +                expectCurrentCDCState(CDCState.PERMITTED);
 +            }
 +
 +            // Clear out archived CDC files
-             for (File f : new 
File(DatabaseDescriptor.getCDCLogLocation()).tryList()) {
-                 FileUtils.deleteWithConfirm(f);
-             }
++            deleteCDCRawFiles();
 +        });
 +    }
 +
+     private static File[] getCDCRawFiles()
+     {
 -        return new File(DatabaseDescriptor.getCDCLogLocation()).listFiles();
++        return new File(DatabaseDescriptor.getCDCLogLocation()).tryList();
+     }
+ 
+     private static void deleteCDCRawFiles()
+     {
+         for (File f : getCDCRawFiles())
+         {
 -            f.delete();
++            f.deleteIfExists();
+         }
+     }
++
 +    private interface Testable
 +    {
 +        void run() throws Throwable;
 +    }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to