yifan-c commented on code in PR #144:
URL: https://github.com/apache/cassandra-sidecar/pull/144#discussion_r1906304018


##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java:
##########
@@ -213,4 +214,38 @@ public void outOfRangeDataCleanup(@NotNull String 
keyspace, @NotNull String tabl
         jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
                  .forceKeyspaceCleanup(concurrency, keyspace, table);
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isDecommissioning()
+    {
+        StorageJmxOperations ssProxy = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+        String nodeOperationMode = ssProxy.getOperationMode();
+        // The following check is equivalent to the operation-mode checks 
within Cassandra to determine if a
+        // decommission operation is in progress
+        return nodeOperationMode.equals("LEAVING") || 
nodeOperationMode.equals("DECOMMISSION_FAILED");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public OperationalJobStatus decommission(boolean force)
+    {
+        StorageJmxOperations ssProxy = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+        String mode = ssProxy.getOperationMode();
+        switch(mode)
+        {
+            case "LEAVING":
+            case "DECOMMISSION_FAILED":
+                return OperationalJobStatus.RUNNING;

Review Comment:
   Can you add a comment explaining why `DECOMMISSION_FAILED` is considered `RUNNING`?



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java:
##########
@@ -213,4 +214,38 @@ public void outOfRangeDataCleanup(@NotNull String 
keyspace, @NotNull String tabl
         jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
                  .forceKeyspaceCleanup(concurrency, keyspace, table);
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isDecommissioning()
+    {
+        StorageJmxOperations ssProxy = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+        String nodeOperationMode = ssProxy.getOperationMode();

Review Comment:
   How about extracting a method to retrieve the operation mode? It is
   used in other locations.
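
   A minimal sketch of the extraction, not the actual class: `StorageJmx` is a stand-in for `StorageJmxOperations`, the supplier stands in for the `jmxClient.proxy(...)` lookup, and the helper name `operationMode()` is only a suggestion. The check inside `isDecommissioning()` is copied from the PR as-is.

   ```java
   import java.util.function.Supplier;

   // Stand-in for StorageJmxOperations; only the call used here is modelled.
   interface StorageJmx
   {
       String getOperationMode();
   }

   class StorageOperationsSketch
   {
       // Stand-in for jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
       private final Supplier<StorageJmx> proxySupplier;

       StorageOperationsSketch(Supplier<StorageJmx> proxySupplier)
       {
           this.proxySupplier = proxySupplier;
       }

       // Hypothetical helper: the single place that reads the operation mode
       private String operationMode()
       {
           return proxySupplier.get().getOperationMode();
       }

       boolean isDecommissioning()
       {
           String mode = operationMode();
           // Same condition as in the PR; only the lookup has moved
           return "LEAVING".equals(mode) || "DECOMMISSION_FAILED".equals(mode);
       }
   }
   ```

   Any other caller that needs the mode, `decommission(boolean)` included, can then go through the same helper.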



##########
server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java:
##########
@@ -44,7 +43,7 @@ public abstract class OperationalJob implements Task<Void>
     // use v1 time-based uuid
     public final UUID jobId;
 
-    private final Promise<Void> executionPromise;
+    private final Promise<OperationalJobStatus> executionPromise;

Review Comment:
   I am not convinced that we need `Promise<OperationalJobStatus>`. The state
   of the future is already enough to map to `OperationalJobStatus`.
   
   Semantically, the status should not be read from the future. The call site
   should be able to query the status directly, without extracting it from the future.
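
   Roughly what I have in mind, as a sketch only. It uses `CompletableFuture` in place of the Vert.x `Promise`/`Future`, and `JobStatusSketch` and `JobSketch` are made-up stand-ins for `OperationalJobStatus` and `OperationalJob`.

   ```java
   import java.util.concurrent.CompletableFuture;

   // Stand-in for OperationalJobStatus; only the values visible in this PR
   enum JobStatusSketch { RUNNING, SUCCEEDED, FAILED }

   class JobSketch
   {
       // The future carries no value; only its completion state matters
       private final CompletableFuture<Void> execution = new CompletableFuture<>();

       // Status is derived from the future state and queried directly by callers
       JobStatusSketch status()
       {
           if (!execution.isDone())
           {
               // Simplification: a job that has not started yet also lands here
               return JobStatusSketch.RUNNING;
           }
           return execution.isCompletedExceptionally() ? JobStatusSketch.FAILED : JobStatusSketch.SUCCEEDED;
       }

       void succeed()
       {
           execution.complete(null);
       }

       void fail(Throwable cause)
       {
           execution.completeExceptionally(cause);
       }
   }
   ```

   With this shape, the handler calls `status()` on the job and never has to unwrap a value from the future.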



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java:
##########
@@ -213,4 +214,38 @@ public void outOfRangeDataCleanup(@NotNull String 
keyspace, @NotNull String tabl
         jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
                  .forceKeyspaceCleanup(concurrency, keyspace, table);
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isDecommissioning()
+    {
+        StorageJmxOperations ssProxy = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+        String nodeOperationMode = ssProxy.getOperationMode();
+        // The following check is equivalent to the operation-mode checks 
within Cassandra to determine if a
+        // decommission operation is in progress
+        return nodeOperationMode.equals("LEAVING") || 
nodeOperationMode.equals("DECOMMISSION_FAILED");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public OperationalJobStatus decommission(boolean force)
+    {
+        StorageJmxOperations ssProxy = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+        String mode = ssProxy.getOperationMode();
+        switch(mode)
+        {
+            case "LEAVING":

Review Comment:
   nit: define an enum for the modes.
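
   Something along these lines, as a sketch. The enum name, the `UNKNOWN` fallback and `fromString` are assumptions, and the constant list is deliberately partial: it only contains the two modes that appear in this diff.

   ```java
   import java.util.Locale;

   // Hypothetical enum; extend with the remaining Cassandra operation modes as needed
   enum OperationModeSketch
   {
       LEAVING,
       DECOMMISSION_FAILED,
       UNKNOWN; // fallback for modes not listed above

       // Lenient parsing, so that an unexpected mode string does not throw
       static OperationModeSketch fromString(String mode)
       {
           if (mode == null)
           {
               return UNKNOWN;
           }
           try
           {
               return valueOf(mode.trim().toUpperCase(Locale.ROOT));
           }
           catch (IllegalArgumentException e)
           {
               return UNKNOWN;
           }
       }
   }
   ```

   The `switch` can then be written over enum constants instead of string literals.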



##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.BootstrapBBUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+    private static final String DECOMMISSION_FAILED_MESSAGE = "Failed to 
decommission node";
+
+    @CassandraIntegrationTest(nodesPerDc = 5)

Review Comment:
   Can you use a 2-node cluster for this and the other test cases in the class?
   I think that is good enough for testing decommission. We want to create clusters
   with fewer nodes so that the tests use fewer resources.



##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.BootstrapBBUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+    private static final String DECOMMISSION_FAILED_MESSAGE = "Failed to 
decommission node";
+
+    @CassandraIntegrationTest(nodesPerDc = 5)
+    void decommissionNodeDefault(VertxTestContext context) throws 
InterruptedException
+    {
+        BBHelperDecommissionNode.reset();
+        final String[] jobId = new String[1];
+        String testRoute = "/api/v1/cassandra/node/decommission";
+        testWithClient(client -> client.put(server.actualPort(), "127.0.0.1", 
testRoute)
+                                       .send(context.succeeding(response -> {
+                                           logger.info("Response Status:" + 
response.statusCode());
+                                           NodeDecommissionResponse 
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+                                           
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+                                           jobId[0] = 
String.valueOf(decommissionResponse.jobId());
+                                       })));
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+        pollStatusForState(context, jobId[0], SUCCEEDED, null);
+        context.awaitCompletion(2, TimeUnit.MINUTES);
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 5, network = true, buildCluster = 
false)
+    void decommissionNodeWithFailure(VertxTestContext context,
+                                     ConfigurableCassandraTestContext 
cassandraTestContext) throws InterruptedException
+    {
+        BBHelperDecommissionNode.reset();
+        cassandraTestContext.configureAndStartCluster(builder -> {
+            builder.withInstanceInitializer(BBHelperDecommissionNode::install);
+        });
+
+        final String[] jobId = new String[1];
+        String testRoute = "/api/v1/cassandra/node/decommission";
+        testWithClient(client -> client.put(server.actualPort(), "127.0.0.1", 
testRoute)
+                                       .send(context.succeeding(response -> {
+                                           logger.info("Response Status:" + 
response.statusCode());
+                                           NodeDecommissionResponse 
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+                                           
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+                                           jobId[0] = 
String.valueOf(decommissionResponse.jobId());
+                                       })));
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+        pollStatusForState(context, jobId[0], FAILED, 
DECOMMISSION_FAILED_MESSAGE);
+        context.awaitCompletion(2, TimeUnit.MINUTES);
+    }
+
+    private void pollStatusForState(VertxTestContext context,
+                                    String uuid,
+                                    OperationalJobStatus expectedStatus,
+                                    String expectedReason)
+    {
+
+        int attempts = 10;
+        String status = "/api/v1/cassandra/operational-jobs/" + uuid;
+        AtomicBoolean stateReached = new AtomicBoolean(false);
+        logger.info("Job Stats Attempt:" + attempts);
+        final int[] counter = {0};

Review Comment:
   How about just using `AtomicInteger`? We do not need its atomicity
   here, but its API is far more expressive than `counter[0]++`.
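
   For illustration, a small self-contained sketch of a polling loop that counts attempts with `AtomicInteger`. The class and method names are made up and the loop is not the one in this test.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.BooleanSupplier;

   class PollCounterSketch
   {
       // Polls until the check passes or the attempts run out; returns the attempts made
       static int poll(int maxAttempts, BooleanSupplier check)
       {
           AtomicInteger counter = new AtomicInteger(0);
           BooleanSupplier attempt = () -> {
               counter.incrementAndGet(); // instead of counter[0]++
               return check.getAsBoolean();
           };
           while (counter.get() < maxAttempts)
           {
               if (attempt.getAsBoolean())
               {
                   break;
               }
           }
           return counter.get();
       }
   }
   ```

   The counter is effectively final, so it can be captured by the lambda just like the one-element array, and `incrementAndGet()` / `get()` read better at the call sites.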



##########
server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java:
##########
@@ -55,26 +54,29 @@
  */
 class OperationalJobManagerTest
 {
-    @Mock
-    SidecarConfiguration mockConfig;
+//    @Mock
+//    SidecarConfiguration mockConfig;
 
     protected Vertx vertx;
 
+    protected ExecutorPools executorPool;
+
     @BeforeEach
     void setup()
     {
         vertx = Vertx.vertx();
+        executorPool = new ExecutorPools(vertx, new 
ServiceConfigurationImpl());
         MockitoAnnotations.openMocks(this);
-        ServiceConfiguration mockServiceConfig = 
mock(ServiceConfiguration.class);
-        when(mockConfig.serviceConfiguration()).thenReturn(mockServiceConfig);
-        
when(mockServiceConfig.operationalJobExecutionMaxWaitTimeInMillis()).thenReturn(5000L);
+//        ServiceConfiguration mockServiceConfig = 
mock(ServiceConfiguration.class);
+//        
when(mockConfig.serviceConfiguration()).thenReturn(mockServiceConfig);
+//        
when(mockServiceConfig.operationalJobExecutionMaxWaitTimeInMillis()).thenReturn(5000L);

Review Comment:
   Update?



##########
client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java:
##########
@@ -664,6 +665,16 @@ public CompletableFuture<ListOperationalJobsResponse> 
listOperationalJobs(Sideca
                                             .build());
     }
 
+    /**
+     * Executes the node decommission request using the default retry policy 
and configured selection policy
+     *
+     * @return a completable future of the jobs list
+     */
+    public CompletableFuture<NodeDecommissionResponse> nodeDecommission()
+    {
+        return 
executor.executeRequestAsync(requestBuilder().nodeDecommissionRequest().build());
+    }

Review Comment:
   Given that decommission is per node, the client will want to talk to a specific
   instance, i.e. the method should take a `SidecarInstance` as a parameter.
   
   Please see
   `org.apache.cassandra.sidecar.client.SidecarClient#operationalJobs` for
   an example.



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java:
##########
@@ -213,4 +214,38 @@ public void outOfRangeDataCleanup(@NotNull String 
keyspace, @NotNull String tabl
         jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
                  .forceKeyspaceCleanup(concurrency, keyspace, table);
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isDecommissioning()
+    {
+        StorageJmxOperations ssProxy = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+        String nodeOperationMode = ssProxy.getOperationMode();
+        // The following check is equivalent to the operation-mode checks 
within Cassandra to determine if a
+        // decommission operation is in progress
+        return nodeOperationMode.equals("LEAVING") || 
nodeOperationMode.equals("DECOMMISSION_FAILED");

Review Comment:
   The Cassandra code uses the following condition to check whether the node is decommissioning.
   Why do we differ from that implementation?
   
   ```java
       public boolean isDecommissioning()
       {
           return operationMode() == LEAVING;
       }
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java:
##########
@@ -95,12 +94,18 @@ public boolean isStale(long referenceTimestampInMillis, 
long ttlInMillis)
         return referenceTimestampInMillis - createdAt > ttlInMillis;
     }
 
+    /**
+     * The concrete-job-specific implementation to determine if the job is 
running on the Cassandra node.
+     * @return true if the job is running on the Cassandra node. For example, 
node decommission is tracked by the
+     * operationMode exposed from Cassandra.
+     */
+    public abstract boolean isRunningOnCassandra();
+
+

Review Comment:
   nit: we usually keep just one empty line after the end of a method.



##########
server/src/main/java/org/apache/cassandra/sidecar/job/DecommissionJob.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import 
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+
+/**
+ * Implementation of {@link OperationalJob} to perform node decommission 
operation.
+ */
+public class DecommissionJob extends OperationalJob
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DecommissionJob.class);
+    private static final String operation = "decommission";
+    private final boolean isForce;
+    protected StorageOperations storageOperations;
+
+    public DecommissionJob(UUID jobId, StorageOperations storageOps, boolean 
isForce)
+    {
+        super(jobId);
+        this.storageOperations = storageOps;
+        this.isForce = isForce;
+    }
+
+    @Override
+    public boolean isRunningOnCassandra()
+    {
+        return storageOperations.isDecommissioning();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+
+    protected OperationalJobStatus executeInternal() throws 
OperationalJobException
+    {
+        LOGGER.info("Executing decommission operation. jobId={}", jobId);
+        return storageOperations.decommission(isForce);
+    }
+
+    @Override
+    public void close()
+    {
+        super.close();
+    }

Review Comment:
   This override seems redundant if there is no concrete close action.



##########
server/src/main/java/org/apache/cassandra/sidecar/job/DecommissionJob.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import 
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+
+/**
+ * Implementation of {@link OperationalJob} to perform node decommission 
operation.
+ */
+public class DecommissionJob extends OperationalJob
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DecommissionJob.class);
+    private static final String operation = "decommission";
+    private final boolean isForce;
+    protected StorageOperations storageOperations;
+
+    public DecommissionJob(UUID jobId, StorageOperations storageOps, boolean 
isForce)
+    {
+        super(jobId);
+        this.storageOperations = storageOps;
+        this.isForce = isForce;
+    }
+
+    @Override
+    public boolean isRunningOnCassandra()
+    {
+        return storageOperations.isDecommissioning();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+
+    protected OperationalJobStatus executeInternal() throws 
OperationalJobException
+    {
+        LOGGER.info("Executing decommission operation. jobId={}", jobId);
+        return storageOperations.decommission(isForce);

Review Comment:
   The method signature declares that it throws `OperationalJobException`,
   but there is no catch block that converts other exceptions into
   `OperationalJobException`.
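
   A sketch of the conversion, under assumptions: `JobExceptionSketch` stands in for `OperationalJobException`, its `wraps` helper is modelled on the `wraps(Throwable)` method touched elsewhere in this PR (the pass-through branch is my guess at its behaviour), and the supplier stands in for `storageOperations.decommission(isForce)`.

   ```java
   import java.util.function.Supplier;

   // Stand-in for OperationalJobException
   class JobExceptionSketch extends RuntimeException
   {
       JobExceptionSketch(String message, Throwable cause)
       {
           super(message, cause);
       }

       // Assumed behaviour: pass through if already the right type, otherwise wrap
       static JobExceptionSketch wraps(Throwable throwable)
       {
           if (throwable instanceof JobExceptionSketch)
           {
               return (JobExceptionSketch) throwable;
           }
           return new JobExceptionSketch(throwable.getMessage(), throwable);
       }
   }

   class DecommissionJobSketch
   {
       // Stand-in for the storageOperations.decommission(isForce) call
       private final Supplier<String> decommission;

       DecommissionJobSketch(Supplier<String> decommission)
       {
           this.decommission = decommission;
       }

       String executeInternal() throws JobExceptionSketch
       {
           try
           {
               return decommission.get();
           }
           catch (Exception e)
           {
               // Convert whatever the storage layer threw into the declared exception type
               throw JobExceptionSketch.wraps(e);
           }
       }
   }
   ```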



##########
server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java:
##########
@@ -193,11 +199,41 @@ public void execute(Promise<Void> promise)
     {
         isExecuting = true;
         LOGGER.info("Executing job. jobId={}", jobId);
-        promise.future().onComplete(executionPromise);
+
+        promise.future().onComplete(res -> {
+            if (res.succeeded())
+            {
+                try
+                {
+                    OperationalJobStatus result = executeInternal();
+                    executionPromise.tryComplete(result);
+                    if (LOGGER.isDebugEnabled())
+                    {
+                        LOGGER.debug("Complete job execution. jobId={} 
status={}", jobId, status());
+                    }
+                }
+                catch (Throwable e)
+                {
+                    executionPromise.tryFail(e);
+                }
+            }
+            else
+            {
+                Throwable cause = res.cause();
+                executionPromise.tryFail(cause);
+            }
+        });
+
+
+
+
+
+
         try
         {
             // Blocking call to perform concrete job-specific execution, 
returning the status
-            executeInternal();
+            OperationalJobStatus result = executeInternal();
+            executionPromise.tryComplete(result);

Review Comment:
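   The change does not look correct: `executeInternal()` now appears to be invoked
   twice, once in the `onComplete` callback and again in the `try` block below.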



##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/NodeDecommissionResponse.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.UUID;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+
+/**
+ * Response structure of the jobs status API
+ */
+public class NodeDecommissionResponse
+{
+    private final UUID jobId;
+    private final OperationalJobStatus status;
+
+    private final String instance;
+
+    private final String reason;

Review Comment:
   nit: remove the empty lines between the fields.



##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/exceptions/OperationalJobException.java:
##########
@@ -36,7 +36,8 @@ public static OperationalJobException wraps(Throwable 
throwable)
         }
         else
         {
-            return new OperationalJobException(throwable.getMessage(), 
throwable);
+            String msg = throwable.getCause() != null ? 
throwable.getCause().getMessage() : throwable.getMessage();
+            return new OperationalJobException(msg, throwable);

Review Comment:
   Would it look odd to use the message of the throwable's cause, but set the
   throwable itself as the cause of `OperationalJobException`? The message of the
   throwable's cause is going to be repeated in the stack trace.
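
   To make the repetition concrete, here is a small sketch. It uses a plain `RuntimeException` in place of `OperationalJobException`, and the class and method names are made up. `wrap` mirrors the message selection from the diff; `messagesInChain` lists the messages in the order the stack trace would print them.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   class CauseMessageSketch
   {
       // Same message selection as in the diff: prefer the cause's message
       static RuntimeException wrap(Throwable throwable)
       {
           String msg = throwable.getCause() != null ? throwable.getCause().getMessage() : throwable.getMessage();
           return new RuntimeException(msg, throwable);
       }

       // Collects the message of each throwable in the cause chain, outermost first
       static List<String> messagesInChain(Throwable throwable)
       {
           List<String> messages = new ArrayList<>();
           for (Throwable current = throwable; current != null; current = current.getCause())
           {
               messages.add(current.getMessage());
           }
           return messages;
       }
   }
   ```

   Wrapping `new RuntimeException("wrapper", new IllegalStateException("root"))` gives the chain `root`, `wrapper`, `root`: the root message appears at the top and again at the bottom of the trace.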



##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java:
##########
@@ -126,6 +126,7 @@ public final class ApiEndpointsV1
     public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + 
CASSANDRA + OPERATIONAL_JOBS;
     public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + 
PER_OPERATIONAL_JOB;
 
+    public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA + 
"/node/decommission";

Review Comment:
   I am trying to understand the role the URL segment `/node` plays here.
   `/api/v1/cassandra/decommission` seems just as clear, and it is shorter without the
   `/node` segment.
   Or, if we want to follow the `resource/identifier` structure strictly,
   `/operations` is a better replacement for `/node`, i.e.
   `/api/v1/cassandra/operations/decommission`, where `decommission` is viewed as
   an identifier of the operation.



##########
server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java:
##########
@@ -76,22 +81,33 @@ public OperationalJob getJobIfExists(UUID jobId)
      * The job execution failure behavior is tracked within the {@link 
OperationalJob}.
      *
      * @param job OperationalJob instance to submit
-     * @return OperationalJob instance that is submitted
      * @throws OperationalJobConflictException when the same operational job 
is already running on Cassandra
      */
-    public OperationalJob trySubmitJob(OperationalJob job) throws 
OperationalJobConflictException
+    public void trySubmitJob(OperationalJob job) throws 
OperationalJobConflictException
     {
         checkConflict(job);
 
         // New job is submitted for all cases when we do not have a 
corresponding downstream job
-        return jobTracker.computeIfAbsent(job.jobId, jobId -> job);
+        jobTracker.computeIfAbsent(job.jobId, jobId -> {
+            internalExecutorPool.executeBlocking(() -> {
+                job.execute(Promise.promise());
+                return null;
+            });
+            return job;
+        });

Review Comment:
   Although `executeBlocking` with a promise is deprecated, let's still use it
   for the succinct code it produces here. We can migrate away from the deprecated
   method altogether in a future refactoring.
   
   ```suggestion
           jobTracker.computeIfAbsent(job.jobId, jobId -> {
               internalExecutorPool.executeBlocking(job::execute);
               return job;
           });
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
+import org.apache.cassandra.sidecar.job.DecommissionJob;
+import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+import static 
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
+
+/**
+ * Provides REST API for asynchronously decommissioning the corresponding 
Cassandra node
+ */
+public class NodeDecommissionHandler extends AbstractHandler<Void>
+{
+    private final OperationalJobManager jobManager;
+    private final ServiceConfiguration config;
+    private boolean isForce;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the interface to retrieve instance metadata
+     * @param executorPools   the executor pools for blocking executions
+     * @param validator       a validator instance to validate 
Cassandra-specific input
+     */
+    @Inject
+    protected NodeDecommissionHandler(InstanceMetadataFetcher metadataFetcher,
+                                      ExecutorPools executorPools,
+                                      ServiceConfiguration 
serviceConfiguration,
+                                      CassandraInputValidator validator,
+                                      OperationalJobManager jobManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.jobManager = jobManager;
+        this.config = serviceConfiguration;
+    }
+
+    protected Void extractParamsOrThrow(RoutingContext context)
+    {
+        return null;
+    }
+
+    protected void handleInternal(RoutingContext context, HttpServerRequest 
httpRequest, String host, SocketAddress remoteAddress, Void request)
+    {
+        StorageOperations operations = 
metadataFetcher.delegate(host).storageOperations();
+        isForce = parseBooleanQueryParam(context.request(), "force", false);
+
+        OperationalJob job = new DecommissionJob(UUIDs.timeBased(), 
operations, isForce);
+        jobManager.trySubmitJob(job);
+
+        job.asyncResult(executorPools.service(),
+                        
Duration.of(config.operationalJobExecutionMaxWaitTimeInMillis(), 
ChronoUnit.MILLIS))
+           .onSuccess(f -> sendResponse(context, job, request, remoteAddress, 
host))
+           .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, request));
+    }
+
+    private void sendResponse(RoutingContext context, OperationalJob job, Void 
request, SocketAddress remoteAddress, String host)
+    {
+        OperationalJobStatus jobStatus = job.status();
+        logger.info("Job completion status={}, request={}, remoteAddress={}, instance={}",

Review Comment:
   Remove the commas between the key=value pairs.
   ```suggestion
           logger.info("Job completion status={} request={} remoteAddress={} instance={}",
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
+import org.apache.cassandra.sidecar.job.DecommissionJob;
+import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+import static 
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
+
+/**
+ * Provides REST API for asynchronously decommissioning the corresponding 
Cassandra node
+ */
+public class NodeDecommissionHandler extends AbstractHandler<Void>
+{
+    private final OperationalJobManager jobManager;
+    private final ServiceConfiguration config;
+    private boolean isForce;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the interface to retrieve instance metadata
+     * @param executorPools   the executor pools for blocking executions
+     * @param validator       a validator instance to validate 
Cassandra-specific input
+     */
+    @Inject
+    protected NodeDecommissionHandler(InstanceMetadataFetcher metadataFetcher,
+                                      ExecutorPools executorPools,
+                                      ServiceConfiguration 
serviceConfiguration,
+                                      CassandraInputValidator validator,
+                                      OperationalJobManager jobManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.jobManager = jobManager;
+        this.config = serviceConfiguration;
+    }
+
+    protected Void extractParamsOrThrow(RoutingContext context)
+    {
+        return null;
+    }
+
+    protected void handleInternal(RoutingContext context, HttpServerRequest 
httpRequest, String host, SocketAddress remoteAddress, Void request)
+    {
+        StorageOperations operations = 
metadataFetcher.delegate(host).storageOperations();
+        isForce = parseBooleanQueryParam(context.request(), "force", false);
+
+        OperationalJob job = new DecommissionJob(UUIDs.timeBased(), operations, isForce);

Review Comment:
   `isForce` can be a local variable instead of an instance variable.
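
   A minimal sketch of the idea, assuming the handler instance is shared across concurrent requests, in which case a mutable field could be overwritten by another request before it is read. The class, the `parseBooleanParam` helper, and the `describeJob` method are hypothetical stand-ins, not Sidecar APIs:

```java
import java.util.Map;

public class ForceParamSketch
{
    // Hypothetical stand-in for parseBooleanQueryParam: reads a boolean
    // query parameter and falls back to a default when it is absent.
    public static boolean parseBooleanParam(Map<String, String> queryParams, String name, boolean defaultValue)
    {
        String value = queryParams.get(name);
        return value == null ? defaultValue : Boolean.parseBoolean(value);
    }

    // The flag is a local variable, so each call (request) has its own copy
    // and no state is kept on the shared object.
    public static String describeJob(Map<String, String> queryParams)
    {
        boolean isForce = parseBooleanParam(queryParams, "force", false);
        return "DecommissionJob(force=" + isForce + ")";
    }

    public static void main(String[] args)
    {
        System.out.println(describeJob(Map.of("force", "true")));
        System.out.println(describeJob(Map.of()));
    }
}
```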



##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/NodeDecommissionResponse.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.UUID;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+
+/**
+ * Response structure of the jobs status API
+ */
+public class NodeDecommissionResponse

Review Comment:
   Is this a duplicate of `org.apache.cassandra.sidecar.common.response.OperationalJobResponse`? Can we use only `OperationalJobResponse`?
   
   The response payload should be the same for the endpoint that triggers an operation and the endpoint that queries the operation status.



##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.BootstrapBBUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+    private static final String DECOMMISSION_FAILED_MESSAGE = "Failed to 
decommission node";
+
+    @CassandraIntegrationTest(nodesPerDc = 5)
+    void decommissionNodeDefault(VertxTestContext context) throws 
InterruptedException
+    {
+        BBHelperDecommissionNode.reset();
+        final String[] jobId = new String[1];
+        String testRoute = "/api/v1/cassandra/node/decommission";
+        testWithClient(client -> client.put(server.actualPort(), "127.0.0.1", 
testRoute)
+                                       .send(context.succeeding(response -> {
+                                           logger.info("Response Status:" + 
response.statusCode());
+                                           NodeDecommissionResponse 
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+                                           
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+                                           jobId[0] = 
String.valueOf(decommissionResponse.jobId());
+                                       })));
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+        pollStatusForState(context, jobId[0], SUCCEEDED, null);
+        context.awaitCompletion(2, TimeUnit.MINUTES);
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 5, network = true, buildCluster = 
false)
+    void decommissionNodeWithFailure(VertxTestContext context,
+                                     ConfigurableCassandraTestContext 
cassandraTestContext) throws InterruptedException
+    {
+        BBHelperDecommissionNode.reset();
+        cassandraTestContext.configureAndStartCluster(builder -> {
+            builder.withInstanceInitializer(BBHelperDecommissionNode::install);
+        });
+
+        final String[] jobId = new String[1];
+        String testRoute = "/api/v1/cassandra/node/decommission";
+        testWithClient(client -> client.put(server.actualPort(), "127.0.0.1", 
testRoute)
+                                       .send(context.succeeding(response -> {
+                                           logger.info("Response Status:" + 
response.statusCode());
+                                           NodeDecommissionResponse 
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+                                           
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+                                           jobId[0] = 
String.valueOf(decommissionResponse.jobId());
+                                       })));
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+        pollStatusForState(context, jobId[0], FAILED, 
DECOMMISSION_FAILED_MESSAGE);
+        context.awaitCompletion(2, TimeUnit.MINUTES);
+    }
+
+    private void pollStatusForState(VertxTestContext context,
+                                    String uuid,
+                                    OperationalJobStatus expectedStatus,
+                                    String expectedReason)
+    {
+

Review Comment:
   nit: remove the empty line.



##########
server/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java:
##########
@@ -60,6 +60,7 @@ void testCqlSessionProviderWorksAsExpected(VertxTestContext 
context, CassandraTe
     throws Exception
     {
         UpgradeableCluster cluster = cassandraTestContext.cluster();
+

Review Comment:
   nit: unrelated change



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
+import org.apache.cassandra.sidecar.job.DecommissionJob;
+import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+import static 
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
+
+/**
+ * Provides REST API for asynchronously decommissioning the corresponding 
Cassandra node
+ */
+public class NodeDecommissionHandler extends AbstractHandler<Void>
+{
+    private final OperationalJobManager jobManager;
+    private final ServiceConfiguration config;
+    private boolean isForce;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the interface to retrieve instance metadata
+     * @param executorPools   the executor pools for blocking executions
+     * @param validator       a validator instance to validate 
Cassandra-specific input
+     */
+    @Inject
+    protected NodeDecommissionHandler(InstanceMetadataFetcher metadataFetcher,
+                                      ExecutorPools executorPools,
+                                      ServiceConfiguration 
serviceConfiguration,
+                                      CassandraInputValidator validator,
+                                      OperationalJobManager jobManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.jobManager = jobManager;
+        this.config = serviceConfiguration;
+    }
+
+    protected Void extractParamsOrThrow(RoutingContext context)
+    {
+        return null;
+    }
+
+    protected void handleInternal(RoutingContext context, HttpServerRequest 
httpRequest, String host, SocketAddress remoteAddress, Void request)
+    {
+        StorageOperations operations = 
metadataFetcher.delegate(host).storageOperations();
+        isForce = parseBooleanQueryParam(context.request(), "force", false);
+
+        OperationalJob job = new DecommissionJob(UUIDs.timeBased(), 
operations, isForce);
+        jobManager.trySubmitJob(job);
+
+        job.asyncResult(executorPools.service(),
+                        
Duration.of(config.operationalJobExecutionMaxWaitTimeInMillis(), 
ChronoUnit.MILLIS))
+           .onSuccess(f -> sendResponse(context, job, request, remoteAddress, 
host))
+           .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, request));
+    }
+
+    private void sendResponse(RoutingContext context, OperationalJob job, Void 
request, SocketAddress remoteAddress, String host)
+    {
+        OperationalJobStatus jobStatus = job.status();
+        logger.info("Job completion status={}, request={}, remoteAddress={}, 
instance={}",
+                    jobStatus, request, remoteAddress, host);
+
+        String reason = null;
+        switch(jobStatus)
+        {
+            case SUCCEEDED:
+                context.response().setStatusCode(HttpResponseStatus.OK.code());
+                break;
+            case FAILED:
+                reason = job.asyncResult().cause().getMessage();
+                context.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
+                break;

Review Comment:
   Should the HTTP status for a failed job be `OK` too?
   
   It is the job that fails, not the HTTP request. The job result should be indicated in the response payload, while the HTTP request itself is successful, i.e. 200.
   
   Almost the same logic already exists in `org.apache.cassandra.sidecar.routes.OperationalJobHandler`.
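
   A rough sketch of that separation, not the actual `OperationalJobHandler` logic. The enum, the payload field names (`jobStatus`, `reason`), and the choice of 202 for unfinished jobs are assumptions made for illustration only:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JobResponseSketch
{
    // Hypothetical job states, loosely mirroring OperationalJobStatus
    public enum JobStatus { RUNNING, SUCCEEDED, FAILED }

    // The HTTP status describes the request, not the outcome of the job:
    // a finished job (succeeded or failed) is a successfully served request.
    public static int httpStatusFor(JobStatus status)
    {
        switch (status)
        {
            case SUCCEEDED:
            case FAILED:
                return 200;
            default:
                // Assumption: unfinished jobs are reported as 202 Accepted
                return 202;
        }
    }

    // The job outcome, including the failure reason, travels in the payload.
    public static Map<String, String> payloadFor(JobStatus status, String failureReason)
    {
        Map<String, String> payload = new LinkedHashMap<>();
        payload.put("jobStatus", status.name());
        if (status == JobStatus.FAILED && failureReason != null)
        {
            payload.put("reason", failureReason);
        }
        return payload;
    }

    public static void main(String[] args)
    {
        System.out.println(httpStatusFor(JobStatus.FAILED) + " " + payloadFor(JobStatus.FAILED, "Failed to decommission node"));
        System.out.println(httpStatusFor(JobStatus.SUCCEEDED) + " " + payloadFor(JobStatus.SUCCEEDED, null));
    }
}
```

   With this shape, a client checks the HTTP status only to know whether the call was served, and reads the payload to learn whether the decommission itself worked.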



##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.BootstrapBBUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+    private static final String DECOMMISSION_FAILED_MESSAGE = "Failed to 
decommission node";
+
+    @CassandraIntegrationTest(nodesPerDc = 5)
+    void decommissionNodeDefault(VertxTestContext context) throws 
InterruptedException
+    {
+        BBHelperDecommissionNode.reset();
+        final String[] jobId = new String[1];
+        String testRoute = "/api/v1/cassandra/node/decommission";
+        testWithClient(client -> client.put(server.actualPort(), "127.0.0.1", 
testRoute)
+                                       .send(context.succeeding(response -> {
+                                           logger.info("Response Status:" + 
response.statusCode());
+                                           NodeDecommissionResponse 
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+                                           
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+                                           jobId[0] = 
String.valueOf(decommissionResponse.jobId());
+                                       })));
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+        pollStatusForState(context, jobId[0], SUCCEEDED, null);
+        context.awaitCompletion(2, TimeUnit.MINUTES);
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 5, network = true, buildCluster = 
false)
+    void decommissionNodeWithFailure(VertxTestContext context,
+                                     ConfigurableCassandraTestContext 
cassandraTestContext) throws InterruptedException
+    {
+        BBHelperDecommissionNode.reset();
+        cassandraTestContext.configureAndStartCluster(builder -> {
+            builder.withInstanceInitializer(BBHelperDecommissionNode::install);
+        });
+
+        final String[] jobId = new String[1];

Review Comment:
   Use an `AtomicReference<String>` instead of a single-element array?
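
   A small sketch of the pattern, with a `CompletableFuture` standing in for the asynchronous response callback. The class and method names are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class JobIdHolderSketch
{
    // Captures a value produced inside a callback that runs on another thread.
    public static String captureJobId(String idFromResponse)
    {
        // Effectively final holder that a lambda can write to, with
        // well-defined visibility across threads.
        AtomicReference<String> jobId = new AtomicReference<>();

        // Stand-in for the async HTTP response handler in the test
        CompletableFuture<Void> callback = CompletableFuture.runAsync(() -> jobId.set(idFromResponse));

        // Wait for the callback before reading the captured value
        callback.join();
        return jobId.get();
    }

    public static void main(String[] args)
    {
        System.out.println(captureJobId("hypothetical-job-id"));
    }
}
```

   Compared with `new String[1]`, the intent (a mutable holder shared with a callback) is explicit in the type.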



##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.BootstrapBBUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+    private static final String DECOMMISSION_FAILED_MESSAGE = "Failed to 
decommission node";
+
+    @CassandraIntegrationTest(nodesPerDc = 5)
+    void decommissionNodeDefault(VertxTestContext context) throws 
InterruptedException
+    {
+        BBHelperDecommissionNode.reset();
+        final String[] jobId = new String[1];
+        String testRoute = "/api/v1/cassandra/node/decommission";
+        testWithClient(client -> client.put(server.actualPort(), "127.0.0.1", 
testRoute)
+                                       .send(context.succeeding(response -> {
+                                           logger.info("Response Status:" + 
response.statusCode());
+                                           NodeDecommissionResponse 
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+                                           
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+                                           jobId[0] = 
String.valueOf(decommissionResponse.jobId());
+                                       })));
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+        pollStatusForState(context, jobId[0], SUCCEEDED, null);
+        context.awaitCompletion(2, TimeUnit.MINUTES);
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 5, network = true, buildCluster = 
false)
+    void decommissionNodeWithFailure(VertxTestContext context,
+                                     ConfigurableCassandraTestContext 
cassandraTestContext) throws InterruptedException
+    {
+        BBHelperDecommissionNode.reset();
+        cassandraTestContext.configureAndStartCluster(builder -> {
+            builder.withInstanceInitializer(BBHelperDecommissionNode::install);
+        });
+
+        final String[] jobId = new String[1];
+        String testRoute = "/api/v1/cassandra/node/decommission";
+        testWithClient(client -> client.put(server.actualPort(), "127.0.0.1", 
testRoute)
+                                       .send(context.succeeding(response -> {
+                                           logger.info("Response Status:" + 
response.statusCode());
+                                           NodeDecommissionResponse 
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+                                           
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+                                           jobId[0] = 
String.valueOf(decommissionResponse.jobId());
+                                       })));
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+        pollStatusForState(context, jobId[0], FAILED, 
DECOMMISSION_FAILED_MESSAGE);
+        context.awaitCompletion(2, TimeUnit.MINUTES);
+    }
+
+    private void pollStatusForState(VertxTestContext context,
+                                    String uuid,
+                                    OperationalJobStatus expectedStatus,
+                                    String expectedReason)
+    {
+
+        int attempts = 10;
+        String status = "/api/v1/cassandra/operational-jobs/" + uuid;
+        AtomicBoolean stateReached = new AtomicBoolean(false);
+        logger.info("Job Stats Attempt:" + attempts);

Review Comment:
   Use the `{}` placeholder in logging messages instead of string concatenation.



##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.BootstrapBBUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+    private static final String DECOMMISSION_FAILED_MESSAGE = "Failed to 
decommission node";
+
+    @CassandraIntegrationTest(nodesPerDc = 5)
+    void decommissionNodeDefault(VertxTestContext context) throws 
InterruptedException
+    {
+        BBHelperDecommissionNode.reset();
+        final String[] jobId = new String[1];
+        String testRoute = "/api/v1/cassandra/node/decommission";
+        testWithClient(client -> client.put(server.actualPort(), "127.0.0.1", testRoute)
+                                       .send(context.succeeding(response -> {
+                                           logger.info("Response Status:" + response.statusCode());
+                                           NodeDecommissionResponse decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+                                           assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+                                           jobId[0] = String.valueOf(decommissionResponse.jobId());
+                                       })));
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+        pollStatusForState(context, jobId[0], SUCCEEDED, null);
+        context.awaitCompletion(2, TimeUnit.MINUTES);
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 5, network = true, buildCluster = false)
+    void decommissionNodeWithFailure(VertxTestContext context,
+                                     ConfigurableCassandraTestContext cassandraTestContext) throws InterruptedException
+    {
+        BBHelperDecommissionNode.reset();
+        cassandraTestContext.configureAndStartCluster(builder -> {
+            builder.withInstanceInitializer(BBHelperDecommissionNode::install);
+        });
+
+        final String[] jobId = new String[1];
+        String testRoute = "/api/v1/cassandra/node/decommission";
+        testWithClient(client -> client.put(server.actualPort(), "127.0.0.1", testRoute)
+                                       .send(context.succeeding(response -> {
+                                           logger.info("Response Status:" + response.statusCode());
+                                           NodeDecommissionResponse decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+                                           assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+                                           jobId[0] = String.valueOf(decommissionResponse.jobId());
+                                       })));
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+        pollStatusForState(context, jobId[0], FAILED, DECOMMISSION_FAILED_MESSAGE);
+        context.awaitCompletion(2, TimeUnit.MINUTES);
+    }
+
+    private void pollStatusForState(VertxTestContext context,
+                                    String uuid,
+                                    OperationalJobStatus expectedStatus,
+                                    String expectedReason)
+    {
+
+        int attempts = 10;
+        String status = "/api/v1/cassandra/operational-jobs/" + uuid;
+        AtomicBoolean stateReached = new AtomicBoolean(false);
+        logger.info("Job Stats Attempt:" + attempts);
+        final int[] counter = {0};
+        int finalAttempts = attempts;
+        vertx.setPeriodic(10000, id -> {
+            counter[0]++;
+            testWithClient(false, client -> client.get(server.actualPort(), "127.0.0.1", status)
+                                                  .send(context.succeeding(resp -> {
+                                                      if (resp.statusCode() == HttpResponseStatus.OK.code())
+                                                      {
+                                                          stateReached.set(true);
+                                                          logger.info("Success Status Response code:" + resp.statusCode());
+                                                          logger.info("Status Response:" + resp.bodyAsString());
+                                                          OperationalJobResponse jobStatusResp = resp.bodyAsJson(OperationalJobResponse.class);
+                                                          assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid));
+                                                          assertThat(jobStatusResp.status()).isEqualTo(expectedStatus);
+                                                          assertThat(jobStatusResp.reason()).isEqualTo(expectedReason);
+                                                          assertThat(jobStatusResp.operation()).isEqualTo("decommission");
+                                                      }
+                                                      else
+                                                      {
+                                                          assertThat(resp.statusCode()).isEqualTo(HttpResponseStatus.ACCEPTED.code());
+                                                          logger.info("Status Response code:" + resp.statusCode());
+                                                          logger.info("Status Response:" + resp.bodyAsString());
+                                                          OperationalJobResponse jobStatusResp = resp.bodyAsJson(OperationalJobResponse.class);
+                                                          assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid));
+                                                          // assertThat(jobStatusResp).isNull();
+                                                      }
+                                                      logger.info("Request completed");
+                                                  })));
+            if (stateReached.get() || counter[0] == finalAttempts)
+            {
+                vertx.cancelTimer(id);
+                assertThat(stateReached.get()).isTrue();
+                context.completeNow();
+            }
+        });

Review Comment:
   This looks odd.
   
   `WebClient` sends requests asynchronously, so `testWithClient` returns
   immediately. The condition `stateReached.get() || counter[0] == finalAttempts`
   is therefore checked prematurely, before the response handler has had a
   chance to run.
   
   I would simplify the method as follows, using a blocking client to make it
   easier to script the repeated check logic. Note that `VertxTestContext` is no
   longer needed when the client is used in a blocking manner.
   
   ```java
       private void pollStatusForState(String uuid,
                                       OperationalJobStatus expectedStatus,
                                       String expectedReason)
       {
           int attempts = 10;
           String status = "/api/v1/cassandra/operational-jobs/" + uuid;
           AtomicBoolean stateReached = new AtomicBoolean(false);
           logger.info("Job Stats Attempt: {}", attempts);
           AtomicInteger counter = new AtomicInteger(0);
           loopAssert(30, () -> {
               counter.incrementAndGet();
               // todo: create a helper method in the base class to get response in the blocking manner
               HttpResponse<Buffer> resp = client.get(server.actualPort(), "127.0.0.1", status)
                                                 .send().toCompletionStage().toCompletableFuture().get();
               logger.info("Success Status Response code: {}", resp.statusCode());
               logger.info("Status Response: {}", resp.bodyAsString());
               if (resp.statusCode() == HttpResponseStatus.OK.code())
               {
                   stateReached.set(true);
                   OperationalJobResponse jobStatusResp = resp.bodyAsJson(OperationalJobResponse.class);
                   assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid));
                   assertThat(jobStatusResp.status()).isEqualTo(expectedStatus);
                   assertThat(jobStatusResp.reason()).isEqualTo(expectedReason);
                   assertThat(jobStatusResp.operation()).isEqualTo("decommission");
               }
               else
               {
                   assertThat(resp.statusCode()).isEqualTo(HttpResponseStatus.ACCEPTED.code());
                   OperationalJobResponse jobStatusResp = resp.bodyAsJson(OperationalJobResponse.class);
                   assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid));
                   // assertThat(jobStatusResp).isNull();
               }
               logger.info("Request completed");
               assertThat(stateReached.get()).isTrue();
           });
       }
   ```
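
For illustration, the blocking pattern suggested above can be sketched with the JDK alone. This is only a sketch under stated assumptions, not the sidecar test code: `BlockingPollSketch`, `getBlocking`, `retryAssert` and `fakeStatusRequest` are hypothetical names, `retryAssert` is a stand-in for whatever retry helper the test base class provides, and a `CompletableFuture` plays the role of the asynchronous `WebClient` call. The point it shows is that each attempt waits for its response before the assertion is evaluated.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class BlockingPollSketch
{
    // Hypothetical helper: block until the async result is available,
    // converting checked exceptions so it can be called from a lambda.
    public static <T> T getBlocking(CompletableFuture<T> future, long timeoutSeconds)
    {
        try
        {
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        catch (ExecutionException | TimeoutException e)
        {
            throw new IllegalStateException("Request did not complete", e);
        }
    }

    // Hypothetical retry helper: re-run the assertion until it passes,
    // rethrowing the last AssertionError once the attempts are exhausted.
    public static void retryAssert(int maxAttempts, long delayMillis, Runnable assertion)
    {
        if (maxAttempts < 1)
        {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        AssertionError last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                assertion.run();
                return;
            }
            catch (AssertionError e)
            {
                last = e;
            }
            try
            {
                Thread.sleep(delayMillis);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted between attempts", e);
            }
        }
        throw last;
    }

    // Stand-in for the async status request: answers 202 (ACCEPTED) on the
    // first two calls and 200 (OK) from the third call onwards.
    public static CompletableFuture<Integer> fakeStatusRequest(AtomicInteger calls)
    {
        int n = calls.incrementAndGet();
        return CompletableFuture.supplyAsync(() -> n < 3 ? 202 : 200);
    }

    public static void main(String[] args)
    {
        AtomicInteger calls = new AtomicInteger();
        retryAssert(10, 50L, () -> {
            // The response is fully received before the check below runs.
            int code = getBlocking(fakeStatusRequest(calls), 5L);
            if (code != 200)
            {
                throw new AssertionError("Job still running, status code " + code);
            }
        });
        System.out.println("Reached 200 after " + calls.get() + " attempts");
    }
}
```

Because the wait happens inside the retried block, there is no shared flag that can be read before the callback fires, which is the race in the timer-based version.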


