This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8d4ad5d3 CASSSIDECAR-344: Sidecar endpoint for moving a node to a new
token (#275)
8d4ad5d3 is described below
commit 8d4ad5d3024fd8e95056d03681fc1447a6fd8381
Author: Sudipta <[email protected]>
AuthorDate: Fri Jan 9 11:40:10 2026 -0800
CASSSIDECAR-344: Sidecar endpoint for moving a node to a new token (#275)
Patch by Sudipta Laha; reviewed by Francisco Guerrero, Saranya Krishnakumar
for CASSSIDECAR-344
---
CHANGES.txt | 1 +
.../adapters/base/CassandraStorageOperations.java | 10 +
.../jmx/GossipDependentStorageJmxOperations.java | 6 +
.../adapters/base/jmx/StorageJmxOperations.java | 7 +
.../sidecar/adapters/base/utils/DataTypeUtils.java | 14 +-
.../cassandra/sidecar/common/ApiEndpointsV1.java | 1 +
.../sidecar/common/request/NodeMoveRequest.java | 61 +++
.../request/data/NodeMoveRequestPayload.java | 63 ++++
.../request/data/NodeMoveRequestPayloadTest.java | 52 +++
.../cassandra/sidecar/client/RequestContext.java | 13 +
.../cassandra/sidecar/client/SidecarClient.java | 15 +
.../sidecar/client/SidecarClientTest.java | 25 ++
.../CassandraNodeOperationsIntegrationTest.java | 181 ++++++++-
.../sidecar/common/server/StorageOperations.java | 7 +
.../acl/authorization/BasicPermissions.java | 1 +
.../sidecar/handlers/NodeMoveHandler.java | 146 ++++++++
.../apache/cassandra/sidecar/job/NodeMoveJob.java | 71 ++++
.../cassandra/sidecar/job/OperationalJob.java | 2 +-
.../sidecar/modules/CassandraOperationsModule.java | 28 ++
.../modules/multibindings/VertxRouteMapKeys.java | 5 +
.../sidecar/handlers/NodeMoveHandlerTest.java | 412 +++++++++++++++++++++
.../cassandra/sidecar/job/NodeMoveJobTest.java | 162 ++++++++
22 files changed, 1267 insertions(+), 16 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 46ffa359..2c8d7ebb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.3.0
-----
+ * Sidecar endpoint for moving a node to a new token (CASSSIDECAR-344)
* Returning JSON responses for live migration status endpoints in case of
errors (CASSSIDECAR-395)
* Upgrade vertx to 4.5.23 (CASSSIDECAR-391)
* Fix for deadlock during JMX reconnection (CASSSIDECAR-390)
diff --git
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
index 3ce3bf2e..56dfca31 100644
---
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
+++
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
@@ -344,4 +344,14 @@ public class CassandraStorageOperations implements
StorageOperations
return jmxClient.proxy(StorageJmxOperations.class,
STORAGE_SERVICE_OBJ_NAME)
.getCompactionThroughputMbPerSec();
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void move(String newToken) throws IOException
+ {
+ jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
+ .move(newToken);
+ }
}
diff --git
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java
index fb753237..ae7f561d 100644
---
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java
+++
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java
@@ -225,4 +225,10 @@ public class GossipDependentStorageJmxOperations
implements StorageJmxOperations
{
return delegate.getCompactionThroughputMbPerSec();
}
+
+ @Override
+ public void move(String newToken) throws IOException
+ {
+ delegate.move(newToken);
+ }
}
diff --git
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java
index 11e838c4..ced40624 100644
---
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java
+++
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java
@@ -233,4 +233,11 @@ public interface StorageJmxOperations
* @return the current compaction throughput in megabytes per second, or 0
if throughput cannot be determined
*/
int getCompactionThroughputMbPerSec();
+
+ /**
+ * Triggers the node move operation to move this node to a new token
+ *
+ * @param newToken the new token for the node to move to
+ */
+ void move(String newToken) throws IOException;
}
diff --git
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtils.java
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtils.java
index e5dac604..edd777e8 100644
---
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtils.java
+++
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/utils/DataTypeUtils.java
@@ -62,20 +62,20 @@ public class DataTypeUtils
* This method performs a runtime type check before casting to prevent
ClassCastException
* and provides meaningful error messages when the cast fails.
*
- * @param value the object to be cast
- * @param expectedType the expected type to cast to
+ * @param value the object to be cast
+ * @param expectedType the expected type to cast to
* @param contextDescription descriptive context for error messages (e.g.,
"keyspace name", "table data")
- * @param <T> the target type
+ * @param <T> the target type
* @return the cast object of type T
* @throws IllegalStateException if the value is not an instance of the
expected type,
- * with a descriptive message indicating what was expected vs what
was received
+ * with a descriptive message indicating
what was expected vs what was received
*/
public static <T> T safeCast(Object value, Class<T> expectedType, String
contextDescription)
{
if (!expectedType.isInstance(value))
{
throw new ClassCastException("Expected " +
expectedType.getSimpleName() + " for " + contextDescription + " but got: " +
- (value == null ? "null" :
value.getClass().getSimpleName()));
+ (value == null ? "null" :
value.getClass().getSimpleName()));
}
return expectedType.cast(value);
}
@@ -84,11 +84,11 @@ public class DataTypeUtils
* Safely parses a string to a long with descriptive error handling.
* This method handles null values and provides meaningful error messages
when parsing fails.
*
- * @param value the string value to be parsed
+ * @param value the string value to be parsed
* @param contextDescription descriptive context for error messages (e.g.,
"completed bytes", "total bytes")
* @return the parsed long value
* @throws IllegalStateException if the value cannot be parsed as a long,
- * with a descriptive message indicating what failed to parse
+ * with a descriptive message indicating
what failed to parse
*/
public static long safeParseLong(String value, String contextDescription)
{
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index f9911b63..8af3b966 100644
---
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -144,6 +144,7 @@ public final class ApiEndpointsV1
public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 +
CASSANDRA + OPERATIONAL_JOBS;
public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA +
PER_OPERATIONAL_JOB;
public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA +
"/operations/decommission";
+ public static final String NODE_MOVE_ROUTE = API_V1 + CASSANDRA +
"/operations/move";
public static final String NODE_DRAIN_ROUTE = API_V1 + CASSANDRA +
"/operations/drain";
public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA +
"/stats/streams";
public static final String TABLE_STATS_ROUTE = API_V1 + CASSANDRA +
PER_KEYSPACE + PER_TABLE + "/stats";
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeMoveRequest.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeMoveRequest.java
new file mode 100644
index 00000000..2ea8b325
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeMoveRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.request.data.NodeMoveRequestPayload;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+
+/**
+ * Represents a request to execute a node move operation
+ */
+public class NodeMoveRequest extends JsonRequest<OperationalJobResponse>
+{
+ private final NodeMoveRequestPayload payload;
+
+ /**
+ * Constructs a request to execute a node move operation
+ *
+ * @param newToken the new token for the node to move to
+ */
+ public NodeMoveRequest(String newToken)
+ {
+ super(ApiEndpointsV1.NODE_MOVE_ROUTE);
+ this.payload = new NodeMoveRequestPayload(newToken);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public HttpMethod method()
+ {
+ return HttpMethod.PUT;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Object requestBody()
+ {
+ return payload;
+ }
+}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java
new file mode 100644
index 00000000..3254c149
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request.data;
+
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Request payload for node move operations.
+ *
+ * <p>Valid JSON:</p>
+ * <pre>
+ * { "newToken": "123456789" }
+ * </pre>
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class NodeMoveRequestPayload
+{
+ private final String newToken;
+
+ /**
+ * @param newToken the new token for the node to move to
+ */
+ @JsonCreator
+ public NodeMoveRequestPayload(@JsonProperty(value = "newToken", required =
true) String newToken)
+ {
+ this.newToken = Objects.requireNonNull(newToken, "newToken is
required");
+ }
+
+ /**
+ * @return the new token for the node to move to
+ */
+ @JsonProperty("newToken")
+ public String newToken()
+ {
+ return newToken;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "NodeMoveRequestPayload{newToken='" + newToken + "'}";
+ }
+}
diff --git
a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayloadTest.java
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayloadTest.java
new file mode 100644
index 00000000..8cfa07d3
--- /dev/null
+++
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayloadTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request.data;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link NodeMoveRequestPayload}
+ */
+public class NodeMoveRequestPayloadTest
+{
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ @Test
+ void testJsonSerialization() throws JsonProcessingException
+ {
+ NodeMoveRequestPayload payload = new
NodeMoveRequestPayload("123456789");
+ String json = objectMapper.writeValueAsString(payload);
+ assertThat(json).contains("\"newToken\":\"123456789\"");
+
+ NodeMoveRequestPayload deserialized = objectMapper.readValue(json,
NodeMoveRequestPayload.class);
+ assertThat(deserialized.newToken()).isEqualTo("123456789");
+ }
+
+ @Test
+ void testToString()
+ {
+ NodeMoveRequestPayload payload = new
NodeMoveRequestPayload("123456789");
+ assertThat(payload.toString()).contains("123456789");
+ }
+}
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 966aa47d..c3442373 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -48,6 +48,7 @@ import
org.apache.cassandra.sidecar.common.request.ListSnapshotFilesRequest;
import org.apache.cassandra.sidecar.common.request.NativeUpdateRequest;
import org.apache.cassandra.sidecar.common.request.NodeDecommissionRequest;
import org.apache.cassandra.sidecar.common.request.NodeDrainRequest;
+import org.apache.cassandra.sidecar.common.request.NodeMoveRequest;
import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest;
import org.apache.cassandra.sidecar.common.request.OperationalJobRequest;
import org.apache.cassandra.sidecar.common.request.ReportSchemaRequest;
@@ -610,6 +611,18 @@ public class RequestContext
return request(NODE_DRAIN_REQUEST);
}
+ /**
+ * Sets the {@code request} to be a {@link NodeMoveRequest} and
returns a reference to this Builder
+ * enabling method chaining.
+ *
+ * @param newToken the new token for the node to move to
+ * @return a reference to this Builder
+ */
+ public Builder nodeMoveRequest(@NotNull String newToken)
+ {
+ return request(new NodeMoveRequest(newToken));
+ }
+
/**
* Sets the {@code request} to be a {@link GossipUpdateRequest} for the
* given {@link NodeCommandRequestPayload.State state}, and returns a
reference to this Builder enabling method chaining.
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 04d0298c..dacf4ded 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -855,6 +855,21 @@ public class SidecarClient implements AutoCloseable,
SidecarClientBlobRestoreExt
.build());
}
+ /**
+ * Executes the node move request using the default retry policy and a
single-instance selection policy targeting the provided instance
+ *
+ * @param instance the instance where the request will be executed
+ * @param newToken the new token for the node to move to
+ * @return a completable future of the operational job response
+ */
+ public CompletableFuture<OperationalJobResponse> nodeMove(SidecarInstance
instance, String newToken)
+ {
+ return executor.executeRequestAsync(requestBuilder()
+
.singleInstanceSelectionPolicy(instance)
+ .nodeMoveRequest(newToken)
+ .build());
+ }
+
/**
* Sends a request to start or stop Cassandra gossiping on the provided
instance.
* <p>
diff --git
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index 7cc117c3..032a937c 100644
---
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -1468,6 +1468,31 @@ abstract class SidecarClientTest
validateResponseServed(ApiEndpointsV1.NODE_DRAIN_ROUTE);
}
+ @Test
+ public void testNodeMove() throws Exception
+ {
+ UUID jobId = UUID.randomUUID();
+ String newToken = "123456789";
+ String nodeMoveString = "{\"jobId\":\"" + jobId +
"\",\"jobStatus\":\"SUCCEEDED\"}";
+
+ MockResponse response = new MockResponse()
+ .setResponseCode(OK.code())
+ .setHeader("content-type", "application/json")
+ .setBody(nodeMoveString);
+ enqueue(response);
+
+ SidecarInstanceImpl sidecarInstance =
RequestExecutorTest.newSidecarInstance(servers.get(0));
+ OperationalJobResponse result = client.nodeMove(sidecarInstance,
newToken).get(30, TimeUnit.SECONDS);
+ assertThat(result).isNotNull();
+ assertThat(result.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
+ validateResponseServed(ApiEndpointsV1.NODE_MOVE_ROUTE, request -> {
+ // Verify that the request body contains the expected JSON payload
+ String requestBody = request.getBody().readUtf8();
+ assertThat(requestBody).contains("\"newToken\":\"" + newToken +
"\"");
+
assertThat(request.getHeader("Content-Type")).isEqualTo("application/json");
+ });
+ }
+
@Test
void testFailsWithOneAttemptPerServer()
{
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java
index 8557b451..5566342d 100644
---
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java
@@ -25,22 +25,36 @@ import org.junit.jupiter.api.Test;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
+import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.RingResponse;
+import org.apache.cassandra.sidecar.common.response.data.RingEntry;
import
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
import static org.assertj.core.api.Assertions.assertThat;
/**
- * Integration tests for Cassandra node drain operations
+ * Integration tests for Cassandra node operations
*/
public class CassandraNodeOperationsIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
{
public static final String CASSANDRA_VERSION_4_0 = "4.0";
+ @Override
+ protected ClusterBuilderConfiguration testClusterConfiguration()
+ {
+ return super.testClusterConfiguration()
+ .dcCount(1)
+ .nodesPerDc(3)
+ .requestFeature(Feature.NETWORK);
+ }
+
@Override
protected void initializeSchemaForTest()
{
@@ -88,17 +102,160 @@ public class CassandraNodeOperationsIntegrationTest
extends SharedClusterSidecar
// Validate the operational job status using the OperationalJobHandler
String jobId = responseBody.getString("jobId");
- validateOperationalJobStatus(jobId, "drain");
+ validateOperationalJobStatus(jobId, "drain",
OperationalJobStatus.SUCCEEDED);
+ }
+
+
+ @Test
+ void testNodeMoveOperationSuccess()
+ {
+ // Use a test token - this is a valid token for Murmur3Partitioner
+ String testToken = "123456789";
+ String requestBody = "{\"newToken\":\"" + testToken + "\"}";
+
+ // Validate that the node owns a different token than testToken
+ String currentToken = getCurrentTokenForNode("localhost");
+ assertThat(currentToken).isNotEqualTo(testToken);
+
+ // Initiate move operation
+ HttpResponse<Buffer> moveResponse = getBlocking(
+ trustedClient().put(serverWrapper.serverPort, "localhost",
ApiEndpointsV1.NODE_MOVE_ROUTE)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(Buffer.buffer(requestBody)));
+
+ assertThat(moveResponse.statusCode()).isIn(OK.code(), ACCEPTED.code());
+
+ JsonObject responseBody = moveResponse.bodyAsJsonObject();
+ assertThat(responseBody).isNotNull();
+ assertThat(responseBody.getString("jobId")).isNotNull();
+ assertThat(responseBody.getString("operation")).isEqualTo("move");
+ assertThat(responseBody.getString("jobStatus")).isIn(
+ OperationalJobStatus.CREATED.name(),
+ OperationalJobStatus.RUNNING.name(),
+ OperationalJobStatus.SUCCEEDED.name()
+ );
+
+ // Verify via stream stats that the node reports an expected operation mode
+ loopAssert(30, 500, () -> {
+ HttpResponse<Buffer> streamStatsResponse = getBlocking(
+ trustedClient().get(serverWrapper.serverPort, "localhost",
ApiEndpointsV1.STREAM_STATS_ROUTE)
+ .send());
+
+ assertThat(streamStatsResponse.statusCode()).isEqualTo(OK.code());
+
+ JsonObject streamStats = streamStatsResponse.bodyAsJsonObject();
+ assertThat(streamStats).isNotNull();
+ // The operationMode should be either NORMAL (completed) or MOVING
(in progress)
+ assertThat(streamStats.getString("operationMode")).isIn("NORMAL",
"MOVING");
+ });
+
+ // Validate the operational job status using the OperationalJobHandler
+ String jobId = responseBody.getString("jobId");
+ validateOperationalJobStatus(jobId, "move",
OperationalJobStatus.SUCCEEDED);
+
+ // Validate that the node actually owns the new token
+ currentToken = getCurrentTokenForNode("localhost");
+ assertThat(currentToken).isEqualTo(testToken);
+ }
+
+ /**
+ * Tests the failure case of node move operation when attempting to move
to a token
+ * already owned by another node in the cluster.
+ * <p>
+ * This test validates that:
+ * - The system properly rejects invalid move operations that would create
token conflicts
+ * - The move operation fails with OperationalJobStatus.FAILED when
targeting an existing token
+ * - The original node retains its initial token after the failed move
attempt
+ * <p>
+ * Token conflicts must be prevented to maintain cluster integrity, as
having multiple
+ * nodes own the same token would break the consistent hashing ring and
cause data
+ * distribution issues.
+ */
+ @Test
+ void testNodeMoveOperationFailure()
+ {
+ // Get a token already owned by another node (localhost2)
+ String testToken = getCurrentTokenForNode("localhost2");
+ String requestBody = "{\"newToken\":\"" + testToken + "\"}";
+
+ // Validate that the node owns a different token than testToken
+ String initialToken = getCurrentTokenForNode("localhost");
+ assertThat(initialToken).isNotEqualTo(testToken);
+
+ // Initiate move operation
+ HttpResponse<Buffer> moveResponse = getBlocking(
+ trustedClient().put(serverWrapper.serverPort, "localhost",
ApiEndpointsV1.NODE_MOVE_ROUTE)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(Buffer.buffer(requestBody)));
+
+ assertThat(moveResponse.statusCode()).isIn(OK.code(), ACCEPTED.code());
+
+ JsonObject responseBody = moveResponse.bodyAsJsonObject();
+ assertThat(responseBody).isNotNull();
+ assertThat(responseBody.getString("jobId")).isNotNull();
+ assertThat(responseBody.getString("operation")).isEqualTo("move");
+ assertThat(responseBody.getString("jobStatus")).isIn(
+ OperationalJobStatus.CREATED.name(),
+ OperationalJobStatus.RUNNING.name(),
+ OperationalJobStatus.FAILED.name()
+ );
+
+ // Verify via stream stats that the node reports an expected operation mode
+ loopAssert(30, 500, () -> {
+ HttpResponse<Buffer> streamStatsResponse = getBlocking(
+ trustedClient().get(serverWrapper.serverPort, "localhost",
ApiEndpointsV1.STREAM_STATS_ROUTE)
+ .send());
+
+ assertThat(streamStatsResponse.statusCode()).isEqualTo(OK.code());
+
+ JsonObject streamStats = streamStatsResponse.bodyAsJsonObject();
+ assertThat(streamStats).isNotNull();
+ // The operationMode should be either NORMAL (completed) or MOVING
(in progress)
+ assertThat(streamStats.getString("operationMode")).isIn("NORMAL",
"MOVING");
+ });
+
+ // Validate the operational job status using the OperationalJobHandler
+ String jobId = responseBody.getString("jobId");
+ validateOperationalJobStatus(jobId, "move",
OperationalJobStatus.FAILED);
+
+ // Validate that the node didn't move
+ String currentToken = getCurrentTokenForNode("localhost");
+ assertThat(currentToken).isEqualTo(initialToken);
+ assertThat(currentToken).isNotEqualTo(testToken);
+ }
+
+ /**
+ * Gets the current token for the specified node by querying the ring
endpoint.
+ *
+ * @param node the node hostname to get the token for
+ * @return the token currently owned by the specified node
+ */
+ private String getCurrentTokenForNode(String node)
+ {
+ HttpResponse<Buffer> ringResponse = getBlocking(
+ trustedClient().get(serverWrapper.serverPort, node,
ApiEndpointsV1.RING_ROUTE)
+ .send());
+
+ assertThat(ringResponse.statusCode()).isEqualTo(OK.code());
+
+ RingResponse ring = ringResponse.bodyAsJson(RingResponse.class);
+ assertThat(ring).isNotNull();
+
+ RingEntry ringEntry = ring.stream()
+ .filter(entry -> entry.fqdn().equals(node))
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("Node
" + node + " not found in ring"));
+ return ringEntry.token();
}
/**
* Validates the operational job status by querying the
OperationalJobHandler endpoint
* and waiting for the job to reach a final state if necessary.
*
- * @param jobId the ID of the operational job to validate
+ * @param jobId the ID of the operational job to validate
* @param expectedOperation the expected operation name (e.g., "move",
"decommission", "drain")
*/
- private void validateOperationalJobStatus(String jobId, String
expectedOperation)
+ private void validateOperationalJobStatus(String jobId, String
expectedOperation, OperationalJobStatus expectedEndStatus)
{
String operationalJobRoute =
ApiEndpointsV1.OPERATIONAL_JOB_ROUTE.replace(":operationId", jobId);
@@ -112,10 +269,6 @@ public class CassandraNodeOperationsIntegrationTest
extends SharedClusterSidecar
assertThat(jobStatusBody).isNotNull();
assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId);
assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation);
- assertThat(jobStatusBody.getString("jobStatus")).isIn(
- OperationalJobStatus.RUNNING.name(),
- OperationalJobStatus.SUCCEEDED.name()
- );
// If the job is still running, wait for it to complete or reach a
final state
if
(OperationalJobStatus.RUNNING.name().equals(jobStatusBody.getString("jobStatus")))
@@ -135,6 +288,18 @@ public class CassandraNodeOperationsIntegrationTest
extends SharedClusterSidecar
);
});
}
+
+ jobStatusResponse = getBlocking(
+ trustedClient().get(serverWrapper.serverPort, "localhost",
operationalJobRoute)
+ .send());
+
+ assertThat(jobStatusResponse.statusCode()).isEqualTo(OK.code());
+
+ jobStatusBody = jobStatusResponse.bodyAsJsonObject();
+ assertThat(jobStatusBody).isNotNull();
+ assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId);
+
assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation);
+
assertThat(jobStatusBody.getString("jobStatus")).isEqualTo(expectedEndStatus.name());
}
/**
diff --git
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
index 021afa52..163a30c9 100644
---
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
+++
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
@@ -179,4 +179,11 @@ public interface StorageOperations
* @return the current compaction throughput in megabytes per second, or 0
if throughput cannot be determined
*/
int getCompactionThroughputMbPerSec();
+
+ /**
+ * Triggers the node move operation to move the node to a new token.
+ *
+ * @param newToken the new token for the node to move to
+ */
+ void move(String newToken) throws IOException;
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
index 2195122e..f25e4b34 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
@@ -65,6 +65,7 @@ public class BasicPermissions
// sidecar operation related permissions
public static final Permission READ_OPERATIONAL_JOB = new
DomainAwarePermission("OPERATIONAL_JOB:READ", OPERATION_SCOPE);
public static final Permission DECOMMISSION_NODE = new
DomainAwarePermission("NODE:DECOMMISSION", OPERATION_SCOPE);
+ public static final Permission MOVE_NODE = new
DomainAwarePermission("NODE:MOVE", OPERATION_SCOPE);
public static final Permission DRAIN_NODE = new
DomainAwarePermission("NODE:DRAIN", OPERATION_SCOPE);
// Permissions related to Schema Reporting
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java
new file mode 100644
index 00000000..c7679869
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.Collections;
+import java.util.Set;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.DecodeException;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.common.request.data.NodeMoveRequestPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.utils.StringUtils;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.job.NodeMoveJob;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.OperationalJobUtils;
+import org.jetbrains.annotations.NotNull;
+
+import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Provides REST API for asynchronously moving the corresponding Cassandra
node to a new token
+ */
+@Singleton
+public class NodeMoveHandler extends AbstractHandler<String> implements
AccessProtected
+{
+ /**
+ * Maximum allowed length for token strings in characters.
+ * <p>
+ * This limit is set conservatively to accommodate all valid token
formats, including
+ * the string representation of {@code Long.MIN_VALUE}
(-9223372036854775808, 20 characters)
+ * with substantial margin for other token representations or future
extensions.
+ */
+ private static final int MAX_TOKEN_LENGTH = 128;
+
+ private final OperationalJobManager jobManager;
+ private final ServiceConfiguration config;
+
+ /**
+ * Constructs a handler with the provided dependencies.
+ *
+ * @param metadataFetcher the interface to retrieve instance metadata
+ * @param executorPools the executor pools for blocking executions
+ * @param serviceConfiguration the service configuration; this handler reads
+ * {@code operationalJobExecutionMaxWaitTime()} from it when submitting a job
+ * @param validator a validator instance to validate Cassandra-specific input
+ * @param jobManager the manager that move jobs are submitted to
+ */
+ @Inject
+ protected NodeMoveHandler(InstanceMetadataFetcher metadataFetcher,
+ ExecutorPools executorPools,
+ ServiceConfiguration serviceConfiguration,
+ CassandraInputValidator validator,
+ OperationalJobManager jobManager)
+ {
+ super(metadataFetcher, executorPools, validator);
+ this.jobManager = jobManager;
+ this.config = serviceConfiguration;
+ }
+
+ /**
+ * @return a singleton set holding the authorization derived from {@link BasicPermissions#MOVE_NODE}
+ */
+ @Override
+ public Set<Authorization> requiredAuthorizations()
+ {
+ return
Collections.singleton(BasicPermissions.MOVE_NODE.toAuthorization());
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Wraps the token in a {@link NodeMoveJob} identified by a time-based UUID and submits it through the
+ * {@link OperationalJobManager}, together with the service executor pool and the configured
+ * {@code operationalJobExecutionMaxWaitTime()}. The HTTP response is written from the submission
+ * callback by {@link OperationalJobUtils#sendStatusBasedResponse}.
+ *
+ * @param newToken the token to move the node to (presumably the value produced by
+ * {@link #extractParamsOrThrow(RoutingContext)}; the wiring lives in the parent handler)
+ */
+ @Override
+ public void handleInternal(RoutingContext context,
+ HttpServerRequest httpRequest,
+ @NotNull String host,
+ SocketAddress remoteAddress,
+ String newToken)
+ {
+ StorageOperations operations =
metadataFetcher.delegate(host).storageOperations();
+ NodeMoveJob job = new NodeMoveJob(UUIDs.timeBased(), newToken,
operations);
+ this.jobManager.trySubmitJob(job,
+ (completedJob, exception) ->
+
OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception),
+ executorPools.service(),
+
config.operationalJobExecutionMaxWaitTime());
+ }
+
+ /**
+ * Parses the JSON request body into a {@link NodeMoveRequestPayload} and returns the validated token.
+ * <p>
+ * The token is trimmed <em>before</em> it is validated, so a whitespace-only value is rejected
+ * instead of being forwarded as an empty token.
+ *
+ * @param context the routing context carrying the request body
+ * @return the trimmed, non-empty token, shorter than {@link #MAX_TOKEN_LENGTH} characters
+ * @throws IllegalArgumentException when the token is missing, blank, or too long; a missing or
+ * unparseable body is reported through {@code wrapHttpException} with {@code BAD_REQUEST}
+ */
+ @Override
+ protected String extractParamsOrThrow(RoutingContext context)
+ {
+ String body = context.body().asString();
+ if (body == null || body.equalsIgnoreCase("null"))
+ {
+ throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Request body must be JSON with a non-null \"newToken\" field");
+ }
+ try
+ {
+ NodeMoveRequestPayload payload = Json.decodeValue(body, NodeMoveRequestPayload.class);
+ // A body such as " null " slips past the guard above and may decode to a null payload;
+ // treat that as a missing token rather than failing with a NullPointerException
+ String newToken = payload == null ? null : payload.newToken();
+ // Trim first so that whitespace-only tokens fail the emptiness check below
+ String trimmedToken = newToken == null ? null : newToken.trim();
+ if (StringUtils.isNullOrEmpty(trimmedToken))
+ {
+ throw new IllegalArgumentException("newToken value cannot be null or empty");
+ }
+
+ if (trimmedToken.length() >= MAX_TOKEN_LENGTH)
+ {
+ throw new IllegalArgumentException(
+ String.format("newToken value must be less than %d characters. Provided value length=%d",
+ MAX_TOKEN_LENGTH, trimmedToken.length()));
+ }
+ return trimmedToken;
+ }
+ catch (DecodeException e)
+ {
+ throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+ "Failed to parse NodeMoveRequestPayload error=" + e.getMessage());
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java
b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java
new file mode 100644
index 00000000..f03d6338
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+
+/**
+ * Implementation of {@link OperationalJob} to perform node move operation.
+ * <p>
+ * The job delegates to {@link StorageOperations#move(String)} with the token supplied at construction.
+ */
+public class NodeMoveJob extends OperationalJob
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(NodeMoveJob.class);
+ // Value returned by name()
+ private static final String OPERATION = "move";
+ // Operation mode string that isRunningOnCassandra() compares against
+ private static final String OPERATION_MODE_MOVING = "MOVING";
+ private final String newToken;
+ // NOTE(review): assigned once in the constructor and never reassigned in this class;
+ // could be final if no subclass or test reassigns it - confirm before changing
+ protected StorageOperations storageOperations;
+
+ /**
+ * Creates a move job.
+ *
+ * @param jobId the job identifier, passed to the {@link OperationalJob} constructor
+ * @param newToken the token handed to {@link StorageOperations#move(String)} when the job executes
+ * @param storageOps the storage operations used to read the operation mode and to trigger the move
+ */
+ public NodeMoveJob(UUID jobId, String newToken, StorageOperations
storageOps)
+ {
+ super(jobId);
+ this.newToken = newToken;
+ this.storageOperations = storageOps;
+ }
+
+ /**
+ * Checks the operation mode reported by the storage operations.
+ *
+ * @return {@code true} if the reported operation mode equals {@code "MOVING"}
+ */
+ @Override
+ public boolean isRunningOnCassandra()
+ {
+ String operationMode = storageOperations.operationMode();
+ return OPERATION_MODE_MOVING.equals(operationMode);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Logs the job id and the target token, then invokes {@link StorageOperations#move(String)}.
+ */
+ @Override
+ protected void executeInternal() throws Exception
+ {
+ LOGGER.info("Executing move operation. jobId={} newToken={}",
this.jobId(), newToken);
+ storageOperations.move(newToken);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return the operation name, {@code "move"}
+ */
+ @Override
+ public String name()
+ {
+ return OPERATION;
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
index c4b6b9a3..04fa5d0c 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
@@ -189,7 +189,7 @@ public abstract class OperationalJob implements Task<Void>
/**
* OperationalJob body. The implementation is executed in a blocking
manner.
*/
- protected abstract void executeInternal();
+ protected abstract void executeInternal() throws Exception;
/**
* Execute the job behavior as specified in the internal execution {@link
#executeInternal()},
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
index ebf94af9..8f7a2f6e 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
@@ -48,6 +48,7 @@ import
org.apache.cassandra.sidecar.handlers.ListOperationalJobsHandler;
import org.apache.cassandra.sidecar.handlers.NativeUpdateHandler;
import org.apache.cassandra.sidecar.handlers.NodeDecommissionHandler;
import org.apache.cassandra.sidecar.handlers.NodeDrainHandler;
+import org.apache.cassandra.sidecar.handlers.NodeMoveHandler;
import org.apache.cassandra.sidecar.handlers.OperationalJobHandler;
import org.apache.cassandra.sidecar.handlers.RingHandler;
import org.apache.cassandra.sidecar.handlers.SchemaHandler;
@@ -180,6 +181,33 @@ public class CassandraOperationsModule extends
AbstractModule
return factory.buildRouteWithHandler(nodeDrainHandler);
}
+ /**
+ * Provides the route for the node move endpoint.
+ * <p>
+ * The body handler is enabled because {@link NodeMoveHandler} reads the JSON payload from the request body.
+ * The 400 response is documented because the handler rejects missing, empty, or over-long tokens.
+ */
+ @PUT
+ @Path(ApiEndpointsV1.NODE_MOVE_ROUTE)
+ @Operation(summary = "Move node to new token",
+ description = "Moves the Cassandra node to a new token in the ring")
+ @APIResponse(description = "Node move operation completed successfully",
+ responseCode = "200",
+ content = @Content(mediaType = "application/json",
+ schema = @Schema(implementation = OperationalJobResponse.class)))
+ @APIResponse(description = "Node move operation initiated successfully",
+ responseCode = "202",
+ content = @Content(mediaType = "application/json",
+ schema = @Schema(implementation = OperationalJobResponse.class)))
+ @APIResponse(description = "Invalid request body or newToken value",
+ responseCode = "400")
+ @APIResponse(description = "Conflicting node move job encountered",
+ responseCode = "409",
+ content = @Content(mediaType = "application/json",
+ schema = @Schema(implementation = OperationalJobResponse.class)))
+ @ProvidesIntoMap
+ @KeyClassMapKey(VertxRouteMapKeys.CassandraNodeMoveRouteKey.class)
+ VertxRoute cassandraNodeMoveRoute(RouteBuilder.Factory factory,
+ NodeMoveHandler nodeMoveHandler)
+ {
+ return factory.builderForRoute()
+ .setBodyHandler(true)
+ .handler(nodeMoveHandler)
+ .build();
+ }
+
@GET
@Path(ApiEndpointsV1.STREAM_STATS_ROUTE)
@Operation(summary = "Get stream statistics",
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
index c798fbde..9875d90b 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
@@ -89,6 +89,11 @@ public interface VertxRouteMapKeys
HttpMethod HTTP_METHOD = HttpMethod.PUT;
String ROUTE_URI = ApiEndpointsV1.NODE_DRAIN_ROUTE;
}
+ // Route key for the node move endpoint: HTTP PUT on ApiEndpointsV1.NODE_MOVE_ROUTE
+ interface CassandraNodeMoveRouteKey extends RouteClassKey
+ {
+ HttpMethod HTTP_METHOD = HttpMethod.PUT;
+ String ROUTE_URI = ApiEndpointsV1.NODE_MOVE_ROUTE;
+ }
interface CassandraNodeSettingsRouteKey extends RouteClassKey
{
HttpMethod HTTP_METHOD = HttpMethod.GET;
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java
new file mode 100644
index 00000000..1ff8294c
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.modules.SidecarModules;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.AdditionalAnswers;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link NodeMoveHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeMoveHandlerTest
+{
+ static final Logger LOGGER =
LoggerFactory.getLogger(NodeMoveHandlerTest.class);
+ public static final String MOVE_ROUTE =
"/api/v1/cassandra/operations/move";
+ public static final String LOCAL_HOST = "127.0.0.1";
+ public static final String OPERATION_MODE_MOVING = "MOVING";
+ public static final String OPERATION_MODE_NORMAL = "NORMAL";
+ Vertx vertx;
+ Server server;
+ StorageOperations mockStorageOperations = mock(StorageOperations.class);
+
+ /**
+ * Builds the injector with the test overrides and starts the server.
+ * <p>
+ * Fails the set-up when the server does not start within 5 seconds or reports a start failure,
+ * so that no test runs against a server that is not listening.
+ */
+ @BeforeEach
+ void before() throws InterruptedException
+ {
+ Module testOverride = Modules.override(new TestModule())
+ .with(new NodeMoveHandlerTest.NodeMoveTestModule());
+ Injector injector = Guice.createInjector(Modules.override(SidecarModules.all())
+ .with(testOverride));
+ vertx = injector.getInstance(Vertx.class);
+ server = injector.getInstance(Server.class);
+ VertxTestContext context = new VertxTestContext();
+ server.start()
+ .onSuccess(s -> context.completeNow())
+ .onFailure(context::failNow);
+ // awaitCompletion returns false on timeout; the result used to be ignored
+ assertThat(context.awaitCompletion(5, TimeUnit.SECONDS))
+ .as("server start completes within 5 seconds")
+ .isTrue();
+ // a start failure recorded through failNow must not be silently dropped
+ assertThat(context.causeOfFailure()).isNull();
+ }
+
+ /**
+ * Closes the server and waits up to 60 seconds for the close to finish.
+ * <p>
+ * The latch is released on completion (success or failure), so a failed close is logged
+ * immediately instead of stalling the test for the whole timeout.
+ */
+ @AfterEach
+ void after() throws InterruptedException
+ {
+ CountDownLatch closeLatch = new CountDownLatch(1);
+ server.close().onComplete(res -> {
+ if (res.failed())
+ {
+ LOGGER.error("Server close failed.", res.cause());
+ }
+ closeLatch.countDown();
+ });
+ if (closeLatch.await(60, TimeUnit.SECONDS))
+ LOGGER.info("Close event received before timeout.");
+ else
+ LOGGER.error("Close event timed out.");
+ }
+
+ /**
+ * Simulates a slow move (the mocked call is delayed by 6000 ms) and expects HTTP 202 with a
+ * RUNNING job for the "move" operation. Presumably the delay exceeds the configured maximum
+ * wait time of the test configuration - confirm against TestModule.
+ */
+ @Test
+ void testMoveLongRunning(VertxTestContext context) throws IOException
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ doAnswer(AdditionalAnswers.answersWithDelay(6000, invocation -> null))
+ .when(mockStorageOperations).move(anyString());
+
+ WebClient client = WebClient.create(vertx);
+ String requestBody = "{\"newToken\":\"123456789\"}";
+ client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE)
+ .expect(ResponsePredicate.SC_ACCEPTED)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody),
context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(ACCEPTED.code());
+ OperationalJobResponse moveResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(moveResponse).isNotNull();
+ assertThat(moveResponse.status()).isEqualTo(RUNNING);
+ assertThat(moveResponse.operation()).isEqualTo("move");
+ context.completeNow();
+ }));
+ }
+
+ /**
+ * With the node in NORMAL mode and an un-stubbed (immediately returning) move, expects HTTP 200,
+ * a SUCCEEDED job for the "move" operation, and a single move call with the requested token.
+ */
+ @Test
+ void testMoveCompleted(VertxTestContext context)
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ WebClient client = WebClient.create(vertx);
+ String requestBody = "{\"newToken\":\"123456789\"}";
+ client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody),
context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ LOGGER.info("Move Response: {}", response.bodyAsString());
+
+ OperationalJobResponse moveResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(moveResponse).isNotNull();
+ assertThat(moveResponse.status()).isEqualTo(SUCCEEDED);
+ assertThat(moveResponse.operation()).isEqualTo("move");
+ try
+ {
+ verify(mockStorageOperations).move("123456789");
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ context.completeNow();
+ }));
+ }
+
+ /**
+ * Makes the mocked move throw a RuntimeException and expects HTTP 200 carrying a FAILED job
+ * whose reason equals the exception message.
+ */
+ @Test
+ void testMoveFailed(VertxTestContext context) throws IOException
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ doThrow(new RuntimeException("Simulated
failure")).when(mockStorageOperations).move(anyString());
+ WebClient client = WebClient.create(vertx);
+ String requestBody = "{\"newToken\":\"123456789\"}";
+ client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE)
+ .expect(ResponsePredicate.SC_OK)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody),
context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ OperationalJobResponse moveResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(moveResponse).isNotNull();
+ assertThat(moveResponse.status()).isEqualTo(FAILED);
+ assertThat(moveResponse.operation()).isEqualTo("move");
+ assertThat(moveResponse.reason()).isEqualTo("Simulated
failure");
+ context.completeNow();
+ }));
+ }
+
+ /**
+ * Reports operation mode MOVING from the mock and expects HTTP 409 with a response that carries
+ * a job id, while move is never invoked.
+ */
+ @Test
+ void testMoveConflictAlreadyMoving(VertxTestContext context)
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_MOVING);
+ WebClient client = WebClient.create(vertx);
+ String requestBody = "{\"newToken\":\"123456789\"}";
+ client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE)
+ .expect(ResponsePredicate.SC_CONFLICT)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody),
context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(CONFLICT.code());
+ LOGGER.info("Move Response: {}", response.bodyAsString());
+ OperationalJobResponse moveResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(moveResponse).isNotNull();
+ assertThat(moveResponse.jobId()).isNotNull();
+ try
+ {
+ verify(mockStorageOperations,
never()).move(anyString()); // Should not call move when already moving
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ context.completeNow();
+ }));
+ }
+
+ /**
+ * Sends a JSON object without a newToken field and expects HTTP 400 with no move call.
+ */
+ @Test
+ void testMoveWithMissingToken(VertxTestContext context)
+ {
+ WebClient client = WebClient.create(vertx);
+ String requestBody = "{}"; // Empty JSON body
+ client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE)
+ .expect(ResponsePredicate.SC_BAD_REQUEST)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody),
context.succeeding(response -> {
+
assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code());
+ try
+ {
+ verify(mockStorageOperations, never()).move(anyString());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ context.completeNow();
+ }));
+ }
+
+ /**
+ * Sends an empty-string newToken and expects HTTP 400 with no move call.
+ */
+ @Test
+ void testMoveWithEmptyToken(VertxTestContext context)
+ {
+ WebClient client = WebClient.create(vertx);
+ String requestBody = "{\"newToken\":\"\"}"; // Empty token in JSON
+ client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE)
+ .expect(ResponsePredicate.SC_BAD_REQUEST)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody),
context.succeeding(response -> {
+
assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code());
+ try
+ {
+ verify(mockStorageOperations, never()).move(anyString());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ context.completeNow();
+ }));
+ }
+
+ /**
+ * Sends a 128-character token, the first length the handler's length check rejects, and expects
+ * HTTP 400 with no move call.
+ */
+ @Test
+ void testMoveWithInvalidTokenTooLong(VertxTestContext context)
+ {
+ WebClient client = WebClient.create(vertx);
+ // Create a token string that is exactly 128 characters (should fail)
+ String longToken = "a".repeat(128);
+ String requestBody = "{\"newToken\":\"" + longToken + "\"}";
+ client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE)
+ .expect(ResponsePredicate.SC_BAD_REQUEST)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody),
context.succeeding(response -> {
+
assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code());
+ try
+ {
+ verify(mockStorageOperations, never()).move(anyString());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ context.completeNow();
+ }));
+ }
+
+ /**
+ * Sends a non-numeric token and expects HTTP 200 with a SUCCEEDED job; the token is expected to be
+ * passed to move unchanged.
+ */
+ @Test
+ void testMoveWithValidAlphanumericToken(VertxTestContext context)
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ WebClient client = WebClient.create(vertx);
+ String requestBody = "{\"newToken\":\"validtoken123\"}"; // Valid
alphanumeric token
+ client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody),
context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ OperationalJobResponse moveResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(moveResponse).isNotNull();
+ assertThat(moveResponse.status()).isEqualTo(SUCCEEDED);
+ try
+ {
+ verify(mockStorageOperations).move("validtoken123");
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ context.completeNow();
+ }));
+ }
+
+ /**
+ * Sends a 127-character token, the longest length the handler's length check accepts, and expects
+ * HTTP 200 with a SUCCEEDED job and a move call with that token.
+ */
+ @Test
+ void testMoveWithTokenAtMaxLength(VertxTestContext context)
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ WebClient client = WebClient.create(vertx);
+ // Create a token string that is 127 characters (should pass)
+ String maxLengthToken = "a".repeat(127);
+ String requestBody = "{\"newToken\":\"" + maxLengthToken + "\"}";
+ client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody),
context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ OperationalJobResponse moveResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(moveResponse).isNotNull();
+ assertThat(moveResponse.status()).isEqualTo(SUCCEEDED);
+ try
+ {
+ verify(mockStorageOperations).move(maxLengthToken);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ context.completeNow();
+ }));
+ }
+
+ /**
+ * Sends the string form of Long.MIN_VALUE as the token and expects HTTP 200 with a SUCCEEDED job
+ * and a move call with that exact string.
+ */
+ @Test
+ void testMoveWithNegativeToken(VertxTestContext context)
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ WebClient client = WebClient.create(vertx);
+ String requestBody = "{\"newToken\":\"-9223372036854775808\"}"; //
Negative token (valid for Murmur3)
+ client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody),
context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ OperationalJobResponse moveResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(moveResponse).isNotNull();
+ assertThat(moveResponse.status()).isEqualTo(SUCCEEDED);
+ try
+ {
+
verify(mockStorageOperations).move("-9223372036854775808");
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ context.completeNow();
+ }));
+ }
+
+ /**
+ * Sends "0" as the token and expects HTTP 200 with a SUCCEEDED job and a move call with "0".
+ */
+ @Test
+ void testMoveWithZeroToken(VertxTestContext context)
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ WebClient client = WebClient.create(vertx);
+ String requestBody = "{\"newToken\":\"0\"}"; // Zero token
+ client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody),
context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ OperationalJobResponse moveResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(moveResponse).isNotNull();
+ assertThat(moveResponse.status()).isEqualTo(SUCCEEDED);
+ try
+ {
+ verify(mockStorageOperations).move("0");
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ context.completeNow();
+ }));
+ }
+
+ /**
+ * Test guice module for Node Move handler tests.
+ * <p>
+ * Declared as an inner (non-static) class because the provider hands out the enclosing test's
+ * {@code mockStorageOperations} through a mocked delegate.
+ */
+ class NodeMoveTestModule extends AbstractModule
+ {
+ /**
+ * @return a mocked {@link InstancesMetadata} exposing a single mocked instance (id 100, host
+ * {@code LOCAL_HOST}, port 9042) whose delegate returns the test's storage operations mock
+ */
+ @Provides
+ @Singleton
+ public InstancesMetadata instanceMetadata()
+ {
+ final int instanceId = 100;
+ final String host = LOCAL_HOST;
+ final InstanceMetadata instanceMetadata =
mock(InstanceMetadata.class);
+ when(instanceMetadata.host()).thenReturn(host);
+ when(instanceMetadata.port()).thenReturn(9042);
+ when(instanceMetadata.id()).thenReturn(instanceId);
+ when(instanceMetadata.stagingDir()).thenReturn("");
+
+ CassandraAdapterDelegate delegate =
mock(CassandraAdapterDelegate.class);
+
+
when(delegate.storageOperations()).thenReturn(mockStorageOperations);
+ when(instanceMetadata.delegate()).thenReturn(delegate);
+
+ InstancesMetadata mockInstancesMetadata =
mock(InstancesMetadata.class);
+
when(mockInstancesMetadata.instances()).thenReturn(Collections.singletonList(instanceMetadata));
+
when(mockInstancesMetadata.instanceFromId(instanceId)).thenReturn(instanceMetadata);
+
when(mockInstancesMetadata.instanceFromHost(host)).thenReturn(instanceMetadata);
+
+ return mockInstancesMetadata;
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java
new file mode 100644
index 00000000..ecd4271b
--- /dev/null
+++ b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeMoveJobTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.utils.UUIDs;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link NodeMoveJob}
+ */
+class NodeMoveJobTest
+{
+ public static final String OPERATION_MOVE = "move";
+ public static final String OPERATION_MODE_MOVING = "MOVING";
+ public static final String OPERATION_MODE_NORMAL = "NORMAL";
+ public static final String OPERATION_MODE_JOINING = "JOINING";
+ private StorageOperations mockStorageOperations;
+ private UUID jobId;
+ private String newToken;
+
+ @BeforeEach
+ void setUp()
+ {
+ mockStorageOperations = mock(StorageOperations.class);
+ jobId = UUIDs.timeBased();
+ newToken = "123456789";
+ }
+
+ @Test
+ void testJobName()
+ {
+ NodeMoveJob job = new NodeMoveJob(jobId, newToken,
mockStorageOperations);
+ assertThat(job.name()).isEqualTo(OPERATION_MOVE);
+ }
+
+ @Test
+ void testJobId()
+ {
+ NodeMoveJob job = new NodeMoveJob(jobId, newToken,
mockStorageOperations);
+ assertThat(job.jobId()).isEqualTo(jobId);
+ }
+
+ @Test
+ void testIsRunningOnCassandraWhenMoving()
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_MOVING);
+ NodeMoveJob job = new NodeMoveJob(jobId, newToken,
mockStorageOperations);
+ assertThat(job.isRunningOnCassandra()).isTrue();
+ }
+
+ @Test
+ void testIsRunningOnCassandraWhenNormal()
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ NodeMoveJob job = new NodeMoveJob(jobId, newToken,
mockStorageOperations);
+ assertThat(job.isRunningOnCassandra()).isFalse();
+ }
+
+ @Test
+ void testIsRunningOnCassandraWhenOtherMode()
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_JOINING);
+ NodeMoveJob job = new NodeMoveJob(jobId, newToken,
mockStorageOperations);
+ assertThat(job.isRunningOnCassandra()).isFalse();
+ }
+
+ @Test
+ void testStatusWhenNormal()
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ NodeMoveJob job = new NodeMoveJob(jobId, newToken,
mockStorageOperations);
+ assertThat(job.status()).isEqualTo(OperationalJobStatus.CREATED);
+ }
+
+ @Test
+ void testStatusWhenFailed() throws IOException
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ RuntimeException testException = new RuntimeException("Test failure");
+ doThrow(testException).when(mockStorageOperations).move(newToken);
+
+ NodeMoveJob job = new NodeMoveJob(jobId, newToken,
mockStorageOperations);
+
+ Promise<Void> promise = Promise.promise();
+ job.execute(promise);
+
+ assertThat(promise.future().failed()).isTrue();
+ assertThat(job.status()).isEqualTo(OperationalJobStatus.FAILED);
+ }
+
+ @Test
+ void testExecuteInternalCallsMove() throws IOException
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ NodeMoveJob job = new NodeMoveJob(jobId, newToken,
mockStorageOperations);
+
+ Promise<Void> promise = Promise.promise();
+ job.execute(promise);
+
+ verify(mockStorageOperations).move(newToken);
+ assertThat(promise.future().succeeded()).isTrue();
+ }
+
+ @Test
+ void testExecuteInternalHandlesException() throws IOException
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ RuntimeException testException = new RuntimeException("Test
exception");
+ doThrow(testException).when(mockStorageOperations).move(newToken);
+
+ NodeMoveJob job = new NodeMoveJob(jobId, newToken,
mockStorageOperations);
+
+ Promise<Void> promise = Promise.promise();
+ job.execute(promise);
+
+ verify(mockStorageOperations).move(newToken);
+ assertThat(promise.future().failed()).isTrue();
+
assertThat(promise.future().cause()).isInstanceOf(OperationalJobException.class);
+
assertThat(promise.future().cause().getCause()).isEqualTo(testException);
+ assertThat(job.status()).isEqualTo(OperationalJobStatus.FAILED);
+ }
+
+ @Test
+ void testJobWithNegativeToken()
+ {
+ String negativeToken = "-9223372036854775808";
+ NodeMoveJob job = new NodeMoveJob(jobId, negativeToken,
mockStorageOperations);
+ assertThat(job.name()).isEqualTo(OPERATION_MOVE);
+ assertThat(job.jobId()).isEqualTo(jobId);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]