This is an automated email from the ASF dual-hosted git repository.

sarankk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 69638cff CASSSIDECAR-454: Sidecar should wake up immediately on the 
instance receiving a phase signal instead of waiting for the discovery loop 
(#346)
69638cff is described below

commit 69638cffdb06b756e8faedb1ec00ebee528d1933
Author: Mansi Khara <[email protected]>
AuthorDate: Fri May 29 15:10:39 2026 -0700

    CASSSIDECAR-454: Sidecar should wake up immediately on the instance 
receiving a phase signal instead of waiting for the discovery loop (#346)
    
    Patch by Mansi Khara; reviewed by Yifan Cai, Saranya Krishnakumar for 
CASSSIDECAR-454
---
 CHANGES.txt                                        |   1 +
 .../RestoreJobDiscovererPhaseSignalIntTest.java    | 165 +++++++++++++++++++++
 .../RestoreJobDiscovererSAIOptionsIntTest.java     |  39 +++--
 .../sidecar/db/RestoreJobDatabaseAccessor.java     |  23 ++-
 .../handlers/restore/UpdateRestoreJobHandler.java  |  49 ++++--
 .../sidecar/restore/RestoreJobDiscoverer.java      |  37 +++++
 .../RestoreJobDiscovererNodeJoinedIntTest.java     |  17 ++-
 .../RestoreJobDiscovererNodeJoiningIntTest.java    |  15 +-
 .../RestoreJobDiscovererNodeLeftIntTest.java       |  17 ++-
 .../RestoreJobDiscovererNodeMovedIntTest.java      |  17 ++-
 .../handlers/restore/BaseRestoreJobTests.java      |  59 ++++++++
 .../restore/UpdateRestoreJobHandlerTest.java       |  44 ++++++
 12 files changed, 433 insertions(+), 50 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index bdca46e1..589d6547 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.4.0
 -----
+ * Sidecar should wake up immediately on the instance receiving a phase signal 
instead of waiting for the discovery loop (CASSSIDECAR-454)
  * Fix schema initialization race, CI test reporting, and build stability 
(CASSSIDECAR-468)
  * Add FileBasedConfigurationProvider for file-based overlay persistence 
(CASSSIDECAR-426)
  * Avoid blocking the event loop in CdcPublisher event-bus handler 
(CASSSIDECAR-452)
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererPhaseSignalIntTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererPhaseSignalIntTest.java
new file mode 100644
index 00000000..c335bc68
--- /dev/null
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererPhaseSignalIntTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.restore;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.junit.jupiter.api.Test;
+
+import com.google.inject.AbstractModule;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import 
org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload;
+import 
org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.RestoreJobConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
+import org.apache.cassandra.sidecar.db.RestoreRange;
+import org.apache.cassandra.sidecar.db.RestoreRangeDatabaseAccessor;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import 
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+
+import static 
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.assertRestoreRange;
+import static 
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.createJob;
+import static 
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.disableRestoreProcessor;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Verifies that {@code STAGE_READY} / {@code IMPORT_READY} phase signals 
delivered via
+ * the {@code UpdateRestoreJobHandler} REST endpoint trigger immediate 
processing on the
+ * receiving Sidecar instance, without waiting for the next discovery loop 
cycle.
+ *
+ * <p>Both discovery loop delays are pinned to 1h via {@link 
#configurationOverrides()},
+ * so any successful observation of restore ranges within seconds can only be 
the work of
+ * the wake-up path added by CASSSIDECAR-454 — not the periodic discovery loop.
+ */
+class RestoreJobDiscovererPhaseSignalIntTest extends 
SharedClusterSidecarIntegrationTestBase
+{
+    private static final QualifiedName USER_KEYSPACE_TABLE = new 
QualifiedName("restore_phase_signal_ks", "t");
+    private static final QualifiedTableName SIDECAR_QUALIFIED_TABLE =
+    new QualifiedTableName(USER_KEYSPACE_TABLE.keyspace(), 
USER_KEYSPACE_TABLE.table());
+
+    @Override
+    protected Function<SidecarConfigurationImpl.Builder, 
SidecarConfigurationImpl.Builder> configurationOverrides()
+    {
+        return builder -> builder.restoreJobConfiguration(
+        RestoreJobConfigurationImpl.builder()
+                                   // Pin both discovery loops to 1h. Any 
range created within seconds
+                                   // is necessarily the wake-up path and not 
the discovery loop.
+                                   
.jobDiscoveryActiveLoopDelay(MillisecondBoundConfiguration.parse("1h"))
+                                   
.jobDiscoveryIdleLoopDelay(MillisecondBoundConfiguration.parse("1h"))
+                                   .build());
+    }
+
+    @Override
+    protected void startSidecar(ICluster<? extends IInstance> cluster) throws 
InterruptedException
+    {
+        // Disable the RestoreProcessor so range submission stops at the 
database write,
+        // letting the test assert on RestoreRangeDatabaseAccessor without 
S3/import side effects.
+        serverWrapper = startSidecarWithInstances(cluster, (AbstractModule) 
disableRestoreProcessor());
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(USER_KEYSPACE_TABLE, Map.of("datacenter1", 1));
+        createTestTable(USER_KEYSPACE_TABLE, "CREATE TABLE %s (id text PRIMARY 
KEY, name text);");
+    }
+
+    @Override
+    protected void beforeTestStart()
+    {
+        waitForSchemaReady(30, TimeUnit.SECONDS);
+    }
+
+    @Test
+    void testStageReadyImmediatelySubmitsSlices()
+    {
+        RestoreJobTestUtils.RestoreJobClient testClient = restoreJobClient();
+        UUID jobId = createJob(testClient, SIDECAR_QUALIFIED_TABLE);
+        short bucketId = 0;
+        CreateSliceRequestPayload slicePayload = new CreateSliceRequestPayload(
+        "sliceId-stage", bucketId, "bucket", "key", "checksum",
+        BigInteger.valueOf(500L), BigInteger.valueOf(1500L), 100L, 100L);
+        testClient.createRestoreSlice(SIDECAR_QUALIFIED_TABLE, jobId, 
slicePayload);
+
+        RestoreRangeDatabaseAccessor rangeAccessor =
+        serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class);
+        assertThat(rangeAccessor.findAll(jobId, bucketId)).isEmpty();
+
+        testClient.updateRestoreJob(SIDECAR_QUALIFIED_TABLE, jobId,
+                                    UpdateRestoreJobRequestPayload.builder()
+                                                                  
.withStatus(RestoreJobStatus.STAGE_READY)
+                                                                  .build());
+
+        // Single-node cluster owns the entire ring, so the slice (500, 1500] 
is not trimmed.
+        // Ranges are stored with exclusive start, so the slice's start of 500 
surfaces as 499.
+        loopAssert(10, 500, () -> {
+            List<RestoreRange> ranges = rangeAccessor.findAll(jobId, bucketId);
+            assertThat(ranges)
+            .describedAs("STAGE_READY should immediately create restore ranges 
via the wake-up path")
+            .hasSize(1);
+            assertRestoreRange(ranges.get(0), 499L, 1500L);
+        });
+    }
+
+    @Test
+    void testImportReadyAfterStageReadyDoesNotCreateDuplicateRanges()
+    {
+        RestoreJobTestUtils.RestoreJobClient testClient = restoreJobClient();
+        UUID jobId = createJob(testClient, SIDECAR_QUALIFIED_TABLE);
+        short bucketId = 0;
+        CreateSliceRequestPayload slicePayload = new CreateSliceRequestPayload(
+        "sliceId-dup", bucketId, "bucket", "key", "checksum",
+        BigInteger.valueOf(1L), BigInteger.valueOf(1500L), 100L, 100L);
+        testClient.createRestoreSlice(SIDECAR_QUALIFIED_TABLE, jobId, 
slicePayload);
+
+        testClient.updateRestoreJob(SIDECAR_QUALIFIED_TABLE, jobId,
+                                    UpdateRestoreJobRequestPayload.builder()
+                                                                  
.withStatus(RestoreJobStatus.STAGE_READY)
+                                                                  .build());
+
+        RestoreRangeDatabaseAccessor rangeAccessor =
+        serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class);
+        loopAssert(10, 500, () -> assertThat(rangeAccessor.findAll(jobId, 
bucketId)).isNotEmpty());
+        int rangeCountAfterStageReady = rangeAccessor.findAll(jobId, 
bucketId).size();
+
+        testClient.updateRestoreJob(SIDECAR_QUALIFIED_TABLE, jobId,
+                                    UpdateRestoreJobRequestPayload.builder()
+                                                                  
.withStatus(RestoreJobStatus.IMPORT_READY)
+                                                                  .build());
+
+        loopAssert(5, 500, () -> assertThat(rangeAccessor.findAll(jobId, 
bucketId))
+                                 .describedAs("IMPORT_READY after STAGE_READY 
should not create duplicate ranges")
+                                 .hasSize(rangeCountAfterStageReady));
+    }
+
+    private RestoreJobTestUtils.RestoreJobClient restoreJobClient()
+    {
+        return RestoreJobTestUtils.client(trustedClient(), "localhost", 
serverWrapper.serverPort);
+    }
+}
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererSAIOptionsIntTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererSAIOptionsIntTest.java
index d2dcf31e..623e3636 100644
--- 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererSAIOptionsIntTest.java
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererSAIOptionsIntTest.java
@@ -50,6 +50,7 @@ import 
org.apache.cassandra.testing.ClusterBuilderConfiguration;
 
 import static 
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.createJob;
 import static 
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.disableRestoreProcessor;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
@@ -128,14 +129,19 @@ class RestoreJobDiscovererSAIOptionsIntTest extends 
SharedClusterSidecarIntegrat
         RestoreJobDiscoverer restoreJobDiscoverer = 
serverWrapper.injector.getInstance(RestoreJobDiscoverer.class);
         restoreJobDiscoverer.tryExecuteDiscovery();
 
-        // verify ranges were created and link to the correct job
+        // verify ranges were created and link to the correct job. Wrapped in 
loopAssert because the
+        // STAGE_READY PATCH triggers an asynchronous wake-up 
(CASSSIDECAR-454) on a worker thread; the
+        // wake-up and the synchronous tryExecuteDiscovery above race on 
isExecuting, and ranges may not
+        // be visible immediately after either path returns.
         RestoreRangeDatabaseAccessor rangeDatabaseAccessor = 
serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class);
-        List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId, 
bucketId);
-        assertThat(ranges).isNotEmpty();
-        for (RestoreRange range : ranges)
-        {
-            assertThat(range.jobId()).isEqualTo(jobId);
-        }
+        loopAssert(10, 500, () -> {
+            List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId, 
bucketId);
+            assertThat(ranges).isNotEmpty();
+            for (RestoreRange range : ranges)
+            {
+                assertThat(range.jobId()).isEqualTo(jobId);
+            }
+        });
 
         // Re-read the job after discovery to confirm default importOptions 
are preserved
         RestoreJob jobAfterDiscovery = jobAccessor.find(jobId);
@@ -186,14 +192,19 @@ class RestoreJobDiscovererSAIOptionsIntTest extends 
SharedClusterSidecarIntegrat
         RestoreJobDiscoverer restoreJobDiscoverer = 
serverWrapper.injector.getInstance(RestoreJobDiscoverer.class);
         restoreJobDiscoverer.tryExecuteDiscovery();
 
-        // verify ranges were created by the discoverer and link back to the 
correct job
+        // verify ranges were created by the discoverer and link back to the 
correct job. Wrapped in
+        // loopAssert because the STAGE_READY PATCH triggers an asynchronous 
wake-up (CASSSIDECAR-454)
+        // on a worker thread; the wake-up and the synchronous 
tryExecuteDiscovery above race on
+        // isExecuting, and ranges may not be visible immediately after either 
path returns.
         RestoreRangeDatabaseAccessor rangeDatabaseAccessor = 
serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class);
-        List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId, 
bucketId);
-        assertThat(ranges).isNotEmpty();
-        for (RestoreRange range : ranges)
-        {
-            assertThat(range.jobId()).isEqualTo(jobId);
-        }
+        loopAssert(10, 500, () -> {
+            List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId, 
bucketId);
+            assertThat(ranges).isNotEmpty();
+            for (RestoreRange range : ranges)
+            {
+                assertThat(range.jobId()).isEqualTo(jobId);
+            }
+        });
 
         // Re-read the job after discovery to confirm importOptions are still 
intact.
         RestoreJob jobAfterDiscovery = jobAccessor.find(jobId);
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
index e09a0920..98454185 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
@@ -121,7 +121,28 @@ public class RestoreJobDatabaseAccessor extends 
DatabaseAccessor<RestoreJobsSche
     throws DataObjectMappingException
     {
         sidecarSchema.ensureInitialized();
-        RestoreJob.Builder updateBuilder = RestoreJob.builder();
+        return update(payload, jobId, RestoreJob.builder());
+    }
+
+    /**
+     * Update fields in the restore job and persist. Returns a complete 
updated job
+     * built from {@code existingJob} with the fields from {@code payload} 
applied.
+     *
+     * @param payload fields to be updated
+     * @param existingJob existing restore job to be updated
+     * @return the restore job object built from the existingJob and the 
updated fields
+     * @throws DataObjectMappingException when secrets json cannot be 
serialized
+     */
+    public RestoreJob update(UpdateRestoreJobRequestPayload payload, 
RestoreJob existingJob)
+    throws DataObjectMappingException
+    {
+        sidecarSchema.ensureInitialized();
+        return update(payload, existingJob.jobId, existingJob.unbuild());
+    }
+
+    private RestoreJob update(UpdateRestoreJobRequestPayload payload, UUID 
jobId, RestoreJob.Builder updateBuilder)
+    throws DataObjectMappingException
+    {
         LocalDate createdAt = RestoreJob.toLocalDate(jobId);
         updateBuilder.createdAt(createdAt)
                      .jobId(jobId);
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java
index fd84fd01..c217512c 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.sidecar.handlers.AbstractHandler;
 import org.apache.cassandra.sidecar.handlers.AccessProtected;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.metrics.server.RestoreMetrics;
+import org.apache.cassandra.sidecar.restore.RestoreJobDiscoverer;
 import org.apache.cassandra.sidecar.routes.RoutingContextUtils;
 import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
@@ -59,17 +60,20 @@ import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpExceptio
 public class UpdateRestoreJobHandler extends 
AbstractHandler<UpdateRestoreJobRequestPayload> implements AccessProtected
 {
     private final RestoreJobDatabaseAccessor restoreJobDatabaseAccessor;
+    private final RestoreJobDiscoverer restoreJobDiscoverer;
     private final RestoreMetrics metrics;
 
     @Inject
     public UpdateRestoreJobHandler(ExecutorPools executorPools,
                                    InstanceMetadataFetcher 
instanceMetadataFetcher,
                                    RestoreJobDatabaseAccessor 
restoreJobDatabaseAccessor,
+                                   RestoreJobDiscoverer restoreJobDiscoverer,
                                    CassandraInputValidator validator,
                                    SidecarMetrics metrics)
     {
         super(instanceMetadataFetcher, executorPools, validator);
         this.restoreJobDatabaseAccessor = restoreJobDatabaseAccessor;
+        this.restoreJobDiscoverer = restoreJobDiscoverer;
         this.metrics = metrics.server().restore();
     }
 
@@ -88,46 +92,49 @@ public class UpdateRestoreJobHandler extends 
AbstractHandler<UpdateRestoreJobReq
     {
         RoutingContextUtils
         .getAsFuture(context, SC_RESTORE_JOB)
-        .compose(job -> {
-            if (job.status.isFinal())
+        .compose(existingJob -> {
+            if (existingJob.status.isFinal())
             {
                 // skip the update, since the job is in the final state already
-                logger.debug("The job has completed already. job={}", job);
+                logger.debug("The job has completed already. job={}", 
existingJob);
                 return 
Future.failedFuture(wrapHttpException(HttpResponseStatus.CONFLICT,
-                                                             "Job is already 
in final state: " + job.status));
+                                                             "Job is already 
in final state: " + existingJob.status));
             }
 
             // IAM jobs derive credentials from the instance profile / task 
role at runtime.
             // There are no static credentials to supply or rotate, so any 
attempt to update
             // credentials on an IAM job is always invalid.
-            if (job.credentialType == CredentialType.IAM && 
requestPayload.secrets() != null)
+            if (existingJob.credentialType == CredentialType.IAM && 
requestPayload.secrets() != null)
             {
-                logger.warn("Credential update rejected for IAM job. job={}", 
job);
+                logger.warn("Credential update rejected for IAM job. job={}", 
existingJob);
                 return 
Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST,
                                                              "Credentials 
cannot be updated for IAM jobs"));
             }
 
             return executorPools.service()
-                                .executeBlocking(() -> 
restoreJobDatabaseAccessor.update(requestPayload, job.jobId));
+                                .executeBlocking(() -> 
restoreJobDatabaseAccessor.update(requestPayload, existingJob));
         })
-        .onSuccess(job -> {
+        .onSuccess(updatedJob -> {
             logger.info("Successfully updated restore job. job={}, request={}, 
remoteAddress={}, instance={}",
-                        job, requestPayload, remoteAddress, host);
-            if (job.status == RestoreJobStatus.SUCCEEDED)
+                        updatedJob, requestPayload, remoteAddress, host);
+            if (updatedJob.status == RestoreJobStatus.SUCCEEDED)
             {
                 metrics.successfulJobs.metric.update(1);
-                long startMillis = UUIDs.unixTimestamp(job.jobId);
+                long startMillis = UUIDs.unixTimestamp(updatedJob.jobId);
                 long durationMillis = System.currentTimeMillis() - startMillis;
                 // toNanos does not overflow. Nanos in `long` can at most 
represent 106,751 days.
                 metrics.jobCompletionTime.metric.update(durationMillis, 
TimeUnit.MILLISECONDS);
             }
 
-            if (job.secrets != null)
+            if (updatedJob.secrets != null)
             {
                 metrics.tokenRefreshed.metric.update(1);
             }
 
             
context.response().setStatusCode(HttpResponseStatus.OK.code()).end();
+            // Fire-and-forget on the internal worker pool — notifying the 
restore system is internal work
+            // that shouldn't block the event loop or share the service pool 
with client-facing requests.
+            executorPools.internal().runBlocking(() -> 
notifyPhaseSignalMaybe(updatedJob));
         })
         .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, requestPayload));
     }
@@ -158,4 +165,22 @@ public class UpdateRestoreJobHandler extends 
AbstractHandler<UpdateRestoreJobReq
             throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid 
request payload", decodeException);
         }
     }
+
+    private void notifyPhaseSignalMaybe(RestoreJob updatedJob)
+    {
+        if (updatedJob.status != RestoreJobStatus.IMPORT_READY && 
updatedJob.status != RestoreJobStatus.STAGE_READY)
+        {
+            return;
+        }
+
+        try
+        {
+            restoreJobDiscoverer.processJobNow(updatedJob);
+        }
+        catch (Exception e)
+        {
+            logger.warn("Failed to immediately process phase signal. " +
+                        "The discovery loop will pick it up on the next cycle. 
jobId={}", updatedJob.jobId, e);
+        }
+    }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
index a720ad35..73c6cabe 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
@@ -628,6 +628,43 @@ public class RestoreJobDiscoverer implements PeriodicTask, 
RingTopologyChangeLis
         }
     }
 
+    /**
+     * Immediately processes a restore job without waiting for the next 
discovery loop iteration.
+     * Called by UpdateRestoreJobHandler after a phase signal (STAGE_READY or IMPORT_READY) is 
written to DB.
+     * This is safe to call concurrently with the discovery loop — the DB 
write is the durable
+     * source of truth, and duplicate processing is deduplicated by existing 
idempotency checks.
+     *
+     * <p>The body is gated by the same {@link #isExecuting} CAS used by
+     * {@link #tryExecuteDiscovery()} so the discovery loop and the wake-up 
path are mutually
+     * exclusive. The CAS provides the happens-before edge that makes the 
otherwise non-thread-safe
+     * {@link JobIdsByDay} state safe across both entry points. If a slow-loop 
pass is already
+     * running when a wake-up arrives, the wake-up is skipped — the slow loop 
or the next wake-up
+     * will pick the transition up.
+     *
+     * @param restoreJob the restore job to process immediately
+     */
+    public void processJobNow(RestoreJob restoreJob)
+    {
+        // Resolve local DC outside the CAS gate; it can block on JMX/CQL and 
the assignment itself is atomic.
+        initLocalDatacenterMaybe();
+        if (!isExecuting.compareAndSet(false, true))
+        {
+            LOGGER.debug("Another thread is executing the restore job 
discovery already. Skipping wake-up. jobId={}",
+                         restoreJob.jobId);
+            return;
+        }
+        try
+        {
+            RestoreJobManagerGroup restoreJobManagers = 
restoreJobManagerGroupSingleton.get();
+            restoreJobManagers.updateRestoreJob(restoreJob);
+            processSidecarManagedJobMaybe(restoreJob);
+        }
+        finally
+        {
+            isExecuting.set(false);
+        }
+    }
+
     @VisibleForTesting
     boolean hasInflightJobs()
     {
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoinedIntTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoinedIntTest.java
index f5e91725..89f37809 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoinedIntTest.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoinedIntTest.java
@@ -111,13 +111,18 @@ class RestoreJobDiscovererNodeJoinedIntTest extends 
IntegrationTestBase
                                              new TokenRange(1000, 1500)))
         .doesNotContainKey(NODE_JOINED);
 
-        // assert that no restore ranges are create
+        // assert that the expected restore ranges are created.
+        // Wrapped in loopAssert because the STAGE_READY PATCH triggers an 
asynchronous wake-up
+        // (CASSSIDECAR-454) that races on isExecuting with the synchronous 
tryExecuteDiscovery above;
+        // ranges may not be visible immediately after either path returns.
         RestoreRangeDatabaseAccessor rangeDatabaseAccessor = 
injector.getInstance(RestoreRangeDatabaseAccessor.class);
-        List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId, 
bucketId);
-        Collections.sort(ranges, RestoreRange.TOKEN_BASED_NATURAL_ORDER);
-        assertThat(ranges).hasSize(2);
-        assertThat(ranges.get(0).tokenRange()).isEqualTo(new TokenRange(0, 
1000));
-        assertThat(ranges.get(1).tokenRange()).isEqualTo(new TokenRange(1000, 
1500));
+        loopAssert(10, 500, () -> {
+            List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId, 
bucketId);
+            Collections.sort(ranges, RestoreRange.TOKEN_BASED_NATURAL_ORDER);
+            assertThat(ranges).hasSize(2);
+            assertThat(ranges.get(0).tokenRange()).isEqualTo(new TokenRange(0, 
1000));
+            assertThat(ranges.get(1).tokenRange()).isEqualTo(new 
TokenRange(1000, 1500));
+        });
 
         // Decommission
         IInstance seed = cluster.get(1);
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoiningIntTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoiningIntTest.java
index a916ce3b..99844097 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoiningIntTest.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoiningIntTest.java
@@ -121,11 +121,16 @@ class RestoreJobDiscovererNodeJoiningIntTest extends 
IntegrationTestBase
                                        new TokenRange(0, 1000),
                                        new TokenRange(1500, Long.MAX_VALUE)));
 
-        // assert that no restore ranges are create
+        // assert that the expected restore range is created.
+        // Wrapped in loopAssert because the STAGE_READY PATCH triggers an 
asynchronous wake-up
+        // (CASSSIDECAR-454) that races on isExecuting with the synchronous 
tryExecuteDiscovery above;
+        // ranges may not be visible immediately after either path returns.
         RestoreRangeDatabaseAccessor rangeDatabaseAccessor = 
injector.getInstance(RestoreRangeDatabaseAccessor.class);
-        List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId, 
bucketId);
-        assertThat(ranges).hasSize(1);
-        assertRestoreRange(ranges.get(0), 1500L, 1600L);
+        loopAssert(10, 500, () -> {
+            List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId, 
bucketId);
+            assertThat(ranges).hasSize(1);
+            assertRestoreRange(ranges.get(0), 1500L, 1600L);
+        });
 
         // start move in the background
         IInstance seed = cluster.get(1);
@@ -164,7 +169,7 @@ class RestoreJobDiscovererNodeJoiningIntTest extends 
IntegrationTestBase
             assertThat(restoreRanges)
             .describedAs("Local token ranges are effectively the same. 
Therefore restore ranges do not change")
             .hasSize(1);
-            assertRestoreRange(ranges.get(0), 1500L, 1600L);
+            assertRestoreRange(restoreRanges.get(0), 1500L, 1600L);
         });
     }
 
diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeLeftIntTest.java b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeLeftIntTest.java
index 8b276947..81e78981 100644
--- a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeLeftIntTest.java
+++ b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeLeftIntTest.java
@@ -113,13 +113,18 @@ class RestoreJobDiscovererNodeLeftIntTest extends IntegrationTestBase
                                                new TokenRange(0, 1000),
                                                new TokenRange(2000, 
Long.MAX_VALUE)));
 
-        // assert that no restore ranges are create
+        // assert that the expected restore ranges are created.
+        // Wrapped in loopAssert because the STAGE_READY PATCH triggers an asynchronous wake-up
+        // (CASSSIDECAR-454) that races on isExecuting with the synchronous tryExecuteDiscovery above;
+        // ranges may not be visible immediately after either path returns.
         RestoreRangeDatabaseAccessor rangeDatabaseAccessor = 
injector.getInstance(RestoreRangeDatabaseAccessor.class);
-        List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId, 
bucketId);
-        Collections.sort(ranges, RestoreRange.TOKEN_BASED_NATURAL_ORDER);
-        assertThat(ranges).hasSize(2);
-        assertThat(ranges.get(0).tokenRange()).isEqualTo(new TokenRange(0, 
1000)); // node 2
-        assertThat(ranges.get(1).tokenRange()).isEqualTo(new TokenRange(1000, 
1600)); // node 1
+        loopAssert(10, 500, () -> {
+            List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId, 
bucketId);
+            Collections.sort(ranges, RestoreRange.TOKEN_BASED_NATURAL_ORDER);
+            assertThat(ranges).hasSize(2);
+            assertThat(ranges.get(0).tokenRange()).isEqualTo(new TokenRange(0, 
1000)); // node 2
+            assertThat(ranges.get(1).tokenRange()).isEqualTo(new 
TokenRange(1000, 1600)); // node 1
+        });
 
         // Decommission
         IInstance node = cluster.get(NODE_LEFT);
diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeMovedIntTest.java b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeMovedIntTest.java
index 6cd8c55c..d0b5eb5c 100644
--- a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeMovedIntTest.java
+++ b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeMovedIntTest.java
@@ -120,13 +120,18 @@ class RestoreJobDiscovererNodeMovedIntTest extends IntegrationTestBase
                                                new TokenRange(0, 1000),
                                                new TokenRange(2000, 
Long.MAX_VALUE)));
 
-        // assert that no restore ranges are create
+        // assert that the expected restore range is created.
+        // Wrapped in loopAssert because the STAGE_READY PATCH triggers an asynchronous wake-up
+        // (CASSSIDECAR-454) that races on isExecuting with the synchronous tryExecuteDiscovery above;
+        // ranges may not be visible immediately after either path returns.
         RestoreRangeDatabaseAccessor rangeDatabaseAccessor = 
injector.getInstance(RestoreRangeDatabaseAccessor.class);
-        List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId, 
bucketId);
-        assertThat(ranges)
-        .describedAs("node 1 and only node 1 should create the restore range")
-        .hasSize(1);
-        assertThat(ranges.get(0).tokenRange()).isEqualTo(new TokenRange(1000, 
1600));
+        loopAssert(10, 500, () -> {
+            List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId, 
bucketId);
+            assertThat(ranges)
+            .describedAs("node 1 and only node 1 should create the restore 
range")
+            .hasSize(1);
+            assertThat(ranges.get(0).tokenRange()).isEqualTo(new 
TokenRange(1000, 1600));
+        });
 
         // Move token
         IInstance movingNode = cluster.get(2);
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/BaseRestoreJobTests.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/BaseRestoreJobTests.java
index cb79bb43..67a67e31 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/BaseRestoreJobTests.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/BaseRestoreJobTests.java
@@ -36,6 +36,7 @@ import com.codahale.metrics.SharedMetricRegistries;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+import com.google.inject.Provider;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.google.inject.util.Modules;
@@ -73,12 +74,14 @@ import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
 import org.apache.cassandra.sidecar.foundation.RestoreJobSecretsGen;
 import 
org.apache.cassandra.sidecar.handlers.restore.BaseRestoreJobTests.TestModuleOverride.TestRestoreJobDatabaseAccessor;
+import 
org.apache.cassandra.sidecar.handlers.restore.BaseRestoreJobTests.TestModuleOverride.TestRestoreJobDiscoverer;
 import 
org.apache.cassandra.sidecar.handlers.restore.BaseRestoreJobTests.TestModuleOverride.TestRestoreJobManagerGroup;
 import 
org.apache.cassandra.sidecar.handlers.restore.BaseRestoreJobTests.TestModuleOverride.TestRestoreRangeDatabaseAccessor;
 import 
org.apache.cassandra.sidecar.handlers.restore.BaseRestoreJobTests.TestModuleOverride.TestRestoreSliceDatabaseAccessor;
 import 
org.apache.cassandra.sidecar.handlers.restore.BaseRestoreJobTests.TestModuleOverride.TestRingTopologyRefresher;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.modules.SidecarModules;
+import org.apache.cassandra.sidecar.restore.RestoreJobDiscoverer;
 import org.apache.cassandra.sidecar.restore.RestoreJobManagerGroup;
 import org.apache.cassandra.sidecar.restore.RestoreJobProgressTracker;
 import org.apache.cassandra.sidecar.restore.RestoreProcessor;
@@ -105,6 +108,7 @@ public abstract class BaseRestoreJobTests
     protected TestRestoreSliceDatabaseAccessor testRestoreSlices;
     protected TestRestoreRangeDatabaseAccessor testRestoreRanges;
     protected TestRestoreJobManagerGroup testRestoreJobManagerGroup;
+    protected TestRestoreJobDiscoverer testRestoreJobDiscoverer;
     protected TestRingTopologyRefresher testRingTopologyRefresher;
 
     @BeforeEach
@@ -123,6 +127,7 @@ public abstract class BaseRestoreJobTests
         testRestoreSlices = (TestRestoreSliceDatabaseAccessor) 
injector.getInstance(RestoreSliceDatabaseAccessor.class);
         testRestoreRanges = (TestRestoreRangeDatabaseAccessor) 
injector.getInstance(RestoreRangeDatabaseAccessor.class);
         testRestoreJobManagerGroup = (TestRestoreJobManagerGroup) 
injector.getInstance(RestoreJobManagerGroup.class);
+        testRestoreJobDiscoverer = (TestRestoreJobDiscoverer) 
injector.getInstance(RestoreJobDiscoverer.class);
         testRingTopologyRefresher = (TestRingTopologyRefresher) 
injector.getInstance(RingTopologyRefresher.class);
         server.start()
               .onSuccess(s -> context.completeNow())
@@ -266,6 +271,13 @@ public abstract class BaseRestoreJobTests
                 return updateFunc.apply(payload);
             }
 
+            @Override
+            public RestoreJob update(UpdateRestoreJobRequestPayload payload,
+                                     RestoreJob existingJob)
+            {
+                return updateFunc.apply(payload);
+            }
+
             @Override
             public void abort(UUID jobId, String reason)
             {
@@ -351,6 +363,35 @@ public abstract class BaseRestoreJobTests
             }
         }
 
+        static class TestRestoreJobDiscoverer extends RestoreJobDiscoverer
+        {
+            Consumer<RestoreJob> processJobNowCallback;
+
+            public TestRestoreJobDiscoverer(SidecarConfiguration config,
+                                            SidecarSchema sidecarSchema,
+                                            RestoreJobDatabaseAccessor 
restoreJobDatabaseAccessor,
+                                            RestoreSliceDatabaseAccessor 
restoreSliceDatabaseAccessor,
+                                            RestoreRangeDatabaseAccessor 
restoreRangeDatabaseAccessor,
+                                            Provider<RestoreJobManagerGroup> 
restoreJobManagerGroupProvider,
+                                            InstanceMetadataFetcher 
instanceMetadataFetcher,
+                                            RingTopologyRefresher 
ringTopologyRefresher,
+                                            SidecarMetrics metrics)
+            {
+                super(config, sidecarSchema, restoreJobDatabaseAccessor, 
restoreSliceDatabaseAccessor,
+                      restoreRangeDatabaseAccessor, 
restoreJobManagerGroupProvider, instanceMetadataFetcher,
+                      ringTopologyRefresher, metrics);
+            }
+
+            @Override
+            public void processJobNow(RestoreJob restoreJob)
+            {
+                if (processJobNowCallback != null)
+                {
+                    processJobNowCallback.accept(restoreJob);
+                }
+            }
+        }
+
         static class TestRingTopologyRefresher extends RingTopologyRefresher
         {
             Supplier<TokenRangeReplicasResponse> topologySupplier;
@@ -402,6 +443,24 @@ public abstract class BaseRestoreJobTests
                                                   restoreProcessor);
         }
 
+        @Provides
+        @Singleton
+        public RestoreJobDiscoverer restoreJobDiscoverer(SidecarConfiguration 
config,
+                                                         SidecarSchema 
sidecarSchema,
+                                                         
RestoreJobDatabaseAccessor restoreJobDatabaseAccessor,
+                                                         
RestoreSliceDatabaseAccessor restoreSliceDatabaseAccessor,
+                                                         
RestoreRangeDatabaseAccessor restoreRangeDatabaseAccessor,
+                                                         
Provider<RestoreJobManagerGroup> restoreJobManagerGroupProvider,
+                                                         
InstanceMetadataFetcher instanceMetadataFetcher,
+                                                         RingTopologyRefresher 
ringTopologyRefresher,
+                                                         SidecarMetrics 
metrics)
+        {
+            return new TestRestoreJobDiscoverer(config, sidecarSchema, 
restoreJobDatabaseAccessor,
+                                                restoreSliceDatabaseAccessor, 
restoreRangeDatabaseAccessor,
+                                                
restoreJobManagerGroupProvider, instanceMetadataFetcher,
+                                                ringTopologyRefresher, 
metrics);
+        }
+
         @Provides
         @Singleton
         public RingTopologyRefresher 
ringTopologyRefresher(InstanceMetadataFetcher metadataFetcher,
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandlerTest.java
index 229a3144..6ace186b 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandlerTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandlerTest.java
@@ -21,8 +21,10 @@ package org.apache.cassandra.sidecar.handlers.restore;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -200,6 +202,36 @@ class UpdateRestoreJobHandlerTest extends BaseRestoreJobTests
                                              payload, context, 
HttpResponseStatus.OK.code());
     }
 
+    @Test
+    void testUpdateToImportReadyNotifiesDiscoverer(VertxTestContext context) 
throws Throwable
+    {
+        String jobId = "8e5799a4-d277-11ed-8d85-6916bb9b8056";
+        CountDownLatch latch = new CountDownLatch(1);
+        testRestoreJobDiscoverer.processJobNowCallback = job -> 
latch.countDown();
+        mockLookupRestoreJob(id -> createTestJobWithStatus(jobId, 
RestoreJobStatus.IMPORT_READY));
+        mockUpdateRestoreJob(payload -> createTestJobWithStatus(jobId, 
RestoreJobStatus.IMPORT_READY));
+        JsonObject payload = new JsonObject();
+        payload.put("status", "IMPORT_READY");
+        sendUpdateRestoreJobRequestAndVerify("ks", "table", jobId,
+                                             payload, context, 
HttpResponseStatus.OK.code());
+        assertThat(Uninterruptibles.awaitUninterruptibly(latch, 5, 
TimeUnit.SECONDS)).isTrue();
+    }
+
+    @Test
+    void testUpdateToStageReadyNotifiesDiscoverer(VertxTestContext context) 
throws Throwable
+    {
+        String jobId = "8e5799a4-d277-11ed-8d85-6916bb9b8056";
+        CountDownLatch latch = new CountDownLatch(1);
+        testRestoreJobDiscoverer.processJobNowCallback = job -> 
latch.countDown();
+        mockLookupRestoreJob(id -> createTestJobWithStatus(jobId, 
RestoreJobStatus.STAGE_READY));
+        mockUpdateRestoreJob(payload -> createTestJobWithStatus(jobId, 
RestoreJobStatus.STAGE_READY));
+        JsonObject payload = new JsonObject();
+        payload.put("status", "STAGE_READY");
+        sendUpdateRestoreJobRequestAndVerify("ks", "table", jobId,
+                                             payload, context, 
HttpResponseStatus.OK.code());
+        assertThat(Uninterruptibles.awaitUninterruptibly(latch, 5, 
TimeUnit.SECONDS)).isTrue();
+    }
+
     private RestoreJob createTestNewJob(String jobId)
     {
         return RestoreJob.builder()
@@ -212,6 +244,18 @@ class UpdateRestoreJobHandlerTest extends BaseRestoreJobTests
                          .build();
     }
 
+    private RestoreJob createTestJobWithStatus(String jobId, RestoreJobStatus 
status)
+    {
+        return RestoreJob.builder()
+                         .jobId(UUID.fromString(jobId))
+                         .keyspace("ks").table("table")
+                         .jobAgent("agent")
+                         .jobStatus(status)
+                         .jobSecrets(SECRETS)
+                         .sstableImportOptions(SSTableImportOptions.defaults())
+                         .build();
+    }
+
     private JsonObject getRequestPayload()
     {
         JsonObject payload = new JsonObject();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to