This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 529171b CASSANDRASC-87: Add JMX health checks during the periodic health checks 529171b is described below commit 529171b1f6dee277a9087eb9da7242ce17873643 Author: Francisco Guerrero <fran...@apache.org> AuthorDate: Mon Dec 11 13:49:36 2023 -0800 CASSANDRASC-87: Add JMX health checks during the periodic health checks In this commit, we add health checks based on the JMX connectivity to the managed Cassandra instances. Additionally, we construct the NodeSettings object based on JMX. This allows the Sidecar process to be able to determine an adapter for the node even if the node is in joining state, or its binary port has been disabled. Co-authored-by: Doug Rohrer <d...@therohrers.org> Co-authored-by: Francisco Guerrero <fran...@apache.org> Patch by Doug Rohrer, Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-87 --- CHANGES.txt | 1 + .../sidecar/adapters/base/CassandraAdapter.java | 33 +-- .../cassandra/sidecar/client/RequestContext.java | 43 ++- .../cassandra/sidecar/client/SidecarClient.java | 40 ++- ...Request.java => CassandraJmxHealthRequest.java} | 10 +- ...uest.java => CassandraNativeHealthRequest.java} | 24 +- .../sidecar/client/SidecarClientTest.java | 72 ++++- .../cassandra/sidecar/common/ApiEndpointsV1.java | 9 + .../apache/cassandra/sidecar/common/JmxClient.java | 33 ++- .../cassandra/sidecar/common/NodeSettings.java | 7 - .../sidecar/cluster/CassandraAdapterDelegate.java | 316 +++++++++++++++++---- .../cluster/SidecarLoadBalancingPolicy.java | 8 +- .../sidecar/cluster/instance/InstanceMetadata.java | 3 +- .../cluster/instance/InstanceMetadataImpl.java | 4 +- .../sidecar/routes/CassandraHealthHandler.java | 7 +- .../cassandra/sidecar/routes/RingHandler.java | 6 + .../routes/TokenRangeReplicaMapHandler.java | 5 + .../routes/cassandra/NodeSettingsHandler.java | 5 + .../sstableuploads/SSTableImportHandler.java | 5 + .../cassandra/sidecar/server/MainModule.java | 8 + .../sidecar/server/SidecarServerEvents.java | 12 + .../sidecar/utils/InstanceMetadataFetcher.java | 1 + .../cassandra/sidecar/utils/SSTableImporter.java | 6 + .../sidecar/utils/SimpleCassandraVersion.java | 2 +- .../distributed/impl/AbstractClusterUtils.java | 26 +- .../sidecar/common/CQLSessionProviderTest.java | 4 +- .../sidecar/common/DelegateIntegrationTest.java | 313 ++++++++++++++++++++ .../cassandra/sidecar/common/DelegateTest.java | 125 -------- .../cassandra/sidecar/common/JmxClientTest.java | 4 +- .../routes/GossipInfoHandlerIntegrationTest.java | 2 +- .../sidecar/routes/RingHandlerIntegrationTest.java | 2 +- .../routes/tokenrange/BasicGossipDisabledTest.java | 2 +- .../testing/CassandraSidecarTestContext.java | 33 ++- .../sidecar/testing/IntegrationTestBase.java | 6 +- .../testing/AbstractCassandraTestContext.java | 2 +- .../testing/CassandraIntegrationTest.java | 2 +- .../cassandra/testing/CassandraTestContext.java | 5 - .../org/apache/cassandra/sidecar/TestModule.java | 2 +- .../cassandra/sidecar/server/ServerSSLTest.java | 7 +- .../cassandra/sidecar/snapshots/SnapshotUtils.java | 6 +- 40 files changed, 892 insertions(+), 309 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index f94b7b9..25e76dc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Add JMX health checks during the periodic health checks (CASSANDRASC-87) * Sidecar should be able to load metadata even if the local instance is unavailable (CASSANDRASC-79) * Expose additional SSL configuration options for the Sidecar Service (CASSANDRASC-82) * Expose additional node settings (CASSANDRASC-84) diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java index 0e12a0e..f02ea2f 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java @@ -28,7 +28,6 @@ import com.datastax.driver.core.DriverUtils; import com.datastax.driver.core.Host; import com.datastax.driver.core.Metadata; import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; import org.apache.cassandra.sidecar.common.CQLSessionProvider; @@ -41,13 +40,6 @@ import org.apache.cassandra.sidecar.common.TableOperations; import org.apache.cassandra.sidecar.common.dns.DnsResolver; import org.jetbrains.annotations.Nullable; -import static org.apache.cassandra.sidecar.common.NodeSettings.DATA_CENTER_COLUMN_NAME; -import static org.apache.cassandra.sidecar.common.NodeSettings.PARTITIONER_COLUMN_NAME; -import static org.apache.cassandra.sidecar.common.NodeSettings.RELEASE_VERSION_COLUMN_NAME; -import static org.apache.cassandra.sidecar.common.NodeSettings.RPC_ADDRESS_COLUMN_NAME; -import static org.apache.cassandra.sidecar.common.NodeSettings.RPC_PORT_COLUMN_NAME; -import static org.apache.cassandra.sidecar.common.NodeSettings.TOKENS_COLUMN_NAME; - /** * A {@link ICassandraAdapter} implementation for Cassandra 4.0 and later */ @@ -107,30 +99,7 @@ public class CassandraAdapter implements ICassandraAdapter @Nullable public NodeSettings nodeSettings() { - ResultSet rs = executeLocal("SELECT " - + RELEASE_VERSION_COLUMN_NAME + ", " - + PARTITIONER_COLUMN_NAME + ", " - + DATA_CENTER_COLUMN_NAME + ", " - + RPC_ADDRESS_COLUMN_NAME + ", " - + RPC_PORT_COLUMN_NAME + ", " - + TOKENS_COLUMN_NAME - + " FROM system.local"); - if (rs == null) - { - return null; - } - - Row oneResult = rs.one(); - - return NodeSettings.builder() - .releaseVersion(oneResult.getString(RELEASE_VERSION_COLUMN_NAME)) - .partitioner(oneResult.getString(PARTITIONER_COLUMN_NAME)) - .sidecarVersion(sidecarVersion) - .datacenter(oneResult.getString(DATA_CENTER_COLUMN_NAME)) - .tokens(oneResult.getSet(TOKENS_COLUMN_NAME, String.class)) - .rpcAddress(oneResult.getInet(RPC_ADDRESS_COLUMN_NAME)) - .rpcPort(oneResult.getInt(RPC_PORT_COLUMN_NAME)) - .build(); + throw new UnsupportedOperationException("Node settings are not provided by this adapter"); } @Override diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java index f595fb1..b7c7e82 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java @@ -22,7 +22,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import org.apache.cassandra.sidecar.client.request.CassandraHealthRequest; +import org.apache.cassandra.sidecar.client.request.CassandraJmxHealthRequest; +import org.apache.cassandra.sidecar.client.request.CassandraNativeHealthRequest; import org.apache.cassandra.sidecar.client.request.CleanSSTableUploadSessionRequest; import org.apache.cassandra.sidecar.client.request.ClearSnapshotRequest; import org.apache.cassandra.sidecar.client.request.CreateSnapshotRequest; @@ -52,7 +53,16 @@ import org.apache.cassandra.sidecar.common.utils.HttpRange; public class RequestContext { protected static final SidecarHealthRequest SIDECAR_HEALTH_REQUEST = new SidecarHealthRequest(); - protected static final CassandraHealthRequest CASSANDRA_HEALTH_REQUEST = new CassandraHealthRequest(); + + /** + * @deprecated in favor of {@link #CASSANDRA_NATIVE_HEALTH_REQUEST} + */ + @Deprecated + protected static final CassandraNativeHealthRequest CASSANDRA_HEALTH_REQUEST = + new CassandraNativeHealthRequest(true /* useDeprecatedHealthEndpoint */); + protected static final CassandraNativeHealthRequest CASSANDRA_NATIVE_HEALTH_REQUEST = + new CassandraNativeHealthRequest(); + protected static final CassandraJmxHealthRequest CASSANDRA_JMX_HEALTH_REQUEST = new CassandraJmxHealthRequest(); protected static final SchemaRequest FULL_SCHEMA_REQUEST = new SchemaRequest(); protected static final TimeSkewRequest TIME_SKEW_REQUEST = new TimeSkewRequest(); protected static final NodeSettingsRequest NODE_SETTINGS_REQUEST = new NodeSettingsRequest(); @@ -190,16 +200,41 @@ public class RequestContext } /** - * Sets the {@code request} to be a {@link CassandraHealthRequest} - * and returns a reference to this Builder enabling method chaining + * Sets the {@code request} to be a {@link CassandraNativeHealthRequest} + * with the {@code useDeprecatedHealthEndpoint} parameter set to {@code true} + * and returns a reference to this Builder enabling method chaining. * * @return a reference to this Builder + * @deprecated in favor of {@link #cassandraNativeHealthRequest()} */ + @Deprecated public Builder cassandraHealthRequest() { return request(CASSANDRA_HEALTH_REQUEST); } + /** + * Sets the {@code request} to be a {@link CassandraNativeHealthRequest} + * and returns a reference to this Builder enabling method chaining + * + * @return a reference to this Builder + */ + public Builder cassandraNativeHealthRequest() + { + return request(CASSANDRA_NATIVE_HEALTH_REQUEST); + } + + /** + * Sets the {@code request} to be a {@link CassandraJmxHealthRequest} + * and returns a reference to this Builder enabling method chaining + * + * @return a reference to this Builder + */ + public Builder cassandraJmxHealthRequest() + { + return request(CASSANDRA_JMX_HEALTH_REQUEST); + } + /** * Sets the {@code request} to be a {@link SchemaRequest} for the full schema and returns a reference to * this Builder enabling method chaining. diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java index d9d77f8..90fd47f 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java @@ -79,22 +79,50 @@ public class SidecarClient implements AutoCloseable public CompletableFuture<HealthResponse> sidecarHealth() { return executor.executeRequestAsync(requestBuilder() - .sidecarHealthRequest() - .retryPolicy(new OncePerInstanceRetryPolicy()) - .build()); + .sidecarHealthRequest() + .retryPolicy(new OncePerInstanceRetryPolicy()) + .build()); } /** * Executes the Cassandra health request using the configured selection policy and with no retries * * @return a completable future of the Cassandra health response + * @deprecated use {@link #cassandraNativeHealth()} instead */ + @Deprecated public CompletableFuture<HealthResponse> cassandraHealth() { return executor.executeRequestAsync(requestBuilder() - .cassandraHealthRequest() - .retryPolicy(new OncePerInstanceRetryPolicy()) - .build()); + .cassandraHealthRequest() + .retryPolicy(new OncePerInstanceRetryPolicy()) + .build()); + } + + /** + * Executes the Cassandra health request using the configured selection policy and with no retries + * + * @return a completable future of the Cassandra health response + */ + public CompletableFuture<HealthResponse> cassandraNativeHealth() + { + return executor.executeRequestAsync(requestBuilder() + .cassandraNativeHealthRequest() + .retryPolicy(new OncePerInstanceRetryPolicy()) + .build()); + } + + /** + * Executes the Cassandra health request using the configured selection policy and with no retries + * + * @return a completable future of the Cassandra health response + */ + public CompletableFuture<HealthResponse> cassandraJmxHealth() + { + return executor.executeRequestAsync(requestBuilder() + .cassandraJmxHealthRequest() + .retryPolicy(new OncePerInstanceRetryPolicy()) + .build()); } /** diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java b/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraJmxHealthRequest.java similarity index 77% copy from client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java copy to client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraJmxHealthRequest.java index 2061c89..1d5644a 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraJmxHealthRequest.java @@ -23,16 +23,16 @@ import org.apache.cassandra.sidecar.common.ApiEndpointsV1; import org.apache.cassandra.sidecar.common.data.HealthResponse; /** - * Represents a request to retrieve the Cassandra health + * Represents a request to retrieve the connectivity health checks performed against the Cassandra JMX protocol */ -public class CassandraHealthRequest extends DecodableRequest<HealthResponse> +public class CassandraJmxHealthRequest extends DecodableRequest<HealthResponse> { /** - * Constructs a request to retrieve the Cassandra health + * Constructs a request to retrieve the Cassandra JMX health */ - public CassandraHealthRequest() + public CassandraJmxHealthRequest() { - super(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE); + super(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE); } /** diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java b/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraNativeHealthRequest.java similarity index 57% copy from client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java copy to client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraNativeHealthRequest.java index 2061c89..35960c9 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraNativeHealthRequest.java @@ -23,16 +23,30 @@ import org.apache.cassandra.sidecar.common.ApiEndpointsV1; import org.apache.cassandra.sidecar.common.data.HealthResponse; /** - * Represents a request to retrieve the Cassandra health + * Represents a request to retrieve the connectivity health checks performed against the Cassandra native protocol */ -public class CassandraHealthRequest extends DecodableRequest<HealthResponse> +public class CassandraNativeHealthRequest extends DecodableRequest<HealthResponse> { /** - * Constructs a request to retrieve the Cassandra health + * Constructs a request to retrieve the Cassandra native health */ - public CassandraHealthRequest() + public CassandraNativeHealthRequest() { - super(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE); + this(false); + } + + /** + * Constructs a request to retrieve the Cassandra native health + * + * @param useDeprecatedHealthEndpoint {@code true} if using the deprecated endpoint, {@code false} to use + * the new endpoint + */ + @SuppressWarnings("deprecation") + public CassandraNativeHealthRequest(boolean useDeprecatedHealthEndpoint) + { + super(useDeprecatedHealthEndpoint + ? ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE + : ApiEndpointsV1.CASSANDRA_NATIVE_HEALTH_ROUTE); } /** diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java index 669f9a4..d47e460 100644 --- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java +++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java @@ -144,8 +144,9 @@ abstract class SidecarClientTest validateResponseServed(ApiEndpointsV1.HEALTH_ROUTE); } + @SuppressWarnings("deprecation") @Test - void testCassandraHealthOk() throws Exception + void testCassandraDeprecatedHealthOk() throws Exception { MockResponse response = new MockResponse() .setResponseCode(200) @@ -161,8 +162,9 @@ abstract class SidecarClientTest validateResponseServed(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE); } + @SuppressWarnings("deprecation") @Test - void testCassandraHealthNotOk() throws Exception + void testCassandraDeprecatedHealthNotOk() throws Exception { MockResponse response = new MockResponse() .setResponseCode(503) @@ -177,6 +179,72 @@ abstract class SidecarClientTest validateResponseServed(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE); } + @Test + void testCassandraNativeHealthOk() throws Exception + { + MockResponse response = new MockResponse() + .setResponseCode(200) + .setHeader("content-type", "application/json") + .setBody("{\"status\":\"OK\"}"); + enqueue(response); + + HealthResponse result = client.cassandraNativeHealth().get(30, TimeUnit.SECONDS); + assertThat(result).isNotNull(); + assertThat(result.status()).isEqualToIgnoringCase("OK"); + assertThat(result.isOk()).isTrue(); + + validateResponseServed(ApiEndpointsV1.CASSANDRA_NATIVE_HEALTH_ROUTE); + } + + @Test + void testCassandraNativeHealthNotOk() throws Exception + { + MockResponse response = new MockResponse() + .setResponseCode(503) + .setHeader("content-type", "application/json") + .setBody("{\"status\":\"NOT_OK\"}"); + enqueue(response); + + assertThatThrownBy(() -> client.cassandraNativeHealth().get(30, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(RetriesExhaustedException.class); + + validateResponseServed(ApiEndpointsV1.CASSANDRA_NATIVE_HEALTH_ROUTE); + } + + @Test + void testCassandraJmxHealthOk() throws Exception + { + MockResponse response = new MockResponse() + .setResponseCode(200) + .setHeader("content-type", "application/json") + .setBody("{\"status\":\"OK\"}"); + enqueue(response); + + HealthResponse result = client.cassandraJmxHealth().get(1, TimeUnit.SECONDS); + assertThat(result).isNotNull(); + assertThat(result.status()).isEqualTo("OK"); + assertThat(result.isOk()).isTrue(); + + validateResponseServed(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE); + } + + @Test + void testCassandraJmxHealthNotOk() throws Exception + { + MockResponse response = new MockResponse() + .setResponseCode(503) + .setHeader("content-type", "application/json") + .setBody("{\"status\":\"NOT_OK\"}"); + enqueue(response); + + assertThatThrownBy(() -> client.cassandraJmxHealth().get(1, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(RetriesExhaustedException.class); + + validateResponseServed(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE); + } + @Test void testFullSchema() throws Exception { diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java b/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java index 5fe2d5b..550a8c4 100644 --- a/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +++ b/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java @@ -28,6 +28,8 @@ public final class ApiEndpointsV1 public static final String HEALTH = "/__health"; public static final String CASSANDRA = "/cassandra"; + public static final String NATIVE = "/native"; + public static final String JMX = "/jmx"; public static final String KEYSPACE_PATH_PARAM = ":keyspace"; public static final String TABLE_PATH_PARAM = ":table"; public static final String SNAPSHOT_PATH_PARAM = ":snapshot"; @@ -44,7 +46,14 @@ public final class ApiEndpointsV1 public static final String PER_UPLOAD = "/uploads/" + UPLOAD_ID_PATH_PARAM; public static final String HEALTH_ROUTE = API_V1 + HEALTH; + + /** + * @deprecated in favor of {@link #CASSANDRA_NATIVE_HEALTH_ROUTE} + */ + @Deprecated public static final String CASSANDRA_HEALTH_ROUTE = API_V1 + CASSANDRA + HEALTH; + public static final String CASSANDRA_NATIVE_HEALTH_ROUTE = API_V1 + CASSANDRA + NATIVE + HEALTH; + public static final String CASSANDRA_JMX_HEALTH_ROUTE = API_V1 + CASSANDRA + JMX + HEALTH; @Deprecated // NOTE: Uses singular forms of "keyspace" and "table" public static final String DEPRECATED_SNAPSHOTS_ROUTE = API_V1 + "/keyspace/" + KEYSPACE_PATH_PARAM + diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java b/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java index 2bb86d8..44ab18c 100644 --- a/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java +++ b/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java @@ -23,9 +23,12 @@ import java.io.IOException; import java.net.MalformedURLException; import java.rmi.server.RMIClientSocketFactory; import java.rmi.server.RMISocketFactory; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; import java.util.function.Supplier; @@ -66,7 +69,8 @@ public class JmxClient implements NotificationListener, Closeable private final BooleanSupplier enableSslSupplier; private final int connectionMaxRetries; private final long connectionRetryDelayMillis; - + private final Set<NotificationListener> registeredNotificationListeners = + Collections.newSetFromMap(new ConcurrentHashMap<>()); /** * Creates a new JMX client with {@link Builder} options. @@ -119,6 +123,27 @@ public class JmxClient implements NotificationListener, Closeable } } + /** + * Registers a {@link NotificationListener} to be notified whenever we encounter a JMX event. This method + * guarantees that a listener will be registered at most once. + * + * @param notificationListener the listener to be notified + */ + public void registerListener(NotificationListener notificationListener) + { + registeredNotificationListeners.add(notificationListener); + } + + /** + * Removes an already registered {@link NotificationListener} from the recipient list for JMX events. + * + * @param notificationListener the listener to be removed + */ + public void unregisterListener(NotificationListener notificationListener) + { + registeredNotificationListeners.remove(notificationListener); + } + private RMIClientSocketFactory rmiClientSocketFactory(boolean enableSsl) { return enableSsl @@ -206,10 +231,16 @@ public class JmxClient implements NotificationListener, Closeable { this.connected = justConnected; } + forwardNotification(notification, handback); } } } + private void forwardNotification(Notification notification, Object handback) + { + registeredNotificationListeners.forEach(listener -> listener.handleNotification(notification, handback)); + } + /** * @return true if JMX is connected, false otherwise */ diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/NodeSettings.java b/common/src/main/java/org/apache/cassandra/sidecar/common/NodeSettings.java index 61dae74..f6c5244 100644 --- a/common/src/main/java/org/apache/cassandra/sidecar/common/NodeSettings.java +++ b/common/src/main/java/org/apache/cassandra/sidecar/common/NodeSettings.java @@ -33,13 +33,6 @@ public class NodeSettings { private static final String VERSION = "version"; - public static final String RELEASE_VERSION_COLUMN_NAME = "release_version"; - public static final String PARTITIONER_COLUMN_NAME = "partitioner"; - public static final String DATA_CENTER_COLUMN_NAME = "data_center"; - public static final String RPC_ADDRESS_COLUMN_NAME = "rpc_address"; - public static final String RPC_PORT_COLUMN_NAME = "rpc_port"; - public static final String TOKENS_COLUMN_NAME = "tokens"; - @JsonProperty("releaseVersion") private final String releaseVersion; @JsonProperty("partitioner") diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java index 4523ed4..15cea06 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java +++ b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java @@ -20,9 +20,15 @@ package org.apache.cassandra.sidecar.cluster; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import javax.management.Notification; +import javax.management.NotificationListener; +import javax.management.remote.JMXConnectionNotification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,14 +58,12 @@ import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.cassandra.sidecar.common.NodeSettings.DATA_CENTER_COLUMN_NAME; -import static org.apache.cassandra.sidecar.common.NodeSettings.PARTITIONER_COLUMN_NAME; -import static org.apache.cassandra.sidecar.common.NodeSettings.RELEASE_VERSION_COLUMN_NAME; -import static org.apache.cassandra.sidecar.common.NodeSettings.RPC_ADDRESS_COLUMN_NAME; -import static org.apache.cassandra.sidecar.common.NodeSettings.RPC_PORT_COLUMN_NAME; -import static org.apache.cassandra.sidecar.common.NodeSettings.TOKENS_COLUMN_NAME; +import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME; +import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME; import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_DISCONNECTED; import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY; +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_DISCONNECTED; +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_READY; /** @@ -82,13 +86,16 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi private final CassandraVersionProvider versionProvider; private final CQLSessionProvider cqlSessionProvider; private final JmxClient jmxClient; + private final JmxNotificationListener notificationListener; private SimpleCassandraVersion currentVersion; private ICassandraAdapter adapter; - private volatile NodeSettings nodeSettings = null; + private volatile boolean isNativeUp = false; + private volatile NodeSettings nodeSettingsFromJmx = null; private final AtomicBoolean registered = new AtomicBoolean(false); private final AtomicBoolean isHealthCheckActive = new AtomicBoolean(false); private final InetSocketAddress localNativeTransportAddress; private volatile Host host; + private volatile boolean closed = false; /** * Constructs a new {@link CassandraAdapterDelegate} for the given {@code cassandraInstance} @@ -118,6 +125,8 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi this.versionProvider = versionProvider; this.cqlSessionProvider = session; this.jmxClient = jmxClient; + notificationListener = new JmxNotificationListener(); + this.jmxClient.registerListener(notificationListener); } private void maybeRegisterHostListener(@NotNull Session session) @@ -146,17 +155,24 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi /** * Should be called on initial connect as well as when a server comes back since it might be from an upgrade - * synchronized so we don't flood the DB with version requests + * synchronized, so we don't flood the DB with version requests * * <p>If the healthcheck determines we've changed versions, it should load the proper adapter</p> */ public void healthCheck() { + if (closed) + { + LOGGER.debug("Skipping health check for cassandraInstanceId={}. Delegate is closed", cassandraInstanceId); + return; + } + if (isHealthCheckActive.compareAndSet(false, true)) { try { - healthCheckInternal(); + jmxHealthCheck(); + nativeProtocolHealthCheck(); } finally { @@ -170,14 +186,49 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi } } - private void healthCheckInternal() + /** + * Performs health checks by utilizing the JMX protocol. It uses a small subset of the exposed mBeans to + * collect information needed to populate the {@link NodeSettings} object. + */ + protected void jmxHealthCheck() + { + try + { + NodeSettings newNodeSettings = newNodeSettingsFromJmx(); + if (!newNodeSettings.equals(nodeSettingsFromJmx)) + { + // Update the nodeSettings cache + SimpleCassandraVersion previousVersion = currentVersion; + currentVersion = SimpleCassandraVersion.create(newNodeSettings.releaseVersion()); + adapter = versionProvider.cassandra(newNodeSettings.releaseVersion()) + .create(cqlSessionProvider, jmxClient, localNativeTransportAddress); + nodeSettingsFromJmx = newNodeSettings; + LOGGER.info("Cassandra version change detected (from={} to={}) for cassandraInstanceId={}. " + + "New adapter loaded={}", previousVersion, currentVersion, cassandraInstanceId, adapter); + + notifyJmxConnection(); + } + LOGGER.debug("Cassandra version {}", newNodeSettings.releaseVersion()); + } + catch (RuntimeException e) + { + LOGGER.error("Unable to connect JMX to Cassandra instance {}", cassandraInstanceId, e); + // The cassandra node JMX connectivity is unavailable. + markJmxDownAndMaybeNotifyDisconnection(); + } + } + + /** + * Performs health checks by utilizing the native protocol + */ + protected void nativeProtocolHealthCheck() { Session activeSession = cqlSessionProvider.get(); if (activeSession == null) { LOGGER.info("No local CQL session is available for cassandraInstanceId={}. " + "Cassandra instance is down presumably.", cassandraInstanceId); - markAsDownAndMaybeNotify(); + markNativeDownAndMaybeNotifyDisconnection(); return; } @@ -186,15 +237,7 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi try { // NOTE: We cannot use `executeLocal` here as there may be no adapter yet. - SimpleStatement healthCheckStatement = - new SimpleStatement("SELECT " - + RELEASE_VERSION_COLUMN_NAME + ", " - + PARTITIONER_COLUMN_NAME + ", " - + DATA_CENTER_COLUMN_NAME + ", " - + RPC_ADDRESS_COLUMN_NAME + ", " - + RPC_PORT_COLUMN_NAME + ", " - + TOKENS_COLUMN_NAME - + " FROM system.local"); + SimpleStatement healthCheckStatement = new SimpleStatement("SELECT release_version FROM system.local"); Metadata metadata = activeSession.getCluster().getMetadata(); host = getHost(metadata); if (host == null) @@ -205,46 +248,82 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi } healthCheckStatement.setHost(host); healthCheckStatement.setConsistencyLevel(ConsistencyLevel.ONE); - Row oneResult = activeSession.execute(healthCheckStatement).one(); - - // Note that within the scope of this method, we should keep on using the local releaseVersion - String releaseVersion = oneResult.getString(RELEASE_VERSION_COLUMN_NAME); - NodeSettings newNodeSettings = NodeSettings.builder() - .releaseVersion(releaseVersion) - .partitioner(oneResult.getString(PARTITIONER_COLUMN_NAME)) - .sidecarVersion(sidecarVersion) - .datacenter(oneResult.getString(DATA_CENTER_COLUMN_NAME)) - .tokens(oneResult.getSet(TOKENS_COLUMN_NAME, String.class)) - .rpcAddress(oneResult.getInet(RPC_ADDRESS_COLUMN_NAME)) - .rpcPort(oneResult.getInt(RPC_PORT_COLUMN_NAME)) - .build(); - - if (!newNodeSettings.equals(nodeSettings)) - { - // Update the nodeSettings cache - SimpleCassandraVersion previousVersion = currentVersion; - currentVersion = SimpleCassandraVersion.create(releaseVersion); - adapter = versionProvider.cassandra(releaseVersion) - .create(cqlSessionProvider, jmxClient, localNativeTransportAddress); - nodeSettings = newNodeSettings; - LOGGER.info("Cassandra version change detected (from={} to={}) for cassandraInstanceId={}. " + - "New adapter loaded={}", previousVersion, currentVersion, cassandraInstanceId, adapter); + Row row = activeSession.execute(healthCheckStatement).one(); - notifyCqlConnection(); + if (row != null) + { + if (!isNativeUp) + { + isNativeUp = true; + notifyNativeConnection(); + } + } + else + { + // This should never happen but added for completeness + LOGGER.error("Expected to query the release_version from system.local but encountered null {}", + cassandraInstanceId); + // The cassandra native protocol connection to the node is down. + markNativeDownAndMaybeNotifyDisconnection(); + // Unregister the host listener. + maybeUnregisterHostListener(activeSession); } - LOGGER.debug("Cassandra version {}", releaseVersion); } catch (IllegalArgumentException | NoHostAvailableException e) { - LOGGER.error("Unexpected error connecting to Cassandra instance {}", cassandraInstanceId, e); - // The cassandra node is down. + LOGGER.error("Unexpected error querying Cassandra instance {}", cassandraInstanceId, e); + // The cassandra native protocol connection to the node is down. + markNativeDownAndMaybeNotifyDisconnection(); // Unregister the host listener. - markAsDownAndMaybeNotify(); maybeUnregisterHostListener(activeSession); } } - private Host getHost(Metadata metadata) + protected NodeSettings newNodeSettingsFromJmx() + { + LimitedStorageOperations storageOperations = + jmxClient.proxy(LimitedStorageOperations.class, STORAGE_SERVICE_OBJ_NAME); + LimitedEndpointSnitchOperations endpointSnitchOperations = + jmxClient.proxy(LimitedEndpointSnitchOperations.class, ENDPOINT_SNITCH_INFO_OBJ_NAME); + + String releaseVersion = storageOperations.getReleaseVersion(); + String partitionerName = storageOperations.getPartitionerName(); + List<String> tokens = maybeGetTokens(storageOperations); + String dataCenter = endpointSnitchOperations.getDatacenter(); + + return NodeSettings.builder() + .releaseVersion(releaseVersion) + .partitioner(partitionerName) + .sidecarVersion(sidecarVersion) + .datacenter(dataCenter) + .tokens(new LinkedHashSet<>(tokens)) + .rpcAddress(localNativeTransportAddress.getAddress()) + .rpcPort(localNativeTransportAddress.getPort()) + .build(); + } + + /** + * Attempts to return the tokens assigned to the Cassandra instance. + * + * @param storageOperations the interface to perform the operations + * @return the list of tokens assigned to the Cassandra instance + */ + protected List<String> maybeGetTokens(LimitedStorageOperations storageOperations) + { + try + { + return storageOperations.getTokens(); + } + catch (AssertionError aex) + { + // On a joining node, the JMX call will fail with an AssertionError; we catch this scenario to prevent + // failure and just return an empty list of tokens. This is technically correct, because the node, while + // joining, doesn't actually own any tokens until it has successfully completed joining. + return Collections.emptyList(); + } + } + + protected Host getHost(Metadata metadata) { if (host == null) { @@ -271,13 +350,17 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi } /** - * @return a cached {@link NodeSettings}. The returned value could be null when no CQL connection is established + * Returns the cached node settings value obtained during scheduled health checks. This method does not delegate + * to the internal adapter, as the information is retrieved on the configured health check interval. + * + * @return a cached {@link NodeSettings}. The returned value will be {@code null} when no JMX connection is + * established */ @Nullable @Override public NodeSettings nodeSettings() { - return nodeSettings; + return nodeSettingsFromJmx; } public ResultSet executeLocal(Statement statement) @@ -326,7 +409,7 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi @Override public void onDown(Host host) { - runIfThisHost(host, this::markAsDownAndMaybeNotify); + runIfThisHost(host, this::markNativeDownAndMaybeNotifyDisconnection); } @Override @@ -345,14 +428,27 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi { } - public boolean isUp() + /** + * @return {@code true} if the native protocol is enabled on the Cassandra instance, {@code false} otherwise + */ + public boolean isNativeUp() + { + return isNativeUp; + } + + /** + * @return {@code true} if JMX connectivity has been established to the Cassandra instance, {@code false} otherwise + */ + public boolean isJmxUp() { - return nodeSettings != null; + return nodeSettingsFromJmx != null; } public void close() { - markAsDownAndMaybeNotify(); + closed = true; + markNativeDownAndMaybeNotifyDisconnection(); + markJmxDownAndMaybeNotifyDisconnection(); Session activeSession = cqlSessionProvider.getIfConnected(); if (activeSession != null) { @@ -360,6 +456,7 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi } if (jmxClient != null) { + jmxClient.unregisterListener(notificationListener); try { jmxClient.close(); @@ -377,7 +474,15 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi return currentVersion; } - protected void notifyCqlConnection() + protected void notifyJmxConnection() + { + JsonObject connectMessage = new JsonObject() + .put("cassandraInstanceId", cassandraInstanceId); + vertx.eventBus().publish(ON_CASSANDRA_JMX_READY.address(), connectMessage); + LOGGER.info("JMX connected to cassandraInstanceId={}", cassandraInstanceId); + } + + protected void notifyNativeConnection() { JsonObject connectMessage = new JsonObject() .put("cassandraInstanceId", cassandraInstanceId); @@ -385,11 +490,11 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi LOGGER.info("CQL connected to cassandraInstanceId={}", cassandraInstanceId); } - protected void markAsDownAndMaybeNotify() + protected void markNativeDownAndMaybeNotifyDisconnection() { - NodeSettings currentNodeSettings = nodeSettings; - nodeSettings = null; - if (currentNodeSettings != null) + boolean wasCqlConnected = isNativeUp; + isNativeUp = false; + if (wasCqlConnected) { JsonObject disconnectMessage = new JsonObject() .put("cassandraInstanceId", cassandraInstanceId); @@ -398,6 +503,21 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi } } + protected void markJmxDownAndMaybeNotifyDisconnection() + { + NodeSettings currentNodeSettings = nodeSettingsFromJmx; + nodeSettingsFromJmx = null; + currentVersion = null; + adapter = null; + if (currentNodeSettings != null) + { + JsonObject disconnectMessage = new JsonObject() + .put("cassandraInstanceId", cassandraInstanceId); + vertx.eventBus().publish(ON_CASSANDRA_JMX_DISCONNECTED.address(), disconnectMessage); + LOGGER.info("JMX disconnection from cassandraInstanceId={}", cassandraInstanceId); + } + } + @Nullable private <T> T fromAdapter(Function<ICassandraAdapter, T> getter) { @@ -412,4 +532,78 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi runnable.run(); } } + + /** + * A {@link NotificationListener} implementation that reacts to {@link JMXConnectionNotification} notifications + * and updates the state of the JMX connection internally. + */ + protected class JmxNotificationListener implements NotificationListener + { + @Override + public void handleNotification(Notification notification, Object handback) + { + if (notification instanceof JMXConnectionNotification) + { + JMXConnectionNotification connectNotice = (JMXConnectionNotification) notification; + String type = connectNotice.getType(); + switch (type) + { + case JMXConnectionNotification.OPENED: + // Do not notify here as we may not have set up our own delegate yet + // Instead, run the JMX Health Check, which will notify once we have + // created or updated the adapter. + jmxHealthCheck(); + break; + + case JMXConnectionNotification.CLOSED: + case JMXConnectionNotification.FAILED: + case JMXConnectionNotification.NOTIFS_LOST: + markJmxDownAndMaybeNotifyDisconnection(); + break; + + default: + LOGGER.warn("Encountered unexpected JMX notification type={}", type); + break; + } + } + } + } + + /** + * Limited StorageOperations to obtain information required for node settings. Interface visibility is public + * because JMX proxy works on public interfaces only. + */ + public interface LimitedStorageOperations + { + /** + * Fetch a string representation of the Cassandra version. + * + * @return A string representation of the Cassandra version. + */ + String getReleaseVersion(); + + /** + * @return the cluster partitioner + */ + String getPartitionerName(); + + /** + * Fetch string representations of the tokens for this node. + * + * @return a collection of tokens formatted as strings + */ + List<String> getTokens(); + } + + /** + * Limited standard Snitch info to obtain information required for node settings. Interface visibility is public + * because JMX proxy works on public interfaces only. + */ + public interface LimitedEndpointSnitchOperations + { + /** + * @return the Datacenter name depending on the respective snitch used for this node + */ + String getDatacenter(); + } } diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java b/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java index 74e1b0e..f149f15 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java +++ b/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java @@ -87,11 +87,11 @@ class SidecarLoadBalancingPolicy implements LoadBalancingPolicy @Override public HostDistance distance(Host host) { - if (!selectedHosts.contains(host)) + if (selectedHosts.contains(host) || isLocalHost(host)) { - return HostDistance.IGNORED; + return childPolicy.distance(host); } - return childPolicy.distance(host); + return HostDistance.IGNORED; } @Override @@ -197,7 +197,7 @@ class SidecarLoadBalancingPolicy implements LoadBalancingPolicy List<Host> nonLocalHosts = partitionedHosts.get(false); if (nonLocalHosts == null || nonLocalHosts.isEmpty()) { - LOGGER.warn("Did not find any non-local hosts in allHosts"); + LOGGER.debug("Did not find any non-local hosts in allHosts"); return; } diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java index c00dc37..26da25e 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java +++ b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java @@ -21,6 +21,7 @@ package org.apache.cassandra.sidecar.cluster.instance; import java.util.List; import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.jetbrains.annotations.Nullable; /** @@ -56,5 +57,5 @@ public interface InstanceMetadata /** * @return a {@link CassandraAdapterDelegate} specific for the instance */ - CassandraAdapterDelegate delegate(); + @Nullable CassandraAdapterDelegate delegate(); } diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java index c1a36a7..ca39b6c 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java +++ b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.DataObjectBuilder; +import org.jetbrains.annotations.Nullable; /** * Local implementation of InstanceMetadata. @@ -35,6 +36,7 @@ public class InstanceMetadataImpl implements InstanceMetadata private final int port; private final List<String> dataDirs; private final String stagingDir; + @Nullable private final CassandraAdapterDelegate delegate; protected InstanceMetadataImpl(Builder builder) @@ -78,7 +80,7 @@ public class InstanceMetadataImpl implements InstanceMetadata } @Override - public CassandraAdapterDelegate delegate() + public @Nullable CassandraAdapterDelegate delegate() { return delegate; } diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/CassandraHealthHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/CassandraHealthHandler.java index 4713535..509aecb 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/CassandraHealthHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/CassandraHealthHandler.java @@ -31,6 +31,7 @@ import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.JMX; import static org.apache.cassandra.sidecar.server.MainModule.NOT_OK_STATUS; import static org.apache.cassandra.sidecar.server.MainModule.OK_STATUS; @@ -72,7 +73,11 @@ public class CassandraHealthHandler extends AbstractHandler<Void> Void request) { CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); - if (delegate != null && delegate.isUp()) + + boolean isServiceUp = context.request().path().contains(JMX) + ? delegate != null && delegate.isJmxUp() + : delegate != null && delegate.isNativeUp(); + if (isServiceUp) { context.json(OK_STATUS); } diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java index 8ed3900..468d90a 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java @@ -63,6 +63,12 @@ public class RingHandler extends AbstractHandler<RingRequest> RingRequest request) { CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); + if (delegate == null) + { + context.fail(cassandraServiceUnavailable()); + return; + } + StorageOperations storageOperations = delegate.storageOperations(); if (storageOperations == null) diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java index aea5ffc..fa00e84 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java @@ -69,6 +69,11 @@ public class TokenRangeReplicaMapHandler extends AbstractHandler<TokenRangeRepli TokenRangeReplicasRequest request) { CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); + if (delegate == null) + { + context.fail(cassandraServiceUnavailable()); + return; + } StorageOperations operations = delegate.storageOperations(); Metadata metadata = delegate.metadata(); diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java index bc72cc5..264db54 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java @@ -60,6 +60,11 @@ public class NodeSettingsHandler extends AbstractHandler<Void> Void request) { CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); + if (delegate == null) + { + context.fail(cassandraServiceUnavailable()); + return; + } NodeSettings nodeSettings = delegate.nodeSettings(); if (nodeSettings == null) { diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java index 97b8290..8f60820 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java @@ -165,6 +165,11 @@ public class SSTableImportHandler extends AbstractHandler<SSTableImportRequest> private Future<Void> importSSTablesAsync(SSTableImporter.ImportOptions importOptions) { CassandraAdapterDelegate cassandra = metadataFetcher.delegate(importOptions.host()); + if (cassandra == null) + { + return Future.failedFuture(cassandraServiceUnavailable()); + } + TableOperations tableOperations = cassandra.tableOperations(); if (tableOperations == null) diff --git a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java index fec3f4c..9e11cb8 100644 --- a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java +++ b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java @@ -162,9 +162,17 @@ public class MainModule extends AbstractModule router.get(ApiEndpointsV1.HEALTH_ROUTE) .handler(context -> context.json(OK_STATUS)); + // Backwards compatibility for the Cassandra health endpoint + //noinspection deprecation router.get(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE) .handler(cassandraHealthHandler); + router.get(ApiEndpointsV1.CASSANDRA_NATIVE_HEALTH_ROUTE) + .handler(cassandraHealthHandler); + + router.get(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE) + .handler(cassandraHealthHandler); + //noinspection deprecation router.get(ApiEndpointsV1.DEPRECATED_COMPONENTS_ROUTE) .handler(streamSSTableComponentHandler) diff --git a/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java b/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java index d15423e..51c94c1 100644 --- a/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java +++ b/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java @@ -69,6 +69,18 @@ public enum SidecarServerEvents * for the Sidecar-managed Cassandra instances are available. */ ON_ALL_CASSANDRA_CQL_READY, + + /** + * The {@link io.vertx.core.eventbus.EventBus} address where events will be published when a JMX connection for + * a given instance has been established. The instance identifier will be passed as part of the message. + */ + ON_CASSANDRA_JMX_READY, + + /** + * The {@link io.vertx.core.eventbus.EventBus} address where events will be published when a JMX connection for + * a given instance has been disconnected. The instance identifier will be passed as part of the message. + */ + ON_CASSANDRA_JMX_DISCONNECTED, ; public String address() diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java b/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java index 0c51bf0..518af51 100644 --- a/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java +++ b/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java @@ -75,6 +75,7 @@ public class InstanceMetadataFetcher * @return the {@link CassandraAdapterDelegate} for the given {@code host}, or the first instance when {@code host} * is {@code null} */ + @Nullable public CassandraAdapterDelegate delegate(String host) { return instance(host).delegate(); diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java index eb3b29a..02ba0a2 100644 --- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java +++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java @@ -207,6 +207,12 @@ public class SSTableImporter ImportOptions options = pair.getValue(); CassandraAdapterDelegate cassandra = metadataFetcher.delegate(options.host); + if (cassandra == null) + { + promise.fail(HttpExceptions.cassandraServiceUnavailable()); + continue; + } + TableOperations tableOperations = cassandra.tableOperations(); if (tableOperations == null) diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java b/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java index 2a3d1b4..54d54a5 100644 --- a/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java +++ b/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java @@ -62,7 +62,7 @@ public class SimpleCassandraVersion implements Comparable<SimpleCassandraVersion * @throws IllegalArgumentException if the provided string does not * represent a version */ - public static SimpleCassandraVersion create(String version) + public static SimpleCassandraVersion create(String version) throws IllegalArgumentException { String stripped = version.toUpperCase().replace(SNAPSHOT, ""); Matcher matcher = PATTERN.matcher(stripped); diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java b/src/test/integration/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java similarity index 56% rename from client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java rename to src/test/integration/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java index 2061c89..4733817 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CassandraHealthRequest.java +++ b/src/test/integration/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java @@ -16,31 +16,15 @@ * limitations under the License. */ -package org.apache.cassandra.sidecar.client.request; - -import io.netty.handler.codec.http.HttpMethod; -import org.apache.cassandra.sidecar.common.ApiEndpointsV1; -import org.apache.cassandra.sidecar.common.data.HealthResponse; +package org.apache.cassandra.distributed.impl; /** - * Represents a request to retrieve the Cassandra health + * Utility class to interact with protected methods in AbstractCluster */ -public class CassandraHealthRequest extends DecodableRequest<HealthResponse> +public class AbstractClusterUtils { - /** - * Constructs a request to retrieve the Cassandra health - */ - public CassandraHealthRequest() - { - super(ApiEndpointsV1.CASSANDRA_HEALTH_ROUTE); - } - - /** - * {@inheritDoc} - */ - @Override - public HttpMethod method() + public static InstanceConfig createInstanceConfig(AbstractCluster cluster, int nodeNumber) { - return HttpMethod.GET; + return cluster.createInstanceConfig(nodeNumber); } } diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java b/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java index 04bddec..d77f5f8 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java @@ -53,7 +53,7 @@ public class CQLSessionProviderTest extends IntegrationTestBase void testCqlSessionProviderWorksAsExpected(VertxTestContext context, CassandraTestContext cassandraTestContext) throws Exception { - UpgradeableCluster cluster = cassandraTestContext.getCluster(); + UpgradeableCluster cluster = cassandraTestContext.cluster(); testWithClient(context, false, webClient -> { // To start, both instances are stopped, so we should get 503s for both buildInstanceHealthRequest(webClient, "1") @@ -129,7 +129,7 @@ public class CQLSessionProviderTest extends IntegrationTestBase { return webClient.get(server.actualPort(), "localhost", - "/api/v1/cassandra/__health?instanceId=" + instanceId) + "/api/v1/cassandra/native/__health?instanceId=" + instanceId) .as(BodyCodec.string()); } diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java new file mode 100644 index 0000000..9bc1c82 --- /dev/null +++ b/src/test/integration/org/apache/cassandra/sidecar/common/DelegateIntegrationTest.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common; + +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import com.google.common.collect.ImmutableSet; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.Message; +import io.vertx.core.impl.ConcurrentHashSet; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.client.HttpRequest; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.client.WebClient; +import io.vertx.junit5.Checkpoint; +import io.vertx.junit5.Timeout; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.api.NodeToolResult; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.testing.IntegrationTestBase; +import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion; +import org.apache.cassandra.testing.CassandraIntegrationTest; + +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_ALL_CASSANDRA_CQL_READY; +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_DISCONNECTED; +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY; +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_DISCONNECTED; +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_READY; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Ensures the Delegate works correctly + */ +@ExtendWith(VertxExtension.class) +class DelegateIntegrationTest extends IntegrationTestBase +{ + @CassandraIntegrationTest() + void testCorrectVersionIsEnabled() + { + CassandraAdapterDelegate delegate = sidecarTestContext.instancesConfig() + .instanceFromId(1) + .delegate(); + assertThat(delegate).isNotNull(); + SimpleCassandraVersion version = delegate.version(); + assertThat(version).isNotNull(); + assertThat(version.major).isEqualTo(sidecarTestContext.version.major); + assertThat(version.minor).isEqualTo(sidecarTestContext.version.minor); + assertThat(version).isGreaterThanOrEqualTo(sidecarTestContext.version); + } + + @CassandraIntegrationTest() + void testHealthCheck(VertxTestContext context) + { + EventBus eventBus = vertx.eventBus(); + Checkpoint cqlReady = context.checkpoint(); + Checkpoint cqlDisconnected = context.checkpoint(); + + CassandraAdapterDelegate adapterDelegate = sidecarTestContext.instancesConfig() + .instanceFromId(1) + .delegate(); + assertThat(adapterDelegate).isNotNull(); + assertThat(adapterDelegate.isJmxUp()).as("jmx health check succeeds").isTrue(); + assertThat(adapterDelegate.isNativeUp()).as("native health check succeeds").isTrue(); + + // Set up test listeners before disabling/enabling binary to avoid race conditions + // where the event happens before the consumer is registered. + eventBus.localConsumer(ON_CASSANDRA_CQL_DISCONNECTED.address(), (Message<JsonObject> message) -> { + int instanceId = message.body().getInteger("cassandraInstanceId"); + CassandraAdapterDelegate delegate = sidecarTestContext.instancesConfig() + .instanceFromId(instanceId) + .delegate(); + + assertThat(delegate).isNotNull(); + assertThat(delegate.isNativeUp()).as("health check fails after binary has been disabled").isFalse(); + cqlDisconnected.flag(); + sidecarTestContext.cluster().get(1).nodetool("enablebinary"); + }); + + eventBus.localConsumer(ON_CASSANDRA_CQL_READY.address(), (Message<JsonObject> reconnectMessage) -> { + int instanceId = reconnectMessage.body().getInteger("cassandraInstanceId"); + CassandraAdapterDelegate delegate = sidecarTestContext.instancesConfig() + .instanceFromId(instanceId) + .delegate(); + assertThat(delegate).isNotNull(); + assertThat(delegate.isNativeUp()).as("health check succeeds after binary has been enabled") + .isTrue(); + cqlReady.flag(); + }); + + // Disable binary + NodeToolResult nodetoolResult = sidecarTestContext.cluster().get(1).nodetoolResult("disablebinary"); + assertThat(nodetoolResult.getRc()) + .withFailMessage("Failed to disable binary:\nstdout:" + nodetoolResult.getStdout() + + "\nstderr: " + nodetoolResult.getStderr()) + .isEqualTo(0); + // NOTE: enable binary happens inside the disable binary handler above, which then will trigger the + // cqlReady flag. + } + + @CassandraIntegrationTest(nodesPerDc = 3) + void testAllInstancesHealthCheck(VertxTestContext context) + { + EventBus eventBus = vertx.eventBus(); + Checkpoint allCqlReady = context.checkpoint(); + + Set<Integer> expectedCassandraInstanceIds = ImmutableSet.of(1, 2, 3); + eventBus.localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(), (Message<JsonObject> message) -> { + JsonArray cassandraInstanceIds = message.body().getJsonArray("cassandraInstanceIds"); + assertThat(cassandraInstanceIds).hasSize(3); + assertThat(IntStream.rangeClosed(1, cassandraInstanceIds.size())) + .allMatch(expectedCassandraInstanceIds::contains); + + allCqlReady.flag(); + }); + } + + @CassandraIntegrationTest(nodesPerDc = 3) + void testStoppingAnInstance(VertxTestContext context) + { + EventBus eventBus = vertx.eventBus(); + Checkpoint allCqlReady = context.checkpoint(); + Checkpoint cqlDisconnected = context.checkpoint(); + Checkpoint jmxDisconnected = context.checkpoint(); + + Set<Integer> expectedCassandraInstanceIds = ImmutableSet.of(1, 2, 3); + eventBus.localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(), (Message<JsonObject> message) -> { + JsonArray cassandraInstanceIds = message.body().getJsonArray("cassandraInstanceIds"); + assertThat(cassandraInstanceIds).hasSize(3); + assertThat(IntStream.rangeClosed(1, cassandraInstanceIds.size())) + .allMatch(expectedCassandraInstanceIds::contains); + + allCqlReady.flag(); + + // Stop instance 2 + ClusterUtils.stopUnchecked(sidecarTestContext.cluster().get(2)); + }); + + eventBus.localConsumer(ON_CASSANDRA_CQL_DISCONNECTED.address(), (Message<JsonObject> message) -> { + Integer instanceId = message.body().getInteger("cassandraInstanceId"); + assertThat(instanceId).isEqualTo(2); + + buildNativeHealthRequest(client, instanceId).send(assertHealthCheckNotOk(context, cqlDisconnected)); + }); + + eventBus.localConsumer(ON_CASSANDRA_JMX_DISCONNECTED.address(), (Message<JsonObject> message) -> { + Integer instanceId = message.body().getInteger("cassandraInstanceId"); + assertThat(instanceId).isEqualTo(2); + + buildJmxHealthRequest(client, instanceId).send(assertHealthCheckNotOk(context, jmxDisconnected)); + }); + } + + @Timeout(value = 2, timeUnit = TimeUnit.MINUTES) + @CassandraIntegrationTest(nodesPerDc = 2, newNodesPerDc = 1, startCluster = false) + public void testChangingClusterSize(VertxTestContext context) throws InterruptedException + { + EventBus eventBus = vertx.eventBus(); + + Checkpoint jmxConnected = context.checkpoint(3); + Checkpoint nativeConnected = context.checkpoint(3); + Checkpoint jmxNotConnected = context.checkpoint(); + Checkpoint nativeNotConnected = context.checkpoint(); + + CountDownLatch firstTwoConnected = new CountDownLatch(2); + + Set<Integer> nativeConnectedInstances = new ConcurrentHashSet<>(); + Set<Integer> jmxConnectedInstances = new ConcurrentHashSet<>(); + + eventBus.localConsumer(ON_CASSANDRA_JMX_READY.address(), (Message<JsonObject> message) -> { + Integer instanceId = message.body().getInteger("cassandraInstanceId"); + logger.info("DBG: Received JMX connection notification for {}", instanceId); + // make sure the instance wasn't already in the set before validating + if (jmxConnectedInstances.add(instanceId)) + { + jmxConnected.flag(); + validateJmxConnections(context, jmxConnectedInstances, jmxNotConnected, firstTwoConnected); + } + }); + + eventBus.localConsumer(ON_CASSANDRA_CQL_READY.address(), (Message<JsonObject> message) -> { + Integer instanceId = message.body().getInteger("cassandraInstanceId"); + logger.info("DBG: Received native connection notification for {}", instanceId); + buildNativeHealthRequest(client, instanceId).send(assertHealthCheckOk(context, nativeConnected)); + // make sure the instance wasn't already in the set before validating/flagging + if (nativeConnectedInstances.add(instanceId)) + { + nativeConnected.flag(); + validateNativeConnections(context, nativeNotConnected, firstTwoConnected, nativeConnectedInstances); + } + }); + + // Now that the event listeners are set up, start the cluster + sidecarTestContext.cluster().startup(); + + // Wait for the first two instances to get connected + assertThat(firstTwoConnected.await(2, TimeUnit.MINUTES)).isTrue(); + + // now start the 3rd instance - the test will complete when it's connected + addNewInstance(); + } + + private void validateJmxConnections(VertxTestContext context, Set<Integer> jmxConnectedInstances, + Checkpoint notOkCheckpoint, CountDownLatch firstTwoConnected) + { + int upInstanceCount = jmxConnectedInstances.size(); + if (upInstanceCount == 2) + { + buildJmxHealthRequest(client, 3).send(assertHealthCheckNotOk(context, notOkCheckpoint)); + logger.info("DBG: First two instances connected via JMX, third is down"); + firstTwoConnected.countDown(); + } + else if (upInstanceCount == 3) + { + assertThat(jmxConnectedInstances).containsExactly(1, 2, 3); + } + } + + private void validateNativeConnections(VertxTestContext context, + Checkpoint notOkCheckpoint, + CountDownLatch firstTwoConnected, + Set<Integer> nativeConnectedInstances) + { + int upInstanceCount = nativeConnectedInstances.size(); + if (upInstanceCount == 2) + { + assertThat(nativeConnectedInstances).containsExactly(1, 2); + buildNativeHealthRequest(client, 3).send(assertHealthCheckNotOk(context, notOkCheckpoint)); + logger.info("DBG: First two instances connected via native, third is down"); + firstTwoConnected.countDown(); + } + else if (upInstanceCount == 3) + { + assertThat(nativeConnectedInstances).containsExactly(1, 2, 3); + } + } + + private static Handler<AsyncResult<HttpResponse<Buffer>>> assertHealthCheckOk(VertxTestContext context, + Checkpoint checkpoint) + { + return context.succeeding(response -> context.verify(() -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + assertThat(response.bodyAsJsonObject().getString("status")).isEqualTo("OK"); + })); + } + + private Handler<AsyncResult<HttpResponse<Buffer>>> assertHealthCheckNotOk(VertxTestContext context, + Checkpoint checkpoint) + { + return context.succeeding(response -> context.verify(() -> { + assertThat(response.statusCode()).isEqualTo(SERVICE_UNAVAILABLE.code()); + assertThat(response.bodyAsJsonObject().getString("status")).isEqualTo("NOT_OK"); + checkpoint.flag(); + })); + } + + private HttpRequest<Buffer> buildNativeHealthRequest(WebClient webClient, int instanceId) + { + return webClient.get(server.actualPort(), + "localhost", + "/api/v1/cassandra/native/__health?instanceId=" + instanceId); + } + + private HttpRequest<Buffer> buildJmxHealthRequest(WebClient webClient, int instanceId) + { + return webClient.get(server.actualPort(), + "localhost", + "/api/v1/cassandra/jmx/__health?instanceId=" + instanceId); + } + + private void addNewInstance() + { + UpgradeableCluster cluster = sidecarTestContext.cluster(); + IUpgradeableInstance newInstance = ClusterUtils.addInstance(cluster, cluster.get(1).config(), config -> { + config.set("auto_bootstrap", true); + config.with(Feature.GOSSIP, + Feature.JMX, + Feature.NATIVE_PROTOCOL); + }); + newInstance.startup(cluster); + } +} diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java b/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java deleted file mode 100644 index 7586a22..0000000 --- a/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.sidecar.common; - -import java.util.Set; -import java.util.stream.IntStream; - -import com.google.common.collect.ImmutableSet; -import org.junit.jupiter.api.extension.ExtendWith; - -import io.vertx.core.eventbus.EventBus; -import io.vertx.core.eventbus.Message; -import io.vertx.core.json.JsonArray; -import io.vertx.core.json.JsonObject; -import io.vertx.junit5.Checkpoint; -import io.vertx.junit5.VertxExtension; -import io.vertx.junit5.VertxTestContext; -import org.apache.cassandra.distributed.api.NodeToolResult; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; -import org.apache.cassandra.sidecar.testing.IntegrationTestBase; -import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion; -import org.apache.cassandra.testing.CassandraIntegrationTest; - -import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_ALL_CASSANDRA_CQL_READY; -import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_DISCONNECTED; -import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY; -import static org.assertj.core.api.Assertions.assertThat; - -/** - * Ensures the Delegate works correctly - */ -@ExtendWith(VertxExtension.class) -class DelegateTest extends IntegrationTestBase -{ - @CassandraIntegrationTest(jmx = false) - void testCorrectVersionIsEnabled() - { - CassandraAdapterDelegate delegate = sidecarTestContext.instancesConfig() - .instanceFromId(1) - .delegate(); - SimpleCassandraVersion version = delegate.version(); - assertThat(version).isNotNull(); - assertThat(version.major).isEqualTo(sidecarTestContext.version.major); - assertThat(version.minor).isEqualTo(sidecarTestContext.version.minor); - assertThat(version).isGreaterThanOrEqualTo(sidecarTestContext.version); - } - - @CassandraIntegrationTest(jmx = false) - void testHealthCheck(VertxTestContext context) - { - EventBus eventBus = vertx.eventBus(); - Checkpoint cqlReady = context.checkpoint(); - Checkpoint cqlDisconnected = context.checkpoint(); - - CassandraAdapterDelegate adapterDelegate = sidecarTestContext.instancesConfig() - .instanceFromId(1) - .delegate(); - assertThat(adapterDelegate.isUp()).as("health check succeeds").isTrue(); - - // Set up test listeners before disabling/enabling binary to avoid race conditions - // where the event happens before the consumer is registered. - eventBus.localConsumer(ON_CASSANDRA_CQL_DISCONNECTED.address(), (Message<JsonObject> message) -> { - int instanceId = message.body().getInteger("cassandraInstanceId"); - CassandraAdapterDelegate delegate = sidecarTestContext.instancesConfig() - .instanceFromId(instanceId) - .delegate(); - - assertThat(delegate.isUp()).as("health check fails after binary has been disabled").isFalse(); - cqlDisconnected.flag(); - sidecarTestContext.cluster().get(1).nodetool("enablebinary"); - }); - - eventBus.localConsumer(ON_CASSANDRA_CQL_READY.address(), (Message<JsonObject> reconnectMessage) -> { - int instanceId = reconnectMessage.body().getInteger("cassandraInstanceId"); - CassandraAdapterDelegate delegate = sidecarTestContext.instancesConfig() - .instanceFromId(instanceId) - .delegate(); - assertThat(delegate.isUp()).as("health check succeeds after binary has been enabled") - .isTrue(); - cqlReady.flag(); - }); - - // Disable binary - NodeToolResult nodetoolResult = sidecarTestContext.cluster().get(1).nodetoolResult("disablebinary"); - assertThat(nodetoolResult.getRc()) - .withFailMessage("Failed to disable binary:\nstdout:" + nodetoolResult.getStdout() - + "\nstderr: " + nodetoolResult.getStderr()) - .isEqualTo(0); - // NOTE: enable binary happens inside the disable binary handler above, which then will trigger the - // cqlReady flag. - } - - @CassandraIntegrationTest(jmx = false, nodesPerDc = 3) - void testAllInstancesHealthCheck(VertxTestContext context) - { - EventBus eventBus = vertx.eventBus(); - Checkpoint allCqlReady = context.checkpoint(); - - Set<Integer> expectedCassandraInstanceIds = ImmutableSet.of(1, 2, 3); - eventBus.localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(), (Message<JsonObject> message) -> { - JsonArray cassandraInstanceIds = message.body().getJsonArray("cassandraInstanceIds"); - assertThat(cassandraInstanceIds).hasSize(3); - assertThat(IntStream.rangeClosed(1, cassandraInstanceIds.size())) - .allMatch(expectedCassandraInstanceIds::contains); - - allCqlReady.flag(); - }); - } -} diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java b/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java index e643918..e152850 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java @@ -46,7 +46,7 @@ public class JmxClientTest assertThat(opMode).isNotNull(); assertThat(opMode).isIn("LEAVING", "JOINING", "NORMAL", "DECOMMISSIONED", "CLIENT"); - IUpgradeableInstance instance = context.getCluster().getFirstRunningInstance(); + IUpgradeableInstance instance = context.cluster().getFirstRunningInstance(); IInstanceConfig config = instance.config(); assertThat(jmxClient.host()).isEqualTo(config.broadcastAddress().getAddress().getHostAddress()); assertThat(jmxClient.port()).isEqualTo(config.jmxPort()); @@ -101,7 +101,7 @@ public class JmxClientTest private static JmxClient createJmxClient(CassandraTestContext context) { - IUpgradeableInstance instance = context.getCluster().getFirstRunningInstance(); + IUpgradeableInstance instance = context.cluster().getFirstRunningInstance(); IInstanceConfig config = instance.config(); return JmxClient.builder() .host(config.broadcastAddress().getAddress().getHostAddress()) diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java index 8a91324..4b0ab8c 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java @@ -52,7 +52,7 @@ class GossipInfoHandlerIntegrationTest extends IntegrationTestBase assertThat(gossipInfo.generation()).isNotNull(); assertThat(gossipInfo.heartbeat()).isNotNull(); assertThat(gossipInfo.hostId()).isNotNull(); - String releaseVersion = cassandraTestContext.getCluster().getFirstRunningInstance() + String releaseVersion = cassandraTestContext.cluster().getFirstRunningInstance() .getReleaseVersionString(); assertThat(gossipInfo.releaseVersion()).startsWith(releaseVersion); context.completeNow(); diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java index 51f6828..80d86e2 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java @@ -78,7 +78,7 @@ class RingHandlerIntegrationTest extends IntegrationTestBase @CassandraIntegrationTest(gossip = true) void ringFailsWhenGossipIsDisabled(CassandraTestContext context, VertxTestContext testContext) throws Exception { - int disableGossip = context.getCluster().getFirstRunningInstance().nodetool("disablegossip"); + int disableGossip = context.cluster().getFirstRunningInstance().nodetool("disablegossip"); assertThat(disableGossip).isEqualTo(0); String testRoute = "/api/v1/cassandra/ring"; testWithClient(testContext, client -> { diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicGossipDisabledTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicGossipDisabledTest.java index 6331bd2..576ad2f 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicGossipDisabledTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BasicGossipDisabledTest.java @@ -39,7 +39,7 @@ public class BasicGossipDisabledTest extends BaseTokenRangeIntegrationTest void tokenRangeEndpointFailsWhenGossipIsDisabled(CassandraTestContext context, VertxTestContext testContext) throws Exception { - int disableGossip = context.getCluster().getFirstRunningInstance().nodetool("disablegossip"); + int disableGossip = context.cluster().getFirstRunningInstance().nodetool("disablegossip"); assertThat(disableGossip).isEqualTo(0); retrieveMappingWithKeyspace(testContext, TEST_KEYSPACE, response -> { assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.SERVICE_UNAVAILABLE.code()); diff --git a/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java b/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java index 0faa420..35a64bb 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java +++ b/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java @@ -25,12 +25,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.IntStream; import com.datastax.driver.core.Session; import io.vertx.core.Vertx; import org.apache.cassandra.distributed.UpgradeableCluster; import org.apache.cassandra.distributed.api.IInstanceConfig; -import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.impl.AbstractClusterUtils; +import org.apache.cassandra.distributed.impl.InstanceConfig; import org.apache.cassandra.distributed.shared.JMXUtil; import org.apache.cassandra.sidecar.adapters.base.CassandraFactory; import org.apache.cassandra.sidecar.cluster.CQLSessionProviderImpl; @@ -46,6 +48,7 @@ import org.apache.cassandra.sidecar.common.utils.SidecarVersionProvider; import org.apache.cassandra.sidecar.utils.CassandraVersionProvider; import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion; import org.apache.cassandra.testing.AbstractCassandraTestContext; +import org.jetbrains.annotations.NotNull; import static org.assertj.core.api.Assertions.assertThat; @@ -208,13 +211,13 @@ public class CassandraSidecarTestContext implements AutoCloseable UpgradeableCluster cluster = cluster(); List<InstanceMetadata> metadata = new ArrayList<>(); jmxClients = new ArrayList<>(); - List<InetSocketAddress> addresses = buildContactList(cluster); + List<InstanceConfig> configs = buildInstanceConfigs(cluster); + List<InetSocketAddress> addresses = buildContactList(configs); sessionProvider = new CQLSessionProviderImpl(addresses, addresses, 500, null, 0, SharedExecutorNettyOptions.INSTANCE); - for (int i = 0; i < numInstancesToManage; i++) + for (int i = 0; i < configs.size(); i++) { - IUpgradeableInstance instance = cluster.get(i + 1); // 1-based indexing to match node names; - IInstanceConfig config = instance.config(); + IInstanceConfig config = configs.get(i); String hostName = JMXUtil.getJmxHost(config); int nativeTransportPort = tryGetIntConfig(config, "native_transport_port", 9042); // The in-jvm dtest framework sometimes returns a cluster before all the jmx infrastructure is initialized. @@ -255,15 +258,25 @@ public class CassandraSidecarTestContext implements AutoCloseable return new InstancesConfigImpl(metadata, dnsResolver); } - private List<InetSocketAddress> buildContactList(UpgradeableCluster cluster) + private List<InetSocketAddress> buildContactList(List<InstanceConfig> configs) { - return cluster.stream() - .map(i -> new InetSocketAddress(i.config().broadcastAddress().getAddress(), - tryGetIntConfig(i.config(), "native_transport_port", 9042))) - .limit(numInstancesToManage) + // Always return the complete list of addresses even if the cluster isn't yet that large + // this way, we populate the entire local instance list + return configs.stream() + .map(config -> new InetSocketAddress(config.broadcastAddress().getAddress(), + tryGetIntConfig(config, "native_transport_port", 9042))) .collect(Collectors.toList()); } + @NotNull + private List<InstanceConfig> buildInstanceConfigs(UpgradeableCluster cluster) + { + return IntStream.range(1, numInstancesToManage + 1) + .mapToObj(nodeNum -> + AbstractClusterUtils.createInstanceConfig(cluster, nodeNum)) + .collect(Collectors.toList()); + } + /** * A listener for {@link InstancesConfig} state changes */ diff --git a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java index c68eda2..64464dc 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java +++ b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java @@ -103,7 +103,8 @@ public abstract class IntegrationTestBase if (sidecarTestContext.isClusterBuilt()) { - MessageConsumer<Object> cqlReadyConsumer = vertx.eventBus().localConsumer(ON_CASSANDRA_CQL_READY.address()); + MessageConsumer<JsonObject> cqlReadyConsumer = vertx.eventBus() + .localConsumer(ON_CASSANDRA_CQL_READY.address()); cqlReadyConsumer.handler(message -> { cqlReadyConsumer.unregister(); context.completeNow(); @@ -166,7 +167,8 @@ public abstract class IntegrationTestBase .instanceFromId(1) .delegate(); - if (delegate.isUp() || !waitForCluster) + assertThat(delegate).isNotNull(); + if (delegate.isNativeUp() || !waitForCluster) { tester.accept(client); } diff --git a/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java b/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java index 4aaa055..c63809f 100644 --- a/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java +++ b/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java @@ -87,6 +87,6 @@ public abstract class AbstractCassandraTestContext implements AutoCloseable public int clusterSize() { - return annotation.numDcs() * annotation.nodesPerDc(); + return annotation.numDcs() * (annotation.nodesPerDc() + annotation.newNodesPerDc()); } } diff --git a/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java b/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java index c52471d..29fa433 100644 --- a/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java +++ b/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java @@ -112,7 +112,7 @@ public @interface CassandraIntegrationTest * or {@link ConfigurableCassandraTestContext#configureAndStartCluster(Consumer)} to get the cluster. * NOTE: This cluster object must be closed by the test as the framework doesn't have access to it. * If true (the default), the test should take an instance of {@link CassandraTestContext} - * {@link CassandraTestContext#getCluster()} will contain the built cluster. + * {@link CassandraTestContext#cluster()} will contain the built cluster. * @return true if the cluster should be built by the test framework, false otherwise */ boolean buildCluster() default true; diff --git a/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java b/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java index 8498776..09afe24 100644 --- a/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java +++ b/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java @@ -43,9 +43,4 @@ public class CassandraTestContext extends AbstractCassandraTestContext + ", cluster=" + cluster + '}'; } - - public UpgradeableCluster getCluster() - { - return cluster; - } } diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java b/src/test/java/org/apache/cassandra/sidecar/TestModule.java index d639944..0acd069 100644 --- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java +++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java @@ -132,7 +132,7 @@ public class TestModule extends AbstractModule .tokens(Collections.singleton("testToken")) .build()); } - when(delegate.isUp()).thenReturn(isUp); + when(delegate.isNativeUp()).thenReturn(isUp); when(instanceMeta.delegate()).thenReturn(delegate); return instanceMeta; } diff --git a/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java b/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java index f5001af..24536cd 100644 --- a/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java @@ -371,9 +371,10 @@ class ServerSSLTest assertThat(throwable).isNotNull() .isInstanceOf(SSLHandshakeException.class) .hasMessageContaining("Failed to create SSL connection") - .hasCauseInstanceOf(SSLHandshakeException.class) - .hasRootCauseMessage("No appropriate protocol (protocol is disabled " + - "or cipher suites are inappropriate)"); + .hasCauseInstanceOf(SSLHandshakeException.class); + assertThat(throwable.getCause().getMessage()) + .containsAnyOf("No appropriate protocol (protocol is disabled or cipher suites are inappropriate)", + "Received fatal alert: protocol_version"); context.completeNow(); })); } diff --git a/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java b/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java index 3ea3d84..28a44b0 100644 --- a/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java +++ b/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java @@ -36,6 +36,7 @@ import org.apache.cassandra.sidecar.cluster.InstancesConfigImpl; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadataImpl; import org.apache.cassandra.sidecar.common.CQLSessionProvider; +import org.apache.cassandra.sidecar.common.JmxClient; import org.apache.cassandra.sidecar.common.MockCassandraFactory; import org.apache.cassandra.sidecar.common.dns.DnsResolver; import org.apache.cassandra.sidecar.utils.CassandraVersionProvider; @@ -110,8 +111,9 @@ public class SnapshotUtils if (delegate == null) { - delegate = new CassandraAdapterDelegate(vertx, 1, versionProvider, cqlSessionProvider1, null, null, - "localhost1", 9042); + JmxClient mockJmxClient = mock(JmxClient.class); + delegate = new CassandraAdapterDelegate(vertx, 1, versionProvider, cqlSessionProvider1, mockJmxClient, + null, "localhost1", 9042); } InstanceMetadataImpl localhost = InstanceMetadataImpl.builder() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org