nvharikrishna commented on code in PR #321:
URL: https://github.com/apache/cassandra-sidecar/pull/321#discussion_r2906812730


##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java:
##########
@@ -124,6 +130,130 @@ public Future<OperationStatus> downloadFiles()
                .otherwise(this::handleDownloadFailure);
     }
 
+    /**
+     * Validates via cluster gossip that destination has not been started.
+     * This is a best-effort safety check performed before each file download 
iteration.
+     * <p>
+     * Fetches gossip info from cluster instances and checks:
+     * <ul>
+     *   <li>Source node is present in gossip</li>
+     *   <li>Destination node is NOT present in gossip</li>
+     * </ul>
+     * <p>
+     * <strong>Note:</strong> This cannot guarantee destination won't start 
during data copy.
+     * Operators must ensure destination is not started until migration 
completes.
+     *
+     * @return Future that succeeds if validation passes, fails with
+     *         {@link LiveMigrationInvalidRequestException} if unsafe 
condition detected
+     */
+    private Future<Void> checkGossip()
+    {
+        // Skip gossip check if explicitly requested
+        if (request.skipGossipCheck != null && request.skipGossipCheck)
+        {
+            LOGGER.warn("{} Skipping gossip safety check as requested. " +
+                        "This bypasses validation that destination has not 
been started.", logPrefix);
+            return Future.succeededFuture();
+        }
+
+        String destination = instanceMetadata.host();
+        LOGGER.debug("{} Validating gossip state: source={}, destination={}", 
logPrefix, source, destination);
+
+        // Apply configuration defaults for optional gossip fetch parameters 
if not specified by client
+        int batchSize = request.gossipFetchBatchSize != null
+                        ? request.gossipFetchBatchSize
+                        : liveMigrationConfiguration.gossipFetchBatchSize();
+        int maxRetries = request.gossipFetchMaxRetries != null
+                         ? request.gossipFetchMaxRetries
+                         : liveMigrationConfiguration.gossipFetchMaxRetries();
+
+        GossipInfoFetcher gossipFetcher = new GossipInfoFetcher(
+        sidecarClient,
+        instancesMetadata,
+        port,
+        batchSize,
+        maxRetries);
+
+        return gossipFetcher.fetchGossipInfo()
+                            .compose(gossipResponse -> {
+                                try
+                                {
+                                    return 
validateGossipResponse(gossipResponse, source, destination);
+                                }
+                                catch (UnknownHostException e)
+                                {
+                                    return Future.failedFuture(e);
+                                }
+                            })
+                            .onSuccess(v -> LOGGER.debug("{} Gossip validation 
passed", logPrefix))
+                            .onFailure(err -> LOGGER.error("{} Gossip 
validation failed: {}", logPrefix, err.getMessage()));
+    }
+
+    /**
+     * Validates the gossip response to ensure source is present in gossip and 
destination is not present.
+     *
+     * @param gossipResponse the gossip information from a healthy node
+     * @param source         the source instance to check for
+     * @param destination    the destination instance to check for
+     * @return Future that succeeds if source is found and destination not 
found,
+     * fails if source is not found or destination is found in gossip
+     */
+    private Future<Void> validateGossipResponse(GossipInfoResponse 
gossipResponse,
+                                                String source,
+                                                String destination) throws 
UnknownHostException
+    {
+        GossipInfoResponse.GossipInfo srcInfo = findGossipInfo(gossipResponse, 
source);
+        if (srcInfo == null)
+        {
+            String errorMsg = "SAFETY CHECK FAILED: Source node '" + source + 
"' not found in cluster gossip. " +
+                              "Cannot proceed with data copy. Please fix the 
source/cluster " +
+                              "and then re-trigger data copy task if required";
+            LOGGER.error("{} {}", logPrefix, errorMsg);
+            return Future.failedFuture(new 
LiveMigrationInvalidRequestException(errorMsg));
+        }
+
+        GossipInfoResponse.GossipInfo destInfo = 
findGossipInfo(gossipResponse, destination);
+        if (destInfo != null)
+        {
+            // Destination found in gossip - it was started!
+            String status = destInfo.statusWithPort();
+            String errorMsg = String.format(
+            "SAFETY CHECK FAILED: Destination node '%s' found in cluster 
gossip with status '%s'. " +
+            "This indicates Cassandra was previously started on the 
destination. " +
+            "Data copy would overwrite potentially newer data, causing DATA 
LOSS. Aborting.",
+            destination, status);
+
+            LOGGER.error("{} {}", logPrefix, errorMsg);
+            return Future.failedFuture(new 
LiveMigrationInvalidRequestException(errorMsg));
+        }
+
+        // Reached here means, source found and destination not found in 
gossip - safe to proceed
+        LOGGER.info("{} Gossip validation passed: destination {} not found in 
cluster gossip",
+                    logPrefix, destination);
+        return Future.succeededFuture();
+    }
+
+    private GossipInfoResponse.GossipInfo findGossipInfo(GossipInfoResponse 
gossipResponse,
+                                                         String instance) 
throws UnknownHostException
+    {
+        InstanceMetadata metadata = 
instancesMetadata.instanceFromHost(instance);

Review Comment:
   Thanks for catching this! I think it changes most of this PR. 
InstanceMetadata is needed for both the destination (local) and the source 
(remote) instance. Unfortunately, I don't see a way to obtain either the source 
instance's metadata or the cluster information, both of which are needed to 
fetch gossip info and perform the validation, given that the source and/or the 
destination may be briefly down during migration. I will remove the 
GossipInfo-related checks. I will probably add a plug-in hook/placeholder so 
that anyone who wants to inject a custom check can do so.



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java:
##########
@@ -101,39 +107,85 @@ Future<LiveMigrationTask> 
createDataCopyTask(LiveMigrationDataCopyRequest reques
                                                  String source,
                                                  InstanceMetadata 
localInstanceMetadata)
     {
-        LiveMigrationTask newTask = createTask(request,
-                                               source,
-                                               
sidecarConfiguration.serviceConfiguration().port(),
-                                               localInstanceMetadata);
-
-        // It is possible to serve only one live migration data copy request 
per instance at a time.
-        // Checking if there is another migration is in progress before 
accepting new one.
-        boolean accepted = currentTasks.compute(localInstanceMetadata.id(), 
(integer, taskInMap) -> {
-            if (taskInMap == null)
+        // Fast local JMX check before creating task - prevents task creation 
if Cassandra is running
+        return verifyCassandraNotRunning(localInstanceMetadata)
+               .compose(v -> {
+                   LiveMigrationTask newTask = createTask(request,
+                                                          source,
+                                                          
sidecarConfiguration.serviceConfiguration().port(),
+                                                          
localInstanceMetadata);
+
+                   // It is possible to serve only one live migration data 
copy request per instance at a time.
+                   // Checking if there is another migration is in progress 
before accepting new one.
+                   boolean accepted = 
currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
+                       if (taskInMap == null)
+                       {
+                           return newTask;
+                       }
+
+                       if (!taskInMap.isCompleted())
+                       {
+                           // Accept new task if and only if the existing task 
has completed.
+                           return taskInMap;
+                       }
+                       else
+                       {
+                           return newTask;
+                       }
+                   }) == newTask;
+
+                   if (!accepted)
+                   {
+                       return Future.failedFuture(
+                       new LiveMigrationDataCopyInProgressException("Another 
task is already under progress. Cannot accept new task."));
+                   }
+                   LOGGER.info("Starting data copy task with id={}, source={}, 
destination={}",
+                               newTask.id(), source, 
localInstanceMetadata.host());
+                   newTask.start();
+                   return Future.succeededFuture(newTask);
+               });
+    }
+
+    /**
+     * Initiating data copy once a Cassandra instance starts is not 
acceptable. This method checks whether
+     * Cassandra is running or not at the moment on the destination instance 
by checking if Sidecar
+     * was able to connect to the Cassandra instance's JMX port. It returns a 
failed future if Sidecar
+     * is able to connect to the JMX port of Cassandra.
+     *
+     * @param localInstance metadata for the local Cassandra instance
+     * @return Future that succeeds if Cassandra is not running, fails if it 
is running
+     */
+    private Future<Void> verifyCassandraNotRunning(InstanceMetadata 
localInstance)
+    {
+        return executorPools.internal().executeBlocking(() -> {

Review Comment:
   You are right; I moved it out of `executeBlocking`. Since the gossip-based 
check is also present and this is only the first line of defense, I am fine 
with checking the cached value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to