This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 529171b  CASSANDRASC-87: Add JMX health checks during the periodic health checks
529171b is described below

commit 529171b1f6dee277a9087eb9da7242ce17873643
Author: Francisco Guerrero <fran...@apache.org>
AuthorDate: Mon Dec 11 13:49:36 2023 -0800

    CASSANDRASC-87: Add JMX health checks during the periodic health checks
    
    In this commit, we add health checks based on the JMX connectivity to the managed
    Cassandra instances. Additionally, we construct the NodeSettings object based on
    JMX. This allows the Sidecar process to be able to determine an adapter for the
    node even if the node is in joining state, or its binary port has been disabled.
    
    Co-authored-by: Doug Rohrer <d...@therohrers.org>
    Co-authored-by: Francisco Guerrero <fran...@apache.org>
    
    Patch by Doug Rohrer, Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-87
---
 CHANGES.txt                                        |   1 +
 .../sidecar/adapters/base/CassandraAdapter.java    |  33 +--
 .../cassandra/sidecar/client/RequestContext.java   |  43 ++-
 .../cassandra/sidecar/client/SidecarClient.java    |  40 ++-
 ...Request.java => CassandraJmxHealthRequest.java} |  10 +-
 ...uest.java => CassandraNativeHealthRequest.java} |  24 +-
 .../sidecar/client/SidecarClientTest.java          |  72 ++++-
 .../cassandra/sidecar/common/ApiEndpointsV1.java   |   9 +
 .../apache/cassandra/sidecar/common/JmxClient.java |  33 ++-
 .../cassandra/sidecar/common/NodeSettings.java     |   7 -
 .../sidecar/cluster/CassandraAdapterDelegate.java  | 316 +++++++++++++++++----
 .../cluster/SidecarLoadBalancingPolicy.java        |   8 +-
 .../sidecar/cluster/instance/InstanceMetadata.java |   3 +-
 .../cluster/instance/InstanceMetadataImpl.java     |   4 +-
 .../sidecar/routes/CassandraHealthHandler.java     |   7 +-
 .../cassandra/sidecar/routes/RingHandler.java      |   6 +
 .../routes/TokenRangeReplicaMapHandler.java        |   5 +
 .../routes/cassandra/NodeSettingsHandler.java      |   5 +
 .../sstableuploads/SSTableImportHandler.java       |   5 +
 .../cassandra/sidecar/server/MainModule.java       |   8 +
 .../sidecar/server/SidecarServerEvents.java        |  12 +
 .../sidecar/utils/InstanceMetadataFetcher.java     |   1 +
 .../cassandra/sidecar/utils/SSTableImporter.java   |   6 +
 .../sidecar/utils/SimpleCassandraVersion.java      |   2 +-
 .../distributed/impl/AbstractClusterUtils.java     |  26 +-
 .../sidecar/common/CQLSessionProviderTest.java     |   4 +-
 .../sidecar/common/DelegateIntegrationTest.java    | 313 ++++++++++++++++++++
 .../cassandra/sidecar/common/DelegateTest.java     | 125 --------
 .../cassandra/sidecar/common/JmxClientTest.java    |   4 +-
 .../routes/GossipInfoHandlerIntegrationTest.java   |   2 +-
 .../sidecar/routes/RingHandlerIntegrationTest.java |   2 +-
 .../routes/tokenrange/BasicGossipDisabledTest.java |   2 +-
 .../testing/CassandraSidecarTestContext.java       |  33 ++-
 .../sidecar/testing/IntegrationTestBase.java       |   6 +-
 .../testing/AbstractCassandraTestContext.java      |   2 +-
 .../testing/CassandraIntegrationTest.java          |   2 +-
 .../cassandra/testing/CassandraTestContext.java    |   5 -
 .../org/apache/cassandra/sidecar/TestModule.java   |   2 +-
 .../cassandra/sidecar/server/ServerSSLTest.java    |   7 +-
 .../cassandra/sidecar/snapshots/SnapshotUtils.java |   6 +-
 40 files changed, 892 insertions(+), 309 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f94b7b9..25e76dc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Add JMX health checks during the periodic health checks (CASSANDRASC-87)
  * Sidecar should be able to load metadata even if the local instance is 
unavailable (CASSANDRASC-79)
  * Expose additional SSL configuration options for the Sidecar Service 
(CASSANDRASC-82)
  * Expose additional node settings (CASSANDRASC-84)
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
index 0e12a0e..f02ea2f 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
@@ -28,7 +28,6 @@ import com.datastax.driver.core.DriverUtils;
 import com.datastax.driver.core.Host;
 import com.datastax.driver.core.Metadata;
 import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.Statement;
 import org.apache.cassandra.sidecar.common.CQLSessionProvider;
@@ -41,13 +40,6 @@ import org.apache.cassandra.sidecar.common.TableOperations;
 import org.apache.cassandra.sidecar.common.dns.DnsResolver;
 import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.cassandra.sidecar.common.NodeSettings.DATA_CENTER_COLUMN_NAME;
-import static 
org.apache.cassandra.sidecar.common.NodeSettings.PARTITIONER_COLUMN_NAME;
-import static 
org.apache.cassandra.sidecar.common.NodeSettings.RELEASE_VERSION_COLUMN_NAME;
-import static 
org.apache.cassandra.sidecar.common.NodeSettings.RPC_ADDRESS_COLUMN_NAME;
-import static 
org.apache.cassandra.sidecar.common.NodeSettings.RPC_PORT_COLUMN_NAME;
-import static 
org.apache.cassandra.sidecar.common.NodeSettings.TOKENS_COLUMN_NAME;
-
 /**
  * A {@link ICassandraAdapter} implementation for Cassandra 4.0 and later
  */
@@ -107,30 +99,7 @@ public class CassandraAdapter implements ICassandraAdapter
     @Nullable
     public NodeSettings nodeSettings()
     {
-        ResultSet rs = executeLocal("SELECT "
-                                              + RELEASE_VERSION_COLUMN_NAME + 
", "
-                                              + PARTITIONER_COLUMN_NAME + ", "
-                                              + DATA_CENTER_COLUMN_NAME + ", "
-                                              + RPC_ADDRESS_COLUMN_NAME + ", "
-                                              + RPC_PORT_COLUMN_NAME + ", "
-                                              + TOKENS_COLUMN_NAME
-                                              + " FROM system.local");
-        if (rs == null)
-        {
-            return null;
-        }
-
-        Row oneResult = rs.one();
-
-        return NodeSettings.builder()
-                           
.releaseVersion(oneResult.getString(RELEASE_VERSION_COLUMN_NAME))
-                           
.partitioner(oneResult.getString(PARTITIONER_COLUMN_NAME))
-                           .sidecarVersion(sidecarVersion)
-                           
.datacenter(oneResult.getString(DATA_CENTER_COLUMN_NAME))
-                           .tokens(oneResult.getSet(TOKENS_COLUMN_NAME, 
String.class))
-                           
.rpcAddress(oneResult.getInet(RPC_ADDRESS_COLUMN_NAME))
-                           .rpcPort(oneResult.getInt(RPC_PORT_COLUMN_NAME))
-                           .build();
+        throw new UnsupportedOperationException("Node settings are not 
provided by this adapter");
     }
 
     @Override
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index f595fb1..b7c7e82 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -22,7 +22,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.cassandra.sidecar.client.request.CassandraHealthRequest;
+import org.apache.cassandra.sidecar.client.request.CassandraJmxHealthRequest;
+import 
org.apache.cassandra.sidecar.client.request.CassandraNativeHealthRequest;
 import 
org.apache.cassandra.sidecar.client.request.CleanSSTableUploadSessionRequest;
 import org.apache.cassandra.sidecar.client.request.ClearSnapshotRequest;
 import org.apache.cassandra.sidecar.client.request.CreateSnapshotRequest;
@@ -52,7 +53,16 @@ import org.apache.cassandra.sidecar.common.utils.HttpRange;
 public class RequestContext
 {
     protected static final SidecarHealthRequest SIDECAR_HEALTH_REQUEST = new 
SidecarHealthRequest();
-    protected static final CassandraHealthRequest CASSANDRA_HEALTH_REQUEST = 
new CassandraHealthRequest();
+
+    /**
+     * @deprecated in favor of {@link #CASSANDRA_NATIVE_HEALTH_REQUEST}
+     */
+    @Deprecated
+    protected static final CassandraNativeHealthRequest 
CASSANDRA_HEALTH_REQUEST =
+    new CassandraNativeHealthRequest(true /* useDeprecatedHealthEndpoint */);
+    protected static final CassandraNativeHealthRequest 
CASSANDRA_NATIVE_HEALTH_REQUEST =
+    new CassandraNativeHealthRequest();
+    protected static final CassandraJmxHealthRequest 
CASSANDRA_JMX_HEALTH_REQUEST = new CassandraJmxHealthRequest();
     protected static final SchemaRequest FULL_SCHEMA_REQUEST = new 
SchemaRequest();
     protected static final TimeSkewRequest TIME_SKEW_REQUEST = new 
TimeSkewRequest();
     protected static final NodeSettingsRequest NODE_SETTINGS_REQUEST = new 
NodeSettingsRequest();
@@ -190,16 +200,41 @@ public class RequestContext
         }
 
         /**
-         * Sets the {@code request} to be a {@link CassandraHealthRequest}
-         * and returns a reference to this Builder enabling method chaining
+         * Sets the {@code request} to be a {@link 
CassandraNativeHealthRequest}
+         * with the {@code useDeprecatedHealthEndpoint} parameter set to 
{@code true}
+         * and returns a reference to this Builder enabling method chaining.
          *
          * @return a reference to this Builder
+         * @deprecated in favor of {@link #cassandraNativeHealthRequest()}
          */
+        @Deprecated
         public Builder cassandraHealthRequest()
         {
             return request(CASSANDRA_HEALTH_REQUEST);
         }
 
+        /**
+         * Sets the {@code request} to be a {@link 
CassandraNativeHealthRequest}
+         * and returns a reference to this Builder enabling method chaining
+         *
+         * @return a reference to this Builder
+         */
+        public Builder cassandraNativeHealthRequest()
+        {
+            return request(CASSANDRA_NATIVE_HEALTH_REQUEST);
+        }
+
+        /**
+         * Sets the {@code request} to be a {@link CassandraJmxHealthRequest}
+         * and returns a reference to this Builder enabling method chaining
+         *
+         * @return a reference to this Builder
+         */
+        public Builder cassandraJmxHealthRequest()
+        {
+            return request(CASSANDRA_JMX_HEALTH_REQUEST);
+        }
+
         /**
          * Sets the {@code request} to be a {@link SchemaRequest} for the full 
schema and returns a reference to
          * this Builder enabling method chaining.
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index d9d77f8..90fd47f 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -79,22 +79,50 @@ public class SidecarClient implements AutoCloseable
     public CompletableFuture<HealthResponse> sidecarHealth()
     {
         return executor.executeRequestAsync(requestBuilder()
-                .sidecarHealthRequest()
-                .retryPolicy(new OncePerInstanceRetryPolicy())
-                .build());
+                                            .sidecarHealthRequest()
+                                            .retryPolicy(new 
OncePerInstanceRetryPolicy())
+                                            .build());
     }
 
     /**
      * Executes the Cassandra health request using the configured selection 
policy and with no retries
      *
      * @return a completable future of the Cassandra health response
+     * @deprecated use {@link #cassandraNativeHealth()} instead
      */
+    @Deprecated
     public CompletableFuture<HealthResponse> cassandraHealth()
     {
         return executor.executeRequestAsync(requestBuilder()
-                .cassandraHealthRequest()
-                .retryPolicy(new OncePerInstanceRetryPolicy())
-                .build());
+                                            .cassandraHealthRequest()
+                                            .retryPolicy(new 
OncePerInstanceRetryPolicy())
+                                            .build());
+    }
+
+    /**
+     * Executes the Cassandra health request using the configured selection 
policy and with no retries
+     *
+     * @return a completable future of the Cassandra health response
+     */
+    public CompletableFuture<HealthResponse> cassandraNativeHealth()
+    {
+        return executor.executeRequestAsync(requestBuilder()
+                                            .cassandraNativeHealthRequest()
+                                            .retryPolicy(new 
OncePerInstanceRetryPolicy())
+                                            .build());
+    }
+
+    /**
+     * Executes the Cassandra health request using the configured selection 
policy and with no retries
+     *
+     * @return a completable future of the Cassandra health response
+     */
+    public CompletableFuture<HealthResponse> cassandraJmxHealth()
+    {
+        return executor.executeRequestAsync(requestBuilder()
+                                            .cassandraJmxHealthRequest()
+                                            .retryPolicy(new 
OncePerInstanceRetryPolicy())
+                                            .build());
     }
 
     /**
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
 
b/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraJmxHealthRequest.java
similarity index 77%
copy from 
client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
copy to 
client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraJmxHealthRequest.java
index 2061c89..1d5644a 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraJmxHealthRequest.java
@@ -23,16 +23,16 @@ import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
 import org.apache.cassandra.sidecar.common.data.HealthResponse;
 
 /**
- * Represents a request to retrieve the Cassandra health
+ * Represents a request to retrieve the connectivity health checks performed 
against the Cassandra JMX protocol
  */
-public class CassandraHealthRequest extends DecodableRequest<HealthResponse>
+public class CassandraJmxHealthRequest extends DecodableRequest<HealthResponse>
 {
     /**
-     * Constructs a request to retrieve the Cassandra health
+     * Constructs a request to retrieve the Cassandra JMX health
      */
-    public CassandraHealthRequest()
+    public CassandraJmxHealthRequest()
     {
-        super(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE);
+        super(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE);
     }
 
     /**
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
 
b/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraNativeHealthRequest.java
similarity index 57%
copy from 
client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
copy to 
client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraNativeHealthRequest.java
index 2061c89..35960c9 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraNativeHealthRequest.java
@@ -23,16 +23,30 @@ import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
 import org.apache.cassandra.sidecar.common.data.HealthResponse;
 
 /**
- * Represents a request to retrieve the Cassandra health
+ * Represents a request to retrieve the connectivity health checks performed 
against the Cassandra native protocol
  */
-public class CassandraHealthRequest extends DecodableRequest<HealthResponse>
+public class CassandraNativeHealthRequest extends 
DecodableRequest<HealthResponse>
 {
     /**
-     * Constructs a request to retrieve the Cassandra health
+     * Constructs a request to retrieve the Cassandra native health
      */
-    public CassandraHealthRequest()
+    public CassandraNativeHealthRequest()
     {
-        super(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE);
+        this(false);
+    }
+
+    /**
+     * Constructs a request to retrieve the Cassandra native health
+     *
+     * @param useDeprecatedHealthEndpoint {@code true} if using the deprecated 
endpoint, {@code false} to use
+     *                                    the new endpoint
+     */
+    @SuppressWarnings("deprecation")
+    public CassandraNativeHealthRequest(boolean useDeprecatedHealthEndpoint)
+    {
+        super(useDeprecatedHealthEndpoint
+              ? ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE
+              : ApiEndpointsV1.CASSANDRA_NATIVE_HEALTH_ROUTE);
     }
 
     /**
diff --git 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index 669f9a4..d47e460 100644
--- 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -144,8 +144,9 @@ abstract class SidecarClientTest
         validateResponseServed(ApiEndpointsV1.HEALTH_ROUTE);
     }
 
+    @SuppressWarnings("deprecation")
     @Test
-    void testCassandraHealthOk() throws Exception
+    void testCassandraDeprecatedHealthOk() throws Exception
     {
         MockResponse response = new MockResponse()
                                 .setResponseCode(200)
@@ -161,8 +162,9 @@ abstract class SidecarClientTest
         validateResponseServed(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE);
     }
 
+    @SuppressWarnings("deprecation")
     @Test
-    void testCassandraHealthNotOk() throws Exception
+    void testCassandraDeprecatedHealthNotOk() throws Exception
     {
         MockResponse response = new MockResponse()
                                 .setResponseCode(503)
@@ -177,6 +179,72 @@ abstract class SidecarClientTest
         validateResponseServed(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE);
     }
 
+    @Test
+    void testCassandraNativeHealthOk() throws Exception
+    {
+        MockResponse response = new MockResponse()
+                                .setResponseCode(200)
+                                .setHeader("content-type", "application/json")
+                                .setBody("{\"status\":\"OK\"}");
+        enqueue(response);
+
+        HealthResponse result = client.cassandraNativeHealth().get(30, 
TimeUnit.SECONDS);
+        assertThat(result).isNotNull();
+        assertThat(result.status()).isEqualToIgnoringCase("OK");
+        assertThat(result.isOk()).isTrue();
+
+        validateResponseServed(ApiEndpointsV1.CASSANDRA_NATIVE_HEALTH_ROUTE);
+    }
+
+    @Test
+    void testCassandraNativeHealthNotOk() throws Exception
+    {
+        MockResponse response = new MockResponse()
+                                .setResponseCode(503)
+                                .setHeader("content-type", "application/json")
+                                .setBody("{\"status\":\"NOT_OK\"}");
+        enqueue(response);
+
+        assertThatThrownBy(() -> client.cassandraNativeHealth().get(30, 
TimeUnit.SECONDS))
+        .isInstanceOf(ExecutionException.class)
+        .hasCauseInstanceOf(RetriesExhaustedException.class);
+
+        validateResponseServed(ApiEndpointsV1.CASSANDRA_NATIVE_HEALTH_ROUTE);
+    }
+
+    @Test
+    void testCassandraJmxHealthOk() throws Exception
+    {
+        MockResponse response = new MockResponse()
+                                .setResponseCode(200)
+                                .setHeader("content-type", "application/json")
+                                .setBody("{\"status\":\"OK\"}");
+        enqueue(response);
+
+        HealthResponse result = client.cassandraJmxHealth().get(1, 
TimeUnit.SECONDS);
+        assertThat(result).isNotNull();
+        assertThat(result.status()).isEqualTo("OK");
+        assertThat(result.isOk()).isTrue();
+
+        validateResponseServed(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE);
+    }
+
+    @Test
+    void testCassandraJmxHealthNotOk() throws Exception
+    {
+        MockResponse response = new MockResponse()
+                                .setResponseCode(503)
+                                .setHeader("content-type", "application/json")
+                                .setBody("{\"status\":\"NOT_OK\"}");
+        enqueue(response);
+
+        assertThatThrownBy(() -> client.cassandraJmxHealth().get(1, 
TimeUnit.SECONDS))
+        .isInstanceOf(ExecutionException.class)
+        .hasCauseInstanceOf(RetriesExhaustedException.class);
+
+        validateResponseServed(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE);
+    }
+
     @Test
     void testFullSchema() throws Exception
     {
diff --git 
a/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java 
b/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index 5fe2d5b..550a8c4 100644
--- 
a/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++ 
b/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -28,6 +28,8 @@ public final class ApiEndpointsV1
 
     public static final String HEALTH = "/__health";
     public static final String CASSANDRA = "/cassandra";
+    public static final String NATIVE = "/native";
+    public static final String JMX = "/jmx";
     public static final String KEYSPACE_PATH_PARAM = ":keyspace";
     public static final String TABLE_PATH_PARAM = ":table";
     public static final String SNAPSHOT_PATH_PARAM = ":snapshot";
@@ -44,7 +46,14 @@ public final class ApiEndpointsV1
     public static final String PER_UPLOAD = "/uploads/" + UPLOAD_ID_PATH_PARAM;
 
     public static final String HEALTH_ROUTE = API_V1 + HEALTH;
+
+    /**
+     * @deprecated in favor of {@link #CASSANDRA_NATIVE_HEALTH_ROUTE}
+     */
+    @Deprecated
     public static final String CASSANDRA_HEALTH_ROUTE = API_V1 + CASSANDRA + 
HEALTH;
+    public static final String CASSANDRA_NATIVE_HEALTH_ROUTE = API_V1 + 
CASSANDRA + NATIVE + HEALTH;
+    public static final String CASSANDRA_JMX_HEALTH_ROUTE = API_V1 + CASSANDRA 
+ JMX + HEALTH;
 
     @Deprecated  // NOTE: Uses singular forms of "keyspace" and "table"
     public static final String DEPRECATED_SNAPSHOTS_ROUTE = API_V1 + 
"/keyspace/" + KEYSPACE_PATH_PARAM +
diff --git 
a/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java 
b/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java
index 2bb86d8..44ab18c 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java
@@ -23,9 +23,12 @@ import java.io.IOException;
 import java.net.MalformedURLException;
 import java.rmi.server.RMIClientSocketFactory;
 import java.rmi.server.RMISocketFactory;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 import java.util.function.Supplier;
@@ -66,7 +69,8 @@ public class JmxClient implements NotificationListener, 
Closeable
     private final BooleanSupplier enableSslSupplier;
     private final int connectionMaxRetries;
     private final long connectionRetryDelayMillis;
-
+    private final Set<NotificationListener> registeredNotificationListeners =
+    Collections.newSetFromMap(new ConcurrentHashMap<>());
 
     /**
      * Creates a new JMX client with {@link Builder} options.
@@ -119,6 +123,27 @@ public class JmxClient implements NotificationListener, 
Closeable
         }
     }
 
+    /**
+     * Registers a {@link NotificationListener} to be notified whenever we 
encounter a JMX event. This method
+     * guarantees that a listener will be registered at most once.
+     *
+     * @param notificationListener the listener to be notified
+     */
+    public void registerListener(NotificationListener notificationListener)
+    {
+        registeredNotificationListeners.add(notificationListener);
+    }
+
+    /**
+     * Removes an already registered {@link NotificationListener} from the 
recipient list for JMX events.
+     *
+     * @param notificationListener the listener to be removed
+     */
+    public void unregisterListener(NotificationListener notificationListener)
+    {
+        registeredNotificationListeners.remove(notificationListener);
+    }
+
     private RMIClientSocketFactory rmiClientSocketFactory(boolean enableSsl)
     {
         return enableSsl
@@ -206,10 +231,16 @@ public class JmxClient implements NotificationListener, 
Closeable
                 {
                     this.connected = justConnected;
                 }
+                forwardNotification(notification, handback);
             }
         }
     }
 
+    private void forwardNotification(Notification notification, Object 
handback)
+    {
+        registeredNotificationListeners.forEach(listener -> 
listener.handleNotification(notification, handback));
+    }
+
     /**
      * @return true if JMX is connected, false otherwise
      */
diff --git 
a/common/src/main/java/org/apache/cassandra/sidecar/common/NodeSettings.java 
b/common/src/main/java/org/apache/cassandra/sidecar/common/NodeSettings.java
index 61dae74..f6c5244 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/NodeSettings.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/NodeSettings.java
@@ -33,13 +33,6 @@ public class NodeSettings
 {
     private static final String VERSION = "version";
 
-    public static final String RELEASE_VERSION_COLUMN_NAME = "release_version";
-    public static final String PARTITIONER_COLUMN_NAME = "partitioner";
-    public static final String DATA_CENTER_COLUMN_NAME = "data_center";
-    public static final String RPC_ADDRESS_COLUMN_NAME = "rpc_address";
-    public static final String RPC_PORT_COLUMN_NAME = "rpc_port";
-    public static final String TOKENS_COLUMN_NAME = "tokens";
-
     @JsonProperty("releaseVersion")
     private final String releaseVersion;
     @JsonProperty("partitioner")
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
 
b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
index 4523ed4..15cea06 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
@@ -20,9 +20,15 @@ package org.apache.cassandra.sidecar.cluster;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.management.remote.JMXConnectionNotification;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,14 +58,12 @@ import 
org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.cassandra.sidecar.common.NodeSettings.DATA_CENTER_COLUMN_NAME;
-import static 
org.apache.cassandra.sidecar.common.NodeSettings.PARTITIONER_COLUMN_NAME;
-import static 
org.apache.cassandra.sidecar.common.NodeSettings.RELEASE_VERSION_COLUMN_NAME;
-import static 
org.apache.cassandra.sidecar.common.NodeSettings.RPC_ADDRESS_COLUMN_NAME;
-import static 
org.apache.cassandra.sidecar.common.NodeSettings.RPC_PORT_COLUMN_NAME;
-import static 
org.apache.cassandra.sidecar.common.NodeSettings.TOKENS_COLUMN_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static 
org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
 import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_DISCONNECTED;
 import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_DISCONNECTED;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_READY;
 
 
 /**
@@ -82,13 +86,16 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
     private final CassandraVersionProvider versionProvider;
     private final CQLSessionProvider cqlSessionProvider;
     private final JmxClient jmxClient;
+    private final JmxNotificationListener notificationListener;
     private SimpleCassandraVersion currentVersion;
     private ICassandraAdapter adapter;
-    private volatile NodeSettings nodeSettings = null;
+    private volatile boolean isNativeUp = false;
+    private volatile NodeSettings nodeSettingsFromJmx = null;
     private final AtomicBoolean registered = new AtomicBoolean(false);
     private final AtomicBoolean isHealthCheckActive = new AtomicBoolean(false);
     private final InetSocketAddress localNativeTransportAddress;
     private volatile Host host;
+    private volatile boolean closed = false;
 
     /**
      * Constructs a new {@link CassandraAdapterDelegate} for the given {@code 
cassandraInstance}
@@ -118,6 +125,8 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
         this.versionProvider = versionProvider;
         this.cqlSessionProvider = session;
         this.jmxClient = jmxClient;
+        notificationListener = new JmxNotificationListener();
+        this.jmxClient.registerListener(notificationListener);
     }
 
     private void maybeRegisterHostListener(@NotNull Session session)
@@ -146,17 +155,24 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
 
     /**
      * Should be called on initial connect as well as when a server comes back 
since it might be from an upgrade
-     * synchronized so we don't flood the DB with version requests
+     * synchronized, so we don't flood the DB with version requests
      *
      * <p>If the healthcheck determines we've changed versions, it should load 
the proper adapter</p>
      */
     public void healthCheck()
     {
+        if (closed)
+        {
+            LOGGER.debug("Skipping health check for cassandraInstanceId={}. 
Delegate is closed", cassandraInstanceId);
+            return;
+        }
+
         if (isHealthCheckActive.compareAndSet(false, true))
         {
             try
             {
-                healthCheckInternal();
+                jmxHealthCheck();
+                nativeProtocolHealthCheck();
             }
             finally
             {
@@ -170,14 +186,49 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
         }
     }
 
-    private void healthCheckInternal()
+    /**
+     * Performs health checks by utilizing the JMX protocol. It uses a small 
subset of the exposed MBeans to
+     * collect information needed to populate the {@link NodeSettings} object.
+     */
+    protected void jmxHealthCheck()
+    {
+        try
+        {
+            NodeSettings newNodeSettings = newNodeSettingsFromJmx();
+            if (!newNodeSettings.equals(nodeSettingsFromJmx))
+            {
+                // Update the nodeSettings cache
+                SimpleCassandraVersion previousVersion = currentVersion;
+                currentVersion = 
SimpleCassandraVersion.create(newNodeSettings.releaseVersion());
+                adapter = 
versionProvider.cassandra(newNodeSettings.releaseVersion())
+                                         .create(cqlSessionProvider, 
jmxClient, localNativeTransportAddress);
+                nodeSettingsFromJmx = newNodeSettings;
+                LOGGER.info("Cassandra version change detected (from={} to={}) 
for cassandraInstanceId={}. " +
+                            "New adapter loaded={}", previousVersion, 
currentVersion, cassandraInstanceId, adapter);
+
+                notifyJmxConnection();
+            }
+            LOGGER.debug("Cassandra version {}", 
newNodeSettings.releaseVersion());
+        }
+        catch (RuntimeException e)
+        {
+            LOGGER.error("Unable to connect JMX to Cassandra instance {}", 
cassandraInstanceId, e);
+            // The cassandra node JMX connectivity is unavailable.
+            markJmxDownAndMaybeNotifyDisconnection();
+        }
+    }
+
+    /**
+     * Performs health checks by utilizing the native protocol
+     */
+    protected void nativeProtocolHealthCheck()
     {
         Session activeSession = cqlSessionProvider.get();
         if (activeSession == null)
         {
             LOGGER.info("No local CQL session is available for 
cassandraInstanceId={}. " +
                         "Cassandra instance is down presumably.", 
cassandraInstanceId);
-            markAsDownAndMaybeNotify();
+            markNativeDownAndMaybeNotifyDisconnection();
             return;
         }
 
@@ -186,15 +237,7 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
         try
         {
             // NOTE: We cannot use `executeLocal` here as there may be no 
adapter yet.
-            SimpleStatement healthCheckStatement =
-            new SimpleStatement("SELECT "
-                                + RELEASE_VERSION_COLUMN_NAME + ", "
-                                + PARTITIONER_COLUMN_NAME + ", "
-                                + DATA_CENTER_COLUMN_NAME + ", "
-                                + RPC_ADDRESS_COLUMN_NAME + ", "
-                                + RPC_PORT_COLUMN_NAME + ", "
-                                + TOKENS_COLUMN_NAME
-                                + " FROM system.local");
+            SimpleStatement healthCheckStatement = new SimpleStatement("SELECT 
release_version FROM system.local");
             Metadata metadata = activeSession.getCluster().getMetadata();
             host = getHost(metadata);
             if (host == null)
@@ -205,46 +248,82 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
             }
             healthCheckStatement.setHost(host);
             healthCheckStatement.setConsistencyLevel(ConsistencyLevel.ONE);
-            Row oneResult = activeSession.execute(healthCheckStatement).one();
-
-            // Note that within the scope of this method, we should keep on 
using the local releaseVersion
-            String releaseVersion = 
oneResult.getString(RELEASE_VERSION_COLUMN_NAME);
-            NodeSettings newNodeSettings = NodeSettings.builder()
-                                                       
.releaseVersion(releaseVersion)
-                                                       
.partitioner(oneResult.getString(PARTITIONER_COLUMN_NAME))
-                                                       
.sidecarVersion(sidecarVersion)
-                                                       
.datacenter(oneResult.getString(DATA_CENTER_COLUMN_NAME))
-                                                       
.tokens(oneResult.getSet(TOKENS_COLUMN_NAME, String.class))
-                                                       
.rpcAddress(oneResult.getInet(RPC_ADDRESS_COLUMN_NAME))
-                                                       
.rpcPort(oneResult.getInt(RPC_PORT_COLUMN_NAME))
-                                                       .build();
-
-            if (!newNodeSettings.equals(nodeSettings))
-            {
-                // Update the nodeSettings cache
-                SimpleCassandraVersion previousVersion = currentVersion;
-                currentVersion = SimpleCassandraVersion.create(releaseVersion);
-                adapter = versionProvider.cassandra(releaseVersion)
-                                         .create(cqlSessionProvider, 
jmxClient, localNativeTransportAddress);
-                nodeSettings = newNodeSettings;
-                LOGGER.info("Cassandra version change detected (from={} to={}) 
for cassandraInstanceId={}. " +
-                            "New adapter loaded={}", previousVersion, 
currentVersion, cassandraInstanceId, adapter);
+            Row row = activeSession.execute(healthCheckStatement).one();
 
-                notifyCqlConnection();
+            if (row != null)
+            {
+                if (!isNativeUp)
+                {
+                    isNativeUp = true;
+                    notifyNativeConnection();
+                }
+            }
+            else
+            {
+                // This should never happen but added for completeness
+                LOGGER.error("Expected to query the release_version from 
system.local but encountered null {}",
+                             cassandraInstanceId);
+                // The cassandra native protocol connection to the node is 
down.
+                markNativeDownAndMaybeNotifyDisconnection();
+                // Unregister the host listener.
+                maybeUnregisterHostListener(activeSession);
             }
-            LOGGER.debug("Cassandra version {}", releaseVersion);
         }
         catch (IllegalArgumentException | NoHostAvailableException e)
         {
-            LOGGER.error("Unexpected error connecting to Cassandra instance 
{}", cassandraInstanceId, e);
-            // The cassandra node is down.
+            LOGGER.error("Unexpected error querying Cassandra instance {}", 
cassandraInstanceId, e);
+            // The cassandra native protocol connection to the node is down.
+            markNativeDownAndMaybeNotifyDisconnection();
             // Unregister the host listener.
-            markAsDownAndMaybeNotify();
             maybeUnregisterHostListener(activeSession);
         }
     }
 
-    private Host getHost(Metadata metadata)
+    protected NodeSettings newNodeSettingsFromJmx()
+    {
+        LimitedStorageOperations storageOperations =
+        jmxClient.proxy(LimitedStorageOperations.class, 
STORAGE_SERVICE_OBJ_NAME);
+        LimitedEndpointSnitchOperations endpointSnitchOperations =
+        jmxClient.proxy(LimitedEndpointSnitchOperations.class, 
ENDPOINT_SNITCH_INFO_OBJ_NAME);
+
+        String releaseVersion = storageOperations.getReleaseVersion();
+        String partitionerName = storageOperations.getPartitionerName();
+        List<String> tokens = maybeGetTokens(storageOperations);
+        String dataCenter = endpointSnitchOperations.getDatacenter();
+
+        return NodeSettings.builder()
+                           .releaseVersion(releaseVersion)
+                           .partitioner(partitionerName)
+                           .sidecarVersion(sidecarVersion)
+                           .datacenter(dataCenter)
+                           .tokens(new LinkedHashSet<>(tokens))
+                           
.rpcAddress(localNativeTransportAddress.getAddress())
+                           .rpcPort(localNativeTransportAddress.getPort())
+                           .build();
+    }
+
+    /**
+     * Attempts to return the tokens assigned to the Cassandra instance.
+     *
+     * @param storageOperations the interface to perform the operations
+     * @return the tokens assigned to the Cassandra instance, or an empty list while the node is joining
+     */
+    protected List<String> maybeGetTokens(LimitedStorageOperations 
storageOperations)
+    {
+        try
+        {
+            return storageOperations.getTokens();
+        }
+        catch (AssertionError aex)
+        {
+            // On a joining node, the JMX call will fail with an 
AssertionError; we catch this scenario to prevent
+            // failure and just return an empty list of tokens. This is 
technically correct, because the node, while
+            // joining, doesn't actually own any tokens until it has 
successfully completed joining.
+            return Collections.emptyList();
+        }
+    }
+
+    protected Host getHost(Metadata metadata)
     {
         if (host == null)
         {
@@ -271,13 +350,17 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
     }
 
     /**
-     * @return a cached {@link NodeSettings}. The returned value could be null 
when no CQL connection is established
+     * Returns the cached node settings value obtained during scheduled health 
checks. This method does not delegate
+     * to the internal adapter, as the information is retrieved on the 
configured health check interval.
+     *
+     * @return a cached {@link NodeSettings}. The returned value will be 
{@code null} when no JMX connection is
+     * established
      */
     @Nullable
     @Override
     public NodeSettings nodeSettings()
     {
-        return nodeSettings;
+        return nodeSettingsFromJmx;
     }
 
     public ResultSet executeLocal(Statement statement)
@@ -326,7 +409,7 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
     @Override
     public void onDown(Host host)
     {
-        runIfThisHost(host, this::markAsDownAndMaybeNotify);
+        runIfThisHost(host, this::markNativeDownAndMaybeNotifyDisconnection);
     }
 
     @Override
@@ -345,14 +428,27 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
     {
     }
 
-    public boolean isUp()
+    /**
+     * @return {@code true} if native protocol connectivity has been established to the Cassandra 
instance, {@code false} otherwise
+     */
+    public boolean isNativeUp()
+    {
+        return isNativeUp;
+    }
+
+    /**
+     * @return {@code true} if JMX connectivity has been established to the 
Cassandra instance, {@code false} otherwise
+     */
+    public boolean isJmxUp()
     {
-        return nodeSettings != null;
+        return nodeSettingsFromJmx != null;
     }
 
     public void close()
     {
-        markAsDownAndMaybeNotify();
+        closed = true;
+        markNativeDownAndMaybeNotifyDisconnection();
+        markJmxDownAndMaybeNotifyDisconnection();
         Session activeSession = cqlSessionProvider.getIfConnected();
         if (activeSession != null)
         {
@@ -360,6 +456,7 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
         }
         if (jmxClient != null)
         {
+            jmxClient.unregisterListener(notificationListener);
             try
             {
                 jmxClient.close();
@@ -377,7 +474,15 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
         return currentVersion;
     }
 
-    protected void notifyCqlConnection()
+    protected void notifyJmxConnection()
+    {
+        JsonObject connectMessage = new JsonObject()
+                                    .put("cassandraInstanceId", 
cassandraInstanceId);
+        vertx.eventBus().publish(ON_CASSANDRA_JMX_READY.address(), 
connectMessage);
+        LOGGER.info("JMX connected to cassandraInstanceId={}", 
cassandraInstanceId);
+    }
+
+    protected void notifyNativeConnection()
     {
         JsonObject connectMessage = new JsonObject()
                                     .put("cassandraInstanceId", 
cassandraInstanceId);
@@ -385,11 +490,11 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
         LOGGER.info("CQL connected to cassandraInstanceId={}", 
cassandraInstanceId);
     }
 
-    protected void markAsDownAndMaybeNotify()
+    protected void markNativeDownAndMaybeNotifyDisconnection()
     {
-        NodeSettings currentNodeSettings = nodeSettings;
-        nodeSettings = null;
-        if (currentNodeSettings != null)
+        boolean wasCqlConnected = isNativeUp;
+        isNativeUp = false;
+        if (wasCqlConnected)
         {
             JsonObject disconnectMessage = new JsonObject()
                                            .put("cassandraInstanceId", 
cassandraInstanceId);
@@ -398,6 +503,21 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
         }
     }
 
+    protected void markJmxDownAndMaybeNotifyDisconnection()
+    {
+        NodeSettings currentNodeSettings = nodeSettingsFromJmx;
+        nodeSettingsFromJmx = null;
+        currentVersion = null;
+        adapter = null;
+        if (currentNodeSettings != null)
+        {
+            JsonObject disconnectMessage = new JsonObject()
+                                           .put("cassandraInstanceId", 
cassandraInstanceId);
+            vertx.eventBus().publish(ON_CASSANDRA_JMX_DISCONNECTED.address(), 
disconnectMessage);
+            LOGGER.info("JMX disconnection from cassandraInstanceId={}", 
cassandraInstanceId);
+        }
+    }
+
     @Nullable
     private <T> T fromAdapter(Function<ICassandraAdapter, T> getter)
     {
@@ -412,4 +532,78 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
             runnable.run();
         }
     }
+
+    /**
+     * A {@link NotificationListener} implementation that reacts to {@link 
JMXConnectionNotification} notifications
+     * and updates the state of the JMX connection internally.
+     */
+    protected class JmxNotificationListener implements NotificationListener
+    {
+        @Override
+        public void handleNotification(Notification notification, Object 
handback)
+        {
+            if (notification instanceof JMXConnectionNotification)
+            {
+                JMXConnectionNotification connectNotice = 
(JMXConnectionNotification) notification;
+                String type = connectNotice.getType();
+                switch (type)
+                {
+                    case JMXConnectionNotification.OPENED:
+                        // Do not notify here as we may not have set up our 
own delegate yet
+                        // Instead, run the JMX Health Check, which will 
notify once we have
+                        // created or updated the adapter.
+                        jmxHealthCheck();
+                        break;
+
+                    case JMXConnectionNotification.CLOSED:
+                    case JMXConnectionNotification.FAILED:
+                    case JMXConnectionNotification.NOTIFS_LOST:
+                        markJmxDownAndMaybeNotifyDisconnection();
+                        break;
+
+                    default:
+                        LOGGER.warn("Encountered unexpected JMX notification 
type={}", type);
+                        break;
+                }
+            }
+        }
+    }
+
+    /**
+     * Limited StorageOperations to obtain information required for node 
settings. Interface visibility is public
+     * because JMX proxy works on public interfaces only.
+     */
+    public interface LimitedStorageOperations
+    {
+        /**
+         * Fetch a string representation of the Cassandra version.
+         *
+         * @return A string representation of the Cassandra version.
+         */
+        String getReleaseVersion();
+
+        /**
+         * @return the cluster partitioner
+         */
+        String getPartitionerName();
+
+        /**
+         * Fetch string representations of the tokens for this node.
+         *
+         * @return a collection of tokens formatted as strings
+         */
+        List<String> getTokens();
+    }
+
+    /**
+     * Limited standard Snitch info to obtain information required for node 
settings. Interface visibility is public
+     * because JMX proxy works on public interfaces only.
+     */
+    public interface LimitedEndpointSnitchOperations
+    {
+        /**
+         * @return the Datacenter name depending on the respective snitch used 
for this node
+         */
+        String getDatacenter();
+    }
 }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java
 
b/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java
index 74e1b0e..f149f15 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java
@@ -87,11 +87,11 @@ class SidecarLoadBalancingPolicy implements 
LoadBalancingPolicy
     @Override
     public HostDistance distance(Host host)
     {
-        if (!selectedHosts.contains(host))
+        if (selectedHosts.contains(host) || isLocalHost(host))
         {
-            return HostDistance.IGNORED;
+            return childPolicy.distance(host);
         }
-        return childPolicy.distance(host);
+        return HostDistance.IGNORED;
     }
 
     @Override
@@ -197,7 +197,7 @@ class SidecarLoadBalancingPolicy implements 
LoadBalancingPolicy
             List<Host> nonLocalHosts = partitionedHosts.get(false);
             if (nonLocalHosts == null || nonLocalHosts.isEmpty())
             {
-                LOGGER.warn("Did not find any non-local hosts in allHosts");
+                LOGGER.debug("Did not find any non-local hosts in allHosts");
                 return;
             }
 
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
 
b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
index c00dc37..26da25e 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.sidecar.cluster.instance;
 import java.util.List;
 
 import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.jetbrains.annotations.Nullable;
 
 
 /**
@@ -56,5 +57,5 @@ public interface InstanceMetadata
     /**
      * @return a {@link CassandraAdapterDelegate} specific for the instance
      */
-    CassandraAdapterDelegate delegate();
+    @Nullable CassandraAdapterDelegate delegate();
 }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
 
b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
index c1a36a7..ca39b6c 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.common.DataObjectBuilder;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Local implementation of InstanceMetadata.
@@ -35,6 +36,7 @@ public class InstanceMetadataImpl implements InstanceMetadata
     private final int port;
     private final List<String> dataDirs;
     private final String stagingDir;
+    @Nullable
     private final CassandraAdapterDelegate delegate;
 
     protected InstanceMetadataImpl(Builder builder)
@@ -78,7 +80,7 @@ public class InstanceMetadataImpl implements InstanceMetadata
     }
 
     @Override
-    public CassandraAdapterDelegate delegate()
+    public @Nullable CassandraAdapterDelegate delegate()
     {
         return delegate;
     }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/routes/CassandraHealthHandler.java 
b/src/main/java/org/apache/cassandra/sidecar/routes/CassandraHealthHandler.java
index 4713535..509aecb 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/routes/CassandraHealthHandler.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/routes/CassandraHealthHandler.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 
+import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.JMX;
 import static org.apache.cassandra.sidecar.server.MainModule.NOT_OK_STATUS;
 import static org.apache.cassandra.sidecar.server.MainModule.OK_STATUS;
 
@@ -72,7 +73,11 @@ public class CassandraHealthHandler extends 
AbstractHandler<Void>
                                   Void request)
     {
         CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
-        if (delegate != null && delegate.isUp())
+
+        boolean isServiceUp = context.request().path().contains(JMX)
+                              ? delegate != null && delegate.isJmxUp()
+                              : delegate != null && delegate.isNativeUp();
+        if (isServiceUp)
         {
             context.json(OK_STATUS);
         }
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java 
b/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java
index 8ed3900..468d90a 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java
@@ -63,6 +63,12 @@ public class RingHandler extends AbstractHandler<RingRequest>
                                RingRequest request)
     {
         CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+        if (delegate == null)
+        {
+            context.fail(cassandraServiceUnavailable());
+            return;
+        }
+
         StorageOperations storageOperations = delegate.storageOperations();
 
         if (storageOperations == null)
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java
 
b/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java
index aea5ffc..fa00e84 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java
@@ -69,6 +69,11 @@ public class TokenRangeReplicaMapHandler extends 
AbstractHandler<TokenRangeRepli
                                TokenRangeReplicasRequest request)
     {
         CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+        if (delegate == null)
+        {
+            context.fail(cassandraServiceUnavailable());
+            return;
+        }
 
         StorageOperations operations = delegate.storageOperations();
         Metadata metadata = delegate.metadata();
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java
 
b/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java
index bc72cc5..264db54 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java
@@ -60,6 +60,11 @@ public class NodeSettingsHandler extends 
AbstractHandler<Void>
                                Void request)
     {
         CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+        if (delegate == null)
+        {
+            context.fail(cassandraServiceUnavailable());
+            return;
+        }
         NodeSettings nodeSettings = delegate.nodeSettings();
         if (nodeSettings == null)
         {
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
 
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
index 97b8290..8f60820 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
@@ -165,6 +165,11 @@ public class SSTableImportHandler extends 
AbstractHandler<SSTableImportRequest>
     private Future<Void> importSSTablesAsync(SSTableImporter.ImportOptions 
importOptions)
     {
         CassandraAdapterDelegate cassandra = 
metadataFetcher.delegate(importOptions.host());
+        if (cassandra == null)
+        {
+            return Future.failedFuture(cassandraServiceUnavailable());
+        }
+
         TableOperations tableOperations = cassandra.tableOperations();
 
         if (tableOperations == null)
diff --git a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java 
b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index fec3f4c..9e11cb8 100644
--- a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -162,9 +162,17 @@ public class MainModule extends AbstractModule
         router.get(ApiEndpointsV1.HEALTH_ROUTE)
               .handler(context -> context.json(OK_STATUS));
 
+        // Backwards compatibility for the Cassandra health endpoint
+        //noinspection deprecation
         router.get(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE)
               .handler(cassandraHealthHandler);
 
+        router.get(ApiEndpointsV1.CASSANDRA_NATIVE_HEALTH_ROUTE)
+              .handler(cassandraHealthHandler);
+
+        router.get(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE)
+              .handler(cassandraHealthHandler);
+
         //noinspection deprecation
         router.get(ApiEndpointsV1.DEPRECATED_COMPONENTS_ROUTE)
               .handler(streamSSTableComponentHandler)
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java 
b/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java
index d15423e..51c94c1 100644
--- a/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java
+++ b/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java
@@ -69,6 +69,18 @@ public enum SidecarServerEvents
      * for the Sidecar-managed Cassandra instances are available.
      */
     ON_ALL_CASSANDRA_CQL_READY,
+
+    /**
+     * The {@link io.vertx.core.eventbus.EventBus} address where events will 
be published when a JMX connection for
+     * a given instance has been established. The instance identifier will be 
passed as part of the message.
+     */
+    ON_CASSANDRA_JMX_READY,
+
+    /**
+     * The {@link io.vertx.core.eventbus.EventBus} address where events will 
be published when a JMX connection for
+     * a given instance has been disconnected. The instance identifier will be 
passed as part of the message.
+     */
+    ON_CASSANDRA_JMX_DISCONNECTED,
     ;
 
     public String address()
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java
index 0c51bf0..518af51 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java
@@ -75,6 +75,7 @@ public class InstanceMetadataFetcher
      * @return the {@link CassandraAdapterDelegate} for the given {@code 
host}, or the first instance when {@code host}
      * is {@code null}
      */
+    @Nullable
     public CassandraAdapterDelegate delegate(String host)
     {
         return instance(host).delegate();
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
index eb3b29a..02ba0a2 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
@@ -207,6 +207,12 @@ public class SSTableImporter
             ImportOptions options = pair.getValue();
 
             CassandraAdapterDelegate cassandra = 
metadataFetcher.delegate(options.host);
+            if (cassandra == null)
+            {
+                promise.fail(HttpExceptions.cassandraServiceUnavailable());
+                continue;
+            }
+
             TableOperations tableOperations = cassandra.tableOperations();
 
             if (tableOperations == null)
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java
index 2a3d1b4..54d54a5 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java
@@ -62,7 +62,7 @@ public class SimpleCassandraVersion implements 
Comparable<SimpleCassandraVersion
      * @throws IllegalArgumentException if the provided string does not
      *                                  represent a version
      */
-    public static SimpleCassandraVersion create(String version)
+    public static SimpleCassandraVersion create(String version) throws 
IllegalArgumentException
     {
         String stripped = version.toUpperCase().replace(SNAPSHOT, "");
         Matcher matcher = PATTERN.matcher(stripped);
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
 
b/src/test/integration/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java
similarity index 56%
rename from 
client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
rename to 
src/test/integration/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java
index 2061c89..4733817 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java
+++ 
b/src/test/integration/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java
@@ -16,31 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.client.request;
-
-import io.netty.handler.codec.http.HttpMethod;
-import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
-import org.apache.cassandra.sidecar.common.data.HealthResponse;
+package org.apache.cassandra.distributed.impl;
 
 /**
- * Represents a request to retrieve the Cassandra health
+ * Utility class to interact with protected methods in AbstractCluster
  */
-public class CassandraHealthRequest extends DecodableRequest<HealthResponse>
+public class AbstractClusterUtils
 {
-    /**
-     * Constructs a request to retrieve the Cassandra health
-     */
-    public CassandraHealthRequest()
-    {
-        super(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public HttpMethod method()
+    public static InstanceConfig createInstanceConfig(AbstractCluster cluster, 
int nodeNumber)
     {
-        return HttpMethod.GET;
+        return cluster.createInstanceConfig(nodeNumber);
     }
 }
diff --git 
a/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
 
b/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
index 04bddec..d77f5f8 100644
--- 
a/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
+++ 
b/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
@@ -53,7 +53,7 @@ public class CQLSessionProviderTest extends 
IntegrationTestBase
     void testCqlSessionProviderWorksAsExpected(VertxTestContext context, 
CassandraTestContext cassandraTestContext)
     throws Exception
     {
-        UpgradeableCluster cluster = cassandraTestContext.getCluster();
+        UpgradeableCluster cluster = cassandraTestContext.cluster();
         testWithClient(context, false, webClient -> {
                            // To start, both instances are stopped, so we 
should get 503s for both
                            buildInstanceHealthRequest(webClient, "1")
@@ -129,7 +129,7 @@ public class CQLSessionProviderTest extends 
IntegrationTestBase
     {
         return webClient.get(server.actualPort(),
                              "localhost",
-                             "/api/v1/cassandra/__health?instanceId=" + 
instanceId)
+                             "/api/v1/cassandra/native/__health?instanceId=" + 
instanceId)
                         .as(BodyCodec.string());
     }
 
diff --git 
a/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java
 
b/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java
new file mode 100644
index 0000000..9bc1c82
--- /dev/null
+++ 
b/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.impl.ConcurrentHashSet;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.junit5.Checkpoint;
+import io.vertx.junit5.Timeout;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_ALL_CASSANDRA_CQL_READY;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_DISCONNECTED;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_DISCONNECTED;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_READY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Ensures the Delegate works correctly
+ */
+@ExtendWith(VertxExtension.class)
+class DelegateIntegrationTest extends IntegrationTestBase
+{
+    /**
+     * Checks that the delegate of instance 1 reports a version whose major and minor numbers
+     * equal those of the test context version, and that is not older than that version.
+     */
+    @CassandraIntegrationTest()
+    void testCorrectVersionIsEnabled()
+    {
+        CassandraAdapterDelegate firstDelegate = sidecarTestContext.instancesConfig()
+                                                                   .instanceFromId(1)
+                                                                   .delegate();
+        assertThat(firstDelegate).isNotNull();
+
+        SimpleCassandraVersion reported = firstDelegate.version();
+        assertThat(reported).isNotNull();
+
+        SimpleCassandraVersion expected = sidecarTestContext.version;
+        assertThat(reported.major).isEqualTo(expected.major);
+        assertThat(reported.minor).isEqualTo(expected.minor);
+        assertThat(reported).isGreaterThanOrEqualTo(expected);
+    }
+
+    /**
+     * Disables binary on instance 1, expects the CQL disconnected event, re-enables binary from
+     * within that event's handler and then expects the CQL ready event.
+     *
+     * @param context the Vert.x test context providing the checkpoints for both events
+     */
+    @CassandraIntegrationTest()
+    void testHealthCheck(VertxTestContext context)
+    {
+        EventBus bus = vertx.eventBus();
+        Checkpoint cqlReady = context.checkpoint();
+        Checkpoint cqlDisconnected = context.checkpoint();
+
+        // Precondition: instance 1 is reported as up over both JMX and native
+        CassandraAdapterDelegate initialDelegate = sidecarTestContext.instancesConfig()
+                                                                     .instanceFromId(1)
+                                                                     .delegate();
+        assertThat(initialDelegate).isNotNull();
+        assertThat(initialDelegate.isJmxUp()).as("jmx health check succeeds").isTrue();
+        assertThat(initialDelegate.isNativeUp()).as("native health check succeeds").isTrue();
+
+        // Both consumers are registered before binary is toggled, so no event can be
+        // published before its consumer exists.
+        bus.localConsumer(ON_CASSANDRA_CQL_DISCONNECTED.address(), (Message<JsonObject> disconnectMessage) -> {
+            int disconnectedId = disconnectMessage.body().getInteger("cassandraInstanceId");
+            CassandraAdapterDelegate disconnected = sidecarTestContext.instancesConfig()
+                                                                      .instanceFromId(disconnectedId)
+                                                                      .delegate();
+
+            assertThat(disconnected).isNotNull();
+            assertThat(disconnected.isNativeUp()).as("health check fails after binary has been disabled")
+                                                 .isFalse();
+            cqlDisconnected.flag();
+            // Re-enabling binary here is what later produces the CQL ready event
+            sidecarTestContext.cluster().get(1).nodetool("enablebinary");
+        });
+
+        bus.localConsumer(ON_CASSANDRA_CQL_READY.address(), (Message<JsonObject> readyMessage) -> {
+            int reconnectedId = readyMessage.body().getInteger("cassandraInstanceId");
+            CassandraAdapterDelegate reconnected = sidecarTestContext.instancesConfig()
+                                                                     .instanceFromId(reconnectedId)
+                                                                     .delegate();
+            assertThat(reconnected).isNotNull();
+            assertThat(reconnected.isNativeUp()).as("health check succeeds after binary has been enabled")
+                                                .isTrue();
+            cqlReady.flag();
+        });
+
+        // Kick off the sequence: disabling binary must succeed with exit code 0
+        NodeToolResult disableResult = sidecarTestContext.cluster().get(1).nodetoolResult("disablebinary");
+        int exitCode = disableResult.getRc();
+        String failureMessage = "Failed to disable binary:\nstdout:" + disableResult.getStdout()
+                                + "\nstderr: " + disableResult.getStderr();
+        assertThat(exitCode).withFailMessage(failureMessage).isEqualTo(0);
+    }
+
+    /**
+     * Verifies that the {@code ON_ALL_CASSANDRA_CQL_READY} event carries exactly the ids of the
+     * three Cassandra instances of the test cluster.
+     *
+     * @param context the Vert.x test context; the test completes once the event was validated
+     */
+    @CassandraIntegrationTest(nodesPerDc = 3)
+    void testAllInstancesHealthCheck(VertxTestContext context)
+    {
+        EventBus eventBus = vertx.eventBus();
+        Checkpoint allCqlReady = context.checkpoint();
+
+        Set<Integer> expectedCassandraInstanceIds = ImmutableSet.of(1, 2, 3);
+        eventBus.localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(), (Message<JsonObject> message) -> {
+            // Assertions run through the test context so that a failure fails the test
+            // immediately instead of only surfacing as a timeout.
+            context.verify(() -> {
+                JsonArray cassandraInstanceIds = message.body().getJsonArray("cassandraInstanceIds");
+                assertThat(cassandraInstanceIds).hasSize(3);
+                // Compare the ids actually carried by the event; asserting on the range
+                // 1..size would pass regardless of the array's content.
+                assertThat(IntStream.range(0, cassandraInstanceIds.size())
+                                    .mapToObj(cassandraInstanceIds::getInteger))
+                .containsExactlyInAnyOrderElementsOf(expectedCassandraInstanceIds);
+
+                allCqlReady.flag();
+            });
+        });
+    }
+
+    /**
+     * Stops instance 2 once all instances reported CQL readiness, then verifies that the native
+     * and the JMX health endpoints both answer with a not-OK status for the stopped instance.
+     *
+     * @param context the Vert.x test context providing one checkpoint per expected event
+     */
+    @CassandraIntegrationTest(nodesPerDc = 3)
+    void testStoppingAnInstance(VertxTestContext context)
+    {
+        EventBus eventBus = vertx.eventBus();
+        Checkpoint allCqlReady = context.checkpoint();
+        Checkpoint cqlDisconnected = context.checkpoint();
+        Checkpoint jmxDisconnected = context.checkpoint();
+
+        Set<Integer> expectedCassandraInstanceIds = ImmutableSet.of(1, 2, 3);
+        eventBus.localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(), (Message<JsonObject> message) -> {
+            // Handler assertions go through context.verify so that a failure fails the test
+            // immediately instead of only surfacing as a timeout.
+            context.verify(() -> {
+                JsonArray cassandraInstanceIds = message.body().getJsonArray("cassandraInstanceIds");
+                assertThat(cassandraInstanceIds).hasSize(3);
+                // Compare the ids actually carried by the event; asserting on the range
+                // 1..size would pass regardless of the array's content.
+                assertThat(IntStream.range(0, cassandraInstanceIds.size())
+                                    .mapToObj(cassandraInstanceIds::getInteger))
+                .containsExactlyInAnyOrderElementsOf(expectedCassandraInstanceIds);
+
+                allCqlReady.flag();
+
+                // Stop instance 2
+                ClusterUtils.stopUnchecked(sidecarTestContext.cluster().get(2));
+            });
+        });
+
+        eventBus.localConsumer(ON_CASSANDRA_CQL_DISCONNECTED.address(), (Message<JsonObject> message) -> {
+            context.verify(() -> {
+                Integer instanceId = message.body().getInteger("cassandraInstanceId");
+                assertThat(instanceId).isEqualTo(2);
+
+                buildNativeHealthRequest(client, instanceId).send(assertHealthCheckNotOk(context, cqlDisconnected));
+            });
+        });
+
+        eventBus.localConsumer(ON_CASSANDRA_JMX_DISCONNECTED.address(), (Message<JsonObject> message) -> {
+            context.verify(() -> {
+                Integer instanceId = message.body().getInteger("cassandraInstanceId");
+                assertThat(instanceId).isEqualTo(2);
+
+                buildJmxHealthRequest(client, instanceId).send(assertHealthCheckNotOk(context, jmxDisconnected));
+            });
+        });
+    }
+
+    /**
+     * Starts the cluster after the event consumers are registered, waits until the first two
+     * instances are connected over JMX and native, and then adds a third instance.
+     *
+     * @param context the Vert.x test context providing the checkpoints of the test
+     * @throws InterruptedException if the thread is interrupted while waiting for the first two instances
+     */
+    @Timeout(value = 2, timeUnit = TimeUnit.MINUTES)
+    @CassandraIntegrationTest(nodesPerDc = 2, newNodesPerDc = 1, startCluster = false)
+    public void testChangingClusterSize(VertxTestContext context) throws InterruptedException
+    {
+        EventBus bus = vertx.eventBus();
+
+        Checkpoint jmxConnected = context.checkpoint(3);
+        Checkpoint nativeConnected = context.checkpoint(3);
+        Checkpoint jmxNotConnected = context.checkpoint();
+        Checkpoint nativeNotConnected = context.checkpoint();
+
+        // Counted down once by the JMX validation and once by the native validation
+        CountDownLatch firstTwoUp = new CountDownLatch(2);
+
+        Set<Integer> nativeSeen = new ConcurrentHashSet<>();
+        Set<Integer> jmxSeen = new ConcurrentHashSet<>();
+
+        bus.localConsumer(ON_CASSANDRA_JMX_READY.address(), (Message<JsonObject> jmxReady) -> {
+            Integer readyId = jmxReady.body().getInteger("cassandraInstanceId");
+            logger.info("DBG: Received JMX connection notification for {}", readyId);
+            // Only the first notification of an instance is flagged and validated
+            if (!jmxSeen.add(readyId))
+            {
+                return;
+            }
+            jmxConnected.flag();
+            validateJmxConnections(context, jmxSeen, jmxNotConnected, firstTwoUp);
+        });
+
+        bus.localConsumer(ON_CASSANDRA_CQL_READY.address(), (Message<JsonObject> cqlReady) -> {
+            Integer readyId = cqlReady.body().getInteger("cassandraInstanceId");
+            logger.info("DBG: Received native connection notification for {}", readyId);
+            buildNativeHealthRequest(client, readyId).send(assertHealthCheckOk(context, nativeConnected));
+            // Only the first notification of an instance is flagged and validated
+            if (!nativeSeen.add(readyId))
+            {
+                return;
+            }
+            nativeConnected.flag();
+            validateNativeConnections(context, nativeNotConnected, firstTwoUp, nativeSeen);
+        });
+
+        // The consumers exist now, so the cluster can be started
+        sidecarTestContext.cluster().startup();
+
+        // Block until both validations have seen the first two instances
+        boolean connectedInTime = firstTwoUp.await(2, TimeUnit.MINUTES);
+        assertThat(connectedInTime).isTrue();
+
+        // Starting the 3rd instance; the test completes once it is connected
+        addNewInstance();
+    }
+
+    /**
+     * Validates the JMX state according to how many instances are currently connected. With two
+     * instances connected, the JMX health of instance 3 must be not-OK and the latch is counted
+     * down; with three instances connected, the set must consist of instances 1, 2 and 3.
+     *
+     * @param context               the Vert.x test context
+     * @param jmxConnectedInstances ids of the instances that reported a JMX connection so far
+     * @param notOkCheckpoint       flagged once the not-OK health response of instance 3 is verified
+     * @param firstTwoConnected     latch counted down when the first two instances are connected
+     */
+    private void validateJmxConnections(VertxTestContext context, Set<Integer> jmxConnectedInstances,
+                                        Checkpoint notOkCheckpoint, CountDownLatch firstTwoConnected)
+    {
+        int upInstanceCount = jmxConnectedInstances.size();
+        if (upInstanceCount == 2)
+        {
+            buildJmxHealthRequest(client, 3).send(assertHealthCheckNotOk(context, notOkCheckpoint));
+            logger.info("DBG: First two instances connected via JMX, third is down");
+            firstTwoConnected.countDown();
+        }
+        else if (upInstanceCount == 3)
+        {
+            // The set has no defined iteration order, so the check must not depend on order
+            assertThat(jmxConnectedInstances).containsExactlyInAnyOrder(1, 2, 3);
+        }
+    }
+
+    /**
+     * Validates the native state according to how many instances are currently connected. With
+     * two instances connected, they must be instances 1 and 2, the native health of instance 3
+     * must be not-OK and the latch is counted down; with three instances connected, the set must
+     * consist of instances 1, 2 and 3.
+     *
+     * @param context                  the Vert.x test context
+     * @param notOkCheckpoint          flagged once the not-OK health response of instance 3 is verified
+     * @param firstTwoConnected        latch counted down when the first two instances are connected
+     * @param nativeConnectedInstances ids of the instances that reported a native connection so far
+     */
+    private void validateNativeConnections(VertxTestContext context,
+                                           Checkpoint notOkCheckpoint,
+                                           CountDownLatch firstTwoConnected,
+                                           Set<Integer> nativeConnectedInstances)
+    {
+        int upInstanceCount = nativeConnectedInstances.size();
+        if (upInstanceCount == 2)
+        {
+            // The set has no defined iteration order, so the checks must not depend on order
+            assertThat(nativeConnectedInstances).containsExactlyInAnyOrder(1, 2);
+            buildNativeHealthRequest(client, 3).send(assertHealthCheckNotOk(context, notOkCheckpoint));
+            logger.info("DBG: First two instances connected via native, third is down");
+            firstTwoConnected.countDown();
+        }
+        else if (upInstanceCount == 3)
+        {
+            assertThat(nativeConnectedInstances).containsExactlyInAnyOrder(1, 2, 3);
+        }
+    }
+
+    /**
+     * Builds a response handler that expects a successful request answered with HTTP 200 and a
+     * JSON body whose {@code status} field is {@code OK}.
+     *
+     * @param context    the Vert.x test context the assertions are verified against
+     * @param checkpoint accepted for symmetry with the not-OK variant; it is not flagged by this handler
+     * @return the handler to pass to the request's {@code send}
+     */
+    private static Handler<AsyncResult<HttpResponse<Buffer>>> assertHealthCheckOk(VertxTestContext context,
+                                                                                  Checkpoint checkpoint)
+    {
+        return context.succeeding(healthResponse -> context.verify(() -> {
+            int httpStatus = healthResponse.statusCode();
+            assertThat(httpStatus).isEqualTo(OK.code());
+
+            String reportedStatus = healthResponse.bodyAsJsonObject().getString("status");
+            assertThat(reportedStatus).isEqualTo("OK");
+        }));
+    }
+
+    /**
+     * Builds a response handler that expects a successful request answered with HTTP 503 and a
+     * JSON body whose {@code status} field is {@code NOT_OK}, and flags the checkpoint afterwards.
+     *
+     * @param context    the Vert.x test context the assertions are verified against
+     * @param checkpoint flagged after both assertions passed
+     * @return the handler to pass to the request's {@code send}
+     */
+    private Handler<AsyncResult<HttpResponse<Buffer>>> assertHealthCheckNotOk(VertxTestContext context,
+                                                                              Checkpoint checkpoint)
+    {
+        return context.succeeding(healthResponse -> context.verify(() -> {
+            int httpStatus = healthResponse.statusCode();
+            assertThat(httpStatus).isEqualTo(SERVICE_UNAVAILABLE.code());
+
+            String reportedStatus = healthResponse.bodyAsJsonObject().getString("status");
+            assertThat(reportedStatus).isEqualTo("NOT_OK");
+
+            checkpoint.flag();
+        }));
+    }
+
+    /**
+     * Creates a GET request against the native health endpoint of the local Sidecar server.
+     *
+     * @param webClient  the client used to create the request
+     * @param instanceId the value of the {@code instanceId} query parameter
+     * @return the unsent request
+     */
+    private HttpRequest<Buffer> buildNativeHealthRequest(WebClient webClient, int instanceId)
+    {
+        int sidecarPort = server.actualPort();
+        String requestUri = "/api/v1/cassandra/native/__health?instanceId=" + instanceId;
+        return webClient.get(sidecarPort, "localhost", requestUri);
+    }
+
+    /**
+     * Creates a GET request against the JMX health endpoint of the local Sidecar server.
+     *
+     * @param webClient  the client used to create the request
+     * @param instanceId the value of the {@code instanceId} query parameter
+     * @return the unsent request
+     */
+    private HttpRequest<Buffer> buildJmxHealthRequest(WebClient webClient, int instanceId)
+    {
+        int sidecarPort = server.actualPort();
+        String requestUri = "/api/v1/cassandra/jmx/__health?instanceId=" + instanceId;
+        return webClient.get(sidecarPort, "localhost", requestUri);
+    }
+
+    /**
+     * Adds one instance to the test cluster, based on the configuration of node 1 with
+     * {@code auto_bootstrap} set and the gossip, JMX and native protocol features, and starts it.
+     */
+    private void addNewInstance()
+    {
+        UpgradeableCluster testCluster = sidecarTestContext.cluster();
+        IUpgradeableInstance addedInstance =
+        ClusterUtils.addInstance(testCluster, testCluster.get(1).config(), newConfig -> {
+            newConfig.set("auto_bootstrap", true);
+            newConfig.with(Feature.GOSSIP, Feature.JMX, Feature.NATIVE_PROTOCOL);
+        });
+        addedInstance.startup(testCluster);
+    }
+}
diff --git 
a/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java 
b/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java
deleted file mode 100644
index 7586a22..0000000
--- a/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.common;
-
-import java.util.Set;
-import java.util.stream.IntStream;
-
-import com.google.common.collect.ImmutableSet;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import io.vertx.core.eventbus.EventBus;
-import io.vertx.core.eventbus.Message;
-import io.vertx.core.json.JsonArray;
-import io.vertx.core.json.JsonObject;
-import io.vertx.junit5.Checkpoint;
-import io.vertx.junit5.VertxExtension;
-import io.vertx.junit5.VertxTestContext;
-import org.apache.cassandra.distributed.api.NodeToolResult;
-import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
-import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
-import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
-import org.apache.cassandra.testing.CassandraIntegrationTest;
-
-import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_ALL_CASSANDRA_CQL_READY;
-import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_DISCONNECTED;
-import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/**
- * Ensures the Delegate works correctly
- */
-@ExtendWith(VertxExtension.class)
-class DelegateTest extends IntegrationTestBase
-{
-    @CassandraIntegrationTest(jmx = false)
-    void testCorrectVersionIsEnabled()
-    {
-        CassandraAdapterDelegate delegate = 
sidecarTestContext.instancesConfig()
-                                                              
.instanceFromId(1)
-                                                              .delegate();
-        SimpleCassandraVersion version = delegate.version();
-        assertThat(version).isNotNull();
-        assertThat(version.major).isEqualTo(sidecarTestContext.version.major);
-        assertThat(version.minor).isEqualTo(sidecarTestContext.version.minor);
-        assertThat(version).isGreaterThanOrEqualTo(sidecarTestContext.version);
-    }
-
-    @CassandraIntegrationTest(jmx = false)
-    void testHealthCheck(VertxTestContext context)
-    {
-        EventBus eventBus = vertx.eventBus();
-        Checkpoint cqlReady = context.checkpoint();
-        Checkpoint cqlDisconnected = context.checkpoint();
-
-        CassandraAdapterDelegate adapterDelegate = 
sidecarTestContext.instancesConfig()
-                                                                     
.instanceFromId(1)
-                                                                     
.delegate();
-        assertThat(adapterDelegate.isUp()).as("health check 
succeeds").isTrue();
-
-        // Set up test listeners before disabling/enabling binary to avoid 
race conditions
-        // where the event happens before the consumer is registered.
-        eventBus.localConsumer(ON_CASSANDRA_CQL_DISCONNECTED.address(), 
(Message<JsonObject> message) -> {
-            int instanceId = message.body().getInteger("cassandraInstanceId");
-            CassandraAdapterDelegate delegate = 
sidecarTestContext.instancesConfig()
-                                                                  
.instanceFromId(instanceId)
-                                                                  .delegate();
-
-            assertThat(delegate.isUp()).as("health check fails after binary 
has been disabled").isFalse();
-            cqlDisconnected.flag();
-            sidecarTestContext.cluster().get(1).nodetool("enablebinary");
-        });
-
-        eventBus.localConsumer(ON_CASSANDRA_CQL_READY.address(), 
(Message<JsonObject> reconnectMessage) -> {
-            int instanceId = 
reconnectMessage.body().getInteger("cassandraInstanceId");
-            CassandraAdapterDelegate delegate = 
sidecarTestContext.instancesConfig()
-                                                                  
.instanceFromId(instanceId)
-                                                                  .delegate();
-            assertThat(delegate.isUp()).as("health check succeeds after binary 
has been enabled")
-                                       .isTrue();
-            cqlReady.flag();
-        });
-
-        // Disable binary
-        NodeToolResult nodetoolResult = 
sidecarTestContext.cluster().get(1).nodetoolResult("disablebinary");
-        assertThat(nodetoolResult.getRc())
-        .withFailMessage("Failed to disable binary:\nstdout:" + 
nodetoolResult.getStdout()
-                         + "\nstderr: " + nodetoolResult.getStderr())
-        .isEqualTo(0);
-        // NOTE: enable binary happens inside the disable binary handler 
above, which then will trigger the
-        // cqlReady flag.
-    }
-
-    @CassandraIntegrationTest(jmx = false, nodesPerDc = 3)
-    void testAllInstancesHealthCheck(VertxTestContext context)
-    {
-        EventBus eventBus = vertx.eventBus();
-        Checkpoint allCqlReady = context.checkpoint();
-
-        Set<Integer> expectedCassandraInstanceIds = ImmutableSet.of(1, 2, 3);
-        eventBus.localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(), 
(Message<JsonObject> message) -> {
-            JsonArray cassandraInstanceIds = 
message.body().getJsonArray("cassandraInstanceIds");
-            assertThat(cassandraInstanceIds).hasSize(3);
-            assertThat(IntStream.rangeClosed(1, cassandraInstanceIds.size()))
-            .allMatch(expectedCassandraInstanceIds::contains);
-
-            allCqlReady.flag();
-        });
-    }
-}
diff --git 
a/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java 
b/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java
index e643918..e152850 100644
--- 
a/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java
+++ 
b/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java
@@ -46,7 +46,7 @@ public class JmxClientTest
             assertThat(opMode).isNotNull();
             assertThat(opMode).isIn("LEAVING", "JOINING", "NORMAL", 
"DECOMMISSIONED", "CLIENT");
 
-            IUpgradeableInstance instance = 
context.getCluster().getFirstRunningInstance();
+            IUpgradeableInstance instance = 
context.cluster().getFirstRunningInstance();
             IInstanceConfig config = instance.config();
             
assertThat(jmxClient.host()).isEqualTo(config.broadcastAddress().getAddress().getHostAddress());
             assertThat(jmxClient.port()).isEqualTo(config.jmxPort());
@@ -101,7 +101,7 @@ public class JmxClientTest
 
     private static JmxClient createJmxClient(CassandraTestContext context)
     {
-        IUpgradeableInstance instance = 
context.getCluster().getFirstRunningInstance();
+        IUpgradeableInstance instance = 
context.cluster().getFirstRunningInstance();
         IInstanceConfig config = instance.config();
         return JmxClient.builder()
                         
.host(config.broadcastAddress().getAddress().getHostAddress())
diff --git 
a/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java
 
b/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java
index 8a91324..4b0ab8c 100644
--- 
a/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java
+++ 
b/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java
@@ -52,7 +52,7 @@ class GossipInfoHandlerIntegrationTest extends 
IntegrationTestBase
                       assertThat(gossipInfo.generation()).isNotNull();
                       assertThat(gossipInfo.heartbeat()).isNotNull();
                       assertThat(gossipInfo.hostId()).isNotNull();
-                      String releaseVersion = 
cassandraTestContext.getCluster().getFirstRunningInstance()
+                      String releaseVersion = 
cassandraTestContext.cluster().getFirstRunningInstance()
                                                                   
.getReleaseVersionString();
                       
assertThat(gossipInfo.releaseVersion()).startsWith(releaseVersion);
                       context.completeNow();
diff --git 
a/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java
 
b/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java
index 51f6828..80d86e2 100644
--- 
a/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java
+++ 
b/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java
@@ -78,7 +78,7 @@ class RingHandlerIntegrationTest extends IntegrationTestBase
     @CassandraIntegrationTest(gossip = true)
     void ringFailsWhenGossipIsDisabled(CassandraTestContext context, 
VertxTestContext testContext) throws Exception
     {
-        int disableGossip = 
context.getCluster().getFirstRunningInstance().nodetool("disablegossip");
+        int disableGossip = 
context.cluster().getFirstRunningInstance().nodetool("disablegossip");
         assertThat(disableGossip).isEqualTo(0);
         String testRoute = "/api/v1/cassandra/ring";
         testWithClient(testContext, client -> {
diff --git 
a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicGossipDisabledTest.java
 
b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicGossipDisabledTest.java
index 6331bd2..576ad2f 100644
--- 
a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicGossipDisabledTest.java
+++ 
b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicGossipDisabledTest.java
@@ -39,7 +39,7 @@ public class BasicGossipDisabledTest extends 
BaseTokenRangeIntegrationTest
     void tokenRangeEndpointFailsWhenGossipIsDisabled(CassandraTestContext 
context, VertxTestContext testContext)
     throws Exception
     {
-        int disableGossip = 
context.getCluster().getFirstRunningInstance().nodetool("disablegossip");
+        int disableGossip = 
context.cluster().getFirstRunningInstance().nodetool("disablegossip");
         assertThat(disableGossip).isEqualTo(0);
         retrieveMappingWithKeyspace(testContext, TEST_KEYSPACE, response -> {
             
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.SERVICE_UNAVAILABLE.code());
diff --git 
a/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
 
b/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
index 0faa420..35a64bb 100644
--- 
a/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
+++ 
b/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
@@ -25,12 +25,14 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import com.datastax.driver.core.Session;
 import io.vertx.core.Vertx;
 import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
-import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.impl.AbstractClusterUtils;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
 import org.apache.cassandra.distributed.shared.JMXUtil;
 import org.apache.cassandra.sidecar.adapters.base.CassandraFactory;
 import org.apache.cassandra.sidecar.cluster.CQLSessionProviderImpl;
@@ -46,6 +48,7 @@ import 
org.apache.cassandra.sidecar.common.utils.SidecarVersionProvider;
 import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
 import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
 import org.apache.cassandra.testing.AbstractCassandraTestContext;
+import org.jetbrains.annotations.NotNull;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -208,13 +211,13 @@ public class CassandraSidecarTestContext implements 
AutoCloseable
         UpgradeableCluster cluster = cluster();
         List<InstanceMetadata> metadata = new ArrayList<>();
         jmxClients = new ArrayList<>();
-        List<InetSocketAddress> addresses = buildContactList(cluster);
+        List<InstanceConfig> configs = buildInstanceConfigs(cluster);
+        List<InetSocketAddress> addresses = buildContactList(configs);
         sessionProvider = new CQLSessionProviderImpl(addresses, addresses, 
500, null,
                                                      0, 
SharedExecutorNettyOptions.INSTANCE);
-        for (int i = 0; i < numInstancesToManage; i++)
+        for (int i = 0; i < configs.size(); i++)
         {
-            IUpgradeableInstance instance = cluster.get(i + 1); // 1-based 
indexing to match node names;
-            IInstanceConfig config = instance.config();
+            IInstanceConfig config = configs.get(i);
             String hostName = JMXUtil.getJmxHost(config);
             int nativeTransportPort = tryGetIntConfig(config, 
"native_transport_port", 9042);
             // The in-jvm dtest framework sometimes returns a cluster before 
all the jmx infrastructure is initialized.
@@ -255,15 +258,25 @@ public class CassandraSidecarTestContext implements AutoCloseable
         return new InstancesConfigImpl(metadata, dnsResolver);
     }
 
-    private List<InetSocketAddress> buildContactList(UpgradeableCluster cluster)
+    private List<InetSocketAddress> buildContactList(List<InstanceConfig> configs)
     {
-        return cluster.stream()
-                      .map(i -> new InetSocketAddress(i.config().broadcastAddress().getAddress(),
-                                                      tryGetIntConfig(i.config(), "native_transport_port", 9042)))
-                      .limit(numInstancesToManage)
+        // Always return the complete list of addresses even if the cluster isn't yet that large
+        // this way, we populate the entire local instance list
+        return configs.stream()
+                      .map(config -> new InetSocketAddress(config.broadcastAddress().getAddress(),
+                                                           tryGetIntConfig(config, "native_transport_port", 9042)))
                       .collect(Collectors.toList());
     }
 
+    @NotNull
+    private List<InstanceConfig> buildInstanceConfigs(UpgradeableCluster cluster)
+    {
+        return IntStream.range(1, numInstancesToManage + 1)
+                        .mapToObj(nodeNum ->
+                                  AbstractClusterUtils.createInstanceConfig(cluster, nodeNum))
+                        .collect(Collectors.toList());
+    }
+
     /**
      * A listener for {@link InstancesConfig} state changes
      */
diff --git a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
index c68eda2..64464dc 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
@@ -103,7 +103,8 @@ public abstract class IntegrationTestBase
 
         if (sidecarTestContext.isClusterBuilt())
         {
-            MessageConsumer<Object> cqlReadyConsumer = vertx.eventBus().localConsumer(ON_CASSANDRA_CQL_READY.address());
+            MessageConsumer<JsonObject> cqlReadyConsumer = vertx.eventBus()
+                                                                .localConsumer(ON_CASSANDRA_CQL_READY.address());
             cqlReadyConsumer.handler(message -> {
                 cqlReadyConsumer.unregister();
                 context.completeNow();
@@ -166,7 +167,8 @@ public abstract class IntegrationTestBase
                                                               .instanceFromId(1)
                                                               .delegate();
 
-        if (delegate.isUp() || !waitForCluster)
+        assertThat(delegate).isNotNull();
+        if (delegate.isNativeUp() || !waitForCluster)
         {
             tester.accept(client);
         }
diff --git a/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java b/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
index 4aaa055..c63809f 100644
--- a/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
+++ b/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
@@ -87,6 +87,6 @@ public abstract class AbstractCassandraTestContext implements AutoCloseable
 
     public int clusterSize()
     {
-        return annotation.numDcs() * annotation.nodesPerDc();
+        return annotation.numDcs() * (annotation.nodesPerDc() + annotation.newNodesPerDc());
     }
 }
diff --git a/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java b/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java
index c52471d..29fa433 100644
--- a/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java
@@ -112,7 +112,7 @@ public @interface CassandraIntegrationTest
      *     or {@link ConfigurableCassandraTestContext#configureAndStartCluster(Consumer)} to get the cluster.
      *     NOTE: This cluster object must be closed by the test as the framework doesn't have access to it.
      * If true (the default), the test should take an instance of {@link CassandraTestContext}
-     *          {@link CassandraTestContext#getCluster()} will contain the built cluster.
+     *          {@link CassandraTestContext#cluster()} will contain the built cluster.
      * @return true if the cluster should be built by the test framework, false otherwise
      */
     boolean buildCluster() default true;
diff --git a/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java b/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
index 8498776..09afe24 100644
--- a/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
+++ b/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
@@ -43,9 +43,4 @@ public class CassandraTestContext extends AbstractCassandraTestContext
                + ", cluster=" + cluster
                + '}';
     }
-
-    public UpgradeableCluster getCluster()
-    {
-        return cluster;
-    }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index d639944..0acd069 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -132,7 +132,7 @@ public class TestModule extends AbstractModule
                                                                  .tokens(Collections.singleton("testToken"))
                                                                  .build());
         }
-        when(delegate.isUp()).thenReturn(isUp);
+        when(delegate.isNativeUp()).thenReturn(isUp);
         when(instanceMeta.delegate()).thenReturn(delegate);
         return instanceMeta;
     }
diff --git a/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java b/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java
index f5001af..24536cd 100644
--- a/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java
@@ -371,9 +371,10 @@ class ServerSSLTest
                   assertThat(throwable).isNotNull()
                                        .isInstanceOf(SSLHandshakeException.class)
                                        .hasMessageContaining("Failed to create SSL connection")
-                                       .hasCauseInstanceOf(SSLHandshakeException.class)
-                                       .hasRootCauseMessage("No appropriate protocol (protocol is disabled " +
-                                                            "or cipher suites are inappropriate)");
+                                       .hasCauseInstanceOf(SSLHandshakeException.class);
+                  assertThat(throwable.getCause().getMessage())
+                  .containsAnyOf("No appropriate protocol (protocol is disabled or cipher suites are inappropriate)",
+                                 "Received fatal alert: protocol_version");
                   context.completeNow();
               }));
     }
diff --git a/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java b/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java
index 3ea3d84..28a44b0 100644
--- a/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java
+++ b/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.sidecar.cluster.InstancesConfigImpl;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadataImpl;
 import org.apache.cassandra.sidecar.common.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.JmxClient;
 import org.apache.cassandra.sidecar.common.MockCassandraFactory;
 import org.apache.cassandra.sidecar.common.dns.DnsResolver;
 import org.apache.cassandra.sidecar.utils.CassandraVersionProvider;
@@ -110,8 +111,9 @@ public class SnapshotUtils
 
         if (delegate == null)
         {
-            delegate = new CassandraAdapterDelegate(vertx, 1, versionProvider, cqlSessionProvider1, null, null,
-                                                    "localhost1", 9042);
+            JmxClient mockJmxClient = mock(JmxClient.class);
+            delegate = new CassandraAdapterDelegate(vertx, 1, versionProvider, cqlSessionProvider1, mockJmxClient,
+                                                    null, "localhost1", 9042);
         }
 
         InstanceMetadataImpl localhost = InstanceMetadataImpl.builder()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to