yifan-c commented on code in PR #144:
URL: https://github.com/apache/cassandra-sidecar/pull/144#discussion_r1913678915
##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java:
##########
@@ -126,6 +126,7 @@ public final class ApiEndpointsV1
public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 +
CASSANDRA + OPERATIONAL_JOBS;
public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA +
PER_OPERATIONAL_JOB;
+ public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA +
"/operations/decommission";
Review Comment:
nit: move to line #128. Basically, remove the empty line before this
statement and add the empty line after it.
##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/exceptions/OperationalJobException.java:
##########
@@ -36,7 +36,8 @@ public static OperationalJobException wraps(Throwable
throwable)
}
else
{
- return new OperationalJobException(throwable.getMessage(),
throwable);
+ Throwable failingCause = throwable.getCause() != null ?
throwable.getCause() : throwable;
+ return new OperationalJobException(failingCause.getMessage(),
failingCause);
Review Comment:
The reason for the trimming here is still unclear to me. To be conservative,
retaining more information is usually preferred. Can you explain why `throwable`
is skipped in favor of its cause?
##########
server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+
+/**
+ * Implementation of {@link OperationalJob} to perform node decommission
operation.
+ */
+public class NodeDecommissionJob extends OperationalJob
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(NodeDecommissionJob.class);
+ private static final String operation = "decommission";
+ private final boolean isForce;
+ protected StorageOperations storageOperations;
+
+ public NodeDecommissionJob(UUID jobId, StorageOperations storageOps,
boolean isForce)
+ {
+ super(jobId);
+ this.storageOperations = storageOps;
+ this.isForce = isForce;
+ }
+
+ @Override
+ public boolean isRunningOnCassandra()
+ {
+ String operationMode = storageOperations.getOperationMode();
+ return operationMode.equals("LEAVING") ||
operationMode.equals("DECOMMISSIONED");
+
Review Comment:
nit: remove empty line
##########
server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+
+/**
+ * Implementation of {@link OperationalJob} to perform node decommission
operation.
+ */
+public class NodeDecommissionJob extends OperationalJob
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(NodeDecommissionJob.class);
+ private static final String operation = "decommission";
+ private final boolean isForce;
+ protected StorageOperations storageOperations;
+
+ public NodeDecommissionJob(UUID jobId, StorageOperations storageOps,
boolean isForce)
+ {
+ super(jobId);
+ this.storageOperations = storageOps;
+ this.isForce = isForce;
+ }
+
+ @Override
+ public boolean isRunningOnCassandra()
+ {
+ String operationMode = storageOperations.getOperationMode();
+ return operationMode.equals("LEAVING") ||
operationMode.equals("DECOMMISSIONED");
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public OperationalJobStatus status()
+ {
+ String operationMode = storageOperations.getOperationMode();
+
+ if (operationMode.equals("LEAVING"))
+ {
+ return OperationalJobStatus.RUNNING;
+ }
+ else if (operationMode.equals("DECOMMISSIONED"))
+ {
+ return OperationalJobStatus.SUCCEEDED;
+ }
+ else
+ {
+ return super.status();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+
+ protected void executeInternal()
+ {
+ if (isRunningOnCassandra())
+ {
+ LOGGER.info("Not executing job as an ongoing or completed
decommission operation was found jobId={}", jobId);
Review Comment:
How about retaining the `operationMode` value read when calling
`isRunningOnCassandra()` (as an instance variable) and logging it here?
##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java:
##########
@@ -102,4 +102,16 @@ default void outOfRangeDataCleanup(@NotNull String
keyspace, @NotNull String tab
{
outOfRangeDataCleanup(keyspace, table, 1);
}
+
+ /**
+ * @return the operation-mode of the Cassandra instance
+ */
+ String getOperationMode();
Review Comment:
Not necessary for now. We could consider caching the operationMode in the
future if it is queried a lot.
##########
server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
+import org.apache.cassandra.sidecar.job.NodeDecommissionJob;
+import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+import static
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
+
+/**
+ * Provides REST API for asynchronously decommissioning the corresponding
Cassandra node
+ */
+public class NodeDecommissionHandler extends AbstractHandler<Void>
+{
+ private final OperationalJobManager jobManager;
+ private final ServiceConfiguration config;
+
+ /**
+ * Constructs a handler with the provided {@code metadataFetcher}
+ *
+ * @param metadataFetcher the interface to retrieve instance metadata
+ * @param executorPools the executor pools for blocking executions
+ * @param validator a validator instance to validate
Cassandra-specific input
+ */
+ @Inject
+ protected NodeDecommissionHandler(InstanceMetadataFetcher metadataFetcher,
+ ExecutorPools executorPools,
+ ServiceConfiguration
serviceConfiguration,
+ CassandraInputValidator validator,
+ OperationalJobManager jobManager)
+ {
+ super(metadataFetcher, executorPools, validator);
+ this.jobManager = jobManager;
+ this.config = serviceConfiguration;
+ }
+
+ protected Void extractParamsOrThrow(RoutingContext context)
+ {
+ return null;
+ }
+
+ protected void handleInternal(RoutingContext context, HttpServerRequest
httpRequest, String host, SocketAddress remoteAddress, Void request)
+ {
+ StorageOperations operations =
metadataFetcher.delegate(host).storageOperations();
+ boolean isForce = parseBooleanQueryParam(context.request(), "force",
false);
+
+ OperationalJob job = new NodeDecommissionJob(UUIDs.timeBased(),
operations, isForce);
+ try
+ {
+ jobManager.trySubmitJob(job);
+ }
+ catch (OperationalJobConflictException oje)
+ {
+ logger.error("Conflicting job encountered. reason={}",
oje.getMessage());
+ context.fail(wrapHttpException(HttpResponseStatus.CONFLICT,
oje.getMessage(), oje.getCause()));
+ }
+
+ // Get the result, waiting for the specified wait time for result
+ job.asyncResult(executorPools.service(),
+
Duration.of(config.operationalJobExecutionMaxWaitTimeInMillis(),
ChronoUnit.MILLIS))
+ .onComplete(v -> sendResponse(context, job, request, remoteAddress,
host));
+ }
+
+ private void sendResponse(RoutingContext context, OperationalJob job, Void
request, SocketAddress remoteAddress, String host)
Review Comment:
What prevents reusing the code in
`org.apache.cassandra.sidecar.routes.OperationalJobHandler`?
##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java:
##########
@@ -213,4 +213,34 @@ public void outOfRangeDataCleanup(@NotNull String
keyspace, @NotNull String tabl
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
.forceKeyspaceCleanup(concurrency, keyspace, table);
}
+
+// /**
+// * {@inheritDoc}
+// */
+// @Override
+// public boolean isDecommissioning()
+// {
+// StorageJmxOperations ssProxy =
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+// String nodeOperationMode = ssProxy.getOperationMode();
+// // The following check is equivalent to the operation-mode checks
within Cassandra to determine if a
+// // decommission operation is in progress
+// return nodeOperationMode.equals("LEAVING");
+// }
Review Comment:
Is it to be removed?
##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+ @CassandraIntegrationTest(nodesPerDc = 2)
+ void decommissionNodeDefault(VertxTestContext context) throws
InterruptedException
+ {
+ final String[] jobId = new String[1];
+ String testRoute =
"/api/v1/cassandra/operations/decommission?force=true";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ OperationalJobResponse
decommissionResponse = response.bodyAsJson(OperationalJobResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ jobId[0] =
String.valueOf(decommissionResponse.jobId());
+ })));
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+ pollStatusForState(jobId[0], SUCCEEDED, null);
+ context.completeNow();
+ context.awaitCompletion(2, TimeUnit.MINUTES);
+ }
+
+ @CassandraIntegrationTest(nodesPerDc = 2)
+ void decommissionNodeWithFailure(VertxTestContext context) throws
InterruptedException
+ {
+ String testRoute = "/api/v1/cassandra/operations/decommission";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ logger.info("Response Body:" +
response.bodyAsString());
+ OperationalJobResponse
decommissionResponse = response.bodyAsJson(OperationalJobResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(FAILED);
+
assertThat(decommissionResponse.jobId()).isNotNull();
+ String jobId =
String.valueOf(decommissionResponse.jobId());
+ assertThat(jobId).isNotNull();
+ context.completeNow();
+ })));
+ context.awaitCompletion(2, TimeUnit.MINUTES);
+ }
+
+ private void pollStatusForState(String uuid,
+ OperationalJobStatus expectedStatus,
+ String expectedReason)
+ {
+ int attempts = 10;
+ String status = "/api/v1/cassandra/operational-jobs/" + uuid;
+ AtomicBoolean stateReached = new AtomicBoolean(false);
+ logger.info("Job Stats Attempt: {}", attempts);
Review Comment:
Maybe you want to place it inside the `loopAssert`? Right now, it only logs
once, with the `attempts` value of 10.
##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+ @CassandraIntegrationTest(nodesPerDc = 2)
+ void decommissionNodeDefault(VertxTestContext context) throws
InterruptedException
+ {
+ final String[] jobId = new String[1];
+ String testRoute =
"/api/v1/cassandra/operations/decommission?force=true";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ OperationalJobResponse
decommissionResponse = response.bodyAsJson(OperationalJobResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ jobId[0] =
String.valueOf(decommissionResponse.jobId());
+ })));
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+ pollStatusForState(jobId[0], SUCCEEDED, null);
+ context.completeNow();
+ context.awaitCompletion(2, TimeUnit.MINUTES);
Review Comment:
`context.awaitCompletion` is no longer necessary since it is blocking style.
##########
server/src/test/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandlerTest.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.AdditionalAnswers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link NodeDecommissionHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionHandlerTest
+{
+ static final Logger LOGGER =
LoggerFactory.getLogger(NodeDecommissionHandlerTest.class);
+ Vertx vertx;
+ Server server;
+
+ @Mock
+ static
+ StorageOperations mockStorageOperations;
+
+ @BeforeEach
+ void before() throws InterruptedException
+ {
+ Injector injector;
+ MockitoAnnotations.openMocks(this);
+ Module testOverride = Modules.override(new TestModule())
+ .with(new
NodeDecommissionHandlerTest.NodeDecommissionTestModule());
+ //mockJobManager));
+ injector = Guice.createInjector(Modules.override(new MainModule())
+ .with(testOverride));
+ vertx = injector.getInstance(Vertx.class);
+ server = injector.getInstance(Server.class);
+ VertxTestContext context = new VertxTestContext();
+ server.start()
+ .onSuccess(s -> context.completeNow())
+ .onFailure(context::failNow);
+ context.awaitCompletion(5, TimeUnit.SECONDS);
+ }
+
+ @AfterEach
+ void after() throws InterruptedException
+ {
+ CountDownLatch closeLatch = new CountDownLatch(1);
+ server.close().onSuccess(res -> closeLatch.countDown());
+ if (closeLatch.await(60, TimeUnit.SECONDS))
+ LOGGER.info("Close event received before timeout.");
+ else
+ LOGGER.error("Close event timed out.");
+ }
+
+ @Test
+ void testDecommissionLongRunning(VertxTestContext context)
+ {
+ when(mockStorageOperations.getOperationMode()).thenReturn("NORMAL");
+ doAnswer(AdditionalAnswers.answersWithDelay(6000, invocation -> null))
+ .when(mockStorageOperations).decommission(anyBoolean());
+
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/api/v1/cassandra/operations/decommission";
+ client.put(server.actualPort(), "127.0.0.1", testRoute)
+ .expect(ResponsePredicate.SC_ACCEPTED)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(ACCEPTED.code());
+ OperationalJobResponse decommissionResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(decommissionResponse).isNotNull();
+ assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testDecommissionCompleted(VertxTestContext context)
+ {
+ when(mockStorageOperations.getOperationMode()).thenReturn("NORMAL");
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/api/v1/cassandra/operations/decommission";
+ client.put(server.actualPort(), "127.0.0.1", testRoute)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ LOGGER.info("Decommission Response: {}",
response.bodyAsString());
+
+ OperationalJobResponse decommissionResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(decommissionResponse).isNotNull();
+
assertThat(decommissionResponse.status()).isEqualTo(SUCCEEDED);
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testDecommissionFailed(VertxTestContext context)
+ {
+ when(mockStorageOperations.getOperationMode()).thenReturn("NORMAL");
+ doThrow(new RuntimeException("Simulated
failure")).when(mockStorageOperations).decommission(anyBoolean());
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/api/v1/cassandra/operations/decommission";
+ client.put(server.actualPort(), "127.0.0.1", testRoute)
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testDecommissionConflict(VertxTestContext context)
+ {
+ when(mockStorageOperations.getOperationMode()).thenReturn("LEAVING");
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/api/v1/cassandra/operations/decommission";
+ client.put(server.actualPort(), "127.0.0.1", testRoute)
+ .expect(ResponsePredicate.SC_CONFLICT)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(CONFLICT.code());
Review Comment:
nit: add an assertion for the returned jobId, so that the client can use this
jobId to track the existing in-flight job.
##########
server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
+import org.apache.cassandra.sidecar.job.NodeDecommissionJob;
+import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+import static
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
+
+/**
+ * Provides REST API for asynchronously decommissioning the corresponding
Cassandra node
+ */
+public class NodeDecommissionHandler extends AbstractHandler<Void>
+{
+ private final OperationalJobManager jobManager;
+ private final ServiceConfiguration config;
+
+ /**
+ * Constructs a handler with the provided {@code metadataFetcher}
+ *
+ * @param metadataFetcher the interface to retrieve instance metadata
+ * @param executorPools the executor pools for blocking executions
+ * @param validator a validator instance to validate
Cassandra-specific input
+ */
+ @Inject
+ protected NodeDecommissionHandler(InstanceMetadataFetcher metadataFetcher,
+ ExecutorPools executorPools,
+ ServiceConfiguration
serviceConfiguration,
+ CassandraInputValidator validator,
+ OperationalJobManager jobManager)
+ {
+ super(metadataFetcher, executorPools, validator);
+ this.jobManager = jobManager;
+ this.config = serviceConfiguration;
+ }
+
+ protected Void extractParamsOrThrow(RoutingContext context)
+ {
+ return null;
+ }
+
+ protected void handleInternal(RoutingContext context, HttpServerRequest
httpRequest, String host, SocketAddress remoteAddress, Void request)
+ {
+ StorageOperations operations =
metadataFetcher.delegate(host).storageOperations();
+ boolean isForce = parseBooleanQueryParam(context.request(), "force",
false);
+
+ OperationalJob job = new NodeDecommissionJob(UUIDs.timeBased(),
operations, isForce);
+ try
+ {
+ jobManager.trySubmitJob(job);
+ }
+ catch (OperationalJobConflictException oje)
+ {
+ logger.error("Conflicting job encountered. reason={}",
oje.getMessage());
+ context.fail(wrapHttpException(HttpResponseStatus.CONFLICT,
oje.getMessage(), oje.getCause()));
+ }
Review Comment:
Can you include the jobId of the conflicting job in the response JSON
payload?
##########
server/src/test/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandlerTest.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.AdditionalAnswers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link NodeDecommissionHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionHandlerTest
+{
+ static final Logger LOGGER =
LoggerFactory.getLogger(NodeDecommissionHandlerTest.class);
+ Vertx vertx;
+ Server server;
+
+ @Mock
+ static
+ StorageOperations mockStorageOperations;
+
+ @BeforeEach
+ void before() throws InterruptedException
+ {
+ Injector injector;
+ MockitoAnnotations.openMocks(this);
+ Module testOverride = Modules.override(new TestModule())
+ .with(new
NodeDecommissionHandlerTest.NodeDecommissionTestModule());
+ //mockJobManager));
+ injector = Guice.createInjector(Modules.override(new MainModule())
+ .with(testOverride));
+ vertx = injector.getInstance(Vertx.class);
+ server = injector.getInstance(Server.class);
+ VertxTestContext context = new VertxTestContext();
+ server.start()
+ .onSuccess(s -> context.completeNow())
+ .onFailure(context::failNow);
+ context.awaitCompletion(5, TimeUnit.SECONDS);
+ }
+
+ @AfterEach
+ void after() throws InterruptedException
+ {
+ CountDownLatch closeLatch = new CountDownLatch(1);
+ server.close().onSuccess(res -> closeLatch.countDown());
+ if (closeLatch.await(60, TimeUnit.SECONDS))
+ LOGGER.info("Close event received before timeout.");
+ else
+ LOGGER.error("Close event timed out.");
+ }
+
+ @Test
+ void testDecommissionLongRunning(VertxTestContext context)
+ {
+ when(mockStorageOperations.getOperationMode()).thenReturn("NORMAL");
+ doAnswer(AdditionalAnswers.answersWithDelay(6000, invocation -> null))
+ .when(mockStorageOperations).decommission(anyBoolean());
+
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/api/v1/cassandra/operations/decommission";
+ client.put(server.actualPort(), "127.0.0.1", testRoute)
+ .expect(ResponsePredicate.SC_ACCEPTED)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(ACCEPTED.code());
+ OperationalJobResponse decommissionResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(decommissionResponse).isNotNull();
+ assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testDecommissionCompleted(VertxTestContext context)
+ {
+ when(mockStorageOperations.getOperationMode()).thenReturn("NORMAL");
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/api/v1/cassandra/operations/decommission";
+ client.put(server.actualPort(), "127.0.0.1", testRoute)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ LOGGER.info("Decommission Response: {}",
response.bodyAsString());
+
+ OperationalJobResponse decommissionResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(decommissionResponse).isNotNull();
+
assertThat(decommissionResponse.status()).isEqualTo(SUCCEEDED);
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testDecommissionFailed(VertxTestContext context)
+ {
+ when(mockStorageOperations.getOperationMode()).thenReturn("NORMAL");
+ doThrow(new RuntimeException("Simulated
failure")).when(mockStorageOperations).decommission(anyBoolean());
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/api/v1/cassandra/operations/decommission";
+ client.put(server.actualPort(), "127.0.0.1", testRoute)
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testDecommissionConflict(VertxTestContext context)
+ {
+ when(mockStorageOperations.getOperationMode()).thenReturn("LEAVING");
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/api/v1/cassandra/operations/decommission";
+ client.put(server.actualPort(), "127.0.0.1", testRoute)
+ .expect(ResponsePredicate.SC_CONFLICT)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(CONFLICT.code());
+ context.completeNow();
+ }));
+ }
+
+ /**
+ * Test guice module for Node Decommission handler tests
+ */
+ static class NodeDecommissionTestModule extends AbstractModule
+ {
+
+ public NodeDecommissionTestModule()
+ {
+ MockitoAnnotations.openMocks(this);
Review Comment:
This openMocks has no effect. Can you remove it?
##########
server/src/test/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandlerTest.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.AdditionalAnswers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link NodeDecommissionHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionHandlerTest
+{
+ static final Logger LOGGER =
LoggerFactory.getLogger(NodeDecommissionHandlerTest.class);
+ Vertx vertx;
+ Server server;
+
+ @Mock
+ static
Review Comment:
remove `static`. It is updated by each test case.
##########
server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+
+/**
+ * Implementation of {@link OperationalJob} to perform node decommission
operation.
+ */
+public class NodeDecommissionJob extends OperationalJob
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(NodeDecommissionJob.class);
+ private static final String operation = "decommission";
+ private final boolean isForce;
+ protected StorageOperations storageOperations;
+
+ public NodeDecommissionJob(UUID jobId, StorageOperations storageOps,
boolean isForce)
+ {
+ super(jobId);
+ this.storageOperations = storageOps;
+ this.isForce = isForce;
+ }
+
+ @Override
+ public boolean isRunningOnCassandra()
+ {
+ String operationMode = storageOperations.getOperationMode();
+ return operationMode.equals("LEAVING") ||
operationMode.equals("DECOMMISSIONED");
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public OperationalJobStatus status()
+ {
+ String operationMode = storageOperations.getOperationMode();
+
+ if (operationMode.equals("LEAVING"))
+ {
+ return OperationalJobStatus.RUNNING;
+ }
+ else if (operationMode.equals("DECOMMISSIONED"))
+ {
+ return OperationalJobStatus.SUCCEEDED;
+ }
+ else
+ {
+ return super.status();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+
Review Comment:
remove empty line
##########
server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java:
##########
@@ -179,10 +183,8 @@ public Future<Void> asyncResult(TaskExecutorPool
executorPool, Duration waitTime
/**
* OperationalJob body. The implementation is executed in a blocking
manner.
- *
- * @throws OperationalJobException OperationalJobException that wraps job
failure
*/
- protected abstract void executeInternal() throws OperationalJobException;
+ protected abstract void executeInternal();
Review Comment:
👍 the `throws` is unnecessary. It is handled by `execute()` already.
##########
server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
+import org.apache.cassandra.sidecar.job.NodeDecommissionJob;
+import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+import static
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
+
+/**
+ * Provides REST API for asynchronously decommissioning the corresponding
Cassandra node
+ */
+public class NodeDecommissionHandler extends AbstractHandler<Void>
+{
+ private final OperationalJobManager jobManager;
+ private final ServiceConfiguration config;
+
+ /**
+ * Constructs a handler with the provided {@code metadataFetcher}
+ *
+ * @param metadataFetcher the interface to retrieve instance metadata
+ * @param executorPools the executor pools for blocking executions
+ * @param validator a validator instance to validate
Cassandra-specific input
+ */
+ @Inject
+ protected NodeDecommissionHandler(InstanceMetadataFetcher metadataFetcher,
+ ExecutorPools executorPools,
+ ServiceConfiguration
serviceConfiguration,
+ CassandraInputValidator validator,
+ OperationalJobManager jobManager)
+ {
+ super(metadataFetcher, executorPools, validator);
+ this.jobManager = jobManager;
+ this.config = serviceConfiguration;
+ }
+
+ protected Void extractParamsOrThrow(RoutingContext context)
+ {
+ return null;
+ }
+
+ protected void handleInternal(RoutingContext context, HttpServerRequest
httpRequest, String host, SocketAddress remoteAddress, Void request)
+ {
+ StorageOperations operations =
metadataFetcher.delegate(host).storageOperations();
+ boolean isForce = parseBooleanQueryParam(context.request(), "force",
false);
+
+ OperationalJob job = new NodeDecommissionJob(UUIDs.timeBased(),
operations, isForce);
+ try
+ {
+ jobManager.trySubmitJob(job);
+ }
+ catch (OperationalJobConflictException oje)
+ {
+ logger.error("Conflicting job encountered. reason={}",
oje.getMessage());
+ context.fail(wrapHttpException(HttpResponseStatus.CONFLICT,
oje.getMessage(), oje.getCause()));
+ }
+
+ // Get the result, waiting for the specified wait time for result
+ job.asyncResult(executorPools.service(),
+
Duration.of(config.operationalJobExecutionMaxWaitTimeInMillis(),
ChronoUnit.MILLIS))
+ .onComplete(v -> sendResponse(context, job, request, remoteAddress,
host));
Review Comment:
align with the dot at `job.asyncResult...`
##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+ @CassandraIntegrationTest(nodesPerDc = 2)
+ void decommissionNodeDefault(VertxTestContext context) throws
InterruptedException
+ {
+ final String[] jobId = new String[1];
+ String testRoute =
"/api/v1/cassandra/operations/decommission?force=true";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ OperationalJobResponse
decommissionResponse = response.bodyAsJson(OperationalJobResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ jobId[0] =
String.valueOf(decommissionResponse.jobId());
+ })));
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
Review Comment:
nit: sleep is unnecessary since `pollStatusForState` retries.
##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+ @CassandraIntegrationTest(nodesPerDc = 2)
+ void decommissionNodeDefault(VertxTestContext context) throws
InterruptedException
+ {
+ final String[] jobId = new String[1];
+ String testRoute =
"/api/v1/cassandra/operations/decommission?force=true";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ OperationalJobResponse
decommissionResponse = response.bodyAsJson(OperationalJobResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+ jobId[0] =
String.valueOf(decommissionResponse.jobId());
+ })));
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+ pollStatusForState(jobId[0], SUCCEEDED, null);
+ context.completeNow();
+ context.awaitCompletion(2, TimeUnit.MINUTES);
+ }
+
+ @CassandraIntegrationTest(nodesPerDc = 2)
+ void decommissionNodeWithFailure(VertxTestContext context) throws
InterruptedException
+ {
+ String testRoute = "/api/v1/cassandra/operations/decommission";
+ testWithClient(client -> client.put(server.actualPort(), "127.0.0.1",
testRoute)
+ .send(context.succeeding(response -> {
+ logger.info("Response Status:" +
response.statusCode());
+ logger.info("Response Body:" +
response.bodyAsString());
+ OperationalJobResponse
decommissionResponse = response.bodyAsJson(OperationalJobResponse.class);
+
assertThat(decommissionResponse.status()).isEqualTo(FAILED);
+
assertThat(decommissionResponse.jobId()).isNotNull();
+ String jobId =
String.valueOf(decommissionResponse.jobId());
+ assertThat(jobId).isNotNull();
+ context.completeNow();
+ })));
+ context.awaitCompletion(2, TimeUnit.MINUTES);
+ }
+
+ private void pollStatusForState(String uuid,
+ OperationalJobStatus expectedStatus,
+ String expectedReason)
+ {
+ int attempts = 10;
+ String status = "/api/v1/cassandra/operational-jobs/" + uuid;
+ AtomicBoolean stateReached = new AtomicBoolean(false);
+ logger.info("Job Stats Attempt: {}", attempts);
+ AtomicInteger counter = new AtomicInteger(0);
+ loopAssert(30, () -> {
+ counter.incrementAndGet();
+ // todo: create a helper method in the base class to get response
in the blocking manner
Review Comment:
Are you going to address this todo?
##########
server/src/test/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandlerTest.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.AdditionalAnswers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link NodeDecommissionHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionHandlerTest
+{
+ static final Logger LOGGER =
LoggerFactory.getLogger(NodeDecommissionHandlerTest.class);
+ Vertx vertx;
+ Server server;
+
+ @Mock
+ static
+ StorageOperations mockStorageOperations;
+
+ @BeforeEach
+ void before() throws InterruptedException
+ {
+ Injector injector;
+ MockitoAnnotations.openMocks(this);
Review Comment:
The test code is responsible for closing the opened mocks when it completes.
Since only a single field (i.e. mockStorageOperations) gets mocked,
consider mocking it the simple way?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]