This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6aa69069 CASSSIDECAR-229: SidecarInstanceCodec is failing to find 
codec for type (#209)
6aa69069 is described below

commit 6aa69069f44b970af79759f2eec134cb251845be
Author: Francisco Guerrero <[email protected]>
AuthorDate: Mon Mar 17 12:18:16 2025 -0700

    CASSSIDECAR-229: SidecarInstanceCodec is failing to find codec for type 
(#209)
    
    Patch by Francisco Guerrero; reviewed by Yifan Cai, Bernardo Botella for 
CASSSIDECAR-229
---
 CHANGES.txt                                        |   1 +
 .../InnerDcTokenAdjacentPeerTestProvider.java      |  78 ------
 .../testing/SharedClusterIntegrationTestBase.java  | 175 ++++++------
 .../SidecarPeerDownDetectorIntegrationTest.java    | 304 ++++++++++++++-------
 .../CassandraSchemaRouteIntegrationTest.java       |  10 +-
 .../routes/CassandraStatsIntegrationTest.java      |  36 +--
 .../sidecar/routes/CdcConfigIntegrationTest.java   |  12 +-
 .../sidecar/routes/RoutesIntegrationTest.java      |  16 +-
 .../sidecar/codecs/SidecarInstanceCodec.java       |  17 +-
 .../InnerDcTokenAdjacentPeerProvider.java          |  37 ++-
 .../coordination/SidecarPeerHealthMonitorTask.java |  34 +--
 .../coordination/SidecarPeerHealthProvider.java    |  15 +-
 .../cassandra/sidecar/modules/CdcModule.java       |  10 +-
 .../apache/cassandra/sidecar/server/Server.java    |  19 +-
 .../sidecar/codecs/SidecarInstanceCodecTest.java   |  84 ++++++
 .../InnerDcTokenAdjacentPeerProviderTests.java     | 108 ++++----
 16 files changed, 528 insertions(+), 428 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e07b173b..90497fa8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.2.0
 -----
+ * SidecarInstanceCodec is failing to find codec for type (CASSSIDECAR-229)
  * Retry Failed Schema Reports (CASSSIDECAR-217)
  * Guice modularization (CASSSIDECAR-208)
  * Create Endpoint that Triggers an Immediate Schema Report (CASSSIDECAR-203)
diff --git 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/InnerDcTokenAdjacentPeerTestProvider.java
 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/InnerDcTokenAdjacentPeerTestProvider.java
deleted file mode 100644
index 933e6872..00000000
--- 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/InnerDcTokenAdjacentPeerTestProvider.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.sidecar.testing;
-
-import java.util.List;
-import java.util.function.Supplier;
-
-import com.datastax.driver.core.Host;
-import org.apache.cassandra.distributed.api.IInstance;
-import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
-import org.apache.cassandra.sidecar.config.ServiceConfiguration;
-import 
org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider;
-import 
org.apache.cassandra.sidecar.coordination.InnerDcTokenAdjacentPeerProvider;
-import org.apache.cassandra.sidecar.server.Server;
-import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
-
-/**
- * Text helper to find out server ports on integration tests.
- */
-public class InnerDcTokenAdjacentPeerTestProvider extends 
InnerDcTokenAdjacentPeerProvider
-{
-    private final Supplier<List<TestSidecarHostInfo>> sidecarServerSupplier;
-
-    public InnerDcTokenAdjacentPeerTestProvider(InstanceMetadataFetcher 
metadataFetcher,
-                                                
CassandraClientTokenRingProvider cassandraClientTokenRingProvider,
-                                                ServiceConfiguration 
serviceConfiguration,
-                                                DnsResolver dnsResolver,
-                                                
Supplier<List<TestSidecarHostInfo>> sidecarServerSupplier)
-    {
-        super(metadataFetcher, cassandraClientTokenRingProvider, 
serviceConfiguration, dnsResolver);
-        this.sidecarServerSupplier = sidecarServerSupplier;
-    }
-
-    @Override
-    protected int sidecarServicePort(Host host)
-    {
-        return sidecarServerSupplier.get().stream()
-                                    .filter(s -> 
s.instance.broadcastAddress().getHostName()
-                                                           
.equals(host.getBroadcastAddress().getHostName()))
-                                    .findAny()
-                                    .orElseThrow()
-                                    .port;
-    }
-
-    /**
-     * Class encapsulating different bits of information needed on integration 
tests.
-     */
-    public static class TestSidecarHostInfo
-    {
-        public final IInstance instance;
-        public final Server sidecarServer;
-        public final int port;
-
-        public TestSidecarHostInfo(IInstance instance, Server sidecarServer, 
int port)
-        {
-            this.instance = instance;
-            this.sidecarServer = sidecarServer;
-            this.port = port;
-        }
-    }
-}
diff --git 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
index 05000a58..7ea374ce 100644
--- 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
+++ 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
@@ -26,8 +26,6 @@ import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -35,7 +33,6 @@ import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -59,7 +56,6 @@ import com.google.inject.Injector;
 import com.google.inject.Module;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
-import com.google.inject.name.Named;
 import com.google.inject.util.Modules;
 import io.vertx.core.Vertx;
 import io.vertx.junit5.VertxExtension;
@@ -79,7 +75,6 @@ import 
org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
 import org.apache.cassandra.sidecar.common.server.JmxClient;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
 import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
-import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
 import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
 import org.apache.cassandra.sidecar.common.server.utils.SidecarVersionProvider;
 import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
@@ -88,28 +83,20 @@ import 
org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
 import org.apache.cassandra.sidecar.config.S3ClientConfiguration;
 import org.apache.cassandra.sidecar.config.S3ProxyConfiguration;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
-import org.apache.cassandra.sidecar.config.SidecarClientConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
-import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration;
 import org.apache.cassandra.sidecar.config.SslConfiguration;
 import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl;
 import 
org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
-import org.apache.cassandra.sidecar.config.yaml.SidecarClientConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
-import 
org.apache.cassandra.sidecar.config.yaml.SidecarPeerHealthConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
-import 
org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider;
 import org.apache.cassandra.sidecar.coordination.ClusterLease;
-import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask;
-import org.apache.cassandra.sidecar.coordination.SidecarPeerProvider;
 import org.apache.cassandra.sidecar.metrics.instance.InstanceHealthMetrics;
 import org.apache.cassandra.sidecar.modules.SidecarModules;
 import org.apache.cassandra.sidecar.server.Server;
 import org.apache.cassandra.sidecar.server.SidecarServerEvents;
 import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
-import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 import org.apache.cassandra.testing.IClusterExtension;
 import org.apache.cassandra.testing.IsolatedDTestClassLoaderWrapper;
@@ -168,14 +155,10 @@ public abstract class SharedClusterIntegrationTestBase
 
     protected DnsResolver dnsResolver = new LocalhostResolver();
     protected IClusterExtension<? extends IInstance> cluster;
-    protected Server server;
+    protected ServerWrapper serverWrapper;
     protected TestVersion testVersion;
     protected MtlsTestHelper mtlsTestHelper;
-    private final CountDownLatch sidecarSchemaReadyLatch = new 
CountDownLatch(1);
     private IsolatedDTestClassLoaderWrapper classLoaderWrapper;
-    private Injector sidecarServerInjector;
-    protected HashMap<Server, String> serverDeploymentIds;
-    protected HashMap<Server, SidecarPeerHealthMonitorTask> peerHealthMonitors;
 
     static
     {
@@ -194,9 +177,6 @@ public abstract class SharedClusterIntegrationTestBase
         classLoaderWrapper = new IsolatedDTestClassLoaderWrapper();
         classLoaderWrapper.initializeDTestJarClassLoader(testVersion, 
TestVersion.class);
 
-        serverDeploymentIds = new HashMap<>();
-        peerHealthMonitors = new HashMap<>();
-
         beforeClusterProvisioning();
         cluster = provisionClusterWithRetries(this.testVersion);
         assertThat(cluster).isNotNull();
@@ -341,6 +321,7 @@ public abstract class SharedClusterIntegrationTestBase
 
     /**
      * Override to provide additional options to configure sidecar
+     *
      * @return function to update {@link SidecarConfigurationImpl.Builder}
      */
     protected Function<SidecarConfigurationImpl.Builder, 
SidecarConfigurationImpl.Builder> configurationOverrides()
@@ -356,17 +337,17 @@ public abstract class SharedClusterIntegrationTestBase
      */
     protected void startSidecar(ICluster<? extends IInstance> cluster) throws 
InterruptedException
     {
-        server = startSidecarWithInstances(cluster);
+        serverWrapper = startSidecarWithInstances(cluster);
     }
 
     /**
      * Starts Sidecar configured to run with the provided {@link IInstance}s 
from the cluster.
      *
      * @param instances the Cassandra instances Sidecar will manage
-     * @return the started server
+     * @return a wrapper with the started server
      * @throws InterruptedException when the server start operation is 
interrupted
      */
-    protected Server startSidecarWithInstances(Iterable<? extends IInstance> 
instances) throws InterruptedException
+    protected ServerWrapper startSidecarWithInstances(Iterable<? extends 
IInstance> instances) throws InterruptedException
     {
         return startSidecarWithInstances(instances, null);
     }
@@ -374,12 +355,12 @@ public abstract class SharedClusterIntegrationTestBase
     /**
      * Starts Sidecar configured to run with the provided {@link IInstance}s 
from the cluster.
      *
-     * @param instances the Cassandra instances Sidecar will manage
+     * @param instances    the Cassandra instances Sidecar will manage
      * @param customModule an optional custom module that overrides during 
injection
      * @return the started server
      * @throws InterruptedException when the server start operation is 
interrupted
      */
-    protected Server startSidecarWithInstances(Iterable<? extends IInstance> 
instances, AbstractModule customModule) throws InterruptedException
+    protected ServerWrapper startSidecarWithInstances(Iterable<? extends 
IInstance> instances, AbstractModule customModule) throws InterruptedException
     {
         VertxTestContext context = new VertxTestContext();
         AbstractModule testModule = new IntegrationTestModule(instances, 
classLoaderWrapper, mtlsTestHelper,
@@ -389,33 +370,28 @@ public abstract class SharedClusterIntegrationTestBase
         {
             module = Modules.override(testModule).with(customModule);
         }
-        sidecarServerInjector = 
Guice.createInjector(Modules.override(SidecarModules.all()).with(module));
-        Vertx vertx = sidecarServerInjector.getInstance(Vertx.class);
-        vertx.eventBus()
-             
.localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(), msg 
-> {
-                 sidecarSchemaReadyLatch.countDown();
-             });
-        Server sidecarServer = sidecarServerInjector.getInstance(Server.class);
-        SidecarPeerHealthMonitorTask peerHealthMonitorTask = 
sidecarServerInjector.getInstance(SidecarPeerHealthMonitorTask.class);
+        Injector injector = 
Guice.createInjector(Modules.override(SidecarModules.all()).with(module));
+        Server sidecarServer = injector.getInstance(Server.class);
         sidecarServer.start()
-                     .onSuccess(deploymentId -> {
-                         serverDeploymentIds.put(sidecarServer, deploymentId);
-                         peerHealthMonitors.put(sidecarServer, 
peerHealthMonitorTask);
-                         context.completeNow();
-                     })
+                     .onSuccess(s -> context.completeNow())
                      .onFailure(context::failNow);
 
         assertThat(context.awaitCompletion(5, TimeUnit.SECONDS)).isTrue();
-        return sidecarServer;
+        return new ServerWrapper(injector, sidecarServer);
     }
 
     protected void waitForSchemaReady(long timeout, TimeUnit timeUnit)
     {
-        assertThat(sidecarServerInjector)
+        waitForSchemaReady(serverWrapper, timeout, timeUnit);
+    }
+
+    protected void waitForSchemaReady(ServerWrapper serverWrapper, long 
timeout, TimeUnit timeUnit)
+    {
+        assertThat(serverWrapper)
         .describedAs("Sidecar should be started")
         .isNotNull();
 
-        
assertThat(Uninterruptibles.awaitUninterruptibly(sidecarSchemaReadyLatch, 
timeout, timeUnit))
+        
assertThat(Uninterruptibles.awaitUninterruptibly(serverWrapper.sidecarSchemaReadyLatch,
 timeout, timeUnit))
         .describedAs("Sidecar schema is not initialized after " + timeout + ' 
' + timeUnit)
         .isTrue();
     }
@@ -427,7 +403,7 @@ public abstract class SharedClusterIntegrationTestBase
      */
     protected void stopSidecar() throws InterruptedException
     {
-        closeServer(server);
+        closeServer(serverWrapper.server);
     }
 
     protected void closeServer(Server s) throws InterruptedException
@@ -437,7 +413,7 @@ public abstract class SharedClusterIntegrationTestBase
             return;
         }
         CountDownLatch closeLatch = new CountDownLatch(1);
-        server.close().onSuccess(res -> closeLatch.countDown());
+        s.close().onSuccess(res -> closeLatch.countDown());
         if (closeLatch.await(60, TimeUnit.SECONDS))
         {
             logger.info("Close event received before timeout.");
@@ -562,6 +538,30 @@ public abstract class SharedClusterIntegrationTestBase
         return builder.build();
     }
 
+    /**
+     * Wraps the Sidecar server and keeps a reference to the injector to be 
able to dynamically retrieve
+     * objects for testing purposes
+     */
+    public static class ServerWrapper
+    {
+        public final Injector injector;
+        public final Server server;
+        public volatile int serverPort;
+        private final CountDownLatch sidecarSchemaReadyLatch = new 
CountDownLatch(1);
+
+        public ServerWrapper(Injector sidecarServerInjector, Server server)
+        {
+            this.injector = sidecarServerInjector;
+            this.server = server;
+            // Server must have started to retrieve the port
+            this.serverPort = server.actualPort();
+
+            Vertx vertx = sidecarServerInjector.getInstance(Vertx.class);
+            
vertx.eventBus().localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(),
+                                           msg -> 
sidecarSchemaReadyLatch.countDown());
+        }
+    }
+
     /**
      * Test module that configures the instances based on the cluster instances
      */
@@ -630,42 +630,35 @@ public abstract class SharedClusterIntegrationTestBase
 
         @Provides
         @Singleton
-        @Named("sidecarInstanceSupplier")
-        public 
Supplier<List<InnerDcTokenAdjacentPeerTestProvider.TestSidecarHostInfo>> 
supplier()
+        public SidecarConfiguration sidecarConfiguration()
         {
-            return ArrayList::new;
+            return defaultConfigurationBuilder(mtlsTestHelper, 
configurationOverrides).build();
         }
 
         @Provides
         @Singleton
-        public SidecarPeerHealthConfiguration sidecarPeerHealthConfiguration()
+        public DnsResolver dnsResolver()
         {
-            return new SidecarPeerHealthConfigurationImpl(false,
-                                                          new 
MillisecondBoundConfiguration(1, TimeUnit.SECONDS),
-                                                          1,
-                                                          new 
MillisecondBoundConfiguration(500, TimeUnit.MILLISECONDS));
+            return dnsResolver;
         }
 
         @Provides
         @Singleton
-        public SidecarPeerProvider sidecarPeerProvider(InstanceMetadataFetcher 
metadataFetcher,
-                                                       
CassandraClientTokenRingProvider cassandraClientTokenRingProvider,
-                                                       SidecarConfiguration 
configuration,
-                                                       DnsResolver dnsResolver,
-                                                       
@Named("sidecarInstanceSupplier")
-                                                               Supplier<List
-                                                                        
<InnerDcTokenAdjacentPeerTestProvider.TestSidecarHostInfo>> supplier)
+        public ClusterLease clusterLease()
         {
-            return new InnerDcTokenAdjacentPeerTestProvider(metadataFetcher,
-                                                            
cassandraClientTokenRingProvider,
-                                                            
configuration.serviceConfiguration(),
-                                                            dnsResolver,
-                                                            supplier);
+            return new ClusterLease(ClusterLease.Ownership.CLAIMED);
         }
 
-        @Provides
-        @Singleton
-        public SidecarConfiguration 
sidecarConfiguration(SidecarPeerHealthConfiguration 
sidecarPeerHealthConfiguration)
+        private List<InetSocketAddress> buildContactPoints()
+        {
+            return StreamSupport.stream(instances.spliterator(), false)
+                                .map(instance -> new 
InetSocketAddress(instance.config().broadcastAddress().getAddress(),
+                                                                       
tryGetIntConfig(instance.config(), "native_transport_port", 9042)))
+                                .collect(Collectors.toList());
+        }
+
+        public static SidecarConfigurationImpl.Builder 
defaultConfigurationBuilder(MtlsTestHelper mtlsTestHelper,
+                                                                               
    Function<SidecarConfigurationImpl.Builder, 
SidecarConfigurationImpl.Builder> configurationOverrides)
         {
             ServiceConfiguration conf = ServiceConfigurationImpl.builder()
                                                                 
.host("0.0.0.0") // binds to all interfaces, potential security issue if left 
running for long
@@ -708,46 +701,18 @@ public abstract class SharedClusterIntegrationTestBase
                                                                                
  5242880, DEFAULT_API_CALL_TIMEOUT,
                                                                                
  buildTestS3ProxyConfig());
 
-            SidecarClientConfiguration sidecarClientConfiguration = new 
SidecarClientConfigurationImpl(sslConfiguration);
-
-
-
             SidecarConfigurationImpl.Builder builder = 
SidecarConfigurationImpl.builder()
                                                                                
.serviceConfiguration(conf)
                                                                                
.s3ClientConfiguration(s3ClientConfig)
-                                                                               
.sslConfiguration(sslConfiguration)
-                                                                               
.sidecarClientConfiguration(sidecarClientConfiguration)
-                                                                               
.sidecarPeerHealthConfiguration(sidecarPeerHealthConfiguration);
+                                                                               
.sslConfiguration(sslConfiguration);
             if (configurationOverrides != null)
             {
                 builder = configurationOverrides.apply(builder);
             }
-            return builder.build();
+            return builder;
         }
 
-        @Provides
-        @Singleton
-        public DnsResolver dnsResolver()
-        {
-            return dnsResolver;
-        }
-
-        @Provides
-        @Singleton
-        public ClusterLease clusterLease()
-        {
-            return new ClusterLease(ClusterLease.Ownership.CLAIMED);
-        }
-
-        private List<InetSocketAddress> buildContactPoints()
-        {
-            return StreamSupport.stream(instances.spliterator(), false)
-                                .map(instance -> new 
InetSocketAddress(instance.config().broadcastAddress().getAddress(),
-                                                                       
tryGetIntConfig(instance.config(), "native_transport_port", 9042)))
-                                .collect(Collectors.toList());
-        }
-
-        private S3ProxyConfiguration buildTestS3ProxyConfig()
+        private static S3ProxyConfiguration buildTestS3ProxyConfig()
         {
             return new S3ProxyConfiguration()
             {
@@ -789,6 +754,20 @@ public abstract class SharedClusterIntegrationTestBase
             }
         }
 
+        public static String cassandraInstanceHostname(IInstance 
cassandraInstance, DnsResolver dnsResolver)
+        {
+            IInstanceConfig config = cassandraInstance.config();
+            String ipAddress = JMXUtil.getJmxHost(config);
+            try
+            {
+                return dnsResolver.reverseResolve(ipAddress);
+            }
+            catch (UnknownHostException e)
+            {
+                return ipAddress;
+            }
+        }
+
         static InstanceMetadata buildInstanceMetadata(Vertx vertx,
                                                       IInstance 
cassandraInstance,
                                                       CassandraVersionProvider 
versionProvider,
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
index d69683e0..1d7addc3 100644
--- 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java
@@ -19,11 +19,11 @@
 
 package org.apache.cassandra.sidecar.health;
 
-import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import java.util.function.Supplier;
 
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -32,70 +32,98 @@ import org.slf4j.LoggerFactory;
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
-import com.google.inject.name.Named;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.common.client.SidecarInstance;
+import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
 import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
+import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
-import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarClientConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SslConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
+import 
org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SidecarClientConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
 import 
org.apache.cassandra.sidecar.config.yaml.SidecarPeerHealthConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
+import 
org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider;
+import 
org.apache.cassandra.sidecar.coordination.InnerDcTokenAdjacentPeerProvider;
 import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask;
 import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthProvider;
+import org.apache.cassandra.sidecar.coordination.SidecarPeerProvider;
 import org.apache.cassandra.sidecar.server.Server;
-import 
org.apache.cassandra.sidecar.testing.InnerDcTokenAdjacentPeerTestProvider.TestSidecarHostInfo;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import 
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static 
org.apache.cassandra.sidecar.testing.MtlsTestHelper.CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS;
+import static 
org.apache.cassandra.sidecar.testing.MtlsTestHelper.EMPTY_PASSWORD_STRING;
+import static 
org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase.IntegrationTestModule.cassandraInstanceHostname;
+import static 
org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase.IntegrationTestModule.defaultConfigurationBuilder;
 import static org.apache.cassandra.testing.TestUtils.DC1_RF3;
 import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
+/**
+ * Integration test for the Sidecar peer monitoring feature
+ */
 class SidecarPeerDownDetectorIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
 {
-
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SidecarPeerDownDetectorIntegrationTest.class);
+    private static final SchemaKeyspaceConfiguration SCHEMA_KEYSPACE_CONFIG = 
SchemaKeyspaceConfigurationImpl.builder()
+                                                                               
                              .isEnabled(true)
+                                                                               
                              .build();
 
-    List<TestSidecarHostInfo> sidecarServerList = new ArrayList<>();
+    // Key is the sidecar IP address and value is the server wrapper
+    private final Map<String, ServerWrapper> sidecarServerMap = new 
HashMap<>();
+    private final DriverUtils driverUtils = new DriverUtils();
 
     @Override
     protected ClusterBuilderConfiguration testClusterConfiguration()
     {
-        return new ClusterBuilderConfiguration().nodesPerDc(3);
+        return super.testClusterConfiguration().nodesPerDc(3);
     }
 
     @Override
     protected void startSidecar(ICluster<? extends IInstance> cluster) throws 
InterruptedException
     {
-        Supplier<List<TestSidecarHostInfo>> supplier = () -> sidecarServerList;
-        PeersModule peersModule = new PeersModule(supplier);
         for (IInstance cassandraInstance : cluster)
         {
             // Storing all the created sidecar instances into a list for 
further reference.
             LOGGER.info("Starting Sidecar instance for Cassandra instance {}",
                         cassandraInstance.config().num());
-            Server server = 
startSidecarWithInstances(List.of(cassandraInstance), peersModule);
-            sidecarServerList.add(new TestSidecarHostInfo(cassandraInstance, 
server, server.actualPort()));
-        }
-
-        assertThat(sidecarServerList.size()).as("Each Cassandra Instance will 
be managed by a single Sidecar instance")
-                                            .isEqualTo(cluster.size());
+            String cassandraInstanceHostname = 
cassandraInstanceHostname(cassandraInstance, dnsResolver);
+            PeersTestModule peersModule = new 
PeersTestModule(cassandraInstanceHostname);
+            ServerWrapper serverWrapper = 
startSidecarWithInstances(List.of(cassandraInstance), peersModule);
+            sidecarServerMap.put(cassandraInstanceHostname, serverWrapper);
 
+            if (this.serverWrapper == null)
+            {
+                // assign the server to the first instance
+                this.serverWrapper = serverWrapper;
+            }
+        }
 
-        // assign the server to the first instance
-        server = sidecarServerList.get(0).sidecarServer;
+        assertThat(sidecarServerMap.size()).as("Each Cassandra Instance will 
be managed by a single Sidecar instance")
+                                           .isEqualTo(cluster.size());
     }
 
     @Override
     protected void stopSidecar()
     {
-        sidecarServerList.forEach(s -> {
+        sidecarServerMap.values().forEach(serverWrapper -> {
             try
             {
-                closeServer(s.sidecarServer);
+                closeServer(serverWrapper.server);
             }
             catch (Exception e)
             {
@@ -104,118 +132,184 @@ class SidecarPeerDownDetectorIntegrationTest extends 
SharedClusterSidecarIntegra
         });
     }
 
-    class PeersModule extends AbstractModule
+    @Test
+    void testOnePeerDown()
     {
-        Supplier<List<TestSidecarHostInfo>> supplier;
+        SidecarPeerHealthMonitorTask monitor = 
serverWrapper.injector.getInstance(SidecarPeerHealthMonitorTask.class);
+        assertThat(monitor.status()).as("Monitor hasn't had time to perform 
checks").isEmpty();
 
-        public PeersModule(Supplier<List<TestSidecarHostInfo>> supplier)
-        {
-            this.supplier = supplier;
-        }
+        loopAssert(30, () -> assertThat(checkHostUp(monitor, "localhost2"))
+                             .as("After some time, peer is up")
+                             .isTrue());
 
-        @Provides
-        @Singleton
-        @Named("sidecarInstanceSupplier")
-        public Supplier<List<TestSidecarHostInfo>> supplier()
-        {
-            return supplier;
-        }
+        stopSidecarInstanceForTest("localhost2");
+        loopAssert(30, () -> assertThat(checkHostDown(monitor, "localhost2"))
+                             .as("After killing peer sidecar instance, monitor 
caches up and the host is down")
+                             .isTrue());
+        startSidecarInstanceForTest("localhost2");
+        loopAssert(30, () -> assertThat(checkHostUp(monitor, "localhost2"))
+                             .as("After restarting peer sidecar instance, 
monitor caches up and the host is down")
+                             .isTrue());
+    }
 
-        @Provides
-        @Singleton
-        public SidecarPeerHealthConfiguration sidecarPeerHealthConfiguration()
-        {
-            return new SidecarPeerHealthConfigurationImpl(true,
-                                                          new 
MillisecondBoundConfiguration(1, TimeUnit.SECONDS),
-                                                          1,
-                                                          new 
MillisecondBoundConfiguration(500, TimeUnit.MILLISECONDS));
-        }
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        QualifiedName qualifiedName = new QualifiedName("cdc", "test");
+        createTestKeyspace(qualifiedName, DC1_RF3);
+        createTestTable(qualifiedName, "CREATE TABLE %s (id text PRIMARY KEY, 
name text) WITH cdc=true");
     }
 
-    void stopSidecarInstanceForTest(int instanceId)
+    private boolean checkHostUp(SidecarPeerHealthMonitorTask monitor, String 
hostname)
     {
-        assertThat(sidecarServerList).isNotEmpty();
-        Server server = sidecarServerList.get(instanceId).sidecarServer;
-        String deploymentId = serverDeploymentIds.get(server);
-        getBlocking(server.stop(deploymentId), 30, TimeUnit.SECONDS, "Stopping 
server " + deploymentId);
+        return checkHostStatus(monitor, hostname, 
SidecarPeerHealthProvider.Health.UP);
     }
 
-    void startSidecarInstanceForTest(int instanceId) throws Exception
+    private boolean checkHostDown(SidecarPeerHealthMonitorTask monitor, String 
hostname)
     {
-        assertThat(sidecarServerList).isNotEmpty();
-        TestSidecarHostInfo server = sidecarServerList.get(instanceId);
-        getBlocking(server.sidecarServer.start(), 30, TimeUnit.SECONDS, 
"Starting server...");
+        return checkHostStatus(monitor, hostname, 
SidecarPeerHealthProvider.Health.DOWN);
     }
 
-    @Override
-    protected Function<SidecarConfigurationImpl.Builder, 
SidecarConfigurationImpl.Builder> configurationOverrides()
+    private boolean checkHostStatus(SidecarPeerHealthMonitorTask monitor, 
String hostname, SidecarPeerHealthProvider.Health status)
     {
-        return builder -> {
-            ServiceConfiguration conf;
-            if (sidecarServerList.isEmpty())
-            {
-                // As opposed to the base class, this binds the host to a 
specific interface (localhost)
-                conf = ServiceConfigurationImpl.builder()
-                                               .host("localhost")
-                                               .port(0) // let the test find 
an available port
-                                               .build();
-            }
-            else
+        for (Map.Entry<SidecarInstance, SidecarPeerHealthProvider.Health> 
entry : monitor.status().entrySet())
+        {
+            if (hostname.equals(entry.getKey().hostname()))
             {
-                // Use the same port number for all Sidecar instances that we 
bring up. We use the port
-                // bound for the first instance, but we bind it to a different 
interface (localhost2, localhost3)
-                conf = ServiceConfigurationImpl.builder()
-                                               .host("localhost" + 
(sidecarServerList.size() + 1))
-                                               
.port(sidecarServerList.get(0).sidecarServer.actualPort())
-                                               .build();
+                return status.equals(entry.getValue());
             }
-            builder.serviceConfiguration(conf);
-
-            return builder;
-        };
+        }
+        fail("Status unavailable for host " + hostname);
+        // this code block is not reachable but required for compilation
+        // we could alternatively just throw an AssertionError above
+        return false;
     }
 
-    @Test
-    void onePeerDownTest() throws Exception
+    void stopSidecarInstanceForTest(String hostname)
     {
-        SidecarPeerHealthMonitorTask monitor = 
peerHealthMonitors.get(sidecarServerList.get(0).sidecarServer);
-        // Monitor hasn't had time to perform checks
-        assertTrue(monitor.getStatus().isEmpty());
-
-        // After some time, peer is up
-        Thread.sleep(5000);
-        checkHostUp(monitor, "localhost2");
-
-        stopSidecarInstanceForTest(1);
-        Thread.sleep(5000);
-        // After killing peer sidecar instance, monitor caches up and the host 
is down
-        checkHostDown(monitor, "localhost2");
-        startSidecarInstanceForTest(1);
-        Thread.sleep(5000);
-        // After restarting peer sidecar instance, monitor caches up and the 
host is down
-        checkHostUp(monitor, "localhost2");
+        assertThat(sidecarServerMap).containsKey(hostname);
+        Server server = sidecarServerMap.get(hostname).server;
+        String deploymentId = server.deploymentId();
+        LOGGER.info("Stopping Sidecar server {} with deployment ID {}", 
hostname, deploymentId);
+        getBlocking(server.stop(deploymentId), 30, TimeUnit.SECONDS,
+                    "Stopping Sidecar server " + hostname + " with deployment 
ID " + deploymentId);
     }
 
-    private boolean checkHostUp(SidecarPeerHealthMonitorTask monitor, String 
hostname)
+    void startSidecarInstanceForTest(String hostname)
     {
-        return checkHostStatus(monitor, hostname, 
SidecarPeerHealthProvider.Health.UP);
+        ServerWrapper serverWrapper = sidecarServerMap.get(hostname);
+        assertThat(serverWrapper).isNotNull();
+        Server server = serverWrapper.server;
+        LOGGER.info("Starting Sidecar server {}...", hostname);
+        getBlocking(server.start(), 30, TimeUnit.SECONDS, "Starting Sidecar 
server " + hostname + "...");
+        // Update the server port after starting a new time
+        // because a new port will be assigned
+        serverWrapper.serverPort = server.actualPort();
     }
 
-    private boolean checkHostDown(SidecarPeerHealthMonitorTask monitor, String 
hostname)
+    class PeersTestModule extends AbstractModule
     {
-        return checkHostStatus(monitor, hostname, 
SidecarPeerHealthProvider.Health.DOWN);
-    }
+        private final String cassandraInstanceHostname;
 
-    private boolean checkHostStatus(SidecarPeerHealthMonitorTask monitor, 
String hostname, SidecarPeerHealthProvider.Health status)
-    {
-        return monitor.getStatus().entrySet().stream().filter(e -> 
e.getKey().hostname().equals(hostname)).findAny().orElseThrow().getValue().equals(status);
+        public PeersTestModule(String cassandraInstanceHostname)
+        {
+            this.cassandraInstanceHostname = cassandraInstanceHostname;
+        }
+
+        @Provides
+        @Singleton
+        public SidecarConfiguration sidecarConfiguration()
+        {
+            Function<SidecarConfigurationImpl.Builder, 
SidecarConfigurationImpl.Builder> configurationOverrides = builder -> {
+
+                // Override the service configuration such that we listen on 
the host associated with the
+                // Cassandra instance associated with this Sidecar instance
+                ServiceConfiguration conf = ServiceConfigurationImpl.builder()
+                                                                    
.host(cassandraInstanceHostname)
+                                                                    .port(0) 
// let the test find an available port
+                                                                    
.schemaKeyspaceConfiguration(SCHEMA_KEYSPACE_CONFIG)
+                                                                    .build();
+
+                // We need to provide mTLS configuration for the Sidecar 
client so it can talk to
+                // other sidecars using mTLS
+                SslConfiguration clientSslConfiguration = null;
+                if (mtlsTestHelper.isEnabled())
+                {
+                    LOGGER.info("Enabling test mTLS certificate/keystore.");
+
+                    KeyStoreConfiguration truststoreConfiguration =
+                    new 
KeyStoreConfigurationImpl(mtlsTestHelper.trustStorePath(),
+                                                  
mtlsTestHelper.trustStorePassword(),
+                                                  
mtlsTestHelper.trustStoreType(),
+                                                  
SecondBoundConfiguration.parse("60s"));
+
+                    KeyStoreConfiguration keyStoreConfiguration =
+                    new 
KeyStoreConfigurationImpl(mtlsTestHelper.clientKeyStorePath(),
+                                                  EMPTY_PASSWORD_STRING,
+                                                  
mtlsTestHelper.serverKeyStoreType(), // server and client keystore types are 
the same
+                                                  
SecondBoundConfiguration.parse("60s"));
+
+                    clientSslConfiguration = SslConfigurationImpl.builder()
+                                                                 .enabled(true)
+                                                                 
.keystore(keyStoreConfiguration)
+                                                                 
.truststore(truststoreConfiguration)
+                                                                 .build();
+                }
+                else
+                {
+                    LOGGER.info("Not enabling mTLS for testing purposes. Set 
'{}' to 'true' if you would " +
+                                "like mTLS enabled.", 
CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS);
+                }
+
+                SidecarClientConfiguration sidecarClientConfiguration = new 
SidecarClientConfigurationImpl(clientSslConfiguration);
+
+                // Let's run this very frequently for testing purposes
+                SidecarPeerHealthConfigurationImpl 
sidecarPeerHealthConfiguration
+                = new SidecarPeerHealthConfigurationImpl(true,
+                                                         
MillisecondBoundConfiguration.parse("100ms"),
+                                                         1,
+                                                         
MillisecondBoundConfiguration.parse("50ms"));
+                builder.serviceConfiguration(conf)
+                       .sidecarClientConfiguration(sidecarClientConfiguration)
+                       
.sidecarPeerHealthConfiguration(sidecarPeerHealthConfiguration);
+                return builder;
+            };
+
+            return defaultConfigurationBuilder(mtlsTestHelper, 
configurationOverrides).build();
+        }
+
+        @Provides
+        @Singleton
+        public SidecarPeerProvider sidecarPeerProvider(InstanceMetadataFetcher 
metadataFetcher,
+                                                       
CassandraClientTokenRingProvider cassandraClientTokenRingProvider,
+                                                       SidecarConfiguration 
configuration,
+                                                       DnsResolver dnsResolver)
+        {
+            return new InnerDcTokenAdjacentPeerTestProvider(metadataFetcher,
+                                                            
cassandraClientTokenRingProvider,
+                                                            
configuration.serviceConfiguration(),
+                                                            dnsResolver);
+        }
     }
 
-    @Override
-    protected void initializeSchemaForTest()
+    /**
+     * Test helper to find out server ports on integration tests.
+     */
+    class InnerDcTokenAdjacentPeerTestProvider extends 
InnerDcTokenAdjacentPeerProvider
     {
-        createTestKeyspace("cdc", DC1_RF3);
-        createTestTable(new QualifiedName("cdc", "test"), "CREATE TABLE %s (id 
text PRIMARY KEY, name text) WITH cdc=true");
+        InnerDcTokenAdjacentPeerTestProvider(InstanceMetadataFetcher 
metadataFetcher,
+                                             CassandraClientTokenRingProvider 
cassandraClientTokenRingProvider,
+                                             ServiceConfiguration 
serviceConfiguration,
+                                             DnsResolver dnsResolver)
+        {
+            super(metadataFetcher, cassandraClientTokenRingProvider, 
serviceConfiguration, dnsResolver, driverUtils);
+        }
+
+        @Override
+        protected int sidecarServicePort(String sidecarHostname)
+        {
+            return sidecarServerMap.get(sidecarHostname).serverPort;
+        }
     }
 }
 
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraSchemaRouteIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraSchemaRouteIntegrationTest.java
index 201ab2c7..56cb507a 100644
--- 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraSchemaRouteIntegrationTest.java
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraSchemaRouteIntegrationTest.java
@@ -44,7 +44,7 @@ class CassandraSchemaRouteIntegrationTest extends 
SharedClusterSidecarIntegratio
     {
         String testRoute = "/api/v1/schema/keyspaces";
         SchemaResponse response = getBlocking(trustedClient()
-                                              .get(server.actualPort(), 
"localhost", testRoute)
+                                              .get(serverWrapper.serverPort, 
"localhost", testRoute)
                                               .send()
                                               
.expecting(HttpResponseExpectation.SC_OK))
                                   .bodyAsJson(SchemaResponse.class);
@@ -58,7 +58,7 @@ class CassandraSchemaRouteIntegrationTest extends 
SharedClusterSidecarIntegratio
     {
         String testRoute = "/api/v1/schema/keyspaces/non_existent";
         getBlocking(trustedClient()
-                    .get(server.actualPort(), "localhost", testRoute)
+                    .get(serverWrapper.serverPort, "localhost", testRoute)
                     .send()
                     .expecting(HttpResponseExpectation.SC_NOT_FOUND));
     }
@@ -68,7 +68,7 @@ class CassandraSchemaRouteIntegrationTest extends 
SharedClusterSidecarIntegratio
     {
         String testRoute = "/api/v1/schema/keyspaces/testkeyspace";
         SchemaResponse response = getBlocking(trustedClient()
-                                              .get(server.actualPort(), 
"localhost", testRoute)
+                                              .get(serverWrapper.serverPort, 
"localhost", testRoute)
                                               .send()
                                               
.expecting(HttpResponseExpectation.SC_OK))
                                   .bodyAsJson(SchemaResponse.class);
@@ -82,7 +82,7 @@ class CassandraSchemaRouteIntegrationTest extends 
SharedClusterSidecarIntegratio
     {
         String testRoute = "/api/v1/schema/keyspaces/\"Cycling\"";
         SchemaResponse response = getBlocking(trustedClient()
-                                              .get(server.actualPort(), 
"localhost", testRoute)
+                                              .get(serverWrapper.serverPort, 
"localhost", testRoute)
                                               .send()
                                               
.expecting(HttpResponseExpectation.SC_OK))
                                   .bodyAsJson(SchemaResponse.class);
@@ -96,7 +96,7 @@ class CassandraSchemaRouteIntegrationTest extends 
SharedClusterSidecarIntegratio
     {
         String testRoute = "/api/v1/schema/keyspaces/\"keyspace\"";
         SchemaResponse response = getBlocking(trustedClient()
-                                              .get(server.actualPort(), 
"localhost", testRoute)
+                                              .get(serverWrapper.serverPort, 
"localhost", testRoute)
                                               .send()
                                               
.expecting(HttpResponseExpectation.SC_OK))
                                   .bodyAsJson(SchemaResponse.class);
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraStatsIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraStatsIntegrationTest.java
index 3d09bc4a..6bde4cb4 100644
--- 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraStatsIntegrationTest.java
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraStatsIntegrationTest.java
@@ -28,8 +28,8 @@ import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpResponseExpectation;
 import io.vertx.ext.web.client.HttpResponse;
-import io.vertx.ext.web.client.predicate.ResponsePredicate;
 import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
 import org.apache.cassandra.sidecar.common.response.TableStatsResponse;
 import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
@@ -70,9 +70,9 @@ class CassandraStatsIntegrationTest extends 
SharedClusterSidecarIntegrationTestB
     {
         Map<String, Boolean> expectedParams = Map.of("summary", true);
         String testRoute = "/api/v1/cassandra/stats/connected-clients";
-        HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
-                                                                   
.expect(ResponsePredicate.SC_OK)
-                                                                   .send());
+        HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", 
testRoute)
+                                                                   .send()
+                                                                   
.expecting(HttpResponseExpectation.SC_OK));
         assertClientStatsResponse(response, expectedParams);
     }
 
@@ -81,9 +81,9 @@ class CassandraStatsIntegrationTest extends 
SharedClusterSidecarIntegrationTestB
     {
         Map<String, Boolean> expectedParams = Map.of("summary", false);
         String testRoute = 
"/api/v1/cassandra/stats/connected-clients?summary=false";
-        HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
-                                                                   
.expect(ResponsePredicate.SC_OK)
-                                                                   .send());
+        HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", 
testRoute)
+                                                                   .send()
+                                                                   
.expecting(HttpResponseExpectation.SC_OK));
         assertClientStatsResponse(response, expectedParams);
     }
 
@@ -96,9 +96,9 @@ class CassandraStatsIntegrationTest extends 
SharedClusterSidecarIntegrationTestB
 
             Map<String, Boolean> expectedParams = Map.of("summary", false);
             String testRoute = 
"/api/v1/cassandra/stats/connected-clients?summary=false";
-            HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
-                                                                       
.expect(ResponsePredicate.SC_OK)
-                                                                       
.send());
+            HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", 
testRoute)
+                                                                       .send()
+                                                                       
.expecting(HttpResponseExpectation.SC_OK));
             assertClientStatsResponse(response, expectedParams, 4, true);
         }
     }
@@ -111,9 +111,9 @@ class CassandraStatsIntegrationTest extends 
SharedClusterSidecarIntegrationTestB
         {
             Map<String, Boolean> expectedParams = Map.of("summary", false);
             String testRoute = 
"/api/v1/cassandra/stats/connected-clients?summary=false";
-            HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
-                                                                       
.expect(ResponsePredicate.SC_OK)
-                                                                       
.send());
+            HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", 
testRoute)
+                                                                       .send()
+                                                                       
.expecting(HttpResponseExpectation.SC_OK));
             assertClientStatsResponse(response, expectedParams, 4);
         }
     }
@@ -127,9 +127,9 @@ class CassandraStatsIntegrationTest extends 
SharedClusterSidecarIntegrationTestB
     {
         Map<String, Boolean> expectedParams = Map.of("summary", true);
         String testRoute = 
"/api/v1/cassandra/stats/connected-clients?summary=123&bad-arg=xyz";
-        HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
-                                                                   
.expect(ResponsePredicate.SC_OK)
-                                                                   .send());
+        HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", 
testRoute)
+                                                                   .send()
+                                                                   
.expecting(HttpResponseExpectation.SC_OK));
         assertClientStatsResponse(response, expectedParams);
     }
 
@@ -167,7 +167,7 @@ class CassandraStatsIntegrationTest extends 
SharedClusterSidecarIntegrationTestB
         String testRoute = 
String.format("/api/v1/keyspaces/%s/tables/%s/snapshots/" + tableName.table() + 
"-snapshot",
                                          tableName.keyspace(), 
tableName.table());
         HttpResponse<Buffer> resp;
-        resp = getBlocking(trustedClient().put(server.actualPort(), 
"localhost", testRoute)
+        resp = getBlocking(trustedClient().put(serverWrapper.serverPort, 
"localhost", testRoute)
                                  .send());
         assertThat(resp.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
     }
@@ -176,7 +176,7 @@ class CassandraStatsIntegrationTest extends 
SharedClusterSidecarIntegrationTestB
     {
         String testRoute = "/api/v1/cassandra/keyspaces/" + 
tableName.keyspace() + "/tables/" + tableName.table() + "/stats";
         HttpResponse<Buffer> resp;
-        resp = getBlocking(trustedClient().get(server.actualPort(), 
"localhost", testRoute)
+        resp = getBlocking(trustedClient().get(serverWrapper.serverPort, 
"localhost", testRoute)
                                  .send());
         assertTableStatsResponse(tableName, resp);
     }
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CdcConfigIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CdcConfigIntegrationTest.java
index a2de83f3..ab5d6826 100644
--- 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CdcConfigIntegrationTest.java
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CdcConfigIntegrationTest.java
@@ -69,7 +69,7 @@ class CdcConfigIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
 
         // Create new configs
         UpdateCdcServiceConfigPayload payload = new 
UpdateCdcServiceConfigPayload(Map.of("k1", "v1"));
-        UpdateCdcServiceConfigPayload newConfigResponse = 
getBlocking(trustedClient().put(server.actualPort(), "localhost", configRoute)
+        UpdateCdcServiceConfigPayload newConfigResponse = 
getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost", 
configRoute)
                                                                                
      .as(BodyCodec.json(UpdateCdcServiceConfigPayload.class))
                                                                                
      .sendJson(JsonObject.mapFrom(payload))
                                                                                
      .expecting(HttpResponseExpectation.SC_OK))
@@ -78,7 +78,7 @@ class CdcConfigIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
 
         // update configs
         UpdateCdcServiceConfigPayload updatedPayload = new 
UpdateCdcServiceConfigPayload(Map.of("k3", "v3"));
-        UpdateCdcServiceConfigPayload updatedConfigResponse = 
getBlocking(trustedClient().put(server.actualPort(), "localhost", configRoute)
+        UpdateCdcServiceConfigPayload updatedConfigResponse = 
getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost", 
configRoute)
                                                                                
          .as(BodyCodec.json(UpdateCdcServiceConfigPayload.class))
                                                                                
          .sendJson(JsonObject.mapFrom(updatedPayload)))
                                                               .body();
@@ -86,7 +86,7 @@ class CdcConfigIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
 
         // GetConfigs should give updated configs
         String getConfigsRoute = ApiEndpointsV1.SERVICES_CONFIG_ROUTE;
-        AllServicesConfigPayload getServicesResponse = 
getBlocking(trustedClient().get(server.actualPort(), "localhost", 
getConfigsRoute)
+        AllServicesConfigPayload getServicesResponse = 
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", 
getConfigsRoute)
                                                                                
   .as(BodyCodec.json(AllServicesConfigPayload.class))
                                                                                
   .sendJson(JsonObject.mapFrom(updatedPayload)))
                                                        .body();
@@ -97,11 +97,11 @@ class CdcConfigIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
         assertThat(getServicesResponse).isEqualTo(expectedConfigPayload);
 
         // delete all CDC configs
-        assertThat(getBlocking(trustedClient().delete(server.actualPort(), 
"localhost", configRoute)
+        
assertThat(getBlocking(trustedClient().delete(serverWrapper.serverPort, 
"localhost", configRoute)
                                               
.send()).bodyAsJsonObject().getString("status")).isEqualTo("OK");
 
         // Get configs should have no configs
-        AllServicesConfigPayload response = 
getBlocking(trustedClient().get(server.actualPort(), "localhost", 
getConfigsRoute)
+        AllServicesConfigPayload response = 
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", 
getConfigsRoute)
                                                                        
.as(BodyCodec.json(AllServicesConfigPayload.class))
                                                                        
.sendJson(JsonObject.mapFrom(updatedPayload)))
                                             .body();
@@ -117,7 +117,7 @@ class CdcConfigIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
         // Update with Invalid service
         Map<String, String> configs = Map.of("k1", "v1");
         UpdateCdcServiceConfigPayload payload = new 
UpdateCdcServiceConfigPayload(configs);
-        HttpResponse<Buffer> response = 
getBlocking(trustedClient().put(server.actualPort(), "localhost", configRoute)
+        HttpResponse<Buffer> response = 
getBlocking(trustedClient().put(serverWrapper.serverPort, "localhost", 
configRoute)
                                                                    
.sendJson(JsonObject.mapFrom(payload)));
 
         
assertThat(HttpResponseStatus.BAD_REQUEST.code()).isEqualTo(response.statusCode());
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RoutesIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RoutesIntegrationTest.java
index 0667f034..fdd9b82b 100644
--- 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RoutesIntegrationTest.java
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RoutesIntegrationTest.java
@@ -22,8 +22,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpResponseExpectation;
 import io.vertx.ext.web.client.HttpResponse;
-import io.vertx.ext.web.client.predicate.ResponsePredicate;
 import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
 import org.apache.cassandra.sidecar.common.response.HealthResponse;
 import 
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
@@ -47,7 +47,7 @@ class RoutesIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
     @Test
     void healthHappyPathTest()
     {
-        HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(server.actualPort(), "localhost", 
"/api/v1/__health")
+        HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", 
"/api/v1/__health")
                                                                    .send());
         
assertThat(response.bodyAsJsonObject().getString("status")).isEqualTo("OK");
     }
@@ -56,9 +56,9 @@ class RoutesIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
     void retrieveGossipInfo()
     {
         String testRoute = "/api/v1/cassandra/gossip";
-        HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
-                                                                   
.expect(ResponsePredicate.SC_OK)
-                                                                   .send());
+        HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", 
testRoute)
+                                                                   .send()
+                                                                   
.expecting(HttpResponseExpectation.SC_OK));
         GossipInfoResponse gossipResponse = 
response.bodyAsJson(GossipInfoResponse.class);
         assertThat(gossipResponse).isNotNull()
                                   .hasSize(1);
@@ -87,9 +87,9 @@ class RoutesIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
     private HealthResponse getGossipHealth()
     {
         String testRoute = "/api/v1/cassandra/gossip/__health";
-        HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(server.actualPort(), "localhost", testRoute)
-                                                                   
.expect(ResponsePredicate.SC_OK)
-                                                                   .send());
+        HttpResponse<Buffer> response = 
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", 
testRoute)
+                                                                   .send()
+                                                                   
.expecting(HttpResponseExpectation.SC_OK));
         assertThat(response.statusCode()).isEqualTo(OK.code());
         return response.bodyAsJson(HealthResponse.class);
     }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/codecs/SidecarInstanceCodec.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/codecs/SidecarInstanceCodec.java
index 022cebd4..1b9b7780 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/codecs/SidecarInstanceCodec.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/codecs/SidecarInstanceCodec.java
@@ -18,26 +18,23 @@
 
 package org.apache.cassandra.sidecar.codecs;
 
-import com.google.inject.Singleton;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.eventbus.MessageCodec;
-import io.vertx.core.eventbus.impl.codecs.StringMessageCodec;
+import io.vertx.core.eventbus.impl.CodecManager;
 import org.apache.cassandra.sidecar.common.client.SidecarInstance;
 import org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl;
 
 /**
  * Codecs for Sidecar instances
+ * @param <T> a type implementing {@link SidecarInstance}
  */
-@Singleton
-public class SidecarInstanceCodec implements MessageCodec<SidecarInstance, 
SidecarInstance>
+public class SidecarInstanceCodec<T extends SidecarInstance> implements 
MessageCodec<T, SidecarInstance>
 {
-    public static final StringMessageCodec STRING = new StringMessageCodec();
-
     @Override
-    public void encodeToWire(Buffer buf, SidecarInstance instance)
+    public void encodeToWire(Buffer buf, T instance)
     {
         buf.appendInt(instance.port());
-        STRING.encodeToWire(buf, instance.hostname());
+        CodecManager.STRING_MESSAGE_CODEC.encodeToWire(buf, 
instance.hostname());
     }
 
     @Override
@@ -45,11 +42,11 @@ public class SidecarInstanceCodec implements 
MessageCodec<SidecarInstance, Sidec
     {
         int port = buf.getInt(pos);
         pos += 4; // advance 4 bytes after reading int
-        return new SidecarInstanceImpl(STRING.decodeFromWire(pos, buf), port);
+        return new 
SidecarInstanceImpl(CodecManager.STRING_MESSAGE_CODEC.decodeFromWire(pos, buf), 
port);
     }
 
     @Override
-    public SidecarInstance transform(SidecarInstance instance)
+    public SidecarInstance transform(T instance)
     {
         return new SidecarInstanceImpl(instance.hostname(), instance.port());
     }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProvider.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProvider.java
index eef20bfd..5941da03 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProvider.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProvider.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.sidecar.coordination;
 
 import java.math.BigInteger;
+import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Collections;
@@ -47,9 +48,11 @@ import 
org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl;
 import org.apache.cassandra.sidecar.common.server.cluster.locator.Token;
 import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
 import org.apache.cassandra.sidecar.common.utils.Preconditions;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.VisibleForTesting;
 
 import static 
org.apache.cassandra.sidecar.config.yaml.CassandraInputValidationConfigurationImpl.DEFAULT_FORBIDDEN_KEYSPACES;
 
@@ -66,19 +69,23 @@ public class InnerDcTokenAdjacentPeerProvider implements 
SidecarPeerProvider
     private final CassandraClientTokenRingProvider 
cassandraClientTokenRingProvider;
     private final ServiceConfiguration serviceConfiguration;
     private final DnsResolver dnsResolver;
+    private final DriverUtils driverUtils;
 
     @Inject
     public InnerDcTokenAdjacentPeerProvider(InstanceMetadataFetcher 
instanceFetcher,
                                             CassandraClientTokenRingProvider 
cassandraClientTokenRingProvider,
                                             ServiceConfiguration 
serviceConfiguration,
-                                            DnsResolver dnsResolver)
+                                            DnsResolver dnsResolver,
+                                            DriverUtils driverUtils)
     {
         this.instanceFetcher = instanceFetcher;
         this.cassandraClientTokenRingProvider = 
cassandraClientTokenRingProvider;
         this.serviceConfiguration = serviceConfiguration;
         this.dnsResolver = dnsResolver;
+        this.driverUtils = driverUtils;
     }
 
+    @Override
     public Set<SidecarInstance> get()
     {
         Metadata metadata;
@@ -129,24 +136,26 @@ public class InnerDcTokenAdjacentPeerProvider implements 
SidecarPeerProvider
                                                                  
.collect(Collectors.toList());
 
         BigInteger localMinToken = minToken(localHosts);
-        return adjacentHosts(localHosts::contains, localMinToken, 
sortedLocalDcHosts, quorum)
+        return adjacentHosts(driverUtils, localHosts::contains, localMinToken, 
sortedLocalDcHosts, quorum)
                .stream()
-               .map(host -> Pair.of(host, host.getAddress().getHostAddress()))
-               .map(pair -> {
+               .map(host -> 
driverUtils.getSocketAddress(host).getAddress().getHostAddress())
+               .map(sidecarIpAddress -> {
+                   String sidecarHostname = sidecarIpAddress;
                    try
                    {
-                       return Pair.of(pair.getKey(), 
dnsResolver.reverseResolve(pair.getValue()));
+                       sidecarHostname = 
dnsResolver.reverseResolve(sidecarIpAddress);
                    }
-                   catch (UnknownHostException e)
+                   catch (UnknownHostException unknownHostException)
                    {
-                       return pair;
+                       LOGGER.warn("Unable to reverse resolve hostname for 
{}", sidecarHostname, unknownHostException);
                    }
+                   return new SidecarInstanceImpl(sidecarHostname, 
sidecarServicePort(sidecarHostname));
                })
-               .map(pair -> new SidecarInstanceImpl(pair.getValue(), 
sidecarServicePort(pair.getKey())))
                .collect(Collectors.toSet());
     }
 
-    protected int sidecarServicePort(Host host)
+    @VisibleForTesting
+    protected int sidecarServicePort(String sidecarHostname)
     {
         return serviceConfiguration.port();
     }
@@ -200,7 +209,8 @@ public class InnerDcTokenAdjacentPeerProvider implements 
SidecarPeerProvider
      * @param quorum             minimum availability required to meet maximum 
replication factor in DC
      * @return set of hosts that are adjacent to current Sidecar
      */
-    protected static Set<Host> adjacentHosts(Predicate<Host> isLocal,
+    protected static Set<Host> adjacentHosts(DriverUtils driverUtils,
+                                             Predicate<Host> isLocal,
                                              BigInteger localMinToken,
                                              List<Pair<Host, BigInteger>> 
sortedLocalDcHosts,
                                              int quorum)
@@ -243,12 +253,13 @@ public class InnerDcTokenAdjacentPeerProvider implements 
SidecarPeerProvider
         {
             if (isLocal.test(host))
             {
+                InetAddress address = 
driverUtils.getSocketAddress(host).getAddress();
                 LOGGER.warn("Local instance selected as adjacent host 
localMinToken={} hostId={} address={} hostname={} canonicalHostname={}",
                             localMinToken,
                             host.getHostId(),
-                            host.getAddress().getHostAddress(),
-                            host.getAddress().getHostName(),
-                            host.getAddress().getCanonicalHostName());
+                            address.getHostAddress(),
+                            address.getHostName(),
+                            address.getCanonicalHostName());
                 throw new IllegalArgumentException(String.format("Local 
instance selected as adjacent host: %s", host.getHostId()));
             }
         }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java
index 4ad8b431..c5541fe7 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java
@@ -36,6 +36,7 @@ import io.vertx.core.Vertx;
 import io.vertx.core.eventbus.EventBus;
 import org.apache.cassandra.sidecar.codecs.SidecarInstanceCodec;
 import org.apache.cassandra.sidecar.common.client.SidecarInstance;
+import org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl;
 import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration;
@@ -61,20 +62,22 @@ public class SidecarPeerHealthMonitorTask implements 
PeriodicTask
     private final SidecarPeerHealthConfiguration config;
     private final SidecarPeerProvider sidecarPeerProvider;
     private final SidecarPeerHealthProvider healthProvider;
-    private final SidecarInstanceCodec sidecarInstanceCodec;
 
     private final Map<SidecarInstance, SidecarPeerHealthProvider.Health> 
status = new ConcurrentHashMap<>();
 
     @Inject
     public SidecarPeerHealthMonitorTask(SidecarConfiguration 
sidecarConfiguration,
                                         SidecarPeerProvider 
sidecarPeerProvider,
-                                        SidecarPeerHealthProvider 
healthProvider,
-                                        SidecarInstanceCodec 
sidecarInstanceCodec)
+                                        SidecarPeerHealthProvider 
healthProvider)
     {
         this.config = sidecarConfiguration.sidecarPeerHealthConfiguration();
         this.sidecarPeerProvider = sidecarPeerProvider;
         this.healthProvider = healthProvider;
-        this.sidecarInstanceCodec = sidecarInstanceCodec;
+    }
+
+    public Map<SidecarInstance, SidecarPeerHealthProvider.Health> status()
+    {
+        return status;
     }
 
     @Override
@@ -82,7 +85,7 @@ public class SidecarPeerHealthMonitorTask implements 
PeriodicTask
     {
         this.eventBus = vertx.eventBus();
         // TODO: Find a better place to register this codec
-        eventBus.registerDefaultCodec(SidecarInstance.class, 
sidecarInstanceCodec);
+        eventBus.registerDefaultCodec(SidecarInstanceImpl.class, new 
SidecarInstanceCodec<>());
         EventBusUtils.onceLocalConsumer(eventBus, 
ON_CASSANDRA_CQL_READY.address(), ignored -> executor.schedule(this));
     }
 
@@ -127,17 +130,11 @@ public class SidecarPeerHealthMonitorTask implements 
PeriodicTask
         sidecarPeers.stream()
                     .map(instance ->
                          healthProvider.health(instance)
-                                       .andThen(ar -> {
-                                           if (ar.succeeded())
-                                           {
-                                               updateHealth(instance, 
ar.result());
-                                           }
-                                           else
-                                           {
-                                               LOGGER.error("Failed to run 
health check, marking instance as DOWN host={} port={}",
-                                                            
instance.hostname(), instance.port(), ar.cause());
-                                               markDown(instance);
-                                           }
+                                       .onSuccess(healthCheckResult -> 
updateHealth(instance, healthCheckResult))
+                                       .onFailure(throwable -> {
+                                           LOGGER.error("Failed to run health 
check, marking instance as DOWN host={} port={}",
+                                                        instance.hostname(), 
instance.port(), throwable);
+                                           markDown(instance);
                                        }))
                     .collect(Collectors.toList());
 
@@ -190,9 +187,4 @@ public class SidecarPeerHealthMonitorTask implements 
PeriodicTask
     {
         return status.put(instance, newStatus) != newStatus;
     }
-
-    public Map<SidecarInstance, SidecarPeerHealthProvider.Health> getStatus()
-    {
-        return status;
-    }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthProvider.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthProvider.java
index 8b7d5b63..5658e020 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthProvider.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthProvider.java
@@ -28,14 +28,19 @@ import 
org.apache.cassandra.sidecar.common.client.SidecarInstance;
 public interface SidecarPeerHealthProvider
 {
     /**
-     * Possible Health states:
-     *
-     * - UP: Peer sidecar is alive
-     * - DOWN: Peer sidecar is not reachable
+     * Possible Health States
      */
     enum Health
     {
-        UP, DOWN
+        /**
+         * Peer Sidecar is alive
+         */
+        UP,
+
+        /**
+         * Peer Sidecar is not unreachable
+         */
+        DOWN
     }
 
     /**
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
index 3cb0393a..450151cb 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
@@ -24,11 +24,9 @@ import com.google.inject.Singleton;
 import com.google.inject.multibindings.ProvidesIntoMap;
 import org.apache.cassandra.sidecar.cdc.CdcLogCache;
 import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
-import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
-import 
org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider;
 import 
org.apache.cassandra.sidecar.coordination.InnerDcTokenAdjacentPeerProvider;
 import org.apache.cassandra.sidecar.coordination.SidecarHttpHealthProvider;
 import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask;
@@ -48,7 +46,6 @@ import 
org.apache.cassandra.sidecar.modules.multibindings.VertxRouteMapKeys;
 import org.apache.cassandra.sidecar.routes.RouteBuilder;
 import org.apache.cassandra.sidecar.routes.VertxRoute;
 import org.apache.cassandra.sidecar.tasks.PeriodicTask;
-import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.cassandra.sidecar.utils.SidecarClientProvider;
 
 /**
@@ -129,11 +126,8 @@ public class CdcModule extends AbstractModule
 
     @Provides
     @Singleton
-    public SidecarPeerProvider sidecarPeerProvider(InstanceMetadataFetcher 
metadataFetcher,
-                                                   
CassandraClientTokenRingProvider cassandraClientTokenRingProvider,
-                                                   SidecarConfiguration 
configuration,
-                                                   DnsResolver dnsResolver)
+    public SidecarPeerProvider 
sidecarPeerProvider(InnerDcTokenAdjacentPeerProvider 
innerDcTokenAdjacentPeerProvider)
     {
-        return new InnerDcTokenAdjacentPeerProvider(metadataFetcher, 
cassandraClientTokenRingProvider, configuration.serviceConfiguration(), 
dnsResolver);
+        return innerDcTokenAdjacentPeerProvider;
     }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java 
b/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java
index c4ab5091..8a0bc437 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java
@@ -46,7 +46,6 @@ import io.vertx.core.json.JsonObject;
 import io.vertx.core.net.SSLOptions;
 import io.vertx.core.net.TrafficShapingOptions;
 import io.vertx.ext.web.Router;
-import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.utils.Preconditions;
@@ -168,11 +167,7 @@ public class Server
                          .runBlocking(() -> {
                              try
                              {
-                                 CassandraAdapterDelegate delegate = 
instance.delegate();
-                                 if (delegate != null)
-                                 {
-                                     delegate.close();
-                                 }
+                                 instance.delegate().close();
                              }
                              catch (Exception e)
                              {
@@ -250,6 +245,18 @@ public class Server
         throw new IllegalStateException("No deployed server verticles. Maybe 
server failed to deploy due to port conflict");
     }
 
+    /**
+     * @return the deployment ID for this server
+     * @throws IllegalStateException if the server has not been deployed
+     */
+    @VisibleForTesting
+    public String deploymentId()
+    {
+        if (!deployedServerVerticles.isEmpty())
+            return deployedServerVerticles.get(0).deploymentID();
+        throw new IllegalStateException("No deployed server verticles");
+    }
+
     protected Future<String> notifyServerStart(String deploymentId)
     {
         LOGGER.info("Successfully started Cassandra Sidecar");
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/codecs/SidecarInstanceCodecTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/codecs/SidecarInstanceCodecTest.java
new file mode 100644
index 00000000..36b792a4
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/codecs/SidecarInstanceCodecTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.codecs;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.eventbus.EventBus;
+import org.apache.cassandra.sidecar.TestResourceReaper;
+import org.apache.cassandra.sidecar.common.client.SidecarInstance;
+import org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl;
+
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for the {@link SidecarInstanceCodec}
+ */
+class SidecarInstanceCodecTest
+{
+    static Vertx vertx;
+    private static EventBus eventBus;
+    private static SidecarInstanceCodec<SidecarInstanceImpl> codec;
+
+    @BeforeAll
+    static void setup()
+    {
+        vertx = Vertx.vertx();
+        eventBus = vertx.eventBus();
+        codec = new SidecarInstanceCodec<>();
+        eventBus.registerDefaultCodec(SidecarInstanceImpl.class, codec);
+    }
+
+    @AfterAll
+    static void tearDown()
+    {
+        getBlocking(TestResourceReaper.create().with(vertx).close(), 30, 
TimeUnit.SECONDS, "Closing vertx");
+    }
+
+    @Test
+    void testCodec() throws InterruptedException
+    {
+        CountDownLatch latch = new CountDownLatch(1);
+        SidecarInstanceImpl sidecarInstance = new 
SidecarInstanceImpl("localhost", 1234);
+        eventBus.consumer("test address", message -> {
+            assertThat(message.body()).isEqualTo(sidecarInstance);
+            latch.countDown();
+        });
+        eventBus.send("test address", sidecarInstance);
+        assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
+    }
+
+    @Test
+    void testEncodingDecoding()
+    {
+        Buffer buffer = Buffer.buffer(1024);
+        SidecarInstanceImpl sidecarInstance = new 
SidecarInstanceImpl("127.0.0.1", 9876);
+        codec.encodeToWire(buffer, sidecarInstance);
+        SidecarInstance decoded = codec.decodeFromWire(0, buffer);
+        assertThat(decoded).isEqualTo(sidecarInstance);
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProviderTests.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProviderTests.java
index 273c97cc..61e4c9ae 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProviderTests.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProviderTests.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.sidecar.coordination;
 
 import java.math.BigInteger;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -32,10 +33,10 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.junit.jupiter.api.Test;
 
 import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.EndPoint;
 import com.datastax.driver.core.Host;
 import com.datastax.driver.core.KeyspaceMetadata;
 import com.datastax.driver.core.Metadata;
@@ -45,6 +46,7 @@ import 
org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.client.SidecarInstance;
 import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 
@@ -61,6 +63,7 @@ import static org.mockito.Mockito.when;
  */
 public class InnerDcTokenAdjacentPeerProviderTests
 {
+    private static final DriverUtils DRIVER_UTILS = new DriverUtils();
     private static final List<BigInteger> TOKENS = Stream.of(
                                                          
"-9223372036854775808",
                                                          
"-8070450532247928832",
@@ -111,7 +114,7 @@ public class InnerDcTokenAdjacentPeerProviderTests
         for (int i = 0; i < numHosts - 1; i++)
         {
             int hostId = i + 1;
-            final List<InstanceMetadata> localInstances = List.of(
+            List<InstanceMetadata> localInstances = List.of(
             mockInstanceMetadata(1, "dc1-host" + hostId + "-i1", metadata),
             mockInstanceMetadata(2, "dc1-host" + hostId + "-i2", metadata),
             mockInstanceMetadata(3, "dc1-host" + hostId + "-i3", metadata),
@@ -120,9 +123,10 @@ public class InnerDcTokenAdjacentPeerProviderTests
             
when(metadataFetcher.allLocalInstances()).thenReturn(localInstances);
 
             CassandraClientTokenRingProvider cachedLocalTokenRanges = 
mock(CassandraClientTokenRingProvider.class);
-            Set<Host> localHosts = allHosts.stream().filter(host -> 
host.getAddress()
-                                                                        
.getHostName()
-                                                                        
.startsWith("dc1-host" + hostId + "-"))
+            Set<Host> localHosts = allHosts.stream().filter(host -> 
DRIVER_UTILS.getSocketAddress(host)
+                                                                               
 .getAddress()
+                                                                               
 .getHostName()
+                                                                               
 .startsWith("dc1-host" + hostId + "-"))
                                            .collect(Collectors.toSet());
             
when(cachedLocalTokenRanges.localInstances()).thenReturn(localHosts);
             Map<Integer, Set<TokenRange>> localRanges = Map.of(
@@ -134,24 +138,25 @@ public class InnerDcTokenAdjacentPeerProviderTests
             
when(cachedLocalTokenRanges.localTokenRanges(anyString())).thenReturn(localRanges);
             when(cachedLocalTokenRanges.allInstances()).thenReturn(allHosts);
 
-            final InnerDcTokenAdjacentPeerProvider provider = new 
InnerDcTokenAdjacentPeerProvider(metadataFetcher,
-                                                                               
                    cachedLocalTokenRanges,
-                                                                               
                    serviceConfiguration,
-                                                                               
                    new DnsResolver()
-                                                                               
                    {
-                                                                               
                        @Override
-                                                                               
                        public String resolve(String hostname)
-                                                                               
                        {
-                                                                               
                            return hostname;
-                                                                               
                        }
+            InnerDcTokenAdjacentPeerProvider provider = new 
InnerDcTokenAdjacentPeerProvider(metadataFetcher,
+                                                                               
              cachedLocalTokenRanges,
+                                                                               
              serviceConfiguration,
+                                                                               
              new DnsResolver()
+                                                                               
              {
+                                                                               
                  @Override
+                                                                               
                  public String resolve(String hostname)
+                                                                               
                  {
+                                                                               
                      return hostname;
+                                                                               
                  }
 
-                                                                               
                        @Override
-                                                                               
                        public String reverseResolve(String address)
-                                                                               
                        {
-                                                                               
                            return address;
-                                                                               
                        }
-                                                                               
                    });
-            final Set<SidecarInstance> buddies = provider.get();
+                                                                               
                  @Override
+                                                                               
                  public String reverseResolve(String address)
+                                                                               
                  {
+                                                                               
                      return address;
+                                                                               
                  }
+                                                                               
              },
+                                                                               
              DRIVER_UTILS);
+            Set<SidecarInstance> buddies = provider.get();
             assertEquals(1, buddies.size());
             
assertTrue(buddies.stream().findFirst().orElseThrow().hostname().startsWith("dc1-host"
 + (hostId + 1)));
         }
@@ -160,23 +165,26 @@ public class InnerDcTokenAdjacentPeerProviderTests
     @Test
     public void testAdjacentHosts()
     {
-        final List<Pair<Host, BigInteger>> allHosts = IntStream.range(0, 
TOKENS.size())
-                                                               .mapToObj(idx 
-> {
-                                                                   BigInteger 
token = tokenAt(idx);
-                                                                   return 
Pair.of(mockHost(INSTANCES.get(idx), token), token);
-                                                               })
-                                                               
.collect(Collectors.toList());
+        List<Pair<Host, BigInteger>> allHosts = IntStream.range(0, 
TOKENS.size())
+                                                         .mapToObj(idx -> {
+                                                             BigInteger token 
= tokenAt(idx);
+                                                             return 
Pair.of(mockHost(INSTANCES.get(idx), token), token);
+                                                         })
+                                                         
.collect(Collectors.toList());
 
-        final Set<String> adjacentHosts = new HashSet<>(TOKENS.size());
+        Set<String> adjacentHosts = new HashSet<>(TOKENS.size());
         for (int i = 0; i < TOKENS.size(); i++)
         {
-            final String localhost = INSTANCES.get(i);
-            final BigInteger token = tokenAt(i);
+            String localhost = INSTANCES.get(i);
+            BigInteger token = tokenAt(i);
             test(localhost, token, allHosts, 1, tokenAt(i + 1));
-            final Set<Host> adjacent = 
InnerDcTokenAdjacentPeerProvider.adjacentHosts((host) -> isLocal(localhost,
-                                                                               
                         host), token, allHosts, 1);
+            Set<Host> adjacent = 
InnerDcTokenAdjacentPeerProvider.adjacentHosts(DRIVER_UTILS,
+                                                                               
 (host) -> isLocal(localhost, host),
+                                                                               
 token,
+                                                                               
 allHosts,
+                                                                               
 1);
             assertEquals(1, adjacent.size());
-            final String adjacentStr = 
adjacent.stream().findFirst().map(Host::toString).orElseThrow();
+            String adjacentStr = 
adjacent.stream().findFirst().map(Host::toString).orElseThrow();
             assertFalse(adjacentHosts.contains(adjacentStr));
             adjacentHosts.add(adjacentStr);
         }
@@ -191,7 +199,7 @@ public class InnerDcTokenAdjacentPeerProviderTests
     @Test
     public void testMinToken()
     {
-        final List<TokenRange> tokens = TOKENS.stream().map(start -> new 
TokenRange(start, start.add(BigInteger.ONE))).collect(Collectors.toList());
+        List<TokenRange> tokens = TOKENS.stream().map(start -> new 
TokenRange(start, start.add(BigInteger.ONE))).collect(Collectors.toList());
         Collections.shuffle(tokens);
         assertEquals(tokenAt(0), 
InnerDcTokenAdjacentPeerProvider.minToken(tokens.stream()));
     }
@@ -206,15 +214,16 @@ public class InnerDcTokenAdjacentPeerProviderTests
                                      "local1-i2", "local2-i2", "local3-i2",
                                      "local1-i3", "local2-i3", "local3-i3",
                                      "local1-i4", "local2-i4", "local3-i4");
-        final BigInteger token = new 
BigInteger(tokens.stream().findFirst().orElseThrow());
+        BigInteger token = new 
BigInteger(tokens.stream().findFirst().orElseThrow());
         List<Pair<Host, BigInteger>> sortedLocalDcHosts = IntStream.range(0, 
tokens.size())
                                                                    .mapToObj(i 
-> {
                                                                        
BigInteger t = new BigInteger(tokens.get(i));
                                                                        return 
Pair.of(mockHost(hosts.get(i), t), t);
                                                                    })
                                                                    
.collect(Collectors.toList());
-        final int quorum = 5;
-        InnerDcTokenAdjacentPeerProvider.adjacentHosts((host) -> 
host.getAddress().getHostName().startsWith("local1-"),
+        int quorum = 5;
+        InnerDcTokenAdjacentPeerProvider.adjacentHosts(DRIVER_UTILS,
+                                                       (host) -> 
DRIVER_UTILS.getSocketAddress(host).getAddress().getHostName().startsWith("local1-"),
                                                        token,
                                                        sortedLocalDcHosts,
                                                        quorum)
@@ -229,12 +238,13 @@ public class InnerDcTokenAdjacentPeerProviderTests
 
     private static void test(String localhost, BigInteger token, 
List<Pair<Host, BigInteger>> allHosts, int quorum, BigInteger... expected)
     {
-        final Set<String> result = 
InnerDcTokenAdjacentPeerProvider.adjacentHosts((host) -> isLocal(localhost, 
host),
-                                                                               
   token,
-                                                                               
   allHosts,
-                                                                               
   quorum)
-                                                                    
.stream().map(Host::toString)
-                                                                    
.collect(Collectors.toSet());
+        Set<String> result = 
InnerDcTokenAdjacentPeerProvider.adjacentHosts(DRIVER_UTILS,
+                                                                            
(host) -> isLocal(localhost, host),
+                                                                            
token,
+                                                                            
allHosts,
+                                                                            
quorum)
+                                                             
.stream().map(Host::toString)
+                                                             
.collect(Collectors.toSet());
         assertFalse(result.contains(token.toString()));
         for (BigInteger bi : expected)
         {
@@ -245,7 +255,7 @@ public class InnerDcTokenAdjacentPeerProviderTests
 
     private static boolean isLocal(String localhost, Host host)
     {
-        return isLocal(localhost, host.getAddress().getHostName());
+        return isLocal(localhost, 
DRIVER_UTILS.getSocketAddress(host).getAddress().getHostName());
     }
 
     private static boolean isLocal(String localhost, String addr)
@@ -283,12 +293,16 @@ public class InnerDcTokenAdjacentPeerProviderTests
     {
         Host host = mock(Host.class);
         InetAddress addr = mock(InetAddress.class);
+        EndPoint mockEndpoint = mock(EndPoint.class);
+        InetSocketAddress mockInetSocketAddress = 
mock(InetSocketAddress.class);
         when(addr.getHostName()).thenReturn(hostname);
         when(addr.getHostAddress()).thenReturn(hostname);
-        when(host.getAddress()).thenReturn(addr);
+        when(host.getEndPoint()).thenReturn(mockEndpoint);
+        when(mockEndpoint.resolve()).thenReturn(mockInetSocketAddress);
+        when(mockInetSocketAddress.getAddress()).thenReturn(addr);
         when(host.toString()).thenReturn(token.toString());
         when(host.getDatacenter()).thenReturn(dc);
-        final Token t = mock(Token.class);
+        Token t = mock(Token.class);
         when(t.getType()).thenReturn(DataType.bigint());
         when(t.getValue()).thenReturn(token.longValue());
         when(host.getTokens()).thenReturn(Set.of(t));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to