nvharikrishna commented on code in PR #252:
URL: https://github.com/apache/cassandra-sidecar/pull/252#discussion_r2296498118


##########
client-common/src/test/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequestTest.java:
##########


Review Comment:
   Yeah, I somehow missed it. Updated the code.



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.utils.SidecarClientProvider;
+
+/**
+ * Factory implementation which produces {@link LiveMigrationTask} instances.
+ */
+@Singleton
+public class LiveMigrationTaskFactoryImpl implements LiveMigrationTaskFactory
+{
+
+    private final Vertx vertx;
+    private final SidecarClientProvider sidecarClientProvider;
+    private final LiveMigrationConfiguration liveMigrationConfiguration;
+    private final ExecutorPools executorPools;
+
+    @Inject
+    public LiveMigrationTaskFactoryImpl(Vertx vertx,
+                                        ExecutorPools executorPools,
+                                        SidecarClientProvider sidecarClientProvider,
+                                        SidecarConfiguration sidecarConfiguration)
+    {
+        this.vertx = vertx;
+        this.executorPools = executorPools;
+        this.sidecarClientProvider = sidecarClientProvider;
+        this.liveMigrationConfiguration = sidecarConfiguration.liveMigrationConfiguration();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public LiveMigrationTask create(final LiveMigrationDataCopyRequest request,
+                                    final String source,
+                                    final int port,
+                                    final InstanceMetadata instanceMetadata)

Review Comment:
   done



##########
server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java:
##########
@@ -68,10 +177,10 @@ protected void configure()
                  schema = @Schema(type = SchemaType.OBJECT)))
     @ProvidesIntoMap
     @KeyClassMapKey(VertxRouteMapKeys.LiveMigrationFileStreamHandlerRouteKey.class)
-    VertxRoute downloadFileRoute(RouteBuilder.Factory factory,
-                                 LiveMigrationApiEnableDisableHandler liveMigrationApiEnableDisableHandler,
-                                 LiveMigrationFileStreamHandler liveMigrationFileStreamHandler,
-                                 FileStreamHandler fileStreamHandler)
+    public VertxRoute downloadFileRoute(RouteBuilder.Factory factory,

Review Comment:
   IntelliJ shows a `@GET method must be public` error, so I changed these
   methods to public.



##########
server/src/main/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutor.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.concurrent;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A task executor that manages concurrent execution of asynchronous tasks with configurable concurrency limit.
+ *
+ * <h2>Overview</h2>
+ * <p>This executor ensures that only a specified number of tasks (defined by {@code maxConcurrency})
+ * execute simultaneously. When a task completes, the next pending task is automatically
+ * triggered, maintaining optimal resource utilization while respecting concurrency constraints.</p>
+ *
+ * <h2>Architecture</h2>
+ * <p>The executor uses a promise-based triggering mechanism:</p>
+ * <ul>
+ *   <li>Each task is associated with a {@link Promise} that acts as a trigger signal</li>
+ *   <li>Tasks only start execution when their corresponding Promise completes successfully</li>
+ *   <li>Initially, {@code maxConcurrency} number of promises are completed to start the execution flow</li>
+ *   <li>As tasks complete, the next pending task is automatically triggered</li>
+ * </ul>
+ *
+ * <h2>Concurrency Control</h2>
+ * <p>The executor maintains a global task index using {@link AtomicInteger} to ensure thread-safe
+ * task scheduling. Tasks are processed in the order they were submitted, with automatic
+ * flow control to maintain the desired concurrency level.</p>
+ *
+ * <h2>Error Handling</h2>
+ * <p>When a task completes (successfully or with a non-{@link CancellationException} error),
+ * the executor automatically triggers the next pending task. Tasks that are cancelled do not
+ * trigger subsequent tasks to prevent further executions.</p>
+ *
+ * <h2>Usage Example</h2>
+ * <pre>{@code
+ * List<Callable<Future<String>>> tasks = Arrays.asList(
+ *     () -> downloadFile("file1.txt"),
+ *     () -> downloadFile("file2.txt"),
+ *     () -> downloadFile("file3.txt")
+ * );
+ *
+ * AsyncConcurrentTaskExecutor<String> executor =
+ *     new AsyncConcurrentTaskExecutor<>(vertx, tasks, 2); // max 2 tasks can execute concurrently
+ *
+ * List<Future<String>> results = executor.start();
+ * }</pre>
+ *
+ * @param <T> the type of result produced by the tasks
+ */
+public class AsyncConcurrentTaskExecutor<T>
+{
+    protected static final String TASK_CANCEL_MESSAGE = "task cancelled";
+    private final AtomicInteger index = new AtomicInteger(0);
+    private final int maxConcurrency;
+    private final Vertx vertx;
+    private final List<Promise<T>> taskPromises;
+    private final List<Callable<Future<T>>> tasks;
+
+    /**
+     * Creates a new AsyncConcurrentTaskExecutor.
+     *
+     * @param vertx          the Vert.x instance for context execution
+     * @param tasks          the list of tasks to execute (must not be empty)
+     * @param maxConcurrency the maximum number of tasks to execute concurrently (must be &gt; 0)
+     * @throws IllegalArgumentException if maxConcurrency &lt;= 0 or tasks is empty
+     */
+    public AsyncConcurrentTaskExecutor(@NotNull Vertx vertx,
+                                       @NotNull List<Callable<Future<T>>> tasks,
+                                       int maxConcurrency)
+    {
+        Objects.requireNonNull(vertx, "vertx must not be null.");
+        Objects.requireNonNull(tasks, "tasks must not be null.");
+        if (maxConcurrency < 1)
+        {
+            throw (new IllegalArgumentException("maxConcurrency must be > 0"));
+        }
+
+        this.tasks = Collections.unmodifiableList(tasks);

Review Comment:
   done



##########
server/src/main/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutor.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.concurrent;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A task executor that manages concurrent execution of asynchronous tasks with configurable concurrency limit.
+ *
+ * <h2>Overview</h2>
+ * <p>This executor ensures that only a specified number of tasks (defined by {@code maxConcurrency})
+ * execute simultaneously. When a task completes, the next pending task is automatically
+ * triggered, maintaining optimal resource utilization while respecting concurrency constraints.</p>
+ *
+ * <h2>Architecture</h2>
+ * <p>The executor uses a promise-based triggering mechanism:</p>
+ * <ul>
+ *   <li>Each task is associated with a {@link Promise} that acts as a trigger signal</li>
+ *   <li>Tasks only start execution when their corresponding Promise completes successfully</li>
+ *   <li>Initially, {@code maxConcurrency} number of promises are completed to start the execution flow</li>
+ *   <li>As tasks complete, the next pending task is automatically triggered</li>
+ * </ul>
+ *
+ * <h2>Concurrency Control</h2>
+ * <p>The executor maintains a global task index using {@link AtomicInteger} to ensure thread-safe
+ * task scheduling. Tasks are processed in the order they were submitted, with automatic
+ * flow control to maintain the desired concurrency level.</p>
+ *
+ * <h2>Error Handling</h2>
+ * <p>When a task completes (successfully or with a non-{@link CancellationException} error),
+ * the executor automatically triggers the next pending task. Tasks that are cancelled do not
+ * trigger subsequent tasks to prevent further executions.</p>
+ *
+ * <h2>Usage Example</h2>
+ * <pre>{@code
+ * List<Callable<Future<String>>> tasks = Arrays.asList(
+ *     () -> downloadFile("file1.txt"),
+ *     () -> downloadFile("file2.txt"),
+ *     () -> downloadFile("file3.txt")
+ * );
+ *
+ * AsyncConcurrentTaskExecutor<String> executor =
+ *     new AsyncConcurrentTaskExecutor<>(vertx, tasks, 2); // max 2 tasks can execute concurrently
+ *
+ * List<Future<String>> results = executor.start();
+ * }</pre>
+ *
+ * @param <T> the type of result produced by the tasks
+ */
+public class AsyncConcurrentTaskExecutor<T>
+{
+    protected static final String TASK_CANCEL_MESSAGE = "task cancelled";
+    private final AtomicInteger index = new AtomicInteger(0);
+    private final int maxConcurrency;
+    private final Vertx vertx;
+    private final List<Promise<T>> taskPromises;
+    private final List<Callable<Future<T>>> tasks;
+
+    /**
+     * Creates a new AsyncConcurrentTaskExecutor.
+     *
+     * @param vertx          the Vert.x instance for context execution
+     * @param tasks          the list of tasks to execute (must not be empty)
+     * @param maxConcurrency the maximum number of tasks to execute concurrently (must be &gt; 0)
+     * @throws IllegalArgumentException if maxConcurrency &lt;= 0 or tasks is empty
+     */
+    public AsyncConcurrentTaskExecutor(@NotNull Vertx vertx,
+                                       @NotNull List<Callable<Future<T>>> tasks,
+                                       int maxConcurrency)
+    {
+        Objects.requireNonNull(vertx, "vertx must not be null.");
+        Objects.requireNonNull(tasks, "tasks must not be null.");
+        if (maxConcurrency < 1)
+        {
+            throw (new IllegalArgumentException("maxConcurrency must be > 0"));
+        }
+
+        this.tasks = Collections.unmodifiableList(tasks);
+        this.maxConcurrency = maxConcurrency;
+        this.vertx = vertx;
+        taskPromises = tasks.stream()
+                            .map(item -> Promise.<T>promise())
+                            .collect(Collectors.toUnmodifiableList());
+    }
+
+    /**
+     * Returns the list of task promises for testing purposes.
+     *
+     * @return unmodifiable list of task promises
+     */
+    @VisibleForTesting
+    List<Promise<T>> getTaskPromises()
+    {
+        return taskPromises;
+    }
+
+    /**
+     * Starts the execution of tasks with the configured concurrency limit.
+     *
+     * <p>This method immediately starts up to {@code maxConcurrency} tasks and returns
+     * futures for all tasks. As running tasks complete, pending tasks are automatically
+     * triggered to maintain the concurrency level.</p>
+     *
+     * <h3>Execution Flow Diagram:</h3>
+     * <pre>
+     *            index
+     *             +
+     *             |
+     *             v
+     * +----+----+-+--+----+------------+----+
+     * | p0 | p1 | p2 | p3 |    ...     |pn-1|
+     * +-+--+-+--+-+--+-+--+------------+-+--+
+     *   |    |    |    |                 |
+     *   |    |    +    +                 +
+     *   |    |
+     *   |    |    +    +                 +
+     *   |    |    |    |                 |
+     *   +    +    +    +                 +
+     *   t0   t1   t2   t3                tn-1
+     *  (r)  (r)
+     * (r) = running
+     * </pre>
+     *
+     * @return list of futures representing all tasks (same order as input)
+     */
+    public List<Future<T>> start()
+    {
+        List<Future<T>> taskFutures = new ArrayList<>(tasks.size());
+        for (int i = 0; i < tasks.size(); i++)
+        {
+            Promise<T> p = taskPromises.get(i);
+            taskFutures.add(waitAndDo(p.future(), tasks.get(i)));
+        }
+
+        // Start maxConcurrency no.of tasks
+        for (int i = 0; i < maxConcurrency; i++)
+        {
+            triggerNextTask();
+        }
+        return taskFutures;
+    }
+
+    /**
+     * Associates a task with its trigger promise and handles task execution and completion.
+     *
+     * <p>This method creates a future chain where the task only executes when the trigger
+     * future completes. Upon task completion, it automatically triggers the next pending task.</p>
+     *
+     * @param future the trigger future that must complete before task execution
+     * @param task   the task to execute when triggered
+     * @return future representing the task's execution result
+     */
+    private Future<T> waitAndDo(Future<T> future, Callable<Future<T>> task)
+    {
+        return future.compose(ar -> {
+                         try
+                         {
+                             return task.call();
+                         }
+                         catch (Exception e)
+                         {
+                             return Future.failedFuture(e);
+                         }
+                     })
+                     .onComplete(ar -> {
+                         if (ar.succeeded() || !(ar.cause() instanceof CancellationException))

Review Comment:
   Expecting the caller to log if required.
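
   A minimal sketch of what caller-side logging could look like. It uses the
   JDK's `CompletableFuture` instead of Vert.x `Future` so that it runs without
   dependencies; `BoundedRunner` and `CallerLoggingDemo` are hypothetical names,
   not classes in this PR. The runner only releases the next task when one
   completes, and the caller decides what to record. Unlike the executor in this
   PR, the sketch does not special-case cancellation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for a concurrency-limited executor; not the Sidecar class. */
final class BoundedRunner<T>
{
    private final List<Supplier<CompletableFuture<T>>> tasks;
    private final List<CompletableFuture<Void>> triggers = new ArrayList<>();
    private final AtomicInteger index = new AtomicInteger(0);
    private final int maxConcurrency;

    BoundedRunner(List<Supplier<CompletableFuture<T>>> tasks, int maxConcurrency)
    {
        if (maxConcurrency < 1)
        {
            throw new IllegalArgumentException("maxConcurrency must be > 0");
        }
        this.tasks = List.copyOf(tasks);
        this.maxConcurrency = maxConcurrency;
        for (int i = 0; i < this.tasks.size(); i++)
        {
            triggers.add(new CompletableFuture<>());
        }
    }

    List<CompletableFuture<T>> start()
    {
        List<CompletableFuture<T>> results = new ArrayList<>(tasks.size());
        for (int i = 0; i < tasks.size(); i++)
        {
            Supplier<CompletableFuture<T>> task = tasks.get(i);
            // The task runs only after its trigger completes
            CompletableFuture<T> result = triggers.get(i).thenCompose(ignored -> task.get());
            // No logging here: success or failure simply releases the next task
            result.whenComplete((value, error) -> triggerNext());
            results.add(result);
        }
        for (int i = 0; i < maxConcurrency; i++)
        {
            triggerNext();
        }
        return results;
    }

    private void triggerNext()
    {
        int next = index.getAndIncrement();
        if (next < triggers.size())
        {
            triggers.get(next).complete(null);
        }
    }
}

/** Hypothetical caller that records failures itself. */
final class CallerLoggingDemo
{
    static List<String> runAndCollectFailures()
    {
        List<Supplier<CompletableFuture<String>>> tasks = List.of(
            () -> CompletableFuture.completedFuture("file1"),
            () -> CompletableFuture.failedFuture(new IllegalStateException("file2 failed")),
            () -> CompletableFuture.completedFuture("file3"));

        List<String> failures = new ArrayList<>();
        List<CompletableFuture<String>> results = new BoundedRunner<>(tasks, 2).start();
        for (int i = 0; i < results.size(); i++)
        {
            int taskNumber = i;
            results.get(i).whenComplete((value, error) -> {
                if (error != null)
                {
                    // A real caller would call its logger here
                    failures.add("task " + taskNumber + ": " + unwrap(error).getMessage());
                }
            });
        }
        return failures;
    }

    // Dependent stages wrap the original failure in CompletionException
    private static Throwable unwrap(Throwable error)
    {
        return error instanceof CompletionException && error.getCause() != null ? error.getCause() : error;
    }
}
```

   The `failures` list stands in for a logger call; the point is only that the
   handler is attached by the caller to the futures returned from `start()`.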



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new instances.
+ * Progress tracking is handled via atomic fields that can be safely updated from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)

Review Comment:
   Right, I had already named the methods in State the recommended way, but
   missed changing them here. Updated similar methods too.
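
   For illustration, a trimmed-down sketch of the naming this converges on:
   transition methods named `toX()` on both the state enum and the status class,
   each returning a new immutable instance. `StatusSketch` is hypothetical,
   models only three states, and throws `IllegalStateException` where the PR
   uses `IllegalStateTransitionException`; the actual renamed methods in the PR
   may differ.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, trimmed-down status type; not the OperationStatus class from this PR. */
final class StatusSketch
{
    /** Only three states are modeled to keep the sketch short. */
    enum State
    {
        STARTING, CLEANING, PREPARING;

        State toCleaning()
        {
            return transition(STARTING, CLEANING);
        }

        State toPreparing()
        {
            return transition(CLEANING, PREPARING);
        }

        // Rejects any transition that does not start from the expected state
        private State transition(State expectedCurrent, State next)
        {
            if (this != expectedCurrent)
            {
                throw new IllegalStateException("Cannot transition from " + this + " to " + next);
            }
            return next;
        }
    }

    private final State state;
    private final int totalFiles;
    // Shared across instances so progress survives state transitions
    private final AtomicInteger filesDownloaded;

    private StatusSketch(State state, int totalFiles, AtomicInteger filesDownloaded)
    {
        this.state = state;
        this.totalFiles = totalFiles;
        this.filesDownloaded = filesDownloaded;
    }

    static StatusSketch starting()
    {
        return new StatusSketch(State.STARTING, -1, new AtomicInteger(0));
    }

    // Named after the transition, mirroring State.toCleaning()
    StatusSketch toCleaning(int totalFiles)
    {
        return new StatusSketch(state.toCleaning(), totalFiles, filesDownloaded);
    }

    StatusSketch toPreparing()
    {
        return new StatusSketch(state.toPreparing(), totalFiles, filesDownloaded);
    }

    State state()
    {
        return state;
    }

    int totalFiles()
    {
        return totalFiles;
    }
}
```

   With this shape a call chain reads as the path through the state machine,
   for example `StatusSketch.starting().toCleaning(5).toPreparing()`.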



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationDirType.java:
##########
@@ -28,7 +28,7 @@ public enum LiveMigrationDirType
 {
     CDC_RAW_DIR("cdc_raw"),
     COMMIT_LOG_DIR("commitlog"),
-    DATA_FIlE_DIR("data"),
+    DATA_FILE_DIR("data"),

Review Comment:
   Yeah, I missed it last time.



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new instances.
+ * Progress tracking is handled via atomic fields that can be safely updated from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)
+    {
+        return new OperationStatus(this.state.toCleaning(),
+                                   totalSize,
+                                   totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the PREPARING state.
+     *
+     * @return new OperationStatus instance in PREPARING state
+     * @throws IllegalStateTransitionException if current state cannot transition to PREPARING
+     */
+    @VisibleForTesting
+    public OperationStatus getPreparingState()
+    {
+        return new OperationStatus(this.state.toPreparing(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the DOWNLOADING state with updated download metadata.
+     * Resets progress counters for the new download phase.
+     *
+     * @param bytesToDownload total size of data to be downloaded in bytes
+     * @param filesToDownload number of files to be downloaded
+     * @return new OperationStatus instance in DOWNLOADING state
+     * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOADING
+     */
+    OperationStatus getDownloadingState(final long bytesToDownload, final int filesToDownload)
+    {
+        return new OperationStatus(this.state.toDownloading(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   bytesToDownload,
+                                   filesToDownload,
+                                   new AtomicInteger(),
+                                   new AtomicInteger(),
+                                   new AtomicLong());
+    }
+
+    /**
+     * Transitions to the DOWNLOAD_COMPLETE state.
+     *
+     * @return new OperationStatus instance in DOWNLOAD_COMPLETE state
+     * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOAD_COMPLETE
+     */
+    OperationStatus getDownloadCompleteState()

Review Comment:
   done



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new instances.
+ * Progress tracking is handled via atomic fields that can be safely updated from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)
+    {
+        return new OperationStatus(this.state.toCleaning(),
+                                   totalSize,
+                                   totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the PREPARING state.
+     *
+     * @return new OperationStatus instance in PREPARING state
+     * @throws IllegalStateTransitionException if current state cannot transition to PREPARING
+     */
+    @VisibleForTesting
+    public OperationStatus getPreparingState()

Review Comment:
   done
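
   For readers following the thread, a minimal sketch of the transition pattern the class Javadoc describes: each transition validates the current state and returns a new instance, while the atomic counter object is carried over. This is not the PR's code. `StatusSketch`, `starting()` and `cleaning()` are hypothetical names, only the STARTING to CLEANING edge is modelled, and `IllegalStateException` stands in for the PR's `IllegalStateTransitionException`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for OperationStatus; names are illustrative only.
final class StatusSketch
{
    enum State
    {
        STARTING, CLEANING, PREPARING, DOWNLOADING, DOWNLOAD_COMPLETE, SUCCESS, CANCELLED, FAILED;

        // Only the STARTING -> CLEANING edge is modelled in this sketch.
        State toCleaning()
        {
            if (this != STARTING)
            {
                // Assumption: the PR throws IllegalStateTransitionException here instead.
                throw new IllegalStateException("Cannot transition from " + this + " to CLEANING");
            }
            return CLEANING;
        }
    }

    final State state;
    final long totalSize;
    final AtomicLong bytesDownloaded;

    private StatusSketch(State state, long totalSize, AtomicLong bytesDownloaded)
    {
        this.state = state;
        this.totalSize = totalSize;
        this.bytesDownloaded = bytesDownloaded;
    }

    static StatusSketch starting()
    {
        // -1 marks a size that is not known yet, mirroring getStartingState() in the diff.
        return new StatusSketch(State.STARTING, -1, new AtomicLong(0));
    }

    StatusSketch cleaning(long totalSize)
    {
        // A new instance is returned; the same counter object is carried over.
        return new StatusSketch(state.toCleaning(), totalSize, bytesDownloaded);
    }
}
```

   The original instance is left untouched by the transition, so a caller holding the old reference still sees STARTING.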



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new instances.
+ * Progress tracking is handled via atomic fields that can be safely updated from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)
+    {
+        return new OperationStatus(this.state.toCleaning(),
+                                   totalSize,
+                                   totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the PREPARING state.
+     *
+     * @return new OperationStatus instance in PREPARING state
+     * @throws IllegalStateTransitionException if current state cannot transition to PREPARING
+     */
+    @VisibleForTesting
+    public OperationStatus getPreparingState()
+    {
+        return new OperationStatus(this.state.toPreparing(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the DOWNLOADING state with updated download metadata.
+     * Resets progress counters for the new download phase.
+     *
+     * @param bytesToDownload total size of data to be downloaded in bytes
+     * @param filesToDownload number of files to be downloaded
+     * @return new OperationStatus instance in DOWNLOADING state
+     * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOADING
+     */
+    OperationStatus getDownloadingState(final long bytesToDownload, final int filesToDownload)

Review Comment:
   done
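
   As an illustration of the "Resets progress counters" note in the Javadoc above, a small sketch of the difference between carrying a counter over and handing out a fresh one. `ProgressSketch`, `carryOver()` and `reset()` are hypothetical names, not part of the PR, and a single counter stands in for the three progress counters.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; ProgressSketch is not a class from the PR.
final class ProgressSketch
{
    final AtomicInteger filesDownloaded;

    ProgressSketch(AtomicInteger filesDownloaded)
    {
        this.filesDownloaded = filesDownloaded;
    }

    // Shares the existing counter object, as the CLEANING and PREPARING transitions in the diff do.
    ProgressSketch carryOver()
    {
        return new ProgressSketch(filesDownloaded);
    }

    // Hands out a fresh counter starting at zero, as the DOWNLOADING transition in the diff does.
    ProgressSketch reset()
    {
        return new ProgressSketch(new AtomicInteger());
    }
}
```

   With a fresh counter, an increment applied through a stale reference to the previous instance is not visible in the new one, whereas a carried-over counter reflects it.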



##########
server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java:
##########
@@ -51,11 +58,113 @@ public class LiveMigrationModule extends AbstractModule
     protected void configure()
     {
         bind(LiveMigrationMap.class).to(LiveMigrationMapSidecarConfigImpl.class);
+        bind(LiveMigrationTaskFactory.class).to(LiveMigrationTaskFactoryImpl.class);
     }
 
+    @GET
+    @Path(ApiEndpointsV1.LIVE_MIGRATION_DATA_COPY_TASKS_ROUTE)
+    @Operation(summary = "Create data copy task",
+               description = "Creates a new data copy task for live migration")
+    @APIResponse(description = "Data copy task created successfully",
+                 responseCode = "200",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))
+    @APIResponse(responseCode = "403",
+                 description = "Live migration not enabled or node not configured as destination",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))
+    @ProvidesIntoMap
+    @KeyClassMapKey(VertxRouteMapKeys.LiveMigrationCreateDataCopyTaskRouteKey.class)
+    public VertxRoute createDataCopyTaskRoute(RouteBuilder.Factory factory,
+                                              LiveMigrationApiEnableDisableHandler liveMigrationApiEnableDisableHandler,
+                                              LiveMigrationCreateDataCopyTaskHandler liveMigrationCreateDataCopyTaskHandler)
+    {
+        return factory.builderForRoute()
+                      .setBodyHandler(true)
+                      .handler(liveMigrationApiEnableDisableHandler::isDestination)
+                      .handler(liveMigrationCreateDataCopyTaskHandler)
+                      .build();
+    }
+
+    @PATCH
+    @Operation(summary = "Cancel data copy task",
+    description = "Cancels an existing data copy task for live migration")
+    @APIResponse(description = "Data copy task cancelled successfully",
+                 responseCode = "200",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))
+    @APIResponse(responseCode = "403",
+                 description = "Live migration not enabled or node not configured as destination",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))
+    @APIResponse(responseCode = "404",
+                 description = "Data copy task not found",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))
+    @ProvidesIntoMap
+    @KeyClassMapKey(VertxRouteMapKeys.LiveMigrationCancelDataCopyTaskRouteKey.class)
+    public VertxRoute cancelDataCopyTaskRoute(RouteBuilder.Factory factory,
+                                              LiveMigrationApiEnableDisableHandler liveMigrationApiEnableDisableHandler,
+                                              LiveMigrationCancelDataCopyTaskHandler liveMigrationCancelDataCopyTaskHandler)
+    {
+        return factory.builderForRoute()
+                      .handler(liveMigrationApiEnableDisableHandler::isDestination)
+                      .handler(liveMigrationCancelDataCopyTaskHandler)
+                      .build();
+    }
+
+    @GET
+    @Operation(summary = "Get data copy task",
+               description = "Retrieves the status and details of a specific data copy task by task ID")
+    @APIResponse(description = "Data copy task retrieved successfully",
+                 responseCode = "200",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))

Review Comment:
   done



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new instances.
+ * Progress tracking is handled via atomic fields that can be safely updated from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)
+    {
+        return new OperationStatus(this.state.toCleaning(),
+                                   totalSize,
+                                   totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the PREPARING state.
+     *
+     * @return new OperationStatus instance in PREPARING state
+     * @throws IllegalStateTransitionException if current state cannot transition to PREPARING
+     */
+    @VisibleForTesting
+    public OperationStatus getPreparingState()
+    {
+        return new OperationStatus(this.state.toPreparing(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the DOWNLOADING state with updated download metadata.
+     * Resets progress counters for the new download phase.
+     *
+     * @param bytesToDownload total size of data to be downloaded in bytes
+     * @param filesToDownload number of files to be downloaded
+     * @return new OperationStatus instance in DOWNLOADING state
+     * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOADING
+     */
+    OperationStatus getDownloadingState(final long bytesToDownload, final int filesToDownload)
+    {
+        return new OperationStatus(this.state.toDownloading(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   bytesToDownload,
+                                   filesToDownload,
+                                   new AtomicInteger(),
+                                   new AtomicInteger(),
+                                   new AtomicLong());
+    }
+
+    /**
+     * Transitions to the DOWNLOAD_COMPLETE state.
+     *
+     * @return new OperationStatus instance in DOWNLOAD_COMPLETE state
+     * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOAD_COMPLETE
+     */
+    OperationStatus getDownloadCompleteState()
+    {
+        return new OperationStatus(this.state.toDownloadComplete(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the SUCCESS state, indicating successful completion.
+     *
+     * @return new OperationStatus instance in SUCCESS state
+     * @throws IllegalStateTransitionException if current state cannot transition to SUCCESS
+     */
+    @VisibleForTesting
+    public OperationStatus getSuccessState()

Review Comment:
   done



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new instances.
+ * Progress tracking is handled via atomic fields that can be safely updated from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)
+    {
+        return new OperationStatus(this.state.toCleaning(),
+                                   totalSize,
+                                   totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the PREPARING state.
+     *
+     * @return new OperationStatus instance in PREPARING state
+     * @throws IllegalStateTransitionException if current state cannot transition to PREPARING
+     */
+    @VisibleForTesting
+    public OperationStatus getPreparingState()
+    {
+        return new OperationStatus(this.state.toPreparing(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the DOWNLOADING state with updated download metadata.
+     * Resets progress counters for the new download phase.
+     *
+     * @param bytesToDownload total size of data to be downloaded in bytes
+     * @param filesToDownload number of files to be downloaded
+     * @return new OperationStatus instance in DOWNLOADING state
+     * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOADING
+     */
+    OperationStatus getDownloadingState(final long bytesToDownload, final int filesToDownload)
+    {
+        return new OperationStatus(this.state.toDownloading(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   bytesToDownload,
+                                   filesToDownload,
+                                   new AtomicInteger(),
+                                   new AtomicInteger(),
+                                   new AtomicLong());
+    }
+
+    /**
+     * Transitions to the DOWNLOAD_COMPLETE state.
+     *
+     * @return new OperationStatus instance in DOWNLOAD_COMPLETE state
+     * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOAD_COMPLETE
+     */
+    OperationStatus getDownloadCompleteState()
+    {
+        return new OperationStatus(this.state.toDownloadComplete(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the SUCCESS state, indicating successful completion.
+     *
+     * @return new OperationStatus instance in SUCCESS state
+     * @throws IllegalStateTransitionException if current state cannot transition to SUCCESS
+     */
+    @VisibleForTesting
+    public OperationStatus getSuccessState()
+    {
+        return new OperationStatus(this.state.toSuccess(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the FAILED state, indicating operation failure.
+     *
+     * @return new OperationStatus instance in FAILED state
+     * @throws IllegalStateTransitionException if current state cannot transition to FAILED
+     */
+    OperationStatus tryFailureState()
+    {
+        return new OperationStatus(this.state.toFailed(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Cancels the task if not completed.
+     *
+     * @return Returns same state if completed, otherwise returns cancelled state.
+     */
+    public OperationStatus cancel()
+    {
+        return new OperationStatus(this.state.toCancelled(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    public State getState()
+    {
+        return state;
+    }
+
+    public State state()
+    {
+        return state;
+    }
+
+    public long totalSize()
+    {
+        return totalSize;
+    }
+
+    public long bytesToDownload()
+    {
+        return bytesToDownload;
+    }
+
+    public int filesToDownload()
+    {
+        return filesToDownload;
+    }
+
+    /**
+     * Returns the thread-safe atomic counter for files downloaded.
+     * This counter can be safely read and updated from multiple threads.
+     *
+     * @return atomic counter for files downloaded
+     */
+    public AtomicInteger filesDownloaded()

Review Comment:
   Makes sense. Added methods to increment counters.
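
   For readers following the thread, here is a minimal sketch of what such increment methods might look like. The class name `DownloadProgress` and the method names are hypothetical and not taken from the PR; the idea is only that the atomic fields stay private and callers get increment and read operations instead of the raw `AtomicInteger`/`AtomicLong` objects.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical holder for download progress; names are illustrative only
// and do not come from the PR under review.
final class DownloadProgress
{
    private final AtomicInteger filesDownloaded = new AtomicInteger();
    private final AtomicInteger downloadFailures = new AtomicInteger();
    private final AtomicLong bytesDownloaded = new AtomicLong();

    // Records one successfully downloaded file of the given size in bytes.
    void recordDownload(long bytes)
    {
        filesDownloaded.incrementAndGet();
        bytesDownloaded.addAndGet(bytes);
    }

    // Records one failed download attempt.
    void recordFailure()
    {
        downloadFailures.incrementAndGet();
    }

    // Read-only views: callers see values, never the mutable counters.
    int filesDownloaded()
    {
        return filesDownloaded.get();
    }

    int downloadFailures()
    {
        return downloadFailures.get();
    }

    long bytesDownloaded()
    {
        return bytesDownloaded.get();
    }
}
```

   With this shape, a caller cannot reset or replace a counter by accident, which is presumably the concern behind exposing the atomics directly.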



##########
server/src/test/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutorTest.java:
##########
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.concurrent;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+
+import static org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor.TASK_CANCEL_MESSAGE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(VertxExtension.class)
+class AsyncConcurrentTaskExecutorTest
+{
+    private final Vertx vertx = Vertx.vertx();
+
+    @Test
+    public void testTaskFailure(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        String failureMessage = "Task failed";
+        tasks.add(getDummyFailedTask(failureMessage));
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> {
+            assertThat(result).isNotNull();
+            assertThat(result.getMessage()).isEqualTo(failureMessage);
+            assertThat(taskFutures.get(0).failed()).isTrue();
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    void testZeroTasks(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            assertThat(ar.result().list().size()).isEqualTo(0);
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testMultipleTaskFailure(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        for (int i = 0; i < 10; i++)
+        {
+            String failureMessage = "Task " + i + " failed.";
+            tasks.add(getDummyFailedTask(failureMessage));
+        }
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 5);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> {
+            assertThat(result).isNotNull();
+            taskFutures.forEach(ar -> assertThat(ar.failed()).isTrue());
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    public void testSingleTaskSuccess(VertxTestContext context)
+    {
+        List<Callable<Future<String>>> tasks = Collections.singletonList(getDummySuccessTask("Task executed successfully"));
+        AsyncConcurrentTaskExecutor<String> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<String>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            assertThat(ar.result().list().get(0)).isEqualTo("Task executed successfully");
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testMultipleTasksSucceeds(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = getDummySuccessTasks(10, Boolean.TRUE);
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 5);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            for (Future<Boolean> taskFuture : taskFutures)
+            {
+                assertThat(taskFuture.succeeded()).isTrue();
+                assertThat(taskFuture.result()).isTrue();
+            }
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testLotsOfTasks(VertxTestContext context)
+    {
+        List<Callable<Future<Integer>>> tasks = getDummySuccessTasks(100_000, 1);
+        AsyncConcurrentTaskExecutor<Integer> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 50);
+        List<Future<Integer>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            for (Future<Integer> taskFuture : taskFutures)
+            {
+                assertThat(taskFuture.succeeded()).isTrue();
+                assertThat(taskFuture.result()).isEqualTo(1);
+            }
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testTaskCancel(VertxTestContext context)
+    {
+        Promise<Boolean> promise = Promise.promise();
+        List<Callable<Future<Boolean>>> tasks = getDummyPendingTasks(10, promise);
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+
+        concurrentTaskExecutor.cancelTasks();
+
+        // Complete promise so that first task gets finished
+        promise.fail(new Exception("ERROR!"));
+
+        Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> {
+            assertThat(result).isNotNull();
+
+            Future<Boolean> firstTask = taskFutures.get(0);
+            assertThat(firstTask.failed()).isTrue();
+            assertThat(firstTask.cause()).isInstanceOf(Exception.class);
+
+            for (int i = 1; i < taskFutures.size(); i++)
+            {
+                Future<Boolean> task = taskFutures.get(i);
+                assertThat(task.failed()).isTrue();
+                assertThat(task.cause().getMessage()).isEqualTo(TASK_CANCEL_MESSAGE);
+                assertThat(task.cause()).isInstanceOf(CancellationException.class);
+            }
+
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    public void testCancelLotOfTasks(VertxTestContext context)
+    {
+        Promise<Integer> promise = Promise.promise();
+        List<Callable<Future<Integer>>> tasks = getDummyPendingTasks(10_000, promise);
+        AsyncConcurrentTaskExecutor<Integer> taskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Integer>> taskFutures = taskExecutor.start();
+
+        taskExecutor.cancelTasks();
+
+        // Resolve the promise so that first task will finish
+        promise.complete(0);
+
+        Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> {
+            assertThat(result).isNotNull();
+
+            Future<Integer> firstTask = taskFutures.get(0);
+            assertThat(firstTask.isComplete()).isTrue();
+            assertThat(firstTask.succeeded()).isTrue();
+            assertThat(firstTask.result()).isEqualTo(0);
+
+            for (int i = 1; i < taskFutures.size(); i++)
+            {
+                Future<Integer> task = taskFutures.get(i);
+                assertThat(task.failed()).isTrue();
+                assertThat(task.cause().getMessage()).isEqualTo(TASK_CANCEL_MESSAGE);
+                assertThat(task.cause()).isInstanceOf(CancellationException.class);
+            }
+
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    public void testCancelWhenTasksInProgress()
+    {
+        Promise<Integer> task1Promise = Promise.promise();
+        Callable<Future<Integer>> task1 = () -> task1Promise.future()
+                                                            .compose(res -> Future.succeededFuture(0));
+        Callable<Future<Integer>> task2 = getDummySuccessTask(1);
+        Promise<Integer> task3Promise = Promise.promise();
+        Callable<Future<Integer>> task3 = () -> task3Promise.future()
+                                                            .compose(res -> Future.succeededFuture(2));
+        Callable<Future<Integer>> task4 = getDummySuccessTask(3);
+        Callable<Future<Integer>> task5 = getDummySuccessTask(4);
+
+        List<Callable<Future<Integer>>> tasks = Arrays.asList(task1, task2, task3, task4, task5);
+        AsyncConcurrentTaskExecutor<Integer> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 2);
+        List<Future<Integer>> taskFutures = concurrentTaskExecutor.start();
+
+        // Wait for some time so that independent tasks get time to finish.
+        CompositeFuture compositeFuture = waitForTasks(taskFutures, 1_000);
+        assertThat(compositeFuture.isComplete()).isFalse();
+        concurrentTaskExecutor.cancelTasks();
+        waitForTasks(compositeFuture, 1_000);
+
+        assertThat(taskFutures.get(0).succeeded()).isFalse();
+        assertThat(taskFutures.get(0).isComplete()).isFalse();
+
+        assertThat(taskFutures.get(1).succeeded()).isTrue();
+        assertThat(taskFutures.get(1).isComplete()).isTrue();
+
+        assertThat(taskFutures.get(2).succeeded()).isFalse();
+        assertThat(taskFutures.get(2).isComplete()).isFalse();
+
+        assertThat(taskFutures.get(3).isComplete()).isTrue();
+        assertThat(taskFutures.get(3).failed()).isTrue();
+        assertThat(taskFutures.get(3).cause()).isInstanceOf(CancellationException.class);
+
+        assertThat(taskFutures.get(4).failed()).isTrue();
+        assertThat(taskFutures.get(4).isComplete()).isTrue();
+        assertThat(taskFutures.get(4).cause()).isInstanceOf(CancellationException.class);
+
+        assertThat(compositeFuture.isComplete()).isFalse();
+
+        task1Promise.complete();
+        task3Promise.complete();
+
+        waitForTasks(compositeFuture, 2_000);
+        assertThat(Future.join(taskFutures).isComplete()).isTrue();
+    }
+
+    @Test
+    public void testMaxConcurrency(VertxTestContext context)
+    {
+        AtomicInteger counter = new AtomicInteger(0);
+        int maxConcurrency = 4;
+        List<Callable<Future<Integer>>> tasks = new ArrayList<>();
+        for (int i = 0; i < 1000; i++)
+        {
+            Callable<Future<Integer>> task = getDummySuccessCounterTask(counter);
+            tasks.add(task);
+        }
+        TestAsyncTaskExecutor<Integer> concurrentTaskExecutor =
+        new TestAsyncTaskExecutor<>(vertx, tasks, maxConcurrency, counter);
+        List<Future<Integer>> taskFutures = concurrentTaskExecutor.start();
+
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {

Review Comment:
   We don't need this extra indentation, right?



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new instances.
+ * Progress tracking is handled via atomic fields that can be safely updated from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)
+    {
+        return new OperationStatus(this.state.toCleaning(),
+                                   totalSize,
+                                   totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the PREPARING state.
+     *
+     * @return new OperationStatus instance in PREPARING state
+     * @throws IllegalStateTransitionException if current state cannot transition to PREPARING
+     */
+    @VisibleForTesting
+    public OperationStatus getPreparingState()
+    {
+        return new OperationStatus(this.state.toPreparing(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the DOWNLOADING state with updated download metadata.
+     * Resets progress counters for the new download phase.
+     *
+     * @param bytesToDownload total size of data to be downloaded in bytes
+     * @param filesToDownload number of files to be downloaded
+     * @return new OperationStatus instance in DOWNLOADING state
+     * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOADING
+     */
+    OperationStatus getDownloadingState(final long bytesToDownload, final int filesToDownload)
+    {
+        return new OperationStatus(this.state.toDownloading(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   bytesToDownload,
+                                   filesToDownload,
+                                   new AtomicInteger(),
+                                   new AtomicInteger(),
+                                   new AtomicLong());
+    }
+
+    /**
+     * Transitions to the DOWNLOAD_COMPLETE state.
+     *
+     * @return new OperationStatus instance in DOWNLOAD_COMPLETE state
+     * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOAD_COMPLETE
+     */
+    OperationStatus getDownloadCompleteState()
+    {
+        return new OperationStatus(this.state.toDownloadComplete(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the SUCCESS state, indicating successful completion.
+     *
+     * @return new OperationStatus instance in SUCCESS state
+     * @throws IllegalStateTransitionException if current state cannot transition to SUCCESS
+     */
+    @VisibleForTesting
+    public OperationStatus getSuccessState()
+    {
+        return new OperationStatus(this.state.toSuccess(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the FAILED state, indicating operation failure.
+     *
+     * @return new OperationStatus instance in FAILED state
+     * @throws IllegalStateTransitionException if current state cannot transition to FAILED
+     */
+    OperationStatus tryFailureState()
+    {
+        return new OperationStatus(this.state.toFailed(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Cancels the task if not completed.
+     *
+     * @return Returns same state if completed, otherwise returns cancelled state.
+     */
+    public OperationStatus cancel()
+    {
+        return new OperationStatus(this.state.toCancelled(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    public State getState()
+    {
+        return state;
+    }
+
+    public State state()
+    {
+        return state;
+    }
+
+    public long totalSize()
+    {
+        return totalSize;
+    }
+
+    public long bytesToDownload()
+    {
+        return bytesToDownload;
+    }
+
+    public int filesToDownload()
+    {
+        return filesToDownload;
+    }
+
+    /**
+     * Returns the thread-safe atomic counter for files downloaded.
+     * This counter can be safely read and updated from multiple threads.
+     *
+     * @return atomic counter for files downloaded
+     */
+    public AtomicInteger filesDownloaded()
+    {
+        return filesDownloaded;
+    }
+
+    /**
+     * Returns the thread-safe atomic counter for bytes downloaded.
+     * This counter can be safely read and updated from multiple threads.
+     *
+     * @return atomic counter for bytes downloaded
+     */
+    public AtomicLong bytesDownloaded()

Review Comment:
   done
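
   As background for the cancel and failure rules described in the `OperationStatus` Javadoc quoted above, here is a minimal sketch of how such transitions could behave. `SketchState` is a hypothetical, simplified stand-in for the PR's `State` type, it covers only a subset of the states, and it throws `IllegalStateException` where the PR refers to `IllegalStateTransitionException`. Which states count as terminal is also an assumption made for the sketch.

```java
// Hypothetical, simplified stand-in for the PR's State type; not the PR's code.
enum SketchState
{
    STARTING, DOWNLOADING, SUCCESS, FAILED, CANCELLED;

    // Assumption for this sketch: these three states allow no further progress.
    boolean isTerminal()
    {
        return this == SUCCESS || this == FAILED || this == CANCELLED;
    }

    // Cancelling a finished task keeps the current state; otherwise move to CANCELLED.
    SketchState toCancelled()
    {
        return isTerminal() ? this : CANCELLED;
    }

    // A failure reported after cancellation is tolerated and stays CANCELLED.
    SketchState toFailed()
    {
        if (this == CANCELLED)
        {
            return CANCELLED;
        }
        if (isTerminal())
        {
            throw new IllegalStateException("Cannot transition from " + this + " to FAILED");
        }
        return FAILED;
    }
}
```

   Because each transition returns a value rather than mutating anything, a wrapper like `OperationStatus` can build a new immutable instance around the returned state while carrying the existing counters forward.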



##########
server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java:
##########
@@ -51,11 +58,113 @@ public class LiveMigrationModule extends AbstractModule
     protected void configure()
     {
         bind(LiveMigrationMap.class).to(LiveMigrationMapSidecarConfigImpl.class);
+        bind(LiveMigrationTaskFactory.class).to(LiveMigrationTaskFactoryImpl.class);
     }
 
+    @GET
+    @Path(ApiEndpointsV1.LIVE_MIGRATION_DATA_COPY_TASKS_ROUTE)
+    @Operation(summary = "Create data copy task",
+               description = "Creates a new data copy task for live migration")
+    @APIResponse(description = "Data copy task created successfully",
+                 responseCode = "200",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))
+    @APIResponse(responseCode = "403",
+                 description = "Live migration not enabled or node not configured as destination",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))
+    @ProvidesIntoMap
+    @KeyClassMapKey(VertxRouteMapKeys.LiveMigrationCreateDataCopyTaskRouteKey.class)
+    public VertxRoute createDataCopyTaskRoute(RouteBuilder.Factory factory,
+                                              LiveMigrationApiEnableDisableHandler liveMigrationApiEnableDisableHandler,
+                                              LiveMigrationCreateDataCopyTaskHandler liveMigrationCreateDataCopyTaskHandler)
+    {
+        return factory.builderForRoute()
+                      .setBodyHandler(true)
+                      .handler(liveMigrationApiEnableDisableHandler::isDestination)
+                      .handler(liveMigrationCreateDataCopyTaskHandler)
+                      .build();
+    }
+
+    @PATCH
+    @Operation(summary = "Cancel data copy task",
+    description = "Cancels an existing data copy task for live migration")
+    @APIResponse(description = "Data copy task cancelled successfully",
+                 responseCode = "200",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))

Review Comment:
   done



##########
server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java:
##########
@@ -51,11 +58,113 @@ public class LiveMigrationModule extends AbstractModule
     protected void configure()
     {
         bind(LiveMigrationMap.class).to(LiveMigrationMapSidecarConfigImpl.class);
+        bind(LiveMigrationTaskFactory.class).to(LiveMigrationTaskFactoryImpl.class);
     }
 
+    @GET
+    @Path(ApiEndpointsV1.LIVE_MIGRATION_DATA_COPY_TASKS_ROUTE)
+    @Operation(summary = "Create data copy task",
+               description = "Creates a new data copy task for live migration")
+    @APIResponse(description = "Data copy task created successfully",
+                 responseCode = "200",

Review Comment:
   right



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java:
##########


Review Comment:
   done



##########
server/src/test/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutorTest.java:
##########
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.concurrent;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+
+import static org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor.TASK_CANCEL_MESSAGE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(VertxExtension.class)
+class AsyncConcurrentTaskExecutorTest
+{
+    private final Vertx vertx = Vertx.vertx();
+
+    @Test
+    public void testTaskFailure(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        String failureMessage = "Task failed";
+        tasks.add(getDummyFailedTask(failureMessage));
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> {
+            assertThat(result).isNotNull();
+            assertThat(result.getMessage()).isEqualTo(failureMessage);
+            assertThat(taskFutures.get(0).failed()).isTrue();
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    void testZeroTasks(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            assertThat(ar.result().list().size()).isEqualTo(0);
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testMultipleTaskFailure(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        for (int i = 0; i < 10; i++)
+        {
+            String failureMessage = "Task " + i + " failed.";
+            tasks.add(getDummyFailedTask(failureMessage));
+        }
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 5);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> {
+            assertThat(result).isNotNull();
+            taskFutures.forEach(ar -> assertThat(ar.failed()).isTrue());
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    public void testSingleTaskSuccess(VertxTestContext context)
+    {
+        List<Callable<Future<String>>> tasks = Collections.singletonList(getDummySuccessTask("Task executed successfully"));
+        AsyncConcurrentTaskExecutor<String> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<String>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            assertThat(ar.result().list().get(0)).isEqualTo("Task executed successfully");

Review Comment:
   The test does fail when there are assertion failures or exceptions. I tried
   the following, and the test failed as expected:
   ```java
           Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
               assertThat(1).isEqualTo(2);  // Making sure that it fails
               assertThat(ar).isNotNull();
               assertThat(ar.result().list().get(0)).isEqualTo("Task executed successfully");
               context.completeNow();
           }));
   ```
   
   `context.verify` does the following:
   
   ```java
     public VertxTestContext verify(ExecutionBlock block) {
       Objects.requireNonNull(block, "The block cannot be null");
       try {
         block.apply();
       } catch (Throwable t) {
         failNow(t);
       }
       return this;
     }
   ```
   
   The Vert.x documentation says
   "_[verify](https://vertx.io/docs/apidocs/io/vertx/junit5/VertxTestContext.html#verify-io.vertx.junit5.VertxTestContext.ExecutionBlock-)
   to perform assertions, any exception thrown from the code block is considered
   as a test failure_", so I feel `context.verify` is good enough.
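
   To illustrate the mechanism without depending on Vert.x, here is a minimal
   sketch. `MiniTestContext` is a hypothetical stand-in that copies only the
   `verify`/`failNow` shape quoted above; it is not the real `VertxTestContext`,
   and keeping only the first failure is an assumption of the sketch.

   ```java
   import java.util.Objects;
   import java.util.concurrent.atomic.AtomicReference;

   // Hypothetical stand-in for VertxTestContext, reduced to verify/failNow.
   class MiniTestContext
   {
       @FunctionalInterface
       interface ExecutionBlock
       {
           void apply() throws Throwable;
       }

       // Holds the first recorded failure, or null if nothing has failed.
       private final AtomicReference<Throwable> failure = new AtomicReference<>();

       MiniTestContext verify(ExecutionBlock block)
       {
           Objects.requireNonNull(block, "The block cannot be null");
           try
           {
               block.apply();
           }
           catch (Throwable t)
           {
               // AssertionError is a Throwable, so failed assertions land here too.
               failNow(t);
           }
           return this;
       }

       void failNow(Throwable t)
       {
           // Assumption of this sketch: only the first failure is kept.
           failure.compareAndSet(null, t);
       }

       boolean failed()
       {
           return failure.get() != null;
       }

       Throwable causeOfFailure()
       {
           return failure.get();
       }
   }
   ```

   Because the `catch` clause takes `Throwable`, an `AssertionError` raised by
   AssertJ inside the block is recorded as a failure instead of being lost on
   the callback thread.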



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new instances.
+ * Progress tracking is handled via atomic fields that can be safely updated from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)
+    {
+        return new OperationStatus(this.state.toCleaning(),
+                                   totalSize,
+                                   totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the PREPARING state.
+     *
+     * @return new OperationStatus instance in PREPARING state
+     * @throws IllegalStateTransitionException if current state cannot transition to PREPARING
+     */
+    @VisibleForTesting
+    public OperationStatus getPreparingState()
+    {
+        return new OperationStatus(this.state.toPreparing(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the DOWNLOADING state with updated download metadata.
+     * Resets progress counters for the new download phase.
+     *
+     * @param bytesToDownload total size of data to be downloaded in bytes
+     * @param filesToDownload number of files to be downloaded
+     * @return new OperationStatus instance in DOWNLOADING state
+     * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOADING
+     */
+    OperationStatus getDownloadingState(final long bytesToDownload, final int filesToDownload)
+    {
+        return new OperationStatus(this.state.toDownloading(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   bytesToDownload,
+                                   filesToDownload,
+                                   new AtomicInteger(),
+                                   new AtomicInteger(),
+                                   new AtomicLong());
+    }
+
+    /**
+     * Transitions to the DOWNLOAD_COMPLETE state.
+     *
+     * @return new OperationStatus instance in DOWNLOAD_COMPLETE state
+     * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOAD_COMPLETE
+     */
+    OperationStatus getDownloadCompleteState()
+    {
+        return new OperationStatus(this.state.toDownloadComplete(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the SUCCESS state, indicating successful completion.
+     *
+     * @return new OperationStatus instance in SUCCESS state
+     * @throws IllegalStateTransitionException if current state cannot transition to SUCCESS
+     */
+    @VisibleForTesting
+    public OperationStatus getSuccessState()
+    {
+        return new OperationStatus(this.state.toSuccess(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the FAILED state, indicating operation failure.
+     *
+     * @return new OperationStatus instance in FAILED state
+     * @throws IllegalStateTransitionException if current state cannot transition to FAILED
+     */
+    OperationStatus tryFailureState()
+    {
+        return new OperationStatus(this.state.toFailed(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Cancels the task if not completed.
+     *
+     * @return Returns same state if completed, otherwise returns cancelled state.
+     */
+    public OperationStatus cancel()
+    {
+        return new OperationStatus(this.state.toCancelled(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    public State getState()
+    {
+        return state;
+    }
+
+    public State state()
+    {
+        return state;
+    }
+
+    public long totalSize()
+    {
+        return totalSize;
+    }
+
+    public long bytesToDownload()
+    {
+        return bytesToDownload;
+    }
+
+    public int filesToDownload()
+    {
+        return filesToDownload;
+    }
+
+    /**
+     * Returns the thread-safe atomic counter for files downloaded.
+     * This counter can be safely read and updated from multiple threads.
+     *
+     * @return atomic counter for files downloaded
+     */
+    public AtomicInteger filesDownloaded()
+    {
+        return filesDownloaded;
+    }
+
+    /**
+     * Returns the thread-safe atomic counter for bytes downloaded.
+     * This counter can be safely read and updated from multiple threads.
+     *
+     * @return atomic counter for bytes downloaded
+     */
+    public AtomicLong bytesDownloaded()
+    {
+        return bytesDownloaded;
+    }
+
+    public int totalFiles()
+    {
+        return totalFiles;
+    }
+
+    /**
+     * Returns the thread-safe atomic counter for download failures.
+     * This counter can be safely read and updated from multiple threads.
+     *
+     * @return atomic counter for download failures
+     */
+    public AtomicInteger downloadFailures()
+    {
+        return downloadFailures;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "OperationStatus{" +
+               "bytesDownloaded=" + bytesDownloaded +
+               ", filesDownloaded=" + filesDownloaded +
+               ", filesToDownload=" + filesToDownload +
+               ", totalSize=" + totalSize +
+               ", downloadSize=" + bytesToDownload +
+               ", state=" + state +
+               '}';
+    }
+
+    /**
+     * Represents the various states of a live migration data copy operation.
+     *
+     * <h3>State Descriptions:</h3>
+     * <ul>
+     *   <li><b>STARTING</b> - Initial state when the operation begins</li>
+     *   <li><b>CLEANING</b> - Removing unnecessary files from the destination</li>
+     *   <li><b>PREPARING</b> - Analyzing files to determine what needs to be downloaded</li>
+     *   <li><b>DOWNLOADING</b> - Actively downloading files from the source</li>
+     *   <li><b>DOWNLOAD_COMPLETE</b> - All files have been downloaded (terminal state)</li>
+     *   <li><b>SUCCESS</b> - Operation completed successfully (terminal state)</li>
+     *   <li><b>FAILED</b> - Operation failed due to an error (terminal state)</li>
+     *   <li><b>CANCELLED</b> - Operation was cancelled by user request (terminal state)</li>
+     * </ul>
+     * <p>
+     * Invalid transitions throw {@link IllegalStateTransitionException}.
+     */
+    public enum State

Review Comment:
   okay, done
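
   For reference, the pattern described in the class Javadoc above (immutable
   status snapshots whose transitions share atomic progress counters) can be
   sketched roughly as follows. `MiniStatus` and its reduced three-state enum
   are hypothetical; the transition rules and the use of
   `IllegalStateException` are assumptions of the sketch, not the PR's
   `OperationStatus`.

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   // Hypothetical, reduced model of the pattern; not the PR's OperationStatus.
   final class MiniStatus
   {
       enum State
       {
           STARTING, DOWNLOADING, CANCELLED;

           State toDownloading()
           {
               if (this != STARTING)
               {
                   throw new IllegalStateException("Cannot move from " + this + " to DOWNLOADING");
               }
               return DOWNLOADING;
           }

           State toCancelled()
           {
               // Assumption of this sketch: cancelling is allowed from every state.
               return CANCELLED;
           }
       }

       private final State state;
       private final AtomicLong bytesDownloaded;

       private MiniStatus(State state, AtomicLong bytesDownloaded)
       {
           this.state = state;
           this.bytesDownloaded = bytesDownloaded;
       }

       static MiniStatus starting()
       {
           return new MiniStatus(State.STARTING, new AtomicLong());
       }

       MiniStatus downloading()
       {
           // A new download phase starts with a fresh counter.
           return new MiniStatus(state.toDownloading(), new AtomicLong());
       }

       MiniStatus cancel()
       {
           // The counter object is shared, so progress recorded through the
           // previous snapshot stays visible through the new one.
           return new MiniStatus(state.toCancelled(), bytesDownloaded);
       }

       State state()
       {
           return state;
       }

       AtomicLong bytesDownloaded()
       {
           return bytesDownloaded;
       }
   }
   ```

   The state field never changes after construction, so readers holding an
   older snapshot see a consistent state while the counters keep moving.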



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.DecodeException;
+import io.vertx.core.json.Json;
+import io.vertx.core.json.JsonObject;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationDataCopyInProgressException;
+import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException;
+import org.apache.cassandra.sidecar.handlers.AbstractHandler;
+import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.livemigration.DataCopyTaskManager;
+import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler for creating data copy tasks for live migration between Cassandra instances.
+ * <p>
+ * This handler processes requests to initiate data copying from a source Cassandra instance to a destination instance.
+ * Data copy tasks must be submitted to the destination Sidecar instance. When a task is accepted,
+ * this handler responds with HTTP 202 ACCEPTED status and returns a JSON response containing:
+ * - taskId: Unique identifier for tracking the created task
+ * - statusUrl: URL that can be used to query the status of the data copy operation
+ */
+public class LiveMigrationCreateDataCopyTaskHandler extends AbstractHandler<LiveMigrationDataCopyRequest> implements AccessProtected
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationCreateDataCopyTaskHandler.class);
+    private final DataCopyTaskManager dataCopyTaskManager;
+
+    @Inject
+    public LiveMigrationCreateDataCopyTaskHandler(InstanceMetadataFetcher metadataFetcher,
+                                                  ExecutorPools executorPools,
+                                                  CassandraInputValidator validator,
+                                                  DataCopyTaskManager dataCopyTaskManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.dataCopyTaskManager = dataCopyTaskManager;
+    }
+
+    @Override
+    protected LiveMigrationDataCopyRequest extractParamsOrThrow(RoutingContext context)
+    {
+        try
+        {
+            return Json.decodeValue(context.body().buffer(), LiveMigrationDataCopyRequest.class);
+        }
+        catch (DecodeException decodeException)
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+                                    "Error while decoding values, please check your request body.",
+                                    decodeException);
+        }
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  @NotNull String host,
+                                  SocketAddress remoteAddress,
+                                  LiveMigrationDataCopyRequest liveMigrationTaskRequest)
+    {
+        dataCopyTaskManager

Review Comment:
   Thanks for pointing it out. I looked at how `ConcurrencyLimiter` is used. In
   this case, only one task is allowed to be in progress at a time per instance,
   which is handled by a concurrent map. If I were to use `ConcurrencyLimiter`,
   I would need to maintain one per instance, and that started to look a bit
   complex. I feel the current implementation is simple and adequate for this
   scenario. Is that fine?
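
   As a rough illustration of the "one task in progress per instance" rule, a
   concurrent map can act as the guard on its own. The class and method names
   below (`SingleTaskPerInstanceRegistry`, `tryStart`, `finish`) are
   hypothetical and only sketch the idea; they are not the actual
   `DataCopyTaskManager` code.

   ```java
   import java.util.Optional;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   // Hypothetical sketch; not the PR's DataCopyTaskManager.
   class SingleTaskPerInstanceRegistry
   {
       // instance id -> id of the task currently in progress for that instance
       private final ConcurrentMap<Integer, String> inProgress = new ConcurrentHashMap<>();

       /** Claims the slot for the instance; returns false if a task is already in progress. */
       boolean tryStart(int instanceId, String taskId)
       {
           // putIfAbsent is atomic, so only one caller per instance can win.
           return inProgress.putIfAbsent(instanceId, taskId) == null;
       }

       /** Releases the slot only if it is still held by the given task. */
       boolean finish(int instanceId, String taskId)
       {
           return inProgress.remove(instanceId, taskId);
       }

       Optional<String> current(int instanceId)
       {
           return Optional.ofNullable(inProgress.get(instanceId));
       }
   }
   ```

   With this shape, a second request for the same instance is rejected while
   requests for other instances proceed independently, without a separate
   limiter per instance.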



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

