yifan-c commented on code in PR #321:
URL: https://github.com/apache/cassandra-sidecar/pull/321#discussion_r2968427387
##########
CHANGES.txt:
##########
@@ -1,5 +1,6 @@
0.3.0
-----
+ * Adding gossip based safety check to Live Migration data copy task endpoint (CASSSIDECAR-409)
Review Comment:
Please update the change entry and the PR title; they should match the JIRA title.
##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java:
##########
@@ -101,39 +103,83 @@ Future<LiveMigrationTask> createDataCopyTask(LiveMigrationDataCopyRequest reques
                                                  String source,
                                                  InstanceMetadata localInstanceMetadata)
     {
-        LiveMigrationTask newTask = createTask(request,
-                                               source,
-                                               sidecarConfiguration.serviceConfiguration().port(),
-                                               localInstanceMetadata);
-
-        // It is possible to serve only one live migration data copy request per instance at a time.
-        // Checking if there is another migration is in progress before accepting new one.
-        boolean accepted = currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
-            if (taskInMap == null)
-            {
-                return newTask;
-            }
+        // Fast local JMX check before creating task - prevents task creation if Cassandra is running
+        return verifyCassandraNotRunning(localInstanceMetadata)
+               .compose(v -> {
+                   LiveMigrationTask newTask = createTask(request,
+                                                          source,
+                                                          sidecarConfiguration.serviceConfiguration().port(),
+                                                          localInstanceMetadata);
+
+                   // It is possible to serve only one live migration data copy request per instance at a time.
+                   // Checking if there is another migration is in progress before accepting new one.
+                   boolean accepted = currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
+                       if (taskInMap == null)
+                       {
+                           return newTask;
+                       }
+
+                       if (!taskInMap.isCompleted())
+                       {
+                           // Accept new task if and only if the existing task has completed.
+                           return taskInMap;
+                       }
+                       else
+                       {
+                           return newTask;
+                       }
Review Comment:
nit: this can be easier to read with a ternary operator.
```suggestion
                       // Accept new task if and only if the existing taskInMap has completed.
                       return taskInMap.isCompleted() ? newTask : taskInMap;
```
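A minimal, self-contained sketch of the suggested shape, runnable outside the Sidecar code base. `Task` is a hypothetical stand-in for `LiveMigrationTask` (only `isCompleted()` is modeled), and `currentTasks` is assumed to be a `ConcurrentHashMap` keyed by instance id; the real field type in the PR may differ.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for LiveMigrationTask; only isCompleted() matters here.
class Task
{
    private volatile boolean completed;

    boolean isCompleted()
    {
        return completed;
    }

    void complete()
    {
        completed = true;
    }
}

class TaskSlotsTernary
{
    // Assumed shape of currentTasks: one slot per instance id.
    private final ConcurrentMap<Integer, Task> currentTasks = new ConcurrentHashMap<>();

    // Returns whichever task occupies the slot after the atomic compute.
    Task offer(int instanceId, Task newTask)
    {
        return currentTasks.compute(instanceId, (id, taskInMap) -> {
            if (taskInMap == null)
            {
                return newTask;
            }
            // Accept new task if and only if the existing taskInMap has completed.
            return taskInMap.isCompleted() ? newTask : taskInMap;
        });
    }
}
```

The ternary keeps the same three outcomes as the `if`/`else` version: an empty slot or a completed task is replaced, and a running task is kept.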
##########
server/src/test/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManagerTest.java:
##########
@@ -324,7 +393,7 @@ private <T> void awaitForFuture(Future<T> future) throws InterruptedException
         CountDownLatch latch = new CountDownLatch(1);
         future.onComplete(res -> latch.countDown());
-        latch.await(100, TimeUnit.MILLISECONDS);
+        latch.await(2, TimeUnit.SECONDS);
Review Comment:
Why was the wait time increased?
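One detail relevant to this question: `CountDownLatch.await(long, TimeUnit)` returns `false` when the timeout elapses instead of throwing. Because the helper ignores that return value, a timeout is not reported and the test simply continues, whether the limit is 100 ms or 2 s. The sketch below is a hedged variant that fails loudly on timeout; `awaitOrFail` is a hypothetical name, and `CompletableFuture` stands in for the Vert.x `Future` the test actually uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class AwaitHelper
{
    // Hypothetical helper; CompletableFuture stands in for io.vertx.core.Future.
    static <T> void awaitOrFail(CompletableFuture<T> future, long timeout, TimeUnit unit)
        throws InterruptedException
    {
        CountDownLatch latch = new CountDownLatch(1);
        future.whenComplete((result, error) -> latch.countDown());
        // await(...) returns false on timeout rather than throwing; surface that as a failure.
        if (!latch.await(timeout, unit))
        {
            throw new AssertionError("Future did not complete within " + timeout + " " + unit);
        }
    }
}
```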
##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java:
##########
@@ -101,39 +103,83 @@ Future<LiveMigrationTask> createDataCopyTask(LiveMigrationDataCopyRequest reques
                                                  String source,
                                                  InstanceMetadata localInstanceMetadata)
     {
-        LiveMigrationTask newTask = createTask(request,
-                                               source,
-                                               sidecarConfiguration.serviceConfiguration().port(),
-                                               localInstanceMetadata);
-
-        // It is possible to serve only one live migration data copy request per instance at a time.
-        // Checking if there is another migration is in progress before accepting new one.
-        boolean accepted = currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
-            if (taskInMap == null)
-            {
-                return newTask;
-            }
+        // Fast local JMX check before creating task - prevents task creation if Cassandra is running
+        return verifyCassandraNotRunning(localInstanceMetadata)
+               .compose(v -> {
+                   LiveMigrationTask newTask = createTask(request,
+                                                          source,
+                                                          sidecarConfiguration.serviceConfiguration().port(),
+                                                          localInstanceMetadata);
+
+                   // It is possible to serve only one live migration data copy request per instance at a time.
+                   // Checking if there is another migration is in progress before accepting new one.
+                   boolean accepted = currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
+                       if (taskInMap == null)
+                       {
+                           return newTask;
+                       }
+
+                       if (!taskInMap.isCompleted())
+                       {
+                           // Accept new task if and only if the existing task has completed.
+                           return taskInMap;
+                       }
+                       else
+                       {
+                           return newTask;
+                       }
+                   }) == newTask;
Review Comment:
nit: can you break it into two statements? It is easy to miss the `== newTask` portion.
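A hedged sketch of the two-statement form, self-contained so it can run on its own. `Task` and `TaskSlots` are hypothetical stand-ins (not the PR's classes), and the null/completed checks are folded into a single condition, which keeps the same three outcomes as the original lambda.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for LiveMigrationTask; only isCompleted() matters here.
class Task
{
    private volatile boolean completed;

    boolean isCompleted()
    {
        return completed;
    }

    void complete()
    {
        completed = true;
    }
}

class TaskSlots
{
    // One slot per instance id, mirroring currentTasks in the PR.
    private final ConcurrentMap<Integer, Task> currentTasks = new ConcurrentHashMap<>();

    boolean tryAccept(int instanceId, Task newTask)
    {
        // Statement 1: atomically pick the task that owns the slot.
        Task taskInSlot = currentTasks.compute(instanceId, (id, taskInMap) ->
            (taskInMap == null || taskInMap.isCompleted()) ? newTask : taskInMap);

        // Statement 2: the request is accepted only if the new task won the slot.
        return taskInSlot == newTask;
    }
}
```

Naming the result of `compute` makes the identity comparison a separate, visible step instead of a suffix on a multi-line lambda.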
##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java:
##########
@@ -744,4 +768,51 @@ public FileAttributes(long size, long lastModifiedTime)
             this.lastModifiedTime = lastModifiedTime;
         }
     }
+
+    /**
+     * Implementation of {@link LiveMigrationFileDownloadPreCheck.PreCheckContext} that provides
+     * the downloader's context to pre-check implementations.
+     */
+    static class PreCheckContextImpl implements LiveMigrationFileDownloadPreCheck.PreCheckContext
Review Comment:
Make it a `private static class`?
##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java:
##########
@@ -101,39 +103,83 @@ Future<LiveMigrationTask> createDataCopyTask(LiveMigrationDataCopyRequest reques
                                                  String source,
                                                  InstanceMetadata localInstanceMetadata)
     {
-        LiveMigrationTask newTask = createTask(request,
-                                               source,
-                                               sidecarConfiguration.serviceConfiguration().port(),
-                                               localInstanceMetadata);
-
-        // It is possible to serve only one live migration data copy request per instance at a time.
-        // Checking if there is another migration is in progress before accepting new one.
-        boolean accepted = currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
-            if (taskInMap == null)
-            {
-                return newTask;
-            }
+        // Fast local JMX check before creating task - prevents task creation if Cassandra is running
+        return verifyCassandraNotRunning(localInstanceMetadata)
+               .compose(v -> {
+                   LiveMigrationTask newTask = createTask(request,
+                                                          source,
+                                                          sidecarConfiguration.serviceConfiguration().port(),
+                                                          localInstanceMetadata);
+
+                   // It is possible to serve only one live migration data copy request per instance at a time.
+                   // Checking if there is another migration is in progress before accepting new one.
+                   boolean accepted = currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
+                       if (taskInMap == null)
+                       {
+                           return newTask;
+                       }
+
+                       if (!taskInMap.isCompleted())
+                       {
+                           // Accept new task if and only if the existing task has completed.
+                           return taskInMap;
+                       }
+                       else
+                       {
+                           return newTask;
+                       }
+                   }) == newTask;
+
+                   if (!accepted)
+                   {
+                       return Future.failedFuture(
+                           new LiveMigrationDataCopyInProgressException("Another task is already under progress. Cannot accept new task."));
Review Comment:
Throwing the exception directly is equivalent: `compose` turns an exception thrown by the mapper into a failed future.
```suggestion
                       throw new LiveMigrationDataCopyInProgressException("Another task is already under progress. Cannot accept new task.");
```
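To illustrate why the two forms behave the same, here is a sketch that does not depend on Vert.x. It uses `java.util.concurrent.CompletableFuture.thenCompose` as an analogue of Vert.x `Future.compose`: both fail the returned future when the mapper throws. The one difference is that `CompletableFuture` wraps the cause in a `CompletionException`. `InProgressException` is a hypothetical stand-in for `LiveMigrationDataCopyInProgressException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for LiveMigrationDataCopyInProgressException.
class InProgressException extends RuntimeException
{
    InProgressException(String message)
    {
        super(message);
    }
}

class ComposeFailureStyles
{
    static final String MESSAGE = "Another task is already under progress. Cannot accept new task.";

    // Style in the PR: return an explicitly failed future from the mapper.
    static CompletableFuture<String> returnFailed(boolean accepted)
    {
        return CompletableFuture.completedFuture(accepted).thenCompose(ok -> {
            if (!ok)
            {
                return CompletableFuture.<String>failedFuture(new InProgressException(MESSAGE));
            }
            return CompletableFuture.completedFuture("task");
        });
    }

    // Suggested style: throw; the composition catches it and fails the returned future.
    static CompletableFuture<String> throwInside(boolean accepted)
    {
        return CompletableFuture.completedFuture(accepted).thenCompose(ok -> {
            if (!ok)
            {
                throw new InProgressException(MESSAGE);
            }
            return CompletableFuture.completedFuture("task");
        });
    }

    // Returns the underlying failure cause, or null if the future succeeded.
    static Throwable causeOf(CompletableFuture<?> future)
    {
        try
        {
            future.join();
            return null;
        }
        catch (CompletionException e)
        {
            return e.getCause();
        }
    }
}
```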
##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloadPreCheck.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import io.vertx.core.Future;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest;
+
+/**
+ * A pluggable pre-check hook that runs before each file download iteration in the live migration
+ * data copy process. Since the data copy involves deleting local files and downloading files from
+ * the source, implementations can perform safety validations (e.g., verifying cluster state via gossip,
+ * checking instance readiness) to prevent data corruption or unsafe operations.
+ *
+ * <p>The pre-check is invoked at the beginning of every download iteration (not just the first one),
+ * allowing implementations to continuously validate that conditions remain safe throughout the
+ * multi-iteration copy process.</p>
+ *
+ * <p>A {@link #DEFAULT} no-op implementation is provided for cases where no pre-check is needed.
+ * Custom implementations can be bound via Guice to override the default behavior.</p>
+ *
+ * <p>Example use cases:</p>
+ * <ul>
+ *   <li>Gossip-based validation: verify the source node is present and the destination node
+ *       is absent from cluster gossip, preventing data copy to a node that has already joined</li>
Review Comment:
The gossip-based implementation described here does not exist in this PR.
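The Javadoc describes a hook that runs at the start of every download iteration and has a no-op `DEFAULT`. Below is a minimal sketch of that contract only. It deliberately contains no gossip logic. `PreCheck`, `IterativeCopier` and `runIterations` are hypothetical names. The real hook receives a `PreCheckContext` and returns a Vert.x `Future`, which this sketch simplifies to an iteration number and a `CompletableFuture`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical simplification of LiveMigrationFileDownloadPreCheck.
interface PreCheck
{
    // No-op default: every iteration is allowed to proceed.
    PreCheck DEFAULT = iteration -> CompletableFuture.completedFuture(null);

    CompletableFuture<Void> check(int iteration);
}

// Hypothetical copier that re-runs the pre-check before every iteration.
class IterativeCopier
{
    private final PreCheck preCheck;
    final List<Integer> completedIterations = new ArrayList<>();

    IterativeCopier(PreCheck preCheck)
    {
        this.preCheck = preCheck;
    }

    CompletableFuture<Void> runIterations(int maxIterations)
    {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (int i = 1; i <= maxIterations; i++)
        {
            int iteration = i;
            // A failed check fails the chain, so later iterations never start.
            chain = chain.thenCompose(v -> preCheck.check(iteration))
                         .thenRun(() -> completedIterations.add(iteration)); // placeholder for delete + download
        }
        return chain;
    }
}
```

Because the check is chained before each iteration rather than run once up front, a condition that becomes unsafe partway through stops the remaining iterations.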