This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new f213ac49 CASSSIDECAR-325: CdcRawDirectorySpaceCleaner always uses
fallback aft… (#237)
f213ac49 is described below
commit f213ac49ef5c5c889c2f10936d3db111100f8cde
Author: Francisco Guerrero <[email protected]>
AuthorDate: Wed Jul 23 13:59:17 2025 -0700
CASSSIDECAR-325: CdcRawDirectorySpaceCleaner always uses fallback aft…
(#237)
Patch by Francisco Guerrero; reviewed by Yifan Cai, Bernardo Botella for
CASSSIDECAR-325
---
CHANGES.txt | 1 +
.../sidecar/db/schema/AbstractSchema.java | 2 +-
.../db/schema/CassandraSystemTableSchema.java | 1 +
.../sidecar/db/SystemViewsDatabaseAccessor.java | 22 ++-
.../cassandra/sidecar/modules/CdcModule.java | 2 +-
.../sidecar/modules/SidecarSchemaModule.java | 6 +
.../sidecar/tasks/CdcRawDirectorySpaceCleaner.java | 34 +++--
.../apache/cassandra/sidecar/utils/FileUtils.java | 2 +-
.../db/SystemViewsDatabaseAccessorIntTest.java | 2 +-
.../tasks/CdcRawDirectorySpaceCleanerTest.java | 165 +++++++++++++++++----
10 files changed, 187 insertions(+), 50 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index cc436d6e..8c35c05f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.2.0
-----
+ * CdcRawDirectorySpaceCleaner always uses fallback after the first time it is
queried from system_views.settings (CASSSIDECAR-325)
* Allow restore jobs to restore to the local datacenter only (CASSSIDECAR-269)
* Avoid ending response in authentication handler, doesn't allow chaining of
auth handlers (CASSSIDECAR-270)
* Enhanced Sidecarclient to list Cassandra instance files and to download
them for Live Migration (CASSSIDECAR-224)
diff --git
a/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java
index 3c8c6cf4..0622c191 100644
---
a/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java
+++
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java
@@ -88,7 +88,7 @@ public abstract class AbstractSchema
}
prepareStatements(session);
- logger.debug("{} is initialized!", this.getClass().getSimpleName());
+ logger.info("{} table schema is initialized",
this.getClass().getSimpleName());
return true;
}
diff --git
a/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/CassandraSystemTableSchema.java
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/CassandraSystemTableSchema.java
index af9632f5..ffd2e728 100644
---
a/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/CassandraSystemTableSchema.java
+++
b/server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/CassandraSystemTableSchema.java
@@ -40,6 +40,7 @@ public abstract class CassandraSystemTableSchema extends
TableSchema
@NotNull Predicate<AbstractSchema>
shouldCreateSchema)
{
prepareStatements(session);
+ logger.info("{} Cassandra system table schema is initialized",
this.getClass().getSimpleName());
return true;
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java
index 60280c26..db79ae24 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java
@@ -22,6 +22,9 @@ import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ResultSet;
import com.google.inject.Inject;
@@ -34,11 +37,13 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
- * Database Accessor that queries cassandra to get information maintained
under system_auth keyspace.
+ * Database Accessor that queries cassandra to get information maintained
under system_views keyspace.
*/
@Singleton
public class SystemViewsDatabaseAccessor extends
DatabaseAccessor<SystemViewsSchema>
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SystemViewsDatabaseAccessor.class);
+
private static final String YAML_PROP_IN_MB = "cdc_total_space_in_mb";
private static final String YAML_PROP_WITH_UNIT = "cdc_total_space"; //
expects value with units e.g. "5MiB"
@@ -49,11 +54,16 @@ public class SystemViewsDatabaseAccessor extends
DatabaseAccessor<SystemViewsSch
super(systemViewsSchema, sessionProvider);
}
+ /**
+ * @return the value of the cdc_total_space setting in bytes
+ * @throws SchemaUnavailableException when the schema is not initialized
+ */
@Nullable
- public Long getCdcTotalSpaceSetting() throws SchemaUnavailableException
+ public Long cdcTotalSpaceBytesSetting() throws SchemaUnavailableException
{
// attempt to parse Cassandra v4.0 'cdc_total_space_in_mb' yaml prop
- Map<String, String> settings = getSettings(YAML_PROP_IN_MB,
YAML_PROP_WITH_UNIT);
+ String[] cdcTotalSpaceSettingNames = { YAML_PROP_IN_MB,
YAML_PROP_WITH_UNIT };
+ Map<String, String> settings = getSettings(cdcTotalSpaceSettingNames);
String cdcTotalSpaceInMb = settings.get(YAML_PROP_IN_MB);
if (cdcTotalSpaceInMb != null)
{
@@ -67,11 +77,15 @@ public class SystemViewsDatabaseAccessor extends
DatabaseAccessor<SystemViewsSch
return FileUtils.storageStringToBytes(storageStringToBytes);
}
+ // This is not expected to ever be logged, but adding the log entry
for completeness
+ // in case debugging is needed for this unexpected case.
+ LOGGER.warn("Unable to determine the CDC total space value from
setting names {}",
+ (Object) cdcTotalSpaceSettingNames);
return null;
}
/**
- * Load a setting values from the `system_views.settings` table.
+ * Load a setting values from the `system_views.settings` virtual table.
*
* @param names names of settings
* @return map of setting values keyed on `name` loaded from the
`system_views.settings` table.
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
index 322b80bf..77a8e5b7 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
@@ -81,7 +81,7 @@ public class CdcModule extends AbstractModule
@ProvidesIntoMap
@KeyClassMapKey(TableSchemaMapKeys.SystemViewsSchemaKey.class)
- TableSchema systemViewssSchema(SystemViewsSchema schema)
+ TableSchema systemViewsSchema(SystemViewsSchema schema)
{
return schema;
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarSchemaModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarSchemaModule.java
index 51aa2eb0..498bad72 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarSchemaModule.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarSchemaModule.java
@@ -18,6 +18,9 @@
package org.apache.cassandra.sidecar.modules;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
@@ -41,6 +44,8 @@ import org.apache.cassandra.sidecar.tasks.PeriodicTask;
*/
public class SidecarSchemaModule extends AbstractModule
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SidecarSchemaModule.class);
+
@Provides
@Singleton
public SidecarSchema sidecarSchema(Vertx vertx,
@@ -50,6 +55,7 @@ public class SidecarSchemaModule extends AbstractModule
SidecarInternalKeyspace sidecarInternalKeyspace = new
SidecarInternalKeyspace(configuration);
// register table schema when enabled
resolver.resolve().values().forEach(tableSchema -> {
+ LOGGER.info("Registering table schema: {}", tableSchema);
try
{
sidecarInternalKeyspace.registerTableSchema(tableSchema);
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java
index ec309aac..28234689 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java
@@ -33,6 +33,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,7 +131,7 @@ public class CdcRawDirectorySpaceCleaner implements
PeriodicTask
TimeUnit.MILLISECONDS.toNanos(cdcConfiguration.cacheMaxUsage().toMillis());
}
- protected long maxUsage()
+ protected long maxUsageBytes()
{
if (!shouldRefreshCachedMaxUsage())
{
@@ -140,18 +141,17 @@ public class CdcRawDirectorySpaceCleaner implements
PeriodicTask
try
{
- Long newValue =
systemViewsDatabaseAccessor.getCdcTotalSpaceSetting();
+ Long newValue =
systemViewsDatabaseAccessor.cdcTotalSpaceBytesSetting();
if (newValue != null)
{
if (!newValue.equals(maxUsageBytes))
{
- LOGGER.info(
- "Change in cdc_total_space from system_views.settings
prev={} latest={}",
- maxUsageBytes, newValue);
+ LOGGER.info("Change in cdc_total_space from
system_views.settings prev={} latest={}",
+ maxUsageBytes, newValue);
this.maxUsageBytes = newValue;
- this.maxUsageLastReadNanos = timeProvider.nanoTime();
- return newValue;
}
+ this.maxUsageLastReadNanos = timeProvider.nanoTime();
+ return this.maxUsageBytes;
}
}
catch (SchemaUnavailableException e)
@@ -160,7 +160,7 @@ public class CdcRawDirectorySpaceCleaner implements
PeriodicTask
}
catch (Throwable t)
{
- LOGGER.warn("Error reading cdc_total_space from
system_views.settings", t);
+ LOGGER.error("Error reading cdc_total_space from
system_views.settings", t);
}
LOGGER.warn("Could not read cdc_total_space from
system_views.settings, falling back to props");
@@ -226,19 +226,21 @@ public class CdcRawDirectorySpaceCleaner implements
PeriodicTask
return;
}
- long directorySize = FileUtils.directorySize(cdcRawDirectory);
- long upperLimitBytes =
- (long) (maxUsage() *
cdcConfiguration.cdcRawDirectoryMaxPercentUsage());
+ long directorySizeBytes =
FileUtils.directorySizeBytes(cdcRawDirectory);
+ long maxUsageBytes = maxUsageBytes();
+ long upperLimitBytes = (long) (maxUsageBytes *
cdcConfiguration.cdcRawDirectoryMaxPercentUsage());
// Sort the files by segmentId to delete commit log segments in write
order
// The latest file is the current active segment, but it could be
created before the retention duration, e.g. slow data ingress
Collections.sort(segmentFiles);
long nowInMillis = timeProvider.currentTimeMillis();
// track the age of the oldest commit log segment to give indication
of the time-window buffer available
- cdcMetrics.oldestSegmentAge.metric.setValue(
- (int) MILLISECONDS.toSeconds(nowInMillis -
segmentFiles.get(0).lastModified()));
+ cdcMetrics.oldestSegmentAge.metric.setValue((int)
MILLISECONDS.toSeconds(nowInMillis - segmentFiles.get(0).lastModified()));
+
+ LOGGER.debug("Cdc data cleaner directorySizeBytes={}
maxedUsageBytes={} upperLimitBytes={}",
+ directorySizeBytes, maxUsageBytes, upperLimitBytes);
- if (directorySize > upperLimitBytes)
+ if (directorySizeBytes > upperLimitBytes)
{
if (segmentFiles.get(0).segmentId > segmentFiles.get(1).segmentId)
{
@@ -251,7 +253,7 @@ public class CdcRawDirectorySpaceCleaner implements
PeriodicTask
// we keep the last commit log segment as it may still be actively
written to
int i = 0;
- while (i < segmentFiles.size() - 1 && directorySize >
upperLimitBytes)
+ while (i < segmentFiles.size() - 1 && directorySizeBytes >
upperLimitBytes)
{
CdcRawSegmentFile segment = segmentFiles.get(i);
long ageMillis = nowInMillis - segment.lastModified();
@@ -280,7 +282,7 @@ public class CdcRawDirectorySpaceCleaner implements
PeriodicTask
{
LOGGER.warn("Failed to delete cdc segment", e);
}
- directorySize -= length;
+ directorySizeBytes -= length;
i++;
}
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java
b/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java
index d8bddb75..403b38f1 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java
@@ -48,7 +48,7 @@ public class FileUtils
* @param directory the directory
* @return the size in bytes of all files in a directory, non-recursively.
*/
- public static long directorySize(File directory)
+ public static long directorySizeBytes(File directory)
{
long size = 0;
final File[] files = directory.listFiles();
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessorIntTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessorIntTest.java
index 3ddb384b..e72b878e 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessorIntTest.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessorIntTest.java
@@ -39,7 +39,7 @@ class SystemViewsDatabaseAccessorIntTest extends
IntegrationTestBase
waitForSchemaReady(10, TimeUnit.SECONDS);
SystemViewsDatabaseAccessor accessor =
injector.getInstance(SystemViewsDatabaseAccessor.class);
- Long cdcTotalSpaceSettings = accessor.getCdcTotalSpaceSetting();
+ Long cdcTotalSpaceSettings = accessor.cdcTotalSpaceBytesSetting();
assertThat(cdcTotalSpaceSettings).isNotNull().isEqualTo(cdcSizeLimitInMiB *
1024 * 1024);
}
}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java
index 5390ff68..86670fb7 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java
@@ -27,8 +27,11 @@ import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.RandomUtils;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -40,7 +43,9 @@ import
org.apache.cassandra.sidecar.common.server.dns.DnsResolvers;
import org.apache.cassandra.sidecar.config.CdcConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.yaml.CdcConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
import org.apache.cassandra.sidecar.db.SystemViewsDatabaseAccessor;
+import org.apache.cassandra.sidecar.exceptions.SchemaUnavailableException;
import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
import org.apache.cassandra.sidecar.metrics.server.CdcMetrics;
import org.apache.cassandra.sidecar.metrics.server.ServerMetrics;
@@ -48,17 +53,18 @@ import org.apache.cassandra.sidecar.utils.CdcUtil;
import org.apache.cassandra.sidecar.utils.TimeProvider;
import org.mockito.stubbing.Answer;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
/**
* Unit tests for the {@link CdcRawDirectorySpaceCleaner}
*/
-public class CdcRawDirectorySpaceCleanerTest
+class CdcRawDirectorySpaceCleanerTest
{
private static final MetricRegistry METRIC_REGISTRY = new MetricRegistry();
private static final String TEST_SEGMENT_FILE_NAME_1 =
"CommitLog-2-1250512736956320000.log";
@@ -66,31 +72,38 @@ public class CdcRawDirectorySpaceCleanerTest
private static final String TEST_SEGMENT_FILE_NAME_3 =
"CommitLog-2-1340512736956320000.log";
private static final String TEST_ORPHANED_SEGMENT_FILE_NAME =
"CommitLog-2-1240512736956320000.log";
private static final String TEST_INTACT_SEGMENT_FILE_NAME =
"CommitLog-2-1340512736959990000.log";
+ SidecarMetrics mockSidecarMetrics;
+ CdcMetrics cdcMetrics;
+
+ @BeforeEach
+ void setup()
+ {
+ mockSidecarMetrics = mock(SidecarMetrics.class);
+ ServerMetrics mockServerMetrics = mock(ServerMetrics.class);
+ cdcMetrics = new CdcMetrics(METRIC_REGISTRY);
+ when(mockSidecarMetrics.server()).thenReturn(mockServerMetrics);
+ when(mockServerMetrics.cdc()).thenReturn(cdcMetrics);
+ }
@Test
- public void testCdcRawDirectorySpaceCleaner(@TempDir Path tempDir) throws
IOException
+ void testCdcRawDirectorySpaceCleaner(@TempDir Path tempDir) throws
IOException
{
TimeProvider timeProvider = TimeProvider.DEFAULT_TIME_PROVIDER;
SystemViewsDatabaseAccessor systemViewsDatabaseAccessor =
mock(SystemViewsDatabaseAccessor.class);
when(systemViewsDatabaseAccessor.getSettings(any()))
.thenAnswer((Answer<Map<String, String>>) invocation ->
Map.of("cdc_total_space", "1MiB"));
-
when(systemViewsDatabaseAccessor.getCdcTotalSpaceSetting()).thenCallRealMethod();
+
when(systemViewsDatabaseAccessor.cdcTotalSpaceBytesSetting()).thenCallRealMethod();
CdcConfiguration cdcConfiguration = new CdcConfigurationImpl();
ServiceConfiguration serviceConfiguration =
mock(ServiceConfiguration.class);
when(serviceConfiguration.cdcConfiguration()).thenReturn(cdcConfiguration);
InstancesMetadata instancesMetadata = mockInstanceMetadata(tempDir);
- SidecarMetrics sidecarMetrics = mock(SidecarMetrics.class);
- ServerMetrics serverMetrics = mock(ServerMetrics.class);
- CdcMetrics cdcMetrics = new CdcMetrics(METRIC_REGISTRY);
- when(sidecarMetrics.server()).thenReturn(serverMetrics);
- when(serverMetrics.cdc()).thenReturn(cdcMetrics);
CdcRawDirectorySpaceCleaner cleaner = new CdcRawDirectorySpaceCleaner(
timeProvider,
systemViewsDatabaseAccessor,
serviceConfiguration,
instancesMetadata,
- sidecarMetrics
+ mockSidecarMetrics
);
checkExists(tempDir, TEST_ORPHANED_SEGMENT_FILE_NAME, true, false);
@@ -99,9 +112,9 @@ public class CdcRawDirectorySpaceCleanerTest
checkExists(tempDir, TEST_SEGMENT_FILE_NAME_3);
checkExists(tempDir, TEST_INTACT_SEGMENT_FILE_NAME, false, true);
- assertEquals(0L, cdcMetrics.criticalCdcRawSpace.metric.getValue());
- assertEquals(0L, cdcMetrics.orphanedIdx.metric.getValue());
- assertEquals(0L, cdcMetrics.deletedSegment.metric.getValue());
+ assertThat(cdcMetrics.criticalCdcRawSpace.metric.getValue()).isZero();
+ assertThat(cdcMetrics.orphanedIdx.metric.getValue()).isZero();
+ assertThat(cdcMetrics.deletedSegment.metric.getValue()).isZero();
cleaner.routineCleanUp();
@@ -115,11 +128,11 @@ public class CdcRawDirectorySpaceCleanerTest
checkExists(tempDir, TEST_INTACT_SEGMENT_FILE_NAME, false, true);
// verify metrics match expected
- assertEquals(1L, cdcMetrics.criticalCdcRawSpace.metric.getValue());
- assertEquals(1L, cdcMetrics.orphanedIdx.metric.getValue());
- assertTrue(cdcMetrics.totalCdcSpaceUsed.metric.getValue() > 2097152L);
- assertTrue(cdcMetrics.deletedSegment.metric.getValue() > 2097152L);
- assertEquals(0, cdcMetrics.oldestSegmentAge.metric.getValue());
+ assertThat(cdcMetrics.criticalCdcRawSpace.metric.getValue()).isOne();
+ assertThat(cdcMetrics.orphanedIdx.metric.getValue()).isOne();
+
assertThat(cdcMetrics.totalCdcSpaceUsed.metric.getValue()).isGreaterThan(2097152L);
+
assertThat(cdcMetrics.deletedSegment.metric.getValue()).isGreaterThan(2097152L);
+ assertThat(cdcMetrics.oldestSegmentAge.metric.getValue()).isZero();
// delete all cdc files, in order to test the scenario that we do not
have current cdc file, but have cdc file in the prior round.
// We do not expect all CDC file to be cleaned up in a running system.
But test it for robustness.
@@ -127,6 +140,78 @@ public class CdcRawDirectorySpaceCleanerTest
cleaner.routineCleanUp(); // it should run fine.
}
+ @Test
+ void testMaxUsageBytes()
+ {
+ FakeTimeProvider fakeTimeProvider = new FakeTimeProvider();
+ InstancesMetadata instancesMetadata = mock(InstancesMetadata.class);
+ SystemViewsDatabaseAccessor mockSystemViewsDatabaseAccessor =
mock(SystemViewsDatabaseAccessor.class);
+ // First return 1MiB
+
when(mockSystemViewsDatabaseAccessor.cdcTotalSpaceBytesSetting()).thenReturn(1L
<< 20)
+ //
Next return 1GiB
+
.thenReturn(1L << 30)
+ //
Next throw an exception when accessing Accessor layer
+
.thenThrow(new SchemaUnavailableException("system_views", "settings"))
+ //
Next throw a runtime exception
+
.thenThrow(new RuntimeException("error accessing data"))
+ //
Next return 1GiB
+
.thenReturn(1L << 30);
+
+ ServiceConfigurationImpl serviceConfiguration = new
ServiceConfigurationImpl();
+ CdcConfiguration cdcConfiguration =
serviceConfiguration.cdcConfiguration();
+ assertThat(cdcConfiguration).isNotNull();
+
+ CdcRawDirectorySpaceCleaner cleaner = new CdcRawDirectorySpaceCleaner(
+ fakeTimeProvider,
+ mockSystemViewsDatabaseAccessor,
+ serviceConfiguration,
+ instancesMetadata,
+ mockSidecarMetrics
+ );
+
+ // start with no interactions
+ verifyNoInteractions(mockSystemViewsDatabaseAccessor);
+
+ assertThat(cleaner.maxUsageBytes()).isEqualTo(1_024L * 1_024L);
+ verify(mockSystemViewsDatabaseAccessor,
times(1)).cdcTotalSpaceBytesSetting();
+
+ assertThat(cleaner.maxUsageBytes()).as("Should read from the cached
value").isEqualTo(1_024L * 1_024L);
+ verify(mockSystemViewsDatabaseAccessor,
times(1)).cdcTotalSpaceBytesSetting();
+
+ // Advance the time provider to 1 millisecond before the cache expires
+ fakeTimeProvider.advance(cdcConfiguration.cacheMaxUsage().toMillis() -
1, TimeUnit.MILLISECONDS);
+
+ // Let's assert it again to ensure we are not reading from Accessor
layer
+ assertThat(cleaner.maxUsageBytes()).as("Should read from the cached
value").isEqualTo(1_024L * 1_024L);
+ verify(mockSystemViewsDatabaseAccessor,
times(1)).cdcTotalSpaceBytesSetting();
+
+ // Now advance the time provider by the configured cache max usage
+ fakeTimeProvider.advance(cdcConfiguration.cacheMaxUsage().toMillis(),
TimeUnit.MILLISECONDS);
+
+ // and we should now read 1GiB
+ assertThat(cleaner.maxUsageBytes()).isEqualTo(1_024L * 1_024L *
1_024L);
+ verify(mockSystemViewsDatabaseAccessor,
times(2)).cdcTotalSpaceBytesSetting();
+
+ // Now advance the time provider by the configured cache max usage
+ fakeTimeProvider.advance(cdcConfiguration.cacheMaxUsage().toMillis(),
TimeUnit.MILLISECONDS);
+
+ // we should fall back when a SchemaUnavailableException is thrown
+
assertThat(cleaner.maxUsageBytes()).isEqualTo(cdcConfiguration.fallbackCdcRawDirectoryMaxSizeBytes());
+ verify(mockSystemViewsDatabaseAccessor,
times(3)).cdcTotalSpaceBytesSetting();
+
+ // also fall back when another exception is thrown
+
assertThat(cleaner.maxUsageBytes()).isEqualTo(cdcConfiguration.fallbackCdcRawDirectoryMaxSizeBytes());
+ verify(mockSystemViewsDatabaseAccessor,
times(4)).cdcTotalSpaceBytesSetting();
+
+ // Finally, we recover and are able to read from accessor. Should read
1GiB
+ assertThat(cleaner.maxUsageBytes()).isEqualTo(1_024L * 1_024L *
1_024L);
+ verify(mockSystemViewsDatabaseAccessor,
times(5)).cdcTotalSpaceBytesSetting();
+
+ // And read cached value
+ assertThat(cleaner.maxUsageBytes()).isEqualTo(1_024L * 1_024L *
1_024L);
+ verify(mockSystemViewsDatabaseAccessor,
times(5)).cdcTotalSpaceBytesSetting();
+ }
+
/* test utils */
private static InstancesMetadata mockInstanceMetadata(Path tempDir) throws
IOException
@@ -157,14 +242,14 @@ public class CdcRawDirectorySpaceCleanerTest
if (!orphaned)
{
final File f1 = new File(cdcDir, filename);
- assertTrue(f1.createNewFile());
+ assertThat(f1.createNewFile()).isTrue();
Files.write(f1.toPath(), RandomUtils.nextBytes(size));
}
if (!intact)
{
final File f2 = new File(cdcDir, CdcUtil.getIdxFileName(filename));
- assertTrue(f2.createNewFile());
+ assertThat(f2.createNewFile()).isTrue();
Files.write(f2.toPath(), (size + (complete ? "\nCOMPLETED" :
"")).getBytes(StandardCharsets.UTF_8));
}
}
@@ -176,13 +261,41 @@ public class CdcRawDirectorySpaceCleanerTest
private void checkExists(Path tempDir, String logFileName, boolean
orphaned, boolean intact)
{
- assertEquals(!orphaned, Files.exists(Paths.get(tempDir.toString(),
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, logFileName)));
- assertEquals(!intact, Files.exists(Paths.get(tempDir.toString(),
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME,
CdcUtil.getIdxFileName(logFileName))));
+ assertThat(Files.exists(Paths.get(tempDir.toString(),
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, logFileName))).isEqualTo(!orphaned);
+ assertThat(Files.exists(Paths.get(tempDir.toString(),
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME,
CdcUtil.getIdxFileName(logFileName)))).isEqualTo(!intact);
}
private void checkNotExists(Path tempDir, String logFileName)
{
- assertFalse(Files.exists(Paths.get(tempDir.toString(),
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, logFileName)));
- assertFalse(Files.exists(Paths.get(tempDir.toString(),
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME,
CdcUtil.getIdxFileName(logFileName))));
+ assertThat(Paths.get(tempDir.toString(),
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, logFileName)).doesNotExist();
+ assertThat(Paths.get(tempDir.toString(),
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME,
CdcUtil.getIdxFileName(logFileName))).doesNotExist();
+ }
+
+ static class FakeTimeProvider implements TimeProvider
+ {
+ private final AtomicLong nanos = new AtomicLong(0);
+
+ @Override
+ public long currentTimeMillis()
+ {
+ return TimeUnit.NANOSECONDS.toMillis(nanos.get());
+ }
+
+ @Override
+ public long nanoTime()
+ {
+ return nanos.get();
+ }
+
+ /**
+ * Artificially advance time for a given {@code value} in the given
{@link TimeUnit}
+ *
+ * @param value the value to advance
+ * @param unit the {@link TimeUnit} for the given {@code value}
+ */
+ public void advance(long value, TimeUnit unit)
+ {
+ nanos.addAndGet(unit.toNanos(value));
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]