This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fed08b5b CASSSIDECAR-266: Add lifecycle APIs for starting and stopping 
Cassandra (#256)
fed08b5b is described below

commit fed08b5bb7474f3fb4caecce4acb0838c9b15371
Author: Andrés Beck-Ruiz <[email protected]>
AuthorDate: Fri Sep 26 18:03:23 2025 -0400

    CASSSIDECAR-266: Add lifecycle APIs for starting and stopping Cassandra 
(#256)
    
    Patch by Andrés Beck-Ruiz, Paulo Motta; Reviewed by Francisco Guerrero, 
Yifan Cai for CASSSIDECAR-266
    
    ---------
    
    Co-authored-by: Paulo Motta <[email protected]>
---
 CHANGES.txt                                        |   1 +
 .../cassandra/sidecar/common/ApiEndpointsV1.java   |   3 +
 .../cassandra/sidecar/common/data/Lifecycle.java   |  93 ++++++
 .../common/request/LifecycleInfoRequest.java       |  46 +++
 .../common/request/LifecycleUpdateRequest.java     |  55 ++++
 .../common/response/LifecycleInfoResponse.java     | 125 +++++++++
 .../cassandra/sidecar/client/RequestContext.java   |  26 ++
 .../cassandra/sidecar/client/SidecarClient.java    |  40 +++
 .../sidecar/client/SidecarClientTest.java          |  47 ++++
 .../lifecycle/InJvmDTestLifecycleProvider.java     |  76 +++++
 .../testing/SharedClusterIntegrationTestBase.java  |  71 +++++
 .../InJvmLifecycleProviderIntegrationTest.java     |  86 ++++++
 .../LifecycleProviderIntegrationTester.java        | 210 ++++++++++++++
 .../acl/authorization/BasicPermissions.java        |   4 +
 .../sidecar/config/LifecycleConfiguration.java     |  37 +++
 .../sidecar/config/SidecarConfiguration.java       |   5 +
 .../config/yaml/LifecycleConfigurationImpl.java    |  70 +++++
 .../config/yaml/SidecarConfigurationImpl.java      |  24 ++
 .../exceptions/LifecycleTaskConflictException.java |  30 ++
 .../sidecar/handlers/LifecycleInfoHandler.java     |  90 ++++++
 .../sidecar/handlers/LifecycleUpdateHandler.java   | 117 ++++++++
 .../sidecar/lifecycle/LifecycleManager.java        | 196 +++++++++++++
 .../sidecar/lifecycle/LifecycleProvider.java       |  50 ++++
 .../lifecycle/ProcessLifecycleProvider.java        |  55 ++++
 .../cassandra/sidecar/modules/LifecycleModule.java | 140 +++++++++
 .../cassandra/sidecar/modules/SidecarModules.java  |  11 +-
 .../modules/multibindings/VertxRouteMapKeys.java   |  10 +
 .../sidecar/handlers/LifecycleInfoHandlerTest.java | 168 +++++++++++
 .../handlers/LifecycleUpdateHandlerTest.java       | 205 ++++++++++++++
 .../sidecar/lifecycle/LifecycleManagerTest.java    | 312 +++++++++++++++++++++
 30 files changed, 2398 insertions(+), 5 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index cf443a80..f57d67b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.3.0
 -----
+ * Add lifecycle APIs for starting and stopping Cassandra (CASSSIDECAR-266)
  * Implementation of CassandraClusterSchemaMonitor (CASSSIDECAR-245)
  * Sidecar endpoint for vending statistics related to compaction 
(CASSSIDECAR-329)
  * Update logging dependencies (CASSSIDECAR-337)
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index 52bce4a0..d67e4cdc 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -167,6 +167,9 @@ public final class ApiEndpointsV1
     // when a user visits /openapi
     public static final String OPENAPI_HTML_ROUTE = "/openapi/*";
 
+    // Lifecycle APIs
+    public static final String LIFECYCLE_ROUTE = API_V1 + CASSANDRA + 
"/lifecycle";
+
     private ApiEndpointsV1()
     {
         throw new IllegalStateException(getClass() + " is a constants 
container and shall not be instantiated");
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/Lifecycle.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/Lifecycle.java
new file mode 100644
index 00000000..dc329df9
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/Lifecycle.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import static 
org.apache.cassandra.sidecar.common.request.data.NodeCommandRequestPayload.State;
+
+/**
+ * Utilities for representing the lifecycle state of a Cassandra instance and 
the status of
+ * lifecycle operations executed by the LifecycleManager.
+ */
+public final class Lifecycle
+{
+    /**
+     * Represents the lifecycle state of a Cassandra instance.
+     */
+    public enum CassandraState
+    {
+        /**
+         * The state when a desired lifecycle state has not been submitted yet
+         */
+        UNKNOWN,
+        /**
+         * The state when a Cassandra process is running
+         */
+        RUNNING,
+        /**
+         * The state when a Cassandra process is not running
+         */
+        STOPPED;
+
+        public boolean isRunning()
+        {
+            return this == RUNNING;
+        }
+
+        public static CassandraState fromNodeCommandState(State state)
+        {
+            return state == State.START ? RUNNING : STOPPED;
+        }
+    }
+
+    /**
+     * Represents the status of a LifecycleManager operation to converge the 
current lifecycle state
+     * of a Cassandra instance to the desired state.
+     */
+    public enum OperationStatus
+    {
+        /**
+         * The status when a desired lifecycle state has not been submitted yet
+         */
+        UNDEFINED,
+        /**
+         * The status when the current lifecycle state of an instance matches 
the desired lifecycle state
+         */
+        CONVERGED,
+        /**
+         * The status when the current lifecycle state of an instance does not 
match the desired lifecycle state,
+         * and there is no operation in progress to match the states
+         */
+        DIVERGED,
+        /**
+         * The status when the current lifecycle state of an instance does not 
match the desired lifecycle state,
+         * and there is an operation in progress to start or stop the instance 
to match the states
+         */
+        CONVERGING;
+
+        public boolean isCompleted()
+        {
+            return this == CONVERGED || this == DIVERGED;
+        }
+
+        public boolean isConverged()
+        {
+            return this == CONVERGED;
+        }
+    }
+}
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LifecycleInfoRequest.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LifecycleInfoRequest.java
new file mode 100644
index 00000000..4953506b
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LifecycleInfoRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
+
+/**
+ * Represents a request to retrieve the Cassandra lifecycle information
+ */
+public class LifecycleInfoRequest extends JsonRequest<LifecycleInfoResponse>
+{
+    /**
+     * Constructs a request to retrieve the Cassandra lifecycle information
+     */
+    public LifecycleInfoRequest()
+    {
+        super(ApiEndpointsV1.LIFECYCLE_ROUTE);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public HttpMethod method()
+    {
+        return HttpMethod.GET;
+    }
+}
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LifecycleUpdateRequest.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LifecycleUpdateRequest.java
new file mode 100644
index 00000000..cb945b2c
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LifecycleUpdateRequest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import 
org.apache.cassandra.sidecar.common.request.data.NodeCommandRequestPayload;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
+
+/**
+ * Lifecycle update request
+ */
+public class LifecycleUpdateRequest extends JsonRequest<LifecycleInfoResponse>
+{
+    private final NodeCommandRequestPayload requestPayload;
+
+    /**
+     * Constructs a lifecycle update request with the provided parameters
+     *
+     * @param state "start" or "stop" indicating the desired operation
+     */
+    public LifecycleUpdateRequest(NodeCommandRequestPayload.State state)
+    {
+        super(ApiEndpointsV1.LIFECYCLE_ROUTE);
+        this.requestPayload = new NodeCommandRequestPayload(state.toValue());
+    }
+
+    @Override
+    public HttpMethod method()
+    {
+        return HttpMethod.PUT;
+    }
+
+    @Override
+    public Object requestBody()
+    {
+        return requestPayload;
+    }
+}
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/LifecycleInfoResponse.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/LifecycleInfoResponse.java
new file mode 100644
index 00000000..a2a859f3
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/LifecycleInfoResponse.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.OperationStatus;
+
+/**
+ * A class representing a response for the {@code LifecycleInfoRequest}.
+ */
+public class LifecycleInfoResponse
+{
+
+    private final CassandraState currentState;
+    private final CassandraState desiredState;
+    private final OperationStatus status;
+    private final String lastUpdate;
+
+    /**
+     * Constructs a {@link LifecycleInfoResponse} object with the {@code 
currentState}, {@code desiredState}, {@code status},
+     * and {@code lastUpdate}
+     *
+     * @param currentState the current state of the Cassandra node
+     * @param desiredState the intended state of the Cassandra node
+     * @param status the status of the last lifecycle operation
+     * @param lastUpdate a message providing additional context about the last 
lifecycle operation
+     */
+    @JsonCreator
+    public LifecycleInfoResponse(@JsonProperty("current_state") CassandraState 
currentState,
+                                 @JsonProperty("desired_state") CassandraState 
desiredState,
+                                 @JsonProperty("status") OperationStatus 
status,
+                                 @JsonProperty("last_update") String 
lastUpdate)
+    {
+        this.currentState = Objects.requireNonNull(currentState, "State must 
be non-null");
+        this.desiredState = desiredState;
+        this.status = status;
+        this.lastUpdate = lastUpdate;
+    }
+
+    /**
+     * @return the current state of the Cassandra node
+     */
+    @JsonProperty("current_state")
+    public CassandraState currentState()
+    {
+        return currentState;
+    }
+
+    /**
+     * @return the intended state of the Cassandra node
+     */
+    @JsonProperty("desired_state")
+    public CassandraState desiredState()
+    {
+        return desiredState;
+    }
+
+    /**
+     * @return the status of the last lifecycle operation
+     */
+    @JsonProperty("status")
+    public OperationStatus status()
+    {
+        return status;
+    }
+
+    /**
+     * @return message providing additional context about the last lifecycle 
operation
+     */
+    @JsonProperty("last_update")
+    public String lastUpdate()
+    {
+        return lastUpdate;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (!(o instanceof LifecycleInfoResponse)) return false;
+        LifecycleInfoResponse that = (LifecycleInfoResponse) o;
+        return currentState == that.currentState
+               && desiredState == that.desiredState
+               && status == that.status
+               && Objects.equals(lastUpdate, that.lastUpdate);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(currentState, desiredState, status, lastUpdate);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "LifecycleInfoResponse{" +
+               "state=" + currentState +
+               ", intent=" + desiredState +
+               ", result=" + status +
+               ", message='" + lastUpdate + '\'' +
+               '}';
+    }
+}
+
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 9e2765f3..2e630c31 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -39,6 +39,8 @@ import 
org.apache.cassandra.sidecar.common.request.GossipHealthRequest;
 import org.apache.cassandra.sidecar.common.request.GossipInfoRequest;
 import org.apache.cassandra.sidecar.common.request.GossipUpdateRequest;
 import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
+import org.apache.cassandra.sidecar.common.request.LifecycleInfoRequest;
+import org.apache.cassandra.sidecar.common.request.LifecycleUpdateRequest;
 import org.apache.cassandra.sidecar.common.request.ListOperationalJobsRequest;
 import org.apache.cassandra.sidecar.common.request.ListSnapshotFilesRequest;
 import org.apache.cassandra.sidecar.common.request.NativeUpdateRequest;
@@ -89,6 +91,7 @@ public class RequestContext
     protected static final NodeDecommissionRequest NODE_DECOMMISSION_REQUEST = 
new NodeDecommissionRequest();
 
     protected static final StreamStatsRequest STREAM_STATS_REQUEST = new 
StreamStatsRequest();
+    protected static final LifecycleInfoRequest LIFECYCLE_INFO_REQUEST = new 
LifecycleInfoRequest();
     protected static final RetryPolicy DEFAULT_NO_RETRY_POLICY = new 
NoRetryPolicy();
     protected static final RetryPolicy 
DEFAULT_EXPONENTIAL_BACKOFF_RETRY_POLICY =
     new ExponentialBackoffRetryPolicy(10, 500L, 60_000L);
@@ -614,6 +617,29 @@ public class RequestContext
             return request(STREAM_STATS_REQUEST);
         }
 
+        /**
+         * Sets the {@code request} to be a {@link LifecycleUpdateRequest} for 
the
+         * given {@link NodeCommandRequestPayload.State state}, and returns a 
reference to this Builder enabling method chaining.
+         *
+         * @param state the intended state for a Cassandra node
+         * @return a reference to this Builder
+         */
+        public Builder nodeLifecycleUpdateRequest(@NotNull 
NodeCommandRequestPayload.State state)
+        {
+            return request(new LifecycleUpdateRequest(state));
+        }
+
+        /**
+         * Sets the {@code request} to be a {@link LifecycleInfoRequest} and 
returns a reference to this Builder
+         * enabling method chaining.
+         *
+         * @return a reference to this Builder
+         */
+        public Builder nodeLifecycleInfoRequest()
+        {
+            return request(LIFECYCLE_INFO_REQUEST);
+        }
+
         /**
          * Sets the {@code retryPolicy} to be an instance of {@link 
NoRetryPolicy}
          *
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 5cf1399c..373bd336 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -66,6 +66,7 @@ import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse
 import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
 import org.apache.cassandra.sidecar.common.response.HealthResponse;
 import org.apache.cassandra.sidecar.common.response.InstanceFilesListResponse;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
 import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
 import 
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
 import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
@@ -862,6 +863,45 @@ public class SidecarClient implements AutoCloseable, 
SidecarClientBlobRestoreExt
                                             .build());
     }
 
+    /**
+     * Sends a request to start or stop a provided Cassandra instance.
+     * <p>
+     * This operation asynchronously triggers a start or stop task of the 
Cassandra instance via Sidecar's LifecycleProvider.
+     * The request body must contain a JSON payload with a "state" field, 
which can be either "start" or "stop".
+     * On success of intended state submission, the server responds with HTTP 
202 Accepted.
+     * </p>
+     *
+     * @param instance the instance where the request will be executed
+     * @param state the desired node command state: {@link 
NodeCommandRequestPayload.State#START} or {@link 
NodeCommandRequestPayload.State#STOP}
+     * @return a CompletableFuture representing the completion of the operation
+     */
+
+     public CompletableFuture<LifecycleInfoResponse> 
nodeUpdateLifecycle(SidecarInstance instance, NodeCommandRequestPayload.State 
state)
+    {
+        return executor.executeRequestAsync(requestBuilder()
+                                            
.singleInstanceSelectionPolicy(instance)
+                                            .nodeLifecycleUpdateRequest(state)
+                                            .build());
+    }
+
+    /**
+     * Sends a request to retrieve the current lifecycle information of the 
provided Cassandra instance.
+     * <p>
+     * This operation retrieves the current lifecycle state of the Cassandra 
instance via Sidecar's LifecycleProvider.
+     * On success, the server responds with HTTP 200 OK and a payload 
containing the node's actual lifecycle state, desired state,
+     * status of the last lifecycle operation, and a message providing 
additional context about the last lifecycle operation.
+     * </p>
+     *
+     * @param instance the instance where the request will be executed
+     * @return a CompletableFuture representing the completion of the operation
+     */
+    public CompletableFuture<LifecycleInfoResponse> 
nodeLifecycleInfo(SidecarInstance instance)
+    {
+        return executor.executeRequestAsync(requestBuilder()
+                                            
.singleInstanceSelectionPolicy(instance)
+                                            .nodeLifecycleInfoRequest()
+                                            .build());
+    }
 
     /**
      * Returns a copy of the request builder with the default parameters 
configured for the client.
diff --git 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index 95788cb7..b13a209b 100644
--- 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -63,6 +63,8 @@ import 
org.apache.cassandra.sidecar.client.request.RequestExecutorTest;
 import org.apache.cassandra.sidecar.client.retry.RetryAction;
 import org.apache.cassandra.sidecar.client.retry.RetryPolicy;
 import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.OperationStatus;
 import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
 import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
 import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
@@ -81,6 +83,7 @@ import 
org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
 import org.apache.cassandra.sidecar.common.response.HealthResponse;
 import org.apache.cassandra.sidecar.common.response.InstanceFileInfo;
 import org.apache.cassandra.sidecar.common.response.InstanceFilesListResponse;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
 import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
 import 
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
 import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
@@ -1965,6 +1968,50 @@ abstract class SidecarClientTest
         validateResponseServed(fileUrl);
     }
 
+    @Test
+    void testNodeLifecycleInfo() throws Exception
+    {
+        String lifecycleInfoResponse = 
"{\"current_state\":\"RUNNING\",\"desired_state\":\"RUNNING\",\"status\":\"CONVERGED\","
 +
+                                      "\"last_update\":\"Instance has 
started\"}";
+        MockResponse response = new 
MockResponse().setResponseCode(OK.code()).setBody(lifecycleInfoResponse);
+        enqueue(response);
+
+        SidecarInstanceImpl sidecarInstance = instances.get(0);
+        LifecycleInfoResponse result = 
client.nodeLifecycleInfo(sidecarInstance).get(30, TimeUnit.SECONDS);
+        assertThat(result).isNotNull();
+        assertThat(result.currentState()).isEqualTo(CassandraState.RUNNING);
+        assertThat(result.desiredState()).isEqualTo(CassandraState.RUNNING);
+        assertThat(result.status()).isEqualTo(OperationStatus.CONVERGED);
+        assertThat(result.lastUpdate()).isEqualTo("Instance has started");
+
+        validateResponseServed(ApiEndpointsV1.LIFECYCLE_ROUTE);
+    }
+
+    @Test
+    void testNodeUpdateLifecycle() throws Exception
+    {
+        String lifecycleUpdateResponse = 
"{\"current_state\":\"RUNNING\",\"desired_state\":\"STOPPED\",\"status\":\"CONVERGING\","
 +
+                "\"last_update\":\"Submitting stop task for instance\"}";
+        MockResponse response = new 
MockResponse().setResponseCode(OK.code()).setBody(lifecycleUpdateResponse);
+        enqueue(response);
+
+        SidecarInstanceImpl sidecarInstance = instances.get(0);
+        LifecycleInfoResponse result = 
client.nodeUpdateLifecycle(sidecarInstance, 
NodeCommandRequestPayload.State.STOP)
+                .get(30, TimeUnit.SECONDS);
+
+        assertThat(result).isNotNull();
+        assertThat(result.currentState()).isEqualTo(CassandraState.RUNNING);
+        assertThat(result.desiredState()).isEqualTo(CassandraState.STOPPED);
+        assertThat(result.status()).isEqualTo(OperationStatus.CONVERGING);
+        assertThat(result.lastUpdate()).isEqualTo("Submitting stop task for 
instance");
+
+        validateResponseServed(ApiEndpointsV1.LIFECYCLE_ROUTE, request -> {
+            String requestBody = request.getBody().readUtf8();
+            assertThat(request.getMethod()).isEqualTo("PUT");
+            assertThat(requestBody).isEqualTo("{\"state\":\"stop\"}");
+        });
+    }
+
     private void enqueue(MockResponse response)
     {
         for (MockWebServer server : servers)
diff --git 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/lifecycle/InJvmDTestLifecycleProvider.java
 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/lifecycle/InJvmDTestLifecycleProvider.java
new file mode 100644
index 00000000..e95c6add
--- /dev/null
+++ 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/lifecycle/InJvmDTestLifecycleProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.lifecycle;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+
+/**
+ * Manages the lifecycle of in-JVM dtest Cassandra instances.
+ * This should be used for integration tests where Cassandra instances are 
started and stopped.
+ */
+public class InJvmDTestLifecycleProvider implements LifecycleProvider
+{
+    private final Iterable<? extends IInstance> instances;
+
+    public InJvmDTestLifecycleProvider(Iterable<? extends IInstance> instances)
+    {
+        this.instances = instances;
+    }
+
+    @Override
+    public void start(InstanceMetadata instanceMetadata)
+    {
+        getInstance(instanceMetadata).startup();
+    }
+
+    @Override
+    public void stop(InstanceMetadata instanceMetadata)
+    {
+        try
+        {
+            // Wait synchronously so the JMX port is released before the instance is started again
+            getInstance(instanceMetadata).shutdown().get(1, TimeUnit.MINUTES);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean isRunning(InstanceMetadata host)
+    {
+        return !getInstance(host).isShutdown();
+    }
+
+    private IInstance getInstance(InstanceMetadata instanceMetadata)
+    {
+        for (IInstance instance : instances)
+        {
+            if 
(instance.broadcastAddress().getHostName().equals(instanceMetadata.host()))
+            {
+                return instance;
+            }
+        }
+        throw new IllegalArgumentException("No instance found for host: " + 
instanceMetadata);
+    }
+}
diff --git 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
index 13da5def..8d360801 100644
--- 
a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
+++ 
b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
@@ -25,12 +25,16 @@ import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -56,7 +60,10 @@ import com.google.inject.Module;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.google.inject.util.Modules;
+import io.vertx.core.Handler;
 import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.json.JsonObject;
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
@@ -91,6 +98,8 @@ import 
org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
 import org.apache.cassandra.sidecar.coordination.ClusterLease;
+import org.apache.cassandra.sidecar.lifecycle.InJvmDTestLifecycleProvider;
+import org.apache.cassandra.sidecar.lifecycle.LifecycleProvider;
 import org.apache.cassandra.sidecar.metrics.instance.InstanceHealthMetrics;
 import org.apache.cassandra.sidecar.modules.SidecarModules;
 import org.apache.cassandra.sidecar.server.Server;
@@ -396,6 +405,32 @@ public abstract class SharedClusterIntegrationTestBase
         .isTrue();
     }
 
+    /**
+     * Blocks until {@code hostname} is present in the set of nodes the server wrapper recorded as up,
+     * re-checking every 100 milliseconds.
+     *
+     * @param hostname the host name of the instance to wait for
+     * @param timeout  the maximum time to wait
+     * @param timeUnit the unit of {@code timeout}
+     * @throws TimeoutException when the host is still not recorded as up after the timeout has elapsed
+     */
+    protected void waitForNodeToBeUp(String hostname, long timeout, TimeUnit timeUnit) throws TimeoutException
+    {
+        final long begin = System.nanoTime();
+        for (;;)
+        {
+            if (serverWrapper.upNodes.contains(hostname))
+            {
+                return;
+            }
+            long elapsedNanos = System.nanoTime() - begin;
+            if (elapsedNanos > timeUnit.toNanos(timeout))
+            {
+                throw new TimeoutException("Instance " + hostname + " did not come up after " + timeout + ' ' + timeUnit);
+            }
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Blocks until {@code hostname} is no longer present in the set of nodes the server wrapper recorded as up,
+     * re-checking every 100 milliseconds.
+     *
+     * @param hostname the host name of the instance to wait for
+     * @param timeout  the maximum time to wait
+     * @param timeUnit the unit of {@code timeout}
+     * @throws TimeoutException when the host is still recorded as up after the timeout has elapsed
+     */
+    protected void waitForNodeToBeDown(String hostname, long timeout, TimeUnit timeUnit) throws TimeoutException
+    {
+        final long begin = System.nanoTime();
+        for (;;)
+        {
+            if (!serverWrapper.upNodes.contains(hostname))
+            {
+                return;
+            }
+            long elapsedNanos = System.nanoTime() - begin;
+            if (elapsedNanos > timeUnit.toNanos(timeout))
+            {
+                throw new TimeoutException("Instance " + hostname + " did not come down after " + timeout + ' ' + timeUnit);
+            }
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        }
+    }
+
     /**
      * Stops the Sidecar service
      *
@@ -403,6 +438,10 @@ public abstract class SharedClusterIntegrationTestBase
      */
     protected void stopSidecar() throws InterruptedException
     {
+        // No server wrapper has been created, so there is no server to close
+        if (serverWrapper == null)
+        {
+            return;
+        }
         closeServer(serverWrapper.server);
     }
 
@@ -546,8 +585,10 @@ public abstract class SharedClusterIntegrationTestBase
     {
         public final Injector injector;
         public final Server server;
+        private final InstancesMetadata instancesMetadata;
         public volatile int serverPort;
         private final CountDownLatch sidecarSchemaReadyLatch = new 
CountDownLatch(1);
+        private final Set<String> upNodes = Collections.newSetFromMap(new 
ConcurrentHashMap<String, Boolean>());
 
         public ServerWrapper(Injector sidecarServerInjector, Server server)
         {
@@ -555,10 +596,33 @@ public abstract class SharedClusterIntegrationTestBase
             this.server = server;
             // Server must have started to retrieve the port
             this.serverPort = server.actualPort();
+            this.instancesMetadata = 
injector.getInstance(InstancesMetadata.class);
 
             Vertx vertx = sidecarServerInjector.getInstance(Vertx.class);
             
vertx.eventBus().localConsumer(SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED.address(),
                                            msg -> 
sidecarSchemaReadyLatch.countDown());
+            
vertx.eventBus().localConsumer(SidecarServerEvents.ON_CASSANDRA_CQL_READY.address(),
+                                           cqlUpHandler());
+            
vertx.eventBus().localConsumer(SidecarServerEvents.ON_CASSANDRA_CQL_DISCONNECTED.address(),
+                                           cqlDownHandler());
+        }
+
+        /**
+         * Creates the event-bus handler that records an instance as up. The instance id is read from the
+         * {@code cassandraInstanceId} field of the message body, resolved to its host name through the
+         * instances metadata, and the host name is added to {@code upNodes}.
+         *
+         * @return the handler registered for CQL-ready events
+         */
+        public Handler<Message<JsonObject>> cqlUpHandler()
+        {
+            return event -> {
+                JsonObject payload = event.body();
+                Integer id = payload.getInteger("cassandraInstanceId");
+                upNodes.add(instancesMetadata.instanceFromId(id).host());
+            };
+        }
+
+        /**
+         * Creates the event-bus handler that records an instance as down. The instance id is read from the
+         * {@code cassandraInstanceId} field of the message body, resolved to its host name through the
+         * instances metadata, and the host name is removed from {@code upNodes}.
+         *
+         * @return the handler registered for CQL-disconnected events
+         */
+        public Handler<Message<JsonObject>> cqlDownHandler()
+        {
+            return event -> {
+                JsonObject payload = event.body();
+                Integer id = payload.getInteger("cassandraInstanceId");
+                upNodes.remove(instancesMetadata.instanceFromId(id).host());
+            };
         }
     }
 
@@ -645,6 +709,13 @@ public abstract class SharedClusterIntegrationTestBase
             return new ClusterLease(ClusterLease.Ownership.CLAIMED);
         }
 
+        /**
+         * Binds {@link LifecycleProvider} to an {@link InJvmDTestLifecycleProvider} constructed from this
+         * module's {@code instances}.
+         *
+         * @return the singleton lifecycle provider
+         */
+        @Provides
+        @Singleton
+        public LifecycleProvider lifecycleProvider()
+        {
+            return new InJvmDTestLifecycleProvider(instances);
+        }
+
         private List<InetSocketAddress> buildContactPoints()
         {
             return StreamSupport.stream(instances.spliterator(), false)
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/lifecycle/InJvmLifecycleProviderIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/lifecycle/InJvmLifecycleProviderIntegrationTest.java
new file mode 100644
index 00000000..ff8d3f2f
--- /dev/null
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/lifecycle/InJvmLifecycleProviderIntegrationTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.lifecycle;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import 
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+
+import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Integration test verifying that the {@link InJvmDTestLifecycleProvider} is able to start and stop a
+ * Cassandra node
+ */
+public class InJvmLifecycleProviderIntegrationTest extends SharedClusterSidecarIntegrationTestBase
+{
+    static final InstanceMetadata LOCALHOST_METADATA = mock(InstanceMetadata.class);
+    private static final String JVM_LIFECYCLE_TEST_MIN_VERSION = "4.1";
+
+    @BeforeAll
+    static void beforeAll()
+    {
+        // The mocked metadata only needs to answer with the host name
+        when(LOCALHOST_METADATA.host()).thenReturn("localhost");
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        // Do nothing
+    }
+
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        // Request a cluster configuration with startCluster disabled
+        ClusterBuilderConfiguration configuration = super.testClusterConfiguration();
+        return configuration.startCluster(false);
+    }
+
+    @Override
+    protected void beforeClusterProvisioning()
+    {
+        // JVM Distributed Test framework contains a bug with restarting nodes in version 4.0 (CASSANDRA-19729)
+        SimpleCassandraVersion versionUnderTest = SimpleCassandraVersion.create(testVersion.version());
+        SimpleCassandraVersion minimumVersion = SimpleCassandraVersion.create(JVM_LIFECYCLE_TEST_MIN_VERSION);
+        assumeThat(versionUnderTest)
+        .as("JVM Distributed Test framework contains a bug with restarting nodes in version 4.0 (CASSANDRA-19729)")
+        .isGreaterThanOrEqualTo(minimumVersion);
+    }
+
+    @Test
+    void testInJvmLifecycleProviderStartAndStopAndRecoveryAfterCrash() throws Exception
+    {
+        // Simulate node crashing by directly calling the lifecycle provider's stop method
+        Runnable cassandraCrasher =
+        () -> serverWrapper.injector.getInstance(LifecycleProvider.class).stop(LOCALHOST_METADATA);
+
+        LifecycleProviderIntegrationTester tester =
+        new LifecycleProviderIntegrationTester(trustedClient(),
+                                               LOCALHOST_METADATA.host(),
+                                               serverWrapper.serverPort,
+                                               cassandraCrasher);
+
+        tester.testLifecycleProviderStartAndStopAndRecoveryAfterCrash();
+    }
+}
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/lifecycle/LifecycleProviderIntegrationTester.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/lifecycle/LifecycleProviderIntegrationTester.java
new file mode 100644
index 00000000..c65db08e
--- /dev/null
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/lifecycle/LifecycleProviderIntegrationTester.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.lifecycle;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Utility class to test different lifecycle provider implementations.
+ *
+ * <p>The tester talks to a Sidecar instance over HTTP: lifecycle state is read and updated through
+ * {@code /api/v1/cassandra/lifecycle}, and CQL health is read from {@code /api/v1/cassandra/native/__health}.
+ */
+public class LifecycleProviderIntegrationTester
+{
+    protected final Logger logger = LoggerFactory.getLogger(LifecycleProviderIntegrationTester.class);
+
+    static final int TIMEOUT_SECONDS = 120;
+
+    final WebClient client;
+    final String sidecarHost;
+    final int sidecarPort;
+    final Runnable cassandraNodeCrasher;
+
+    /**
+     * @param client               the web client used to send requests to Sidecar
+     * @param sidecarHost          the host of the Sidecar instance; also used as the instance name in the
+     *                             expected "diverged" lifecycle message
+     * @param sidecarPort          the port of the Sidecar instance
+     * @param cassandraNodeCrasher action that brings the Cassandra node down without going through the
+     *                             lifecycle API
+     */
+    public LifecycleProviderIntegrationTester(WebClient client,
+                                              String sidecarHost,
+                                              int sidecarPort,
+                                              Runnable cassandraNodeCrasher)
+    {
+        this.client = client;
+        this.sidecarHost = sidecarHost;
+        this.sidecarPort = sidecarPort;
+        this.cassandraNodeCrasher = cassandraNodeCrasher;
+    }
+
+    /**
+     * Runs the full scenario against a node that has not been started yet: start, idempotent start,
+     * crash and divergence detection, restart after the crash, stop, and idempotent stop.
+     *
+     * @throws Exception when a request fails or an expected state is not reached within {@link #TIMEOUT_SECONDS}
+     */
+    void testLifecycleProviderStartAndStopAndRecoveryAfterCrash() throws Exception
+    {
+        // Check CQL health is NOT_OK, since node is not started
+        assertThat(getCqlStatus()).isEqualTo("NOT_OK");
+
+        // GET /lifecycle when node has not started and no intent has been submitted
+        HttpResponse<Buffer> lifecycleInfoOnStartup = getLifecycle();
+        assertExpectedLifecycleResponse(lifecycleInfoOnStartup, OK.code(),
+                                        "STOPPED", "UNKNOWN", "UNDEFINED",
+                                        "No lifecycle task submitted for this instance yet.");
+
+        // Start node via PUT /lifecycle and wait for node to be up
+        HttpResponse<Buffer> lifecycleUpdateStart = putLifecycle("start");
+        assertExpectedLifecycleResponse(lifecycleUpdateStart, ACCEPTED.code(),
+                                        "STOPPED", "RUNNING", "CONVERGING", "Submitting start task for instance");
+
+        waitForLastUpdateToConverge("Instance has started", TIMEOUT_SECONDS);
+        waitForCqlStatus("OK", TIMEOUT_SECONDS);
+
+        // GET /lifecycle after node has started up
+        HttpResponse<Buffer> lifecycleInfoAfterStartup = getLifecycle();
+        assertExpectedLifecycleResponse(lifecycleInfoAfterStartup, OK.code(),
+                                        "RUNNING", "RUNNING", "CONVERGED", "Instance has started");
+
+        // Try starting node with PUT /lifecycle again
+        HttpResponse<Buffer> lifecycleInfoAfterStartAgain = putLifecycle("start");
+        assertExpectedLifecycleResponse(lifecycleInfoAfterStartAgain, OK.code(),
+                                        "RUNNING", "RUNNING", "CONVERGED", "Instance has started");
+
+        // Simulate node crashing by directly calling the lifecycle provider's stop method
+        String divergedMessage =
+        String.format("Instance %s has unexpectedly diverged from the desired state RUNNING to STOPPED.", sidecarHost);
+        cassandraNodeCrasher.run();
+        waitForLastUpdateToConverge(divergedMessage, TIMEOUT_SECONDS);
+
+        // CQL status should be down since instance is crashed
+        waitForCqlStatus("NOT_OK", TIMEOUT_SECONDS);
+
+        // GET /lifecycle after node has crashed
+        HttpResponse<Buffer> lifecycleInfoAfterCrash = getLifecycle();
+        assertExpectedLifecycleResponse(lifecycleInfoAfterCrash, OK.code(),
+                                        "STOPPED", "RUNNING", "DIVERGED", divergedMessage);
+
+        // Try starting node with PUT /lifecycle again
+        HttpResponse<Buffer> lifecycleInfoOnStartAfterCrash = putLifecycle("start");
+        assertExpectedLifecycleResponse(lifecycleInfoOnStartAfterCrash, ACCEPTED.code(),
+                                        "STOPPED", "RUNNING", "CONVERGING", "Submitting start task for instance");
+        waitForLastUpdateToConverge("Instance has started", TIMEOUT_SECONDS);
+        waitForCqlStatus("OK", TIMEOUT_SECONDS);
+
+        // Confirm that the node is up again with GET /lifecycle
+        HttpResponse<Buffer> lifecycleInfoAfterRecovery = getLifecycle();
+        assertExpectedLifecycleResponse(lifecycleInfoAfterRecovery, OK.code(),
+                                        "RUNNING", "RUNNING", "CONVERGED", "Instance has started");
+
+        // Stop node via PUT /lifecycle
+        HttpResponse<Buffer> lifecycleUpdateStop = putLifecycle("stop");
+        assertExpectedLifecycleResponse(lifecycleUpdateStop, ACCEPTED.code(),
+                                        "RUNNING", "STOPPED", "CONVERGING", "Submitting stop task for instance");
+
+        // Allow time for the async stop task to complete
+        waitForLastUpdateToConverge("Instance has stopped", TIMEOUT_SECONDS);
+
+        // GET /lifecycle after node has stopped
+        HttpResponse<Buffer> lifecycleInfoAfterStop = getLifecycle();
+        assertExpectedLifecycleResponse(lifecycleInfoAfterStop, OK.code(),
+                                        "STOPPED", "STOPPED", "CONVERGED", "Instance has stopped");
+
+        // CQL status should be down since instance is stopped
+        assertThat(getCqlStatus()).isEqualTo("NOT_OK");
+
+        // Try stopping node with PUT /lifecycle again, should be idempotent
+        HttpResponse<Buffer> lifecycleInfoOnStopAgain = putLifecycle("stop");
+        assertExpectedLifecycleResponse(lifecycleInfoOnStopAgain, OK.code(),
+                                        "STOPPED", "STOPPED", "CONVERGED", "Instance has stopped");
+    }
+
+    /**
+     * Asserts the HTTP status code and the {@code current_state}, {@code desired_state}, {@code status}
+     * and {@code last_update} fields of a lifecycle response.
+     */
+    private void assertExpectedLifecycleResponse(HttpResponse<Buffer> response,
+                                                 int expectedStatusCode,
+                                                 String currentState,
+                                                 String desiredState,
+                                                 String status,
+                                                 String lastUpdate)
+    {
+        assertThat(response.statusCode()).isEqualTo(expectedStatusCode);
+        JsonObject body = response.bodyAsJsonObject();
+        assertThat(body.getString("current_state")).isEqualTo(currentState);
+        assertThat(body.getString("desired_state")).isEqualTo(desiredState);
+        assertThat(body.getString("status")).isEqualTo(status);
+        assertThat(body.getString("last_update")).isEqualTo(lastUpdate);
+    }
+
+    /**
+     * @return the response of {@code GET /api/v1/cassandra/lifecycle}
+     */
+    private HttpResponse<Buffer> getLifecycle()
+    {
+        return getBlocking(client
+                           .get(sidecarPort, sidecarHost, "/api/v1/cassandra/lifecycle")
+                           .send());
+    }
+
+    /**
+     * @param desiredState the value sent in the {@code state} field of the request body
+     * @return the response of {@code PUT /api/v1/cassandra/lifecycle}
+     */
+    private HttpResponse<Buffer> putLifecycle(String desiredState)
+    {
+        return getBlocking(client
+                           .put(sidecarPort, sidecarHost, "/api/v1/cassandra/lifecycle")
+                           .sendBuffer(JsonObject.of("state", desiredState).toBuffer()));
+    }
+
+    /**
+     * Polls the CQL health endpoint every 100 milliseconds until it reports {@code expectedStatus}.
+     *
+     * @param expectedStatus the status value to wait for
+     * @param timeoutSeconds the maximum time to wait, in seconds
+     * @throws TimeoutException when the expected status is not reported within the timeout
+     */
+    public void waitForCqlStatus(String expectedStatus, int timeoutSeconds) throws TimeoutException
+    {
+        long startTime = System.nanoTime();
+        String cqlStatus = getCqlStatus();
+        // Compare from the expected side: a response without a "status" field yields null, which must be
+        // retried until the timeout instead of failing with a NullPointerException
+        while (!expectedStatus.equals(cqlStatus))
+        {
+            if (System.nanoTime() - startTime > TimeUnit.SECONDS.toNanos(timeoutSeconds))
+            {
+                throw new TimeoutException("Expected CQL status " + expectedStatus + " not reached after " +
+                                           timeoutSeconds + " seconds, status: " + cqlStatus);
+            }
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+            cqlStatus = getCqlStatus();
+        }
+    }
+
+    /**
+     * @return the {@code status} field of the native health response, which is also logged
+     */
+    private String getCqlStatus()
+    {
+        HttpResponse<Buffer> cqlStatus = getBlocking(
+        client
+        .get(sidecarPort, sidecarHost, "/api/v1/cassandra/native/__health")
+        .send());
+        logger.info("Native health response: {}", cqlStatus.bodyAsString());
+
+        return cqlStatus.bodyAsJsonObject().getString("status");
+    }
+
+    /**
+     * Polls {@code GET /lifecycle} every 100 milliseconds until its {@code last_update} field equals
+     * {@code expectedLastUpdate}.
+     *
+     * @param expectedLastUpdate the {@code last_update} message to wait for
+     * @param timeoutSeconds     the maximum time to wait, in seconds
+     * @throws TimeoutException when the expected message is not reported within the timeout
+     */
+    private void waitForLastUpdateToConverge(String expectedLastUpdate, int timeoutSeconds) throws TimeoutException
+    {
+        long startTime = System.nanoTime();
+        String lastUpdate = getLifecycle().bodyAsJsonObject().getString("last_update");
+        // Compare from the expected side so that a missing "last_update" field (null) is retried
+        while (!expectedLastUpdate.equals(lastUpdate))
+        {
+            if (System.nanoTime() - startTime > TimeUnit.SECONDS.toNanos(timeoutSeconds))
+            {
+                throw new TimeoutException("Expected lifecycle update '" + expectedLastUpdate + "' not reached after " +
+                                           timeoutSeconds + " seconds, last_update: " + lastUpdate);
+            }
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+            lastUpdate = getLifecycle().bodyAsJsonObject().getString("last_update");
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
index 0a809b1b..b57ac623 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
@@ -84,4 +84,8 @@ public class BasicPermissions
     // Live Migration permissions
     public static final Permission LIST_FILES = new 
DomainAwarePermission("LIVE_MIGRATION:LIST_FILES", CLUSTER_SCOPE);
     public static final Permission STREAM_FILES = new 
DomainAwarePermission("LIVE_MIGRATION:STREAM", CLUSTER_SCOPE);
+
+    // Lifecycle permissions
+    public static final Permission READ_LIFECYCLE = new 
DomainAwarePermission("LIFECYCLE:READ", CLUSTER_SCOPE);
+    public static final Permission MODIFY_LIFECYCLE = new 
DomainAwarePermission("LIFECYCLE:MODIFY", CLUSTER_SCOPE);
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/LifecycleConfiguration.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/LifecycleConfiguration.java
new file mode 100644
index 00000000..7268384f
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/LifecycleConfiguration.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config;
+
+/**
+ * Configuration for Cassandra lifecycle management service
+ */
+public interface LifecycleConfiguration
+{
+
+    /**
+     * @return {@code true} if lifecycle management is enabled, {@code false} otherwise
+     */
+    boolean enabled();
+
+    /**
+     * @return the parameterized class configuration of the lifecycle provider, needed for setting up
+     * lifecycle in Sidecar
+     */
+    ParameterizedClassConfiguration lifecycleProvider();
+
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
index 011b9f7c..917c3b55 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
@@ -109,4 +109,9 @@ public interface SidecarConfiguration
      * @return the configuration for live migration
      */
     LiveMigrationConfiguration liveMigrationConfiguration();
+
+    /**
+     * @return the configuration for lifecycle management
+     */
+    LifecycleConfiguration lifecycleConfiguration();
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/LifecycleConfigurationImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/LifecycleConfigurationImpl.java
new file mode 100644
index 00000000..aa13b7d3
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/LifecycleConfigurationImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config.yaml;
+
+import java.util.Collections;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.config.LifecycleConfiguration;
+import org.apache.cassandra.sidecar.config.ParameterizedClassConfiguration;
+import org.apache.cassandra.sidecar.lifecycle.ProcessLifecycleProvider;
+
+/**
+ * Configuration for Cassandra lifecycle management service
+ */
+public class LifecycleConfigurationImpl implements LifecycleConfiguration
+{
+    private static final boolean DEFAULT_ENABLED = false;
+    private static final ParameterizedClassConfiguration DEFAULT_LIFECYCLE_PROVIDER =
+            new ParameterizedClassConfigurationImpl(ProcessLifecycleProvider.class.getName(), Collections.emptyMap());
+
+    protected final Boolean enabled;
+
+    protected final String directory;
+
+    protected final ParameterizedClassConfiguration lifecycleProvider;
+
+    /**
+     * Creates a configuration with lifecycle management disabled, no directory, and the default
+     * {@link ProcessLifecycleProvider} provider configuration with no parameters.
+     */
+    public LifecycleConfigurationImpl()
+    {
+        this(DEFAULT_ENABLED, null, DEFAULT_LIFECYCLE_PROVIDER);
+    }
+
+    /**
+     * Creates a configuration from the deserialized properties.
+     *
+     * @param enabled           whether lifecycle management is enabled; {@code null} (property omitted)
+     *                          falls back to the default of {@code false}
+     * @param directory         the value of the {@code directory} property
+     * @param lifecycleProvider the value of the {@code provider} property, stored as given
+     */
+    @JsonCreator
+    public LifecycleConfigurationImpl(@JsonProperty("enabled") Boolean enabled,
+                                      @JsonProperty("directory") String directory,
+                                      @JsonProperty("provider") ParameterizedClassConfiguration lifecycleProvider)
+    {
+        // A creator property missing from the input arrives as null; without this default, enabled()
+        // would throw a NullPointerException when unboxing
+        this.enabled = enabled != null ? enabled : DEFAULT_ENABLED;
+        this.directory = directory;
+        this.lifecycleProvider = lifecycleProvider;
+    }
+
+    /**
+     * @return {@code true} if lifecycle management is enabled, {@code false} otherwise
+     */
+    @Override
+    @JsonProperty(value = "enabled")
+    public boolean enabled()
+    {
+        return enabled;
+    }
+
+    /**
+     * @return the lifecycle provider configuration passed to the constructor
+     */
+    @Override
+    @JsonProperty(value = "provider")
+    public ParameterizedClassConfiguration lifecycleProvider()
+    {
+        return lifecycleProvider;
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
index 887ace1b..392c12e0 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
@@ -42,6 +42,7 @@ import 
org.apache.cassandra.sidecar.config.AccessControlConfiguration;
 import 
org.apache.cassandra.sidecar.config.CassandraInputValidationConfiguration;
 import org.apache.cassandra.sidecar.config.DriverConfiguration;
 import org.apache.cassandra.sidecar.config.InstanceConfiguration;
+import org.apache.cassandra.sidecar.config.LifecycleConfiguration;
 import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration;
 import org.apache.cassandra.sidecar.config.MetricsConfiguration;
 import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration;
@@ -115,6 +116,9 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
     @JsonProperty("live_migration")
     private LiveMigrationConfiguration liveMigrationConfiguration;
 
+    @JsonProperty("lifecycle")
+    private LifecycleConfiguration lifecycleConfiguration;
+
     public SidecarConfigurationImpl()
     {
         this(builder());
@@ -138,6 +142,7 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
         vertxConfiguration = builder.vertxConfiguration;
         schemaReportingConfiguration = builder.schemaReportingConfiguration;
         liveMigrationConfiguration = builder.liveMigrationConfiguration;
+        lifecycleConfiguration = builder.lifecycleConfiguration;
     }
 
     /**
@@ -301,6 +306,13 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
         return liveMigrationConfiguration;
     }
 
+    /**
+     * @return the configuration for lifecycle management
+     */
+    @Override
+    @JsonProperty("lifecycle")
+    public LifecycleConfiguration lifecycleConfiguration()
+    {
+        return lifecycleConfiguration;
+    }
+
 
     public static SidecarConfigurationImpl readYamlConfiguration(Path 
yamlConfigurationPath) throws IOException
     {
@@ -413,6 +425,7 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
         private VertxConfiguration vertxConfiguration = new 
VertxConfigurationImpl();
         private SchemaReportingConfiguration schemaReportingConfiguration = 
new SchemaReportingConfigurationImpl();
         private LiveMigrationConfiguration liveMigrationConfiguration = new 
LiveMigrationConfigurationImpl();
+        private LifecycleConfiguration lifecycleConfiguration = new 
LifecycleConfigurationImpl();
 
         protected Builder()
         {
@@ -606,6 +619,17 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
             return update(b -> b.liveMigrationConfiguration = 
liveMigrationConfiguration);
         }
 
+        /**
+         * Sets the {@code lifecycleConfiguration} and returns a reference to this Builder enabling
+         * method chaining.
+         *
+         * @param lifecycleConfiguration the {@code lifecycleConfiguration} to set
+         * @return a reference to this Builder
+         */
+        public Builder lifecycleConfiguration(LifecycleConfiguration 
lifecycleConfiguration)
+        {
+            return update(b -> b.lifecycleConfiguration = 
lifecycleConfiguration);
+        }
+
         /**
          * Returns a {@code SidecarConfigurationImpl} built from the 
parameters previously set.
          *
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/exceptions/LifecycleTaskConflictException.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/exceptions/LifecycleTaskConflictException.java
new file mode 100644
index 00000000..e789f90a
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/exceptions/LifecycleTaskConflictException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.exceptions;
+
+/**
+ * Exception thrown when a lifecycle operation is already in progress for a Cassandra host.
+ */
+public class LifecycleTaskConflictException extends Exception
+{
+    /**
+     * @param message the detail message, passed unchanged to {@link Exception#Exception(String)}
+     */
+    public LifecycleTaskConflictException(String message)
+    {
+        super(message);
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleInfoHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleInfoHandler.java
new file mode 100644
index 00000000..f887169b
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleInfoHandler.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.Collections;
+import java.util.Set;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.lifecycle.LifecycleManager;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+
+/**
+ * Handles {@code GET /api/v1/cassandra/lifecycle} requests for retrieving 
lifecycle information
+ * for a given Cassandra node.
+ */
+@Singleton
+public class LifecycleInfoHandler extends AbstractHandler<Void> implements 
AccessProtected
+{
+
+    private final LifecycleManager lifecycleManager;
+
+    /**
+     * Constructs a handler with the provided dependencies
+     *
+     * @param metadataFetcher the metadata fetcher
+     * @param executorPools executor pools for blocking executions
+     * @param lifecycleManager the manager queried for lifecycle information
+     */
+    @Inject
+    public LifecycleInfoHandler(InstanceMetadataFetcher metadataFetcher, 
ExecutorPools executorPools, LifecycleManager lifecycleManager)
+    {
+        super(metadataFetcher, executorPools, null);
+        this.lifecycleManager = lifecycleManager;
+    }
+
+    @Override
+    public Set<Authorization> requiredAuthorizations()
+    {
+        return 
Collections.singleton(BasicPermissions.READ_LIFECYCLE.toAuthorization());
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handleInternal(RoutingContext context,
+                               HttpServerRequest httpRequest,
+                               @NotNull String host,
+                               SocketAddress remoteAddress,
+                               Void request)
+    {
+        executorPools.service()
+                     .executeBlocking(() -> 
lifecycleManager.getLifecycleInfo(host))
+                     .onSuccess(context::json)
+                     .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, request));
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected Void extractParamsOrThrow(RoutingContext context)
+    {
+        return null;
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandler.java
new file mode 100644
index 00000000..6f69fcec
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandler.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.Collections;
+import java.util.Set;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.http.HttpServerResponse;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
+import 
org.apache.cassandra.sidecar.common.request.data.NodeCommandRequestPayload;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.exceptions.LifecycleTaskConflictException;
+import org.apache.cassandra.sidecar.lifecycle.LifecycleManager;
+import org.apache.cassandra.sidecar.utils.HttpExceptions;
+
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Handles {@code PUT /api/v1/cassandra/lifecycle} requests to start or stop a 
Cassandra node.
+ *
+ * <p> Expects a JSON payload:
+ * { "state": "start" } or { "state": "stop" }
+ * and will record the desired state. </p>
+ */
+@Singleton
+public class LifecycleUpdateHandler extends NodeCommandHandler implements 
AccessProtected
+{
+    private final LifecycleManager lifecycleManager;
+
+    @Inject
+    public LifecycleUpdateHandler(InstanceMetadataFetcher metadataFetcher, 
ExecutorPools executorPools, LifecycleManager lifecycleManager)
+    {
+        super(metadataFetcher, executorPools, null);
+        this.lifecycleManager = lifecycleManager;
+    }
+
+    @Override
+    public Set<Authorization> requiredAuthorizations()
+    {
+        return 
Collections.singleton(BasicPermissions.MODIFY_LIFECYCLE.toAuthorization());
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  @NotNull String host,
+                                  SocketAddress remoteAddress,
+                                  NodeCommandRequestPayload request)
+    {
+        CassandraState desiredState = 
CassandraState.fromNodeCommandState(request.state());
+        executorPools.service()
+                     .executeBlocking(() -> 
lifecycleManager.updateDesiredState(host, desiredState))
+                     .onSuccess(info ->
+                                {
+                                    HttpServerResponse response = 
context.response().putHeader("Content-Type", "application/json");
+                                    switch (info.status())
+                                    {
+                                        case CONVERGED:
+                                            
response.setStatusCode(HttpResponseStatus.OK.code());
+                                            break;
+                                        case CONVERGING:
+                                            
response.setStatusCode(HttpResponseStatus.ACCEPTED.code());
+                                            break;
+                                        default:
+                                            logger.warn("{} request failed 
with unexpected result. request={}, remoteAddress={}, instance={}, 
operationStatus={}",
+                                                        
this.getClass().getSimpleName(), request, remoteAddress, host, info.status());
+                                            
response.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
+                                    }
+                                    response.end(Json.encode(info));
+                                })
+                     .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, request));
+    }
+
+    @Override
+    protected void processFailure(Throwable cause, RoutingContext context, 
String host, SocketAddress remoteAddress, NodeCommandRequestPayload request)
+    {
+        if (cause instanceof LifecycleTaskConflictException)
+        {
+            HttpException httpException = 
HttpExceptions.wrapHttpException(HttpResponseStatus.CONFLICT,
+                                                                           
cause.getMessage(), cause);
+            logger.warn("{} request failed due to lifecycle conflict. 
request={}, remoteAddress={}, instance={}",
+                        this.getClass().getSimpleName(), request, 
remoteAddress, host, cause);
+            context.fail(httpException);
+        }
+        else
+        {
+            super.processFailure(cause, context, host, remoteAddress, request);
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/LifecycleManager.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/LifecycleManager.java
new file mode 100644
index 00000000..18338741
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/LifecycleManager.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.lifecycle;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import io.vertx.core.Future;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.OperationStatus;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.exceptions.LifecycleTaskConflictException;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+/**
+ * Manages the lifecycle intents for local Cassandra hosts and submits tasks 
to the
+ * LifecycleProvider, while ensuring that only one task can be active per host 
at a time.
+ */
+@Singleton
+public class LifecycleManager
+{
+    protected static final Logger LOG = 
LoggerFactory.getLogger(LifecycleManager.class);
+
+    private final InstanceMetadataFetcher metadataFetcher;
+    private final LifecycleProvider lifecycleProvider;
+    private final ExecutorPools executorPools;
+
+    private final Set<String> convergingInstances = 
ConcurrentHashMap.newKeySet();
+    private final Map<String, OperationStatus> lastCompletedStatus = new 
ConcurrentHashMap<>();
+    private final Map<String, CassandraState> desiredStateByInstance = new 
ConcurrentHashMap<>();
+    private final Map<String, String> lastUpdateMsgByInstance = new 
ConcurrentHashMap<>();
+
+    @Inject
+    public LifecycleManager(InstanceMetadataFetcher metadataFetcher,
+                            LifecycleProvider lifecycleProvider,
+                            ExecutorPools executorPools)
+    {
+        this.metadataFetcher = metadataFetcher;
+        this.lifecycleProvider = lifecycleProvider;
+        this.executorPools = executorPools;
+    }
+
+    public synchronized LifecycleInfoResponse updateDesiredState(String 
instanceId, CassandraState desiredState)
+                                                        throws 
LifecycleTaskConflictException
+    {
+        // Nothing to do: already in successful desired state
+        LifecycleInfoResponse current = getLifecycleInfo(instanceId);
+        if  (current.desiredState() == desiredState && current.status() == 
OperationStatus.CONVERGED)
+        {
+            return current;
+        }
+        // Make sure there is a single lifecycle task running at a time per 
instance
+        if (!convergingInstances.add(instanceId))
+        {
+            throw new LifecycleTaskConflictException(String.format("Cannot 
update lifecycle state of instance %s to %s. " +
+                                                                   "Task 
already in progress for this host.",
+                                                                   instanceId, 
desiredState));
+        }
+        // Converge state asynchronously and cleanup in-progress tasks after 
completion or error
+        try
+        {
+            lastCompletedStatus.remove(instanceId);
+            desiredStateByInstance.put(instanceId, desiredState);
+            lastUpdateMsgByInstance.put(instanceId, "Submitting " +
+                    (desiredState.isRunning() ? "start" : "stop") + " task for 
instance");
+            LifecycleInfoResponse updatedDesireLifecycleInfo = 
getLifecycleInfo(instanceId);
+            Future<Void> convergeTask = desiredState.isRunning()
+                                          ? submitStartTask(instanceId)
+                                          : submitStopTask(instanceId);
+            convergeTask.onFailure(x -> LOG.warn("Unexpected failure while 
converging state.", x))
+                        .onComplete(x -> 
convergingInstances.remove(instanceId));
+            return updatedDesireLifecycleInfo;
+        }
+        catch (Exception e)
+        {
+            LOG.error("Unexpected error while submitting lifecycle task for 
instance {}: {}", instanceId, e.getMessage());
+            convergingInstances.remove(instanceId);
+            throw e;
+        }
+    }
+
+    public synchronized LifecycleInfoResponse getLifecycleInfo(String 
instanceId)
+    {
+        CassandraState currentState = lifecycleProvider
+            .isRunning(metadata(instanceId)) ? CassandraState.RUNNING : 
CassandraState.STOPPED;
+        CassandraState desiredState = 
this.desiredStateByInstance.getOrDefault(instanceId, CassandraState.UNKNOWN);
+        OperationStatus currentStatus = getStatus(instanceId, currentState, 
desiredState);
+        maybeRefreshLastCompletedStatus(instanceId, currentStatus, 
currentState, desiredState);
+        String lastUpdate = 
this.lastUpdateMsgByInstance.getOrDefault(instanceId, "No lifecycle task 
submitted for this instance yet.");
+        return new LifecycleInfoResponse(currentState, desiredState, 
currentStatus, lastUpdate);
+    }
+
+    /**
+     * This method keeps track of the last completed status of an instance and 
logs a warning if the operation status has changed unexpectedly.
+     * This is useful for detecting unexpected state changes that may indicate 
issues with the lifecycle management.
+     * See {@link LifecycleManagerTest#testStateChangesUnexpectedlyFlapping} 
for an example test case.
+     */
+    private void maybeRefreshLastCompletedStatus(String instanceId, 
OperationStatus currentStatus, CassandraState currentState,
+                                                 CassandraState desiredState)
+    {
+        if (!currentStatus.isCompleted()) // this logic is only valid after a 
lifecycle task has completed
+            return;
+        OperationStatus lastStatus = lastCompletedStatus.put(instanceId, 
currentStatus);
+        if (lastStatus != null && lastStatus != currentStatus)
+        {
+            String msg = currentStatus.isConverged() ?
+                         String.format("Instance %s has converged back to the 
desired state %s.", instanceId, desiredState) :
+                         String.format("Instance %s has unexpectedly diverged 
from the desired state %s to %s.", instanceId, desiredState, currentState);
+            lastUpdateMsgByInstance.put(instanceId, msg);
+            LOG.warn(msg);
+        }
+    }
+
+    /**
+     * Submits a blocking task via {@code executorPools.internal()} that starts
+     * the given instance through the {@link LifecycleProvider}.
+     * <p>
+     * Exceptions raised while starting are caught inside the task, logged
+     * together with their stack trace and recorded as the last update message
+     * of the instance; they do not escape the task.
+     *
+     * @param instanceId the instance to start
+     * @return a future for the submitted start task
+     */
+    private Future<Void> submitStartTask(String instanceId)
+    {
+        return executorPools.internal().runBlocking(() ->
+        {
+            try
+            {
+                lastUpdateMsgByInstance.put(instanceId, "Starting instance");
+                lifecycleProvider.start(metadata(instanceId));
+                lastUpdateMsgByInstance.put(instanceId, "Instance has 
started");
+            }
+            catch (Exception e)
+            {
+                // Trailing throwable argument makes SLF4J log the stack trace
+                LOG.error("Failed to start instance {}: {}", instanceId, 
e.getMessage(), e);
+                lastUpdateMsgByInstance.put(instanceId, String.format("Failed 
to start instance %s: %s", instanceId, e.getMessage()));
+            }
+        });
+    }
+
+    /**
+     * Submits a blocking task via {@code executorPools.internal()} that stops
+     * the given instance through the {@link LifecycleProvider}.
+     * <p>
+     * Exceptions raised while stopping are caught inside the task, logged
+     * together with their stack trace and recorded as the last update message
+     * of the instance; they do not escape the task.
+     *
+     * @param instanceId the instance to stop
+     * @return a future for the submitted stop task
+     */
+    private Future<Void> submitStopTask(String instanceId)
+    {
+        return executorPools.internal().runBlocking(() ->
+        {
+            try
+            {
+                lastUpdateMsgByInstance.put(instanceId, "Stopping instance");
+                lifecycleProvider.stop(metadata(instanceId));
+                lastUpdateMsgByInstance.put(instanceId, "Instance has 
stopped");
+            }
+            catch (Exception e)
+            {
+                // Trailing throwable argument makes SLF4J log the stack trace
+                LOG.error("Failed to stop instance {}: {}", instanceId, 
e.getMessage(), e);
+                lastUpdateMsgByInstance.put(instanceId, String.format("Failed 
to stop instance %s: %s", instanceId, e.getMessage()));
+            }
+        });
+    }
+
+    private OperationStatus getStatus(String instanceId, CassandraState 
currentState, CassandraState desiredState)
+    {
+        if (convergingInstances.contains(instanceId) && desiredState != 
currentState)
+        {
+            return OperationStatus.CONVERGING;
+        }
+        if (desiredState == CassandraState.UNKNOWN)
+        {
+            return OperationStatus.UNDEFINED;
+        }
+        if (desiredState != currentState)
+        {
+            return OperationStatus.DIVERGED;
+        }
+        return OperationStatus.CONVERGED;
+    }
+
+    private InstanceMetadata metadata(String host)
+    {
+        return metadataFetcher.instance(host);
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/LifecycleProvider.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/LifecycleProvider.java
new file mode 100644
index 00000000..49897990
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/LifecycleProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.lifecycle;
+
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+
+/**
+ * Manages the lifecycle of Cassandra instances through Sidecar
+ */
+public interface LifecycleProvider
+{
+    /**
+     * Start a Cassandra process
+     *
+     * @param instance Cassandra instance metadata
+     */
+    void start(InstanceMetadata instance);
+
+    /**
+     * Stop a Cassandra process
+     *
+     * @param instance Cassandra instance metadata
+     */
+    void stop(InstanceMetadata instance);
+
+    /**
+     * Check whether a Cassandra process is running or not
+     *
+     * @param instance Cassandra instance metadata
+     * @return true if the Cassandra process is running
+     */
+    boolean isRunning(InstanceMetadata instance);
+
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/ProcessLifecycleProvider.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/ProcessLifecycleProvider.java
new file mode 100644
index 00000000..681dc990
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/lifecycle/ProcessLifecycleProvider.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.lifecycle;
+
+import java.util.Map;
+
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+
+/**
+ * A {@link LifecycleProvider} that manages Cassandra instances as OS 
processes.
+ * <p>
+ * This implementation is a placeholder and is not yet implemented.
+ */
+public class ProcessLifecycleProvider implements LifecycleProvider
+{
+    public ProcessLifecycleProvider(Map<String, String> params)
+    {
+        // Params unused for now
+    }
+
+    @Override
+    public void start(InstanceMetadata instance)
+    {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public void stop(InstanceMetadata instance)
+    {
+        throw new UnsupportedOperationException("Not implemented yet");
+
+    }
+
+    @Override
+    public boolean isRunning(InstanceMetadata instance)
+    {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/LifecycleModule.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/LifecycleModule.java
new file mode 100644
index 00000000..6da8f3c5
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/LifecycleModule.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.modules;
+
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.ProvidesIntoMap;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.PUT;
+import jakarta.ws.rs.Path;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
+import org.apache.cassandra.sidecar.config.LifecycleConfiguration;
+import org.apache.cassandra.sidecar.config.ParameterizedClassConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.ConfigurationException;
+import org.apache.cassandra.sidecar.handlers.LifecycleInfoHandler;
+import org.apache.cassandra.sidecar.handlers.LifecycleUpdateHandler;
+import org.apache.cassandra.sidecar.lifecycle.LifecycleProvider;
+import org.apache.cassandra.sidecar.lifecycle.ProcessLifecycleProvider;
+import org.apache.cassandra.sidecar.modules.multibindings.KeyClassMapKey;
+import org.apache.cassandra.sidecar.modules.multibindings.VertxRouteMapKeys;
+import org.apache.cassandra.sidecar.routes.RouteBuilder;
+import org.apache.cassandra.sidecar.routes.VertxRoute;
+import org.eclipse.microprofile.openapi.annotations.Operation;
+import org.eclipse.microprofile.openapi.annotations.media.Content;
+import org.eclipse.microprofile.openapi.annotations.media.Schema;
+import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Provides the lifecycle management capability
+ */
+public class LifecycleModule extends AbstractModule
+{
+    /**
+     * Builds the {@link LifecycleProvider} selected by the lifecycle
+     * configuration.
+     * <p>
+     * When lifecycle management is disabled, the provider returned by
+     * {@code getNoopProvider()} is used. Otherwise the configured provider
+     * class name is compared, ignoring case, against
+     * {@link ProcessLifecycleProvider}, which is the only recognized provider.
+     *
+     * @param sidecarConfiguration the Sidecar configuration to read from
+     * @return the lifecycle provider to bind
+     * @throws ConfigurationException when lifecycle management is enabled but
+     *                                no provider is set, or the configured
+     *                                provider class is not recognized
+     */
+    @Provides
+    @Singleton
+    LifecycleProvider lifecycleProvider(SidecarConfiguration 
sidecarConfiguration)
+    {
+        LifecycleConfiguration lifecycleConfiguration = 
sidecarConfiguration.lifecycleConfiguration();
+        if (!lifecycleConfiguration.enabled())
+        {
+            return getNoopProvider();
+        }
+
+        ParameterizedClassConfiguration providerClass = 
lifecycleConfiguration.lifecycleProvider();
+        if (providerClass == null)
+        {
+            throw new ConfigurationException("Lifecycle management is enabled, 
but provider not set.");
+        }
+
+        if 
(providerClass.className().equalsIgnoreCase(ProcessLifecycleProvider.class.getName()))
+        {
+            Map<String, String> namedParams = providerClass.namedParameters();
+            return new ProcessLifecycleProvider(namedParams != null ? 
namedParams : Collections.emptyMap());
+        }
+
+        // Message names the lifecycle provider, which is what is being resolved
+        throw new ConfigurationException("Unrecognized lifecycle provider 
" + providerClass.className() + " set");
+    }
+
+    private static @NotNull LifecycleProvider getNoopProvider()
+    {
+        return new LifecycleProvider()
+        {
+            @Override
+            public void start(InstanceMetadata instance)
+            {
+                throw new UnsupportedOperationException("Lifecycle management 
is disabled. Cannot start instance " + instance.host());
+            }
+
+            @Override
+            public void stop(InstanceMetadata instance)
+            {
+                throw new UnsupportedOperationException("Lifecycle management 
is disabled. Cannot stop instance " + instance.host());
+            }
+
+            @Override
+            public boolean isRunning(InstanceMetadata instance)
+            {
+                throw new UnsupportedOperationException("Lifecycle management 
is disabled. Cannot check if instance " + instance.host() + " is running");
+            }
+        };
+    }
+
+    @PUT
+    @Path(ApiEndpointsV1.LIFECYCLE_ROUTE)
+    @Operation(summary = "Updates desired lifecycle state",
+            description = "Updates the desired lifecycle state for a local 
Cassandra node. Valid states are 'RUNNING' or 'STOPPED'.")
+    @APIResponse(description = "Desired lifecycle state updated successfully",
+            responseCode = "202",
+            content = @Content(mediaType = "application/json",
+            schema = @Schema(implementation = LifecycleInfoResponse.class)))
+    @ProvidesIntoMap
+    @KeyClassMapKey(VertxRouteMapKeys.LifecycleUpdateRouteKey.class)
+    VertxRoute lifecycleUpdateRoute(RouteBuilder.Factory factory,
+                                    LifecycleUpdateHandler 
lifecycleUpdateHandler)
+    {
+        return factory.builderForRoute()
+                      .setBodyHandler(true)
+                      .handler(lifecycleUpdateHandler)
+                      .build();
+    }
+
+    @GET
+    @Path(ApiEndpointsV1.LIFECYCLE_ROUTE)
+    @Operation(summary = "Gets lifecycle information",
+            description = "Returns the lifecycle information for a local 
Cassandra node")
+    @APIResponse(description = "Lifecycle information retrieved successfully",
+            responseCode = "200",
+            content = @Content(mediaType = "application/json",
+            schema = @Schema(implementation = LifecycleInfoResponse.class)))
+    @ProvidesIntoMap
+    @KeyClassMapKey(VertxRouteMapKeys.LifecycleInfoRouteKey.class)
+    VertxRoute lifecycleInfoRoute(RouteBuilder.Factory factory,
+                                    LifecycleInfoHandler lifecycleInfoHandler)
+    {
+        return factory.buildRouteWithHandler(lifecycleInfoHandler);
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
index 9aa5b057..dc798177 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
@@ -51,17 +51,18 @@ public class SidecarModules
                        new CdcModule(),
                        new ConfigurationModule(confPath),
                        new CoordinationModule(),
-                       new SchemaReportingModule(),
                        new HealthCheckModule(),
+                       new LifecycleModule(),
+                       new LiveMigrationModule(),
+                       new MultiBindingTypeResolverModule(),
+                       new OpenApiModule(),
                        new RestoreJobModule(),
                        new SchedulingModule(),
+                       new SchemaReportingModule(),
                        new SidecarSchemaModule(),
                        new SSTablesAccessModule(),
                        new TelemetryModule(),
-                       new UtilitiesModule(),
-                       new MultiBindingTypeResolverModule(),
-                       new LiveMigrationModule(),
-                       new OpenApiModule());
+                       new UtilitiesModule());
     }
 
     /**
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
index 48b1e38f..e3bec846 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
@@ -183,6 +183,16 @@ public interface VertxRouteMapKeys
         HttpMethod HTTP_METHOD = HttpMethod.GET;
         String ROUTE_URI = ApiEndpointsV1.KEYSPACE_SCHEMA_ROUTE;
     }
+    /**
+     * Route key pairing the {@code GET} method with {@link ApiEndpointsV1#LIFECYCLE_ROUTE}.
+     */
+    interface LifecycleInfoRouteKey extends RouteClassKey
+    {
+        HttpMethod HTTP_METHOD = HttpMethod.GET;
+        String ROUTE_URI = ApiEndpointsV1.LIFECYCLE_ROUTE;
+    }
+    /**
+     * Route key pairing the {@code PUT} method with {@link ApiEndpointsV1#LIFECYCLE_ROUTE}.
+     */
+    interface LifecycleUpdateRouteKey extends RouteClassKey
+    {
+        HttpMethod HTTP_METHOD = HttpMethod.PUT;
+        String ROUTE_URI = ApiEndpointsV1.LIFECYCLE_ROUTE;
+    }
     interface ListCassandraOperationalJobRouteKey extends RouteClassKey
     {
         HttpMethod HTTP_METHOD = HttpMethod.GET;
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/LifecycleInfoHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/LifecycleInfoHandlerTest.java
new file mode 100644
index 00000000..0a363b3b
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/LifecycleInfoHandlerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.TestResourceReaper;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.OperationStatus;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
+import org.apache.cassandra.sidecar.lifecycle.LifecycleManager;
+import org.apache.cassandra.sidecar.modules.SidecarModules;
+import org.apache.cassandra.sidecar.server.Server;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link LifecycleInfoHandler}.
+ *
+ * <p>Each test starts a Sidecar server whose {@link LifecycleManager} binding is replaced by a Mockito mock,
+ * issues a {@code GET} against the lifecycle route and checks that the handler returns the mocked
+ * {@link LifecycleInfoResponse} as JSON.
+ */
+@ExtendWith(VertxExtension.class)
+public class LifecycleInfoHandlerTest
+{
+    private static final String LOCALHOST = "127.0.0.1";
+    private static final String LIFECYCLE_ROUTE = "/api/v1/cassandra/lifecycle";
+
+    Vertx vertx;
+    Server server;
+    LifecycleManager mockLifecycleManager = mock(LifecycleManager.class);
+
+    /**
+     * Builds the injector with the mocked {@link LifecycleManager} and starts the server.
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting for the server to start
+     */
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        Module testOverride = Modules.override(new TestModule()).with(new LifecycleInfoHandlerTestModule());
+        Injector injector = Guice.createInjector(Modules.override(SidecarModules.all()).with(testOverride));
+        vertx = injector.getInstance(Vertx.class);
+        server = injector.getInstance(Server.class);
+        VertxTestContext context = new VertxTestContext();
+        server.start().onSuccess(s -> context.completeNow()).onFailure(context::failNow);
+        // Do not run a test against a server that timed out or failed while starting
+        assertThat(context.awaitCompletion(5, TimeUnit.SECONDS))
+        .as("Sidecar server did not start within 5 seconds")
+        .isTrue();
+        if (context.failed())
+        {
+            throw new IllegalStateException("Sidecar server failed to start", context.causeOfFailure());
+        }
+    }
+
+    /**
+     * Closes the server started in {@link #before()}, waiting up to 60 seconds.
+     */
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        getBlocking(TestResourceReaper.create().with(server).close(), 60, TimeUnit.SECONDS, "Closing server");
+    }
+
+    @Test
+    void testGetLifecycleInfoNoTask(VertxTestContext context)
+    {
+        // Mock response for initial state with no task submitted
+        LifecycleInfoResponse expectedResponse = new LifecycleInfoResponse(CassandraState.RUNNING,
+                                                                           CassandraState.RUNNING,
+                                                                           OperationStatus.CONVERGED,
+                                                                           "All good");
+        when(mockLifecycleManager.getLifecycleInfo(anyString())).thenReturn(expectedResponse);
+
+        WebClient client = WebClient.create(vertx);
+        client.get(server.actualPort(), LOCALHOST, LIFECYCLE_ROUTE)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  context.verify(() -> {
+                      assertThat(response.statusCode()).isEqualTo(OK.code());
+                      LifecycleInfoResponse lifecycleResponse = response.bodyAsJson(LifecycleInfoResponse.class);
+                      assertThat(lifecycleResponse).isEqualTo(expectedResponse);
+                  });
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testGetLifecycleInfoTaskInProgress(VertxTestContext context)
+    {
+        // Mock response for a task in progress
+        LifecycleInfoResponse mockResponse = new LifecycleInfoResponse(CassandraState.STOPPED,
+                                                                       CassandraState.RUNNING,
+                                                                       OperationStatus.CONVERGING,
+                                                                       null);
+        when(mockLifecycleManager.getLifecycleInfo(LOCALHOST)).thenReturn(mockResponse);
+
+        WebClient client = WebClient.create(vertx);
+        client.get(server.actualPort(), LOCALHOST, LIFECYCLE_ROUTE)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  context.verify(() -> {
+                      assertThat(response.statusCode()).isEqualTo(OK.code());
+                      LifecycleInfoResponse lifecycleResponse = response.bodyAsJson(LifecycleInfoResponse.class);
+                      assertThat(lifecycleResponse).isNotNull();
+                      assertThat(lifecycleResponse.currentState()).isEqualTo(CassandraState.STOPPED);
+                      assertThat(lifecycleResponse.desiredState()).isEqualTo(CassandraState.RUNNING);
+                      assertThat(lifecycleResponse.status()).isEqualTo(OperationStatus.CONVERGING);
+                      assertThat(lifecycleResponse.lastUpdate()).isNull();
+                  });
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testGetLifecycleInfoFinished(VertxTestContext context)
+    {
+        // Mock response for a finished task
+        LifecycleInfoResponse mockResponse = new LifecycleInfoResponse(CassandraState.RUNNING,
+                                                                       CassandraState.RUNNING,
+                                                                       OperationStatus.CONVERGED,
+                                                                       "Host has started");
+        when(mockLifecycleManager.getLifecycleInfo(LOCALHOST)).thenReturn(mockResponse);
+
+        WebClient client = WebClient.create(vertx);
+        client.get(server.actualPort(), LOCALHOST, LIFECYCLE_ROUTE)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  context.verify(() -> {
+                      assertThat(response.statusCode()).isEqualTo(OK.code());
+                      LifecycleInfoResponse lifecycleResponse = response.bodyAsJson(LifecycleInfoResponse.class);
+                      assertThat(lifecycleResponse).isNotNull();
+                      assertThat(lifecycleResponse.currentState()).isEqualTo(CassandraState.RUNNING);
+                      assertThat(lifecycleResponse.desiredState()).isEqualTo(CassandraState.RUNNING);
+                      assertThat(lifecycleResponse.status()).isEqualTo(OperationStatus.CONVERGED);
+                      assertThat(lifecycleResponse.lastUpdate()).isEqualTo("Host has started");
+                  });
+                  context.completeNow();
+              }));
+    }
+
+    /**
+     * Test guice module for {@link LifecycleInfoHandler} tests. Kept as an inner (non-static) class because
+     * it hands out the per-test-instance {@code mockLifecycleManager}.
+     */
+    class LifecycleInfoHandlerTestModule extends AbstractModule
+    {
+
+        @Provides
+        @Singleton
+        public LifecycleManager lifecycleManager()
+        {
+            return mockLifecycleManager;
+        }
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandlerTest.java
new file mode 100644
index 00000000..3a70774f
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/LifecycleUpdateHandlerTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.TestResourceReaper;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.OperationStatus;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
+import org.apache.cassandra.sidecar.exceptions.LifecycleTaskConflictException;
+import org.apache.cassandra.sidecar.lifecycle.LifecycleManager;
+import org.apache.cassandra.sidecar.modules.SidecarModules;
+import org.apache.cassandra.sidecar.server.Server;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT;
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link LifecycleUpdateHandler}.
+ *
+ * <p>Each test starts a Sidecar server whose {@link LifecycleManager} binding is replaced by a Mockito mock,
+ * sends a {@code PUT} with a {@code state} payload to the lifecycle route and checks the HTTP status code,
+ * the response body and the interaction with the mocked manager.
+ */
+@ExtendWith(VertxExtension.class)
+public class LifecycleUpdateHandlerTest
+{
+    private static final String LOCALHOST = "127.0.0.1";
+    private static final String LIFECYCLE_ROUTE = "/api/v1/cassandra/lifecycle";
+
+    Vertx vertx;
+    Server server;
+    LifecycleManager mockLifecycleManager = mock(LifecycleManager.class);
+
+    /**
+     * Builds the injector with the mocked {@link LifecycleManager}, starts the server and clears any
+     * interactions recorded on the mock.
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting for the server to start
+     */
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        Module testOverride = Modules.override(new TestModule()).with(new LifecycleUpdateHandlerTestModule());
+        Injector injector = Guice.createInjector(Modules.override(SidecarModules.all()).with(testOverride));
+        vertx = injector.getInstance(Vertx.class);
+        server = injector.getInstance(Server.class);
+        VertxTestContext context = new VertxTestContext();
+        server.start().onSuccess(s -> context.completeNow()).onFailure(context::failNow);
+        // Do not run a test against a server that timed out or failed while starting
+        assertThat(context.awaitCompletion(5, TimeUnit.SECONDS))
+        .as("Sidecar server did not start within 5 seconds")
+        .isTrue();
+        if (context.failed())
+        {
+            throw new IllegalStateException("Sidecar server failed to start", context.causeOfFailure());
+        }
+        reset(mockLifecycleManager);
+    }
+
+    /**
+     * Closes the server started in {@link #before()}, waiting up to 60 seconds.
+     */
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        getBlocking(TestResourceReaper.create().with(server).close(), 60, TimeUnit.SECONDS, "Closing server");
+    }
+
+    @Test
+    void testSuccessfulPutWithAcceptedResponse(VertxTestContext ctx) throws LifecycleTaskConflictException
+    {
+        WebClient client = WebClient.create(vertx);
+        JsonObject payload = JsonObject.of("state", "start");
+        LifecycleInfoResponse expectedResponse = new LifecycleInfoResponse(CassandraState.STOPPED,
+                                                                           CassandraState.RUNNING,
+                                                                           OperationStatus.CONVERGING,
+                                                                           "Submitted task to start instance");
+        when(mockLifecycleManager.updateDesiredState(LOCALHOST, CassandraState.RUNNING)).thenReturn(expectedResponse);
+        client.put(server.actualPort(), LOCALHOST, LIFECYCLE_ROUTE)
+              .sendBuffer(payload.toBuffer(), ctx.succeeding(resp -> {
+                  ctx.verify(() -> {
+                      verify(mockLifecycleManager, times(1)).updateDesiredState(LOCALHOST, CassandraState.RUNNING);
+                      assertThat(resp.bodyAsJson(LifecycleInfoResponse.class)).isEqualTo(expectedResponse);
+                      assertThat(resp.statusCode()).isEqualTo(ACCEPTED.code());
+                  });
+                  ctx.completeNow();
+              }));
+    }
+
+    @Test
+    void testSuccessfulPutWithOKResponse(VertxTestContext ctx) throws LifecycleTaskConflictException
+    {
+        WebClient client = WebClient.create(vertx);
+        JsonObject payload = JsonObject.of("state", "stop");
+        LifecycleInfoResponse expectedResponse = new LifecycleInfoResponse(CassandraState.STOPPED,
+                                                                           CassandraState.STOPPED,
+                                                                           OperationStatus.CONVERGED,
+                                                                           "Submitted task to stop instance");
+        when(mockLifecycleManager.updateDesiredState(LOCALHOST, CassandraState.STOPPED)).thenReturn(expectedResponse);
+        client.put(server.actualPort(), LOCALHOST, LIFECYCLE_ROUTE)
+              .sendBuffer(payload.toBuffer(), ctx.succeeding(resp -> {
+                  ctx.verify(() -> {
+                      verify(mockLifecycleManager, times(1)).updateDesiredState(LOCALHOST, CassandraState.STOPPED);
+                      assertThat(resp.bodyAsJson(LifecycleInfoResponse.class)).isEqualTo(expectedResponse);
+                      assertThat(resp.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+                  });
+                  ctx.completeNow();
+              }));
+    }
+
+    @Test
+    void testSuccessfulPutWithFailedResponse(VertxTestContext ctx) throws LifecycleTaskConflictException
+    {
+        WebClient client = WebClient.create(vertx);
+        JsonObject payload = JsonObject.of("state", "stop");
+        LifecycleInfoResponse expectedResponse = new LifecycleInfoResponse(CassandraState.RUNNING,
+                                                                           CassandraState.STOPPED,
+                                                                           OperationStatus.DIVERGED,
+                                                                           "Error while stopping instance");
+        when(mockLifecycleManager.updateDesiredState(LOCALHOST, CassandraState.STOPPED)).thenReturn(expectedResponse);
+        client.put(server.actualPort(), LOCALHOST, LIFECYCLE_ROUTE)
+              .sendBuffer(payload.toBuffer(), ctx.succeeding(resp -> {
+                  ctx.verify(() -> {
+                      verify(mockLifecycleManager, times(1)).updateDesiredState(LOCALHOST, CassandraState.STOPPED);
+                      assertThat(resp.bodyAsJson(LifecycleInfoResponse.class)).isEqualTo(expectedResponse);
+                      assertThat(resp.statusCode()).isEqualTo(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
+                  });
+                  ctx.completeNow();
+              }));
+    }
+
+    @Test
+    void testInvalidState(VertxTestContext ctx)
+    {
+        WebClient client = WebClient.create(vertx);
+        JsonObject payload = JsonObject.of("state", "invalid");
+        client.put(server.actualPort(), LOCALHOST, LIFECYCLE_ROUTE)
+              .sendBuffer(payload.toBuffer(), ctx.succeeding(resp -> {
+                  ctx.verify(() -> {
+                      assertThat(resp.statusCode()).isEqualTo(BAD_REQUEST.code());
+                      // An unparseable state must be rejected before the manager is asked to do anything
+                      verify(mockLifecycleManager, times(0)).updateDesiredState(LOCALHOST, CassandraState.RUNNING);
+                      verify(mockLifecycleManager, times(0)).updateDesiredState(LOCALHOST, CassandraState.STOPPED);
+                  });
+                  ctx.completeNow();
+              }));
+    }
+
+    @Test
+    void testSubmitTaskAlreadyInProgress(VertxTestContext ctx) throws LifecycleTaskConflictException
+    {
+        // Setup mock to throw conflict exception
+        doThrow(new LifecycleTaskConflictException("Cannot start host 127.0.0.1. Task already in progress for this host."))
+        .when(mockLifecycleManager).updateDesiredState(LOCALHOST, CassandraState.RUNNING);
+
+        WebClient client = WebClient.create(vertx);
+        JsonObject payload = JsonObject.of("state", "start");
+        client.put(server.actualPort(), LOCALHOST, LIFECYCLE_ROUTE)
+              .sendBuffer(payload.toBuffer(), ctx.succeeding(resp -> {
+                  ctx.verify(() -> {
+                      assertThat(resp.statusCode()).isEqualTo(CONFLICT.code());
+                      verify(mockLifecycleManager, times(1)).updateDesiredState(LOCALHOST, CassandraState.RUNNING);
+                  });
+                  ctx.completeNow();
+              }));
+    }
+
+    /**
+     * Test guice module for {@link LifecycleUpdateHandler} tests. Kept as an inner (non-static) class because
+     * it hands out the per-test-instance {@code mockLifecycleManager}.
+     */
+    class LifecycleUpdateHandlerTestModule extends AbstractModule
+    {
+
+        @Provides
+        @Singleton
+        public LifecycleManager lifecycleManager()
+        {
+            return mockLifecycleManager;
+        }
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/lifecycle/LifecycleManagerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/lifecycle/LifecycleManagerTest.java
new file mode 100644
index 00000000..ee6addf6
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/lifecycle/LifecycleManagerTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.lifecycle;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.TestResourceReaper;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
+import org.apache.cassandra.sidecar.common.data.Lifecycle.OperationStatus;
+import org.apache.cassandra.sidecar.common.response.LifecycleInfoResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
+import org.apache.cassandra.sidecar.exceptions.LifecycleTaskConflictException;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link LifecycleManager} to validate lifecycle management behavior
+ */
+class LifecycleManagerTest
+{
+    private static final String TEST_HOST = "127.0.0.1";
+    private static final InstanceMetadata TEST_HOST_META = 
mock(InstanceMetadata.class);
+    private static final String TEST_HOST_2 = "127.0.0.2";
+    private static final InstanceMetadata TEST_HOST_2_META = 
mock(InstanceMetadata.class);
+
+    protected Vertx vertx;
+    protected ExecutorPools executorPools;
+    protected LifecycleProvider mockLifecycleProvider;
+    protected InstanceMetadataFetcher metadataFetcher = 
mock(InstanceMetadataFetcher.class);
+
+    /**
+     * Creates a fresh Vert.x instance, executor pools and lifecycle provider mock for every test,
+     * and maps both test hosts to their instance metadata mocks.
+     */
+    @BeforeEach
+    void setup()
+    {
+        mockLifecycleProvider = mock(LifecycleProvider.class);
+        vertx = Vertx.vertx();
+        executorPools = new ExecutorPools(vertx, new ServiceConfigurationImpl());
+
+        // Host to metadata lookups used by the lifecycle manager under test
+        when(metadataFetcher.instance(TEST_HOST_2)).thenReturn(TEST_HOST_2_META);
+        when(metadataFetcher.instance(TEST_HOST)).thenReturn(TEST_HOST_META);
+    }
+
+    /**
+     * Releases the Vert.x instance and executor pools created in {@code setup()}.
+     */
+    @AfterEach
+    void cleanup()
+    {
+        // NOTE(review): the value returned by close() is discarded; if closing is asynchronous,
+        // the resources may still be shutting down when the next test starts - confirm.
+        TestResourceReaper.create().with(vertx).with(executorPools).close();
+    }
+
+    /**
+     * With no task ever submitted, the manager reports the provider's view of the current state,
+     * an unknown desired state and an undefined status.
+     */
+    @Test
+    void testGetLifecycleInfoWithNoTaskSubmitted()
+    {
+        LifecycleManager manager = new LifecycleManager(metadataFetcher, mockLifecycleProvider, executorPools);
+        when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(false);
+
+        LifecycleInfoResponse info = manager.getLifecycleInfo(TEST_HOST);
+
+        assertThat(info.currentState()).isEqualTo(CassandraState.STOPPED);
+        assertThat(info.desiredState()).isEqualTo(CassandraState.UNKNOWN);
+        assertThat(info.status()).isEqualTo(OperationStatus.UNDEFINED);
+        assertThat(info.lastUpdate()).isEqualTo("No lifecycle task submitted for this instance yet.");
+    }
+
+    /**
+     * Submits a start task against a stopped instance, lets the simulated start finish and checks that
+     * the reported state converges; a repeated start request afterwards returns the converged response.
+     */
+    @Test
+    void testSubmittedTaskSucceeds() throws LifecycleTaskConflictException
+    {
+        // Instance is stopped and the start call is held back by a latch
+        when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(false);
+        CountDownLatch startLatch = slowCassandraStart();
+        LifecycleManager manager = new LifecycleManager(metadataFetcher, mockLifecycleProvider, executorPools);
+
+        LifecycleInfoResponse submitted = manager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+        LifecycleInfoResponse expectedSubmitted = new LifecycleInfoResponse(CassandraState.STOPPED,
+                                                                            CassandraState.RUNNING,
+                                                                            OperationStatus.CONVERGING,
+                                                                            "Submitting start task for instance");
+        assertThat(submitted).isEqualTo(expectedSubmitted);
+
+        // Release the simulated start and report the instance as running from now on
+        startLatch.countDown();
+        when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(true);
+
+        // The status eventually reflects the finished start
+        LifecycleInfoResponse expectedConverged = new LifecycleInfoResponse(CassandraState.RUNNING,
+                                                                            CassandraState.RUNNING,
+                                                                            OperationStatus.CONVERGED,
+                                                                            "Instance has started");
+        loopAssert(1, 200, () -> {
+            LifecycleInfoResponse polled = manager.getLifecycleInfo(TEST_HOST);
+            assertThat(polled).isEqualTo(expectedConverged);
+        });
+
+        // Starting again is a no-op because the instance is already running
+        LifecycleInfoResponse repeated = manager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+        assertThat(repeated).isEqualTo(expectedConverged);
+    }
+
+    /**
+     * A start task whose provider call throws leaves the instance {@code DIVERGED} with the failure message;
+     * a retry submitted after the provider stops failing converges to {@code RUNNING}.
+     */
+    @Test
+    void testSubmittedTaskFails() throws LifecycleTaskConflictException
+    {
+        LifecycleManager lifecycleManager = new LifecycleManager(metadataFetcher, mockLifecycleProvider, executorPools);
+        when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(false);
+
+        String errorMessage = "Cannot find Cassandra executable to start instance.";
+        doThrow(new RuntimeException(errorMessage)).when(mockLifecycleProvider).start(TEST_HOST_META);
+
+        lifecycleManager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+
+        loopAssert(1, 200, () -> {
+            LifecycleInfoResponse response = lifecycleManager.getLifecycleInfo(TEST_HOST);
+            assertThat(response.currentState()).isEqualTo(CassandraState.STOPPED);
+            assertThat(response.desiredState()).isEqualTo(CassandraState.RUNNING);
+            assertThat(response.status()).isEqualTo(OperationStatus.DIVERGED);
+            assertThat(response.lastUpdate()).isEqualTo(String.format("Failed to start instance 127.0.0.1: %s", errorMessage));
+        });
+
+        // Clear the failing start() stub BEFORE resubmitting: resetting the mock after the task is handed
+        // to the executor races with the background thread, which may still hit the throwing stub.
+        // After the reset isRunning() answers false (the mock default), same as the stub it replaces.
+        reset(mockLifecycleProvider);
+        lifecycleManager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+
+        when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(true);
+
+        loopAssert(1, 200, () -> {
+            LifecycleInfoResponse newResponse = lifecycleManager.getLifecycleInfo(TEST_HOST);
+            assertThat(newResponse.currentState()).isEqualTo(CassandraState.RUNNING);
+            assertThat(newResponse.desiredState()).isEqualTo(CassandraState.RUNNING);
+            assertThat(newResponse.status()).isEqualTo(OperationStatus.CONVERGED);
+            assertThat(newResponse.lastUpdate()).isEqualTo("Instance has started");
+        });
+    }
+
+    /**
+     * Submitting a second task for a host whose first task is still in progress is rejected with a
+     * {@link LifecycleTaskConflictException}.
+     */
+    @Test
+    void testSubmitTaskWhenInProgressThrowsException() throws LifecycleTaskConflictException
+    {
+        LifecycleManager lifecycleManager = new LifecycleManager(metadataFetcher, mockLifecycleProvider, executorPools);
+        // Mock a slow start operation so the first task is still in progress when the second one arrives
+        CountDownLatch startLatch = slowCassandraStart();
+        try
+        {
+            lifecycleManager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+
+            assertThatThrownBy(() -> lifecycleManager.updateDesiredState(TEST_HOST, CassandraState.RUNNING))
+            .isExactlyInstanceOf(LifecycleTaskConflictException.class)
+            .hasMessage("Cannot update lifecycle state of instance " + TEST_HOST + " to RUNNING. Task already in progress for this host.");
+        }
+        finally
+        {
+            // Release the simulated start so the background task is not left waiting on the latch
+            // once the test is over
+            startLatch.countDown();
+        }
+    }
+
+    /**
+     * Once a start task has converged, a stop task for the same host can be submitted and converges too.
+     */
+    @Test
+    void testSubmitNewTaskSucceedsAfterOldTaskFinishes() throws LifecycleTaskConflictException
+    {
+        LifecycleManager manager = new LifecycleManager(metadataFetcher, mockLifecycleProvider, executorPools);
+        when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(true);
+
+        manager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+
+        loopAssert(1, 200, () -> {
+            LifecycleInfoResponse afterStart = manager.getLifecycleInfo(TEST_HOST);
+            assertThat(afterStart.currentState()).isEqualTo(CassandraState.RUNNING);
+            assertThat(afterStart.desiredState()).isEqualTo(CassandraState.RUNNING);
+            assertThat(afterStart.status()).isEqualTo(OperationStatus.CONVERGED);
+            assertThat(afterStart.lastUpdate()).isEqualTo("Instance has started");
+        });
+        when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(false);
+
+        loopAssert(1, 200, () -> {
+            try
+            {
+                manager.updateDesiredState(TEST_HOST, CassandraState.STOPPED);
+            }
+            catch (LifecycleTaskConflictException ignored)
+            {
+                // The previous task may still be in progress; the enclosing loop retries
+            }
+            LifecycleInfoResponse afterStop = manager.getLifecycleInfo(TEST_HOST);
+            assertThat(afterStop.currentState()).isEqualTo(CassandraState.STOPPED);
+            assertThat(afterStop.desiredState()).isEqualTo(CassandraState.STOPPED);
+            assertThat(afterStop.status()).isEqualTo(OperationStatus.CONVERGED);
+            assertThat(afterStop.lastUpdate()).isEqualTo("Instance has stopped");
+        });
+    }
+
+    /**
+     * After converging to {@code RUNNING}, the provider's answer flips without any task being submitted:
+     * first to stopped (reported as diverged), then back to running (reported as converged again).
+     */
+    @Test
+    void testStateChangesUnexpectedlyFlapping() throws LifecycleTaskConflictException
+    {
+        // Drive the instance to RUNNING through a regular start request
+        LifecycleManager manager = new LifecycleManager(metadataFetcher, mockLifecycleProvider, executorPools);
+        when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(true);
+        manager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+
+        loopAssert(1, 200, () -> {
+            LifecycleInfoResponse started = manager.getLifecycleInfo(TEST_HOST);
+            assertThat(started.currentState()).isEqualTo(CassandraState.RUNNING);
+            assertThat(started.desiredState()).isEqualTo(CassandraState.RUNNING);
+            assertThat(started.status()).isEqualTo(OperationStatus.CONVERGED);
+            assertThat(started.lastUpdate()).isEqualTo("Instance has started");
+        });
+
+        // The instance stops although no STOP lifecycle task was submitted
+        when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(false);
+
+        loopAssert(1, 200, () -> {
+            LifecycleInfoResponse diverged = manager.getLifecycleInfo(TEST_HOST);
+            assertThat(diverged.currentState()).isEqualTo(CassandraState.STOPPED);
+            assertThat(diverged.desiredState()).isEqualTo(CassandraState.RUNNING);
+            assertThat(diverged.status()).isEqualTo(OperationStatus.DIVERGED);
+            assertThat(diverged.lastUpdate()).isEqualTo("Instance 127.0.0.1 has unexpectedly diverged from the desired state RUNNING to STOPPED.");
+        });
+
+        // The instance comes back although no START lifecycle task was submitted
+        when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(true);
+        LifecycleInfoResponse reconverged = manager.getLifecycleInfo(TEST_HOST);
+        assertThat(reconverged.currentState()).isEqualTo(CassandraState.RUNNING);
+        assertThat(reconverged.desiredState()).isEqualTo(CassandraState.RUNNING);
+        assertThat(reconverged.status()).isEqualTo(OperationStatus.CONVERGED);
+        assertThat(reconverged.lastUpdate()).isEqualTo("Instance 127.0.0.1 has converged back to the desired state RUNNING.");
+    }
+
+    /**
+     * Verifies that lifecycle tasks for two different hosts can be in progress at the same time:
+     * while a slow start is pending for {@code TEST_HOST} and a slow stop is pending for
+     * {@code TEST_HOST_2}, both hosts must report {@code CONVERGING} with their own desired state
+     * and last-update message.
+     */
+    @Test
+    void testCanSubmitTasksForIndependentHosts() throws LifecycleTaskConflictException
+    {
+        when(mockLifecycleProvider.isRunning(TEST_HOST_META)).thenReturn(false);
+        when(mockLifecycleProvider.isRunning(TEST_HOST_2_META)).thenReturn(true);
+
+        // Mock a slow start operation
+        CountDownLatch startLatch = slowCassandraStart();
+        // Mock a slow stop operation
+        CountDownLatch stopLatch = slowCassandraStop();
+
+        try
+        {
+            LifecycleManager lifecycleManager = new LifecycleManager(metadataFetcher,
+                                                                     mockLifecycleProvider,
+                                                                     executorPools);
+            lifecycleManager.updateDesiredState(TEST_HOST, CassandraState.RUNNING);
+            lifecycleManager.updateDesiredState(TEST_HOST_2, CassandraState.STOPPED);
+
+            loopAssert(1, 200, () -> {
+                LifecycleInfoResponse response1 = lifecycleManager.getLifecycleInfo(TEST_HOST);
+                LifecycleInfoResponse response2 = lifecycleManager.getLifecycleInfo(TEST_HOST_2);
+
+                // First host: start requested, provider still reports it as not running
+                assertThat(response1.currentState()).isEqualTo(CassandraState.STOPPED);
+                assertThat(response1.desiredState()).isEqualTo(CassandraState.RUNNING);
+                assertThat(response1.status()).isEqualTo(OperationStatus.CONVERGING);
+                assertThat(response1.lastUpdate()).isEqualTo("Starting instance");
+
+                // Second host: stop requested, provider still reports it as running
+                assertThat(response2.currentState()).isEqualTo(CassandraState.RUNNING);
+                assertThat(response2.desiredState()).isEqualTo(CassandraState.STOPPED);
+                assertThat(response2.status()).isEqualTo(OperationStatus.CONVERGING);
+                assertThat(response2.lastUpdate()).isEqualTo("Stopping instance");
+            });
+        }
+        finally
+        {
+            // Release the mocked start/stop calls even when an assertion above fails, so they do not
+            // stay blocked until their own await timeout expires
+            startLatch.countDown();
+            stopLatch.countDown();
+        }
+    }
+
+    /**
+     * Stubs {@code mockLifecycleProvider.stop(TEST_HOST_2_META)} so that the call waits on a latch
+     * for at most 5 seconds before returning.
+     *
+     * @return the latch the stubbed stop call waits on; counting it down lets the call return early
+     */
+    private @NotNull CountDownLatch slowCassandraStop()
+    {
+        final CountDownLatch gate = new CountDownLatch(1);
+        doAnswer(ignored -> {
+            try
+            {
+                gate.await(5, TimeUnit.SECONDS);
+            }
+            catch (InterruptedException interrupted)
+            {
+                // Restore the interrupt flag before surfacing the failure to the caller
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(interrupted);
+            }
+            return null;
+        }).when(mockLifecycleProvider).stop(TEST_HOST_2_META);
+        return gate;
+    }
+
+    /**
+     * Stubs {@code mockLifecycleProvider.start(TEST_HOST_META)} so that the call waits on a latch
+     * for at most 5 seconds before returning.
+     *
+     * @return the latch the stubbed start call waits on; counting it down lets the call return early
+     */
+    private @NotNull CountDownLatch slowCassandraStart()
+    {
+        final CountDownLatch gate = new CountDownLatch(1);
+        doAnswer(ignored -> {
+            try
+            {
+                gate.await(5, TimeUnit.SECONDS);
+            }
+            catch (InterruptedException interrupted)
+            {
+                // Restore the interrupt flag before surfacing the failure to the caller
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(interrupted);
+            }
+            return null;
+        }).when(mockLifecycleProvider).start(TEST_HOST_META);
+        return gate;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to