yifan-c commented on code in PR #144:
URL: https://github.com/apache/cassandra-sidecar/pull/144#discussion_r1906304018
##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java:
##########
@@ -213,4 +214,38 @@ public void outOfRangeDataCleanup(@NotNull String
keyspace, @NotNull String tabl
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
.forceKeyspaceCleanup(concurrency, keyspace, table);
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isDecommissioning()
+ {
+ StorageJmxOperations ssProxy =
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+ String nodeOperationMode = ssProxy.getOperationMode();
+ // The following check is equivalent to the operation-mode checks
within Cassandra to determine if a
+ // decommission operation is in progress
+ return nodeOperationMode.equals("LEAVING") ||
nodeOperationMode.equals("DECOMMISSION_FAILED");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public OperationalJobStatus decommission(boolean force)
+ {
+ StorageJmxOperations ssProxy =
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+ String mode = ssProxy.getOperationMode();
+ switch(mode)
+ {
+ case "LEAVING":
+ case "DECOMMISSION_FAILED":
+ return OperationalJobStatus.RUNNING;
Review Comment:
Can you add a comment explaining why `DECOMMISSION_FAILED` is considered `RUNNING`?
##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java:
##########
@@ -213,4 +214,38 @@ public void outOfRangeDataCleanup(@NotNull String
keyspace, @NotNull String tabl
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
.forceKeyspaceCleanup(concurrency, keyspace, table);
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isDecommissioning()
+ {
+ StorageJmxOperations ssProxy =
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+ String nodeOperationMode = ssProxy.getOperationMode();
Review Comment:
How about extracting this into a method that retrieves the operationMode? It is
used in other locations.
##########
server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java:
##########
@@ -44,7 +43,7 @@ public abstract class OperationalJob implements Task<Void>
// use v1 time-based uuid
public final UUID jobId;
- private final Promise<Void> executionPromise;
+ private final Promise<OperationalJobStatus> executionPromise;
Review Comment:
I am not convinced about having `Promise<OperationalJobStatus>`. The future's
state is already enough to map to `OperationalJobStatus`.
Semantically, the status should not be read from the future. The call site
should be able to query the status directly, without extracting it from the future.
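A minimal sketch of what I mean, not the Sidecar implementation. It uses `java.util.concurrent.CompletableFuture` as a stand-in for the Vert.x future, and `StatusFromFutureSketch` and `JobStatus` are hypothetical names with only the three statuses that the tests in this PR import:

```java
import java.util.concurrent.CompletableFuture;

class StatusFromFutureSketch
{
    // Hypothetical stand-in for OperationalJobStatus
    enum JobStatus { RUNNING, SUCCEEDED, FAILED }

    // Derives the status from the state of the future; the future carries no status value
    static JobStatus status(CompletableFuture<Void> execution)
    {
        if (!execution.isDone())
        {
            return JobStatus.RUNNING;
        }
        return execution.isCompletedExceptionally() ? JobStatus.FAILED : JobStatus.SUCCEEDED;
    }
}
```

With this shape the promise can stay `Promise<Void>`, and `status()` is answered by the job itself.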
##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java:
##########
@@ -213,4 +214,38 @@ public void outOfRangeDataCleanup(@NotNull String
keyspace, @NotNull String tabl
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
.forceKeyspaceCleanup(concurrency, keyspace, table);
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isDecommissioning()
+ {
+ StorageJmxOperations ssProxy =
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+ String nodeOperationMode = ssProxy.getOperationMode();
+ // The following check is equivalent to the operation-mode checks
within Cassandra to determine if a
+ // decommission operation is in progress
+ return nodeOperationMode.equals("LEAVING") ||
nodeOperationMode.equals("DECOMMISSION_FAILED");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public OperationalJobStatus decommission(boolean force)
+ {
+ StorageJmxOperations ssProxy =
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+ String mode = ssProxy.getOperationMode();
+ switch(mode)
+ {
+ case "LEAVING":
Review Comment:
nit: define an enum for the modes.
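Something along these lines, as a rough sketch only. `OperationMode`, `fromString` and the `UNKNOWN` fallback are hypothetical, and the constants cover just the two modes used in this diff, not the full set Cassandra reports:

```java
import java.util.Locale;

// Hypothetical enum; extend the constants to match the modes Cassandra actually returns
enum OperationMode
{
    LEAVING,
    DECOMMISSION_FAILED,
    UNKNOWN;

    // Maps the string returned over JMX to a constant, falling back to UNKNOWN
    static OperationMode fromString(String mode)
    {
        if (mode == null)
        {
            return UNKNOWN;
        }
        try
        {
            return valueOf(mode.trim().toUpperCase(Locale.ROOT));
        }
        catch (IllegalArgumentException e)
        {
            return UNKNOWN;
        }
    }
}
```

The `switch` can then operate on enum constants instead of string literals.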
##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.BootstrapBBUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+ private static final String DECOMMISSION_FAILED_MESSAGE = "Failed to
decommission node";
+
+ @CassandraIntegrationTest(nodesPerDc = 5)
Review Comment:
Can you use a 2-node cluster for this and the other test cases in the class?
I think that is good enough for testing decommission. We want to create clusters
with fewer nodes to use fewer testing resources.
##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.BootstrapBBUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+ private static final String DECOMMISSION_FAILED_MESSAGE = "Failed to
decommission node";
+
+ @CassandraIntegrationTest(nodesPerDc = 5)
+ void decommissionNodeDefault(VertxTestContext context) throws
InterruptedException
+ {
+ BBHelperDecommissionNode.reset();
+ final String[] jobId = new String[1];
+ String testRoute = "/api/v1/cassandra/node/decommission";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ NodeDecommissionResponse
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ jobId[0] =
String.valueOf(decommissionResponse.jobId());
+ })));
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+ pollStatusForState(context, jobId[0], SUCCEEDED, null);
+ context.awaitCompletion(2, TimeUnit.MINUTES);
+ }
+
+ @CassandraIntegrationTest(nodesPerDc = 5, network = true, buildCluster =
false)
+ void decommissionNodeWithFailure(VertxTestContext context,
+ ConfigurableCassandraTestContext
cassandraTestContext) throws InterruptedException
+ {
+ BBHelperDecommissionNode.reset();
+ cassandraTestContext.configureAndStartCluster(builder -> {
+ builder.withInstanceInitializer(BBHelperDecommissionNode::install);
+ });
+
+ final String[] jobId = new String[1];
+ String testRoute = "/api/v1/cassandra/node/decommission";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ NodeDecommissionResponse
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ jobId[0] =
String.valueOf(decommissionResponse.jobId());
+ })));
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+ pollStatusForState(context, jobId[0], FAILED,
DECOMMISSION_FAILED_MESSAGE);
+ context.awaitCompletion(2, TimeUnit.MINUTES);
+ }
+
+ private void pollStatusForState(VertxTestContext context,
+ String uuid,
+ OperationalJobStatus expectedStatus,
+ String expectedReason)
+ {
+
+ int attempts = 10;
+ String status = "/api/v1/cassandra/operational-jobs/" + uuid;
+ AtomicBoolean stateReached = new AtomicBoolean(false);
+ logger.info("Job Stats Attempt:" + attempts);
+ final int[] counter = {0};
Review Comment:
How about just using `AtomicInteger`? We do not need the atomicity property
here, but its API is far more expressive than `counter[0]++`.
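For illustration only, a self-contained sketch of the counter inside a lambda. `PollCounterSketch` and `countAttempts` are hypothetical names, not code from this PR:

```java
import java.util.concurrent.atomic.AtomicInteger;

class PollCounterSketch
{
    // Runs a polling callback maxAttempts times and returns how many attempts were counted
    static int countAttempts(int maxAttempts)
    {
        AtomicInteger counter = new AtomicInteger();
        // The lambda can mutate the counter because the reference itself is effectively final
        Runnable poll = counter::incrementAndGet;
        for (int i = 0; i < maxAttempts; i++)
        {
            poll.run();
        }
        return counter.get();
    }
}
```

`incrementAndGet()` and `get()` read better than indexing into a one-element array.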
##########
server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java:
##########
@@ -55,26 +54,29 @@
*/
class OperationalJobManagerTest
{
- @Mock
- SidecarConfiguration mockConfig;
+// @Mock
+// SidecarConfiguration mockConfig;
protected Vertx vertx;
+ protected ExecutorPools executorPool;
+
@BeforeEach
void setup()
{
vertx = Vertx.vertx();
+ executorPool = new ExecutorPools(vertx, new
ServiceConfigurationImpl());
MockitoAnnotations.openMocks(this);
- ServiceConfiguration mockServiceConfig =
mock(ServiceConfiguration.class);
- when(mockConfig.serviceConfiguration()).thenReturn(mockServiceConfig);
-
when(mockServiceConfig.operationalJobExecutionMaxWaitTimeInMillis()).thenReturn(5000L);
+// ServiceConfiguration mockServiceConfig =
mock(ServiceConfiguration.class);
+//
when(mockConfig.serviceConfiguration()).thenReturn(mockServiceConfig);
+//
when(mockServiceConfig.operationalJobExecutionMaxWaitTimeInMillis()).thenReturn(5000L);
Review Comment:
Update?
##########
client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java:
##########
@@ -664,6 +665,16 @@ public CompletableFuture<ListOperationalJobsResponse>
listOperationalJobs(Sideca
.build());
}
+ /**
+ * Executes the node decommission request using the default retry policy
and configured selection policy
+ *
+ * @return a completable future of the jobs list
+ */
+ public CompletableFuture<NodeDecommissionResponse> nodeDecommission()
+ {
+ return
executor.executeRequestAsync(requestBuilder().nodeDecommissionRequest().build());
+ }
Review Comment:
Given that decommission is per node, the client will want to talk to a specific
instance, i.e. the method should take a `SidecarInstance` as a parameter.
Please see
`org.apache.cassandra.sidecar.client.SidecarClient#operationalJobs` for
an example.
##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java:
##########
@@ -213,4 +214,38 @@ public void outOfRangeDataCleanup(@NotNull String
keyspace, @NotNull String tabl
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
.forceKeyspaceCleanup(concurrency, keyspace, table);
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isDecommissioning()
+ {
+ StorageJmxOperations ssProxy =
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+ String nodeOperationMode = ssProxy.getOperationMode();
+ // The following check is equivalent to the operation-mode checks
within Cassandra to determine if a
+ // decommission operation is in progress
+ return nodeOperationMode.equals("LEAVING") ||
nodeOperationMode.equals("DECOMMISSION_FAILED");
Review Comment:
Cassandra's code uses this condition to check whether the node is decommissioning.
Why do we differ from that implementation?
```java
public boolean isDecommissioning()
{
return operationMode() == LEAVING;
}
```
##########
server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java:
##########
@@ -95,12 +94,18 @@ public boolean isStale(long referenceTimestampInMillis,
long ttlInMillis)
return referenceTimestampInMillis - createdAt > ttlInMillis;
}
+ /**
+ * The concrete-job-specific implementation to determine if the job is
running on the Cassandra node.
+ * @return true if the job is running on the Cassandra node. For example,
node decommission is tracked by the
+ * operationMode exposed from Cassandra.
+ */
+ public abstract boolean isRunningOnCassandra();
+
+
Review Comment:
nit: usually keep just one empty line after the end of method
##########
server/src/main/java/org/apache/cassandra/sidecar/job/DecommissionJob.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+
+/**
+ * Implementation of {@link OperationalJob} to perform node decommission
operation.
+ */
+public class DecommissionJob extends OperationalJob
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DecommissionJob.class);
+ private static final String operation = "decommission";
+ private final boolean isForce;
+ protected StorageOperations storageOperations;
+
+ public DecommissionJob(UUID jobId, StorageOperations storageOps, boolean
isForce)
+ {
+ super(jobId);
+ this.storageOperations = storageOps;
+ this.isForce = isForce;
+ }
+
+ @Override
+ public boolean isRunningOnCassandra()
+ {
+ return storageOperations.isDecommissioning();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+
+ protected OperationalJobStatus executeInternal() throws
OperationalJobException
+ {
+ LOGGER.info("Executing decommission operation. jobId={}", jobId);
+ return storageOperations.decommission(isForce);
+ }
+
+ @Override
+ public void close()
+ {
+ super.close();
+ }
Review Comment:
seems redundant, if there is no concrete close action.
##########
server/src/main/java/org/apache/cassandra/sidecar/job/DecommissionJob.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+
+/**
+ * Implementation of {@link OperationalJob} to perform node decommission
operation.
+ */
+public class DecommissionJob extends OperationalJob
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DecommissionJob.class);
+ private static final String operation = "decommission";
+ private final boolean isForce;
+ protected StorageOperations storageOperations;
+
+ public DecommissionJob(UUID jobId, StorageOperations storageOps, boolean
isForce)
+ {
+ super(jobId);
+ this.storageOperations = storageOps;
+ this.isForce = isForce;
+ }
+
+ @Override
+ public boolean isRunningOnCassandra()
+ {
+ return storageOperations.isDecommissioning();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+
+ protected OperationalJobStatus executeInternal() throws
OperationalJobException
+ {
+ LOGGER.info("Executing decommission operation. jobId={}", jobId);
+ return storageOperations.decommission(isForce);
Review Comment:
The method signature defines that the thrown exception is
`OperationalJobException`. There is no catch block to convert the exceptions to
`OperationalJobException`.
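One possible shape, as a sketch under assumptions. `JobException` is an unchecked stand-in for `OperationalJobException` (I have not assumed whether the real class is checked), and `runAndWrap` is a hypothetical helper:

```java
import java.util.function.Supplier;

class ExecuteInternalSketch
{
    // Hypothetical stand-in for OperationalJobException
    static class JobException extends RuntimeException
    {
        JobException(String message, Throwable cause)
        {
            super(message, cause);
        }
    }

    // Runs the operation and converts any runtime failure into the declared exception type
    static <T> T runAndWrap(Supplier<T> operation)
    {
        try
        {
            return operation.get();
        }
        catch (JobException e)
        {
            throw e; // already the declared type, no need to wrap again
        }
        catch (RuntimeException e)
        {
            throw new JobException(e.getMessage(), e);
        }
    }
}
```

Alternatively, drop the `throws` clause if no conversion is intended.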
##########
server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java:
##########
@@ -193,11 +199,41 @@ public void execute(Promise<Void> promise)
{
isExecuting = true;
LOGGER.info("Executing job. jobId={}", jobId);
- promise.future().onComplete(executionPromise);
+
+ promise.future().onComplete(res -> {
+ if (res.succeeded())
+ {
+ try
+ {
+ OperationalJobStatus result = executeInternal();
+ executionPromise.tryComplete(result);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Complete job execution. jobId={}
status={}", jobId, status());
+ }
+ }
+ catch (Throwable e)
+ {
+ executionPromise.tryFail(e);
+ }
+ }
+ else
+ {
+ Throwable cause = res.cause();
+ executionPromise.tryFail(cause);
+ }
+ });
+
+
+
+
+
+
try
{
// Blocking call to perform concrete job-specific execution,
returning the status
- executeInternal();
+ OperationalJobStatus result = executeInternal();
+ executionPromise.tryComplete(result);
Review Comment:
The change does not look correct.
##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/NodeDecommissionResponse.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.UUID;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+
+/**
+ * Response structure of the jobs status API
+ */
+public class NodeDecommissionResponse
+{
+ private final UUID jobId;
+ private final OperationalJobStatus status;
+
+ private final String instance;
+
+ private final String reason;
Review Comment:
nit: remove empty lines
##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/exceptions/OperationalJobException.java:
##########
@@ -36,7 +36,8 @@ public static OperationalJobException wraps(Throwable
throwable)
}
else
{
- return new OperationalJobException(throwable.getMessage(),
throwable);
+ String msg = throwable.getCause() != null ?
throwable.getCause().getMessage() : throwable.getMessage();
+ return new OperationalJobException(msg, throwable);
Review Comment:
Would it look odd to use the message of the throwable's cause, but set the
throwable as the cause of `OperationalJobException`? The message of the cause
of throwable is going to repeat in the stack trace.
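To illustrate with a sketch (hypothetical names; `JobException` stands in for `OperationalJobException`): keeping the throwable's own message loses nothing, because the nested cause stays reachable through the cause chain and is printed under "Caused by:" in the stack trace.

```java
class WrapMessageSketch
{
    // Hypothetical stand-in for OperationalJobException
    static class JobException extends RuntimeException
    {
        JobException(String message, Throwable cause)
        {
            super(message, cause);
        }
    }

    // Uses the wrapped throwable's own message and keeps it as the cause
    static JobException wraps(Throwable throwable)
    {
        return new JobException(throwable.getMessage(), throwable);
    }

    // Walks the cause chain to the innermost throwable
    static Throwable rootCause(Throwable throwable)
    {
        Throwable current = throwable;
        while (current.getCause() != null)
        {
            current = current.getCause();
        }
        return current;
    }
}
```

If the root message is what the API response needs, a helper like `rootCause` can supply it at the call site without changing how the exception is wrapped.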
##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java:
##########
@@ -126,6 +126,7 @@ public final class ApiEndpointsV1
public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 +
CASSANDRA + OPERATIONAL_JOBS;
public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA +
PER_OPERATIONAL_JOB;
+ public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA +
"/node/decommission";
Review Comment:
I am trying to understand the role the URL segment `/node` plays here.
`/api/v1/cassandra/decommission` seems just as clear, and it is shorter without the
`/node` segment.
Or, if we want to follow the `resource/identifier` structure strictly,
`/operations` is a better replacement for `/node`, i.e.
`/api/v1/cassandra/operations/decommission`, where `decommission` is viewed as
an identifier of the operation.
##########
server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java:
##########
@@ -76,22 +81,33 @@ public OperationalJob getJobIfExists(UUID jobId)
* The job execution failure behavior is tracked within the {@link
OperationalJob}.
*
* @param job OperationalJob instance to submit
- * @return OperationalJob instance that is submitted
* @throws OperationalJobConflictException when the same operational job
is already running on Cassandra
*/
- public OperationalJob trySubmitJob(OperationalJob job) throws
OperationalJobConflictException
+ public void trySubmitJob(OperationalJob job) throws
OperationalJobConflictException
{
checkConflict(job);
// New job is submitted for all cases when we do not have a
corresponding downstream job
- return jobTracker.computeIfAbsent(job.jobId, jobId -> job);
+ jobTracker.computeIfAbsent(job.jobId, jobId -> {
+ internalExecutorPool.executeBlocking(() -> {
+ job.execute(Promise.promise());
+ return null;
+ });
+ return job;
+ });
Review Comment:
Although `executeBlocking` with a promise is deprecated, let's still use it
for the succinct code it produces here. We can migrate away from the deprecated
method altogether in a future refactoring.
```suggestion
jobTracker.computeIfAbsent(job.jobId, jobId -> {
internalExecutorPool.executeBlocking(job::execute);
return job;
});
```
##########
server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
+import org.apache.cassandra.sidecar.job.DecommissionJob;
+import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+import static
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
+
+/**
+ * Provides REST API for asynchronously decommissioning the corresponding
Cassandra node
+ */
+public class NodeDecommissionHandler extends AbstractHandler<Void>
+{
+ private final OperationalJobManager jobManager;
+ private final ServiceConfiguration config;
+ private boolean isForce;
+
+ /**
+ * Constructs a handler with the provided {@code metadataFetcher}
+ *
+ * @param metadataFetcher the interface to retrieve instance metadata
+ * @param executorPools the executor pools for blocking executions
+ * @param validator a validator instance to validate
Cassandra-specific input
+ */
+ @Inject
+ protected NodeDecommissionHandler(InstanceMetadataFetcher metadataFetcher,
+ ExecutorPools executorPools,
+ ServiceConfiguration
serviceConfiguration,
+ CassandraInputValidator validator,
+ OperationalJobManager jobManager)
+ {
+ super(metadataFetcher, executorPools, validator);
+ this.jobManager = jobManager;
+ this.config = serviceConfiguration;
+ }
+
+ protected Void extractParamsOrThrow(RoutingContext context)
+ {
+ return null;
+ }
+
+ protected void handleInternal(RoutingContext context, HttpServerRequest
httpRequest, String host, SocketAddress remoteAddress, Void request)
+ {
+ StorageOperations operations =
metadataFetcher.delegate(host).storageOperations();
+ isForce = parseBooleanQueryParam(context.request(), "force", false);
+
+ OperationalJob job = new DecommissionJob(UUIDs.timeBased(),
operations, isForce);
+ jobManager.trySubmitJob(job);
+
+ job.asyncResult(executorPools.service(),
+
Duration.of(config.operationalJobExecutionMaxWaitTimeInMillis(),
ChronoUnit.MILLIS))
+ .onSuccess(f -> sendResponse(context, job, request, remoteAddress,
host))
+ .onFailure(cause -> processFailure(cause, context, host,
remoteAddress, request));
+ }
+
+ private void sendResponse(RoutingContext context, OperationalJob job, Void
request, SocketAddress remoteAddress, String host)
+ {
+ OperationalJobStatus jobStatus = job.status();
+ logger.info("Job completion status={}, request={}, remoteAddress={},
instance={}",
Review Comment:
remove commas.
```suggestion
        logger.info("Job completion status={} request={} remoteAddress={} instance={}",
```
##########
server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
+import org.apache.cassandra.sidecar.job.DecommissionJob;
+import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+import static
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
+
+/**
+ * Provides REST API for asynchronously decommissioning the corresponding
Cassandra node
+ */
+public class NodeDecommissionHandler extends AbstractHandler<Void>
+{
+ private final OperationalJobManager jobManager;
+ private final ServiceConfiguration config;
+ private boolean isForce;
+
+ /**
+ * Constructs a handler with the provided {@code metadataFetcher}
+ *
+ * @param metadataFetcher the interface to retrieve instance metadata
+ * @param executorPools the executor pools for blocking executions
+ * @param validator a validator instance to validate
Cassandra-specific input
+ */
+ @Inject
+ protected NodeDecommissionHandler(InstanceMetadataFetcher metadataFetcher,
+ ExecutorPools executorPools,
+ ServiceConfiguration
serviceConfiguration,
+ CassandraInputValidator validator,
+ OperationalJobManager jobManager)
+ {
+ super(metadataFetcher, executorPools, validator);
+ this.jobManager = jobManager;
+ this.config = serviceConfiguration;
+ }
+
+ protected Void extractParamsOrThrow(RoutingContext context)
+ {
+ return null;
+ }
+
+ protected void handleInternal(RoutingContext context, HttpServerRequest
httpRequest, String host, SocketAddress remoteAddress, Void request)
+ {
+ StorageOperations operations =
metadataFetcher.delegate(host).storageOperations();
+ isForce = parseBooleanQueryParam(context.request(), "force", false);
+
+ OperationalJob job = new DecommissionJob(UUIDs.timeBased(),
operations, isForce);
Review Comment:
`isForce` can be a local variable instead of an instance variable.
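A minimal sketch of what I mean, assuming the handler instance may serve more than one request (in which case a mutable field could be overwritten between requests). `DecommissionRequestSketch`, `parseForce`, and `handle` are hypothetical stand-ins for illustration, not the Sidecar API:

```java
import java.util.Map;

// Hypothetical, simplified stand-in for a request handler; not the Sidecar API.
final class DecommissionRequestSketch
{
    // Note: no per-request state is stored on the instance.

    /** Parses the "force" query parameter, defaulting to false when it is absent. */
    static boolean parseForce(Map<String, String> queryParams)
    {
        return Boolean.parseBoolean(queryParams.getOrDefault("force", "false"));
    }

    /** Handles one request; the flag lives only on this call's stack. */
    String handle(Map<String, String> queryParams)
    {
        boolean isForce = parseForce(queryParams);
        return "decommission(force=" + isForce + ")";
    }

    public static void main(String[] args)
    {
        DecommissionRequestSketch handler = new DecommissionRequestSketch();
        System.out.println(handler.handle(Map.of("force", "true")));
        System.out.println(handler.handle(Map.of()));
    }
}
```

Because the flag is scoped to the call, two requests with different `force` values cannot observe each other's value.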
##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/NodeDecommissionResponse.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.UUID;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+
+/**
+ * Response structure of the jobs status API
+ */
+public class NodeDecommissionResponse
Review Comment:
Is this a duplicate of
`org.apache.cassandra.sidecar.common.response.OperationalJobResponse`? Can we
use only `OperationalJobResponse`?
The response payload should be the same for the endpoint that triggers
an operation and the endpoint that queries the operation's status.
##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.BootstrapBBUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+ private static final String DECOMMISSION_FAILED_MESSAGE = "Failed to
decommission node";
+
+ @CassandraIntegrationTest(nodesPerDc = 5)
+ void decommissionNodeDefault(VertxTestContext context) throws
InterruptedException
+ {
+ BBHelperDecommissionNode.reset();
+ final String[] jobId = new String[1];
+ String testRoute = "/api/v1/cassandra/node/decommission";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ NodeDecommissionResponse
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ jobId[0] =
String.valueOf(decommissionResponse.jobId());
+ })));
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+ pollStatusForState(context, jobId[0], SUCCEEDED, null);
+ context.awaitCompletion(2, TimeUnit.MINUTES);
+ }
+
+ @CassandraIntegrationTest(nodesPerDc = 5, network = true, buildCluster =
false)
+ void decommissionNodeWithFailure(VertxTestContext context,
+ ConfigurableCassandraTestContext
cassandraTestContext) throws InterruptedException
+ {
+ BBHelperDecommissionNode.reset();
+ cassandraTestContext.configureAndStartCluster(builder -> {
+ builder.withInstanceInitializer(BBHelperDecommissionNode::install);
+ });
+
+ final String[] jobId = new String[1];
+ String testRoute = "/api/v1/cassandra/node/decommission";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ NodeDecommissionResponse
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ jobId[0] =
String.valueOf(decommissionResponse.jobId());
+ })));
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+ pollStatusForState(context, jobId[0], FAILED,
DECOMMISSION_FAILED_MESSAGE);
+ context.awaitCompletion(2, TimeUnit.MINUTES);
+ }
+
+ private void pollStatusForState(VertxTestContext context,
+ String uuid,
+ OperationalJobStatus expectedStatus,
+ String expectedReason)
+ {
+
Review Comment:
nit: remove the empty line.
##########
server/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java:
##########
@@ -60,6 +60,7 @@ void testCqlSessionProviderWorksAsExpected(VertxTestContext
context, CassandraTe
throws Exception
{
UpgradeableCluster cluster = cassandraTestContext.cluster();
+
Review Comment:
nit: unrelated change
##########
server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
+import org.apache.cassandra.sidecar.job.DecommissionJob;
+import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+import static
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
+
+/**
+ * Provides REST API for asynchronously decommissioning the corresponding
Cassandra node
+ */
+public class NodeDecommissionHandler extends AbstractHandler<Void>
+{
+ private final OperationalJobManager jobManager;
+ private final ServiceConfiguration config;
+ private boolean isForce;
+
+ /**
+ * Constructs a handler with the provided {@code metadataFetcher}
+ *
+ * @param metadataFetcher the interface to retrieve instance metadata
+ * @param executorPools the executor pools for blocking executions
+ * @param validator a validator instance to validate
Cassandra-specific input
+ */
+ @Inject
+ protected NodeDecommissionHandler(InstanceMetadataFetcher metadataFetcher,
+ ExecutorPools executorPools,
+ ServiceConfiguration
serviceConfiguration,
+ CassandraInputValidator validator,
+ OperationalJobManager jobManager)
+ {
+ super(metadataFetcher, executorPools, validator);
+ this.jobManager = jobManager;
+ this.config = serviceConfiguration;
+ }
+
+ protected Void extractParamsOrThrow(RoutingContext context)
+ {
+ return null;
+ }
+
+    protected void handleInternal(RoutingContext context, HttpServerRequest httpRequest, String host, SocketAddress remoteAddress, Void request)
+    {
+        StorageOperations operations = metadataFetcher.delegate(host).storageOperations();
+        isForce = parseBooleanQueryParam(context.request(), "force", false);
+
+        OperationalJob job = new DecommissionJob(UUIDs.timeBased(), operations, isForce);
+        jobManager.trySubmitJob(job);
+
+        job.asyncResult(executorPools.service(),
+                        Duration.of(config.operationalJobExecutionMaxWaitTimeInMillis(), ChronoUnit.MILLIS))
+           .onSuccess(f -> sendResponse(context, job, request, remoteAddress, host))
+           .onFailure(cause -> processFailure(cause, context, host, remoteAddress, request));
+    }
+
+    private void sendResponse(RoutingContext context, OperationalJob job, Void request, SocketAddress remoteAddress, String host)
+    {
+        OperationalJobStatus jobStatus = job.status();
+        logger.info("Job completion status={}, request={}, remoteAddress={}, instance={}",
+                    jobStatus, request, remoteAddress, host);
+
+        String reason = null;
+        switch(jobStatus)
+        {
+            case SUCCEEDED:
+                context.response().setStatusCode(HttpResponseStatus.OK.code());
+                break;
+            case FAILED:
+                reason = job.asyncResult().cause().getMessage();
+                context.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
+                break;
Review Comment:
Should the HTTP status for a failed job be `OK` too?
It is the job that failed, not the HTTP request. The job result should be
indicated in the response payload, while the request itself succeeded, i.e. 200.
Almost the same logic already exists in
`org.apache.cassandra.sidecar.routes.OperationalJobHandler`.
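Roughly what I have in mind, as a sketch only. `JobStatusSketch`, `JobHttpStatusSketch`, and their methods are hypothetical names for illustration; returning 202 for an unfinished job is an assumption based on what the polling test in this PR expects from the status endpoint:

```java
// Hypothetical mirror of the job states named in this review; not the Sidecar enum.
enum JobStatusSketch { RUNNING, SUCCEEDED, FAILED }

final class JobHttpStatusSketch
{
    static final int HTTP_OK = 200;
    static final int HTTP_ACCEPTED = 202;

    /** A finished job, whether it succeeded or failed, maps to 200; an unfinished one to 202. */
    static int httpStatusFor(JobStatusSketch status)
    {
        switch (status)
        {
            case SUCCEEDED:
            case FAILED:
                return HTTP_OK;
            default:
                return HTTP_ACCEPTED;
        }
    }

    /** The failure reason travels in the payload; it is null unless the job failed. */
    static String reasonFor(JobStatusSketch status, Throwable cause)
    {
        return status == JobStatusSketch.FAILED && cause != null ? cause.getMessage() : null;
    }

    public static void main(String[] args)
    {
        Throwable cause = new RuntimeException("Failed to decommission node");
        System.out.println(httpStatusFor(JobStatusSketch.FAILED) + " reason=" + reasonFor(JobStatusSketch.FAILED, cause));
        System.out.println(httpStatusFor(JobStatusSketch.RUNNING) + " reason=" + reasonFor(JobStatusSketch.RUNNING, null));
    }
}
```

With this shape, a client distinguishes success from failure by reading `status` and `reason` in the body rather than the HTTP code.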
##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.BootstrapBBUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+ private static final String DECOMMISSION_FAILED_MESSAGE = "Failed to
decommission node";
+
+ @CassandraIntegrationTest(nodesPerDc = 5)
+ void decommissionNodeDefault(VertxTestContext context) throws
InterruptedException
+ {
+ BBHelperDecommissionNode.reset();
+ final String[] jobId = new String[1];
+ String testRoute = "/api/v1/cassandra/node/decommission";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ NodeDecommissionResponse
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ jobId[0] =
String.valueOf(decommissionResponse.jobId());
+ })));
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+ pollStatusForState(context, jobId[0], SUCCEEDED, null);
+ context.awaitCompletion(2, TimeUnit.MINUTES);
+ }
+
+ @CassandraIntegrationTest(nodesPerDc = 5, network = true, buildCluster =
false)
+ void decommissionNodeWithFailure(VertxTestContext context,
+ ConfigurableCassandraTestContext
cassandraTestContext) throws InterruptedException
+ {
+ BBHelperDecommissionNode.reset();
+ cassandraTestContext.configureAndStartCluster(builder -> {
+ builder.withInstanceInitializer(BBHelperDecommissionNode::install);
+ });
+
+ final String[] jobId = new String[1];
Review Comment:
Use `AtomicReference<String>` instead of a single-element array?
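For illustration, a small sketch of capturing a value from an asynchronous callback with `AtomicReference`. It uses a plain `CompletableFuture` in place of the Vert.x client, and `JobIdCaptureSketch` and `captureJobId` are hypothetical names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper; a CompletableFuture stands in for the asynchronous HTTP response.
final class JobIdCaptureSketch
{
    /** Stores the value produced by the callback and returns it once the callback has run. */
    static String captureJobId(CompletableFuture<String> response)
    {
        AtomicReference<String> jobId = new AtomicReference<>();
        // join() waits for the callback stage, so the reference is set before it is read
        response.thenAccept(jobId::set).join();
        return jobId.get();
    }

    public static void main(String[] args)
    {
        System.out.println(captureJobId(CompletableFuture.supplyAsync(() -> "example-job-id")));
    }
}
```

Compared with `new String[1]`, the reference states the intent directly and gives safe publication across threads.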
##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.BootstrapBBUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+ private static final String DECOMMISSION_FAILED_MESSAGE = "Failed to
decommission node";
+
+ @CassandraIntegrationTest(nodesPerDc = 5)
+ void decommissionNodeDefault(VertxTestContext context) throws
InterruptedException
+ {
+ BBHelperDecommissionNode.reset();
+ final String[] jobId = new String[1];
+ String testRoute = "/api/v1/cassandra/node/decommission";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ NodeDecommissionResponse
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ jobId[0] =
String.valueOf(decommissionResponse.jobId());
+ })));
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+ pollStatusForState(context, jobId[0], SUCCEEDED, null);
+ context.awaitCompletion(2, TimeUnit.MINUTES);
+ }
+
+ @CassandraIntegrationTest(nodesPerDc = 5, network = true, buildCluster =
false)
+ void decommissionNodeWithFailure(VertxTestContext context,
+ ConfigurableCassandraTestContext
cassandraTestContext) throws InterruptedException
+ {
+ BBHelperDecommissionNode.reset();
+ cassandraTestContext.configureAndStartCluster(builder -> {
+ builder.withInstanceInitializer(BBHelperDecommissionNode::install);
+ });
+
+ final String[] jobId = new String[1];
+ String testRoute = "/api/v1/cassandra/node/decommission";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ NodeDecommissionResponse
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ jobId[0] =
String.valueOf(decommissionResponse.jobId());
+ })));
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+ pollStatusForState(context, jobId[0], FAILED,
DECOMMISSION_FAILED_MESSAGE);
+ context.awaitCompletion(2, TimeUnit.MINUTES);
+ }
+
+ private void pollStatusForState(VertxTestContext context,
+ String uuid,
+ OperationalJobStatus expectedStatus,
+ String expectedReason)
+ {
+
+ int attempts = 10;
+ String status = "/api/v1/cassandra/operational-jobs/" + uuid;
+ AtomicBoolean stateReached = new AtomicBoolean(false);
+ logger.info("Job Stats Attempt:" + attempts);
Review Comment:
Use the `{}` placeholder for logging messages instead of string concatenation, e.g. `logger.info("Job Stats Attempt: {}", attempts);`.
##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeDecommissionResponse;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.BootstrapBBUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+ private static final String DECOMMISSION_FAILED_MESSAGE = "Failed to
decommission node";
+
+ @CassandraIntegrationTest(nodesPerDc = 5)
+ void decommissionNodeDefault(VertxTestContext context) throws
InterruptedException
+ {
+ BBHelperDecommissionNode.reset();
+ final String[] jobId = new String[1];
+ String testRoute = "/api/v1/cassandra/node/decommission";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ NodeDecommissionResponse
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ jobId[0] =
String.valueOf(decommissionResponse.jobId());
+ })));
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+ pollStatusForState(context, jobId[0], SUCCEEDED, null);
+ context.awaitCompletion(2, TimeUnit.MINUTES);
+ }
+
+ @CassandraIntegrationTest(nodesPerDc = 5, network = true, buildCluster =
false)
+ void decommissionNodeWithFailure(VertxTestContext context,
+ ConfigurableCassandraTestContext
cassandraTestContext) throws InterruptedException
+ {
+ BBHelperDecommissionNode.reset();
+ cassandraTestContext.configureAndStartCluster(builder -> {
+ builder.withInstanceInitializer(BBHelperDecommissionNode::install);
+ });
+
+ final String[] jobId = new String[1];
+ String testRoute = "/api/v1/cassandra/node/decommission";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ NodeDecommissionResponse
decommissionResponse = response.bodyAsJson(NodeDecommissionResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ jobId[0] =
String.valueOf(decommissionResponse.jobId());
+ })));
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+ pollStatusForState(context, jobId[0], FAILED,
DECOMMISSION_FAILED_MESSAGE);
+ context.awaitCompletion(2, TimeUnit.MINUTES);
+ }
+
+    private void pollStatusForState(VertxTestContext context,
+                                    String uuid,
+                                    OperationalJobStatus expectedStatus,
+                                    String expectedReason)
+    {
+
+        int attempts = 10;
+        String status = "/api/v1/cassandra/operational-jobs/" + uuid;
+        AtomicBoolean stateReached = new AtomicBoolean(false);
+        logger.info("Job Stats Attempt:" + attempts);
+        final int[] counter = {0};
+        int finalAttempts = attempts;
+        vertx.setPeriodic(10000, id -> {
+            counter[0]++;
+            testWithClient(false, client -> client.get(server.actualPort(), "127.0.0.1", status)
+                                                  .send(context.succeeding(resp -> {
+                if (resp.statusCode() == HttpResponseStatus.OK.code())
+                {
+                    stateReached.set(true);
+                    logger.info("Success Status Response code:" + resp.statusCode());
+                    logger.info("Status Response:" + resp.bodyAsString());
+                    OperationalJobResponse jobStatusResp = resp.bodyAsJson(OperationalJobResponse.class);
+                    assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid));
+                    assertThat(jobStatusResp.status()).isEqualTo(expectedStatus);
+                    assertThat(jobStatusResp.reason()).isEqualTo(expectedReason);
+                    assertThat(jobStatusResp.operation()).isEqualTo("decommission");
+                }
+                else
+                {
+                    assertThat(resp.statusCode()).isEqualTo(HttpResponseStatus.ACCEPTED.code());
+                    logger.info("Status Response code:" + resp.statusCode());
+                    logger.info("Status Response:" + resp.bodyAsString());
+                    OperationalJobResponse jobStatusResp = resp.bodyAsJson(OperationalJobResponse.class);
+                    assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid));
+                    // assertThat(jobStatusResp).isNull();
+                }
+                logger.info("Request completed");
+            })));
+            if (stateReached.get() || counter[0] == finalAttempts)
+            {
+                vertx.cancelTimer(id);
+                assertThat(stateReached.get()).isTrue();
+                context.completeNow();
+            }
+        });
Review Comment:
This looks odd.
`WebClient` sends requests asynchronously, so `testWithClient` returns
immediately and the condition `stateReached.get() || counter[0] == finalAttempts`
is checked prematurely, before the response callback has run.
I would simplify the method as follows, using the client in a blocking manner,
which makes the repeated check easier to script. Note that `VertxTestContext`
is no longer needed when the client is used this way.
```java
private void pollStatusForState(String uuid,
                                OperationalJobStatus expectedStatus,
                                String expectedReason)
{
    int attempts = 10;
    String status = "/api/v1/cassandra/operational-jobs/" + uuid;
    AtomicBoolean stateReached = new AtomicBoolean(false);
    logger.info("Job Stats Attempt: {}", attempts);
    AtomicInteger counter = new AtomicInteger(0);
    loopAssert(30, () -> {
        counter.incrementAndGet();
        // todo: create a helper method in the base class to get response in the blocking manner
        HttpResponse<Buffer> resp = client.get(server.actualPort(), "127.0.0.1", status)
                                          .send().toCompletionStage().toCompletableFuture().get();
        logger.info("Success Status Response code: {}", resp.statusCode());
        logger.info("Status Response: {}", resp.bodyAsString());
        if (resp.statusCode() == HttpResponseStatus.OK.code())
        {
            stateReached.set(true);
            OperationalJobResponse jobStatusResp = resp.bodyAsJson(OperationalJobResponse.class);
            assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid));
            assertThat(jobStatusResp.status()).isEqualTo(expectedStatus);
            assertThat(jobStatusResp.reason()).isEqualTo(expectedReason);
            assertThat(jobStatusResp.operation()).isEqualTo("decommission");
        }
        else
        {
            assertThat(resp.statusCode()).isEqualTo(HttpResponseStatus.ACCEPTED.code());
            OperationalJobResponse jobStatusResp = resp.bodyAsJson(OperationalJobResponse.class);
            assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid));
            // assertThat(jobStatusResp).isNull();
        }
        logger.info("Request completed");
        assertThat(stateReached.get()).isTrue();
    });
}
```
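To make the blocking retry idea concrete, here is a self-contained sketch of a loop that reruns a check until it stops throwing `AssertionError`. `PollingSketch` and `retryUntilPasses` are hypothetical names; this is not the project's `loopAssert`, whose exact signature I am not assuming here:

```java
// Hypothetical retry helper for illustration; not the project's loopAssert.
final class PollingSketch
{
    /**
     * Runs the check up to maxAttempts times, sleeping delayMillis between failed attempts.
     * Returns the attempt number on which the check passed, or rethrows the last
     * AssertionError once the attempts are exhausted.
     */
    static int retryUntilPasses(int maxAttempts, long delayMillis, Runnable check)
    {
        AssertionError last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                check.run(); // blocking call: the result is known before the loop moves on
                return attempt;
            }
            catch (AssertionError e)
            {
                last = e;
            }
            if (attempt < maxAttempts)
            {
                try
                {
                    Thread.sleep(delayMillis);
                }
                catch (InterruptedException ie)
                {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to retry", ie);
                }
            }
        }
        throw last != null ? last : new AssertionError("maxAttempts must be positive");
    }

    public static void main(String[] args)
    {
        int[] calls = {0};
        int attempt = retryUntilPasses(5, 10L, () -> {
            if (++calls[0] < 3)
            {
                throw new AssertionError("state not reached yet");
            }
        });
        System.out.println("passed on attempt " + attempt);
    }
}
```

Since each check completes before the exit condition is evaluated, the premature check seen with the asynchronous client cannot occur.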
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.