nvharikrishna commented on code in PR #321:
URL: https://github.com/apache/cassandra-sidecar/pull/321#discussion_r2906812730
##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java:
##########
@@ -124,6 +130,130 @@ public Future<OperationStatus> downloadFiles()
.otherwise(this::handleDownloadFailure);
}
+ /**
+ * Validates via cluster gossip that destination has not been started.
+ * This is a best-effort safety check performed before each file download iteration.
+ * <p>
+ * Fetches gossip info from cluster instances and checks:
+ * <ul>
+ * <li>Source node is present in gossip</li>
+ * <li>Destination node is NOT present in gossip</li>
+ * </ul>
+ * <p>
+ * <strong>Note:</strong> This cannot guarantee destination won't start during data copy.
+ * Operators must ensure destination is not started until migration completes.
+ *
+ * @return Future that succeeds if validation passes, fails with
+ * {@link LiveMigrationInvalidRequestException} if unsafe condition detected
+ */
+ private Future<Void> checkGossip()
+ {
+ // Skip gossip check if explicitly requested
+ if (request.skipGossipCheck != null && request.skipGossipCheck)
+ {
+ LOGGER.warn("{} Skipping gossip safety check as requested. " +
+ "This bypasses validation that destination has not been started.", logPrefix);
+ return Future.succeededFuture();
+ }
+
+ String destination = instanceMetadata.host();
+ LOGGER.debug("{} Validating gossip state: source={}, destination={}", logPrefix, source, destination);
+
+ // Apply configuration defaults for optional gossip fetch parameters if not specified by client
+ int batchSize = request.gossipFetchBatchSize != null
+ ? request.gossipFetchBatchSize
+ : liveMigrationConfiguration.gossipFetchBatchSize();
+ int maxRetries = request.gossipFetchMaxRetries != null
+ ? request.gossipFetchMaxRetries
+ : liveMigrationConfiguration.gossipFetchMaxRetries();
+
+ GossipInfoFetcher gossipFetcher = new GossipInfoFetcher(
+ sidecarClient,
+ instancesMetadata,
+ port,
+ batchSize,
+ maxRetries);
+
+ return gossipFetcher.fetchGossipInfo()
+ .compose(gossipResponse -> {
+ try
+ {
+ return validateGossipResponse(gossipResponse, source, destination);
+ }
+ catch (UnknownHostException e)
+ {
+ return Future.failedFuture(e);
+ }
+ })
+ .onSuccess(v -> LOGGER.debug("{} Gossip validation passed", logPrefix))
+ .onFailure(err -> LOGGER.error("{} Gossip validation failed: {}", logPrefix, err.getMessage()));
+ }
+
+ /**
+ * Validates the gossip response to ensure source is present in gossip and destination is not present.
+ *
+ * @param gossipResponse the gossip information from a healthy node
+ * @param source the source instance to check for
+ * @param destination the destination instance to check for
+ * @return Future that succeeds if source is found and destination not found,
+ * fails if source is not found or destination is found in gossip
+ */
+ private Future<Void> validateGossipResponse(GossipInfoResponse gossipResponse,
+ String source,
+ String destination) throws UnknownHostException
+ {
+ GossipInfoResponse.GossipInfo srcInfo = findGossipInfo(gossipResponse, source);
+ if (srcInfo == null)
+ {
+ String errorMsg = "SAFETY CHECK FAILED: Source node '" + source + "' not found in cluster gossip. " +
+ "Cannot proceed with data copy. Please fix the source/cluster " +
+ "and then re-trigger data copy task if required";
+ LOGGER.error("{} {}", logPrefix, errorMsg);
+ return Future.failedFuture(new LiveMigrationInvalidRequestException(errorMsg));
+ }
+
+ GossipInfoResponse.GossipInfo destInfo = findGossipInfo(gossipResponse, destination);
+ if (destInfo != null)
+ {
+ // Destination found in gossip - it was started!
+ String status = destInfo.statusWithPort();
+ String errorMsg = String.format(
+ "SAFETY CHECK FAILED: Destination node '%s' found in cluster gossip with status '%s'. " +
+ "This indicates Cassandra was previously started on the destination. " +
+ "Data copy would overwrite potentially newer data, causing DATA LOSS. Aborting.",
+ destination, status);
+
+ LOGGER.error("{} {}", logPrefix, errorMsg);
+ return Future.failedFuture(new LiveMigrationInvalidRequestException(errorMsg));
+ }
+
+ // Reached here means, source found and destination not found in gossip - safe to proceed
+ LOGGER.info("{} Gossip validation passed: destination {} not found in cluster gossip",
+ logPrefix, destination);
+ return Future.succeededFuture();
+ }
+
+ private GossipInfoResponse.GossipInfo findGossipInfo(GossipInfoResponse gossipResponse,
+ String instance) throws UnknownHostException
+ {
+ InstanceMetadata metadata = instancesMetadata.instanceFromHost(instance);
Review Comment:
Thanks for catching it! I think this changes most of this PR.
InstanceMetadata is needed for both the destination (local) and the source
(remote). Unfortunately, I don't see a way to get either the source instance
metadata or the cluster information (considering that the source and/or
destination may be down briefly during migration), and both are needed to
fetch gossip info and perform validation. I will remove the
gossip-info-related checks. I will probably add a plug-in point/placeholder
so that anyone who wants to inject a custom check can do so.
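
To make the plug-in idea concrete, here is a rough sketch of what such a hook could look like. It is only an illustration, not what the PR will contain: `PreCopyCheck`, `PreCopyChecks` and `runAll` are hypothetical names, and `CompletableFuture` stands in for the Vert.x `Future` used in Sidecar so that the snippet compiles on its own.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical extension point; none of these names exist in Sidecar today.
interface PreCopyCheck
{
    // Completes normally if the data copy may proceed, exceptionally otherwise.
    CompletableFuture<Void> verify(String source, String destination);
}

final class PreCopyChecks
{
    private PreCopyChecks()
    {
    }

    // Runs the checks in order. Once one check fails, the remaining checks
    // are not invoked and the returned future completes exceptionally.
    static CompletableFuture<Void> runAll(List<PreCopyCheck> checks, String source, String destination)
    {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (PreCopyCheck check : checks)
        {
            chain = chain.thenCompose(v -> check.verify(source, destination));
        }
        return chain;
    }
}

final class PreCopyCheckSketch
{
    public static void main(String[] args)
    {
        // Example custom check (an assumption for the demo): reject a copy onto itself.
        PreCopyCheck rejectSameHost = (src, dst) -> src.equals(dst)
            ? CompletableFuture.failedFuture(new IllegalStateException("source and destination are the same host"))
            : CompletableFuture.completedFuture(null);

        CompletableFuture<Void> result = PreCopyChecks.runAll(List.of(rejectSameHost), "host-a", "host-b");
        System.out.println("copy allowed: " + !result.isCompletedExceptionally());
    }
}
```

With an empty list, `runAll` returns an already completed future, so the default behaviour would be "no extra checks" unless an operator registers one.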
##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java:
##########
@@ -101,39 +107,85 @@ Future<LiveMigrationTask> createDataCopyTask(LiveMigrationDataCopyRequest reques
String source,
InstanceMetadata localInstanceMetadata)
{
- LiveMigrationTask newTask = createTask(request,
- source,
- sidecarConfiguration.serviceConfiguration().port(),
- localInstanceMetadata);
-
- // It is possible to serve only one live migration data copy request per instance at a time.
- // Checking if there is another migration is in progress before accepting new one.
- boolean accepted = currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
- if (taskInMap == null)
+ // Fast local JMX check before creating task - prevents task creation if Cassandra is running
+ return verifyCassandraNotRunning(localInstanceMetadata)
+ .compose(v -> {
+ LiveMigrationTask newTask = createTask(request,
+ source,
+ sidecarConfiguration.serviceConfiguration().port(),
+ localInstanceMetadata);
+
+ // It is possible to serve only one live migration data copy request per instance at a time.
+ // Checking if there is another migration is in progress before accepting new one.
+ boolean accepted = currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
+ if (taskInMap == null)
+ {
+ return newTask;
+ }
+
+ if (!taskInMap.isCompleted())
+ {
+ // Accept new task if and only if the existing task has completed.
+ return taskInMap;
+ }
+ else
+ {
+ return newTask;
+ }
+ }) == newTask;
+
+ if (!accepted)
+ {
+ return Future.failedFuture(
+ new LiveMigrationDataCopyInProgressException("Another task is already under progress. Cannot accept new task."));
+ }
+ LOGGER.info("Starting data copy task with id={}, source={}, destination={}",
+ newTask.id(), source, localInstanceMetadata.host());
+ newTask.start();
+ return Future.succeededFuture(newTask);
+ });
+ }
+
+ /**
+ * Initiating data copy once a Cassandra instance starts is not acceptable. This method checks whether
+ * Cassandra is running or not at the moment on the destination instance by checking if Sidecar
+ * was able to connect to the Cassandra instance's JMX port. It returns a failed future if Sidecar
+ * is able to connect to the JMX port of Cassandra.
+ *
+ * @param localInstance metadata for the local Cassandra instance
+ * @return Future that succeeds if Cassandra is not running, fails if it is running
+ */
+ private Future<Void> verifyCassandraNotRunning(InstanceMetadata localInstance)
+ {
+ return executorPools.internal().executeBlocking(() -> {
Review Comment:
You are right; I took it out of 'executeBlocking'. Since the gossip-based
check is also present and this check is only the first line of defense, I am
fine with checking the cached value.
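
For illustration, reading a cached status without a worker thread could look roughly like the sketch below. This is not the code in the PR: `CachedJmxStatus` and `CassandraNotRunningCheck` are hypothetical names, the flag is assumed to be refreshed by a periodic health check that is not shown, and `CompletableFuture` stands in for the Vert.x `Future` so the snippet is self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical holder for the last known JMX connectivity state. A periodic
// health check (not shown) is assumed to call update() with fresh results.
final class CachedJmxStatus
{
    private final AtomicBoolean jmxUp = new AtomicBoolean(false);

    void update(boolean connected)
    {
        jmxUp.set(connected);
    }

    boolean isJmxUp()
    {
        return jmxUp.get();
    }
}

final class CassandraNotRunningCheck
{
    private CassandraNotRunningCheck()
    {
    }

    // Reads only the cached flag and makes no JMX call, so nothing here
    // blocks and no executeBlocking-style offloading is needed.
    static CompletableFuture<Void> verify(CachedJmxStatus status)
    {
        if (status.isJmxUp())
        {
            return CompletableFuture.failedFuture(
                new IllegalStateException("Cassandra appears to be running on the destination; refusing to start data copy"));
        }
        return CompletableFuture.completedFuture(null);
    }
}
```

The trade-off is that the cached value can be stale between refreshes, which is why this is only a first-level check.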
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]