This is an automated email from the ASF dual-hosted git repository.
sarankk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 69638cff CASSSIDECAR-454: Sidecar should wake up immediately on the instance receiving a phase signal instead of waiting for the discovery loop (#346)
69638cff is described below
commit 69638cffdb06b756e8faedb1ec00ebee528d1933
Author: Mansi Khara <[email protected]>
AuthorDate: Fri May 29 15:10:39 2026 -0700
CASSSIDECAR-454: Sidecar should wake up immediately on the instance receiving a phase signal instead of waiting for the discovery loop (#346)
Patch by Mansi Khara; reviewed by Yifan Cai, Saranya Krishnakumar for CASSSIDECAR-454
---
CHANGES.txt | 1 +
.../RestoreJobDiscovererPhaseSignalIntTest.java | 165 +++++++++++++++++++++
.../RestoreJobDiscovererSAIOptionsIntTest.java | 39 +++--
.../sidecar/db/RestoreJobDatabaseAccessor.java | 23 ++-
.../handlers/restore/UpdateRestoreJobHandler.java | 49 ++++--
.../sidecar/restore/RestoreJobDiscoverer.java | 37 +++++
.../RestoreJobDiscovererNodeJoinedIntTest.java | 17 ++-
.../RestoreJobDiscovererNodeJoiningIntTest.java | 15 +-
.../RestoreJobDiscovererNodeLeftIntTest.java | 17 ++-
.../RestoreJobDiscovererNodeMovedIntTest.java | 17 ++-
.../handlers/restore/BaseRestoreJobTests.java | 59 ++++++++
.../restore/UpdateRestoreJobHandlerTest.java | 44 ++++++
12 files changed, 433 insertions(+), 50 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index bdca46e1..589d6547 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.4.0
-----
+ * Sidecar should wake up immediately on the instance receiving a phase signal instead of waiting for the discovery loop (CASSSIDECAR-454)
* Fix schema initialization race, CI test reporting, and build stability (CASSSIDECAR-468)
* Add FileBasedConfigurationProvider for file-based overlay persistence (CASSSIDECAR-426)
* Avoid blocking the event loop in CdcPublisher event-bus handler (CASSSIDECAR-452)
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererPhaseSignalIntTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererPhaseSignalIntTest.java
new file mode 100644
index 00000000..c335bc68
--- /dev/null
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererPhaseSignalIntTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.restore;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.junit.jupiter.api.Test;
+
+import com.google.inject.AbstractModule;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import
org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload;
+import
org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.RestoreJobConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
+import org.apache.cassandra.sidecar.db.RestoreRange;
+import org.apache.cassandra.sidecar.db.RestoreRangeDatabaseAccessor;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+
+import static
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.assertRestoreRange;
+import static
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.createJob;
+import static
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.disableRestoreProcessor;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Verifies that {@code STAGE_READY} / {@code IMPORT_READY} phase signals
delivered via
+ * the {@code UpdateRestoreJobHandler} REST endpoint trigger immediate
processing on the
+ * receiving Sidecar instance, without waiting for the next discovery loop
cycle.
+ *
+ * <p>Both discovery loop delays are pinned to 1h via {@link
#configurationOverrides()},
+ * so any successful observation of restore ranges within seconds can only be
the work of
+ * the wake-up path added by CASSSIDECAR-454 — not the periodic discovery loop.
+ */
+class RestoreJobDiscovererPhaseSignalIntTest extends
SharedClusterSidecarIntegrationTestBase
+{
+ private static final QualifiedName USER_KEYSPACE_TABLE = new
QualifiedName("restore_phase_signal_ks", "t");
+ private static final QualifiedTableName SIDECAR_QUALIFIED_TABLE =
+ new QualifiedTableName(USER_KEYSPACE_TABLE.keyspace(),
USER_KEYSPACE_TABLE.table());
+
+ @Override
+ protected Function<SidecarConfigurationImpl.Builder,
SidecarConfigurationImpl.Builder> configurationOverrides()
+ {
+ return builder -> builder.restoreJobConfiguration(
+ RestoreJobConfigurationImpl.builder()
+ // Pin both discovery loops to 1h. Any
range created within seconds
+ // is necessarily the wake-up path and not
the discovery loop.
+
.jobDiscoveryActiveLoopDelay(MillisecondBoundConfiguration.parse("1h"))
+
.jobDiscoveryIdleLoopDelay(MillisecondBoundConfiguration.parse("1h"))
+ .build());
+ }
+
+ @Override
+ protected void startSidecar(ICluster<? extends IInstance> cluster) throws
InterruptedException
+ {
+ // Disable the RestoreProcessor so range submission stops at the
database write,
+ // letting the test assert on RestoreRangeDatabaseAccessor without
S3/import side effects.
+ serverWrapper = startSidecarWithInstances(cluster, (AbstractModule)
disableRestoreProcessor());
+ }
+
+ @Override
+ protected void initializeSchemaForTest()
+ {
+ createTestKeyspace(USER_KEYSPACE_TABLE, Map.of("datacenter1", 1));
+ createTestTable(USER_KEYSPACE_TABLE, "CREATE TABLE %s (id text PRIMARY
KEY, name text);");
+ }
+
+ @Override
+ protected void beforeTestStart()
+ {
+ waitForSchemaReady(30, TimeUnit.SECONDS);
+ }
+
+ @Test
+ void testStageReadyImmediatelySubmitsSlices()
+ {
+ RestoreJobTestUtils.RestoreJobClient testClient = restoreJobClient();
+ UUID jobId = createJob(testClient, SIDECAR_QUALIFIED_TABLE);
+ short bucketId = 0;
+ CreateSliceRequestPayload slicePayload = new CreateSliceRequestPayload(
+ "sliceId-stage", bucketId, "bucket", "key", "checksum",
+ BigInteger.valueOf(500L), BigInteger.valueOf(1500L), 100L, 100L);
+ testClient.createRestoreSlice(SIDECAR_QUALIFIED_TABLE, jobId,
slicePayload);
+
+ RestoreRangeDatabaseAccessor rangeAccessor =
+ serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class);
+ assertThat(rangeAccessor.findAll(jobId, bucketId)).isEmpty();
+
+ testClient.updateRestoreJob(SIDECAR_QUALIFIED_TABLE, jobId,
+ UpdateRestoreJobRequestPayload.builder()
+
.withStatus(RestoreJobStatus.STAGE_READY)
+ .build());
+
+ // Single-node cluster owns the entire ring, so the slice (500, 1500]
is not trimmed.
+ // Ranges are stored with exclusive start, so the slice's start of 500
surfaces as 499.
+ loopAssert(10, 500, () -> {
+ List<RestoreRange> ranges = rangeAccessor.findAll(jobId, bucketId);
+ assertThat(ranges)
+ .describedAs("STAGE_READY should immediately create restore ranges
via the wake-up path")
+ .hasSize(1);
+ assertRestoreRange(ranges.get(0), 499L, 1500L);
+ });
+ }
+
+ @Test
+ void testImportReadyAfterStageReadyDoesNotCreateDuplicateRanges()
+ {
+ RestoreJobTestUtils.RestoreJobClient testClient = restoreJobClient();
+ UUID jobId = createJob(testClient, SIDECAR_QUALIFIED_TABLE);
+ short bucketId = 0;
+ CreateSliceRequestPayload slicePayload = new CreateSliceRequestPayload(
+ "sliceId-dup", bucketId, "bucket", "key", "checksum",
+ BigInteger.valueOf(1L), BigInteger.valueOf(1500L), 100L, 100L);
+ testClient.createRestoreSlice(SIDECAR_QUALIFIED_TABLE, jobId,
slicePayload);
+
+ testClient.updateRestoreJob(SIDECAR_QUALIFIED_TABLE, jobId,
+ UpdateRestoreJobRequestPayload.builder()
+
.withStatus(RestoreJobStatus.STAGE_READY)
+ .build());
+
+ RestoreRangeDatabaseAccessor rangeAccessor =
+ serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class);
+ loopAssert(10, 500, () -> assertThat(rangeAccessor.findAll(jobId,
bucketId)).isNotEmpty());
+ int rangeCountAfterStageReady = rangeAccessor.findAll(jobId,
bucketId).size();
+
+ testClient.updateRestoreJob(SIDECAR_QUALIFIED_TABLE, jobId,
+ UpdateRestoreJobRequestPayload.builder()
+
.withStatus(RestoreJobStatus.IMPORT_READY)
+ .build());
+
+ loopAssert(5, 500, () -> assertThat(rangeAccessor.findAll(jobId,
bucketId))
+ .describedAs("IMPORT_READY after STAGE_READY
should not create duplicate ranges")
+ .hasSize(rangeCountAfterStageReady));
+ }
+
+ private RestoreJobTestUtils.RestoreJobClient restoreJobClient()
+ {
+ return RestoreJobTestUtils.client(trustedClient(), "localhost",
serverWrapper.serverPort);
+ }
+}
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererSAIOptionsIntTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererSAIOptionsIntTest.java
index d2dcf31e..623e3636 100644
---
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererSAIOptionsIntTest.java
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererSAIOptionsIntTest.java
@@ -50,6 +50,7 @@ import
org.apache.cassandra.testing.ClusterBuilderConfiguration;
import static
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.createJob;
import static
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.disableRestoreProcessor;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
@@ -128,14 +129,19 @@ class RestoreJobDiscovererSAIOptionsIntTest extends
SharedClusterSidecarIntegrat
RestoreJobDiscoverer restoreJobDiscoverer =
serverWrapper.injector.getInstance(RestoreJobDiscoverer.class);
restoreJobDiscoverer.tryExecuteDiscovery();
- // verify ranges were created and link to the correct job
+ // verify ranges were created and link to the correct job. Wrapped in
loopAssert because the
+ // STAGE_READY PATCH triggers an asynchronous wake-up
(CASSSIDECAR-454) on a worker thread; the
+ // wake-up and the synchronous tryExecuteDiscovery above race on
isExecuting, and ranges may not
+ // be visible immediately after either path returns.
RestoreRangeDatabaseAccessor rangeDatabaseAccessor =
serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class);
- List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId,
bucketId);
- assertThat(ranges).isNotEmpty();
- for (RestoreRange range : ranges)
- {
- assertThat(range.jobId()).isEqualTo(jobId);
- }
+ loopAssert(10, 500, () -> {
+ List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId,
bucketId);
+ assertThat(ranges).isNotEmpty();
+ for (RestoreRange range : ranges)
+ {
+ assertThat(range.jobId()).isEqualTo(jobId);
+ }
+ });
// Re-read the job after discovery to confirm default importOptions
are preserved
RestoreJob jobAfterDiscovery = jobAccessor.find(jobId);
@@ -186,14 +192,19 @@ class RestoreJobDiscovererSAIOptionsIntTest extends
SharedClusterSidecarIntegrat
RestoreJobDiscoverer restoreJobDiscoverer =
serverWrapper.injector.getInstance(RestoreJobDiscoverer.class);
restoreJobDiscoverer.tryExecuteDiscovery();
- // verify ranges were created by the discoverer and link back to the
correct job
+ // verify ranges were created by the discoverer and link back to the
correct job. Wrapped in
+ // loopAssert because the STAGE_READY PATCH triggers an asynchronous
wake-up (CASSSIDECAR-454)
+ // on a worker thread; the wake-up and the synchronous
tryExecuteDiscovery above race on
+ // isExecuting, and ranges may not be visible immediately after either
path returns.
RestoreRangeDatabaseAccessor rangeDatabaseAccessor =
serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class);
- List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId,
bucketId);
- assertThat(ranges).isNotEmpty();
- for (RestoreRange range : ranges)
- {
- assertThat(range.jobId()).isEqualTo(jobId);
- }
+ loopAssert(10, 500, () -> {
+ List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId,
bucketId);
+ assertThat(ranges).isNotEmpty();
+ for (RestoreRange range : ranges)
+ {
+ assertThat(range.jobId()).isEqualTo(jobId);
+ }
+ });
// Re-read the job after discovery to confirm importOptions are still
intact.
RestoreJob jobAfterDiscovery = jobAccessor.find(jobId);
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
index e09a0920..98454185 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
@@ -121,7 +121,28 @@ public class RestoreJobDatabaseAccessor extends
DatabaseAccessor<RestoreJobsSche
throws DataObjectMappingException
{
sidecarSchema.ensureInitialized();
- RestoreJob.Builder updateBuilder = RestoreJob.builder();
+ return update(payload, jobId, RestoreJob.builder());
+ }
+
+ /**
+ * Update fields in the restore job and persist. Returns a complete
updated job
+ * built from {@code existingJob} with the fields from {@code payload}
applied.
+ *
+ * @param payload fields to be updated
+ * @param existingJob existing restore job to be updated
+ * @return the restore job object built from the existingJob and the
updated fields
+ * @throws DataObjectMappingException when secrets json cannot be
serialized
+ */
+ public RestoreJob update(UpdateRestoreJobRequestPayload payload,
RestoreJob existingJob)
+ throws DataObjectMappingException
+ {
+ sidecarSchema.ensureInitialized();
+ return update(payload, existingJob.jobId, existingJob.unbuild());
+ }
+
+ private RestoreJob update(UpdateRestoreJobRequestPayload payload, UUID
jobId, RestoreJob.Builder updateBuilder)
+ throws DataObjectMappingException
+ {
LocalDate createdAt = RestoreJob.toLocalDate(jobId);
updateBuilder.createdAt(createdAt)
.jobId(jobId);
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java
index fd84fd01..c217512c 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.sidecar.handlers.AbstractHandler;
import org.apache.cassandra.sidecar.handlers.AccessProtected;
import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
import org.apache.cassandra.sidecar.metrics.server.RestoreMetrics;
+import org.apache.cassandra.sidecar.restore.RestoreJobDiscoverer;
import org.apache.cassandra.sidecar.routes.RoutingContextUtils;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
@@ -59,17 +60,20 @@ import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpExceptio
public class UpdateRestoreJobHandler extends
AbstractHandler<UpdateRestoreJobRequestPayload> implements AccessProtected
{
private final RestoreJobDatabaseAccessor restoreJobDatabaseAccessor;
+ private final RestoreJobDiscoverer restoreJobDiscoverer;
private final RestoreMetrics metrics;
@Inject
public UpdateRestoreJobHandler(ExecutorPools executorPools,
InstanceMetadataFetcher
instanceMetadataFetcher,
RestoreJobDatabaseAccessor
restoreJobDatabaseAccessor,
+ RestoreJobDiscoverer restoreJobDiscoverer,
CassandraInputValidator validator,
SidecarMetrics metrics)
{
super(instanceMetadataFetcher, executorPools, validator);
this.restoreJobDatabaseAccessor = restoreJobDatabaseAccessor;
+ this.restoreJobDiscoverer = restoreJobDiscoverer;
this.metrics = metrics.server().restore();
}
@@ -88,46 +92,49 @@ public class UpdateRestoreJobHandler extends
AbstractHandler<UpdateRestoreJobReq
{
RoutingContextUtils
.getAsFuture(context, SC_RESTORE_JOB)
- .compose(job -> {
- if (job.status.isFinal())
+ .compose(existingJob -> {
+ if (existingJob.status.isFinal())
{
// skip the update, since the job is in the final state already
- logger.debug("The job has completed already. job={}", job);
+ logger.debug("The job has completed already. job={}",
existingJob);
return
Future.failedFuture(wrapHttpException(HttpResponseStatus.CONFLICT,
- "Job is already
in final state: " + job.status));
+ "Job is already
in final state: " + existingJob.status));
}
// IAM jobs derive credentials from the instance profile / task
role at runtime.
// There are no static credentials to supply or rotate, so any
attempt to update
// credentials on an IAM job is always invalid.
- if (job.credentialType == CredentialType.IAM &&
requestPayload.secrets() != null)
+ if (existingJob.credentialType == CredentialType.IAM &&
requestPayload.secrets() != null)
{
- logger.warn("Credential update rejected for IAM job. job={}",
job);
+ logger.warn("Credential update rejected for IAM job. job={}",
existingJob);
return
Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST,
"Credentials
cannot be updated for IAM jobs"));
}
return executorPools.service()
- .executeBlocking(() ->
restoreJobDatabaseAccessor.update(requestPayload, job.jobId));
+ .executeBlocking(() ->
restoreJobDatabaseAccessor.update(requestPayload, existingJob));
})
- .onSuccess(job -> {
+ .onSuccess(updatedJob -> {
logger.info("Successfully updated restore job. job={}, request={},
remoteAddress={}, instance={}",
- job, requestPayload, remoteAddress, host);
- if (job.status == RestoreJobStatus.SUCCEEDED)
+ updatedJob, requestPayload, remoteAddress, host);
+ if (updatedJob.status == RestoreJobStatus.SUCCEEDED)
{
metrics.successfulJobs.metric.update(1);
- long startMillis = UUIDs.unixTimestamp(job.jobId);
+ long startMillis = UUIDs.unixTimestamp(updatedJob.jobId);
long durationMillis = System.currentTimeMillis() - startMillis;
// toNanos does not overflow. Nanos in `long` can at most
represent 106,751 days.
metrics.jobCompletionTime.metric.update(durationMillis,
TimeUnit.MILLISECONDS);
}
- if (job.secrets != null)
+ if (updatedJob.secrets != null)
{
metrics.tokenRefreshed.metric.update(1);
}
context.response().setStatusCode(HttpResponseStatus.OK.code()).end();
+ // Fire-and-forget on the internal worker pool — notifying the
restore system is internal work
+ // that shouldn't block the event loop or share the service pool
with client-facing requests.
+ executorPools.internal().runBlocking(() ->
notifyPhaseSignalMaybe(updatedJob));
})
.onFailure(cause -> processFailure(cause, context, host,
remoteAddress, requestPayload));
}
@@ -158,4 +165,22 @@ public class UpdateRestoreJobHandler extends
AbstractHandler<UpdateRestoreJobReq
throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid
request payload", decodeException);
}
}
+
+ private void notifyPhaseSignalMaybe(RestoreJob updatedJob)
+ {
+ if (updatedJob.status != RestoreJobStatus.IMPORT_READY &&
updatedJob.status != RestoreJobStatus.STAGE_READY)
+ {
+ return;
+ }
+
+ try
+ {
+ restoreJobDiscoverer.processJobNow(updatedJob);
+ }
+ catch (Exception e)
+ {
+ logger.warn("Failed to immediately process phase signal. " +
+ "The discovery loop will pick it up on the next cycle.
jobId={}", updatedJob.jobId, e);
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
index a720ad35..73c6cabe 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
@@ -628,6 +628,43 @@ public class RestoreJobDiscoverer implements PeriodicTask,
RingTopologyChangeLis
}
}
+ /**
+ * Immediately processes a restore job without waiting for the next
discovery loop iteration.
+ * Called by UpdateRestoreJobHandler after a phase signal (STAGE_READY) is
written to DB.
+ * This is safe to call concurrently with the discovery loop — the DB
write is the durable
+ * source of truth, and duplicate processing is deduplicated by existing
idempotency checks.
+ *
+ * <p>The body is gated by the same {@link #isExecuting} CAS used by
+ * {@link #tryExecuteDiscovery()} so the discovery loop and the wake-up
path are mutually
+ * exclusive. The CAS provides the happens-before edge that makes the
otherwise non-thread-safe
+ * {@link JobIdsByDay} state safe across both entry points. If a slow-loop
pass is already
+ * running when a wake-up arrives, the wake-up is skipped — the slow loop
or the next wake-up
+ * will pick the transition up.
+ *
+ * @param restoreJob the restore job to process immediately
+ */
+ public void processJobNow(RestoreJob restoreJob)
+ {
+ // Resolve local DC outside the CAS gate; it can block on JMX/CQL and
the assignment itself is atomic.
+ initLocalDatacenterMaybe();
+ if (!isExecuting.compareAndSet(false, true))
+ {
+ LOGGER.debug("Another thread is executing the restore job
discovery already. Skipping wake-up. jobId={}",
+ restoreJob.jobId);
+ return;
+ }
+ try
+ {
+ RestoreJobManagerGroup restoreJobManagers =
restoreJobManagerGroupSingleton.get();
+ restoreJobManagers.updateRestoreJob(restoreJob);
+ processSidecarManagedJobMaybe(restoreJob);
+ }
+ finally
+ {
+ isExecuting.set(false);
+ }
+ }
+
@VisibleForTesting
boolean hasInflightJobs()
{
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoinedIntTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoinedIntTest.java
index f5e91725..89f37809 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoinedIntTest.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoinedIntTest.java
@@ -111,13 +111,18 @@ class RestoreJobDiscovererNodeJoinedIntTest extends
IntegrationTestBase
new TokenRange(1000, 1500)))
.doesNotContainKey(NODE_JOINED);
- // assert that no restore ranges are create
+ // assert that the expected restore ranges are created.
+ // Wrapped in loopAssert because the STAGE_READY PATCH triggers an
asynchronous wake-up
+ // (CASSSIDECAR-454) that races on isExecuting with the synchronous
tryExecuteDiscovery above;
+ // ranges may not be visible immediately after either path returns.
RestoreRangeDatabaseAccessor rangeDatabaseAccessor =
injector.getInstance(RestoreRangeDatabaseAccessor.class);
- List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId,
bucketId);
- Collections.sort(ranges, RestoreRange.TOKEN_BASED_NATURAL_ORDER);
- assertThat(ranges).hasSize(2);
- assertThat(ranges.get(0).tokenRange()).isEqualTo(new TokenRange(0,
1000));
- assertThat(ranges.get(1).tokenRange()).isEqualTo(new TokenRange(1000,
1500));
+ loopAssert(10, 500, () -> {
+ List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId,
bucketId);
+ Collections.sort(ranges, RestoreRange.TOKEN_BASED_NATURAL_ORDER);
+ assertThat(ranges).hasSize(2);
+ assertThat(ranges.get(0).tokenRange()).isEqualTo(new TokenRange(0,
1000));
+ assertThat(ranges.get(1).tokenRange()).isEqualTo(new
TokenRange(1000, 1500));
+ });
// Decommission
IInstance seed = cluster.get(1);
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoiningIntTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoiningIntTest.java
index a916ce3b..99844097 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoiningIntTest.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeJoiningIntTest.java
@@ -121,11 +121,16 @@ class RestoreJobDiscovererNodeJoiningIntTest extends
IntegrationTestBase
new TokenRange(0, 1000),
new TokenRange(1500, Long.MAX_VALUE)));
- // assert that no restore ranges are create
+ // assert that the expected restore range is created.
+ // Wrapped in loopAssert because the STAGE_READY PATCH triggers an
asynchronous wake-up
+ // (CASSSIDECAR-454) that races on isExecuting with the synchronous
tryExecuteDiscovery above;
+ // ranges may not be visible immediately after either path returns.
RestoreRangeDatabaseAccessor rangeDatabaseAccessor =
injector.getInstance(RestoreRangeDatabaseAccessor.class);
- List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId,
bucketId);
- assertThat(ranges).hasSize(1);
- assertRestoreRange(ranges.get(0), 1500L, 1600L);
+ loopAssert(10, 500, () -> {
+ List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId,
bucketId);
+ assertThat(ranges).hasSize(1);
+ assertRestoreRange(ranges.get(0), 1500L, 1600L);
+ });
// start move in the background
IInstance seed = cluster.get(1);
@@ -164,7 +169,7 @@ class RestoreJobDiscovererNodeJoiningIntTest extends
IntegrationTestBase
assertThat(restoreRanges)
.describedAs("Local token ranges are effectively the same.
Therefore restore ranges do not change")
.hasSize(1);
- assertRestoreRange(ranges.get(0), 1500L, 1600L);
+ assertRestoreRange(restoreRanges.get(0), 1500L, 1600L);
});
}
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeLeftIntTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeLeftIntTest.java
index 8b276947..81e78981 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeLeftIntTest.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeLeftIntTest.java
@@ -113,13 +113,18 @@ class RestoreJobDiscovererNodeLeftIntTest extends
IntegrationTestBase
new TokenRange(0, 1000),
new TokenRange(2000,
Long.MAX_VALUE)));
- // assert that no restore ranges are create
+ // assert that the expected restore ranges are created.
+ // Wrapped in loopAssert because the STAGE_READY PATCH triggers an
asynchronous wake-up
+ // (CASSSIDECAR-454) that races on isExecuting with the synchronous
tryExecuteDiscovery above;
+ // ranges may not be visible immediately after either path returns.
RestoreRangeDatabaseAccessor rangeDatabaseAccessor =
injector.getInstance(RestoreRangeDatabaseAccessor.class);
- List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId,
bucketId);
- Collections.sort(ranges, RestoreRange.TOKEN_BASED_NATURAL_ORDER);
- assertThat(ranges).hasSize(2);
- assertThat(ranges.get(0).tokenRange()).isEqualTo(new TokenRange(0,
1000)); // node 2
- assertThat(ranges.get(1).tokenRange()).isEqualTo(new TokenRange(1000,
1600)); // node 1
+ loopAssert(10, 500, () -> {
+ List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId,
bucketId);
+ Collections.sort(ranges, RestoreRange.TOKEN_BASED_NATURAL_ORDER);
+ assertThat(ranges).hasSize(2);
+ assertThat(ranges.get(0).tokenRange()).isEqualTo(new TokenRange(0,
1000)); // node 2
+ assertThat(ranges.get(1).tokenRange()).isEqualTo(new
TokenRange(1000, 1600)); // node 1
+ });
// Decommission
IInstance node = cluster.get(NODE_LEFT);
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeMovedIntTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeMovedIntTest.java
index 6cd8c55c..d0b5eb5c 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeMovedIntTest.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererNodeMovedIntTest.java
@@ -120,13 +120,18 @@ class RestoreJobDiscovererNodeMovedIntTest extends
IntegrationTestBase
new TokenRange(0, 1000),
new TokenRange(2000,
Long.MAX_VALUE)));
- // assert that no restore ranges are create
+ // assert that the expected restore range is created.
+ // Wrapped in loopAssert because the STAGE_READY PATCH triggers an
asynchronous wake-up
+ // (CASSSIDECAR-454) that races on isExecuting with the synchronous
tryExecuteDiscovery above;
+ // ranges may not be visible immediately after either path returns.
RestoreRangeDatabaseAccessor rangeDatabaseAccessor =
injector.getInstance(RestoreRangeDatabaseAccessor.class);
- List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId,
bucketId);
- assertThat(ranges)
- .describedAs("node 1 and only node 1 should create the restore range")
- .hasSize(1);
- assertThat(ranges.get(0).tokenRange()).isEqualTo(new TokenRange(1000,
1600));
+ loopAssert(10, 500, () -> {
+ List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId,
bucketId);
+ assertThat(ranges)
+ .describedAs("node 1 and only node 1 should create the restore
range")
+ .hasSize(1);
+ assertThat(ranges.get(0).tokenRange()).isEqualTo(new
TokenRange(1000, 1600));
+ });
// Move token
IInstance movingNode = cluster.get(2);
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/BaseRestoreJobTests.java
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/BaseRestoreJobTests.java
index cb79bb43..67a67e31 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/BaseRestoreJobTests.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/BaseRestoreJobTests.java
@@ -36,6 +36,7 @@ import com.codahale.metrics.SharedMetricRegistries;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
+import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.util.Modules;
@@ -73,12 +74,14 @@ import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
import org.apache.cassandra.sidecar.foundation.RestoreJobSecretsGen;
import
org.apache.cassandra.sidecar.handlers.restore.BaseRestoreJobTests.TestModuleOverride.TestRestoreJobDatabaseAccessor;
+import
org.apache.cassandra.sidecar.handlers.restore.BaseRestoreJobTests.TestModuleOverride.TestRestoreJobDiscoverer;
import
org.apache.cassandra.sidecar.handlers.restore.BaseRestoreJobTests.TestModuleOverride.TestRestoreJobManagerGroup;
import
org.apache.cassandra.sidecar.handlers.restore.BaseRestoreJobTests.TestModuleOverride.TestRestoreRangeDatabaseAccessor;
import
org.apache.cassandra.sidecar.handlers.restore.BaseRestoreJobTests.TestModuleOverride.TestRestoreSliceDatabaseAccessor;
import
org.apache.cassandra.sidecar.handlers.restore.BaseRestoreJobTests.TestModuleOverride.TestRingTopologyRefresher;
import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
import org.apache.cassandra.sidecar.modules.SidecarModules;
+import org.apache.cassandra.sidecar.restore.RestoreJobDiscoverer;
import org.apache.cassandra.sidecar.restore.RestoreJobManagerGroup;
import org.apache.cassandra.sidecar.restore.RestoreJobProgressTracker;
import org.apache.cassandra.sidecar.restore.RestoreProcessor;
@@ -105,6 +108,7 @@ public abstract class BaseRestoreJobTests
protected TestRestoreSliceDatabaseAccessor testRestoreSlices;
protected TestRestoreRangeDatabaseAccessor testRestoreRanges;
protected TestRestoreJobManagerGroup testRestoreJobManagerGroup;
+ protected TestRestoreJobDiscoverer testRestoreJobDiscoverer;
protected TestRingTopologyRefresher testRingTopologyRefresher;
@BeforeEach
@@ -123,6 +127,7 @@ public abstract class BaseRestoreJobTests
testRestoreSlices = (TestRestoreSliceDatabaseAccessor)
injector.getInstance(RestoreSliceDatabaseAccessor.class);
testRestoreRanges = (TestRestoreRangeDatabaseAccessor)
injector.getInstance(RestoreRangeDatabaseAccessor.class);
testRestoreJobManagerGroup = (TestRestoreJobManagerGroup)
injector.getInstance(RestoreJobManagerGroup.class);
+ testRestoreJobDiscoverer = (TestRestoreJobDiscoverer)
injector.getInstance(RestoreJobDiscoverer.class);
testRingTopologyRefresher = (TestRingTopologyRefresher)
injector.getInstance(RingTopologyRefresher.class);
server.start()
.onSuccess(s -> context.completeNow())
@@ -266,6 +271,13 @@ public abstract class BaseRestoreJobTests
return updateFunc.apply(payload);
}
+ /**
+ * Test stub for the two-argument update overload: delegates to the
+ * test-supplied {@code updateFunc} using only the payload.
+ * The {@code existingJob} argument is ignored by this stub.
+ */
+ @Override
+ public RestoreJob update(UpdateRestoreJobRequestPayload payload,
+ RestoreJob existingJob)
+ {
+ return updateFunc.apply(payload);
+ }
+
@Override
public void abort(UUID jobId, String reason)
{
@@ -351,6 +363,35 @@ public abstract class BaseRestoreJobTests
}
}
+ /**
+ * Test double for {@link RestoreJobDiscoverer} whose {@code processJobNow} does not
+ * run the superclass logic; it only forwards the job to an optional callback so tests
+ * can observe that {@code processJobNow} was invoked.
+ */
+ static class TestRestoreJobDiscoverer extends RestoreJobDiscoverer
+ {
+ // Receives the job passed to processJobNow; when null, processJobNow is a no-op.
+ // NOTE(review): not volatile - assigned on the test thread and presumably read on a
+ // Vert.x handler thread; confirm the request hand-off guarantees visibility.
+ Consumer<RestoreJob> processJobNowCallback;
+
+ // Pass-through constructor: every dependency is forwarded unchanged to the superclass.
+ public TestRestoreJobDiscoverer(SidecarConfiguration config,
+ SidecarSchema sidecarSchema,
+ RestoreJobDatabaseAccessor
restoreJobDatabaseAccessor,
+ RestoreSliceDatabaseAccessor
restoreSliceDatabaseAccessor,
+ RestoreRangeDatabaseAccessor
restoreRangeDatabaseAccessor,
+ Provider<RestoreJobManagerGroup>
restoreJobManagerGroupProvider,
+ InstanceMetadataFetcher
instanceMetadataFetcher,
+ RingTopologyRefresher
ringTopologyRefresher,
+ SidecarMetrics metrics)
+ {
+ super(config, sidecarSchema, restoreJobDatabaseAccessor,
restoreSliceDatabaseAccessor,
+ restoreRangeDatabaseAccessor,
restoreJobManagerGroupProvider, instanceMetadataFetcher,
+ ringTopologyRefresher, metrics);
+ }
+
+ /**
+ * Hands the job to {@code processJobNowCallback} if one is installed.
+ * {@code super.processJobNow} is intentionally never called.
+ *
+ * @param restoreJob the job the caller asked to process immediately
+ */
+ @Override
+ public void processJobNow(RestoreJob restoreJob)
+ {
+ if (processJobNowCallback != null)
+ {
+ processJobNowCallback.accept(restoreJob);
+ }
+ }
+ }
+
static class TestRingTopologyRefresher extends RingTopologyRefresher
{
Supplier<TokenRangeReplicasResponse> topologySupplier;
@@ -402,6 +443,24 @@ public abstract class BaseRestoreJobTests
restoreProcessor);
}
+ /**
+ * Binds {@link RestoreJobDiscoverer} to a singleton {@link TestRestoreJobDiscoverer}
+ * built from the same dependencies, so tests can fetch it from the injector, cast it,
+ * and install a {@code processJobNowCallback}.
+ */
+ @Provides
+ @Singleton
+ public RestoreJobDiscoverer restoreJobDiscoverer(SidecarConfiguration
config,
+ SidecarSchema
sidecarSchema,
+
RestoreJobDatabaseAccessor restoreJobDatabaseAccessor,
+
RestoreSliceDatabaseAccessor restoreSliceDatabaseAccessor,
+
RestoreRangeDatabaseAccessor restoreRangeDatabaseAccessor,
+
Provider<RestoreJobManagerGroup> restoreJobManagerGroupProvider,
+
InstanceMetadataFetcher instanceMetadataFetcher,
+ RingTopologyRefresher
ringTopologyRefresher,
+ SidecarMetrics
metrics)
+ {
+ return new TestRestoreJobDiscoverer(config, sidecarSchema,
restoreJobDatabaseAccessor,
+ restoreSliceDatabaseAccessor,
restoreRangeDatabaseAccessor,
+
restoreJobManagerGroupProvider, instanceMetadataFetcher,
+ ringTopologyRefresher,
metrics);
+ }
+
@Provides
@Singleton
public RingTopologyRefresher
ringTopologyRefresher(InstanceMetadataFetcher metadataFetcher,
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandlerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandlerTest.java
index 229a3144..6ace186b 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandlerTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandlerTest.java
@@ -21,8 +21,10 @@ package org.apache.cassandra.sidecar.handlers.restore;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -200,6 +202,36 @@ class UpdateRestoreJobHandlerTest extends
BaseRestoreJobTests
payload, context,
HttpResponseStatus.OK.code());
}
+ /**
+ * Verifies that a successful update request with status IMPORT_READY results in
+ * {@code processJobNow} being invoked on the discoverer, observed through the test
+ * callback within 5 seconds.
+ */
+ @Test
+ void testUpdateToImportReadyNotifiesDiscoverer(VertxTestContext context)
throws Throwable
+ {
+ String jobId = "8e5799a4-d277-11ed-8d85-6916bb9b8056";
+ // Released once the stubbed discoverer receives a processJobNow call.
+ CountDownLatch latch = new CountDownLatch(1);
+ testRestoreJobDiscoverer.processJobNowCallback = job ->
latch.countDown();
+ // Both lookup and update are stubbed to return a job in IMPORT_READY.
+ mockLookupRestoreJob(id -> createTestJobWithStatus(jobId,
RestoreJobStatus.IMPORT_READY));
+ mockUpdateRestoreJob(payload -> createTestJobWithStatus(jobId,
RestoreJobStatus.IMPORT_READY));
+ JsonObject payload = new JsonObject();
+ payload.put("status", "IMPORT_READY");
+ sendUpdateRestoreJobRequestAndVerify("ks", "table", jobId,
+ payload, context,
HttpResponseStatus.OK.code());
+ // Bounded wait in case the notification is delivered asynchronously to the response.
+ assertThat(Uninterruptibles.awaitUninterruptibly(latch, 5,
TimeUnit.SECONDS)).isTrue();
+ }
+
+ /**
+ * Verifies that a successful update request with status STAGE_READY results in
+ * {@code processJobNow} being invoked on the discoverer, observed through the test
+ * callback within 5 seconds.
+ */
+ @Test
+ void testUpdateToStageReadyNotifiesDiscoverer(VertxTestContext context)
throws Throwable
+ {
+ String jobId = "8e5799a4-d277-11ed-8d85-6916bb9b8056";
+ // Released once the stubbed discoverer receives a processJobNow call.
+ CountDownLatch latch = new CountDownLatch(1);
+ testRestoreJobDiscoverer.processJobNowCallback = job ->
latch.countDown();
+ // Both lookup and update are stubbed to return a job in STAGE_READY.
+ mockLookupRestoreJob(id -> createTestJobWithStatus(jobId,
RestoreJobStatus.STAGE_READY));
+ mockUpdateRestoreJob(payload -> createTestJobWithStatus(jobId,
RestoreJobStatus.STAGE_READY));
+ JsonObject payload = new JsonObject();
+ payload.put("status", "STAGE_READY");
+ sendUpdateRestoreJobRequestAndVerify("ks", "table", jobId,
+ payload, context,
HttpResponseStatus.OK.code());
+ // Bounded wait in case the notification is delivered asynchronously to the response.
+ assertThat(Uninterruptibles.awaitUninterruptibly(latch, 5,
TimeUnit.SECONDS)).isTrue();
+ }
+
private RestoreJob createTestNewJob(String jobId)
{
return RestoreJob.builder()
@@ -212,6 +244,18 @@ class UpdateRestoreJobHandlerTest extends
BaseRestoreJobTests
.build();
}
+ /**
+ * Builds a restore job for keyspace "ks", table "table" and agent "agent", using
+ * {@code SECRETS} and default SSTable import options.
+ *
+ * @param jobId  UUID string used as the job id
+ * @param status the job status to assign
+ * @return the built {@link RestoreJob}
+ */
+ private RestoreJob createTestJobWithStatus(String jobId, RestoreJobStatus
status)
+ {
+ return RestoreJob.builder()
+ .jobId(UUID.fromString(jobId))
+ .keyspace("ks").table("table")
+ .jobAgent("agent")
+ .jobStatus(status)
+ .jobSecrets(SECRETS)
+ .sstableImportOptions(SSTableImportOptions.defaults())
+ .build();
+ }
+
private JsonObject getRequestPayload()
{
JsonObject payload = new JsonObject();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]