This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f213ac49 CASSSIDECAR-325: CdcRawDirectorySpaceCleaner always uses fallback aft… (#237)
f213ac49 is described below

commit f213ac49ef5c5c889c2f10936d3db111100f8cde
Author: Francisco Guerrero <[email protected]>
AuthorDate: Wed Jul 23 13:59:17 2025 -0700

    CASSSIDECAR-325: CdcRawDirectorySpaceCleaner always uses fallback aft… (#237)
    
    Patch by Francisco Guerrero; reviewed by Yifan Cai, Bernardo Botella for CASSSIDECAR-325
---
 CHANGES.txt                                        |   1 +
 .../sidecar/db/schema/AbstractSchema.java          |   2 +-
 .../db/schema/CassandraSystemTableSchema.java      |   1 +
 .../sidecar/db/SystemViewsDatabaseAccessor.java    |  22 ++-
 .../cassandra/sidecar/modules/CdcModule.java       |   2 +-
 .../sidecar/modules/SidecarSchemaModule.java       |   6 +
 .../sidecar/tasks/CdcRawDirectorySpaceCleaner.java |  34 +++--
 .../apache/cassandra/sidecar/utils/FileUtils.java  |   2 +-
 .../db/SystemViewsDatabaseAccessorIntTest.java     |   2 +-
 .../tasks/CdcRawDirectorySpaceCleanerTest.java     | 165 +++++++++++++++++----
 10 files changed, 187 insertions(+), 50 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index cc436d6e..8c35c05f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.2.0
 -----
+ * CdcRawDirectorySpaceCleaner always uses fallback after the first time it is queried from system_views.settings (CASSSIDECAR-325)
  * Allow restore jobs to restore to the local datacenter only (CASSSIDECAR-269)
  * Avoid ending response in authentication handler, doesn't allow chaining of auth handlers (CASSSIDECAR-270)
  * Enhanced Sidecarclient to list Cassandra instance files and to download them for Live Migration (CASSSIDECAR-224)
diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java b/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java
index 3c8c6cf4..0622c191 100644
--- a/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java
+++ b/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java
@@ -88,7 +88,7 @@ public abstract class AbstractSchema
         }
 
         prepareStatements(session);
-        logger.debug("{} is initialized!", this.getClass().getSimpleName());
+        logger.info("{} table schema is initialized", 
this.getClass().getSimpleName());
         return true;
     }
 
diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/CassandraSystemTableSchema.java b/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/CassandraSystemTableSchema.java
index af9632f5..ffd2e728 100644
--- a/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/CassandraSystemTableSchema.java
+++ b/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/CassandraSystemTableSchema.java
@@ -40,6 +40,7 @@ public abstract class CassandraSystemTableSchema extends 
TableSchema
                                          @NotNull Predicate<AbstractSchema> 
shouldCreateSchema)
     {
         prepareStatements(session);
+        logger.info("{} Cassandra system table schema is initialized", 
this.getClass().getSimpleName());
         return true;
     }
 
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java b/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java
index 60280c26..db79ae24 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java
@@ -22,6 +22,9 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.ResultSet;
 import com.google.inject.Inject;
@@ -34,11 +37,13 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Database Accessor that queries cassandra to get information maintained 
under system_auth keyspace.
+ * Database Accessor that queries cassandra to get information maintained 
under system_views keyspace.
  */
 @Singleton
 public class SystemViewsDatabaseAccessor extends 
DatabaseAccessor<SystemViewsSchema>
 {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SystemViewsDatabaseAccessor.class);
+
     private static final String YAML_PROP_IN_MB = "cdc_total_space_in_mb";
     private static final String YAML_PROP_WITH_UNIT = "cdc_total_space"; // 
expects value with units e.g. "5MiB"
 
@@ -49,11 +54,16 @@ public class SystemViewsDatabaseAccessor extends 
DatabaseAccessor<SystemViewsSch
         super(systemViewsSchema, sessionProvider);
     }
 
+    /**
+     * @return the value of the cdc_total_space setting in bytes
+     * @throws SchemaUnavailableException when the schema is not initialized
+     */
     @Nullable
-    public Long getCdcTotalSpaceSetting() throws SchemaUnavailableException
+    public Long cdcTotalSpaceBytesSetting() throws SchemaUnavailableException
     {
         // attempt to parse Cassandra v4.0 'cdc_total_space_in_mb' yaml prop
-        Map<String, String> settings = getSettings(YAML_PROP_IN_MB, 
YAML_PROP_WITH_UNIT);
+        String[] cdcTotalSpaceSettingNames = { YAML_PROP_IN_MB, 
YAML_PROP_WITH_UNIT };
+        Map<String, String> settings = getSettings(cdcTotalSpaceSettingNames);
         String cdcTotalSpaceInMb = settings.get(YAML_PROP_IN_MB);
         if (cdcTotalSpaceInMb != null)
         {
@@ -67,11 +77,15 @@ public class SystemViewsDatabaseAccessor extends 
DatabaseAccessor<SystemViewsSch
             return FileUtils.storageStringToBytes(storageStringToBytes);
         }
 
+        // This is not expected to ever be logged, but adding the log entry 
for completeness
+        // in case debugging is needed for this unexpected case.
+        LOGGER.warn("Unable to determine the CDC total space value from 
setting names {}",
+                    (Object) cdcTotalSpaceSettingNames);
         return null;
     }
 
     /**
-     * Load a setting values from the `system_views.settings` table.
+     * Load a setting values from the `system_views.settings` virtual table.
      *
      * @param names names of settings
      * @return map of setting values keyed on `name` loaded from the 
`system_views.settings` table.
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
index 322b80bf..77a8e5b7 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
@@ -81,7 +81,7 @@ public class CdcModule extends AbstractModule
 
     @ProvidesIntoMap
     @KeyClassMapKey(TableSchemaMapKeys.SystemViewsSchemaKey.class)
-    TableSchema systemViewssSchema(SystemViewsSchema schema)
+    TableSchema systemViewsSchema(SystemViewsSchema schema)
     {
         return schema;
     }
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarSchemaModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarSchemaModule.java
index 51aa2eb0..498bad72 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarSchemaModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarSchemaModule.java
@@ -18,6 +18,9 @@
 
 package org.apache.cassandra.sidecar.modules;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
@@ -41,6 +44,8 @@ import org.apache.cassandra.sidecar.tasks.PeriodicTask;
  */
 public class SidecarSchemaModule extends AbstractModule
 {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SidecarSchemaModule.class);
+
     @Provides
     @Singleton
     public SidecarSchema sidecarSchema(Vertx vertx,
@@ -50,6 +55,7 @@ public class SidecarSchemaModule extends AbstractModule
         SidecarInternalKeyspace sidecarInternalKeyspace = new 
SidecarInternalKeyspace(configuration);
         // register table schema when enabled
         resolver.resolve().values().forEach(tableSchema -> {
+            LOGGER.info("Registering table schema: {}", tableSchema);
             try
             {
                 sidecarInternalKeyspace.registerTableSchema(tableSchema);
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java
index ec309aac..28234689 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java
@@ -33,6 +33,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+
 import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,7 +131,7 @@ public class CdcRawDirectorySpaceCleaner implements 
PeriodicTask
                
TimeUnit.MILLISECONDS.toNanos(cdcConfiguration.cacheMaxUsage().toMillis());
     }
 
-    protected long maxUsage()
+    protected long maxUsageBytes()
     {
         if (!shouldRefreshCachedMaxUsage())
         {
@@ -140,18 +141,17 @@ public class CdcRawDirectorySpaceCleaner implements 
PeriodicTask
 
         try
         {
-            Long newValue = 
systemViewsDatabaseAccessor.getCdcTotalSpaceSetting();
+            Long newValue = 
systemViewsDatabaseAccessor.cdcTotalSpaceBytesSetting();
             if (newValue != null)
             {
                 if (!newValue.equals(maxUsageBytes))
                 {
-                    LOGGER.info(
-                    "Change in cdc_total_space from system_views.settings 
prev={} latest={}",
-                    maxUsageBytes, newValue);
+                    LOGGER.info("Change in cdc_total_space from 
system_views.settings prev={} latest={}",
+                                maxUsageBytes, newValue);
                     this.maxUsageBytes = newValue;
-                    this.maxUsageLastReadNanos = timeProvider.nanoTime();
-                    return newValue;
                 }
+                this.maxUsageLastReadNanos = timeProvider.nanoTime();
+                return this.maxUsageBytes;
             }
         }
         catch (SchemaUnavailableException e)
@@ -160,7 +160,7 @@ public class CdcRawDirectorySpaceCleaner implements 
PeriodicTask
         }
         catch (Throwable t)
         {
-            LOGGER.warn("Error reading cdc_total_space from 
system_views.settings", t);
+            LOGGER.error("Error reading cdc_total_space from 
system_views.settings", t);
         }
 
         LOGGER.warn("Could not read cdc_total_space from 
system_views.settings, falling back to props");
@@ -226,19 +226,21 @@ public class CdcRawDirectorySpaceCleaner implements 
PeriodicTask
             return;
         }
 
-        long directorySize = FileUtils.directorySize(cdcRawDirectory);
-        long upperLimitBytes =
-        (long) (maxUsage() * 
cdcConfiguration.cdcRawDirectoryMaxPercentUsage());
+        long directorySizeBytes = 
FileUtils.directorySizeBytes(cdcRawDirectory);
+        long maxUsageBytes = maxUsageBytes();
+        long upperLimitBytes = (long) (maxUsageBytes * 
cdcConfiguration.cdcRawDirectoryMaxPercentUsage());
         // Sort the files by segmentId to delete commit log segments in write 
order
         // The latest file is the current active segment, but it could be 
created before the retention duration, e.g. slow data ingress
         Collections.sort(segmentFiles);
         long nowInMillis = timeProvider.currentTimeMillis();
 
         // track the age of the oldest commit log segment to give indication 
of the time-window buffer available
-        cdcMetrics.oldestSegmentAge.metric.setValue(
-        (int) MILLISECONDS.toSeconds(nowInMillis - 
segmentFiles.get(0).lastModified()));
+        cdcMetrics.oldestSegmentAge.metric.setValue((int) 
MILLISECONDS.toSeconds(nowInMillis - segmentFiles.get(0).lastModified()));
+
+        LOGGER.debug("Cdc data cleaner directorySizeBytes={} 
maxedUsageBytes={} upperLimitBytes={}",
+                     directorySizeBytes, maxUsageBytes, upperLimitBytes);
 
-        if (directorySize > upperLimitBytes)
+        if (directorySizeBytes > upperLimitBytes)
         {
             if (segmentFiles.get(0).segmentId > segmentFiles.get(1).segmentId)
             {
@@ -251,7 +253,7 @@ public class CdcRawDirectorySpaceCleaner implements 
PeriodicTask
 
             // we keep the last commit log segment as it may still be actively 
written to
             int i = 0;
-            while (i < segmentFiles.size() - 1 && directorySize > 
upperLimitBytes)
+            while (i < segmentFiles.size() - 1 && directorySizeBytes > 
upperLimitBytes)
             {
                 CdcRawSegmentFile segment = segmentFiles.get(i);
                 long ageMillis = nowInMillis - segment.lastModified();
@@ -280,7 +282,7 @@ public class CdcRawDirectorySpaceCleaner implements 
PeriodicTask
                 {
                     LOGGER.warn("Failed to delete cdc segment", e);
                 }
-                directorySize -= length;
+                directorySizeBytes -= length;
                 i++;
             }
         }
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java
index d8bddb75..403b38f1 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java
@@ -48,7 +48,7 @@ public class FileUtils
      * @param directory the directory
      * @return the size in bytes of all files in a directory, non-recursively.
      */
-    public static long directorySize(File directory)
+    public static long directorySizeBytes(File directory)
     {
         long size = 0;
         final File[] files = directory.listFiles();
diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessorIntTest.java b/server/src/test/integration/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessorIntTest.java
index 3ddb384b..e72b878e 100644
--- a/server/src/test/integration/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessorIntTest.java
+++ b/server/src/test/integration/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessorIntTest.java
@@ -39,7 +39,7 @@ class SystemViewsDatabaseAccessorIntTest extends 
IntegrationTestBase
         waitForSchemaReady(10, TimeUnit.SECONDS);
 
         SystemViewsDatabaseAccessor accessor = 
injector.getInstance(SystemViewsDatabaseAccessor.class);
-        Long cdcTotalSpaceSettings = accessor.getCdcTotalSpaceSetting();
+        Long cdcTotalSpaceSettings = accessor.cdcTotalSpaceBytesSetting();
         
assertThat(cdcTotalSpaceSettings).isNotNull().isEqualTo(cdcSizeLimitInMiB * 
1024 * 1024);
     }
 }
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java
index 5390ff68..86670fb7 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java
@@ -27,8 +27,11 @@ import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang3.RandomUtils;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -40,7 +43,9 @@ import 
org.apache.cassandra.sidecar.common.server.dns.DnsResolvers;
 import org.apache.cassandra.sidecar.config.CdcConfiguration;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
 import org.apache.cassandra.sidecar.config.yaml.CdcConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
 import org.apache.cassandra.sidecar.db.SystemViewsDatabaseAccessor;
+import org.apache.cassandra.sidecar.exceptions.SchemaUnavailableException;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.metrics.server.CdcMetrics;
 import org.apache.cassandra.sidecar.metrics.server.ServerMetrics;
@@ -48,17 +53,18 @@ import org.apache.cassandra.sidecar.utils.CdcUtil;
 import org.apache.cassandra.sidecar.utils.TimeProvider;
 import org.mockito.stubbing.Answer;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 
 /**
  * Unit tests for the {@link CdcRawDirectorySpaceCleaner}
  */
-public class CdcRawDirectorySpaceCleanerTest
+class CdcRawDirectorySpaceCleanerTest
 {
     private static final MetricRegistry METRIC_REGISTRY = new MetricRegistry();
     private static final String TEST_SEGMENT_FILE_NAME_1 = 
"CommitLog-2-1250512736956320000.log";
@@ -66,31 +72,38 @@ public class CdcRawDirectorySpaceCleanerTest
     private static final String TEST_SEGMENT_FILE_NAME_3 = 
"CommitLog-2-1340512736956320000.log";
     private static final String TEST_ORPHANED_SEGMENT_FILE_NAME = 
"CommitLog-2-1240512736956320000.log";
     private static final String TEST_INTACT_SEGMENT_FILE_NAME = 
"CommitLog-2-1340512736959990000.log";
+    SidecarMetrics mockSidecarMetrics;
+    CdcMetrics cdcMetrics;
+
+    @BeforeEach
+    void setup()
+    {
+        mockSidecarMetrics = mock(SidecarMetrics.class);
+        ServerMetrics mockServerMetrics = mock(ServerMetrics.class);
+        cdcMetrics = new CdcMetrics(METRIC_REGISTRY);
+        when(mockSidecarMetrics.server()).thenReturn(mockServerMetrics);
+        when(mockServerMetrics.cdc()).thenReturn(cdcMetrics);
+    }
 
     @Test
-    public void testCdcRawDirectorySpaceCleaner(@TempDir Path tempDir) throws 
IOException
+    void testCdcRawDirectorySpaceCleaner(@TempDir Path tempDir) throws 
IOException
     {
         TimeProvider timeProvider = TimeProvider.DEFAULT_TIME_PROVIDER;
         SystemViewsDatabaseAccessor systemViewsDatabaseAccessor = 
mock(SystemViewsDatabaseAccessor.class);
         when(systemViewsDatabaseAccessor.getSettings(any()))
         .thenAnswer((Answer<Map<String, String>>) invocation -> 
Map.of("cdc_total_space", "1MiB"));
-        
when(systemViewsDatabaseAccessor.getCdcTotalSpaceSetting()).thenCallRealMethod();
+        
when(systemViewsDatabaseAccessor.cdcTotalSpaceBytesSetting()).thenCallRealMethod();
         CdcConfiguration cdcConfiguration = new CdcConfigurationImpl();
         ServiceConfiguration serviceConfiguration = 
mock(ServiceConfiguration.class);
         
when(serviceConfiguration.cdcConfiguration()).thenReturn(cdcConfiguration);
 
         InstancesMetadata instancesMetadata = mockInstanceMetadata(tempDir);
-        SidecarMetrics sidecarMetrics = mock(SidecarMetrics.class);
-        ServerMetrics serverMetrics = mock(ServerMetrics.class);
-        CdcMetrics cdcMetrics = new CdcMetrics(METRIC_REGISTRY);
-        when(sidecarMetrics.server()).thenReturn(serverMetrics);
-        when(serverMetrics.cdc()).thenReturn(cdcMetrics);
         CdcRawDirectorySpaceCleaner cleaner = new CdcRawDirectorySpaceCleaner(
         timeProvider,
         systemViewsDatabaseAccessor,
         serviceConfiguration,
         instancesMetadata,
-        sidecarMetrics
+        mockSidecarMetrics
         );
 
         checkExists(tempDir, TEST_ORPHANED_SEGMENT_FILE_NAME, true, false);
@@ -99,9 +112,9 @@ public class CdcRawDirectorySpaceCleanerTest
         checkExists(tempDir, TEST_SEGMENT_FILE_NAME_3);
         checkExists(tempDir, TEST_INTACT_SEGMENT_FILE_NAME, false, true);
 
-        assertEquals(0L, cdcMetrics.criticalCdcRawSpace.metric.getValue());
-        assertEquals(0L, cdcMetrics.orphanedIdx.metric.getValue());
-        assertEquals(0L, cdcMetrics.deletedSegment.metric.getValue());
+        assertThat(cdcMetrics.criticalCdcRawSpace.metric.getValue()).isZero();
+        assertThat(cdcMetrics.orphanedIdx.metric.getValue()).isZero();
+        assertThat(cdcMetrics.deletedSegment.metric.getValue()).isZero();
 
         cleaner.routineCleanUp();
 
@@ -115,11 +128,11 @@ public class CdcRawDirectorySpaceCleanerTest
         checkExists(tempDir, TEST_INTACT_SEGMENT_FILE_NAME, false, true);
 
         // verify metrics match expected
-        assertEquals(1L, cdcMetrics.criticalCdcRawSpace.metric.getValue());
-        assertEquals(1L, cdcMetrics.orphanedIdx.metric.getValue());
-        assertTrue(cdcMetrics.totalCdcSpaceUsed.metric.getValue() > 2097152L);
-        assertTrue(cdcMetrics.deletedSegment.metric.getValue() > 2097152L);
-        assertEquals(0, cdcMetrics.oldestSegmentAge.metric.getValue());
+        assertThat(cdcMetrics.criticalCdcRawSpace.metric.getValue()).isOne();
+        assertThat(cdcMetrics.orphanedIdx.metric.getValue()).isOne();
+        
assertThat(cdcMetrics.totalCdcSpaceUsed.metric.getValue()).isGreaterThan(2097152L);
+        
assertThat(cdcMetrics.deletedSegment.metric.getValue()).isGreaterThan(2097152L);
+        assertThat(cdcMetrics.oldestSegmentAge.metric.getValue()).isZero();
 
         // delete all cdc files, in order to test the scenario that we do not 
have current cdc file, but have cdc file in the prior round.
         // We do not expect all CDC file to be cleaned up in a running system. 
But test it for robustness.
@@ -127,6 +140,78 @@ public class CdcRawDirectorySpaceCleanerTest
         cleaner.routineCleanUp(); // it should run fine.
     }
 
+    @Test
+    void testMaxUsageBytes()
+    {
+        FakeTimeProvider fakeTimeProvider = new FakeTimeProvider();
+        InstancesMetadata instancesMetadata = mock(InstancesMetadata.class);
+        SystemViewsDatabaseAccessor mockSystemViewsDatabaseAccessor = 
mock(SystemViewsDatabaseAccessor.class);
+        // First return 1MiB
+        
when(mockSystemViewsDatabaseAccessor.cdcTotalSpaceBytesSetting()).thenReturn(1L 
<< 20)
+                                                                         // 
Next return 1GiB
+                                                                         
.thenReturn(1L << 30)
+                                                                         // 
Next throw an exception when accessing Accessor layer
+                                                                         
.thenThrow(new SchemaUnavailableException("system_views", "settings"))
+                                                                         // 
Next throw a runtime exception
+                                                                         
.thenThrow(new RuntimeException("error accessing data"))
+                                                                         // 
Next return 1GiB
+                                                                         
.thenReturn(1L << 30);
+
+        ServiceConfigurationImpl serviceConfiguration = new 
ServiceConfigurationImpl();
+        CdcConfiguration cdcConfiguration = 
serviceConfiguration.cdcConfiguration();
+        assertThat(cdcConfiguration).isNotNull();
+
+        CdcRawDirectorySpaceCleaner cleaner = new CdcRawDirectorySpaceCleaner(
+        fakeTimeProvider,
+        mockSystemViewsDatabaseAccessor,
+        serviceConfiguration,
+        instancesMetadata,
+        mockSidecarMetrics
+        );
+
+        // start with no interactions
+        verifyNoInteractions(mockSystemViewsDatabaseAccessor);
+
+        assertThat(cleaner.maxUsageBytes()).isEqualTo(1_024L * 1_024L);
+        verify(mockSystemViewsDatabaseAccessor, 
times(1)).cdcTotalSpaceBytesSetting();
+
+        assertThat(cleaner.maxUsageBytes()).as("Should read from the cached 
value").isEqualTo(1_024L * 1_024L);
+        verify(mockSystemViewsDatabaseAccessor, 
times(1)).cdcTotalSpaceBytesSetting();
+
+        // Advance the time provider to 1 millisecond before the cache expires
+        fakeTimeProvider.advance(cdcConfiguration.cacheMaxUsage().toMillis() - 
1, TimeUnit.MILLISECONDS);
+
+        // Let's assert it again to ensure we are not reading from Accessor 
layer
+        assertThat(cleaner.maxUsageBytes()).as("Should read from the cached 
value").isEqualTo(1_024L * 1_024L);
+        verify(mockSystemViewsDatabaseAccessor, 
times(1)).cdcTotalSpaceBytesSetting();
+
+        // Now advance the time provider by the configured cache max usage
+        fakeTimeProvider.advance(cdcConfiguration.cacheMaxUsage().toMillis(), 
TimeUnit.MILLISECONDS);
+
+        // and we should now read 1GiB
+        assertThat(cleaner.maxUsageBytes()).isEqualTo(1_024L * 1_024L * 
1_024L);
+        verify(mockSystemViewsDatabaseAccessor, 
times(2)).cdcTotalSpaceBytesSetting();
+
+        // Now advance the time provider by the configured cache max usage
+        fakeTimeProvider.advance(cdcConfiguration.cacheMaxUsage().toMillis(), 
TimeUnit.MILLISECONDS);
+
+        // we should fall back when a SchemaUnavailableException is thrown
+        
assertThat(cleaner.maxUsageBytes()).isEqualTo(cdcConfiguration.fallbackCdcRawDirectoryMaxSizeBytes());
+        verify(mockSystemViewsDatabaseAccessor, 
times(3)).cdcTotalSpaceBytesSetting();
+
+        // also fall back when another exception is thrown
+        
assertThat(cleaner.maxUsageBytes()).isEqualTo(cdcConfiguration.fallbackCdcRawDirectoryMaxSizeBytes());
+        verify(mockSystemViewsDatabaseAccessor, 
times(4)).cdcTotalSpaceBytesSetting();
+
+        // Finally, we recover and are able to read from accessor. Should read 
1GiB
+        assertThat(cleaner.maxUsageBytes()).isEqualTo(1_024L * 1_024L * 
1_024L);
+        verify(mockSystemViewsDatabaseAccessor, 
times(5)).cdcTotalSpaceBytesSetting();
+
+        // And read cached value
+        assertThat(cleaner.maxUsageBytes()).isEqualTo(1_024L * 1_024L * 
1_024L);
+        verify(mockSystemViewsDatabaseAccessor, 
times(5)).cdcTotalSpaceBytesSetting();
+    }
+
     /* test utils */
 
     private static InstancesMetadata mockInstanceMetadata(Path tempDir) throws 
IOException
@@ -157,14 +242,14 @@ public class CdcRawDirectorySpaceCleanerTest
         if (!orphaned)
         {
             final File f1 = new File(cdcDir, filename);
-            assertTrue(f1.createNewFile());
+            assertThat(f1.createNewFile()).isTrue();
             Files.write(f1.toPath(), RandomUtils.nextBytes(size));
         }
 
         if (!intact)
         {
             final File f2 = new File(cdcDir, CdcUtil.getIdxFileName(filename));
-            assertTrue(f2.createNewFile());
+            assertThat(f2.createNewFile()).isTrue();
             Files.write(f2.toPath(), (size + (complete ? "\nCOMPLETED" : 
"")).getBytes(StandardCharsets.UTF_8));
         }
     }
@@ -176,13 +261,41 @@ public class CdcRawDirectorySpaceCleanerTest
 
     private void checkExists(Path tempDir, String logFileName, boolean 
orphaned, boolean intact)
     {
-        assertEquals(!orphaned, Files.exists(Paths.get(tempDir.toString(), 
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, logFileName)));
-        assertEquals(!intact, Files.exists(Paths.get(tempDir.toString(), 
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, 
CdcUtil.getIdxFileName(logFileName))));
+        assertThat(Files.exists(Paths.get(tempDir.toString(), 
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, logFileName))).isEqualTo(!orphaned);
+        assertThat(Files.exists(Paths.get(tempDir.toString(), 
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, 
CdcUtil.getIdxFileName(logFileName)))).isEqualTo(!intact);
     }
 
     private void checkNotExists(Path tempDir, String logFileName)
     {
-        assertFalse(Files.exists(Paths.get(tempDir.toString(), 
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, logFileName)));
-        assertFalse(Files.exists(Paths.get(tempDir.toString(), 
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, 
CdcUtil.getIdxFileName(logFileName))));
+        assertThat(Paths.get(tempDir.toString(), 
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, logFileName)).doesNotExist();
+        assertThat(Paths.get(tempDir.toString(), 
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, 
CdcUtil.getIdxFileName(logFileName))).doesNotExist();
+    }
+
+    static class FakeTimeProvider implements TimeProvider
+    {
+        private final AtomicLong nanos = new AtomicLong(0);
+
+        @Override
+        public long currentTimeMillis()
+        {
+            return TimeUnit.NANOSECONDS.toMillis(nanos.get());
+        }
+
+        @Override
+        public long nanoTime()
+        {
+            return nanos.get();
+        }
+
+        /**
+         * Artificially advance time for a given {@code value} in the given 
{@link TimeUnit}
+         *
+         * @param value the value to advance
+         * @param unit  the {@link TimeUnit} for the given {@code value}
+         */
+        public void advance(long value, TimeUnit unit)
+        {
+            nanos.addAndGet(unit.toNanos(value));
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to