This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6aa69069 CASSSIDECAR-229: SidecarInstanceCodec is failing to find
codec for type (#209)
6aa69069 is described below
commit 6aa69069f44b970af79759f2eec134cb251845be
Author: Francisco Guerrero <[email protected]>
AuthorDate: Mon Mar 17 12:18:16 2025 -0700
CASSSIDECAR-229: SidecarInstanceCodec is failing to find codec for type
(#209)
Patch by Francisco Guerrero; reviewed by Yifan Cai, Bernardo Botella for
CASSSIDECAR-229
---
CHANGES.txt | 1 +
.../InnerDcTokenAdjacentPeerTestProvider.java | 78 ------
.../testing/SharedClusterIntegrationTestBase.java | 175 ++++++------
.../SidecarPeerDownDetectorIntegrationTest.java | 304 ++++++++++++++-------
.../CassandraSchemaRouteIntegrationTest.java | 10 +-
.../routes/CassandraStatsIntegrationTest.java | 36 +--
.../sidecar/routes/CdcConfigIntegrationTest.java | 12 +-
.../sidecar/routes/RoutesIntegrationTest.java | 16 +-
.../sidecar/codecs/SidecarInstanceCodec.java | 17 +-
.../InnerDcTokenAdjacentPeerProvider.java | 37 ++-
.../coordination/SidecarPeerHealthMonitorTask.java | 34 +--
.../coordination/SidecarPeerHealthProvider.java | 15 +-
.../cassandra/sidecar/modules/CdcModule.java | 10 +-
.../apache/cassandra/sidecar/server/Server.java | 19 +-
.../sidecar/codecs/SidecarInstanceCodecTest.java | 84 ++++++
.../InnerDcTokenAdjacentPeerProviderTests.java | 108 ++++----
16 files changed, 528 insertions(+), 428 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index e07b173b..90497fa8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.2.0
-----
+ * SidecarInstanceCodec is failing to find codec for type (CASSSIDECAR-229)
* Retry Failed Schema Reports (CASSSIDECAR-217)
* Guice modularization (CASSSIDECAR-208)
* Create Endpoint that Triggers an Immediate Schema Report (CASSSIDECAR-203)
diff --git
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/InnerDcTokenAdjacentPeerTestProvider.java
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/InnerDcTokenAdjacentPeerTestProvider.java
deleted file mode 100644
index 933e6872..00000000
---
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/InnerDcTokenAdjacentPeerTestProvider.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.sidecar.testing;
-
-import java.util.List;
-import java.util.function.Supplier;
-
-import com.datastax.driver.core.Host;
-import org.apache.cassandra.distributed.api.IInstance;
-import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
-import org.apache.cassandra.sidecar.config.ServiceConfiguration;
-import
org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider;
-import
org.apache.cassandra.sidecar.coordination.InnerDcTokenAdjacentPeerProvider;
-import org.apache.cassandra.sidecar.server.Server;
-import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
-
-/**
- * Text helper to find out server ports on integration tests.
- */
-public class InnerDcTokenAdjacentPeerTestProvider extends
InnerDcTokenAdjacentPeerProvider
-{
- private final Supplier<List<TestSidecarHostInfo>> sidecarServerSupplier;
-
- public InnerDcTokenAdjacentPeerTestProvider(InstanceMetadataFetcher
metadataFetcher,
-
CassandraClientTokenRingProvider cassandraClientTokenRingProvider,
- ServiceConfiguration
serviceConfiguration,
- DnsResolver dnsResolver,
-
Supplier<List<TestSidecarHostInfo>> sidecarServerSupplier)
- {
- super(metadataFetcher, cassandraClientTokenRingProvider,
serviceConfiguration, dnsResolver);
- this.sidecarServerSupplier = sidecarServerSupplier;
- }
-
- @Override
- protected int sidecarServicePort(Host host)
- {
- return sidecarServerSupplier.get().stream()
- .filter(s ->
s.instance.broadcastAddress().getHostName()
-
.equals(host.getBroadcastAddress().getHostName()))
- .findAny()
- .orElseThrow()
- .port;
- }
-
- /**
- * Class encapsulating different bits of information needed on integration
tests.
- */
- public static class TestSidecarHostInfo
- {
- public final IInstance instance;
- public final Server sidecarServer;
- public final int port;
-
- public TestSidecarHostInfo(IInstance instance, Server sidecarServer,
int port)
- {
- this.instance = instance;
- this.sidecarServer = sidecarServer;
- this.port = port;
- }
- }
-}
diff --git
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
index 05000a58..7ea374ce 100644
---
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
+++
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
@@ -26,8 +26,6 @@ import java.net.URI;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -35,7 +33,6 @@ import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -59,7 +56,6 @@ import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Singleton;
-import com.google.inject.name.Named;
import com.google.inject.util.Modules;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
@@ -79,7 +75,6 @@ import
org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
import org.apache.cassandra.sidecar.common.server.JmxClient;
import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
-import
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
import org.apache.cassandra.sidecar.common.server.utils.SidecarVersionProvider;
import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
@@ -88,28 +83,20 @@ import
org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
import org.apache.cassandra.sidecar.config.S3ClientConfiguration;
import org.apache.cassandra.sidecar.config.S3ProxyConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
-import org.apache.cassandra.sidecar.config.SidecarClientConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
-import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration;
import org.apache.cassandra.sidecar.config.SslConfiguration;
import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl;
import
org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
-import org.apache.cassandra.sidecar.config.yaml.SidecarClientConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
-import
org.apache.cassandra.sidecar.config.yaml.SidecarPeerHealthConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
-import
org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider;
import org.apache.cassandra.sidecar.coordination.ClusterLease;
-import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask;
-import org.apache.cassandra.sidecar.coordination.SidecarPeerProvider;
import org.apache.cassandra.sidecar.metrics.instance.InstanceHealthMetrics;
import org.apache.cassandra.sidecar.modules.SidecarModules;
import org.apache.cassandra.sidecar.server.Server;
import org.apache.cassandra.sidecar.server.SidecarServerEvents;
import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
-import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.testing.ClusterBuilderConfiguration;
import org.apache.cassandra.testing.IClusterExtension;
import org.apache.cassandra.testing.IsolatedDTestClassLoaderWrapper;
@@ -168,14 +155,10 @@ public abstract class SharedClusterIntegrationTestBase
protected DnsResolver dnsResolver = new LocalhostResolver();
protected IClusterExtension<? extends IInstance> cluster;
- protected Server server;
+ protected ServerWrapper serverWrapper;
protected TestVersion testVersion;
protected MtlsTestHelper mtlsTestHelper;
- private final CountDownLatch sidecarSchemaReadyLatch = new
CountDownLatch(1);
private IsolatedDTestClassLoaderWrapper classLoaderWrapper;
- private Injector sidecarServerInjector;
- protected HashMap<Server, String> serverDeploymentIds;
- protected HashMap<Server, SidecarPeerHealthMonitorTask> peerHealthMonitors;
static
{
@@ -194,9 +177,6 @@ public abstract class SharedClusterIntegrationTestBase
classLoaderWrapper = new IsolatedDTestClassLoaderWrapper();
classLoaderWrapper.initializeDTestJarClassLoader(testVersion,
TestVersion.class);
- serverDeploymentIds = new HashMap<>();
- peerHealthMonitors = new HashMap<>();
-
beforeClusterProvisioning();
cluster = provisionClusterWithRetries(this.testVersion);
assertThat(cluster).isNotNull();
@@ -341,6 +321,7 @@ public abstract class SharedClusterIntegrationTestBase
/**
* Override to provide additional options to configure sidecar
+ *
* @return function to update {@link SidecarConfigurationImpl.Builder}
*/
protected Function<SidecarConfigurationImpl.Builder,
SidecarConfigurationImpl.Builder> configurationOverrides()
@@ -356,17 +337,17 @@ public abstract class SharedClusterIntegrationTestBase
*/
protected void startSidecar(ICluster<? extends IInstance> cluster) throws
InterruptedException
{
- server = startSidecarWithInstances(cluster);
+ serverWrapper = startSidecarWithInstances(cluster);
}
/**
* Starts Sidecar configured to run with the provided {@link IInstance}s
from the cluster.
*
* @param instances the Cassandra instances Sidecar will manage
- * @return the started server
+ * @return a wrapper with the started server
* @throws InterruptedException when the server start operation is
interrupted
*/
- protected Server startSidecarWithInstances(Iterable<? extends IInstance>
instances) throws InterruptedException
+ protected ServerWrapper startSidecarWithInstances(Iterable<? extends
IInstance> instances) throws InterruptedException
{
return startSidecarWithInstances(instances, null);
}
@@ -374,12 +355,12 @@ public abstract class SharedClusterIntegrationTestBase
/**
* Starts Sidecar configured to run with the provided {@link IInstance}s
from the cluster.
*
- * @param instances the Cassandra instances Sidecar will manage
+ * @param instances the Cassandra instances Sidecar will manage
* @param customModule an optional custom module that overrides during
injection
* @return the started server
* @throws InterruptedException when the server start operation is
interrupted
*/
- protected Server startSidecarWithInstances(Iterable<? extends IInstance>
instances, AbstractModule customModule) throws InterruptedException
+ protected ServerWrapper startSidecarWithInstances(Iterable<? extends
IInstance> instances, AbstractModule customModule) throws InterruptedException
{
VertxTestContext context = new VertxTestContext();
AbstractModule testModule = new IntegrationTestModule(instances,
classLoaderWrapper, mtlsTestHelper,
@@ -389,33 +370,28 @@ public abstract class SharedClusterIntegrationTestBase
{
module = Modules.override(testModule).with(customModule);
}
- sidecarServerInjector =
Guice.createInjector(Modules.override(SidecarModules.all()).with(module));
- Vertx vertx = sidecarServerInjector.getInstance(Vertx.class);
- vertx.eventBus()
-
.localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(), msg
-> {
- sidecarSchemaReadyLatch.countDown();
- });
- Server sidecarServer = sidecarServerInjector.getInstance(Server.class);
- SidecarPeerHealthMonitorTask peerHealthMonitorTask =
sidecarServerInjector.getInstance(SidecarPeerHealthMonitorTask.class);
+ Injector injector =
Guice.createInjector(Modules.override(SidecarModules.all()).with(module));
+ Server sidecarServer = injector.getInstance(Server.class);
sidecarServer.start()
- .onSuccess(deploymentId -> {
- serverDeploymentIds.put(sidecarServer, deploymentId);
- peerHealthMonitors.put(sidecarServer,
peerHealthMonitorTask);
- context.completeNow();
- })
+ .onSuccess(s -> context.completeNow())
.onFailure(context::failNow);
assertThat(context.awaitCompletion(5, TimeUnit.SECONDS)).isTrue();
- return sidecarServer;
+ return new ServerWrapper(injector, sidecarServer);
}
protected void waitForSchemaReady(long timeout, TimeUnit timeUnit)
{
- assertThat(sidecarServerInjector)
+ waitForSchemaReady(serverWrapper, timeout, timeUnit);
+ }
+
+ protected void waitForSchemaReady(ServerWrapper serverWrapper, long
timeout, TimeUnit timeUnit)
+ {
+ assertThat(serverWrapper)
.describedAs("Sidecar should be started")
.isNotNull();
-
assertThat(Uninterruptibles.awaitUninterruptibly(sidecarSchemaReadyLatch,
timeout, timeUnit))
+
assertThat(Uninterruptibles.awaitUninterruptibly(serverWrapper.sidecarSchemaReadyLatch,
timeout, timeUnit))
.describedAs("Sidecar schema is not initialized after " + timeout + '
' + timeUnit)
.isTrue();
}
@@ -427,7 +403,7 @@ public abstract class SharedClusterIntegrationTestBase
*/
protected void stopSidecar() throws InterruptedException
{
- closeServer(server);
+ closeServer(serverWrapper.server);
}
protected void closeServer(Server s) throws InterruptedException
@@ -437,7 +413,7 @@ public abstract class SharedClusterIntegrationTestBase
return;
}
CountDownLatch closeLatch = new CountDownLatch(1);
- server.close().onSuccess(res -> closeLatch.countDown());
+ s.close().onSuccess(res -> closeLatch.countDown());
if (closeLatch.await(60, TimeUnit.SECONDS))
{
logger.info("Close event received before timeout.");
@@ -562,6 +538,30 @@ public abstract class SharedClusterIntegrationTestBase
return builder.build();
}
+ /**
+ * Wraps the Sidecar server and keeps a reference to the injector to be
able to dynamically retrieve
+ * objects for testing purposes
+ */
+ public static class ServerWrapper
+ {
+ public final Injector injector;
+ public final Server server;
+ public volatile int serverPort;
+ private final CountDownLatch sidecarSchemaReadyLatch = new
CountDownLatch(1);
+
+ public ServerWrapper(Injector sidecarServerInjector, Server server)
+ {
+ this.injector = sidecarServerInjector;
+ this.server = server;
+ // Server must have started to retrieve the port
+ this.serverPort = server.actualPort();
+
+ Vertx vertx = sidecarServerInjector.getInstance(Vertx.class);
+
vertx.eventBus().localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(),
+ msg ->
sidecarSchemaReadyLatch.countDown());
+ }
+ }
+
/**
* Test module that configures the instances based on the cluster instances
*/
@@ -630,42 +630,35 @@ public abstract class SharedClusterIntegrationTestBase
@Provides
@Singleton
- @Named("sidecarInstanceSupplier")
- public
Supplier<List<InnerDcTokenAdjacentPeerTestProvider.TestSidecarHostInfo>>
supplier()
+ public SidecarConfiguration sidecarConfiguration()
{
- return ArrayList::new;
+ return defaultConfigurationBuilder(mtlsTestHelper,
configurationOverrides).build();
}
@Provides
@Singleton
- public SidecarPeerHealthConfiguration sidecarPeerHealthConfiguration()
+ public DnsResolver dnsResolver()
{
- return new SidecarPeerHealthConfigurationImpl(false,
- new
MillisecondBoundConfiguration(1, TimeUnit.SECONDS),
- 1,
- new
MillisecondBoundConfiguration(500, TimeUnit.MILLISECONDS));
+ return dnsResolver;
}
@Provides
@Singleton
- public SidecarPeerProvider sidecarPeerProvider(InstanceMetadataFetcher
metadataFetcher,
-
CassandraClientTokenRingProvider cassandraClientTokenRingProvider,
- SidecarConfiguration
configuration,
- DnsResolver dnsResolver,
-
@Named("sidecarInstanceSupplier")
- Supplier<List
-
<InnerDcTokenAdjacentPeerTestProvider.TestSidecarHostInfo>> supplier)
+ public ClusterLease clusterLease()
{
- return new InnerDcTokenAdjacentPeerTestProvider(metadataFetcher,
-
cassandraClientTokenRingProvider,
-
configuration.serviceConfiguration(),
- dnsResolver,
- supplier);
+ return new ClusterLease(ClusterLease.Ownership.CLAIMED);
}
- @Provides
- @Singleton
- public SidecarConfiguration
sidecarConfiguration(SidecarPeerHealthConfiguration
sidecarPeerHealthConfiguration)
+ private List<InetSocketAddress> buildContactPoints()
+ {
+ return StreamSupport.stream(instances.spliterator(), false)
+ .map(instance -> new
InetSocketAddress(instance.config().broadcastAddress().getAddress(),
+
tryGetIntConfig(instance.config(), "native_transport_port", 9042)))
+ .collect(Collectors.toList());
+ }
+
+ public static SidecarConfigurationImpl.Builder
defaultConfigurationBuilder(MtlsTestHelper mtlsTestHelper,
+
Function<SidecarConfigurationImpl.Builder,
SidecarConfigurationImpl.Builder> configurationOverrides)
{
ServiceConfiguration conf = ServiceConfigurationImpl.builder()
.host("0.0.0.0") // binds to all interfaces, potential security issue if left
running for long
@@ -708,46 +701,18 @@ public abstract class SharedClusterIntegrationTestBase
5242880, DEFAULT_API_CALL_TIMEOUT,
buildTestS3ProxyConfig());
- SidecarClientConfiguration sidecarClientConfiguration = new
SidecarClientConfigurationImpl(sslConfiguration);
-
-
-
SidecarConfigurationImpl.Builder builder =
SidecarConfigurationImpl.builder()
.serviceConfiguration(conf)
.s3ClientConfiguration(s3ClientConfig)
-
.sslConfiguration(sslConfiguration)
-
.sidecarClientConfiguration(sidecarClientConfiguration)
-
.sidecarPeerHealthConfiguration(sidecarPeerHealthConfiguration);
+
.sslConfiguration(sslConfiguration);
if (configurationOverrides != null)
{
builder = configurationOverrides.apply(builder);
}
- return builder.build();
+ return builder;
}
- @Provides
- @Singleton
- public DnsResolver dnsResolver()
- {
- return dnsResolver;
- }
-
- @Provides
- @Singleton
- public ClusterLease clusterLease()
- {
- return new ClusterLease(ClusterLease.Ownership.CLAIMED);
- }
-
- private List<InetSocketAddress> buildContactPoints()
- {
- return StreamSupport.stream(instances.spliterator(), false)
- .map(instance -> new
InetSocketAddress(instance.config().broadcastAddress().getAddress(),
-
tryGetIntConfig(instance.config(), "native_transport_port", 9042)))
- .collect(Collectors.toList());
- }
-
- private S3ProxyConfiguration buildTestS3ProxyConfig()
+ private static S3ProxyConfiguration buildTestS3ProxyConfig()
{
return new S3ProxyConfiguration()
{
@@ -789,6 +754,20 @@ public abstract class SharedClusterIntegrationTestBase
}
}
+ public static String cassandraInstanceHostname(IInstance
cassandraInstance, DnsResolver dnsResolver)
+ {
+ IInstanceConfig config = cassandraInstance.config();
+ String ipAddress = JMXUtil.getJmxHost(config);
+ try
+ {
+ return dnsResolver.reverseResolve(ipAddress);
+ }
+ catch (UnknownHostException e)
+ {
+ return ipAddress;
+ }
+ }
+
static InstanceMetadata buildInstanceMetadata(Vertx vertx,
IInstance
cassandraInstance,
CassandraVersionProvider
versionProvider,
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
index d69683e0..1d7addc3 100644
---
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
@@ -19,11 +19,11 @@
package org.apache.cassandra.sidecar.health;
-import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import java.util.function.Supplier;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -32,70 +32,98 @@ import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
-import com.google.inject.name.Named;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.common.client.SidecarInstance;
+import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
import
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
+import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
-import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarClientConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SslConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
+import
org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SidecarClientConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
import
org.apache.cassandra.sidecar.config.yaml.SidecarPeerHealthConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
+import
org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider;
+import
org.apache.cassandra.sidecar.coordination.InnerDcTokenAdjacentPeerProvider;
import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask;
import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthProvider;
+import org.apache.cassandra.sidecar.coordination.SidecarPeerProvider;
import org.apache.cassandra.sidecar.server.Server;
-import
org.apache.cassandra.sidecar.testing.InnerDcTokenAdjacentPeerTestProvider.TestSidecarHostInfo;
import org.apache.cassandra.sidecar.testing.QualifiedName;
import
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.testing.ClusterBuilderConfiguration;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static
org.apache.cassandra.sidecar.testing.MtlsTestHelper.CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS;
+import static
org.apache.cassandra.sidecar.testing.MtlsTestHelper.EMPTY_PASSWORD_STRING;
+import static
org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase.IntegrationTestModule.cassandraInstanceHostname;
+import static
org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase.IntegrationTestModule.defaultConfigurationBuilder;
import static org.apache.cassandra.testing.TestUtils.DC1_RF3;
import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+/**
+ * Integration test for the Sidecar peer monitoring feature
+ */
class SidecarPeerDownDetectorIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
{
-
private static final Logger LOGGER =
LoggerFactory.getLogger(SidecarPeerDownDetectorIntegrationTest.class);
+ private static final SchemaKeyspaceConfiguration SCHEMA_KEYSPACE_CONFIG =
SchemaKeyspaceConfigurationImpl.builder()
+
.isEnabled(true)
+
.build();
- List<TestSidecarHostInfo> sidecarServerList = new ArrayList<>();
+ // Key is the sidecar IP address and value is the server wrapper
+ private final Map<String, ServerWrapper> sidecarServerMap = new
HashMap<>();
+ private final DriverUtils driverUtils = new DriverUtils();
@Override
protected ClusterBuilderConfiguration testClusterConfiguration()
{
- return new ClusterBuilderConfiguration().nodesPerDc(3);
+ return super.testClusterConfiguration().nodesPerDc(3);
}
@Override
protected void startSidecar(ICluster<? extends IInstance> cluster) throws
InterruptedException
{
- Supplier<List<TestSidecarHostInfo>> supplier = () -> sidecarServerList;
- PeersModule peersModule = new PeersModule(supplier);
for (IInstance cassandraInstance : cluster)
{
// Storing all the created sidecar instances into a list for
further reference.
LOGGER.info("Starting Sidecar instance for Cassandra instance {}",
cassandraInstance.config().num());
- Server server =
startSidecarWithInstances(List.of(cassandraInstance), peersModule);
- sidecarServerList.add(new TestSidecarHostInfo(cassandraInstance,
server, server.actualPort()));
- }
-
- assertThat(sidecarServerList.size()).as("Each Cassandra Instance will
be managed by a single Sidecar instance")
- .isEqualTo(cluster.size());
+ String cassandraInstanceHostname =
cassandraInstanceHostname(cassandraInstance, dnsResolver);
+ PeersTestModule peersModule = new
PeersTestModule(cassandraInstanceHostname);
+ ServerWrapper serverWrapper =
startSidecarWithInstances(List.of(cassandraInstance), peersModule);
+ sidecarServerMap.put(cassandraInstanceHostname, serverWrapper);
+ if (this.serverWrapper == null)
+ {
+ // assign the server to the first instance
+ this.serverWrapper = serverWrapper;
+ }
+ }
- // assign the server to the first instance
- server = sidecarServerList.get(0).sidecarServer;
+ assertThat(sidecarServerMap.size()).as("Each Cassandra Instance will
be managed by a single Sidecar instance")
+ .isEqualTo(cluster.size());
}
@Override
protected void stopSidecar()
{
- sidecarServerList.forEach(s -> {
+ sidecarServerMap.values().forEach(serverWrapper -> {
try
{
- closeServer(s.sidecarServer);
+ closeServer(serverWrapper.server);
}
catch (Exception e)
{
@@ -104,118 +132,184 @@ class SidecarPeerDownDetectorIntegrationTest extends
SharedClusterSidecarIntegra
});
}
- class PeersModule extends AbstractModule
+ @Test
+ void testOnePeerDown()
{
- Supplier<List<TestSidecarHostInfo>> supplier;
+ SidecarPeerHealthMonitorTask monitor =
serverWrapper.injector.getInstance(SidecarPeerHealthMonitorTask.class);
+ assertThat(monitor.status()).as("Monitor hasn't had time to perform
checks").isEmpty();
- public PeersModule(Supplier<List<TestSidecarHostInfo>> supplier)
- {
- this.supplier = supplier;
- }
+ loopAssert(30, () -> assertThat(checkHostUp(monitor, "localhost2"))
+ .as("After some time, peer is up")
+ .isTrue());
- @Provides
- @Singleton
- @Named("sidecarInstanceSupplier")
- public Supplier<List<TestSidecarHostInfo>> supplier()
- {
- return supplier;
- }
+ stopSidecarInstanceForTest("localhost2");
+ loopAssert(30, () -> assertThat(checkHostDown(monitor, "localhost2"))
+ .as("After killing peer sidecar instance, monitor
caches up and the host is down")
+ .isTrue());
+ startSidecarInstanceForTest("localhost2");
+ loopAssert(30, () -> assertThat(checkHostUp(monitor, "localhost2"))
+ .as("After restarting peer sidecar instance,
monitor caches up and the host is down")
+ .isTrue());
+ }
- @Provides
- @Singleton
- public SidecarPeerHealthConfiguration sidecarPeerHealthConfiguration()
- {
- return new SidecarPeerHealthConfigurationImpl(true,
- new
MillisecondBoundConfiguration(1, TimeUnit.SECONDS),
- 1,
- new
MillisecondBoundConfiguration(500, TimeUnit.MILLISECONDS));
- }
+ @Override
+ protected void initializeSchemaForTest()
+ {
+ QualifiedName qualifiedName = new QualifiedName("cdc", "test");
+ createTestKeyspace(qualifiedName, DC1_RF3);
+ createTestTable(qualifiedName, "CREATE TABLE %s (id text PRIMARY KEY,
name text) WITH cdc=true");
}
- void stopSidecarInstanceForTest(int instanceId)
+ private boolean checkHostUp(SidecarPeerHealthMonitorTask monitor, String
hostname)
{
- assertThat(sidecarServerList).isNotEmpty();
- Server server = sidecarServerList.get(instanceId).sidecarServer;
- String deploymentId = serverDeploymentIds.get(server);
- getBlocking(server.stop(deploymentId), 30, TimeUnit.SECONDS, "Stopping
server " + deploymentId);
+ return checkHostStatus(monitor, hostname,
SidecarPeerHealthProvider.Health.UP);
}
- void startSidecarInstanceForTest(int instanceId) throws Exception
+ private boolean checkHostDown(SidecarPeerHealthMonitorTask monitor, String
hostname)
{
- assertThat(sidecarServerList).isNotEmpty();
- TestSidecarHostInfo server = sidecarServerList.get(instanceId);
- getBlocking(server.sidecarServer.start(), 30, TimeUnit.SECONDS,
"Starting server...");
+ return checkHostStatus(monitor, hostname,
SidecarPeerHealthProvider.Health.DOWN);
}
- @Override
- protected Function<SidecarConfigurationImpl.Builder,
SidecarConfigurationImpl.Builder> configurationOverrides()
+ private boolean checkHostStatus(SidecarPeerHealthMonitorTask monitor,
String hostname, SidecarPeerHealthProvider.Health status)
{
- return builder -> {
- ServiceConfiguration conf;
- if (sidecarServerList.isEmpty())
- {
- // As opposed to the base class, this binds the host to a
specific interface (localhost)
- conf = ServiceConfigurationImpl.builder()
- .host("localhost")
- .port(0) // let the test find
an available port
- .build();
- }
- else
+ for (Map.Entry<SidecarInstance, SidecarPeerHealthProvider.Health>
entry : monitor.status().entrySet())
+ {
+ if (hostname.equals(entry.getKey().hostname()))
{
- // Use the same port number for all Sidecar instances that we
bring up. We use the port
- // bound for the first instance, but we bind it to a different
interface (localhost2, localhost3)
- conf = ServiceConfigurationImpl.builder()
- .host("localhost" +
(sidecarServerList.size() + 1))
-
.port(sidecarServerList.get(0).sidecarServer.actualPort())
- .build();
+ return status.equals(entry.getValue());
}
- builder.serviceConfiguration(conf);
-
- return builder;
- };
+ }
+ fail("Status unavailable for host " + hostname);
+ // this code block is not reachable but required for compilation
+ // we could alternatively just throw an AssertionError above
+ return false;
}
- @Test
- void onePeerDownTest() throws Exception
+ void stopSidecarInstanceForTest(String hostname)
{
- SidecarPeerHealthMonitorTask monitor =
peerHealthMonitors.get(sidecarServerList.get(0).sidecarServer);
- // Monitor hasn't had time to perform checks
- assertTrue(monitor.getStatus().isEmpty());
-
- // After some time, peer is up
- Thread.sleep(5000);
- checkHostUp(monitor, "localhost2");
-
- stopSidecarInstanceForTest(1);
- Thread.sleep(5000);
- // After killing peer sidecar instance, monitor caches up and the host
is down
- checkHostDown(monitor, "localhost2");
- startSidecarInstanceForTest(1);
- Thread.sleep(5000);
- // After restarting peer sidecar instance, monitor caches up and the
host is down
- checkHostUp(monitor, "localhost2");
+ assertThat(sidecarServerMap).containsKey(hostname);
+ Server server = sidecarServerMap.get(hostname).server;
+ String deploymentId = server.deploymentId();
+ LOGGER.info("Stopping Sidecar server {} with deployment ID {}",
hostname, deploymentId);
+ getBlocking(server.stop(deploymentId), 30, TimeUnit.SECONDS,
+ "Stopping Sidecar server " + hostname + " with deployment
ID " + deploymentId);
}
- private boolean checkHostUp(SidecarPeerHealthMonitorTask monitor, String
hostname)
+ void startSidecarInstanceForTest(String hostname)
{
- return checkHostStatus(monitor, hostname,
SidecarPeerHealthProvider.Health.UP);
+ ServerWrapper serverWrapper = sidecarServerMap.get(hostname);
+ assertThat(serverWrapper).isNotNull();
+ Server server = serverWrapper.server;
+ LOGGER.info("Starting Sidecar server {}...", hostname);
+ getBlocking(server.start(), 30, TimeUnit.SECONDS, "Starting Sidecar
server " + hostname + "...");
+ // Update the server port after starting a new time
+ // because a new port will be assigned
+ serverWrapper.serverPort = server.actualPort();
}
- private boolean checkHostDown(SidecarPeerHealthMonitorTask monitor, String
hostname)
+ class PeersTestModule extends AbstractModule
{
- return checkHostStatus(monitor, hostname,
SidecarPeerHealthProvider.Health.DOWN);
- }
+ private final String cassandraInstanceHostname;
- private boolean checkHostStatus(SidecarPeerHealthMonitorTask monitor,
String hostname, SidecarPeerHealthProvider.Health status)
- {
- return monitor.getStatus().entrySet().stream().filter(e ->
e.getKey().hostname().equals(hostname)).findAny().orElseThrow().getValue().equals(status);
+ public PeersTestModule(String cassandraInstanceHostname)
+ {
+ this.cassandraInstanceHostname = cassandraInstanceHostname;
+ }
+
+ @Provides
+ @Singleton
+ public SidecarConfiguration sidecarConfiguration()
+ {
+ Function<SidecarConfigurationImpl.Builder,
SidecarConfigurationImpl.Builder> configurationOverrides = builder -> {
+
+ // Override the service configuration such that we listen on
the host associated with the
+ // Cassandra instance associated with this Sidecar instance
+ ServiceConfiguration conf = ServiceConfigurationImpl.builder()
+
.host(cassandraInstanceHostname)
+ .port(0)
// let the test find an available port
+
.schemaKeyspaceConfiguration(SCHEMA_KEYSPACE_CONFIG)
+ .build();
+
+ // We need to provide mTLS configuration for the Sidecar
client so it can talk to
+ // other sidecars using mTLS
+ SslConfiguration clientSslConfiguration = null;
+ if (mtlsTestHelper.isEnabled())
+ {
+ LOGGER.info("Enabling test mTLS certificate/keystore.");
+
+ KeyStoreConfiguration truststoreConfiguration =
+ new
KeyStoreConfigurationImpl(mtlsTestHelper.trustStorePath(),
+
mtlsTestHelper.trustStorePassword(),
+
mtlsTestHelper.trustStoreType(),
+
SecondBoundConfiguration.parse("60s"));
+
+ KeyStoreConfiguration keyStoreConfiguration =
+ new
KeyStoreConfigurationImpl(mtlsTestHelper.clientKeyStorePath(),
+ EMPTY_PASSWORD_STRING,
+
mtlsTestHelper.serverKeyStoreType(), // server and client keystore types are
the same
+
SecondBoundConfiguration.parse("60s"));
+
+ clientSslConfiguration = SslConfigurationImpl.builder()
+ .enabled(true)
+
.keystore(keyStoreConfiguration)
+
.truststore(truststoreConfiguration)
+ .build();
+ }
+ else
+ {
+ LOGGER.info("Not enabling mTLS for testing purposes. Set
'{}' to 'true' if you would " +
+ "like mTLS enabled.",
CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS);
+ }
+
+ SidecarClientConfiguration sidecarClientConfiguration = new
SidecarClientConfigurationImpl(clientSslConfiguration);
+
+ // Let's run this very frequently for testing purposes
+ SidecarPeerHealthConfigurationImpl
sidecarPeerHealthConfiguration
+ = new SidecarPeerHealthConfigurationImpl(true,
+
MillisecondBoundConfiguration.parse("100ms"),
+ 1,
+
MillisecondBoundConfiguration.parse("50ms"));
+ builder.serviceConfiguration(conf)
+ .sidecarClientConfiguration(sidecarClientConfiguration)
+
.sidecarPeerHealthConfiguration(sidecarPeerHealthConfiguration);
+ return builder;
+ };
+
+ return defaultConfigurationBuilder(mtlsTestHelper,
configurationOverrides).build();
+ }
+
+ @Provides
+ @Singleton
+ public SidecarPeerProvider sidecarPeerProvider(InstanceMetadataFetcher
metadataFetcher,
+
CassandraClientTokenRingProvider cassandraClientTokenRingProvider,
+ SidecarConfiguration
configuration,
+ DnsResolver dnsResolver)
+ {
+ return new InnerDcTokenAdjacentPeerTestProvider(metadataFetcher,
+
cassandraClientTokenRingProvider,
+
configuration.serviceConfiguration(),
+ dnsResolver);
+ }
}
- @Override
- protected void initializeSchemaForTest()
+ /**
+ * Test helper to find out server ports on integration tests.
+ */
+ class InnerDcTokenAdjacentPeerTestProvider extends
InnerDcTokenAdjacentPeerProvider
{
- createTestKeyspace("cdc", DC1_RF3);
- createTestTable(new QualifiedName("cdc", "test"), "CREATE TABLE %s (id
text PRIMARY KEY, name text) WITH cdc=true");
+ InnerDcTokenAdjacentPeerTestProvider(InstanceMetadataFetcher
metadataFetcher,
+ CassandraClientTokenRingProvider
cassandraClientTokenRingProvider,
+ ServiceConfiguration
serviceConfiguration,
+ DnsResolver dnsResolver)
+ {
+ super(metadataFetcher, cassandraClientTokenRingProvider,
serviceConfiguration, dnsResolver, driverUtils);
+ }
+
+ @Override
+ protected int sidecarServicePort(String sidecarHostname)
+ {
+ return sidecarServerMap.get(sidecarHostname).serverPort;
+ }
}
}
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraSchemaRouteIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraSchemaRouteIntegrationTest.java
index 201ab2c7..56cb507a 100644
---
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraSchemaRouteIntegrationTest.java
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraSchemaRouteIntegrationTest.java
@@ -44,7 +44,7 @@ class CassandraSchemaRouteIntegrationTest extends
SharedClusterSidecarIntegratio
{
String testRoute = "/api/v1/schema/keyspaces";
SchemaResponse response = getBlocking(trustedClient()
- .get(server.actualPort(),
"localhost", testRoute)
+ .get(serverWrapper.serverPort,
"localhost", testRoute)
.send()
.expecting(HttpResponseExpectation.SC_OK))
.bodyAsJson(SchemaResponse.class);
@@ -58,7 +58,7 @@ class CassandraSchemaRouteIntegrationTest extends
SharedClusterSidecarIntegratio
{
String testRoute = "/api/v1/schema/keyspaces/non_existent";
getBlocking(trustedClient()
- .get(server.actualPort(), "localhost", testRoute)
+ .get(serverWrapper.serverPort, "localhost", testRoute)
.send()
.expecting(HttpResponseExpectation.SC_NOT_FOUND));
}
@@ -68,7 +68,7 @@ class CassandraSchemaRouteIntegrationTest extends
SharedClusterSidecarIntegratio
{
String testRoute = "/api/v1/schema/keyspaces/testkeyspace";
SchemaResponse response = getBlocking(trustedClient()
- .get(server.actualPort(),
"localhost", testRoute)
+ .get(serverWrapper.serverPort,
"localhost", testRoute)
.send()
.expecting(HttpResponseExpectation.SC_OK))
.bodyAsJson(SchemaResponse.class);
@@ -82,7 +82,7 @@ class CassandraSchemaRouteIntegrationTest extends
SharedClusterSidecarIntegratio
{
String testRoute = "/api/v1/schema/keyspaces/\"Cycling\"";
SchemaResponse response = getBlocking(trustedClient()
- .get(server.actualPort(),
"localhost", testRoute)
+ .get(serverWrapper.serverPort,
"localhost", testRoute)
.send()
.expecting(HttpResponseExpectation.SC_OK))
.bodyAsJson(SchemaResponse.class);
@@ -96,7 +96,7 @@ class CassandraSchemaRouteIntegrationTest extends
SharedClusterSidecarIntegratio
{
String testRoute = "/api/v1/schema/keyspaces/\"keyspace\"";
SchemaResponse response = getBlocking(trustedClient()
- .get(server.actualPort(),
"localhost", testRoute)
+ .get(serverWrapper.serverPort,
"localhost", testRoute)
.send()
.expecting(HttpResponseExpectation.SC_OK))
.bodyAsJson(SchemaResponse.class);
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraStatsIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraStatsIntegrationTest.java
index 3d09bc4a..6bde4cb4 100644
---
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraStatsIntegrationTest.java
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraStatsIntegrationTest.java
@@ -28,8 +28,8 @@ import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpResponseExpectation;
import io.vertx.ext.web.client.HttpResponse;
-import io.vertx.ext.web.client.predicate.ResponsePredicate;
import
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
import org.apache.cassandra.sidecar.common.response.TableStatsResponse;
import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
@@ -70,9 +70,9 @@ class CassandraStatsIntegrationTest extends
SharedClusterSidecarIntegrationTestB
{
Map<String, Boolean> expectedParams = Map.of("summary", true);
String testRoute = "/api/v1/cassandra/stats/connected-clients";
- HttpResponse<Buffer> response =
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
-
.expect(ResponsePredicate.SC_OK)
- .send());
+ HttpResponse<Buffer> response =
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost",
testRoute)
+ .send()
+
.expecting(HttpResponseExpectation.SC_OK));
assertClientStatsResponse(response, expectedParams);
}
@@ -81,9 +81,9 @@ class CassandraStatsIntegrationTest extends
SharedClusterSidecarIntegrationTestB
{
Map<String, Boolean> expectedParams = Map.of("summary", false);
String testRoute =
"/api/v1/cassandra/stats/connected-clients?summary=false";
- HttpResponse<Buffer> response =
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
-
.expect(ResponsePredicate.SC_OK)
- .send());
+ HttpResponse<Buffer> response =
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost",
testRoute)
+ .send()
+
.expecting(HttpResponseExpectation.SC_OK));
assertClientStatsResponse(response, expectedParams);
}
@@ -96,9 +96,9 @@ class CassandraStatsIntegrationTest extends
SharedClusterSidecarIntegrationTestB
Map<String, Boolean> expectedParams = Map.of("summary", false);
String testRoute =
"/api/v1/cassandra/stats/connected-clients?summary=false";
- HttpResponse<Buffer> response =
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
-
.expect(ResponsePredicate.SC_OK)
-
.send());
+ HttpResponse<Buffer> response =
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost",
testRoute)
+ .send()
+
.expecting(HttpResponseExpectation.SC_OK));
assertClientStatsResponse(response, expectedParams, 4, true);
}
}
@@ -111,9 +111,9 @@ class CassandraStatsIntegrationTest extends
SharedClusterSidecarIntegrationTestB
{
Map<String, Boolean> expectedParams = Map.of("summary", false);
String testRoute =
"/api/v1/cassandra/stats/connected-clients?summary=false";
- HttpResponse<Buffer> response =
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
-
.expect(ResponsePredicate.SC_OK)
-
.send());
+ HttpResponse<Buffer> response =
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost",
testRoute)
+ .send()
+
.expecting(HttpResponseExpectation.SC_OK));
assertClientStatsResponse(response, expectedParams, 4);
}
}
@@ -127,9 +127,9 @@ class CassandraStatsIntegrationTest extends
SharedClusterSidecarIntegrationTestB
{
Map<String, Boolean> expectedParams = Map.of("summary", true);
String testRoute =
"/api/v1/cassandra/stats/connected-clients?summary=123&bad-arg=xyz";
- HttpResponse<Buffer> response =
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
-
.expect(ResponsePredicate.SC_OK)
- .send());
+ HttpResponse<Buffer> response =
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost",
testRoute)
+ .send()
+
.expecting(HttpResponseExpectation.SC_OK));
assertClientStatsResponse(response, expectedParams);
}
@@ -167,7 +167,7 @@ class CassandraStatsIntegrationTest extends
SharedClusterSidecarIntegrationTestB
String testRoute =
String.format("/api/v1/keyspaces/%s/tables/%s/snapshots/" + tableName.table() +
"-snapshot",
tableName.keyspace(),
tableName.table());
HttpResponse<Buffer> resp;
- resp = getBlocking(trustedClient().put(server.actualPort(),
"localhost", testRoute)
+ resp = getBlocking(trustedClient().put(serverWrapper.serverPort,
"localhost", testRoute)
.send());
assertThat(resp.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
}
@@ -176,7 +176,7 @@ class CassandraStatsIntegrationTest extends
SharedClusterSidecarIntegrationTestB
{
String testRoute = "/api/v1/cassandra/keyspaces/" +
tableName.keyspace() + "/tables/" + tableName.table() + "/stats";
HttpResponse<Buffer> resp;
- resp = getBlocking(trustedClient().get(server.actualPort(),
"localhost", testRoute)
+ resp = getBlocking(trustedClient().get(serverWrapper.serverPort,
"localhost", testRoute)
.send());
assertTableStatsResponse(tableName, resp);
}
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CdcConfigIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CdcConfigIntegrationTest.java
index a2de83f3..ab5d6826 100644
---
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CdcConfigIntegrationTest.java
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CdcConfigIntegrationTest.java
@@ -69,7 +69,7 @@ class CdcConfigIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
// Create new configs
UpdateCdcServiceConfigPayload payload = new
UpdateCdcServiceConfigPayload(Map.of("k1", "v1"));
- UpdateCdcServiceConfigPayload newConfigResponse =
getBlocking(trustedClient().put(server.actualPort(), "localhost", configRoute)
+ UpdateCdcServiceConfigPayload newConfigResponse =
getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost",
configRoute)
.as(BodyCodec.json(UpdateCdcServiceConfigPayload.class))
.sendJson(JsonObject.mapFrom(payload))
.expecting(HttpResponseExpectation.SC_OK))
@@ -78,7 +78,7 @@ class CdcConfigIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
// update configs
UpdateCdcServiceConfigPayload updatedPayload = new
UpdateCdcServiceConfigPayload(Map.of("k3", "v3"));
- UpdateCdcServiceConfigPayload updatedConfigResponse =
getBlocking(trustedClient().put(server.actualPort(), "localhost", configRoute)
+ UpdateCdcServiceConfigPayload updatedConfigResponse =
getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost",
configRoute)
.as(BodyCodec.json(UpdateCdcServiceConfigPayload.class))
.sendJson(JsonObject.mapFrom(updatedPayload)))
.body();
@@ -86,7 +86,7 @@ class CdcConfigIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
// GetConfigs should give updated configs
String getConfigsRoute = ApiEndpointsV1.SERVICES_CONFIG_ROUTE;
- AllServicesConfigPayload getServicesResponse =
getBlocking(trustedClient().get(server.actualPort(), "localhost",
getConfigsRoute)
+ AllServicesConfigPayload getServicesResponse =
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost",
getConfigsRoute)
.as(BodyCodec.json(AllServicesConfigPayload.class))
.sendJson(JsonObject.mapFrom(updatedPayload)))
.body();
@@ -97,11 +97,11 @@ class CdcConfigIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
assertThat(getServicesResponse).isEqualTo(expectedConfigPayload);
// delete all CDC configs
- assertThat(getBlocking(trustedClient().delete(server.actualPort(),
"localhost", configRoute)
+
assertThat(getBlocking(trustedClient().delete(serverWrapper.serverPort,
"localhost", configRoute)
.send()).bodyAsJsonObject().getString("status")).isEqualTo("OK");
// Get configs should have no configs
- AllServicesConfigPayload response =
getBlocking(trustedClient().get(server.actualPort(), "localhost",
getConfigsRoute)
+ AllServicesConfigPayload response =
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost",
getConfigsRoute)
.as(BodyCodec.json(AllServicesConfigPayload.class))
.sendJson(JsonObject.mapFrom(updatedPayload)))
.body();
@@ -117,7 +117,7 @@ class CdcConfigIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
// Update with Invalid service
Map<String, String> configs = Map.of("k1", "v1");
UpdateCdcServiceConfigPayload payload = new
UpdateCdcServiceConfigPayload(configs);
- HttpResponse<Buffer> response =
getBlocking(trustedClient().put(server.actualPort(), "localhost", configRoute)
+ HttpResponse<Buffer> response =
getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost",
configRoute)
.sendJson(JsonObject.mapFrom(payload)));
assertThat(HttpResponseStatus.BAD_REQUEST.code()).isEqualTo(response.statusCode());
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RoutesIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RoutesIntegrationTest.java
index 0667f034..fdd9b82b 100644
---
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RoutesIntegrationTest.java
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RoutesIntegrationTest.java
@@ -22,8 +22,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpResponseExpectation;
import io.vertx.ext.web.client.HttpResponse;
-import io.vertx.ext.web.client.predicate.ResponsePredicate;
import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.response.HealthResponse;
import
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
@@ -47,7 +47,7 @@ class RoutesIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
@Test
void healthHappyPathTest()
{
- HttpResponse<Buffer> response =
getBlocking(trustedClient().get(server.actualPort(), "localhost",
"/api/v1/__health")
+ HttpResponse<Buffer> response =
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost",
"/api/v1/__health")
.send());
assertThat(response.bodyAsJsonObject().getString("status")).isEqualTo("OK");
}
@@ -56,9 +56,9 @@ class RoutesIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
void retrieveGossipInfo()
{
String testRoute = "/api/v1/cassandra/gossip";
- HttpResponse<Buffer> response =
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
-
.expect(ResponsePredicate.SC_OK)
- .send());
+ HttpResponse<Buffer> response =
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost",
testRoute)
+ .send()
+
.expecting(HttpResponseExpectation.SC_OK));
GossipInfoResponse gossipResponse =
response.bodyAsJson(GossipInfoResponse.class);
assertThat(gossipResponse).isNotNull()
.hasSize(1);
@@ -87,9 +87,9 @@ class RoutesIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
private HealthResponse getGossipHealth()
{
String testRoute = "/api/v1/cassandra/gossip/__health";
- HttpResponse<Buffer> response =
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
-
.expect(ResponsePredicate.SC_OK)
- .send());
+ HttpResponse<Buffer> response =
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost",
testRoute)
+ .send()
+
.expecting(HttpResponseExpectation.SC_OK));
assertThat(response.statusCode()).isEqualTo(OK.code());
return response.bodyAsJson(HealthResponse.class);
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/codecs/SidecarInstanceCodec.java
b/server/src/main/java/org/apache/cassandra/sidecar/codecs/SidecarInstanceCodec.java
index 022cebd4..1b9b7780 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/codecs/SidecarInstanceCodec.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/codecs/SidecarInstanceCodec.java
@@ -18,26 +18,23 @@
package org.apache.cassandra.sidecar.codecs;
-import com.google.inject.Singleton;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
-import io.vertx.core.eventbus.impl.codecs.StringMessageCodec;
+import io.vertx.core.eventbus.impl.CodecManager;
import org.apache.cassandra.sidecar.common.client.SidecarInstance;
import org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl;
/**
* Codecs for Sidecar instances
+ * @param <T> a type implementing {@link SidecarInstance}
*/
-@Singleton
-public class SidecarInstanceCodec implements MessageCodec<SidecarInstance,
SidecarInstance>
+public class SidecarInstanceCodec<T extends SidecarInstance> implements
MessageCodec<T, SidecarInstance>
{
- public static final StringMessageCodec STRING = new StringMessageCodec();
-
@Override
- public void encodeToWire(Buffer buf, SidecarInstance instance)
+ public void encodeToWire(Buffer buf, T instance)
{
buf.appendInt(instance.port());
- STRING.encodeToWire(buf, instance.hostname());
+ CodecManager.STRING_MESSAGE_CODEC.encodeToWire(buf,
instance.hostname());
}
@Override
@@ -45,11 +42,11 @@ public class SidecarInstanceCodec implements
MessageCodec<SidecarInstance, Sidec
{
int port = buf.getInt(pos);
pos += 4; // advance 4 bytes after reading int
- return new SidecarInstanceImpl(STRING.decodeFromWire(pos, buf), port);
+ return new
SidecarInstanceImpl(CodecManager.STRING_MESSAGE_CODEC.decodeFromWire(pos, buf),
port);
}
@Override
- public SidecarInstance transform(SidecarInstance instance)
+ public SidecarInstance transform(T instance)
{
return new SidecarInstanceImpl(instance.hostname(), instance.port());
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProvider.java
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProvider.java
index eef20bfd..5941da03 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProvider.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProvider.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.sidecar.coordination;
import java.math.BigInteger;
+import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
@@ -47,9 +48,11 @@ import
org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl;
import org.apache.cassandra.sidecar.common.server.cluster.locator.Token;
import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
import org.apache.cassandra.sidecar.common.utils.Preconditions;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.VisibleForTesting;
import static
org.apache.cassandra.sidecar.config.yaml.CassandraInputValidationConfigurationImpl.DEFAULT_FORBIDDEN_KEYSPACES;
@@ -66,19 +69,23 @@ public class InnerDcTokenAdjacentPeerProvider implements
SidecarPeerProvider
private final CassandraClientTokenRingProvider
cassandraClientTokenRingProvider;
private final ServiceConfiguration serviceConfiguration;
private final DnsResolver dnsResolver;
+ private final DriverUtils driverUtils;
@Inject
public InnerDcTokenAdjacentPeerProvider(InstanceMetadataFetcher
instanceFetcher,
CassandraClientTokenRingProvider
cassandraClientTokenRingProvider,
ServiceConfiguration
serviceConfiguration,
- DnsResolver dnsResolver)
+ DnsResolver dnsResolver,
+ DriverUtils driverUtils)
{
this.instanceFetcher = instanceFetcher;
this.cassandraClientTokenRingProvider =
cassandraClientTokenRingProvider;
this.serviceConfiguration = serviceConfiguration;
this.dnsResolver = dnsResolver;
+ this.driverUtils = driverUtils;
}
+ @Override
public Set<SidecarInstance> get()
{
Metadata metadata;
@@ -129,24 +136,26 @@ public class InnerDcTokenAdjacentPeerProvider implements
SidecarPeerProvider
.collect(Collectors.toList());
BigInteger localMinToken = minToken(localHosts);
- return adjacentHosts(localHosts::contains, localMinToken,
sortedLocalDcHosts, quorum)
+ return adjacentHosts(driverUtils, localHosts::contains, localMinToken,
sortedLocalDcHosts, quorum)
.stream()
- .map(host -> Pair.of(host, host.getAddress().getHostAddress()))
- .map(pair -> {
+ .map(host ->
driverUtils.getSocketAddress(host).getAddress().getHostAddress())
+ .map(sidecarIpAddress -> {
+ String sidecarHostname = sidecarIpAddress;
try
{
- return Pair.of(pair.getKey(),
dnsResolver.reverseResolve(pair.getValue()));
+ sidecarHostname =
dnsResolver.reverseResolve(sidecarIpAddress);
}
- catch (UnknownHostException e)
+ catch (UnknownHostException unknownHostException)
{
- return pair;
+ LOGGER.warn("Unable to reverse resolve hostname for
{}", sidecarHostname, unknownHostException);
}
+ return new SidecarInstanceImpl(sidecarHostname,
sidecarServicePort(sidecarHostname));
})
- .map(pair -> new SidecarInstanceImpl(pair.getValue(),
sidecarServicePort(pair.getKey())))
.collect(Collectors.toSet());
}
- protected int sidecarServicePort(Host host)
+ @VisibleForTesting
+ protected int sidecarServicePort(String sidecarHostname)
{
return serviceConfiguration.port();
}
@@ -200,7 +209,8 @@ public class InnerDcTokenAdjacentPeerProvider implements
SidecarPeerProvider
* @param quorum minimum availability required to meet maximum
replication factor in DC
* @return set of hosts that are adjacent to current Sidecar
*/
- protected static Set<Host> adjacentHosts(Predicate<Host> isLocal,
+ protected static Set<Host> adjacentHosts(DriverUtils driverUtils,
+ Predicate<Host> isLocal,
BigInteger localMinToken,
List<Pair<Host, BigInteger>>
sortedLocalDcHosts,
int quorum)
@@ -243,12 +253,13 @@ public class InnerDcTokenAdjacentPeerProvider implements
SidecarPeerProvider
{
if (isLocal.test(host))
{
+ InetAddress address =
driverUtils.getSocketAddress(host).getAddress();
LOGGER.warn("Local instance selected as adjacent host
localMinToken={} hostId={} address={} hostname={} canonicalHostname={}",
localMinToken,
host.getHostId(),
- host.getAddress().getHostAddress(),
- host.getAddress().getHostName(),
- host.getAddress().getCanonicalHostName());
+ address.getHostAddress(),
+ address.getHostName(),
+ address.getCanonicalHostName());
throw new IllegalArgumentException(String.format("Local
instance selected as adjacent host: %s", host.getHostId()));
}
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java
index 4ad8b431..c5541fe7 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java
@@ -36,6 +36,7 @@ import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import org.apache.cassandra.sidecar.codecs.SidecarInstanceCodec;
import org.apache.cassandra.sidecar.common.client.SidecarInstance;
+import org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl;
import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration;
@@ -61,20 +62,22 @@ public class SidecarPeerHealthMonitorTask implements
PeriodicTask
private final SidecarPeerHealthConfiguration config;
private final SidecarPeerProvider sidecarPeerProvider;
private final SidecarPeerHealthProvider healthProvider;
- private final SidecarInstanceCodec sidecarInstanceCodec;
private final Map<SidecarInstance, SidecarPeerHealthProvider.Health>
status = new ConcurrentHashMap<>();
@Inject
public SidecarPeerHealthMonitorTask(SidecarConfiguration
sidecarConfiguration,
SidecarPeerProvider
sidecarPeerProvider,
- SidecarPeerHealthProvider
healthProvider,
- SidecarInstanceCodec
sidecarInstanceCodec)
+ SidecarPeerHealthProvider
healthProvider)
{
this.config = sidecarConfiguration.sidecarPeerHealthConfiguration();
this.sidecarPeerProvider = sidecarPeerProvider;
this.healthProvider = healthProvider;
- this.sidecarInstanceCodec = sidecarInstanceCodec;
+ }
+
+ public Map<SidecarInstance, SidecarPeerHealthProvider.Health> status()
+ {
+ return status;
}
@Override
@@ -82,7 +85,7 @@ public class SidecarPeerHealthMonitorTask implements
PeriodicTask
{
this.eventBus = vertx.eventBus();
// TODO: Find a better place to register this codec
- eventBus.registerDefaultCodec(SidecarInstance.class,
sidecarInstanceCodec);
+ eventBus.registerDefaultCodec(SidecarInstanceImpl.class, new
SidecarInstanceCodec<>());
EventBusUtils.onceLocalConsumer(eventBus,
ON_CASSANDRA_CQL_READY.address(), ignored -> executor.schedule(this));
}
@@ -127,17 +130,11 @@ public class SidecarPeerHealthMonitorTask implements
PeriodicTask
sidecarPeers.stream()
.map(instance ->
healthProvider.health(instance)
- .andThen(ar -> {
- if (ar.succeeded())
- {
- updateHealth(instance,
ar.result());
- }
- else
- {
- LOGGER.error("Failed to run
health check, marking instance as DOWN host={} port={}",
-
instance.hostname(), instance.port(), ar.cause());
- markDown(instance);
- }
+ .onSuccess(healthCheckResult ->
updateHealth(instance, healthCheckResult))
+ .onFailure(throwable -> {
+ LOGGER.error("Failed to run health
check, marking instance as DOWN host={} port={}",
+ instance.hostname(),
instance.port(), throwable);
+ markDown(instance);
}))
.collect(Collectors.toList());
@@ -190,9 +187,4 @@ public class SidecarPeerHealthMonitorTask implements
PeriodicTask
{
return status.put(instance, newStatus) != newStatus;
}
-
- public Map<SidecarInstance, SidecarPeerHealthProvider.Health> getStatus()
- {
- return status;
- }
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthProvider.java
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthProvider.java
index 8b7d5b63..5658e020 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthProvider.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthProvider.java
@@ -28,14 +28,19 @@ import
org.apache.cassandra.sidecar.common.client.SidecarInstance;
public interface SidecarPeerHealthProvider
{
/**
- * Possible Health states:
- *
- * - UP: Peer sidecar is alive
- * - DOWN: Peer sidecar is not reachable
+ * Possible Health States
*/
enum Health
{
- UP, DOWN
+ /**
+ * Peer Sidecar is alive
+ */
+ UP,
+
+ /**
+ * Peer Sidecar is not unreachable
+ */
+ DOWN
}
/**
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
index 3cb0393a..450151cb 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
@@ -24,11 +24,9 @@ import com.google.inject.Singleton;
import com.google.inject.multibindings.ProvidesIntoMap;
import org.apache.cassandra.sidecar.cdc.CdcLogCache;
import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
-import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
-import
org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider;
import
org.apache.cassandra.sidecar.coordination.InnerDcTokenAdjacentPeerProvider;
import org.apache.cassandra.sidecar.coordination.SidecarHttpHealthProvider;
import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask;
@@ -48,7 +46,6 @@ import
org.apache.cassandra.sidecar.modules.multibindings.VertxRouteMapKeys;
import org.apache.cassandra.sidecar.routes.RouteBuilder;
import org.apache.cassandra.sidecar.routes.VertxRoute;
import org.apache.cassandra.sidecar.tasks.PeriodicTask;
-import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.sidecar.utils.SidecarClientProvider;
/**
@@ -129,11 +126,8 @@ public class CdcModule extends AbstractModule
@Provides
@Singleton
- public SidecarPeerProvider sidecarPeerProvider(InstanceMetadataFetcher
metadataFetcher,
-
CassandraClientTokenRingProvider cassandraClientTokenRingProvider,
- SidecarConfiguration
configuration,
- DnsResolver dnsResolver)
+ public SidecarPeerProvider
sidecarPeerProvider(InnerDcTokenAdjacentPeerProvider
innerDcTokenAdjacentPeerProvider)
{
- return new InnerDcTokenAdjacentPeerProvider(metadataFetcher,
cassandraClientTokenRingProvider, configuration.serviceConfiguration(),
dnsResolver);
+ return innerDcTokenAdjacentPeerProvider;
}
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java
b/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java
index c4ab5091..8a0bc437 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java
@@ -46,7 +46,6 @@ import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SSLOptions;
import io.vertx.core.net.TrafficShapingOptions;
import io.vertx.ext.web.Router;
-import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.utils.Preconditions;
@@ -168,11 +167,7 @@ public class Server
.runBlocking(() -> {
try
{
- CassandraAdapterDelegate delegate =
instance.delegate();
- if (delegate != null)
- {
- delegate.close();
- }
+ instance.delegate().close();
}
catch (Exception e)
{
@@ -250,6 +245,18 @@ public class Server
throw new IllegalStateException("No deployed server verticles. Maybe
server failed to deploy due to port conflict");
}
+ /**
+ * @return the deployment ID for this server
+ * @throws IllegalStateException if the server has not been deployed
+ */
+ @VisibleForTesting
+ public String deploymentId()
+ {
+ if (!deployedServerVerticles.isEmpty())
+ return deployedServerVerticles.get(0).deploymentID();
+ throw new IllegalStateException("No deployed server verticles");
+ }
+
protected Future<String> notifyServerStart(String deploymentId)
{
LOGGER.info("Successfully started Cassandra Sidecar");
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/codecs/SidecarInstanceCodecTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/codecs/SidecarInstanceCodecTest.java
new file mode 100644
index 00000000..36b792a4
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/codecs/SidecarInstanceCodecTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.codecs;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.eventbus.EventBus;
+import org.apache.cassandra.sidecar.TestResourceReaper;
+import org.apache.cassandra.sidecar.common.client.SidecarInstance;
+import org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl;
+
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for the {@link SidecarInstanceCodec}
+ */
+class SidecarInstanceCodecTest
+{
+ static Vertx vertx;
+ private static EventBus eventBus;
+ private static SidecarInstanceCodec<SidecarInstanceImpl> codec;
+
+ @BeforeAll
+ static void setup()
+ {
+ vertx = Vertx.vertx();
+ eventBus = vertx.eventBus();
+ codec = new SidecarInstanceCodec<>();
+ eventBus.registerDefaultCodec(SidecarInstanceImpl.class, codec);
+ }
+
+ @AfterAll
+ static void tearDown()
+ {
+ getBlocking(TestResourceReaper.create().with(vertx).close(), 30,
TimeUnit.SECONDS, "Closing vertx");
+ }
+
+ @Test
+ void testCodec() throws InterruptedException
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+ SidecarInstanceImpl sidecarInstance = new
SidecarInstanceImpl("localhost", 1234);
+ eventBus.consumer("test address", message -> {
+ assertThat(message.body()).isEqualTo(sidecarInstance);
+ latch.countDown();
+ });
+ eventBus.send("test address", sidecarInstance);
+ assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
+ }
+
+ @Test
+ void testEncodingDecoding()
+ {
+ Buffer buffer = Buffer.buffer(1024);
+ SidecarInstanceImpl sidecarInstance = new
SidecarInstanceImpl("127.0.0.1", 9876);
+ codec.encodeToWire(buffer, sidecarInstance);
+ SidecarInstance decoded = codec.decodeFromWire(0, buffer);
+ assertThat(decoded).isEqualTo(sidecarInstance);
+ }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProviderTests.java
b/server/src/test/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProviderTests.java
index 273c97cc..61e4c9ae 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProviderTests.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProviderTests.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.sidecar.coordination;
import java.math.BigInteger;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -32,10 +33,10 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
-
import org.junit.jupiter.api.Test;
import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.EndPoint;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Metadata;
@@ -45,6 +46,7 @@ import
org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.client.SidecarInstance;
import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
@@ -61,6 +63,7 @@ import static org.mockito.Mockito.when;
*/
public class InnerDcTokenAdjacentPeerProviderTests
{
+ private static final DriverUtils DRIVER_UTILS = new DriverUtils();
private static final List<BigInteger> TOKENS = Stream.of(
"-9223372036854775808",
"-8070450532247928832",
@@ -111,7 +114,7 @@ public class InnerDcTokenAdjacentPeerProviderTests
for (int i = 0; i < numHosts - 1; i++)
{
int hostId = i + 1;
- final List<InstanceMetadata> localInstances = List.of(
+ List<InstanceMetadata> localInstances = List.of(
mockInstanceMetadata(1, "dc1-host" + hostId + "-i1", metadata),
mockInstanceMetadata(2, "dc1-host" + hostId + "-i2", metadata),
mockInstanceMetadata(3, "dc1-host" + hostId + "-i3", metadata),
@@ -120,9 +123,10 @@ public class InnerDcTokenAdjacentPeerProviderTests
when(metadataFetcher.allLocalInstances()).thenReturn(localInstances);
CassandraClientTokenRingProvider cachedLocalTokenRanges =
mock(CassandraClientTokenRingProvider.class);
- Set<Host> localHosts = allHosts.stream().filter(host ->
host.getAddress()
-
.getHostName()
-
.startsWith("dc1-host" + hostId + "-"))
+ Set<Host> localHosts = allHosts.stream().filter(host ->
DRIVER_UTILS.getSocketAddress(host)
+
.getAddress()
+
.getHostName()
+
.startsWith("dc1-host" + hostId + "-"))
.collect(Collectors.toSet());
when(cachedLocalTokenRanges.localInstances()).thenReturn(localHosts);
Map<Integer, Set<TokenRange>> localRanges = Map.of(
@@ -134,24 +138,25 @@ public class InnerDcTokenAdjacentPeerProviderTests
when(cachedLocalTokenRanges.localTokenRanges(anyString())).thenReturn(localRanges);
when(cachedLocalTokenRanges.allInstances()).thenReturn(allHosts);
- final InnerDcTokenAdjacentPeerProvider provider = new
InnerDcTokenAdjacentPeerProvider(metadataFetcher,
-
cachedLocalTokenRanges,
-
serviceConfiguration,
-
new DnsResolver()
-
{
-
@Override
-
public String resolve(String hostname)
-
{
-
return hostname;
-
}
+ InnerDcTokenAdjacentPeerProvider provider = new
InnerDcTokenAdjacentPeerProvider(metadataFetcher,
+
cachedLocalTokenRanges,
+
serviceConfiguration,
+
new DnsResolver()
+
{
+
@Override
+
public String resolve(String hostname)
+
{
+
return hostname;
+
}
-
@Override
-
public String reverseResolve(String address)
-
{
-
return address;
-
}
-
});
- final Set<SidecarInstance> buddies = provider.get();
+
@Override
+
public String reverseResolve(String address)
+
{
+
return address;
+
}
+
},
+
DRIVER_UTILS);
+ Set<SidecarInstance> buddies = provider.get();
assertEquals(1, buddies.size());
assertTrue(buddies.stream().findFirst().orElseThrow().hostname().startsWith("dc1-host"
+ (hostId + 1)));
}
@@ -160,23 +165,26 @@ public class InnerDcTokenAdjacentPeerProviderTests
@Test
public void testAdjacentHosts()
{
- final List<Pair<Host, BigInteger>> allHosts = IntStream.range(0,
TOKENS.size())
- .mapToObj(idx
-> {
- BigInteger
token = tokenAt(idx);
- return
Pair.of(mockHost(INSTANCES.get(idx), token), token);
- })
-
.collect(Collectors.toList());
+ List<Pair<Host, BigInteger>> allHosts = IntStream.range(0,
TOKENS.size())
+ .mapToObj(idx -> {
+ BigInteger token
= tokenAt(idx);
+ return
Pair.of(mockHost(INSTANCES.get(idx), token), token);
+ })
+
.collect(Collectors.toList());
- final Set<String> adjacentHosts = new HashSet<>(TOKENS.size());
+ Set<String> adjacentHosts = new HashSet<>(TOKENS.size());
for (int i = 0; i < TOKENS.size(); i++)
{
- final String localhost = INSTANCES.get(i);
- final BigInteger token = tokenAt(i);
+ String localhost = INSTANCES.get(i);
+ BigInteger token = tokenAt(i);
test(localhost, token, allHosts, 1, tokenAt(i + 1));
- final Set<Host> adjacent =
InnerDcTokenAdjacentPeerProvider.adjacentHosts((host) -> isLocal(localhost,
-
host), token, allHosts, 1);
+ Set<Host> adjacent =
InnerDcTokenAdjacentPeerProvider.adjacentHosts(DRIVER_UTILS,
+
(host) -> isLocal(localhost, host),
+
token,
+
allHosts,
+
1);
assertEquals(1, adjacent.size());
- final String adjacentStr =
adjacent.stream().findFirst().map(Host::toString).orElseThrow();
+ String adjacentStr =
adjacent.stream().findFirst().map(Host::toString).orElseThrow();
assertFalse(adjacentHosts.contains(adjacentStr));
adjacentHosts.add(adjacentStr);
}
@@ -191,7 +199,7 @@ public class InnerDcTokenAdjacentPeerProviderTests
@Test
public void testMinToken()
{
- final List<TokenRange> tokens = TOKENS.stream().map(start -> new
TokenRange(start, start.add(BigInteger.ONE))).collect(Collectors.toList());
+ List<TokenRange> tokens = TOKENS.stream().map(start -> new
TokenRange(start, start.add(BigInteger.ONE))).collect(Collectors.toList());
Collections.shuffle(tokens);
assertEquals(tokenAt(0),
InnerDcTokenAdjacentPeerProvider.minToken(tokens.stream()));
}
@@ -206,15 +214,16 @@ public class InnerDcTokenAdjacentPeerProviderTests
"local1-i2", "local2-i2", "local3-i2",
"local1-i3", "local2-i3", "local3-i3",
"local1-i4", "local2-i4", "local3-i4");
- final BigInteger token = new
BigInteger(tokens.stream().findFirst().orElseThrow());
+ BigInteger token = new
BigInteger(tokens.stream().findFirst().orElseThrow());
List<Pair<Host, BigInteger>> sortedLocalDcHosts = IntStream.range(0,
tokens.size())
.mapToObj(i
-> {
BigInteger t = new BigInteger(tokens.get(i));
return
Pair.of(mockHost(hosts.get(i), t), t);
})
.collect(Collectors.toList());
- final int quorum = 5;
- InnerDcTokenAdjacentPeerProvider.adjacentHosts((host) ->
host.getAddress().getHostName().startsWith("local1-"),
+ int quorum = 5;
+ InnerDcTokenAdjacentPeerProvider.adjacentHosts(DRIVER_UTILS,
+ (host) ->
DRIVER_UTILS.getSocketAddress(host).getAddress().getHostName().startsWith("local1-"),
token,
sortedLocalDcHosts,
quorum)
@@ -229,12 +238,13 @@ public class InnerDcTokenAdjacentPeerProviderTests
private static void test(String localhost, BigInteger token,
List<Pair<Host, BigInteger>> allHosts, int quorum, BigInteger... expected)
{
- final Set<String> result =
InnerDcTokenAdjacentPeerProvider.adjacentHosts((host) -> isLocal(localhost,
host),
-
token,
-
allHosts,
-
quorum)
-
.stream().map(Host::toString)
-
.collect(Collectors.toSet());
+ Set<String> result =
InnerDcTokenAdjacentPeerProvider.adjacentHosts(DRIVER_UTILS,
+
(host) -> isLocal(localhost, host),
+
token,
+
allHosts,
+
quorum)
+
.stream().map(Host::toString)
+
.collect(Collectors.toSet());
assertFalse(result.contains(token.toString()));
for (BigInteger bi : expected)
{
@@ -245,7 +255,7 @@ public class InnerDcTokenAdjacentPeerProviderTests
private static boolean isLocal(String localhost, Host host)
{
- return isLocal(localhost, host.getAddress().getHostName());
+ return isLocal(localhost,
DRIVER_UTILS.getSocketAddress(host).getAddress().getHostName());
}
private static boolean isLocal(String localhost, String addr)
@@ -283,12 +293,16 @@ public class InnerDcTokenAdjacentPeerProviderTests
{
Host host = mock(Host.class);
InetAddress addr = mock(InetAddress.class);
+ EndPoint mockEndpoint = mock(EndPoint.class);
+ InetSocketAddress mockInetSocketAddress =
mock(InetSocketAddress.class);
when(addr.getHostName()).thenReturn(hostname);
when(addr.getHostAddress()).thenReturn(hostname);
- when(host.getAddress()).thenReturn(addr);
+ when(host.getEndPoint()).thenReturn(mockEndpoint);
+ when(mockEndpoint.resolve()).thenReturn(mockInetSocketAddress);
+ when(mockInetSocketAddress.getAddress()).thenReturn(addr);
when(host.toString()).thenReturn(token.toString());
when(host.getDatacenter()).thenReturn(dc);
- final Token t = mock(Token.class);
+ Token t = mock(Token.class);
when(t.getType()).thenReturn(DataType.bigint());
when(t.getValue()).thenReturn(token.longValue());
when(host.getTokens()).thenReturn(Set.of(t));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]