This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new fed08b5b CASSSIDECAR-266: Add lifecycle APIs for starting and stopping
Cassandra (#256)
fed08b5b is described below
commit fed08b5bb7474f3fb4caecce4acb0838c9b15371
Author: Andrés Beck-Ruiz <[email protected]>
AuthorDate: Fri Sep 26 18:03:23 2025 -0400
CASSSIDECAR-266: Add lifecycle APIs for starting and stopping Cassandra
(#256)
Patch by Andrés Beck-Ruiz, Paulo Motta; Reviewed by Francisco Guerrero,
Yifan Cai for CASSSIDECAR-266
---------
Co-authored-by: Paulo Motta <[email protected]>
---
CHANGES.txt | 1 +
.../cassandra/sidecar/common/ApiEndpointsV1.java | 3 +
.../cassandra/sidecar/common/data/Lifecycle.java | 93 ++++++
.../common/request/LifecycleInfoRequest.java | 46 +++
.../common/request/LifecycleUpdateRequest.java | 55 ++++
.../common/response/LifecycleInfoResponse.java | 125 +++++++++
.../cassandra/sidecar/client/RequestContext.java | 26 ++
.../cassandra/sidecar/client/SidecarClient.java | 40 +++
.../sidecar/client/SidecarClientTest.java | 47 ++++
.../lifecycle/InJvmDTestLifecycleProvider.java | 76 +++++
.../testing/SharedClusterIntegrationTestBase.java | 71 +++++
.../InJvmLifecycleProviderIntegrationTest.java | 86 ++++++
.../LifecycleProviderIntegrationTester.java | 210 ++++++++++++++
.../acl/authorization/BasicPermissions.java | 4 +
.../sidecar/config/LifecycleConfiguration.java | 37 +++
.../sidecar/config/SidecarConfiguration.java | 5 +
.../config/yaml/LifecycleConfigurationImpl.java | 70 +++++
.../config/yaml/SidecarConfigurationImpl.java | 24 ++
.../exceptions/LifecycleTaskConflictException.java | 30 ++
.../sidecar/handlers/LifecycleInfoHandler.java | 90 ++++++
.../sidecar/handlers/LifecycleUpdateHandler.java | 117 ++++++++
.../sidecar/lifecycle/LifecycleManager.java | 196 +++++++++++++
.../sidecar/lifecycle/LifecycleProvider.java | 50 ++++
.../lifecycle/ProcessLifecycleProvider.java | 55 ++++
.../cassandra/sidecar/modules/LifecycleModule.java | 140 +++++++++
.../cassandra/sidecar/modules/SidecarModules.java | 11 +-
.../modules/multibindings/VertxRouteMapKeys.java | 10 +
.../sidecar/handlers/LifecycleInfoHandlerTest.java | 168 +++++++++++
.../handlers/LifecycleUpdateHandlerTest.java | 205 ++++++++++++++
.../sidecar/lifecycle/LifecycleManagerTest.java | 312 +++++++++++++++++++++
30 files changed, 2398 insertions(+), 5 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index cf443a80..f57d67b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.3.0
-----
+ * Add lifecycle APIs for starting and stopping Cassandra (CASSSIDECAR-266)
* Implementation of CassandraClusterSchemaMonitor (CASSSIDECAR-245)
* Sidecar endpoint for vending statistics related to compaction
(CASSSIDECAR-329)
* Update logging dependencies (CASSSIDECAR-337)
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index 52bce4a0..d67e4cdc 100644
---
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -167,6 +167,9 @@ public final class ApiEndpointsV1
// when a user visits /openapi
public static final String OPENAPI_HTML_ROUTE = "/openapi/*";
+ // Lifecycle APIs
+ public static final String LIFECYCLE_ROUTE = API_V1 + CASSANDRA +
"/lifecycle";
+
private ApiEndpointsV1()
{
throw new IllegalStateException(getClass() + " is a constants
container and shall not be instantiated");
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/Lifecycle.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/Lifecycle.java
new file mode 100644
index 00000000..dc329df9
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/Lifecycle.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import static
org.apache.cassandra.sidecar.common.request.data.NodeCommandRequestPayload.State;
+
+/**
+ * Utilities for representing the lifecycle state of a Cassandra instance and
the status of
+ * lifecycle operations executed by the LifecycleManager.
+ */
+public final class Lifecycle
+{
+ /**
+ * Represents the lifecycle state of a Cassandra instance.
+ */
+ public enum CassandraState
+ {
+ /**
+ * The state when a desired lifecycle state has not been submitted yet
+ */
+ UNKNOWN,
+ /**
+ * The state when a Cassandra process is running
+ */
+ RUNNING,
+ /**
+ * The state when a Cassandra process is not running
+ */
+ STOPPED;
+
+ public boolean isRunning()
+ {
+ return this == RUNNING;
+ }
+
+ public static CassandraState fromNodeCommandState(State state)
+ {
+ return state == State.START ? RUNNING : STOPPED;
+ }
+ }
+
+ /**
+ * Represents the status of a LifecycleManager operation to converge the
current lifecycle state
+ * of a Cassandra instance to the desired state.
+ */
+ public enum OperationStatus
+ {
+ /**
+ * The status when a desired lifecycle state has not been submitted yet
+ */
+ UNDEFINED,
+ /**
+ * The status when the current lifecycle state of an instance matches
the desired lifecycle state
+ */
+ CONVERGED,
+ /**
+ * The status when the current lifecycle state of an instance does not
match the desired lifecycle state,
+ * and there is no operation in progress to match the states
+ */
+ DIVERGED,
+ /**
+ * The status when the current lifecycle state of an instance does not
match the desired lifecycle state,
+ * and there is an operation in progress to start or stop the instance
to match the states
+ */
+ CONVERGING;
+
+ public boolean isCompleted()
+ {
+ return this == CONVERGED || this == DIVERGED;
+ }
+
+ public boolean isConverged()
+ {
+ return this == CONVERGED;
+ }
+ }
+}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LifecycleInfoRequest.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LifecycleInfoRequest.java
new file mode 100644
index 00000000..4953506b
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LifecycleInfoRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
+
+/**
+ * Represents a request to retrieve the Cassandra lifecycle information
+ */
+public class LifecycleInfoRequest extends JsonRequest<LifecycleInfoResponse>
+{
+ /**
+ * Constructs a request to retrieve the Cassandra lifecycle information
+ */
+ public LifecycleInfoRequest()
+ {
+ super(ApiEndpointsV1.LIFECYCLE_ROUTE);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public HttpMethod method()
+ {
+ return HttpMethod.GET;
+ }
+}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LifecycleUpdateRequest.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LifecycleUpdateRequest.java
new file mode 100644
index 00000000..cb945b2c
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LifecycleUpdateRequest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import
org.apache.cassandra.sidecar.common.request.data.NodeCommandRequestPayload;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
+
+/**
+ * Lifecycle update request
+ */
+public class LifecycleUpdateRequest extends JsonRequest<LifecycleInfoResponse>
+{
+ private final NodeCommandRequestPayload requestPayload;
+
+ /**
+ * Constructs a lifecycle update request with the provided parameters
+ *
+ * @param state "start" or "stop" indicating the desired operation
+ */
+ public LifecycleUpdateRequest(NodeCommandRequestPayload.State state)
+ {
+ super(ApiEndpointsV1.LIFECYCLE_ROUTE);
+ this.requestPayload = new NodeCommandRequestPayload(state.toValue());
+ }
+
+ @Override
+ public HttpMethod method()
+ {
+ return HttpMethod.PUT;
+ }
+
+ @Override
+ public Object requestBody()
+ {
+ return requestPayload;
+ }
+}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/LifecycleInfoResponse.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/LifecycleInfoResponse.java
new file mode 100644
index 00000000..a2a859f3
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/LifecycleInfoResponse.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.OperationStatus;
+
+/**
+ * A class representing a response for the {@code LifecycleInfoRequest}.
+ */
+public class LifecycleInfoResponse
+{
+
+ private final CassandraState currentState;
+ private final CassandraState desiredState;
+ private final OperationStatus status;
+ private final String lastUpdate;
+
+ /**
+ * Constructs a {@link LifecycleInfoResponse} object with the {@code
currentState}, {@code intendedState}, {@code result},
+ * and {@code message}
+ *
+ * @param currentState the current state of the Cassandra node
+ * @param desiredState the intended state of the Cassandra node
+ * @param status the result of the last lifecycle operation
+ * @param lastUpdate a message providing additional context about the last
lifecycle operation
+ */
+ @JsonCreator
+ public LifecycleInfoResponse(@JsonProperty("current_state") CassandraState
currentState,
+ @JsonProperty("desired_state") CassandraState
desiredState,
+ @JsonProperty("status") OperationStatus
status,
+ @JsonProperty("last_update") String
lastUpdate)
+ {
+ this.currentState = Objects.requireNonNull(currentState, "State must
be non-null");
+ this.desiredState = desiredState;
+ this.status = status;
+ this.lastUpdate = lastUpdate;
+ }
+
+ /**
+ * @return the current state of the Cassandra node
+ */
+ @JsonProperty("current_state")
+ public CassandraState currentState()
+ {
+ return currentState;
+ }
+
+ /**
+ * @return the intended state of the Cassandra node
+ */
+ @JsonProperty("desired_state")
+ public CassandraState desiredState()
+ {
+ return desiredState;
+ }
+
+ /**
+ * @return the status of the last lifecycle operation
+ */
+ @JsonProperty("status")
+ public OperationStatus status()
+ {
+ return status;
+ }
+
+ /**
+ * @return message providing additional context about the last lifecycle
operation
+ */
+ @JsonProperty("last_update")
+ public String lastUpdate()
+ {
+ return lastUpdate;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (!(o instanceof LifecycleInfoResponse)) return false;
+ LifecycleInfoResponse that = (LifecycleInfoResponse) o;
+ return currentState == that.currentState
+ && desiredState == that.desiredState
+ && status == that.status
+ && Objects.equals(lastUpdate, that.lastUpdate);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(currentState, desiredState, status, lastUpdate);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "LifecycleInfoResponse{" +
+ "state=" + currentState +
+ ", intent=" + desiredState +
+ ", result=" + status +
+ ", message='" + lastUpdate + '\'' +
+ '}';
+ }
+}
+
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 9e2765f3..2e630c31 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -39,6 +39,8 @@ import
org.apache.cassandra.sidecar.common.request.GossipHealthRequest;
import org.apache.cassandra.sidecar.common.request.GossipInfoRequest;
import org.apache.cassandra.sidecar.common.request.GossipUpdateRequest;
import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
+import org.apache.cassandra.sidecar.common.request.LifecycleInfoRequest;
+import org.apache.cassandra.sidecar.common.request.LifecycleUpdateRequest;
import org.apache.cassandra.sidecar.common.request.ListOperationalJobsRequest;
import org.apache.cassandra.sidecar.common.request.ListSnapshotFilesRequest;
import org.apache.cassandra.sidecar.common.request.NativeUpdateRequest;
@@ -89,6 +91,7 @@ public class RequestContext
protected static final NodeDecommissionRequest NODE_DECOMMISSION_REQUEST =
new NodeDecommissionRequest();
protected static final StreamStatsRequest STREAM_STATS_REQUEST = new
StreamStatsRequest();
+ protected static final LifecycleInfoRequest LIFECYCLE_INFO_REQUEST = new
LifecycleInfoRequest();
protected static final RetryPolicy DEFAULT_NO_RETRY_POLICY = new
NoRetryPolicy();
protected static final RetryPolicy
DEFAULT_EXPONENTIAL_BACKOFF_RETRY_POLICY =
new ExponentialBackoffRetryPolicy(10, 500L, 60_000L);
@@ -614,6 +617,29 @@ public class RequestContext
return request(STREAM_STATS_REQUEST);
}
+ /**
+ * Sets the {@code request} to be a {@link LifecycleUpdateRequest} for
the
+ * given {@link NodeCommandRequestPayload.State state}, and returns a
reference to this Builder enabling method chaining.
+ *
+ * @param state the intended state for a Cassandra node
+ * @return a reference to this Builder
+ */
+ public Builder nodeLifecycleUpdateRequest(@NotNull
NodeCommandRequestPayload.State state)
+ {
+ return request(new LifecycleUpdateRequest(state));
+ }
+
+ /**
+ * Sets the {@code request} to be a {@link LifecycleInfoRequest} and
returns a reference to this Builder
+ * enabling method chaining.
+ *
+ * @return a reference to this Builder
+ */
+ public Builder nodeLifecycleInfoRequest()
+ {
+ return request(LIFECYCLE_INFO_REQUEST);
+ }
+
/**
* Sets the {@code retryPolicy} to be an instance of {@link
NoRetryPolicy}
*
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 5cf1399c..373bd336 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -66,6 +66,7 @@ import
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse
import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.response.HealthResponse;
import org.apache.cassandra.sidecar.common.response.InstanceFilesListResponse;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
import
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
@@ -862,6 +863,45 @@ public class SidecarClient implements AutoCloseable,
SidecarClientBlobRestoreExt
.build());
}
+ /**
+ * Sends a request to start or stop a provided Cassandra instance.
+ * <p>
+ * This operation asynchronously triggers a start or stop task of the
Cassandra instance via Sidecar's LifecycleProvider.
+ * The request body must contain a JSON payload with a "state" field,
which can be either "start" or "stop".
+ * On success of intended state submission, the server responds with HTTP
202 Accepted
+ * </p>
+ *
+ * @param instance the instance where the request will be executed
+ * @param state the desired node command state: {@link
NodeCommandRequestPayload.State#START} or {@link
NodeCommandRequestPayload.State#STOP}
+ * @return a CompletableFuture representing the completion of the operation
+ */
+
+ public CompletableFuture<LifecycleInfoResponse>
nodeUpdateLifecycle(SidecarInstance instance, NodeCommandRequestPayload.State
state)
+ {
+ return executor.executeRequestAsync(requestBuilder()
+
.singleInstanceSelectionPolicy(instance)
+ .nodeLifecycleUpdateRequest(state)
+ .build());
+ }
+
+ /**
+ * Sends a request to retrieve the current lifecycle information of the
provided Cassandra instance.
+ * <p>
+ * This operation retrieves the current lifecycle state of the Cassandra
instance via Sidecar's LifecycleProvider.
+ * On success, the server responds with HTTP 200 OK and a payload
containing the node's actual lifecycle state, desired state,
+ * status of the last lifecycle operation, and a message providing
additional context about the last lifecycle operation.
+ * </p>
+ *
+ * @param instance the instance where the request will be executed
+ * @return a CompletableFuture representing the completion of the operation
+ */
+ public CompletableFuture<LifecycleInfoResponse>
nodeLifecycleInfo(SidecarInstance instance)
+ {
+ return executor.executeRequestAsync(requestBuilder()
+
.singleInstanceSelectionPolicy(instance)
+ .nodeLifecycleInfoRequest()
+ .build());
+ }
/**
* Returns a copy of the request builder with the default parameters
configured for the client.
diff --git
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index 95788cb7..b13a209b 100644
---
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -63,6 +63,8 @@ import
org.apache.cassandra.sidecar.client.request.RequestExecutorTest;
import org.apache.cassandra.sidecar.client.retry.RetryAction;
import org.apache.cassandra.sidecar.client.retry.RetryPolicy;
import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.OperationStatus;
import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
@@ -81,6 +83,7 @@ import
org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.response.HealthResponse;
import org.apache.cassandra.sidecar.common.response.InstanceFileInfo;
import org.apache.cassandra.sidecar.common.response.InstanceFilesListResponse;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
import
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
@@ -1965,6 +1968,50 @@ abstract class SidecarClientTest
validateResponseServed(fileUrl);
}
+ @Test
+ void testNodeLifecycleInfo() throws Exception
+ {
+ String lifecycleInfoResponse =
"{\"current_state\":\"RUNNING\",\"desired_state\":\"RUNNING\",\"status\":\"CONVERGED\","
+
+ "\"last_update\":\"Instance has
started\"}";
+ MockResponse response = new
MockResponse().setResponseCode(OK.code()).setBody(lifecycleInfoResponse);
+ enqueue(response);
+
+ SidecarInstanceImpl sidecarInstance = instances.get(0);
+ LifecycleInfoResponse result =
client.nodeLifecycleInfo(sidecarInstance).get(30, TimeUnit.SECONDS);
+ assertThat(result).isNotNull();
+ assertThat(result.currentState()).isEqualTo(CassandraState.RUNNING);
+ assertThat(result.desiredState()).isEqualTo(CassandraState.RUNNING);
+ assertThat(result.status()).isEqualTo(OperationStatus.CONVERGED);
+ assertThat(result.lastUpdate()).isEqualTo("Instance has started");
+
+ validateResponseServed(ApiEndpointsV1.LIFECYCLE_ROUTE);
+ }
+
+ @Test
+ void testNodeUpdateLifecycle() throws Exception
+ {
+ String lifecycleUpdateResponse =
"{\"current_state\":\"RUNNING\",\"desired_state\":\"STOPPED\",\"status\":\"CONVERGING\","
+
+ "\"last_update\":\"Submitting stop task for instance\"}";
+ MockResponse response = new
MockResponse().setResponseCode(OK.code()).setBody(lifecycleUpdateResponse);
+ enqueue(response);
+
+ SidecarInstanceImpl sidecarInstance = instances.get(0);
+ LifecycleInfoResponse result =
client.nodeUpdateLifecycle(sidecarInstance,
NodeCommandRequestPayload.State.STOP)
+ .get(30, TimeUnit.SECONDS);
+
+ assertThat(result).isNotNull();
+ assertThat(result.currentState()).isEqualTo(CassandraState.RUNNING);
+ assertThat(result.desiredState()).isEqualTo(CassandraState.STOPPED);
+ assertThat(result.status()).isEqualTo(OperationStatus.CONVERGING);
+ assertThat(result.lastUpdate()).isEqualTo("Submitting stop task for
instance");
+
+ validateResponseServed(ApiEndpointsV1.LIFECYCLE_ROUTE, request -> {
+ String requestBody = request.getBody().readUtf8();
+ assertThat(request.getMethod()).isEqualTo("PUT");
+ assertThat(requestBody).isEqualTo("{\"state\":\"stop\"}");
+ });
+ }
+
private void enqueue(MockResponse response)
{
for (MockWebServer server : servers)
diff --git
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/lifecycle/InJvmDTestLifecycleProvider.java
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/lifecycle/InJvmDTestLifecycleProvider.java
new file mode 100644
index 00000000..e95c6add
--- /dev/null
+++
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/lifecycle/InJvmDTestLifecycleProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.lifecycle;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+
+/**
+ * Manages the lifecycle of JVM Dtest Cassandra instances.
+ * This should be used for integration tests where Cassandra instances are
started and stopped
+ */
+public class InJvmDTestLifecycleProvider implements LifecycleProvider
+{
+ private final Iterable<? extends IInstance> instances;
+
+ public InJvmDTestLifecycleProvider(Iterable<? extends IInstance> instances)
+ {
+ this.instances = instances;
+ }
+
+ @Override
+ public void start(InstanceMetadata instanceMetadata)
+ {
+ getInstance(instanceMetadata).startup();
+ }
+
+ @Override
+ public void stop(InstanceMetadata instanceMetadata)
+ {
+ try
+ {
+ // Synchronous to ensure JMX port is released before start again
+ getInstance(instanceMetadata).shutdown().get(1, TimeUnit.MINUTES);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean isRunning(InstanceMetadata host)
+ {
+ return !getInstance(host).isShutdown();
+ }
+
+ private IInstance getInstance(InstanceMetadata instanceMetadata)
+ {
+ for (IInstance instance : instances)
+ {
+ if
(instance.broadcastAddress().getHostName().equals(instanceMetadata.host()))
+ {
+ return instance;
+ }
+ }
+ throw new IllegalArgumentException("No instance found for host: " +
instanceMetadata);
+ }
+}
diff --git
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
index 13da5def..8d360801 100644
---
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
+++
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
@@ -25,12 +25,16 @@ import java.net.URI;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -56,7 +60,10 @@ import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.util.Modules;
+import io.vertx.core.Handler;
import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.json.JsonObject;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
@@ -91,6 +98,8 @@ import
org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
import org.apache.cassandra.sidecar.coordination.ClusterLease;
+import org.apache.cassandra.sidecar.lifecycle.InJvmDTestLifecycleProvider;
+import org.apache.cassandra.sidecar.lifecycle.LifecycleProvider;
import org.apache.cassandra.sidecar.metrics.instance.InstanceHealthMetrics;
import org.apache.cassandra.sidecar.modules.SidecarModules;
import org.apache.cassandra.sidecar.server.Server;
@@ -396,6 +405,32 @@ public abstract class SharedClusterIntegrationTestBase
.isTrue();
}
+ protected void waitForNodeToBeUp(String hostname, long timeout, TimeUnit
timeUnit) throws TimeoutException
+ {
+ long startTime = System.nanoTime();
+ while (!serverWrapper.upNodes.contains(hostname))
+ {
+ if (System.nanoTime() - startTime > timeUnit.toNanos(timeout))
+ {
+ throw new TimeoutException("Instance " + hostname + " did not
come up after " + timeout + ' ' + timeUnit);
+ }
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ protected void waitForNodeToBeDown(String hostname, long timeout, TimeUnit
timeUnit) throws TimeoutException
+ {
+ long startTime = System.nanoTime();
+ while (serverWrapper.upNodes.contains(hostname))
+ {
+ if (System.nanoTime() - startTime > timeUnit.toNanos(timeout))
+ {
+ throw new TimeoutException("Instance " + hostname + " did not
come down after " + timeout + ' ' + timeUnit);
+ }
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+
/**
* Stops the Sidecar service
*
@@ -403,6 +438,10 @@ public abstract class SharedClusterIntegrationTestBase
*/
protected void stopSidecar() throws InterruptedException
{
+ if (serverWrapper == null)
+ {
+ return;
+ }
closeServer(serverWrapper.server);
}
@@ -546,8 +585,10 @@ public abstract class SharedClusterIntegrationTestBase
{
public final Injector injector;
public final Server server;
+ private final InstancesMetadata instancesMetadata;
public volatile int serverPort;
private final CountDownLatch sidecarSchemaReadyLatch = new
CountDownLatch(1);
+ private final Set<String> upNodes = Collections.newSetFromMap(new
ConcurrentHashMap<String, Boolean>());
public ServerWrapper(Injector sidecarServerInjector, Server server)
{
@@ -555,10 +596,33 @@ public abstract class SharedClusterIntegrationTestBase
this.server = server;
// Server must have started to retrieve the port
this.serverPort = server.actualPort();
+ this.instancesMetadata =
injector.getInstance(InstancesMetadata.class);
Vertx vertx = sidecarServerInjector.getInstance(Vertx.class);
vertx.eventBus().localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(),
msg ->
sidecarSchemaReadyLatch.countDown());
+
vertx.eventBus().localConsumer(SidecarServerEvents.ON_CASSANDRA_CQL_READY.address(),
+ cqlUpHandler());
+
vertx.eventBus().localConsumer(SidecarServerEvents.ON_CASSANDRA_CQL_DISCONNECTED.address(),
+ cqlDownHandler());
+ }
+
+ public Handler<Message<JsonObject>> cqlUpHandler()
+ {
+ return message -> {
+ Integer instanceId =
message.body().getInteger("cassandraInstanceId");
+ String hostname =
instancesMetadata.instanceFromId(instanceId).host();
+ upNodes.add(hostname);
+ };
+ }
+
+ public Handler<Message<JsonObject>> cqlDownHandler()
+ {
+ return message -> {
+ Integer instanceId =
message.body().getInteger("cassandraInstanceId");
+ String hostname =
instancesMetadata.instanceFromId(instanceId).host();
+ upNodes.remove(hostname);
+ };
}
}
@@ -645,6 +709,13 @@ public abstract class SharedClusterIntegrationTestBase
return new ClusterLease(ClusterLease.Ownership.CLAIMED);
}
+ @Provides
+ @Singleton
+ public LifecycleProvider lifecycleProvider()
+ {
+ return new InJvmDTestLifecycleProvider(instances);
+ }
+
private List<InetSocketAddress> buildContactPoints()
{
return StreamSupport.stream(instances.spliterator(), false)
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/lifecycle/InJvmLifecycleProviderIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/lifecycle/InJvmLifecycleProviderIntegrationTest.java
new file mode 100644
index 00000000..ff8d3f2f
--- /dev/null
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/lifecycle/InJvmLifecycleProviderIntegrationTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.lifecycle;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+
+import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests the {@link InJvmDTestLifecycleProvider} to ensure it can start and
stop a Cassandra node
+ */
+public class InJvmLifecycleProviderIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
+{
+ static final InstanceMetadata LOCALHOST_METADATA =
mock(InstanceMetadata.class);
+ private static final String JVM_LIFECYCLE_TEST_MIN_VERSION = "4.1";
+
+ @BeforeAll
+ static void beforeAll()
+ {
+ when(LOCALHOST_METADATA.host()).thenReturn("localhost");
+ }
+
+ @Override
+ protected ClusterBuilderConfiguration testClusterConfiguration()
+ {
+ return super.testClusterConfiguration().startCluster(false);
+ }
+
+ @Override
+ protected void beforeClusterProvisioning()
+ {
+ // JVM Distributed Test framework contains a bug with restarting nodes
in version 4.0 (CASSANDRA-19729)
+ assumeThat(SimpleCassandraVersion.create(testVersion.version()))
+ .as("JVM Distributed Test framework contains a bug with restarting
nodes in version 4.0 (CASSANDRA-19729)")
+
.isGreaterThanOrEqualTo(SimpleCassandraVersion.create(JVM_LIFECYCLE_TEST_MIN_VERSION));
+ }
+
+ @Test
+ void testInJvmLifecycleProviderStartAndStopAndRecoveryAfterCrash() throws
Exception
+ {
+ // Simulate node crashing by directly calling the lifecycle provider's
stop method
+ Runnable cassandraCrasher = () -> {
+ LifecycleProvider lifecycleProvider =
serverWrapper.injector.getInstance(LifecycleProvider.class);
+ lifecycleProvider.stop(LOCALHOST_METADATA);
+ };
+
+ LifecycleProviderIntegrationTester tester = new
LifecycleProviderIntegrationTester(
+ trustedClient(),
+ LOCALHOST_METADATA.host(),
+ serverWrapper.serverPort,
+ cassandraCrasher);
+
+ tester.testLifecycleProviderStartAndStopAndRecoveryAfterCrash();
+ }
+
+ @Override
+ protected void initializeSchemaForTest()
+ {
+ // Do nothing
+ }
+}
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/lifecycle/LifecycleProviderIntegrationTester.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/lifecycle/LifecycleProviderIntegrationTester.java
new file mode 100644
index 00000000..c65db08e
--- /dev/null
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/lifecycle/LifecycleProviderIntegrationTester.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.lifecycle;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Utility class to test different lifecycle provider implementations.
+ */
+public class LifecycleProviderIntegrationTester
+{
+ protected final Logger logger =
LoggerFactory.getLogger(LifecycleProviderIntegrationTester.class);
+
+ static final int TIMEOUT_SECONDS = 120;
+
+ final WebClient client;
+ final String sidecarHost;
+ final int sidecarPort;
+ final Runnable cassandraNodeCrasher;
+
+ public LifecycleProviderIntegrationTester(WebClient client, String
sidecarHost, int sidecarPort, Runnable cassandraNodeCrasher)
+ {
+ this.client = client;
+ this.sidecarHost = sidecarHost;
+ this.sidecarPort = sidecarPort;
+ this.cassandraNodeCrasher = cassandraNodeCrasher;
+ }
+
+ void testLifecycleProviderStartAndStopAndRecoveryAfterCrash() throws
Exception
+ {
+ // Check CQL health is NOT_OK, since node is not started
+ assertThat(getCqlStatus()).isEqualTo("NOT_OK");
+
+ // GET /lifecycle when node has not started and no intent has been
submitted
+ HttpResponse<Buffer> lifecycleInfoOnStartup = getLifecycle();
+ assertExpectedLifecycleResponse(lifecycleInfoOnStartup, OK.code(),
+ "STOPPED", "UNKNOWN", "UNDEFINED",
+ "No lifecycle task submitted for this
instance yet.");
+
+ // Start node via PUT /lifecycle and wait for node to be up
+ HttpResponse<Buffer> lifecycleUpdateStart = putLifecycle("start");
+ assertExpectedLifecycleResponse(lifecycleUpdateStart, ACCEPTED.code(),
+ "STOPPED", "RUNNING", "CONVERGING",
"Submitting start task for instance");
+
+ waitForLastUpdateToConverge("Instance has started", TIMEOUT_SECONDS);
+ waitForCqlStatus("OK", TIMEOUT_SECONDS);
+
+ // GET /lifecycle after node has started up
+ HttpResponse<Buffer> lifecycleInfoAfterStartup = getLifecycle();
+ assertExpectedLifecycleResponse(lifecycleInfoAfterStartup, OK.code(),
+ "RUNNING", "RUNNING", "CONVERGED",
"Instance has started");
+
+ // Try starting node with PUT /lifecycle again
+ HttpResponse<Buffer> lifecycleInfoAfterStartAgain =
putLifecycle("start");
+ assertExpectedLifecycleResponse(lifecycleInfoAfterStartAgain,
OK.code(),
+ "RUNNING", "RUNNING", "CONVERGED",
"Instance has started");
+
+ // Simulate node crashing by directly calling the lifecycle provider's
stop method
+ cassandraNodeCrasher.run();
+ waitForLastUpdateToConverge(String.format("Instance %s has
unexpectedly diverged from the desired state RUNNING to STOPPED.",
sidecarHost), TIMEOUT_SECONDS);
+
+ // CQL status should be down since instance is crashed
+ waitForCqlStatus("NOT_OK", TIMEOUT_SECONDS);
+
+ // GET /lifecycle after node has crashed
+ HttpResponse<Buffer> lifecycleInfoAfterCrash = getLifecycle();
+ assertExpectedLifecycleResponse(lifecycleInfoAfterCrash, OK.code(),
+ "STOPPED", "RUNNING", "DIVERGED",
+ String.format("Instance %s has
unexpectedly diverged from the desired state RUNNING to STOPPED.",
sidecarHost));
+
+ // Try starting node with PUT /lifecycle again
+ HttpResponse<Buffer> lifecycleInfoOnStartAfterCrash =
putLifecycle("start");
+ assertExpectedLifecycleResponse(lifecycleInfoOnStartAfterCrash,
ACCEPTED.code(),
+ "STOPPED", "RUNNING", "CONVERGING",
"Submitting start task for instance");
+ waitForLastUpdateToConverge("Instance has started", TIMEOUT_SECONDS);
+ waitForCqlStatus("OK", TIMEOUT_SECONDS);
+
+ // Confirm that the node is up again with GET /lifecycle
+ HttpResponse<Buffer> lifecycleInfoAfterRecovery = getLifecycle();
+ assertExpectedLifecycleResponse(lifecycleInfoAfterRecovery, OK.code(),
+ "RUNNING", "RUNNING", "CONVERGED",
"Instance has started");
+
+ // Stop node via PUT /lifecycle
+ HttpResponse<Buffer> lifecycleUpdateStop = putLifecycle("stop");
+ assertExpectedLifecycleResponse(lifecycleUpdateStop, ACCEPTED.code(),
+ "RUNNING", "STOPPED", "CONVERGING",
"Submitting stop task for instance");
+
+ // Allow time for the async stop task to complete
+ waitForLastUpdateToConverge("Instance has stopped", TIMEOUT_SECONDS);
+
+ // GET /lifecycle after node has stopped
+ HttpResponse<Buffer> lifecycleInfoAfterStop = getLifecycle();
+ assertExpectedLifecycleResponse(lifecycleInfoAfterStop, OK.code(),
+ "STOPPED", "STOPPED", "CONVERGED",
"Instance has stopped");
+
+ // CQL status should be down since instance is stopped
+ assertThat(getCqlStatus()).isEqualTo("NOT_OK");
+
+ // Try stopping node with PUT /lifecycle again, should be idempotent
+ HttpResponse<Buffer> lifecycleInfoOnStopAgain = putLifecycle("stop");
+ assertExpectedLifecycleResponse(lifecycleInfoOnStopAgain, OK.code(),
+ "STOPPED", "STOPPED", "CONVERGED",
"Instance has stopped");
+ }
+
+ private void assertExpectedLifecycleResponse(
+ HttpResponse<Buffer> response,
+ int expectedStatusCode,
+ String currentState,
+ String desiredState,
+ String status,
+ String lastUpdate)
+ {
+ assertThat(response.statusCode()).isEqualTo(expectedStatusCode);
+ JsonObject body = response.bodyAsJsonObject();
+ assertThat(body.getString("current_state")).isEqualTo(currentState);
+ assertThat(body.getString("desired_state")).isEqualTo(desiredState);
+ assertThat(body.getString("status")).isEqualTo(status);
+ assertThat(body.getString("last_update")).isEqualTo(lastUpdate);
+ }
+
+ private HttpResponse<Buffer> getLifecycle()
+ {
+ return getBlocking(client
+ .get(sidecarPort, sidecarHost,
"/api/v1/cassandra/lifecycle")
+ .send());
+ }
+
+ private HttpResponse<Buffer> putLifecycle(String desiredState)
+ {
+ return getBlocking(client
+ .put(sidecarPort, sidecarHost,
"/api/v1/cassandra/lifecycle")
+ .sendBuffer(JsonObject.of("state",
desiredState).toBuffer()));
+ }
+
+ public void waitForCqlStatus(String expectedStatus, int timeoutSeconds)
throws TimeoutException
+ {
+ long startTime = System.nanoTime();
+ String cqlStatus = getCqlStatus();
+ while (!cqlStatus.equals(expectedStatus))
+ {
+ if (System.nanoTime() - startTime >
TimeUnit.SECONDS.toNanos(timeoutSeconds))
+ {
+ throw new TimeoutException("Expected CQL status not reached
after " + timeoutSeconds + " seconds. " +
+ ", status: " + cqlStatus);
+ }
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ cqlStatus = getCqlStatus();
+ }
+ }
+
+ private String getCqlStatus()
+ {
+ HttpResponse<Buffer> cqlStatus = getBlocking(
+ client
+ .get(sidecarPort, sidecarHost, "/api/v1/cassandra/native/__health")
+ .send());
+ logger.info("Native health response: {}", cqlStatus.bodyAsString());
+
+ return cqlStatus.bodyAsJsonObject().getString("status");
+ }
+
+ private void waitForLastUpdateToConverge(String expectedLastUpdate, int
timeoutSeconds) throws TimeoutException
+ {
+ long startTime = System.nanoTime();
+ String lastUpdate =
getLifecycle().bodyAsJsonObject().getString("last_update");
+ while (!lastUpdate.equals(expectedLastUpdate))
+ {
+ if (System.nanoTime() - startTime >
TimeUnit.SECONDS.toNanos(timeoutSeconds))
+ {
+ throw new TimeoutException("Expected lifecycle update not
reached after " + timeoutSeconds + " seconds. " +
+ ", last_update: " + lastUpdate);
+ }
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ lastUpdate =
getLifecycle().bodyAsJsonObject().getString("last_update");
+ }
+ }
+
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
index 0a809b1b..b57ac623 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
@@ -84,4 +84,8 @@ public class BasicPermissions
// Live Migration permissions
public static final Permission LIST_FILES = new
DomainAwarePermission("LIVE_MIGRATION:LIST_FILES", CLUSTER_SCOPE);
public static final Permission STREAM_FILES = new
DomainAwarePermission("LIVE_MIGRATION:STREAM", CLUSTER_SCOPE);
+
+ // Lifecycle permissions
+ public static final Permission READ_LIFECYCLE = new
DomainAwarePermission("LIFECYCLE:READ", CLUSTER_SCOPE);
+ public static final Permission MODIFY_LIFECYCLE = new
DomainAwarePermission("LIFECYCLE:MODIFY", CLUSTER_SCOPE);
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/LifecycleConfiguration.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/LifecycleConfiguration.java
new file mode 100644
index 00000000..7268384f
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/LifecycleConfiguration.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config;
+
+/**
+ * Configuration for Cassandra lifecycle management service
+ */
+public interface LifecycleConfiguration
+{
+
+ /**
+ * @return {@code true} if lifecycle management is enabled, {@code false}
otherwise
+ */
+ boolean enabled();
+
+ /**
+ * @return configuration needed for setting up lifecycle in Sidecar
+ */
+ ParameterizedClassConfiguration lifecycleProvider();
+
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
index 011b9f7c..917c3b55 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
@@ -109,4 +109,9 @@ public interface SidecarConfiguration
* @return the configuration for live migration
*/
LiveMigrationConfiguration liveMigrationConfiguration();
+
+ /**
+ * @return the configuration for lifecycle management
+ */
+ LifecycleConfiguration lifecycleConfiguration();
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/LifecycleConfigurationImpl.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/LifecycleConfigurationImpl.java
new file mode 100644
index 00000000..aa13b7d3
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/LifecycleConfigurationImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config.yaml;
+
+import java.util.Collections;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.config.LifecycleConfiguration;
+import org.apache.cassandra.sidecar.config.ParameterizedClassConfiguration;
+import org.apache.cassandra.sidecar.lifecycle.ProcessLifecycleProvider;
+
+/**
+ * Configuration for Cassandra lifecycle management service
+ */
+public class LifecycleConfigurationImpl implements LifecycleConfiguration
+{
+ private static final boolean DEFAULT_ENABLED = false;
+ private static final ParameterizedClassConfiguration
DEFAULT_LIFECYCLE_PROVIDER =
+ new
ParameterizedClassConfigurationImpl(ProcessLifecycleProvider.class.getName(),
Collections.emptyMap());
+
+ protected final Boolean enabled;
+
+ protected final String directory;
+
+ protected final ParameterizedClassConfiguration lifecycleProvider;
+
+ public LifecycleConfigurationImpl()
+ {
+ this(DEFAULT_ENABLED, null, DEFAULT_LIFECYCLE_PROVIDER);
+ }
+
+ @JsonCreator
+ public LifecycleConfigurationImpl(@JsonProperty("enabled") Boolean enabled,
+ @JsonProperty("directory") String
directory,
+ @JsonProperty("provider")
ParameterizedClassConfiguration lifecycleProvider)
+ {
+ this.enabled = enabled;
+ this.directory = directory;
+ this.lifecycleProvider = lifecycleProvider;
+ }
+
+ @JsonProperty(value = "enabled")
+ public boolean enabled()
+ {
+ return enabled;
+ }
+
+ @JsonProperty(value = "provider")
+ public ParameterizedClassConfiguration lifecycleProvider()
+ {
+ return lifecycleProvider;
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
index 887ace1b..392c12e0 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
@@ -42,6 +42,7 @@ import
org.apache.cassandra.sidecar.config.AccessControlConfiguration;
import
org.apache.cassandra.sidecar.config.CassandraInputValidationConfiguration;
import org.apache.cassandra.sidecar.config.DriverConfiguration;
import org.apache.cassandra.sidecar.config.InstanceConfiguration;
+import org.apache.cassandra.sidecar.config.LifecycleConfiguration;
import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration;
import org.apache.cassandra.sidecar.config.MetricsConfiguration;
import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration;
@@ -115,6 +116,9 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
@JsonProperty("live_migration")
private LiveMigrationConfiguration liveMigrationConfiguration;
+ @JsonProperty("lifecycle")
+ private LifecycleConfiguration lifecycleConfiguration;
+
public SidecarConfigurationImpl()
{
this(builder());
@@ -138,6 +142,7 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
vertxConfiguration = builder.vertxConfiguration;
schemaReportingConfiguration = builder.schemaReportingConfiguration;
liveMigrationConfiguration = builder.liveMigrationConfiguration;
+ lifecycleConfiguration = builder.lifecycleConfiguration;
}
/**
@@ -301,6 +306,13 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
return liveMigrationConfiguration;
}
+ @Override
+ @JsonProperty("lifecycle")
+ public LifecycleConfiguration lifecycleConfiguration()
+ {
+ return lifecycleConfiguration;
+ }
+
public static SidecarConfigurationImpl readYamlConfiguration(Path
yamlConfigurationPath) throws IOException
{
@@ -413,6 +425,7 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
private VertxConfiguration vertxConfiguration = new
VertxConfigurationImpl();
private SchemaReportingConfiguration schemaReportingConfiguration =
new SchemaReportingConfigurationImpl();
private LiveMigrationConfiguration liveMigrationConfiguration = new
LiveMigrationConfigurationImpl();
+ private LifecycleConfiguration lifecycleConfiguration = new
LifecycleConfigurationImpl();
protected Builder()
{
@@ -606,6 +619,17 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
return update(b -> b.liveMigrationConfiguration =
liveMigrationConfiguration);
}
+ /**
+ * Sets the {@code lifecycleConfiguration} and returns a reference to
this Builder enabling method chaining.
+ *
+ * @param lifecycleConfiguration the {@code lifecycleConfiguration} to
set
+ * @return a reference to this Builder
+ */
+ public Builder lifecycleConfiguration(LifecycleConfiguration
lifecycleConfiguration)
+ {
+ return update(b -> b.lifecycleConfiguration =
lifecycleConfiguration);
+ }
+
/**
* Returns a {@code SidecarConfigurationImpl} built from the
parameters previously set.
*
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/exceptions/LifecycleTaskConflictException.java
b/server/src/main/java/org/apache/cassandra/sidecar/exceptions/LifecycleTaskConflictException.java
new file mode 100644
index 00000000..e789f90a
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/exceptions/LifecycleTaskConflictException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.exceptions;
+
+/**
+ * Exception thrown when a lifecycle operation is already in progress for a
Cassandra host.
+ */
+public class LifecycleTaskConflictException extends Exception
+{
+ public LifecycleTaskConflictException(String message)
+ {
+ super(message);
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleInfoHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleInfoHandler.java
new file mode 100644
index 00000000..f887169b
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleInfoHandler.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.Collections;
+import java.util.Set;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.lifecycle.LifecycleManager;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+
+/**
+ * Handles {@code GET /api/v1/cassandra/lifecycle} requests for retrieving
lifecycle information
+ * for a given Cassandra node.
+ */
+@Singleton
+public class LifecycleInfoHandler extends AbstractHandler<Void> implements
AccessProtected
+{
+
+ private final LifecycleManager lifecycleManager;
+
+ /**
+ * Constructs a handler with the provided {@code metadataFetcher}
+ *
+ * @param metadataFetcher the metadata fetcher
+ * @param executorPools executor pools for blocking executions
+ */
+ @Inject
+ public LifecycleInfoHandler(InstanceMetadataFetcher metadataFetcher,
ExecutorPools executorPools, LifecycleManager lifecycleManager)
+ {
+ super(metadataFetcher, executorPools, null);
+ this.lifecycleManager = lifecycleManager;
+ }
+
+ @Override
+ public Set<Authorization> requiredAuthorizations()
+ {
+ return
Collections.singleton(BasicPermissions.READ_LIFECYCLE.toAuthorization());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleInternal(RoutingContext context,
+ HttpServerRequest httpRequest,
+ @NotNull String host,
+ SocketAddress remoteAddress,
+ Void request)
+ {
+ executorPools.service()
+ .executeBlocking(() ->
lifecycleManager.getLifecycleInfo(host))
+ .onSuccess(context::json)
+ .onFailure(cause -> processFailure(cause, context, host,
remoteAddress, request));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected Void extractParamsOrThrow(RoutingContext context)
+ {
+ return null;
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandler.java
new file mode 100644
index 00000000..6f69fcec
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandler.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.Collections;
+import java.util.Set;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.http.HttpServerResponse;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
+import
org.apache.cassandra.sidecar.common.request.data.NodeCommandRequestPayload;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.exceptions.LifecycleTaskConflictException;
+import org.apache.cassandra.sidecar.lifecycle.LifecycleManager;
+import org.apache.cassandra.sidecar.utils.HttpExceptions;
+
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Handles {@code PUT /api/v1/cassandra/lifecycle} requests to start or stop a
Cassandra node.
+ *
+ * <p> Expects a JSON payload:
+ * { "state": "start" } or { "state": "stop" }
+ * and will record the desired state. </p>
+ */
+@Singleton
+public class LifecycleUpdateHandler extends NodeCommandHandler implements
AccessProtected
+{
+ private final LifecycleManager lifecycleManager;
+
+ @Inject
+ public LifecycleUpdateHandler(InstanceMetadataFetcher metadataFetcher,
ExecutorPools executorPools, LifecycleManager lifecycleManager)
+ {
+ super(metadataFetcher, executorPools, null);
+ this.lifecycleManager = lifecycleManager;
+ }
+
+ @Override
+ public Set<Authorization> requiredAuthorizations()
+ {
+ return
Collections.singleton(BasicPermissions.MODIFY_LIFECYCLE.toAuthorization());
+ }
+
+ @Override
+ protected void handleInternal(RoutingContext context,
+ HttpServerRequest httpRequest,
+ @NotNull String host,
+ SocketAddress remoteAddress,
+ NodeCommandRequestPayload request)
+ {
+ CassandraState desiredState =
CassandraState.fromNodeCommandState(request.state());
+ executorPools.service()
+ .executeBlocking(() ->
lifecycleManager.updateDesiredState(host, desiredState))
+ .onSuccess(info ->
+ {
+ HttpServerResponse response =
context.response().putHeader("Content-Type", "application/json");
+ switch (info.status())
+ {
+ case CONVERGED:
+
response.setStatusCode(HttpResponseStatus.OK.code());
+ break;
+ case CONVERGING:
+
response.setStatusCode(HttpResponseStatus.ACCEPTED.code());
+ break;
+ default:
+ logger.warn("{} request failed
with unexpected result. request={}, remoteAddress={}, instance={},
operationStatus={}",
+
this.getClass().getSimpleName(), request, remoteAddress, host, info.status());
+
response.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
+ }
+ response.end(Json.encode(info));
+ })
+ .onFailure(cause -> processFailure(cause, context, host,
remoteAddress, request));
+ }
+
+ @Override
+ protected void processFailure(Throwable cause, RoutingContext context,
String host, SocketAddress remoteAddress, NodeCommandRequestPayload request)
+ {
+ if (cause instanceof LifecycleTaskConflictException)
+ {
+ HttpException httpException =
HttpExceptions.wrapHttpException(HttpResponseStatus.CONFLICT,
+
cause.getMessage(), cause);
+ logger.warn("{} request failed due to lifecycle conflict.
request={}, remoteAddress={}, instance={}",
+ this.getClass().getSimpleName(), request,
remoteAddress, host, cause);
+ context.fail(httpException);
+ }
+ else
+ {
+ super.processFailure(cause, context, host, remoteAddress, request);
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/LifecycleManager.java
b/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/LifecycleManager.java
new file mode 100644
index 00000000..18338741
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/LifecycleManager.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.lifecycle;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import io.vertx.core.Future;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.OperationStatus;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.exceptions.LifecycleTaskConflictException;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+/**
+ * Manages the lifecycle intents for local Cassandra hosts and submits tasks
to the
+ * LifecycleProvider, while ensuring that only one task can be active per host
at a time.
+ */
+@Singleton
+public class LifecycleManager
+{
+ protected static final Logger LOG =
LoggerFactory.getLogger(LifecycleManager.class);
+
+ private final InstanceMetadataFetcher metadataFetcher;
+ private final LifecycleProvider lifecycleProvider;
+ private final ExecutorPools executorPools;
+
+ private final Set<String> convergingInstances =
ConcurrentHashMap.newKeySet();
+ private final Map<String, OperationStatus> lastCompletedStatus = new
ConcurrentHashMap<>();
+ private final Map<String, CassandraState> desiredStateByInstance = new
ConcurrentHashMap<>();
+ private final Map<String, String> lastUpdateMsgByInstance = new
ConcurrentHashMap<>();
+
+ @Inject
+ public LifecycleManager(InstanceMetadataFetcher metadataFetcher,
+ LifecycleProvider lifecycleProvider,
+ ExecutorPools executorPools)
+ {
+ this.metadataFetcher = metadataFetcher;
+ this.lifecycleProvider = lifecycleProvider;
+ this.executorPools = executorPools;
+ }
+
+ public synchronized LifecycleInfoResponse updateDesiredState(String
instanceId, CassandraState desiredState)
+ throws
LifecycleTaskConflictException
+ {
+ // Nothing to do: already in successful desired state
+ LifecycleInfoResponse current = getLifecycleInfo(instanceId);
+ if (current.desiredState() == desiredState && current.status() ==
OperationStatus.CONVERGED)
+ {
+ return current;
+ }
+ // Make sure there is a single lifecycle task running at a time per
instance
+ if (!convergingInstances.add(instanceId))
+ {
+ throw new LifecycleTaskConflictException(String.format("Cannot
update lifecycle state of instance %s to %s. " +
+ "Task
already in progress for this host.",
+ instanceId,
desiredState));
+ }
+ // Converge state asynchronously and cleanup in-progress tasks after
completion or error
+ try
+ {
+ lastCompletedStatus.remove(instanceId);
+ desiredStateByInstance.put(instanceId, desiredState);
+ lastUpdateMsgByInstance.put(instanceId, "Submitting " +
+ (desiredState.isRunning() ? "start" : "stop") + " task for
instance");
+ LifecycleInfoResponse updatedDesireLifecycleInfo =
getLifecycleInfo(instanceId);
+ Future<Void> convergeTask = desiredState.isRunning()
+ ? submitStartTask(instanceId)
+ : submitStopTask(instanceId);
+ convergeTask.onFailure(x -> LOG.warn("Unexpected failure while
converging state.", x))
+ .onComplete(x ->
convergingInstances.remove(instanceId));
+ return updatedDesireLifecycleInfo;
+ }
+ catch (Exception e)
+ {
+ LOG.error("Unexpected error while submitting lifecycle task for
instance {}: {}", instanceId, e.getMessage());
+ convergingInstances.remove(instanceId);
+ throw e;
+ }
+ }
+
+ public synchronized LifecycleInfoResponse getLifecycleInfo(String
instanceId)
+ {
+ CassandraState currentState = lifecycleProvider
+ .isRunning(metadata(instanceId)) ? CassandraState.RUNNING :
CassandraState.STOPPED;
+ CassandraState desiredState =
this.desiredStateByInstance.getOrDefault(instanceId, CassandraState.UNKNOWN);
+ OperationStatus currentStatus = getStatus(instanceId, currentState,
desiredState);
+ maybeRefreshLastCompletedStatus(instanceId, currentStatus,
currentState, desiredState);
+ String lastUpdate =
this.lastUpdateMsgByInstance.getOrDefault(instanceId, "No lifecycle task
submitted for this instance yet.");
+ return new LifecycleInfoResponse(currentState, desiredState,
currentStatus, lastUpdate);
+ }
+
+ /**
+ * This method keeps track of the last completed status of an instance and
logs a warning if the operation status has changed unexpectedly.
+ * This is useful for detecting unexpected state changes that may indicate
issues with the lifecycle management.
+ * See @{link LifecycleManagerTest#testStateChangesUnexpectedlyFlapping}
for an example test case.
+ */
+ private void maybeRefreshLastCompletedStatus(String instanceId,
OperationStatus currentStatus, CassandraState currentState,
+ CassandraState desiredState)
+ {
+ if (!currentStatus.isCompleted()) // this logic is only valid after a
lifecycle task has completed
+ return;
+ OperationStatus lastStatus = lastCompletedStatus.put(instanceId,
currentStatus);
+ if (lastStatus != null && lastStatus != currentStatus)
+ {
+ String msg = currentStatus.isConverged() ?
+ String.format("Instance %s has converged back to the
desired state %s.", instanceId, desiredState) :
+ String.format("Instance %s has unexpectedly diverged
from the desired state %s to %s.", instanceId, desiredState, currentState);
+ lastUpdateMsgByInstance.put(instanceId, msg);
+ LOG.warn(msg);
+ }
+ }
+
+ private Future<Void> submitStartTask(String instanceId)
+ {
+ return executorPools.internal().runBlocking(() ->
+ {
+ try
+ {
+ lastUpdateMsgByInstance.put(instanceId, "Starting instance");
+ lifecycleProvider.start(metadata(instanceId));
+ lastUpdateMsgByInstance.put(instanceId, "Instance has
started");
+ }
+ catch (Exception e)
+ {
+ LOG.error("Failed to start instance {}: {}", instanceId,
e.getMessage());
+ lastUpdateMsgByInstance.put(instanceId, String.format("Failed
to start instance %s: %s", instanceId, e.getMessage()));
+ }
+ });
+ }
+
+ private Future<Void> submitStopTask(String instanceId)
+ {
+ return executorPools.internal().runBlocking(() ->
+ {
+ try
+ {
+ lastUpdateMsgByInstance.put(instanceId, "Stopping instance");
+ lifecycleProvider.stop(metadata(instanceId));
+ lastUpdateMsgByInstance.put(instanceId, "Instance has
stopped");
+ }
+ catch (Exception e)
+ {
+ LOG.error("Failed to stop instance {}: {}", instanceId,
e.getMessage());
+ lastUpdateMsgByInstance.put(instanceId, String.format("Failed
to stop instance %s: %s", instanceId, e.getMessage()));
+ }
+ });
+ }
+
+ private OperationStatus getStatus(String instanceId, CassandraState
currentState, CassandraState desiredState)
+ {
+ if (convergingInstances.contains(instanceId) && desiredState !=
currentState)
+ {
+ return OperationStatus.CONVERGING;
+ }
+ if (desiredState == CassandraState.UNKNOWN)
+ {
+ return OperationStatus.UNDEFINED;
+ }
+ if (desiredState != currentState)
+ {
+ return OperationStatus.DIVERGED;
+ }
+ return OperationStatus.CONVERGED;
+ }
+
+ private InstanceMetadata metadata(String host)
+ {
+ return metadataFetcher.instance(host);
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/LifecycleProvider.java
b/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/LifecycleProvider.java
new file mode 100644
index 00000000..49897990
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/LifecycleProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.lifecycle;
+
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+
+/**
+ * Manages the lifecycle of Cassandra instances through Sidecar
+ */
+public interface LifecycleProvider
+{
+ /**
+ * Start a Cassandra process
+ *
+ * @param instance Cassandra instance metadata
+ */
+ void start(InstanceMetadata instance);
+
+ /**
+ * Stop a Cassandra process
+ *
+ * @param instance Cassandra instance metadata
+ */
+ void stop(InstanceMetadata instance);
+
+ /**
+ * Check whether a Cassandra process is running or not
+ *
+ * @param instance Cassandra instance metadata
+ * @return true if the Cassandra process is running
+ */
+ boolean isRunning(InstanceMetadata instance);
+
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/ProcessLifecycleProvider.java
b/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/ProcessLifecycleProvider.java
new file mode 100644
index 00000000..681dc990
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/ProcessLifecycleProvider.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.lifecycle;
+
+import java.util.Map;
+
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+
+/**
+ * A {@link LifecycleProvider} that manages Cassandra instances as OS
processes.
+ * <p>
+ * This implementation is a placeholder and is not yet implemented.
+ */
+public class ProcessLifecycleProvider implements LifecycleProvider
+{
+ public ProcessLifecycleProvider(Map<String, String> params)
+ {
+ // Params unused for now
+ }
+
+ @Override
+ public void start(InstanceMetadata instance)
+ {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public void stop(InstanceMetadata instance)
+ {
+ throw new UnsupportedOperationException("Not implemented yet");
+
+ }
+
+ @Override
+ public boolean isRunning(InstanceMetadata instance)
+ {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/LifecycleModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/LifecycleModule.java
new file mode 100644
index 00000000..6da8f3c5
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/LifecycleModule.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.modules;
+
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.ProvidesIntoMap;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.PUT;
+import jakarta.ws.rs.Path;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
+import org.apache.cassandra.sidecar.config.LifecycleConfiguration;
+import org.apache.cassandra.sidecar.config.ParameterizedClassConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.ConfigurationException;
+import org.apache.cassandra.sidecar.handlers.LifecycleInfoHandler;
+import org.apache.cassandra.sidecar.handlers.LifecycleUpdateHandler;
+import org.apache.cassandra.sidecar.lifecycle.LifecycleProvider;
+import org.apache.cassandra.sidecar.lifecycle.ProcessLifecycleProvider;
+import org.apache.cassandra.sidecar.modules.multibindings.KeyClassMapKey;
+import org.apache.cassandra.sidecar.modules.multibindings.VertxRouteMapKeys;
+import org.apache.cassandra.sidecar.routes.RouteBuilder;
+import org.apache.cassandra.sidecar.routes.VertxRoute;
+import org.eclipse.microprofile.openapi.annotations.Operation;
+import org.eclipse.microprofile.openapi.annotations.media.Content;
+import org.eclipse.microprofile.openapi.annotations.media.Schema;
+import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Provides the telemetry capability
+ */
+public class LifecycleModule extends AbstractModule
+{
+ @Provides
+ @Singleton
+ LifecycleProvider lifecycleProvider(SidecarConfiguration
sidecarConfiguration)
+ {
+ LifecycleConfiguration lifecycleConfiguration =
sidecarConfiguration.lifecycleConfiguration();
+ if (!lifecycleConfiguration.enabled())
+ {
+ return getNoopProvider();
+ }
+
+ ParameterizedClassConfiguration providerClass =
lifecycleConfiguration.lifecycleProvider();
+ if (providerClass == null)
+ {
+ throw new ConfigurationException("Lifecycle management is enabled,
but provider not set.");
+ }
+
+ if
(providerClass.className().equalsIgnoreCase(ProcessLifecycleProvider.class.getName()))
+ {
+ Map<String, String> namedParams = providerClass.namedParameters();
+ return new ProcessLifecycleProvider(namedParams != null ?
namedParams : Collections.emptyMap());
+ }
+
+ throw new ConfigurationException("Unrecognized authorization provider
" + providerClass.className() + " set");
+ }
+
+ private static @NotNull LifecycleProvider getNoopProvider()
+ {
+ return new LifecycleProvider()
+ {
+ @Override
+ public void start(InstanceMetadata instance)
+ {
+ throw new UnsupportedOperationException("Lifecycle management
is disabled. Cannot start instance " + instance.host());
+ }
+
+ @Override
+ public void stop(InstanceMetadata instance)
+ {
+ throw new UnsupportedOperationException("Lifecycle management
is disabled. Cannot stop instance " + instance.host());
+ }
+
+ @Override
+ public boolean isRunning(InstanceMetadata instance)
+ {
+ throw new UnsupportedOperationException("Lifecycle management
is disabled. Cannot check if instance " + instance.host() + " is running");
+ }
+ };
+ }
+
+ @PUT
+ @Path(ApiEndpointsV1.LIFECYCLE_ROUTE)
+ @Operation(summary = "Updates desired lifecycle state",
+ description = "Updates the desired lifecycle state for a local
Cassandra node. Valid states are 'RUNNING' or 'STOPPED'.")
+ @APIResponse(description = "Desired lifecycle state updated successfully",
+ responseCode = "202",
+ content = @Content(mediaType = "application/json",
+ schema = @Schema(implementation = LifecycleInfoResponse.class)))
+ @ProvidesIntoMap
+ @KeyClassMapKey(VertxRouteMapKeys.LifecycleUpdateRouteKey.class)
+ VertxRoute lifecycleUpdateRoute(RouteBuilder.Factory factory,
+ LifecycleUpdateHandler
lifecycleUpdateHandler)
+ {
+ return factory.builderForRoute()
+ .setBodyHandler(true)
+ .handler(lifecycleUpdateHandler)
+ .build();
+ }
+
+ @GET
+ @Path(ApiEndpointsV1.LIFECYCLE_ROUTE)
+ @Operation(summary = "Gets lifecycle information",
+ description = "Returns the lifecycle information for a local
Cassandra node")
+ @APIResponse(description = "Lifecycle information retrieved successfully",
+ responseCode = "200",
+ content = @Content(mediaType = "application/json",
+ schema = @Schema(implementation = LifecycleInfoResponse.class)))
+ @ProvidesIntoMap
+ @KeyClassMapKey(VertxRouteMapKeys.LifecycleInfoRouteKey.class)
+ VertxRoute lifecycleInfoRoute(RouteBuilder.Factory factory,
+ LifecycleInfoHandler lifecycleInfoHandler)
+ {
+ return factory.buildRouteWithHandler(lifecycleInfoHandler);
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
index 9aa5b057..dc798177 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
@@ -51,17 +51,18 @@ public class SidecarModules
new CdcModule(),
new ConfigurationModule(confPath),
new CoordinationModule(),
- new SchemaReportingModule(),
new HealthCheckModule(),
+ new LifecycleModule(),
+ new LiveMigrationModule(),
+ new MultiBindingTypeResolverModule(),
+ new OpenApiModule(),
new RestoreJobModule(),
new SchedulingModule(),
+ new SchemaReportingModule(),
new SidecarSchemaModule(),
new SSTablesAccessModule(),
new TelemetryModule(),
- new UtilitiesModule(),
- new MultiBindingTypeResolverModule(),
- new LiveMigrationModule(),
- new OpenApiModule());
+ new UtilitiesModule());
}
/**
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
index 48b1e38f..e3bec846 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
@@ -183,6 +183,16 @@ public interface VertxRouteMapKeys
HttpMethod HTTP_METHOD = HttpMethod.GET;
String ROUTE_URI = ApiEndpointsV1.KEYSPACE_SCHEMA_ROUTE;
}
+ interface LifecycleInfoRouteKey extends RouteClassKey
+ {
+ HttpMethod HTTP_METHOD = HttpMethod.GET;
+ String ROUTE_URI = ApiEndpointsV1.LIFECYCLE_ROUTE;
+ }
+ interface LifecycleUpdateRouteKey extends RouteClassKey
+ {
+ HttpMethod HTTP_METHOD = HttpMethod.PUT;
+ String ROUTE_URI = ApiEndpointsV1.LIFECYCLE_ROUTE;
+ }
interface ListCassandraOperationalJobRouteKey extends RouteClassKey
{
HttpMethod HTTP_METHOD = HttpMethod.GET;
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/LifecycleInfoHandlerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/LifecycleInfoHandlerTest.java
new file mode 100644
index 00000000..0a363b3b
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/LifecycleInfoHandlerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.TestResourceReaper;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.OperationStatus;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
+import org.apache.cassandra.sidecar.lifecycle.LifecycleManager;
+import org.apache.cassandra.sidecar.modules.SidecarModules;
+import org.apache.cassandra.sidecar.server.Server;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link LifecycleInfoHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class LifecycleInfoHandlerTest
+{
+
+ Vertx vertx;
+ Server server;
+ LifecycleManager mockLifecycleManager = mock(LifecycleManager.class);
+
+ @BeforeEach
+ void before() throws InterruptedException
+ {
+ Injector injector;
+ Module testOverride = Modules.override(new TestModule()).with(new
LifecycleInfoHandlerTestModule());
+ injector =
Guice.createInjector(Modules.override(SidecarModules.all()).with(testOverride));
+ vertx = injector.getInstance(Vertx.class);
+ server = injector.getInstance(Server.class);
+ VertxTestContext context = new VertxTestContext();
+ server.start().onSuccess(s ->
context.completeNow()).onFailure(context::failNow);
+ context.awaitCompletion(5, TimeUnit.SECONDS);
+ }
+
+ @AfterEach
+ void after() throws InterruptedException
+ {
+ getBlocking(TestResourceReaper.create().with(server).close(), 60,
TimeUnit.SECONDS, "Closing server");
+ }
+
+ @Test
+ void testGetLifecycleInfoNoTask(VertxTestContext context)
+ {
+ // Mock response for initial state with no task submitted
+ LifecycleInfoResponse expectedResponse = new
LifecycleInfoResponse(CassandraState.RUNNING, CassandraState.RUNNING,
+ OperationStatus.CONVERGED, "All good");
+
when(mockLifecycleManager.getLifecycleInfo(anyString())).thenReturn(expectedResponse);
+
+ WebClient client = WebClient.create(vertx);
+ client.get(server.actualPort(), "127.0.0.1",
"/api/v1/cassandra/lifecycle")
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ LifecycleInfoResponse lifecycleResponse =
response.bodyAsJson(LifecycleInfoResponse.class);
+ assertThat(lifecycleResponse).isEqualTo(expectedResponse);
+ context.completeNow();
+ }));
+ }
+
+
+ @Test
+ void testGetLifecycleInfoTaskInProgress(VertxTestContext context)
+ {
+ // Mock response for a task in progress
+ LifecycleInfoResponse mockResponse = new LifecycleInfoResponse(
+ CassandraState.STOPPED, CassandraState.RUNNING,
OperationStatus.CONVERGING, null);
+
when(mockLifecycleManager.getLifecycleInfo("127.0.0.1")).thenReturn(mockResponse);
+
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/api/v1/cassandra/lifecycle";
+ client.get(server.actualPort(), "127.0.0.1", testRoute)
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ LifecycleInfoResponse lifecycleResponse =
response.bodyAsJson(LifecycleInfoResponse.class);
+ assertThat(lifecycleResponse).isNotNull();
+
assertThat(lifecycleResponse.currentState()).isEqualTo(CassandraState.STOPPED);
+
assertThat(lifecycleResponse.desiredState()).isEqualTo(CassandraState.RUNNING);
+
assertThat(lifecycleResponse.status()).isEqualTo(OperationStatus.CONVERGING);
+ assertThat(lifecycleResponse.lastUpdate()).isNull();
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testGetLifecycleInfoFinished(VertxTestContext context)
+ {
+ // Mock response for a finished task
+ LifecycleInfoResponse mockResponse = new LifecycleInfoResponse(
+ CassandraState.RUNNING, CassandraState.RUNNING,
OperationStatus.CONVERGED, "Host has started");
+
when(mockLifecycleManager.getLifecycleInfo("127.0.0.1")).thenReturn(mockResponse);
+
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/api/v1/cassandra/lifecycle";
+ client.get(server.actualPort(), "127.0.0.1", testRoute)
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ LifecycleInfoResponse lifecycleResponse =
response.bodyAsJson(LifecycleInfoResponse.class);
+ assertThat(lifecycleResponse).isNotNull();
+
assertThat(lifecycleResponse.currentState()).isEqualTo(CassandraState.RUNNING);
+
assertThat(lifecycleResponse.desiredState()).isEqualTo(CassandraState.RUNNING);
+
assertThat(lifecycleResponse.status()).isEqualTo(OperationStatus.CONVERGED);
+ assertThat(lifecycleResponse.lastUpdate()).isEqualTo("Host
has started");
+ context.completeNow();
+ }));
+ }
+
+ /**
+ * Test guice module for {@link LifecycleInfoHandler} tests
+ */
+ class LifecycleInfoHandlerTestModule extends AbstractModule
+ {
+
+ @Provides
+ @Singleton
+ public LifecycleManager intentManager()
+ {
+ return mockLifecycleManager;
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandlerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandlerTest.java
new file mode 100644
index 00000000..3a70774f
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandlerTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.TestResourceReaper;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.OperationStatus;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
+import org.apache.cassandra.sidecar.exceptions.LifecycleTaskConflictException;
+import org.apache.cassandra.sidecar.lifecycle.LifecycleManager;
+import org.apache.cassandra.sidecar.modules.SidecarModules;
+import org.apache.cassandra.sidecar.server.Server;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT;
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link LifecycleUpdateHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class LifecycleUpdateHandlerTest
+{
+ Vertx vertx;
+ Server server;
+ LifecycleManager mockLifecycleManager = mock(LifecycleManager.class);
+
+ @BeforeEach
+ void before() throws InterruptedException
+ {
+ Injector injector;
+ Module testOverride = Modules.override(new TestModule()).with(new
LifecycleUpdateHandlerTestModule());
+ injector =
Guice.createInjector(Modules.override(SidecarModules.all()).with(testOverride));
+ vertx = injector.getInstance(Vertx.class);
+ server = injector.getInstance(Server.class);
+ VertxTestContext context = new VertxTestContext();
+ server.start().onSuccess(s ->
context.completeNow()).onFailure(context::failNow);
+ context.awaitCompletion(5, TimeUnit.SECONDS);
+ reset(mockLifecycleManager);
+ }
+
+ @AfterEach
+ void after() throws InterruptedException
+ {
+ getBlocking(TestResourceReaper.create().with(server).close(), 60,
TimeUnit.SECONDS, "Closing server");
+ }
+
+ @Test
+ void testSuccessfulPutWithAcceptedResponse(VertxTestContext ctx) throws
LifecycleTaskConflictException
+ {
+ WebClient client = WebClient.create(vertx);
+ JsonObject payload = JsonObject.of("state", "start");
+ LifecycleInfoResponse expectedResponse = new
LifecycleInfoResponse(CassandraState.STOPPED,
+
CassandraState.RUNNING,
+
OperationStatus.CONVERGING,
+
"Submitted task to start instance");
+ when(mockLifecycleManager.updateDesiredState("127.0.0.1",
CassandraState.RUNNING))
+
.thenReturn(expectedResponse);
+ client.put(server.actualPort(), "127.0.0.1",
"/api/v1/cassandra/lifecycle")
+ .sendBuffer(payload.toBuffer(), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+ verify(mockLifecycleManager,
times(1)).updateDesiredState("127.0.0.1", CassandraState.RUNNING);
+
assertThat(resp.bodyAsJson(LifecycleInfoResponse.class)).isEqualTo(expectedResponse);
+ assertThat(resp.statusCode()).isEqualTo(ACCEPTED.code());
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ @Test
+ void testSuccessfulPutWithOKResponse(VertxTestContext ctx) throws
LifecycleTaskConflictException
+ {
+ WebClient client = WebClient.create(vertx);
+ JsonObject payload = JsonObject.of("state", "stop");
+ LifecycleInfoResponse expectedResponse = new
LifecycleInfoResponse(CassandraState.STOPPED,
+
CassandraState.STOPPED,
+
OperationStatus.CONVERGED,
+
"Submitted task to stop instance");
+ when(mockLifecycleManager.updateDesiredState("127.0.0.1",
CassandraState.STOPPED)).thenReturn(expectedResponse);
+ client.put(server.actualPort(), "127.0.0.1",
"/api/v1/cassandra/lifecycle")
+ .sendBuffer(payload.toBuffer(), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+ verify(mockLifecycleManager,
times(1)).updateDesiredState("127.0.0.1", CassandraState.STOPPED);
+
assertThat(resp.bodyAsJson(LifecycleInfoResponse.class)).isEqualTo(expectedResponse);
+
assertThat(resp.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ @Test
+ void testSuccessfulPutWithFailedResponse(VertxTestContext ctx) throws
LifecycleTaskConflictException
+ {
+ WebClient client = WebClient.create(vertx);
+ JsonObject payload = JsonObject.of("state", "stop");
+ LifecycleInfoResponse expectedResponse = new
LifecycleInfoResponse(CassandraState.RUNNING,
+
CassandraState.STOPPED,
+
OperationStatus.DIVERGED,
+
"Error while stopping instance");
+ when(mockLifecycleManager.updateDesiredState("127.0.0.1",
CassandraState.STOPPED)).thenReturn(expectedResponse);
+ client.put(server.actualPort(), "127.0.0.1",
"/api/v1/cassandra/lifecycle")
+ .sendBuffer(payload.toBuffer(), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+ verify(mockLifecycleManager,
times(1)).updateDesiredState("127.0.0.1", CassandraState.STOPPED);
+
assertThat(resp.bodyAsJson(LifecycleInfoResponse.class)).isEqualTo(expectedResponse);
+
assertThat(resp.statusCode()).isEqualTo(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ @Test
+ void testInvalidState(VertxTestContext ctx)
+ {
+ WebClient client = WebClient.create(vertx);
+ JsonObject payload = JsonObject.of("state", "invalid");
+ client.put(server.actualPort(), "127.0.0.1",
"/api/v1/cassandra/lifecycle")
+ .sendBuffer(payload.toBuffer(), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+
assertThat(resp.statusCode()).isEqualTo(BAD_REQUEST.code());
+ verify(mockLifecycleManager,
times(0)).updateDesiredState("127.0.0.1", CassandraState.RUNNING);
+ verify(mockLifecycleManager,
times(0)).updateDesiredState("127.0.0.1", CassandraState.STOPPED);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ @Test
+ void testSubmitTaskAlreadyInProgress(VertxTestContext ctx) throws
LifecycleTaskConflictException
+ {
+ // Setup mock to throw conflict exception
+ doThrow(new LifecycleTaskConflictException("Cannot start host
127.0.0.1. Task already in progress for this host."))
+ .when(mockLifecycleManager).updateDesiredState("127.0.0.1",
CassandraState.RUNNING);
+
+ WebClient client = WebClient.create(vertx);
+ JsonObject payload = JsonObject.of("state", "start");
+ client.put(server.actualPort(), "127.0.0.1",
"/api/v1/cassandra/lifecycle")
+ .sendBuffer(payload.toBuffer(), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+ assertThat(resp.statusCode()).isEqualTo(CONFLICT.code());
+ verify(mockLifecycleManager,
times(1)).updateDesiredState("127.0.0.1", CassandraState.RUNNING);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Test guice module for {@link LifecycleUpdateHandler} tests
+ */
+ class LifecycleUpdateHandlerTestModule extends AbstractModule
+ {
+
+ @Provides
+ @Singleton
+ public LifecycleManager lifecycleManager()
+ {
+ return mockLifecycleManager;
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/lifecycle/LifecycleManagerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/lifecycle/LifecycleManagerTest.java
new file mode 100644
index 00000000..ee6addf6
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/lifecycle/LifecycleManagerTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.lifecycle;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.TestResourceReaper;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.OperationStatus;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
+import org.apache.cassandra.sidecar.exceptions.LifecycleTaskConflictException;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link LifecycleManager} to validate lifecycle management behavior
+ */
+class LifecycleManagerTest
+{
+ private static final String TEST_HOST = "127.0.0.1";
+ private static final InstanceMetadata TEST_HOST_META =
mock(InstanceMetadata.class);
+ private static final String TEST_HOST_2 = "127.0.0.2";
+ private static final InstanceMetadata TEST_HOST_2_META =
mock(InstanceMetadata.class);
+
+ protected Vertx vertx;
+ protected ExecutorPools executorPools;
+ protected LifecycleProvider mockLifecycleProvider;
+ protected InstanceMetadataFetcher metadataFetcher =
mock(InstanceMetadataFetcher.class);
+
+ @BeforeEach
+ void setup()
+ {
+ vertx = Vertx.vertx();
+ executorPools = new ExecutorPools(vertx, new
ServiceConfigurationImpl());
+ mockLifecycleProvider = mock(LifecycleProvider.class);
+ when(metadataFetcher.instance(TEST_HOST)).thenReturn(TEST_HOST_META);
+
when(metadataFetcher.instance(TEST_HOST_2)).thenReturn(TEST_HOST_2_META);
+ }
+
+ @AfterEach
+ void cleanup()
+ {
+ TestResourceReaper.create().with(vertx).with(executorPools).close();
+ }
+
+ @Test
+ void testGetLifecycleInfoWithNoTaskSubmitted()
+ {
+ LifecycleManager lifecycleManager = new
LifecycleManager(metadataFetcher, mockLifecycleProvider, executorPools);
+
when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(false);
+
+ LifecycleInfoResponse response =
lifecycleManager.getLifecycleInfo(TEST_HOST);
+ assertThat(response.currentState()).isEqualTo(CassandraState.STOPPED);
+ assertThat(response.desiredState()).isEqualTo(CassandraState.UNKNOWN);
+ assertThat(response.status()).isEqualTo(OperationStatus.UNDEFINED);
+ assertThat(response.lastUpdate()).isEqualTo("No lifecycle task
submitted for this instance yet.");
+ }
+
+ @Test
+ void testSubmittedTaskSucceeds() throws LifecycleTaskConflictException
+ {
+ // Submit slow start task
+
when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(false);
+ CountDownLatch startLatch = slowCassandraStart();
+ LifecycleManager lifecycleManager = new
LifecycleManager(metadataFetcher, mockLifecycleProvider, executorPools);
+
+ LifecycleInfoResponse actualResponse =
lifecycleManager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+ LifecycleInfoResponse expectedResponse = new
LifecycleInfoResponse(CassandraState.STOPPED, CassandraState.RUNNING,
+
OperationStatus.CONVERGING,
+
"Submitting start task for instance");
+ assertThat(actualResponse).isEqualTo(expectedResponse);
+
+ // Wait for the task to complete
+ startLatch.countDown();
+ when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(true);
+
+ // Check status was updated
+ LifecycleInfoResponse expectedResponseAfterStart = new
LifecycleInfoResponse(CassandraState.RUNNING, CassandraState.RUNNING,
+ OperationStatus.CONVERGED,
+ "Instance has started");
+ loopAssert(1, 200, () -> {
+ LifecycleInfoResponse actualResponseAfterStart =
lifecycleManager.getLifecycleInfo(TEST_HOST);
+
assertThat(actualResponseAfterStart).isEqualTo(expectedResponseAfterStart);
+ });
+
+ // Attempt to start the instance again, should be no-op since instance
is already running
+ LifecycleInfoResponse responseAfterStartAgain =
lifecycleManager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+
assertThat(responseAfterStartAgain).isEqualTo(expectedResponseAfterStart);
+ }
+
+ @Test
+ void testSubmittedTaskFails() throws LifecycleTaskConflictException
+ {
+ LifecycleManager lifecycleManager = new
LifecycleManager(metadataFetcher, mockLifecycleProvider, executorPools);
+
when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(false);
+
+ String errorMessage = "Cannot find Cassandra executable to start
instance.";
+ doThrow(new
RuntimeException(errorMessage)).when(mockLifecycleProvider).start(TEST_HOST_META);
+
+ lifecycleManager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+
+ loopAssert(1, 200, () -> {
+ LifecycleInfoResponse response =
lifecycleManager.getLifecycleInfo(TEST_HOST);
+
assertThat(response.currentState()).isEqualTo(CassandraState.STOPPED);
+
assertThat(response.desiredState()).isEqualTo(CassandraState.RUNNING);
+ assertThat(response.status()).isEqualTo(OperationStatus.DIVERGED);
+ assertThat(response.lastUpdate()).isEqualTo(String.format("Failed
to start instance 127.0.0.1: %s", errorMessage));
+ });
+
+ lifecycleManager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+ reset(mockLifecycleProvider);
+
+ when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(true);
+
+ loopAssert(1, 200, () -> {
+ LifecycleInfoResponse newResponse =
lifecycleManager.getLifecycleInfo(TEST_HOST);
+
assertThat(newResponse.currentState()).isEqualTo(CassandraState.RUNNING);
+
assertThat(newResponse.desiredState()).isEqualTo(CassandraState.RUNNING);
+
assertThat(newResponse.status()).isEqualTo(OperationStatus.CONVERGED);
+ assertThat(newResponse.lastUpdate()).isEqualTo("Instance has
started");
+ });
+ }
+
+ @Test
+ void testSubmitTaskWhenInProgressThrowsException() throws
LifecycleTaskConflictException
+ {
+ LifecycleManager lifecycleManager = new
LifecycleManager(metadataFetcher, mockLifecycleProvider, executorPools);
+ // Mock a slow start operation
+ slowCassandraStart();
+
+ lifecycleManager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+
+ assertThatThrownBy(() ->
lifecycleManager.updateDesiredState(TEST_HOST, CassandraState.RUNNING))
+ .isExactlyInstanceOf(LifecycleTaskConflictException.class)
+ .hasMessage("Cannot update lifecycle state of instance " + TEST_HOST +
" to RUNNING. Task already in progress for this host.");
+ }
+
+ @Test
+ void testSubmitNewTaskSucceedsAfterOldTaskFinishes() throws
LifecycleTaskConflictException
+ {
+ LifecycleManager lifecycleManager = new
LifecycleManager(metadataFetcher, mockLifecycleProvider, executorPools);
+ when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(true);
+
+ lifecycleManager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+
+ loopAssert(1, 200, () -> {
+ LifecycleInfoResponse firstResponse =
lifecycleManager.getLifecycleInfo(TEST_HOST);
+
assertThat(firstResponse.currentState()).isEqualTo(CassandraState.RUNNING);
+
assertThat(firstResponse.desiredState()).isEqualTo(CassandraState.RUNNING);
+
assertThat(firstResponse.status()).isEqualTo(OperationStatus.CONVERGED);
+ assertThat(firstResponse.lastUpdate()).isEqualTo("Instance has
started");
+ });
+
when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(false);
+
+ loopAssert(1, 200, () -> {
+ try
+ {
+ lifecycleManager.updateDesiredState(TEST_HOST,
CassandraState.STOPPED);
+ }
+ catch (LifecycleTaskConflictException e)
+ {
+ // Ignore and retry in case the previous task is still in
progress
+ }
+ LifecycleInfoResponse finalResponse =
lifecycleManager.getLifecycleInfo(TEST_HOST);
+
assertThat(finalResponse.currentState()).isEqualTo(CassandraState.STOPPED);
+
assertThat(finalResponse.desiredState()).isEqualTo(CassandraState.STOPPED);
+
assertThat(finalResponse.status()).isEqualTo(OperationStatus.CONVERGED);
+ assertThat(finalResponse.lastUpdate()).isEqualTo("Instance has
stopped");
+ });
+ }
+
+ @Test
+ void testStateChangesUnexpectedlyFlapping() throws
LifecycleTaskConflictException
+ {
+ // Update state to RUNNING
+ LifecycleManager lifecycleManager = new
LifecycleManager(metadataFetcher, mockLifecycleProvider, executorPools);
+ when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(true);
+ lifecycleManager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+
+ loopAssert(1, 200, () -> {
+ LifecycleInfoResponse response =
lifecycleManager.getLifecycleInfo(TEST_HOST);
+
assertThat(response.currentState()).isEqualTo(CassandraState.RUNNING);
+
assertThat(response.desiredState()).isEqualTo(CassandraState.RUNNING);
+ assertThat(response.status()).isEqualTo(OperationStatus.CONVERGED);
+ assertThat(response.lastUpdate()).isEqualTo("Instance has
started");
+ });
+
+ // Now simulate instance getting stopped without a STOP lifecycle task
being submitted
+
when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(false);
+
+ loopAssert(1, 200, () -> {
+ LifecycleInfoResponse divergedResponse =
lifecycleManager.getLifecycleInfo(TEST_HOST);
+
assertThat(divergedResponse.currentState()).isEqualTo(CassandraState.STOPPED);
+
assertThat(divergedResponse.desiredState()).isEqualTo(CassandraState.RUNNING);
+
assertThat(divergedResponse.status()).isEqualTo(OperationStatus.DIVERGED);
+ assertThat(divergedResponse.lastUpdate()).isEqualTo("Instance
127.0.0.1 has unexpectedly diverged from the desired state RUNNING to
STOPPED.");
+ });
+
+ // Now simulate instance getting running without a START lifecycle
task being submitted
+ when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(true);
+ LifecycleInfoResponse convergedResponse =
lifecycleManager.getLifecycleInfo(TEST_HOST);
+
assertThat(convergedResponse.currentState()).isEqualTo(CassandraState.RUNNING);
+
assertThat(convergedResponse.desiredState()).isEqualTo(CassandraState.RUNNING);
+
assertThat(convergedResponse.status()).isEqualTo(OperationStatus.CONVERGED);
+ assertThat(convergedResponse.lastUpdate()).isEqualTo("Instance
127.0.0.1 has converged back to the desired state RUNNING.");
+ }
+
+ @Test
+ void testCanSubmitTasksForIndependentHosts() throws
LifecycleTaskConflictException
+ {
+
when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(false);
+
when(mockLifecycleProvider.isRunning(TEST_HOST_2_META)).thenReturn(true);
+
+ // Mock a slow start operation
+ CountDownLatch startLatch = slowCassandraStart();
+ // Mock a slow stop operation
+ CountDownLatch stopLatch = slowCassandraStop();
+
+ LifecycleManager lifecycleManager = new
LifecycleManager(metadataFetcher, mockLifecycleProvider, executorPools);
+ lifecycleManager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+ lifecycleManager.updateDesiredState(TEST_HOST_2,
CassandraState.STOPPED);
+
+
+ loopAssert(1, 200, () -> {
+ LifecycleInfoResponse response1 =
lifecycleManager.getLifecycleInfo(TEST_HOST);
+ LifecycleInfoResponse response2 =
lifecycleManager.getLifecycleInfo(TEST_HOST_2);
+
+
assertThat(response1.currentState()).isEqualTo(CassandraState.STOPPED);
+
assertThat(response1.desiredState()).isEqualTo(CassandraState.RUNNING);
+
assertThat(response1.status()).isEqualTo(OperationStatus.CONVERGING);
+ assertThat(response1.lastUpdate()).isEqualTo("Starting instance");
+
+
assertThat(response2.currentState()).isEqualTo(CassandraState.RUNNING);
+
assertThat(response2.desiredState()).isEqualTo(CassandraState.STOPPED);
+
assertThat(response2.status()).isEqualTo(OperationStatus.CONVERGING);
+ assertThat(response2.lastUpdate()).isEqualTo("Stopping instance");
+ });
+
+ startLatch.countDown();
+ stopLatch.countDown();
+ }
+
+ private @NotNull CountDownLatch slowCassandraStop()
+ {
+ CountDownLatch stopLatch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ try
+ {
+ stopLatch.await(5, TimeUnit.SECONDS);
+ return null;
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }).when(mockLifecycleProvider).stop(TEST_HOST_2_META);
+ return stopLatch;
+ }
+
+ private @NotNull CountDownLatch slowCassandraStart()
+ {
+ CountDownLatch startLatch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ try
+ {
+ startLatch.await(5, TimeUnit.SECONDS);
+ return null;
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }).when(mockLifecycleProvider).start(TEST_HOST_META);
+ return startLatch;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]