This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 4341f1e0 CASSANDRASC-133: Allow optional reason to abort restore jobs (#124) 4341f1e0 is described below commit 4341f1e00f92b9da132e13acd4eb4760f1d89e3f Author: Yifan Cai <52585731+yifa...@users.noreply.github.com> AuthorDate: Fri May 24 18:11:14 2024 -0700 CASSANDRASC-133: Allow optional reason to abort restore jobs (#124) Abort restore job request can include the reason for the operation now optionally. The reason is logged and persisted for the restore job. Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRASC-133 --- CHANGES.txt | 1 + .../sidecar/common/data/RestoreJobConstants.java | 1 + .../common/request/AbortRestoreJobRequest.java | 12 +++- .../data/AbortRestoreJobRequestPayload.java | 74 ++++++++++++++++++++++ .../data/AbortRestoreJobRequestPayloadTest.java | 72 +++++++++++++++++++++ .../cassandra/sidecar/client/SidecarClient.java | 6 +- .../client/SidecarClientBlobRestoreExtension.java | 14 +++- .../apache/cassandra/sidecar/db/RestoreJob.java | 42 ++++++++++-- .../sidecar/db/RestoreJobDatabaseAccessor.java | 12 +++- .../sidecar/restore/RestoreJobDiscoverer.java | 2 +- .../sidecar/restore/RestoreSliceTask.java | 2 +- .../routes/restore/AbortRestoreJobHandler.java | 44 +++++++++---- .../routes/restore/RestoreJobSummaryHandler.java | 2 +- .../cassandra/sidecar/server/MainModule.java | 2 +- .../db/RestoreJobsDatabaseAccessorIntTest.java | 53 +++++++++++++--- .../sidecar/restore/RestoreJobDiscovererTest.java | 3 +- .../routes/restore/AbortRestoreJobHandlerTest.java | 51 ++++++++++++--- .../routes/restore/BaseRestoreJobTests.java | 2 +- 18 files changed, 345 insertions(+), 50 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index f53f0766..43de8233 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Allow optional reason to abort restore jobs (CASSANDRASC-133) * Fix SidecarLoadBalancingPolicy unexpectedly removing local node and improve CI stability 
(CASSANDRASC-131) * Reduce implementations accessible from client (CASSANDRASC-127) * Fix wait time acquired in SidecarRateLimiter (CASSANDRASC-124) diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java index d18b0b32..48c82896 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java @@ -33,6 +33,7 @@ public class RestoreJobConstants public static final String JOB_KEYSPACE = "keyspace"; public static final String JOB_TABLE = "table"; public static final String JOB_CONSISTENCY_LEVEL = "consistencyLevel"; + public static final String JOB_OPERATION_REASON = "reason"; public static final String SLICE_ID = "sliceId"; public static final String BUCKET_ID = "bucketId"; public static final String SLICE_START_TOKEN = "startToken"; diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/AbortRestoreJobRequest.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/AbortRestoreJobRequest.java index c692b180..88fe8351 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/AbortRestoreJobRequest.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/AbortRestoreJobRequest.java @@ -22,12 +22,15 @@ import java.util.UUID; import io.netty.handler.codec.http.HttpMethod; import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload; /** * Represents a request to abort a restore job */ public class AbortRestoreJobRequest extends Request { + private final AbortRestoreJobRequestPayload requestPayload; + /** * Constructs a Sidecar request with the given {@code requestURI}. 
Defaults to {@code ssl} enabled. * @@ -35,9 +38,10 @@ public class AbortRestoreJobRequest extends Request * @param table the table name in Cassandra * @param jobId a unique identifier for the job */ - public AbortRestoreJobRequest(String keyspace, String table, UUID jobId) + public AbortRestoreJobRequest(String keyspace, String table, UUID jobId, AbortRestoreJobRequestPayload payload) { super(requestURI(keyspace, table, jobId)); + this.requestPayload = payload; } @Override @@ -46,6 +50,12 @@ public class AbortRestoreJobRequest extends Request return HttpMethod.POST; } + @Override + public Object requestBody() + { + return requestPayload; + } + static String requestURI(String keyspace, String table, UUID jobId) { return ApiEndpointsV1.ABORT_RESTORE_JOB_ROUTE diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/AbortRestoreJobRequestPayload.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/AbortRestoreJobRequestPayload.java new file mode 100644 index 00000000..664451aa --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/AbortRestoreJobRequestPayload.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.request.data; + +import java.util.regex.Pattern; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.cassandra.sidecar.common.utils.Preconditions; +import org.jetbrains.annotations.Nullable; + +import static org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_OPERATION_REASON; + +/** + * Request payload for aborting a restore job. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AbortRestoreJobRequestPayload +{ + private static final Pattern ALPHANUMERIC_BLANK_ONLY = Pattern.compile("^[a-zA-Z0-9 ]*$"); + private final String reason; + + @JsonCreator + public AbortRestoreJobRequestPayload(@Nullable @JsonProperty(JOB_OPERATION_REASON) String reason) + { + this.reason = validateContent(reason); + } + + /** + * @return the reason to abort the job + */ + @JsonProperty(JOB_OPERATION_REASON) + public String reason() + { + return reason; + } + + /** + * As reason string is logged and persisted, the validation is performed to avoid any malicious behavior + * @param reason client-sent string content + * @return the same string if content is good + */ + private String validateContent(String reason) + { + if (reason == null) + { + return null; + } + + Preconditions.checkArgument(reason.length() <= 1024, "Reason string is too long"); + Preconditions.checkArgument(ALPHANUMERIC_BLANK_ONLY.matcher(reason).matches(), + "Reason string cannot contain non-alphanumeric-blank characters"); + return reason; + } +} diff --git a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/AbortRestoreJobRequestPayloadTest.java 
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/AbortRestoreJobRequestPayloadTest.java new file mode 100644 index 00000000..283cdb9a --- /dev/null +++ b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/AbortRestoreJobRequestPayloadTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.common.request.data; + +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class AbortRestoreJobRequestPayloadTest +{ + private static final ObjectMapper MAPPER = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL); + + @Test + void testSerDeser() throws JsonProcessingException + { + AbortRestoreJobRequestPayload payload = new AbortRestoreJobRequestPayload("Expired"); + String json = MAPPER.writeValueAsString(payload); + assertThat(json).isEqualTo("{\"reason\":\"Expired\"}"); + AbortRestoreJobRequestPayload deser = MAPPER.readValue(json, AbortRestoreJobRequestPayload.class); + assertThat(deser.reason()).isEqualTo(payload.reason()); + + AbortRestoreJobRequestPayload nullPayload = new AbortRestoreJobRequestPayload(null); + json = MAPPER.writeValueAsString(nullPayload); + assertThat(json).isEqualTo("{}"); + deser = MAPPER.readValue(json, AbortRestoreJobRequestPayload.class); + assertThat(deser.reason()).isNull(); + } + + @Test + void testValidation() + { + String longString = Stream.generate(() -> "a").limit(2048).collect(Collectors.joining()); + assertThatThrownBy(() -> new AbortRestoreJobRequestPayload(longString)) + .hasMessage("Reason string is too long"); + + String disallowedChars = "! 
cat /super/secrets"; + assertThatThrownBy(() -> new AbortRestoreJobRequestPayload(disallowedChars)) + .hasMessage("Reason string cannot contain non-alphanumeric-blank characters"); + + assertThatThrownBy(() -> MAPPER.readValue(String.format("{\"reason\":\"%s\"}", longString), + AbortRestoreJobRequestPayload.class)) + .hasMessageContaining("Reason string is too long"); + + assertThatThrownBy(() -> MAPPER.readValue(String.format("{\"reason\":\"%s\"}", disallowedChars), + AbortRestoreJobRequestPayload.class)) + .hasMessageContaining("Reason string cannot contain non-alphanumeric-blank characters"); + } +} diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java index 6f098f9e..3d5c426e 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java @@ -41,6 +41,7 @@ import org.apache.cassandra.sidecar.common.request.CreateRestoreJobSliceRequest; import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest; import org.apache.cassandra.sidecar.common.request.RestoreJobSummaryRequest; import org.apache.cassandra.sidecar.common.request.UpdateRestoreJobRequest; +import org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload; import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload; import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobResponsePayload; import org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload; @@ -517,10 +518,11 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt * {@inheritDoc} */ @Override - public CompletableFuture<Void> abortRestoreJob(String keyspace, String table, UUID jobId) + public CompletableFuture<Void> abortRestoreJob(String keyspace, String table, UUID jobId, + AbortRestoreJobRequestPayload 
payload) { return executor.executeRequestAsync(requestBuilder() - .request(new AbortRestoreJobRequest(keyspace, table, jobId)) + .request(new AbortRestoreJobRequest(keyspace, table, jobId, payload)) .build()); } diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClientBlobRestoreExtension.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClientBlobRestoreExtension.java index 7ce22e90..7b3aa8f6 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClientBlobRestoreExtension.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClientBlobRestoreExtension.java @@ -22,6 +22,7 @@ package org.apache.cassandra.sidecar.client; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload; import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload; import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobResponsePayload; import org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload; @@ -63,9 +64,20 @@ public interface SidecarClientBlobRestoreExtension * @param keyspace name of the keyspace in the cluster * @param table name of the table in the cluster * @param jobId job ID of the restore job to be updated + * @param payload request payload * @return a completable future */ - CompletableFuture<Void> abortRestoreJob(String keyspace, String table, UUID jobId); + CompletableFuture<Void> abortRestoreJob(String keyspace, String table, UUID jobId, + AbortRestoreJobRequestPayload payload); + + /** + * Abort an existing restore job with no reason + * See {@link #abortRestoreJob(String, String, UUID, AbortRestoreJobRequestPayload)} + */ + default CompletableFuture<Void> abortRestoreJob(String keyspace, String table, UUID jobId) + { + return abortRestoreJob(keyspace, table, jobId, null); + } /** * Get the summary of an existing restore job 
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java index 475f9575..6f34b5ce 100644 --- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java +++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java @@ -54,6 +54,8 @@ public class RestoreJob public final String consistencyLevel; public final Manager restoreJobManager; + private final String statusText; + public static Builder builder() { return new Builder(); @@ -71,7 +73,7 @@ public class RestoreJob builder.createdAt(row.getDate("created_at")) .jobId(row.getUUID("job_id")).jobAgent(row.getString("job_agent")) .keyspace(row.getString("keyspace_name")).table(row.getString("table_name")) - .jobStatus(decodeJobStatus(row.getString("status"))) + .jobStatusText(row.getString("status")) .jobSecrets(decodeJobSecrets(row.getBytes("blob_secrets"))) .expireAt(row.getTimestamp("expire_at")) .sstableImportOptions(decodeSSTableImportOptions(row.getBytes("import_options"))) @@ -85,7 +87,13 @@ public class RestoreJob private static RestoreJobStatus decodeJobStatus(String status) { - return status == null ? null : RestoreJobStatus.valueOf(status.toUpperCase()); + if (status == null) + { + return null; + } + + String enumLiteral = status.split(":")[0]; + return RestoreJobStatus.valueOf(enumLiteral.toUpperCase()); } private static RestoreJobSecrets decodeJobSecrets(ByteBuffer secretsBytes) @@ -114,6 +122,7 @@ public class RestoreJob this.tableName = builder.tableName; this.jobAgent = builder.jobAgent; this.status = builder.status; + this.statusText = builder.statusText; this.secrets = builder.secrets; this.importOptions = builder.importOptions == null ? 
SSTableImportOptions.defaults() @@ -134,6 +143,11 @@ public class RestoreJob return restoreJobManager == Manager.SIDECAR; } + public String statusWithOptionalDescription() + { + return statusText; + } + /** * {@inheritDoc} */ @@ -145,7 +159,7 @@ public class RestoreJob "expireAt='%s', bucketCount='%s', consistencyLevel='%s'}", createdAt.toString(), jobId.toString(), keyspaceName, tableName, - status, secrets, importOptions, + statusText, secrets, importOptions, expireAt, bucketCount, consistencyLevel); } @@ -177,6 +191,7 @@ public class RestoreJob private String tableName; private String jobAgent; private RestoreJobStatus status; + private String statusText; private RestoreJobSecrets secrets; private SSTableImportOptions importOptions; private Date expireAt; @@ -197,6 +212,7 @@ public class RestoreJob this.tableName = restoreJob.tableName; this.jobAgent = restoreJob.jobAgent; this.status = restoreJob.status; + this.statusText = restoreJob.statusText; this.secrets = restoreJob.secrets; this.importOptions = restoreJob.importOptions; this.expireAt = restoreJob.expireAt; @@ -229,9 +245,25 @@ public class RestoreJob return update(b -> b.jobAgent = jobAgent); } - public Builder jobStatus(RestoreJobStatus jobStatus) + public Builder jobStatus(@NotNull RestoreJobStatus jobStatus) + { + return update(b -> { + b.status = jobStatus; + b.statusText = jobStatus.name(); + }); + } + + /** + * Assign the job status; primarily used when loading the restore job from database + * Note that the status text might contain additional description than the status enum + * @param statusText status text read from database + */ + public Builder jobStatusText(String statusText) { - return update(b -> b.status = jobStatus); + return update(b -> { + b.status = decodeJobStatus(statusText); + b.statusText = statusText; + }); } public Builder jobSecrets(RestoreJobSecrets jobSecrets) diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java 
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java index 43e5bc6b..19874808 100644 --- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java +++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java @@ -45,6 +45,7 @@ import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; import org.apache.cassandra.sidecar.db.schema.RestoreJobsSchema; import org.apache.cassandra.sidecar.db.schema.SidecarSchema; +import org.jetbrains.annotations.Nullable; /** * RestoreJobs is the data accessor to Cassandra. @@ -92,7 +93,7 @@ public class RestoreJobDatabaseAccessor extends DatabaseAccessor job.keyspaceName, job.tableName, job.jobAgent, - job.status.toString(), + job.status.name(), secrets, importOptions, job.consistencyLevel, @@ -162,13 +163,18 @@ public class RestoreJobDatabaseAccessor extends DatabaseAccessor return updateBuilder.build(); } - public void abort(UUID jobId) + public void abort(UUID jobId, @Nullable String reason) { sidecarSchema.ensureInitialized(); LocalDate createdAt = RestoreJob.toLocalDate(jobId); + String status = RestoreJobStatus.ABORTED.name(); + if (reason != null) + { + status = status + ": " + reason; + } BoundStatement statement = restoreJobsSchema.updateStatus() - .bind(createdAt, jobId, RestoreJobStatus.ABORTED.name()); + .bind(createdAt, jobId, status); execute(statement); } diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java index 2f865ff9..c5d007a8 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java @@ -288,7 +288,7 @@ public class RestoreJobDiscoverer implements PeriodicTask LOGGER.info("Abort expired job. 
jobId={} job={}", job.jobId, job); try { - restoreJobDatabaseAccessor.abort(job.jobId); + restoreJobDatabaseAccessor.abort(job.jobId, "Expired"); return true; } catch (Exception exception) // do not fail on the job. Continue to drain the entire list diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java index 02c1a09c..cddd950d 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java @@ -156,7 +156,7 @@ public class RestoreSliceTask implements RestoreSliceHandler else { String msg = "Unexpected restore job status. Expected only CREATED or STAGED when " + - "processing active slices. Found status: " + job.status; + "processing active slices. Found status: " + job.statusWithOptionalDescription(); Exception unexpectedState = new IllegalStateException(msg); return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Unexpected restore job status", slice, unexpectedState)); diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java index 52a513f0..0f5b8224 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java @@ -21,11 +21,12 @@ package org.apache.cassandra.sidecar.routes.restore; import com.google.inject.Inject; import com.google.inject.Singleton; import io.netty.handler.codec.http.HttpResponseStatus; -import io.vertx.core.Future; import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.Json; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; +import 
org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.db.RestoreJobDatabaseAccessor; import org.apache.cassandra.sidecar.metrics.RestoreMetrics; @@ -35,6 +36,7 @@ import org.apache.cassandra.sidecar.routes.AbstractHandler; import org.apache.cassandra.sidecar.routes.RoutingContextUtils; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; import static org.apache.cassandra.sidecar.routes.RoutingContextUtils.SC_RESTORE_JOB; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; @@ -44,8 +46,10 @@ import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpExceptio * {@link org.apache.cassandra.sidecar.db.RestoreJob} */ @Singleton -public class AbortRestoreJobHandler extends AbstractHandler<String> +public class AbortRestoreJobHandler extends AbstractHandler<AbortRestoreJobRequestPayload> { + private static final AbortRestoreJobRequestPayload EMPTY_PAYLOAD = new AbortRestoreJobRequestPayload(null); + private final RestoreJobDatabaseAccessor restoreJobDatabaseAccessor; private final RestoreJobManagerGroup restoreJobManagerGroup; private final RestoreMetrics metrics; @@ -69,33 +73,47 @@ public class AbortRestoreJobHandler extends AbstractHandler<String> HttpServerRequest httpRequest, String host, SocketAddress remoteAddress, - String jobId) + AbortRestoreJobRequestPayload payload) { RoutingContextUtils .getAsFuture(context, SC_RESTORE_JOB) - .compose(job -> { + .map(job -> { if (RestoreJobStatus.isFinalState(job.status)) { - return Future.failedFuture(wrapHttpException(HttpResponseStatus.CONFLICT, - "Job is already in final state: " + job.status)); + throw wrapHttpException(HttpResponseStatus.CONFLICT, + "Job is already in final state: " + job.status); } - 
restoreJobDatabaseAccessor.abort(job.jobId); + restoreJobDatabaseAccessor.abort(job.jobId, payload.reason()); + logger.info("Successfully aborted restore job. job={} remoteAddress={} instance={} reason='{}'", + job, remoteAddress, host, payload.reason()); restoreJobManagerGroup.signalRefreshRestoreJob(); - return Future.succeededFuture(job); + return job; }) .onSuccess(job -> { - logger.info("Successfully aborted restore job. job={}, remoteAddress={}, instance={}", - job, remoteAddress, host); metrics.failedJobs.metric.update(1); context.response().setStatusCode(HttpResponseStatus.OK.code()).end(); }) - .onFailure(cause -> processFailure(cause, context, host, remoteAddress, jobId)); + .onFailure(cause -> processFailure(cause, context, host, remoteAddress, payload)); } + @NotNull @Override - protected String extractParamsOrThrow(RoutingContext context) + protected AbortRestoreJobRequestPayload extractParamsOrThrow(RoutingContext context) { - return context.pathParam("jobId"); + String bodyString = context.body().asString(); // nullable + + try + { + return Json.decodeValue(bodyString, AbortRestoreJobRequestPayload.class); + } + catch (Exception cause) + { + if (bodyString != null) + { + logger.warn("Failed to deserialize json string into AbortRestoreJobRequestPayload", cause); + } + return EMPTY_PAYLOAD; + } } } diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandler.java index a48b314b..c2df4546 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandler.java @@ -82,7 +82,7 @@ public class RestoreJobSummaryHandler extends AbstractHandler<String> RestoreJobSummaryResponsePayload response = new RestoreJobSummaryResponsePayload(restoreJob.createdAt.toString(), restoreJob.jobId, restoreJob.jobAgent, 
restoreJob.keyspaceName, restoreJob.tableName, - restoreJob.secrets, restoreJob.status.toString()); + restoreJob.secrets, restoreJob.statusWithOptionalDescription()); return Future.succeededFuture(response); }); } diff --git a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java index 6e7288d2..cdc51271 100644 --- a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java +++ b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java @@ -321,8 +321,8 @@ public class MainModule extends AbstractModule .handler(validateRestoreJobRequest) .handler(updateRestoreJobHandler); - // we don't expect users to send body for abort requests, hence we don't use BodyHandler router.post(ApiEndpointsV1.ABORT_RESTORE_JOB_ROUTE) + .handler(BodyHandler.create()) .handler(validateTableExistence) .handler(validateRestoreJobRequest) .handler(abortRestoreJobHandler); diff --git a/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java b/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java index 9fdb1490..4a02d170 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java @@ -42,6 +42,10 @@ import static org.assertj.core.api.Assertions.assertThat; @ExtendWith(VertxExtension.class) class RestoreJobsDatabaseAccessorIntTest extends IntegrationTestBase { + QualifiedTableName qualifiedTableName = new QualifiedTableName("ks", "tbl"); + RestoreJobSecrets secrets = RestoreJobSecretsGen.genRestoreJobSecrets(); + long expiresAtMillis = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1); + @CassandraIntegrationTest void testCrudOperations() { @@ -53,16 +57,8 @@ class RestoreJobsDatabaseAccessorIntTest extends IntegrationTestBase awaitLatchOrTimeout(latch, 10, TimeUnit.SECONDS); 
assertThat(accessor.findAllRecent(3)).isEmpty(); - QualifiedTableName qualifiedTableName = new QualifiedTableName("ks", "tbl"); - RestoreJobSecrets secrets = RestoreJobSecretsGen.genRestoreJobSecrets(); - long expiresAtMillis = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1); - UUID jobId = UUIDs.timeBased(); - CreateRestoreJobRequestPayload payload = CreateRestoreJobRequestPayload.builder(secrets, expiresAtMillis) - .jobId(jobId) - .jobAgent("agent") - .build(); - accessor.create(payload, qualifiedTableName); - + // update this job + UUID jobId = createJob(accessor); List<RestoreJob> foundJobs = accessor.findAllRecent(3); assertThat(foundJobs).hasSize(1); assertJob(foundJobs.get(0), jobId, RestoreJobStatus.CREATED, expiresAtMillis, secrets); @@ -71,10 +67,42 @@ class RestoreJobsDatabaseAccessorIntTest extends IntegrationTestBase = new UpdateRestoreJobRequestPayload(null, null, RestoreJobStatus.SUCCEEDED, null); accessor.update(markSucceeded, jobId); assertJob(accessor.find(jobId), jobId, RestoreJobStatus.SUCCEEDED, expiresAtMillis, secrets); + + // abort this job with reason + jobId = createJob(accessor); + foundJobs = accessor.findAllRecent(3); + assertThat(foundJobs).hasSize(2); + accessor.abort(jobId, "Reason"); + assertJob(accessor.find(jobId), jobId, RestoreJobStatus.ABORTED, expiresAtMillis, secrets, "Reason"); + + // abort this job w/o reason + jobId = createJob(accessor); + foundJobs = accessor.findAllRecent(3); + assertThat(foundJobs).hasSize(3); + accessor.abort(jobId, null); + assertJob(accessor.find(jobId), jobId, RestoreJobStatus.ABORTED, expiresAtMillis, secrets, null); + } + + private UUID createJob(RestoreJobDatabaseAccessor accessor) + { + UUID jobId = UUIDs.timeBased(); + CreateRestoreJobRequestPayload payload = CreateRestoreJobRequestPayload.builder(secrets, expiresAtMillis) + .jobId(jobId) + .jobAgent("agent") + .build(); + accessor.create(payload, qualifiedTableName); + + return jobId; } private void assertJob(RestoreJob job, UUID 
jobId, RestoreJobStatus status, long expiresAtMillis, RestoreJobSecrets secrets) + { + assertJob(job, jobId, status, expiresAtMillis, secrets, null); + } + + private void assertJob(RestoreJob job, UUID jobId, RestoreJobStatus status, long expiresAtMillis, + RestoreJobSecrets secrets, String abortReason) { assertThat(job).isNotNull(); assertThat(job.jobId).isEqualTo(jobId); @@ -82,6 +110,11 @@ class RestoreJobsDatabaseAccessorIntTest extends IntegrationTestBase assertThat(job.keyspaceName).isEqualTo("ks"); assertThat(job.tableName).isEqualTo("tbl"); assertThat(job.status).isEqualTo(status); + if (abortReason != null) + { + assertThat(job.statusWithOptionalDescription()).isEqualTo(String.format("%s: %s", status, abortReason)); + } + assertThat(job.status).isEqualTo(status); assertThat(job.expireAt.getTime()).isEqualTo(expiresAtMillis); assertThat(job.secrets).isEqualTo(secrets); } diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java index 671cfc36..9a31cd9e 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java @@ -56,6 +56,7 @@ import static org.apache.cassandra.sidecar.db.RestoreJobTest.createUpdatedJob; import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -247,7 +248,7 @@ class RestoreJobDiscovererTest new Date(System.currentTimeMillis() - 1000L))) .collect(Collectors.toList()); ArgumentCaptor<UUID> abortedJobs = ArgumentCaptor.forClass(UUID.class); - doNothing().when(mockJobAccessor).abort(abortedJobs.capture()); + 
doNothing().when(mockJobAccessor).abort(abortedJobs.capture(), eq("Expired")); when(mockJobAccessor.findAllRecent(anyInt())).thenReturn(mockResult); loop.registerPeriodicTaskExecutor(executor); executeBlocking(); diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandlerTest.java index 833832ba..54c9d20f 100644 --- a/src/test/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandlerTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandlerTest.java @@ -24,12 +24,18 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpRequest; +import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.client.WebClient; import io.vertx.ext.web.client.WebClientOptions; import io.vertx.ext.web.codec.BodyCodec; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; +import org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload; import org.apache.cassandra.sidecar.db.RestoreJobTest; import static org.assertj.core.api.Assertions.assertThat; @@ -77,22 +83,49 @@ class AbortRestoreJobHandlerTest extends BaseRestoreJobTests context, HttpResponseStatus.CONFLICT.code()); } + @Test + void testAbortJobWithReason(VertxTestContext context) throws Throwable + { + mockLookupRestoreJob(RestoreJobTest::createNewTestingJob); + sendAbortRestoreJobRequestAndVerify("ks", "table", "8e5799a4-d277-11ed-8d85-6916bb9b8056", + context, HttpResponseStatus.OK.code(), + new AbortRestoreJobRequestPayload("Analytics job has failed")); + } + private void 
sendAbortRestoreJobRequestAndVerify(String keyspace, String table, String jobId, VertxTestContext context, int expectedStatusCode) throws Throwable + { + sendAbortRestoreJobRequestAndVerify(keyspace, table, jobId, context, expectedStatusCode, null); + } + + private void sendAbortRestoreJobRequestAndVerify(String keyspace, + String table, + String jobId, + VertxTestContext context, + int expectedStatusCode, + AbortRestoreJobRequestPayload requestPayload) throws Throwable { WebClient client = WebClient.create(vertx, new WebClientOptions()); - client.post(server.actualPort(), "localhost", String.format(RESTORE_JOB_ABORT_ENDPOINT, keyspace, table, jobId)) - .as(BodyCodec.buffer()) - .send(resp -> { - context.verify(() -> { - assertThat(resp.result().statusCode()).isEqualTo(expectedStatusCode); - }) - .completeNow(); - client.close(); - }); + HttpRequest<Buffer> request = client.post(server.actualPort(), + "localhost", + String.format(RESTORE_JOB_ABORT_ENDPOINT, keyspace, table, jobId)) + .as(BodyCodec.buffer()); + Handler<AsyncResult<HttpResponse<Buffer>>> responseVerifier = resp -> { + context.verify(() -> assertThat(resp.result().statusCode()).isEqualTo(expectedStatusCode)) + .completeNow(); + client.close(); + }; + if (requestPayload != null) + { + request.sendJson(requestPayload, responseVerifier); + } + else + { + request.send(responseVerifier); + } context.awaitCompletion(10, TimeUnit.SECONDS); } } diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java index a58cdb84..3eb94a0b 100644 --- a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java +++ b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java @@ -190,7 +190,7 @@ public abstract class BaseRestoreJobTests } @Override - public void abort(UUID jobId) + public void abort(UUID jobId, String reason) { // do nothing } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org