This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new ee45474 CASSANDRASC-99 Break restore job into stage and import phases and persist restore slice status on phase completion ee45474 is described below commit ee454741363f3f693726af242c5ec37ad1480d60 Author: Yifan Cai <y...@apache.org> AuthorDate: Mon Jan 29 16:09:25 2024 -0800 CASSANDRASC-99 Break restore job into stage and import phases and persist restore slice status on phase completion patch by Yifan Cai; reviewed by Doug Rohrer, Francisco Guerrero for CASSANDRASC-99 --- .../data/CreateRestoreJobRequestPayload.java | 28 ++- .../sidecar/common/data/RestoreJobConstants.java | 1 + .../sidecar/common/data/RestoreJobStatus.java | 1 + .../sidecar/common/data/RestoreSliceStatus.java | 37 ++- .../data/CreateRestoreJobRequestPayloadTest.java | 6 +- .../common/data/RestoreSliceStatusTest.java | 83 +++++++ spotbugs-exclude.xml | 1 + .../config/yaml/RestoreJobConfigurationImpl.java | 16 +- .../apache/cassandra/sidecar/db/RestoreJob.java | 85 ++++--- .../sidecar/db/RestoreJobDatabaseAccessor.java | 21 +- .../apache/cassandra/sidecar/db/RestoreSlice.java | 97 ++++++-- .../sidecar/db/RestoreSliceDatabaseAccessor.java | 47 ++-- .../sidecar/db/schema/RestoreJobsSchema.java | 5 +- .../sidecar/db/schema/RestoreSlicesSchema.java | 2 +- .../sidecar/locator/CachedLocalTokenRanges.java | 276 +++++++++++++++++++++ .../sidecar/locator/LocalTokenRangesProvider.java | 41 +++ .../sidecar/restore/RestoreJobDiscoverer.java | 55 +++- .../cassandra/sidecar/restore/RestoreJobUtil.java | 2 +- .../sidecar/restore/RestoreProcessor.java | 36 ++- .../sidecar/restore/RestoreSliceTask.java | 118 +++++++-- .../cassandra/sidecar/restore/StorageClient.java | 2 +- .../routes/restore/AbortRestoreJobHandler.java | 6 +- .../routes/restore/CreateRestoreJobHandler.java | 2 +- .../routes/restore/CreateRestoreSliceHandler.java | 2 +- .../routes/restore/UpdateRestoreJobHandler.java | 17 +- .../db/RestoreJobsDatabaseAccessorIntTest.java | 12 
+- .../testing/ConfigurableCassandraTestContext.java | 43 +++- .../cassandra/sidecar/db/RestoreJobTest.java | 16 ++ .../cassandra/sidecar/db/SidecarSchemaTest.java | 53 +++- .../sidecar/restore/RestoreJobDiscovererTest.java | 84 ++++--- .../sidecar/restore/RestoreJobManagerTest.java | 7 +- .../sidecar/restore/RestoreProcessorTest.java | 3 +- .../sidecar/restore/RestoreSliceTaskTest.java | 113 +++++++-- .../sidecar/restore/RestoreSliceTest.java | 2 +- .../routes/restore/BaseRestoreJobTests.java | 1 - .../restore/RestoreJobSummaryHandlerTest.java | 29 ++- .../restore/UpdateRestoreJobHandlerTest.java | 10 +- .../sidecar/utils/AsyncFileSystemUtilsTest.java | 111 +++++++++ 38 files changed, 1255 insertions(+), 216 deletions(-) diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayload.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayload.java index 12858d8..0e5a9a0 100644 --- a/common/src/main/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayload.java +++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayload.java @@ -26,8 +26,10 @@ import java.util.function.Consumer; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.cassandra.sidecar.common.utils.Preconditions; +import org.jetbrains.annotations.Nullable; import static org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_AGENT; +import static org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_CONSISTENCY_LEVEL; import static org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_EXPIRE_AT; import static org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_ID; import static org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_IMPORT_OPTIONS; @@ -43,6 +45,8 @@ public class CreateRestoreJobRequestPayload private final RestoreJobSecrets 
secrets; private final SSTableImportOptions importOptions; private final long expireAtInMillis; + @Nullable + private final String consistencyLevel; // optional field /** * Builder to build a CreateRestoreJobRequest @@ -65,13 +69,15 @@ public class CreateRestoreJobRequestPayload * @param secrets secrets to be used by restore job to download data * @param importOptions the configured options for SSTable import * @param expireAtInMillis a timestamp in the future when the job is considered expired + * @param consistencyLevel consistency level a job should satisfy */ @JsonCreator public CreateRestoreJobRequestPayload(@JsonProperty(JOB_ID) UUID jobId, @JsonProperty(JOB_AGENT) String jobAgent, @JsonProperty(JOB_SECRETS) RestoreJobSecrets secrets, @JsonProperty(JOB_IMPORT_OPTIONS) SSTableImportOptions importOptions, - @JsonProperty(JOB_EXPIRE_AT) long expireAtInMillis) + @JsonProperty(JOB_EXPIRE_AT) long expireAtInMillis, + @JsonProperty(JOB_CONSISTENCY_LEVEL) String consistencyLevel) { Preconditions.checkArgument(jobId == null || jobId.version() == 1, "Only time based UUIDs allowed for jobId"); @@ -85,6 +91,7 @@ public class CreateRestoreJobRequestPayload ? 
SSTableImportOptions.defaults() : importOptions; this.expireAtInMillis = expireAtInMillis; + this.consistencyLevel = consistencyLevel; } private CreateRestoreJobRequestPayload(Builder builder) @@ -94,6 +101,7 @@ public class CreateRestoreJobRequestPayload this.secrets = builder.secrets; this.importOptions = builder.importOptions; this.expireAtInMillis = builder.expireAtInMillis; + this.consistencyLevel = builder.consistencyLevel; } /** @@ -151,6 +159,16 @@ public class CreateRestoreJobRequestPayload return new Date(expireAtInMillis); } + /** + * @return the consistency level a job should satisfy + */ + @JsonProperty(JOB_CONSISTENCY_LEVEL) + @Nullable + public String consistencyLevel() + { + return consistencyLevel; + } + @Override public String toString() { @@ -159,6 +177,7 @@ public class CreateRestoreJobRequestPayload JOB_AGENT + "='" + jobAgent + "', " + JOB_SECRETS + "='" + secrets + "', " + JOB_EXPIRE_AT + "='" + expireAtInMillis + "', " + + JOB_CONSISTENCY_LEVEL + "='" + consistencyLevel + "', " + JOB_IMPORT_OPTIONS + "='" + importOptions + "'}"; } @@ -173,6 +192,7 @@ public class CreateRestoreJobRequestPayload private UUID jobId = null; private String jobAgent = null; + private String consistencyLevel = null; Builder(RestoreJobSecrets secrets, long expireAtInMillis) { @@ -198,6 +218,12 @@ public class CreateRestoreJobRequestPayload return this; } + public Builder consistencyLevel(String consistencyLevel) + { + this.consistencyLevel = consistencyLevel; + return this; + } + public CreateRestoreJobRequestPayload build() { return new CreateRestoreJobRequestPayload(this); diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java index 78707ae..d18b0b3 100644 --- a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java +++ 
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java @@ -32,6 +32,7 @@ public class RestoreJobConstants public static final String JOB_CREATED_AT = "createdAt"; public static final String JOB_KEYSPACE = "keyspace"; public static final String JOB_TABLE = "table"; + public static final String JOB_CONSISTENCY_LEVEL = "consistencyLevel"; public static final String SLICE_ID = "sliceId"; public static final String BUCKET_ID = "bucketId"; public static final String SLICE_START_TOKEN = "startToken"; diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobStatus.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobStatus.java index 1d691da..4694510 100644 --- a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobStatus.java +++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobStatus.java @@ -24,6 +24,7 @@ package org.apache.cassandra.sidecar.common.data; public enum RestoreJobStatus { CREATED, + STAGED, @Deprecated // replaced by ABORTED FAILED, ABORTED, diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatus.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatus.java index fa874bb..379be71 100644 --- a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatus.java +++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatus.java @@ -18,15 +18,44 @@ package org.apache.cassandra.sidecar.common.data; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.apache.cassandra.sidecar.common.utils.Preconditions; + /** * Holds all possible restore slice statues */ public enum RestoreSliceStatus { - EMPTY, - PROCESSING, - COMMITTING, SUCCEEDED, FAILED, - ABORTED + ABORTED, + COMMITTING(SUCCEEDED, FAILED, ABORTED), + STAGED(COMMITTING, FAILED, ABORTED), + PROCESSING(STAGED, FAILED, ABORTED), + 
EMPTY(PROCESSING, FAILED, ABORTED); + + // Do not use EnumSet, since validTargetStatuses is assigned on constructing and enums are not available yet. + private final Set<RestoreSliceStatus> validTargetStatusSet; + + RestoreSliceStatus(RestoreSliceStatus... targetStatuses) + { + this.validTargetStatusSet = new HashSet<>(); + Collections.addAll(validTargetStatusSet, targetStatuses); + } + + /** + * Advance the status with validation + * @param targetStatus target status to advance to + * @return new status + */ + public RestoreSliceStatus advanceTo(RestoreSliceStatus targetStatus) + { + Preconditions.checkArgument(validTargetStatusSet.contains(targetStatus), + name() + " status can only advance to one of the follow statuses: " + + validTargetStatusSet); + return targetStatus; + } } diff --git a/common/src/test/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayloadTest.java b/common/src/test/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayloadTest.java index e7f47b9..08ca103 100644 --- a/common/src/test/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayloadTest.java +++ b/common/src/test/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayloadTest.java @@ -46,6 +46,7 @@ class CreateRestoreJobRequestPayloadTest Date date = Date.from(Instant.ofEpochMilli(time)); CreateRestoreJobRequestPayload req = CreateRestoreJobRequestPayload.builder(secrets, time) .jobId(UUID.fromString(id)) + .consistencyLevel("QUORUM") .jobAgent("agent") .build(); String json = MAPPER.writeValueAsString(req); @@ -56,6 +57,7 @@ class CreateRestoreJobRequestPayloadTest assertThat(test.expireAtInMillis()).isEqualTo(time); assertThat(test.expireAtAsDate()).isEqualTo(date); assertThat(test.importOptions()).isEqualTo(SSTableImportOptions.defaults()); + assertThat(test.consistencyLevel()).isEqualTo("QUORUM"); } @Test @@ -157,9 +159,9 @@ class CreateRestoreJobRequestPayloadTest 
assertThat(test.expireAtInMillis()).isEqualTo(time); assertThat(test.expireAtAsDate()).isEqualTo(date); assertThat(test.importOptions()).isEqualTo(SSTableImportOptions.defaults()); + assertThat(test.consistencyLevel()).isNull(); } - @Test void testBuilder() { @@ -172,11 +174,13 @@ class CreateRestoreJobRequestPayloadTest .resetLevel(false) .clearRepaired(false); }) + .consistencyLevel("QUORUM") .build(); assertThat(req.secrets()).isEqualTo(secrets); assertThat(req.jobAgent()).isEqualTo("agent"); assertThat(req.importOptions()).isEqualTo(SSTableImportOptions.defaults() .resetLevel(false) .clearRepaired(false)); + assertThat(req.consistencyLevel()).isEqualTo("QUORUM"); } } diff --git a/common/src/test/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatusTest.java b/common/src/test/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatusTest.java new file mode 100644 index 0000000..27607cf --- /dev/null +++ b/common/src/test/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatusTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.common.data; + +import java.util.stream.Stream; + +import org.junit.jupiter.api.Test; + +import static org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.ABORTED; +import static org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.COMMITTING; +import static org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.EMPTY; +import static org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.FAILED; +import static org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.PROCESSING; +import static org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.STAGED; +import static org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.SUCCEEDED; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class RestoreSliceStatusTest +{ + @Test + void testStatusAdvancing() + { + assertAdvanceTo(EMPTY, PROCESSING); + assertAdvanceTo(EMPTY, FAILED); + assertAdvanceTo(EMPTY, ABORTED); + assertAdvanceTo(PROCESSING, STAGED); + assertAdvanceTo(PROCESSING, FAILED); + assertAdvanceTo(PROCESSING, ABORTED); + assertAdvanceTo(STAGED, COMMITTING); + assertAdvanceTo(STAGED, FAILED); + assertAdvanceTo(STAGED, ABORTED); + assertAdvanceTo(COMMITTING, SUCCEEDED); + assertAdvanceTo(COMMITTING, FAILED); + assertAdvanceTo(COMMITTING, ABORTED); + } + + @Test + void testInvalidStatusAdvancing() + { + String commonErrorMsg = "status can only advance to one of the follow statuses"; + + Stream + .of(new RestoreSliceStatus[][] + { // define test cases of invalid status advancing, e.g. 
it is invalid to advance from EMPTY to STAGED + { EMPTY, STAGED }, + { STAGED, EMPTY }, + { EMPTY, COMMITTING }, + { STAGED, SUCCEEDED }, + { COMMITTING, STAGED }, + { STAGED, STAGED }, + { SUCCEEDED, FAILED }, + { FAILED, SUCCEEDED } + }) + .forEach(testCase -> { + assertThatThrownBy(() -> testCase[0].advanceTo(testCase[1])) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasNoCause() + .hasMessageContaining(commonErrorMsg); + }); + } + + private void assertAdvanceTo(RestoreSliceStatus from, RestoreSliceStatus to) + { + assertThat(from.advanceTo(to)).isEqualTo(to); + } +} diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml index 2439930..03be65c 100644 --- a/spotbugs-exclude.xml +++ b/spotbugs-exclude.xml @@ -44,6 +44,7 @@ <Class name="org.apache.cassandra.sidecar.CassandraSidecarDaemon" /> <Class name="org.apache.cassandra.sidecar.utils.SSTableImporter" /> <Class name="org.apache.cassandra.sidecar.tasks.HealthCheckPeriodicTask" /> + <Class name="org.apache.cassandra.sidecar.restore.RestoreSliceTaskTest" /> </Or> </Match> diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java index 6048d26..9ce5efb 100644 --- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java +++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java @@ -167,8 +167,7 @@ public class RestoreJobConfigurationImpl implements RestoreJobConfiguration */ public Builder jobDiscoveryActiveLoopDelayMillis(long jobDiscoveryActiveLoopDelayMillis) { - this.jobDiscoveryActiveLoopDelayMillis = jobDiscoveryActiveLoopDelayMillis; - return this; + return update(b -> b.jobDiscoveryActiveLoopDelayMillis = jobDiscoveryActiveLoopDelayMillis); } /** @@ -180,8 +179,7 @@ public class RestoreJobConfigurationImpl implements RestoreJobConfiguration */ public Builder jobDiscoveryIdleLoopDelayMillis(long 
jobDiscoveryIdleLoopDelayMillis) { - this.jobDiscoveryIdleLoopDelayMillis = jobDiscoveryIdleLoopDelayMillis; - return this; + return update(b -> b.jobDiscoveryIdleLoopDelayMillis = jobDiscoveryIdleLoopDelayMillis); } /** @@ -193,8 +191,7 @@ public class RestoreJobConfigurationImpl implements RestoreJobConfiguration */ public Builder jobDiscoveryRecencyDays(int jobDiscoveryRecencyDays) { - this.jobDiscoveryRecencyDays = jobDiscoveryRecencyDays; - return this; + return update(b -> b.jobDiscoveryRecencyDays = jobDiscoveryRecencyDays); } /** @@ -206,8 +203,7 @@ public class RestoreJobConfigurationImpl implements RestoreJobConfiguration */ public Builder processMaxConcurrency(int processMaxConcurrency) { - this.processMaxConcurrency = processMaxConcurrency; - return this; + return update(b -> b.processMaxConcurrency = processMaxConcurrency); } /** @@ -219,11 +215,9 @@ public class RestoreJobConfigurationImpl implements RestoreJobConfiguration */ public Builder restoreJobTablesTtlSeconds(long restoreJobTablesTtlSeconds) { - this.restoreJobTablesTtlSeconds = restoreJobTablesTtlSeconds; - return this; + return update(b -> b.restoreJobTablesTtlSeconds = restoreJobTablesTtlSeconds); } - @Override public RestoreJobConfigurationImpl build() { diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java index a7cee35..475f957 100644 --- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java +++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java @@ -23,8 +23,6 @@ import java.nio.ByteBuffer; import java.util.Date; import java.util.UUID; -import com.google.common.annotations.VisibleForTesting; - import com.datastax.driver.core.LocalDate; import com.datastax.driver.core.Row; import com.datastax.driver.core.utils.Bytes; @@ -54,6 +52,7 @@ public class RestoreJob public final Date expireAt; public final short bucketCount; public final String consistencyLevel; + public final Manager 
restoreJobManager; public static Builder builder() { @@ -75,46 +74,12 @@ public class RestoreJob .jobStatus(decodeJobStatus(row.getString("status"))) .jobSecrets(decodeJobSecrets(row.getBytes("blob_secrets"))) .expireAt(row.getTimestamp("expire_at")) - .sstableImportOptions(decodeSSTableImportOptions(row.getBytes("import_options"))); + .sstableImportOptions(decodeSSTableImportOptions(row.getBytes("import_options"))) + .consistencyLevel(row.getString("consistency_level")); + // todo: Yifan, add them back when the cql statement is updated to reflect the new columns. // Add new fields to CreateRestoreJobRequestPayload too // .bucketCount(row.getShort("bucket_count")) -// .consistencyLevel(row.getString("consistency_level")); - return builder.build(); - } - - // todo: candidate to be removed - public static RestoreJob forUpdates(UUID jobId, String jobAgent, - RestoreJobStatus status, - RestoreJobSecrets secrets, - Date expireAt) - throws DataObjectMappingException - { - Builder builder = new Builder(); - builder.createdAt(toLocalDate(jobId)) - .jobId(jobId).jobAgent(jobAgent) - .jobStatus(status) - .jobSecrets(secrets) - .expireAt(expireAt); - return builder.build(); - } - - // todo: candidate to be removed - @VisibleForTesting - public static RestoreJob create(LocalDate createdAt, - UUID jobId, - String keyspaceName, - String tableName, - String jobAgent, - RestoreJobStatus status, - RestoreJobSecrets secrets, - SSTableImportOptions importOptions) - { - Builder builder = new Builder(); - builder.createdAt(createdAt) - .jobId(jobId).jobAgent(jobAgent) - .keyspace(keyspaceName).table(tableName) - .jobStatus(status).jobSecrets(secrets).sstableImportOptions(importOptions); return builder.build(); } @@ -156,6 +121,7 @@ public class RestoreJob this.expireAt = builder.expireAt; this.bucketCount = builder.bucketCount; this.consistencyLevel = builder.consistencyLevel; + this.restoreJobManager = builder.manager; } public Builder unbuild() @@ -163,6 +129,11 @@ public class 
RestoreJob return new Builder(this); } + public boolean isManagedBySidecar() + { + return restoreJobManager == Manager.SIDECAR; + } + /** * {@inheritDoc} */ @@ -211,6 +182,7 @@ public class RestoreJob private Date expireAt; private short bucketCount; private String consistencyLevel; + private Manager manager; private Builder() { @@ -284,7 +256,10 @@ public class RestoreJob public Builder consistencyLevel(String consistencyLevel) { - return update(b -> b.consistencyLevel = consistencyLevel); + return update(b -> { + b.consistencyLevel = consistencyLevel; + b.manager = resolveManager(consistencyLevel); + }); } @Override @@ -298,5 +273,35 @@ public class RestoreJob { return new RestoreJob(this); } + + /** + * Resolve the manager of the restore job based on the existence of consistencyLevel + * @return the resolved Manager + */ + private Manager resolveManager(String consistencyLevel) + { + // If spark is the manager, the restore job is created w/o specifying consistency level + // If the manager of the restore job is sidecar, consistency level must present + return consistencyLevel == null ? Manager.SPARK : Manager.SIDECAR; + } + } + + /** + * The manager of the restore job. The variant could change the code path a restore job runs. + * It is a feature switch essentially. + */ + public enum Manager + { + /** + * The restore job is managed by Spark. Sidecar instances are just simple workers. They rely on client/Spark + * for decision-making. + */ + SPARK, + + /** + * The restore job is managed by Sidecar. Sidecar instances should assign slices to sidecar instances + * and check whether the job has met the consistency level to complete the job. 
+ */ + SIDECAR, } } diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java index 3a06e08..99ba533 100644 --- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java +++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java @@ -82,6 +82,7 @@ public class RestoreJobDatabaseAccessor extends DatabaseAccessor .jobSecrets(payload.secrets()) .sstableImportOptions(payload.importOptions()) .expireAt(payload.expireAtAsDate()) + .consistencyLevel(payload.consistencyLevel()) .build(); ByteBuffer secrets = serializeValue(job.secrets, "secrets"); ByteBuffer importOptions = serializeValue(job.importOptions, "sstable import options"); @@ -94,17 +95,29 @@ public class RestoreJobDatabaseAccessor extends DatabaseAccessor job.status.toString(), secrets, importOptions, + job.consistencyLevel, job.expireAt); execute(statement); return job; } - public RestoreJob update(UpdateRestoreJobRequestPayload payload, QualifiedTableName qualifiedTableName, UUID jobId) + /** + * Update fields in the restore job and persist + * + * @param payload fields to be updated + * @param jobId job ID + * @return the restore job object with only the updated fields + * @throws DataObjectMappingException when secrets json cannot be serialized + */ + public RestoreJob update(UpdateRestoreJobRequestPayload payload, UUID jobId) throws DataObjectMappingException { sidecarSchema.ensureInitialized(); + RestoreJob.Builder updateBuilder = RestoreJob.builder(); LocalDate createdAt = RestoreJob.toLocalDate(jobId); + updateBuilder.createdAt(createdAt) + .jobId(jobId); RestoreJobSecrets secrets = payload.secrets(); RestoreJobStatus status = payload.status(); @@ -127,22 +140,26 @@ public class RestoreJobDatabaseAccessor extends DatabaseAccessor { throw new DataObjectMappingException("Failed to serialize secrets", e); } + updateBuilder.jobSecrets(secrets); } if 
(status != null) { batchStatement.add(restoreJobsSchema.updateStatus().bind(createdAt, jobId, status.name())); + updateBuilder.jobStatus(status); } if (jobAgent != null) { batchStatement.add(restoreJobsSchema.updateJobAgent().bind(createdAt, jobId, jobAgent)); + updateBuilder.jobAgent(jobAgent); } if (expireAt != null) { batchStatement.add(restoreJobsSchema.updateExpireAt().bind(createdAt, jobId, expireAt)); + updateBuilder.expireAt(expireAt); } execute(batchStatement); - return RestoreJob.forUpdates(jobId, jobAgent, status, secrets, expireAt); + return updateBuilder.build(); } public void abort(UUID jobId) diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java index fdf45c5..6353894 100644 --- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java +++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java @@ -21,6 +21,8 @@ package org.apache.cassandra.sidecar.db; import java.math.BigInteger; import java.nio.file.Path; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -46,7 +48,14 @@ import org.apache.cassandra.sidecar.utils.SSTableImporter; import org.jetbrains.annotations.NotNull; /** - * Data object that contains all values that matter to the restore job slice + * <p>Data object that contains all values that matter to the restore job slice.</p> + * + * <p>How the staged files are organized on disk? For each slice,</p> + * <ol> + * <li>the S3 object is downloaded to the path at "stageDirectory/key". It is a zip file.</li> + * <li>the zip is then extracted to the directory at "stageDirectory/keyspace/table/". 
+ * The extracted sstables are imported into Cassandra.</li> + * </ol> */ public class RestoreSlice { @@ -58,7 +67,12 @@ public class RestoreSlice private final String bucket; private final String key; private final String checksum; // etag - private final Path targetPathInStaging; // the path to store the s3 object of the slice + // The path to the directory that stores the s3 object of the slice and the sstables after unzipping. + // Its value is "baseStageDirectory/uploadId" + private final Path stageDirectory; + // The path to the staged s3 object (file). The path is inside stageDirectory. + // Its value is "stageDirectory/key" + private final Path stagedObjectPath; private final String uploadId; private final InstanceMetadata owner; private final BigInteger startToken; @@ -69,7 +83,11 @@ public class RestoreSlice private final long compressedSize; private final long uncompressedSize; private RestoreSliceTracker tracker; + + // mutable states private boolean existsOnS3 = false; + private boolean hasStaged = false; + private boolean hasImported = false; private int downloadAttempt = 0; private volatile boolean isCancelled = false; @@ -88,7 +106,8 @@ public class RestoreSlice this.bucket = builder.bucket; this.key = builder.key; this.checksum = builder.checksum; - this.targetPathInStaging = builder.targetPathInStaging; + this.stageDirectory = builder.stageDirectory; + this.stagedObjectPath = builder.stagedObjectPath; this.uploadId = builder.uploadId; this.owner = builder.owner; this.startToken = builder.startToken; @@ -151,13 +170,29 @@ public class RestoreSlice } /** - * Make the slice as completed + * Mark the slice as completed */ public void complete() { tracker.completeSlice(this); } + /** + * Mark the slice has completed the stage phase + */ + public void completeStagePhase() + { + this.hasStaged = true; + } + + /** + * Mark the slice has completed the import phase + */ + public void completeImportPhase() + { + this.hasImported = true; + } + public void 
failAtInstance(int instanceId) { statusByReplica.put(String.valueOf(instanceId), RestoreSliceStatus.FAILED); @@ -169,6 +204,7 @@ public class RestoreSlice public void fail(RestoreJobFatalException exception) { tracker.fail(exception); + failAtInstance(owner().id()); } public void setExistsOnS3() @@ -196,6 +232,7 @@ public class RestoreSlice ExecutorPools.TaskExecutorPool executorPool, SSTableImporter importer, double requiredUsableSpacePercentage, + RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobStats stats) { if (isCancelled) @@ -204,10 +241,12 @@ public class RestoreSlice try { - RestoreJob restoreJob = job(); - StorageClient s3Client = s3ClientPool.storageClient(restoreJob); - return new RestoreSliceTask(restoreJob, this, s3Client, - executorPool, importer, requiredUsableSpacePercentage, stats); + StorageClient s3Client = s3ClientPool.storageClient(job()); + return new RestoreSliceTask(this, s3Client, + executorPool, importer, + requiredUsableSpacePercentage, + sliceDatabaseAccessor, + stats); } catch (IllegalStateException illegalState) { @@ -300,9 +339,21 @@ public class RestoreSlice return this.replicas; } - public Path targetPathInStaging() + /** + * @return the path to the directory that stores the s3 object of the slice + * and the sstables after unzipping + */ + public Path stageDirectory() + { + return stageDirectory; + } + + /** + * @return the path to the staged s3 object + */ + public Path stagedObjectPath() { - return targetPathInStaging; + return stagedObjectPath; } public long compressedSize() @@ -330,6 +381,16 @@ public class RestoreSlice return existsOnS3; } + public boolean hasStaged() + { + return hasStaged; + } + + public boolean hasImported() + { + return hasImported; + } + public int downloadAttempt() { return downloadAttempt; @@ -350,6 +411,7 @@ public class RestoreSlice public static RestoreSlice from(Row row) { Builder builder = new Builder(); + builder.jobId(row.getUUID("job_id")); 
builder.sliceId(row.getString("slice_id")); builder.bucketId(row.getShort("bucket_id")); builder.storageBucket(row.getString("bucket")); @@ -377,7 +439,8 @@ public class RestoreSlice private String bucket; private String key; private String checksum; // etag - private Path targetPathInStaging; // the path to store the s3 object of the slice + private Path stageDirectory; + private Path stagedObjectPath; private String uploadId; private InstanceMetadata owner; private BigInteger startToken; @@ -401,7 +464,7 @@ public class RestoreSlice this.bucket = slice.bucket; this.key = slice.key; this.checksum = slice.checksum; - this.targetPathInStaging = slice.targetPathInStaging; + this.stageDirectory = slice.stageDirectory; this.uploadId = slice.uploadId; this.owner = slice.owner; this.startToken = slice.startToken; @@ -450,10 +513,10 @@ public class RestoreSlice return update(b -> b.checksum = checksum); } - public Builder targetPathInStaging(Path basePath, String uploadId) + public Builder stageDirectory(Path basePath, String uploadId) { return update(b -> { - b.targetPathInStaging = basePath.resolve(uploadId); + b.stageDirectory = basePath.resolve(uploadId); b.uploadId = uploadId; }); } @@ -485,12 +548,12 @@ public class RestoreSlice public Builder replicaStatus(Map<String, RestoreSliceStatus> statusByReplica) { - return update(b -> b.statusByReplica = Collections.unmodifiableMap(statusByReplica)); + return update(b -> b.statusByReplica = new HashMap<>(statusByReplica)); } public Builder replicas(Set<String> replicas) { - return update(b -> b.replicas = Collections.unmodifiableSet(replicas)); + return update(b -> b.replicas = new HashSet<>(replicas)); } /** @@ -525,6 +588,8 @@ public class RestoreSlice @Override public RestoreSlice build() { + // precompute the path to the to-be-staged object on disk + stagedObjectPath = stageDirectory.resolve(key); return new RestoreSlice(this); } diff --git 
a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java index c3624a9..6416ce0 100644 --- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java +++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java @@ -51,18 +51,19 @@ public class RestoreSliceDatabaseAccessor extends DatabaseAccessor public RestoreSlice create(RestoreSlice slice) { - BoundStatement statement = restoreSlicesSchema.insertSlice().bind(slice.jobId(), - slice.bucketId(), - slice.sliceId(), - slice.bucket(), - slice.key(), - slice.checksum(), - slice.startToken(), - slice.endToken(), - slice.compressedSize(), - slice.uncompressedSize(), - slice.statusByReplica(), - slice.replicas()); + BoundStatement statement = restoreSlicesSchema.insertSlice() + .bind(slice.jobId(), + slice.bucketId(), + slice.sliceId(), + slice.bucket(), + slice.key(), + slice.checksum(), + slice.startToken(), + slice.endToken(), + slice.compressedSize(), + slice.uncompressedSize(), + slice.statusByReplica(), + slice.replicas()); execute(statement); return slice; } @@ -71,12 +72,13 @@ public class RestoreSliceDatabaseAccessor extends DatabaseAccessor { sidecarSchema.ensureInitialized(); - BoundStatement statement = restoreSlicesSchema.updateStatus().bind(slice.statusByReplica(), - slice.replicas(), - slice.jobId(), - slice.bucketId(), - slice.startToken(), - slice.sliceId()); + BoundStatement statement = restoreSlicesSchema.updateStatus() + .bind(slice.statusByReplica(), + slice.replicas(), + slice.jobId(), + slice.bucketId(), + slice.startToken(), + slice.sliceId()); Row row = execute(statement).one(); if (row == null) { @@ -91,10 +93,11 @@ public class RestoreSliceDatabaseAccessor extends DatabaseAccessor { sidecarSchema.ensureInitialized(); - BoundStatement statement = restoreSlicesSchema.findAllByTokenRange().bind(jobId, - bucketId, - startToken, - endToken); + 
BoundStatement statement = restoreSlicesSchema.findAllByTokenRange() + .bind(jobId, + bucketId, + startToken, + endToken); ResultSet result = execute(statement); List<RestoreSlice> slices = new ArrayList<>(); for (Row row : result) diff --git a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java index 4476a61..8df27bf 100644 --- a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java +++ b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java @@ -141,8 +141,9 @@ public class RestoreJobsSchema extends AbstractSchema.TableSchema " status," + " blob_secrets," + " import_options," + + " consistency_level," + " expire_at" + - ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", config); + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", config); } static String updateBlobSecrets(SchemaKeyspaceConfiguration config) @@ -192,6 +193,7 @@ public class RestoreJobsSchema extends AbstractSchema.TableSchema "status, " + "blob_secrets, " + "import_options, " + + "consistency_level, " + "expire_at " + "FROM %s.%s " + "WHERE created_at = ? 
AND job_id = ?", config); @@ -207,6 +209,7 @@ public class RestoreJobsSchema extends AbstractSchema.TableSchema "status, " + "blob_secrets, " + "import_options, " + + "consistency_level, " + "expire_at " + "FROM %s.%s " + "WHERE created_at = ?", config); diff --git a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java index c016d2b..0602a9e 100644 --- a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java +++ b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java @@ -78,7 +78,7 @@ public class RestoreSlicesSchema extends AbstractSchema.TableSchema " end_token varint," + " compressed_size bigint," + " uncompressed_size bigint," + - " status_by_replica map<text, text>," + + " status_by_replica map<text, text>," + // key is instance ID; value is RestoreSliceStatus " all_replicas set<text>," + " PRIMARY KEY ((job_id, bucket_id), start_token, slice_id)" + ") WITH default_time_to_live = %s", diff --git a/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java b/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java new file mode 100644 index 0000000..a171594 --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.locator; + +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.Host; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesConfig; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.dns.DnsResolver; +import org.jetbrains.annotations.NotNull; + +/** + * Get token ranges owned and replicated to the local Cassandra instance(s) by keyspace + * The results are cached and gets invalidated when local instances or cluster topology changed + */ +@Singleton +public class CachedLocalTokenRanges implements LocalTokenRangesProvider +{ + private static final Logger LOGGER = 
LoggerFactory.getLogger(CachedLocalTokenRanges.class); + private final InstancesConfig instancesConfig; + private final DnsResolver dnsResolver; + + @GuardedBy("this") + private Set<Integer> localInstanceIdsCache; + @GuardedBy("this") + private Set<Host> allInstancesCache; + @GuardedBy("this") + private Set<Host> localInstancesCache; + @GuardedBy("this") + private ImmutableMap<String, Map<Integer, Set<TokenRange>>> localTokenRangesCache; + + @Inject + public CachedLocalTokenRanges(InstancesConfig instancesConfig, DnsResolver dnsResolver) + { + this.instancesConfig = instancesConfig; + this.dnsResolver = dnsResolver; + this.localTokenRangesCache = null; + this.localInstanceIdsCache = null; + this.allInstancesCache = null; + this.localInstancesCache = null; + } + + @Override + @Nullable + public Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace) + { + List<InstanceMetadata> localInstances = instancesConfig.instances(); + + if (localInstances.isEmpty()) + { + LOGGER.warn("No local instances found"); + return Collections.emptyMap(); + } + + CassandraAdapterDelegate delegate = localInstances.get(0).delegate(); + Metadata metadata = delegate == null ? null : delegate.metadata(); + if (metadata == null) + { + LOGGER.debug("Not yet connect to Cassandra cluster"); + return Collections.emptyMap(); + } + + if (metadata.getKeyspace(keyspace) == null) + { + throw new NoSuchElementException("Keyspace does not exist. keyspace: " + keyspace); + } + + Set<Integer> localInstanceIds = localInstances.stream() + .map(InstanceMetadata::id) + .collect(Collectors.toSet()); + Set<Host> allInstances = metadata.getAllHosts(); + return getCacheOrReload(metadata, keyspace, localInstanceIds, localInstances, allInstances); + } + + /** + * Return the token ranges owned and replicated to the host according to the replication strategy of the keyspace + * The result set is unmodifiable. 
+ */ + @Nullable + private Pair<Host, Set<TokenRange>> tokenRangesOfHost(Metadata metadata, + String keyspace, + InstanceMetadata instance, + Map<IpAddressAndPort, Host> allHosts) + { + Host host; + try + { + final IpAddressAndPort ip = IpAddressAndPort.of(dnsResolver.resolve(instance.host()), instance.port()); + host = allHosts.get(ip); + if (host == null) + { + LOGGER.warn("Could not map InstanceMetadata to Host host={} port={} ip={}", + instance.host(), instance.port(), ip.ipAddress); + return null; + } + } + catch (UnknownHostException e) + { + throw new RuntimeException("Failed to resolve hostname to ip. hostname: " + instance.host(), e); + } + return Pair.of(host, tokenRangesOfHost(metadata, keyspace, host)); + } + + public Set<TokenRange> tokenRangesOfHost(Metadata metadata, String keyspace, Host host) + { + return metadata.getTokenRanges(keyspace, host) + .stream() + .flatMap(range -> TokenRange.from(range).stream()) + .collect(Collectors.toSet()); + } + + /** + * Reload the locally cached token ranges when needed + */ + @Nullable + private synchronized Map<Integer, Set<TokenRange>> getCacheOrReload(Metadata metadata, + String keyspace, + Set<Integer> localInstanceIds, + List<InstanceMetadata> localInstances, + Set<Host> allInstances) + { + // exit early if no change is found + boolean isClusterTheSame = allInstances.equals(allInstancesCache) + && localInstanceIds.equals(localInstanceIdsCache); + if (localTokenRangesCache != null + && localTokenRangesCache.containsKey(keyspace) + && isClusterTheSame) + { + return localTokenRangesCache.get(keyspace); + } + + // otherwise, reload the token ranges + localInstanceIdsCache = localInstanceIds; + allInstancesCache = allInstances; + if (allInstances.isEmpty()) + { + LOGGER.warn("No instances found in client session"); + } + Map<IpAddressAndPort, Host> allHosts = new HashMap<>(allInstancesCache.size()); + BiConsumer<InetSocketAddress, Host> putNullSafe = (endpoint, host) -> { + if (endpoint != null) + { + 
allHosts.put(IpAddressAndPort.of(endpoint), host); + } + }; + for (Host host : allInstancesCache) + { + putNullSafe.accept(host.getSocketAddress(), host); + putNullSafe.accept(host.getListenSocketAddress(), host); + putNullSafe.accept(host.getBroadcastSocketAddress(), host); + } + + ImmutableMap.Builder<String, Map<Integer, Set<TokenRange>>> perKeyspaceBuilder = ImmutableMap.builder(); + ImmutableSet.Builder<Host> hostBuilder = ImmutableSet.builder(); + if (isClusterTheSame && localInstancesCache != null) + { + hostBuilder.addAll(localInstancesCache); + } + + for (KeyspaceMetadata ks : metadata.getKeyspaces()) + { + if (isClusterTheSame && localTokenRangesCache != null && localTokenRangesCache.containsKey(ks.getName())) + { + // we don't need to rebuild if already cached + perKeyspaceBuilder.put(ks.getName(), localTokenRangesCache.get(ks.getName())); + } + else + { + ImmutableMap.Builder<Integer, Set<TokenRange>> resultBuilder = ImmutableMap.builder(); + for (InstanceMetadata instance : localInstances) + { + Pair<Host, Set<TokenRange>> pair = tokenRangesOfHost(metadata, keyspace, instance, allHosts); + if (pair != null) + { + hostBuilder.add(pair.getKey()); + resultBuilder.put(instance.id(), Collections.unmodifiableSet(pair.getValue())); + } + } + perKeyspaceBuilder.put(ks.getName(), resultBuilder.build()); + } + } + localTokenRangesCache = perKeyspaceBuilder.build(); + localInstancesCache = hostBuilder.build(); + if (localInstancesCache.isEmpty()) + { + LOGGER.warn("Unable to determine local instances from client meta-data!"); + } + return localTokenRangesCache.get(keyspace); + } + + private static class IpAddressAndPort + { + final String ipAddress; + final int port; + + static IpAddressAndPort of(@NotNull InetSocketAddress endpoint) + { + return IpAddressAndPort.of(endpoint.getAddress().getHostAddress(), + endpoint.getPort()); + } + + static IpAddressAndPort of(String ipAddress, int port) + { + return new IpAddressAndPort(ipAddress, port); + } + + 
IpAddressAndPort(String ipAddress, int port) + { + this.ipAddress = ipAddress; + this.port = port; + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + IpAddressAndPort that = (IpAddressAndPort) o; + return port == that.port && Objects.equals(ipAddress, that.ipAddress); + } + + @Override + public int hashCode() + { + return Objects.hash(ipAddress, port); + } + } +} diff --git a/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java b/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java new file mode 100644 index 0000000..cef04f6 --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.locator; + +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; + +/** + * Provides the token ranges of the local Cassandra instance(s) + */ +public interface LocalTokenRangesProvider +{ + /** + * Calculate the token ranges owned and replicated to the local Cassandra instance(s). 
+ * When Sidecar is paired with multiple Cassandra instance, the ranges of each Cassandra instance is captured + * in the form of map, where the key is the instance id and the value is the ranges of the instance. When Sidecar + * is paired with a single Cassandra instance, the result map has a single entry. + * + * @param keyspace keyspace to determine replication + * @return token ranges of the local Cassandra instances + */ + @Nullable + Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace); +} diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java index 98f496b..e805df3 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java @@ -30,12 +30,17 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; import io.vertx.core.Promise; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.config.RestoreJobConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.db.RestoreJob; import org.apache.cassandra.sidecar.db.RestoreJobDatabaseAccessor; import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor; import org.apache.cassandra.sidecar.db.schema.SidecarSchema; +import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; +import org.apache.cassandra.sidecar.locator.CachedLocalTokenRanges; +import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider; +import org.apache.cassandra.sidecar.locator.TokenRange; import org.apache.cassandra.sidecar.stats.RestoreJobStats; import org.apache.cassandra.sidecar.tasks.PeriodicTask; import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; @@ -54,6 +59,7 @@ public class RestoreJobDiscoverer implements 
PeriodicTask private final RestoreJobDatabaseAccessor restoreJobDatabaseAccessor; private final RestoreSliceDatabaseAccessor restoreSliceDatabaseAccessor; private final Provider<RestoreJobManagerGroup> restoreJobManagerGroupSingleton; + private final LocalTokenRangesProvider localTokenRangesProvider; private final InstanceMetadataFetcher instanceMetadataFetcher; private final RestoreJobStats stats; private volatile boolean refreshSignaled = true; @@ -67,6 +73,7 @@ public class RestoreJobDiscoverer implements PeriodicTask RestoreJobDatabaseAccessor restoreJobDatabaseAccessor, RestoreSliceDatabaseAccessor restoreSliceDatabaseAccessor, Provider<RestoreJobManagerGroup> restoreJobManagerGroupProvider, + CachedLocalTokenRanges cachedLocalTokenRanges, InstanceMetadataFetcher instanceMetadataFetcher, RestoreJobStats stats) { @@ -75,6 +82,7 @@ public class RestoreJobDiscoverer implements PeriodicTask restoreJobDatabaseAccessor, restoreSliceDatabaseAccessor, restoreJobManagerGroupProvider, + cachedLocalTokenRanges, instanceMetadataFetcher, stats); } @@ -85,6 +93,7 @@ public class RestoreJobDiscoverer implements PeriodicTask RestoreJobDatabaseAccessor restoreJobDatabaseAccessor, RestoreSliceDatabaseAccessor restoreSliceDatabaseAccessor, Provider<RestoreJobManagerGroup> restoreJobManagerGroupProvider, + LocalTokenRangesProvider cachedLocalTokenRanges, InstanceMetadataFetcher instanceMetadataFetcher, RestoreJobStats stats) { @@ -94,6 +103,7 @@ public class RestoreJobDiscoverer implements PeriodicTask this.restoreSliceDatabaseAccessor = restoreSliceDatabaseAccessor; this.jobDiscoveryRecencyDays = restoreJobConfig.jobDiscoveryRecencyDays(); this.restoreJobManagerGroupSingleton = restoreJobManagerGroupProvider; + this.localTokenRangesProvider = cachedLocalTokenRanges; this.instanceMetadataFetcher = instanceMetadataFetcher; this.stats = stats; } @@ -153,6 +163,7 @@ public class RestoreJobDiscoverer implements PeriodicTask switch (job.status) { case CREATED: + case STAGED: if 
(job.expireAt == null // abort all old jobs that have no expireAt value || job.expireAt.getTime() < nowMillis) { @@ -164,6 +175,11 @@ public class RestoreJobDiscoverer implements PeriodicTask // find the oldest non-completed job days = Math.max(days, delta(today, job.createdAt)); restoreJobManagers.updateRestoreJob(job); + if (job.isManagedBySidecar()) + { + // todo: potentially excessive number of queries + findSlicesAndSubmit(job); + } inflightJobsCount += 1; break; case FAILED: @@ -208,7 +224,6 @@ public class RestoreJobDiscoverer implements PeriodicTask } /** - * TODO: remove the method on phase 2 completion * Signal the job discovery loop to refresh in the next execution */ public void signalRefresh() @@ -216,6 +231,44 @@ public class RestoreJobDiscoverer implements PeriodicTask refreshSignaled = true; } + // find all slices of the job that should be downloaded to the local instances, + // according to the cluster token ownership + private void findSlicesAndSubmit(RestoreJob restoreJob) + { + localTokenRangesProvider.localTokenRanges(restoreJob.keyspaceName) + .forEach((key, ranges) -> { + int instanceId = key; + InstanceMetadata instance = instanceMetadataFetcher.instance(instanceId); + ranges.forEach(range -> findSlicesOfRangeAndSubmit(instance, restoreJob, range)); + }); + } + + // try to submit the slice. + // If it already exists, it is a no-op. + // If the submission fails, the slice status of the instance is updated. 
+ private void findSlicesOfRangeAndSubmit(InstanceMetadata instance, RestoreJob restoreJob, TokenRange range) + { + short bucketId = 0; // TODO: update the implementation to pick proper bucketId + restoreSliceDatabaseAccessor + .selectByJobByBucketByTokenRange(restoreJob.jobId, bucketId, range.start, range.end) + .forEach(slice -> { + // set the owner instance, which is not read from database + slice = slice.unbuild().ownerInstance(instance).build(); + try + { + // todo: do not re-submit for download if the slice is staged (when job status is before staged) + // or imported (when job status is staged) on the instance already + restoreJobManagerGroupSingleton.get().trySubmit(instance, slice, restoreJob); + } + catch (RestoreJobFatalException e) + { + slice.fail(e); // TODO: is it still needed? no, remove it later. + slice.failAtInstance(instance.id()); + restoreSliceDatabaseAccessor.updateStatus(slice); + } + }); + } + private boolean abortJob(RestoreJob job) { LOGGER.info("Abort expired job. 
jobId={} job={}", job.jobId, job); diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java index 8828c8f..d33ff37 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java @@ -61,7 +61,7 @@ public class RestoreJobUtil */ public static void unzip(File zipFile, File targetDir) throws IOException, RestoreJobException { - try (ZipInputStream zis = new ZipInputStream(new FileInputStream(zipFile))) + try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(zipFile.toPath()))) { ZipEntry zipEntry = zis.getNextEntry(); diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java index d48b705..af6332f 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java @@ -36,6 +36,7 @@ import org.apache.cassandra.sidecar.concurrent.ConcurrencyLimiter; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.db.RestoreSlice; +import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor; import org.apache.cassandra.sidecar.db.schema.SidecarSchema; import org.apache.cassandra.sidecar.exceptions.RestoreJobException; import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions; @@ -58,6 +59,7 @@ public class RestoreProcessor implements PeriodicTask private final ConcurrencyLimiter processMaxConcurrency; private final SliceQueue sliceQueue = new SliceQueue(); private final double requiredUsableSpacePercentage; // value range: [0.0, 1.0) + private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor; private final RestoreJobStats stats; private volatile 
boolean isClosed = false; // OK to run close twice, so relax the control to volatile @@ -67,6 +69,7 @@ public class RestoreProcessor implements PeriodicTask SidecarSchema sidecarSchema, StorageClientPool s3ClientPool, SSTableImporter importer, + RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobStats stats) { this.pool = executorPools.internal(); @@ -77,6 +80,7 @@ public class RestoreProcessor implements PeriodicTask this.requiredUsableSpacePercentage = config.serviceConfiguration().ssTableUploadConfiguration().minimumSpacePercentageRequired() / 100.0; this.importer = importer; + this.sliceDatabaseAccessor = sliceDatabaseAccessor; this.stats = stats; } @@ -126,12 +130,29 @@ public class RestoreProcessor implements PeriodicTask // capture the new queue length after polling sliceQueue.captureImportQueueLength(); pool.executeBlocking(slice.toAsyncTask(s3ClientPool, pool, importer, - requiredUsableSpacePercentage, stats), + requiredUsableSpacePercentage, + sliceDatabaseAccessor, stats), false) // unordered .onSuccess(restoreSlice -> { - stats.captureSliceCompletionTime(slice.owner().id(), System.nanoTime() - slice.creationTimeNanos()); - LOGGER.info("Slice completes successfully. sliceKey={}", restoreSlice.key()); - restoreSlice.complete(); + if (slice.hasImported()) + { + stats.captureSliceCompletionTime(slice.owner().id(), System.nanoTime() - slice.creationTimeNanos()); + LOGGER.info("Slice completes successfully. sliceKey={}", slice.key()); + slice.complete(); + } + else if (slice.hasStaged()) + { + // todo: report stat of time taken to stage + LOGGER.info("Slice has been staged successfully. sliceKey={}", slice.key()); + // the slice is not fully complete yet. Re-enqueue the slice. + sliceQueue.offer(slice); + } + else // log a warning and retry. It should not reach here. + { + LOGGER.warn("Unexpected state of slice. It is neither staged nor imported. 
sliceKey={}", + slice.key()); + sliceQueue.offer(slice); + } }) .onFailure(cause -> { if (cause instanceof RestoreJobException && ((RestoreJobException) cause).retryable()) @@ -143,8 +164,13 @@ public class RestoreProcessor implements PeriodicTask else { LOGGER.error("Slice failed with unrecoverable failure. sliceKey={}", slice.key(), cause); - // fail the slice. In the current implementation, all slices of the job get aborted + // fail the slice and mark the slice has failed on its owning instance. + // In the phase 1 implementation, all slices of the job get aborted slice.fail(RestoreJobExceptions.toFatal(cause)); + if (slice.job().isManagedBySidecar()) + { + sliceDatabaseAccessor.updateStatus(slice); + } // revoke the s3 credentials of the job too s3ClientPool.revokeCredentials(slice.jobId()); } diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java index b0214d9..0cec6f2 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java @@ -19,6 +19,7 @@ package org.apache.cassandra.sidecar.restore; import java.io.File; +import java.nio.file.Files; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -30,10 +31,13 @@ import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.ext.web.handler.HttpException; +import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; import org.apache.cassandra.sidecar.common.data.SSTableImportOptions; +import org.apache.cassandra.sidecar.common.utils.Preconditions; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.db.RestoreJob; import org.apache.cassandra.sidecar.db.RestoreSlice; +import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor; import 
org.apache.cassandra.sidecar.exceptions.RestoreJobException; import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions; import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; @@ -59,27 +63,31 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> { private static final Logger LOGGER = LoggerFactory.getLogger(RestoreSliceTask.class); - private final RestoreJob job; private final RestoreSlice slice; private final StorageClient s3Client; private final ExecutorPools.TaskExecutorPool executorPool; private final SSTableImporter importer; private final double requiredUsableSpacePercentage; + private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor; private final RestoreJobStats stats; - public RestoreSliceTask(RestoreJob job, RestoreSlice slice, + public RestoreSliceTask(RestoreSlice slice, StorageClient s3Client, ExecutorPools.TaskExecutorPool executorPool, SSTableImporter importer, double requiredUsableSpacePercentage, + RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobStats stats) { - this.job = job; + Preconditions.checkArgument(!slice.job().isManagedBySidecar() + || sliceDatabaseAccessor != null, + "sliceDatabaseAccessor cannot be null"); this.slice = slice; this.s3Client = s3Client; this.executorPool = executorPool; this.importer = importer; this.requiredUsableSpacePercentage = requiredUsableSpacePercentage; + this.sliceDatabaseAccessor = sliceDatabaseAccessor; this.stats = stats; } @@ -92,11 +100,58 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> // The slice, when being process, requires a total of slice size (download) + uncompressed (unzip) to use. 
// The protection below guards the slice being process, if the usable disk space falls below the threshold // after considering the slice - ensureSufficientStorage(slice.targetPathInStaging().toString(), + ensureSufficientStorage(slice.stageDirectory().toString(), slice.compressedSize() + slice.uncompressedSize(), requiredUsableSpacePercentage, executorPool) - .onSuccess(ignored -> downloadSliceAndImport(event)) + .onSuccess(ignored -> { + RestoreJob job = slice.job(); + if (job.isManagedBySidecar()) + { + if (job.status == RestoreJobStatus.CREATED) + { + if (Files.exists(slice.stagedObjectPath())) + { + LOGGER.debug("The slice has been staged already. sliceKey={} stagedFilePath={}", + slice.key(), slice.stagedObjectPath()); + slice.completeStagePhase(); // update the flag if missed + sliceDatabaseAccessor.updateStatus(slice); + event.tryComplete(slice); + return; + } + + // 1. check object existence and validate eTag / checksum + checkObjectExistence(event) + // 2. download slice/object when the remote object exists + .thenCompose(headObject -> downloadSlice(event)) + // 3. persist status + .thenAccept(x -> { + slice.completeStagePhase(); + sliceDatabaseAccessor.updateStatus(slice); + // completed staging. A new task is produced when it comes to import + event.tryComplete(slice); + }); + } + else if (job.status == RestoreJobStatus.STAGED) + { + unzipAndImport(event, slice.stagedObjectPath().toFile(), + // persist status + () -> sliceDatabaseAccessor.updateStatus(slice)); + } + else + { + String msg = "Unexpected restore job status. Expected only CREATED or STAGED when " + + "processing active slices. Found status: " + job.status; + Exception unexpectedState = new IllegalStateException(msg); + event.tryFail(RestoreJobExceptions.ofFatalSlice("Unexpected restore job status", + slice, unexpectedState)); + } + } + else + { + downloadSliceAndImport(event); + } + }) .onFailure(cause -> { String msg = "Unable to ensure enough space for the slice. 
Retry later"; event.tryFail(RestoreJobExceptions.ofSlice(msg, slice, cause)); @@ -184,6 +239,7 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> { RestoreJobFatalException ex = RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled", slice, null); + event.tryFail(ex); CompletableFuture<File> failedFuture = new CompletableFuture<>(); failedFuture.completeExceptionally(ex); return failedFuture; @@ -223,6 +279,11 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> @VisibleForTesting void unzipAndImport(Promise<RestoreSlice> event, File file) + { + unzipAndImport(event, file, null); + } + + void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit) { if (file == null) // the condition should never happen. Having it here for logic completeness { @@ -234,7 +295,21 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> unzip(file) .compose(this::validateFiles) .compose(this::commit) - .onSuccess(x -> event.tryComplete(slice)) + .compose(x -> { + if (onSuccessCommit == null) + { + return Future.succeededFuture(); + } + + return executorPool.executeBlocking(promise -> { + onSuccessCommit.run(); + promise.tryComplete(); + }); + }) + .onSuccess(x -> { + slice.completeImportPhase(); + event.tryComplete(slice); + }) .onFailure(failure -> { logWarnIfHasHttpExceptionCauseOnCommit(failure, slice); event.tryFail(RestoreJobExceptions.propagate("Fail to commit slice. " @@ -248,24 +323,33 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> if (failOnCancelled(promise)) return; - if (!zipFile.exists()) - { - promise.tryFail(new RestoreJobException("Object not found from disk. 
File: " + zipFile)); - return; - } - // targetPathInStaging points to the directory named after uploadId // SSTableImporter expects the file system structure to be uploadId/keyspace/table/sstables - File targetDir = slice.targetPathInStaging() + File targetDir = slice.stageDirectory() .resolve(slice.keyspace()) .resolve(slice.table()) .toFile(); - if (!targetDir.mkdirs()) + + boolean targetDirExist = targetDir.isDirectory(); + + if (!zipFile.exists()) { - LOGGER.warn("Error occurred while creating directory for holding SSTables for SSTableImporter"); + if (targetDirExist) + { + LOGGER.debug("The files in slice are already extracted. Maybe it is a retried task?"); + promise.complete(targetDir); + } + else + { + promise.tryFail(new RestoreJobException("Object not found from disk. File: " + zipFile)); + } + // return early + return; } + try { + Files.createDirectories(targetDir.toPath()); // Remove all existing files under the target directory // The validation step later expects only the files registered in the manifest. RestoreJobUtil.cleanDirectory(targetDir.toPath()); @@ -275,7 +359,7 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> // Then, delete the downloaded zip file if (!zipFile.delete()) { - LOGGER.warn("Error while deleting file {}, please note for space wastage", + LOGGER.warn("File deletion attempt failed. file={}", zipFile.getAbsolutePath()); } } @@ -383,7 +467,7 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> LOGGER.info("Begin committing SSTables. 
sliceKey={}", slice.key()); - SSTableImportOptions options = job.importOptions; + SSTableImportOptions options = slice.job().importOptions; SSTableImporter.ImportOptions importOptions = new SSTableImporter.ImportOptions.Builder() .host(slice.owner().host()) .keyspace(slice.keyspace()) diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java index 621c1dc..f7825f6 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java @@ -152,7 +152,7 @@ public class StorageClient .bucket(slice.bucket()) .key(slice.key()) .build(); - Path objectPath = slice.targetPathInStaging().resolve(slice.key()); + Path objectPath = slice.stagedObjectPath(); File object = objectPath.toFile(); if (object.exists()) { diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java index 1c190d9..28ecd8b 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java @@ -81,9 +81,11 @@ public class AbortRestoreJobHandler extends AbstractHandler<String> restoreJobDatabaseAccessor.abort(job.jobId); restoreJobManagerGroup.signalRefreshRestoreJob(); - return Future.succeededFuture(); + return Future.succeededFuture(job); }) - .onSuccess(ignored -> { + .onSuccess(job -> { + logger.info("Successfully aborted restore job. 
job={}, remoteAddress={}, instance={}", + job, remoteAddress, host); stats.captureFailedJob(); context.response().setStatusCode(HttpResponseStatus.OK.code()).end(); }) diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreJobHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreJobHandler.java index d6a36fb..5820a9b 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreJobHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreJobHandler.java @@ -86,7 +86,7 @@ public class CreateRestoreJobHandler extends AbstractHandler<CreateRestoreJobReq @Override protected CreateRestoreJobRequestPayload extractParamsOrThrow(RoutingContext context) { - String bodyString = context.getBodyAsString(); + String bodyString = context.body().asString(); if (bodyString == null || bodyString.equalsIgnoreCase("null")) // json encoder writes null as "null" { logger.warn("Bad request to create restore job. 
Received null payload."); diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreSliceHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreSliceHandler.java index e7c005a..7269ef6 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreSliceHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreSliceHandler.java @@ -98,7 +98,7 @@ public class CreateRestoreSliceHandler extends AbstractHandler<CreateSliceReques .qualifiedTableName(tableName) .createSliceRequestPayload(request) .ownerInstance(instance) - .targetPathInStaging(Paths.get(instance.stagingDir()), uploadId) + .stageDirectory(Paths.get(instance.stagingDir()), uploadId) .replicaStatus(Collections.singletonMap(String.valueOf(instance.id()), RestoreSliceStatus.COMMITTING)) .replicas(Collections.singleton(String.valueOf(instance.id()))) diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandler.java index 6f8add6..9e2a5e7 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandler.java @@ -30,7 +30,6 @@ import io.vertx.core.json.DecodeException; import io.vertx.core.json.Json; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; -import org.apache.cassandra.sidecar.common.data.QualifiedTableName; import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; import org.apache.cassandra.sidecar.common.data.UpdateRestoreJobRequestPayload; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; @@ -77,10 +76,9 @@ public class UpdateRestoreJobHandler extends AbstractHandler<UpdateRestoreJobReq SocketAddress remoteAddress, UpdateRestoreJobRequestPayload requestPayload) { - QualifiedTableName qualifiedTableName 
= qualifiedTableName(context); - - RoutingContextUtils.getAsFuture(context, SC_RESTORE_JOB) - .compose(job -> { + RoutingContextUtils + .getAsFuture(context, SC_RESTORE_JOB) + .compose(job -> { if (RestoreJobStatus.isFinalState(job.status)) { // skip the update, since the job is in the final state already @@ -91,12 +89,11 @@ public class UpdateRestoreJobHandler extends AbstractHandler<UpdateRestoreJobReq return executorPools.service().<RestoreJob>executeBlocking(promise -> { promise.complete(restoreJobDatabaseAccessor.update(requestPayload, - qualifiedTableName, job.jobId)); }); }) - .onSuccess(job -> { - logger.info("Successfully update restore job. job={}, request={}, remoteAddress={}, instance={}", + .onSuccess(job -> { + logger.info("Successfully updated restore job. job={}, request={}, remoteAddress={}, instance={}", job, requestPayload, remoteAddress, host); if (job.status == RestoreJobStatus.SUCCEEDED) { @@ -115,13 +112,13 @@ public class UpdateRestoreJobHandler extends AbstractHandler<UpdateRestoreJobReq restoreJobManagerGroup.signalRefreshRestoreJob(); context.response().setStatusCode(HttpResponseStatus.OK.code()).end(); }) - .onFailure(cause -> processFailure(cause, context, host, remoteAddress, requestPayload)); + .onFailure(cause -> processFailure(cause, context, host, remoteAddress, requestPayload)); } @Override protected UpdateRestoreJobRequestPayload extractParamsOrThrow(RoutingContext context) { - String bodyString = context.getBodyAsString(); + String bodyString = context.body().asString(); if (bodyString == null || bodyString.equalsIgnoreCase("null")) // json encoder writes null as "null" { logger.warn("Bad request to update restore job. 
Received null payload."); diff --git a/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java b/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java index abe6673..a1b4442 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java @@ -58,11 +58,11 @@ class RestoreJobsDatabaseAccessorIntTest extends IntegrationTestBase RestoreJobSecrets secrets = RestoreJobSecretsGen.genRestoreJobSecrets(); long expiresAtMillis = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1); UUID jobId = UUIDs.timeBased(); - accessor.create(CreateRestoreJobRequestPayload.builder(secrets, expiresAtMillis) - .jobId(jobId) - .jobAgent("agent") - .build(), - qualifiedTableName); + CreateRestoreJobRequestPayload payload = CreateRestoreJobRequestPayload.builder(secrets, expiresAtMillis) + .jobId(jobId) + .jobAgent("agent") + .build(); + accessor.create(payload, qualifiedTableName); List<RestoreJob> foundJobs = accessor.findAllRecent(3); assertThat(foundJobs).hasSize(1); @@ -70,7 +70,7 @@ class RestoreJobsDatabaseAccessorIntTest extends IntegrationTestBase assertJob(accessor.find(jobId), jobId, RestoreJobStatus.CREATED, expiresAtMillis, secrets); UpdateRestoreJobRequestPayload markSucceeded = new UpdateRestoreJobRequestPayload(null, null, RestoreJobStatus.SUCCEEDED, null); - accessor.update(markSucceeded, qualifiedTableName, jobId); + accessor.update(markSucceeded, jobId); assertJob(accessor.find(jobId), jobId, RestoreJobStatus.SUCCEEDED, expiresAtMillis, secrets); } diff --git a/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java b/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java index 9d4e0d5..c1c0187 100644 --- a/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java 
+++ b/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java @@ -19,9 +19,16 @@ package org.apache.cassandra.testing; import java.io.IOException; +import java.net.BindException; import java.util.function.Consumer; +import org.apache.commons.lang3.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.utils.Throwables; /** * A Cassandra Test Context implementation that allows advanced cluster configuration before cluster creation @@ -29,6 +36,8 @@ import org.apache.cassandra.distributed.UpgradeableCluster; */ public class ConfigurableCassandraTestContext extends AbstractCassandraTestContext { + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurableCassandraTestContext.class); + public static final String BUILT_CLUSTER_CANNOT_BE_CONFIGURED_ERROR = "Cannot configure a cluster after it is built. Please set the buildCluster annotation attribute to false, " + "and do not call `getCluster` before calling this method."; @@ -57,9 +66,31 @@ public class ConfigurableCassandraTestContext extends AbstractCassandraTestConte public UpgradeableCluster configureAndStartCluster(Consumer<UpgradeableCluster.Builder> configurator) throws IOException { - cluster = configureCluster(configurator); - cluster.startup(); - return cluster; + RuntimeException exception = null; + for (int i = 0; i < 3; i++) + { + try + { + cluster = configureCluster(configurator); + cluster.startup(); + return cluster; + } + catch (RuntimeException ret) + { + exception = ret; + boolean addressAlreadyInUse = Throwables.anyCauseMatches(exception, this::portNotAvailableToBind); + if (addressAlreadyInUse) + { + LOGGER.warn("Failed to provision cluster after {} retries", i, exception); + } + else + { + throw exception; + } + + } + } + throw exception; } @Override @@ -70,4 +101,10 @@ public class ConfigurableCassandraTestContext extends 
AbstractCassandraTestConte + ", builder=" + builder + '}'; } + + private boolean portNotAvailableToBind(Throwable ex) + { + return ex instanceof BindException && + StringUtils.contains(ex.getMessage(), "Address already in use"); + } } diff --git a/src/test/java/org/apache/cassandra/sidecar/db/RestoreJobTest.java b/src/test/java/org/apache/cassandra/sidecar/db/RestoreJobTest.java index b5d4940..eb20157 100644 --- a/src/test/java/org/apache/cassandra/sidecar/db/RestoreJobTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/db/RestoreJobTest.java @@ -24,6 +24,7 @@ import java.util.UUID; import org.junit.jupiter.api.Test; import com.datastax.driver.core.utils.UUIDs; +import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets; import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; import org.apache.cassandra.sidecar.common.data.SSTableImportOptions; @@ -51,6 +52,21 @@ public class RestoreJobTest return builder.build(); } + public static RestoreJob createUpdatedJob(UUID jobId, String jobAgent, + RestoreJobStatus status, + RestoreJobSecrets secrets, + Date expireAt) + throws DataObjectMappingException + { + RestoreJob.Builder builder = RestoreJob.builder(); + builder.createdAt(RestoreJob.toLocalDate(jobId)) + .jobId(jobId).jobAgent(jobAgent) + .jobStatus(status) + .jobSecrets(secrets) + .expireAt(expireAt); + return builder.build(); + } + @Test void testDefaultImportOptionsWhenNotSetInDb() { diff --git a/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java b/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java index 86db504..b5c93de 100644 --- a/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java @@ -19,11 +19,15 @@ package org.apache.cassandra.sidecar.db; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import 
java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -116,7 +120,9 @@ public class SidecarSchemaTest sidecarSchema.startSidecarSchemaInitializer(); context.verify(() -> { int maxWaitTime = 20; // about 10 seconds - while (interceptedExecStmts.size() < 1 || !sidecarSchema.isInitialized()) + while (interceptedPrepStmts.size() < 10 + || interceptedExecStmts.size() < 3 + || !sidecarSchema.isInitialized()) { if (maxWaitTime-- <= 0) { @@ -129,6 +135,51 @@ public class SidecarSchemaTest assertEquals(3, interceptedExecStmts.size()); assertTrue(interceptedExecStmts.get(0).contains("CREATE KEYSPACE IF NOT EXISTS sidecar_internal"), "Create keyspace should be executed the first"); + assertTrue(hasElementContains(interceptedExecStmts, + "CREATE TABLE IF NOT EXISTS sidecar_internal.restore_job_v2"), + "Create table should be executed the next for job table"); + assertTrue(hasElementContains(interceptedExecStmts, + "CREATE TABLE IF NOT EXISTS sidecar_internal.restore_slice_v2"), + "Create table should be executed the next for slice table"); + + List<String> expectedPrepStatements = Arrays.asList( + "INSERT INTO sidecar_internal.restore_job_v2 ( created_at, job_id, keyspace_name, table_name, " + + "job_agent, status, blob_secrets, import_options, consistency_level, expire_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + + "INSERT INTO sidecar_internal.restore_job_v2 ( created_at, job_id, blob_secrets) VALUES (?, ? 
,?)", + + "INSERT INTO sidecar_internal.restore_job_v2 ( created_at, job_id, status) VALUES (?, ?, ?)", + + "INSERT INTO sidecar_internal.restore_job_v2 ( created_at, job_id, job_agent) VALUES (?, ?, ?)", + + "INSERT INTO sidecar_internal.restore_job_v2 ( created_at, job_id, expire_at) VALUES (?, ?, ?)", + + "SELECT created_at, job_id, keyspace_name, table_name, job_agent, status, blob_secrets, import_options, " + + "consistency_level, expire_at FROM sidecar_internal.restore_job_v2 WHERE created_at = ? AND job_id = ?", + + "SELECT created_at, job_id, keyspace_name, table_name, job_agent, status, blob_secrets, import_options, " + + "consistency_level, expire_at FROM sidecar_internal.restore_job_v2 WHERE created_at = ?", + + "INSERT INTO sidecar_internal.restore_slice_v2 ( job_id, bucket_id, slice_id, bucket, key, " + + "checksum, start_token, end_token, compressed_size, uncompressed_size, status_by_replica, " + + "all_replicas) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + + "SELECT job_id, bucket_id, slice_id, bucket, key, checksum, start_token, end_token, compressed_size, " + + "uncompressed_size, status_by_replica, all_replicas FROM sidecar_internal.restore_slice_v2 " + + "WHERE job_id = ? AND bucket_id = ? AND end_token >= ? AND start_token < ? ALLOW FILTERING", + + "UPDATE sidecar_internal.restore_slice_v2 SET status_by_replica = status_by_replica + ?, " + + "all_replicas = all_replicas + ? WHERE job_id = ? AND bucket_id = ? AND start_token = ? AND slice_id = ?" 
+ ); + + Set<String> expected = new HashSet<>(expectedPrepStatements); + Set<String> actual = new HashSet<>(interceptedPrepStmts); + Set<String> notInExpected = Sets.difference(actual, expected); + assertEquals(expected.size(), actual.size(), "Number of prepared statements should match"); + assertTrue(notInExpected.isEmpty(), + "Found the following statements that not in expected: " + notInExpected); + assertTrue(sidecarSchema.isInitialized()); context.completeNow(); }); diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java index 8d5abee..6efc192 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java @@ -38,13 +38,15 @@ import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; import org.apache.cassandra.sidecar.config.RestoreJobConfiguration; import org.apache.cassandra.sidecar.db.RestoreJob; import org.apache.cassandra.sidecar.db.RestoreJobDatabaseAccessor; -import org.apache.cassandra.sidecar.db.RestoreJobTest; import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor; import org.apache.cassandra.sidecar.db.schema.SidecarSchema; +import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider; import org.apache.cassandra.sidecar.stats.TestRestoreJobStats; import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; import org.mockito.ArgumentCaptor; +import static org.apache.cassandra.sidecar.db.RestoreJobTest.createNewTestingJob; +import static org.apache.cassandra.sidecar.db.RestoreJobTest.createUpdatedJob; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doNothing; @@ -57,18 +59,20 @@ class RestoreJobDiscovererTest private static final long idleLoopDelay = 2000; private static final int 
recencyDays = 10; private final TestRestoreJobStats stats = new TestRestoreJobStats(); - private RestoreJobDatabaseAccessor mockJobAccessor = mock(RestoreJobDatabaseAccessor.class); - private RestoreSliceDatabaseAccessor mockSliceAccessor = mock(RestoreSliceDatabaseAccessor.class); - private RestoreJobManagerGroup mockManagers = mock(RestoreJobManagerGroup.class); - private PeriodicTaskExecutor executor = mock(PeriodicTaskExecutor.class); - private SidecarSchema sidecarSchema = mock(SidecarSchema.class); - private RestoreJobDiscoverer loop = new RestoreJobDiscoverer(testConfig(), - sidecarSchema, - mockJobAccessor, - mockSliceAccessor, - () -> mockManagers, - null, - stats); + private final RestoreJobDatabaseAccessor mockJobAccessor = mock(RestoreJobDatabaseAccessor.class); + private final RestoreSliceDatabaseAccessor mockSliceAccessor = mock(RestoreSliceDatabaseAccessor.class); + private final RestoreJobManagerGroup mockManagers = mock(RestoreJobManagerGroup.class); + private final LocalTokenRangesProvider rangesProvider = mock(LocalTokenRangesProvider.class); + private final PeriodicTaskExecutor executor = mock(PeriodicTaskExecutor.class); + private final SidecarSchema sidecarSchema = mock(SidecarSchema.class); + private final RestoreJobDiscoverer loop = new RestoreJobDiscoverer(testConfig(), + sidecarSchema, + mockJobAccessor, + mockSliceAccessor, + () -> mockManagers, + rangesProvider, + null, + stats); @Test void testGetDelay() @@ -78,9 +82,13 @@ class RestoreJobDiscovererTest // when there is active restore job (status: CREATED) UUID jobId = UUIDs.timeBased(); when(mockJobAccessor.findAllRecent(anyInt())) - .thenReturn(Collections.singletonList(RestoreJob.forUpdates(jobId, "agent", - RestoreJobStatus.CREATED, null, - new Date(System.currentTimeMillis() + 10000L)))); + .thenReturn(Collections.singletonList(RestoreJob.builder() + .createdAt(RestoreJob.toLocalDate(jobId)) + .jobId(jobId) + .jobAgent("agent") + .jobStatus(RestoreJobStatus.CREATED) + .expireAt(new 
Date(System.currentTimeMillis() + 10000L)) + .build())); loop.registerPeriodicTaskExecutor(executor); executeBlocking(); assertThat(stats.activeJobCount).describedAs("active jobs count is updated") @@ -88,9 +96,13 @@ class RestoreJobDiscovererTest assertThat(loop.delay()).isEqualTo(activeLoopDelay); // when no more jobs are active, the delay is reset back to idle loop delay accordingly. when(mockJobAccessor.findAllRecent(anyInt())) - .thenReturn(Collections.singletonList(RestoreJob.forUpdates(jobId, "agent", - RestoreJobStatus.SUCCEEDED, null, - new Date(System.currentTimeMillis() + 10000L)))); + .thenReturn(Collections.singletonList(RestoreJob.builder() + .createdAt(RestoreJob.toLocalDate(jobId)) + .jobId(jobId) + .jobAgent("agent") + .jobStatus(RestoreJobStatus.SUCCEEDED) + .expireAt(new Date(System.currentTimeMillis() + 10000L)) + .build())); executeBlocking(); assertThat(stats.activeJobCount).describedAs("active jobs count is updated") .isZero(); @@ -112,11 +124,11 @@ class RestoreJobDiscovererTest UUID newJobId = UUIDs.timeBased(); UUID failedJobId = UUIDs.timeBased(); UUID succeededJobId = UUIDs.timeBased(); - mockResult.add(RestoreJobTest.createNewTestingJob(newJobId)); - mockResult.add(RestoreJob.forUpdates(failedJobId, "agent", RestoreJobStatus.ABORTED, null, - new Date(System.currentTimeMillis() + 10000L))); - mockResult.add(RestoreJob.forUpdates(succeededJobId, "agent", RestoreJobStatus.SUCCEEDED, null, - new Date(System.currentTimeMillis() + 10000L))); + mockResult.add(createNewTestingJob(newJobId)); + mockResult.add(createUpdatedJob(failedJobId, "agent", RestoreJobStatus.ABORTED, null, + new Date(System.currentTimeMillis() + 10000L))); + mockResult.add(createUpdatedJob(succeededJobId, "agent", RestoreJobStatus.SUCCEEDED, null, + new Date(System.currentTimeMillis() + 10000L))); ArgumentCaptor<UUID> jobIdCapture = ArgumentCaptor.forClass(UUID.class); doNothing().when(mockManagers).removeJobInternal(jobIdCapture.capture()); 
when(mockJobAccessor.findAllRecent(anyInt())).thenReturn(mockResult); @@ -144,8 +156,8 @@ class RestoreJobDiscovererTest // Execution 2 when(mockJobAccessor.findAllRecent(anyInt())) - .thenReturn(Collections.singletonList(RestoreJob.forUpdates(newJobId, "agent", - RestoreJobStatus.SUCCEEDED, null, new Date()))); + .thenReturn(Collections.singletonList(createUpdatedJob(newJobId, "agent", + RestoreJobStatus.SUCCEEDED, null, new Date()))); executeBlocking(); assertThat(stats.activeJobCount).isZero(); @@ -164,7 +176,7 @@ class RestoreJobDiscovererTest loop.signalRefresh(); UUID newJobId2 = UUIDs.timeBased(); when(mockJobAccessor.findAllRecent(anyInt())) - .thenReturn(Collections.singletonList(RestoreJobTest.createNewTestingJob(newJobId2))); + .thenReturn(Collections.singletonList(createNewTestingJob(newJobId2))); assertThat(loop.isRefreshSignaled()).isTrue(); executeBlocking(); @@ -179,8 +191,8 @@ class RestoreJobDiscovererTest // Execution 5 when(mockJobAccessor.findAllRecent(anyInt())) - .thenReturn(Collections.singletonList(RestoreJob.forUpdates(newJobId2, "agent", - RestoreJobStatus.ABORTED, null, new Date()))); + .thenReturn(Collections.singletonList(createUpdatedJob(newJobId2, "agent", + RestoreJobStatus.ABORTED, null, new Date()))); executeBlocking(); assertThat(stats.activeJobCount).isZero(); @@ -200,16 +212,16 @@ class RestoreJobDiscovererTest } @Test - void testExecuteWithExpiredJobs() throws Exception + void testExecuteWithExpiredJobs() { // setup: all 3 jobs are expired. 
All of them should be aborted via mockJobAccessor when(sidecarSchema.isInitialized()).thenReturn(true); - List<RestoreJob> mockResult - = IntStream.range(0, 3) - .boxed() - .map(x -> RestoreJob.forUpdates(UUIDs.timeBased(), "agent", RestoreJobStatus.CREATED, null, - new Date(System.currentTimeMillis() - 1000L))) - .collect(Collectors.toList()); + List<RestoreJob> mockResult = IntStream.range(0, 3) + .boxed() + .map(x -> createUpdatedJob(UUIDs.timeBased(), "agent", + RestoreJobStatus.CREATED, null, + new Date(System.currentTimeMillis() - 1000L))) + .collect(Collectors.toList()); ArgumentCaptor<UUID> abortedJobs = ArgumentCaptor.forClass(UUID.class); doNothing().when(mockJobAccessor).abort(abortedJobs.capture()); when(mockJobAccessor.findAllRecent(anyInt())).thenReturn(mockResult); diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java index 36556a2..3d20810 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java @@ -245,11 +245,16 @@ class RestoreJobManagerTest private RestoreSlice getTestSlice(RestoreJob job) { + InstanceMetadata owner = mock(InstanceMetadata.class); + when(owner.id()).thenReturn(1); RestoreSlice slice = RestoreSlice .builder() .jobId(job.jobId) .bucketId((short) 0) - .targetPathInStaging(testDir, "uploadId") + .stageDirectory(testDir, "uploadId") + .storageKey("storageKey") + .storageBucket("storageBucket") + .ownerInstance(owner) .replicaStatus(Collections.emptyMap()) .replicas(Collections.emptySet()) .build(); diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java index f7761c5..83ddc70 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java +++ 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java @@ -175,10 +175,11 @@ class RestoreProcessorTest RestoreSlice slice = mock(RestoreSlice.class, Mockito.RETURNS_DEEP_STUBS); when(slice.jobId()).thenReturn(UUIDs.timeBased()); when(slice.owner().id()).thenReturn(1); - when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any())).thenReturn(promise -> { + when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any())).thenReturn(promise -> { Uninterruptibles.awaitUninterruptibly(latch); promise.complete(slice); }); + when(slice.hasImported()).thenReturn(true); return slice; } } diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java index 8353f4f..9598ad2 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java @@ -19,12 +19,15 @@ package org.apache.cassandra.sidecar.restore; import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import com.datastax.driver.core.utils.UUIDs; import com.google.inject.Guice; @@ -36,7 +39,9 @@ import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.concurrent.ExecutorPools.TaskExecutorPool; import org.apache.cassandra.sidecar.db.RestoreJob; +import org.apache.cassandra.sidecar.db.RestoreJobTest; import org.apache.cassandra.sidecar.db.RestoreSlice; +import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor; import org.apache.cassandra.sidecar.exceptions.RestoreJobException; import 
org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; import org.apache.cassandra.sidecar.server.MainModule; @@ -48,33 +53,30 @@ import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import static org.apache.cassandra.sidecar.AssertionUtils.getBlocking; -import static org.apache.cassandra.sidecar.db.RestoreJob.toLocalDate; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; class RestoreSliceTaskTest { - private RestoreJob restoreJob; private RestoreSlice restoreSlice; private StorageClient storageClient; private TaskExecutorPool executorPool; private SSTableImporter importer; private TestRestoreJobStats stats; private RestoreSliceTask task; + private RestoreSliceDatabaseAccessor sliceDatabaseAccessor; @BeforeEach void setup() { - UUID jobId = UUIDs.timeBased(); - restoreJob = RestoreJob.builder() - .jobId(jobId) - .createdAt(toLocalDate(jobId)) - .jobStatus(RestoreJobStatus.CREATED) - .build(); restoreSlice = mock(RestoreSlice.class, Mockito.RETURNS_DEEP_STUBS); - when(restoreSlice.targetPathInStaging()).thenReturn(Paths.get(".")); + when(restoreSlice.stageDirectory()).thenReturn(Paths.get(".")); when(restoreSlice.sliceId()).thenReturn("testing-slice"); when(restoreSlice.key()).thenReturn("storage-key"); when(restoreSlice.owner().id()).thenReturn(1); @@ -83,8 +85,10 @@ class RestoreSliceTaskTest Injector injector = Guice.createInjector(Modules.override(new MainModule()).with(new TestModule())); executorPool = injector.getInstance(ExecutorPools.class).internal(); stats = new TestRestoreJobStats(); - task = new TestRestoreSliceTask(restoreJob, restoreSlice, 
storageClient, - executorPool, importer, 0, stats); + sliceDatabaseAccessor = mock(RestoreSliceDatabaseAccessor.class); + task = new TestRestoreSliceTask(restoreSlice, storageClient, + executorPool, importer, 0, + sliceDatabaseAccessor, stats); } @Test @@ -155,26 +159,101 @@ class RestoreSliceTaskTest .hasMessageContaining("Object not found"); } + @Test + void testSliceStaging() + { + // test specific setup + RestoreJob job = spy(RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED)); + doReturn(true).when(job).isManagedBySidecar(); + when(restoreSlice.job()).thenReturn(job); + when(restoreSlice.stagedObjectPath()).thenReturn(Paths.get("nonexist")); + when(storageClient.objectExists(restoreSlice)).thenReturn(CompletableFuture.completedFuture(null)); + when(storageClient.downloadObjectIfAbsent(restoreSlice)) + .thenReturn(CompletableFuture.completedFuture(new File("."))); + + Promise<RestoreSlice> promise = Promise.promise(); + task.handle(promise); + getBlocking(promise.future()); // no error is thrown + + verify(restoreSlice, times(1)).completeStagePhase(); + verify(restoreSlice, times(0)).completeImportPhase(); // should not be called in this phase + verify(sliceDatabaseAccessor, times(1)).updateStatus(restoreSlice); + } + + @Test + void testSliceStagingWithExistingObject(@TempDir Path testFolder) throws IOException + { + // test specific setup + RestoreJob job = spy(RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED)); + doReturn(true).when(job).isManagedBySidecar(); + when(restoreSlice.job()).thenReturn(job); + Path stagedPath = testFolder.resolve("slice.zip"); + Files.createFile(stagedPath); + when(restoreSlice.stagedObjectPath()).thenReturn(stagedPath); + when(storageClient.objectExists(restoreSlice)) + .thenThrow(new RuntimeException("Should not call this method")); + when(storageClient.downloadObjectIfAbsent(restoreSlice)) + .thenThrow(new RuntimeException("Should not call this method")); + + 
Promise<RestoreSlice> promise = Promise.promise(); + task.handle(promise); + getBlocking(promise.future()); // no error is thrown + + verify(restoreSlice, times(1)).completeStagePhase(); + verify(restoreSlice, times(0)).completeImportPhase(); // should not be called in this phase + verify(sliceDatabaseAccessor, times(1)).updateStatus(restoreSlice); + } + + @Test + void testSliceImport() + { + // test specific setup + RestoreJob job = spy(RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.STAGED)); + doReturn(true).when(job).isManagedBySidecar(); + when(restoreSlice.job()).thenReturn(job); + + Promise<RestoreSlice> promise = Promise.promise(); + task.handle(promise); + getBlocking(promise.future()); // no error is thrown + + verify(restoreSlice, times(0)).completeStagePhase(); // should not be called in the phase + verify(restoreSlice, times(1)).completeImportPhase(); + verify(sliceDatabaseAccessor, times(1)).updateStatus(restoreSlice); + } + + static class TestRestoreSliceTask extends RestoreSliceTask { private final RestoreSlice slice; private final RestoreJobStats stats; - public TestRestoreSliceTask(RestoreJob job, RestoreSlice slice, StorageClient s3Client, - TaskExecutorPool executorPool, SSTableImporter importer, - double requiredUsableSpacePercentage, RestoreJobStats stats) + public TestRestoreSliceTask(RestoreSlice slice, StorageClient s3Client, TaskExecutorPool executorPool, + SSTableImporter importer, double requiredUsableSpacePercentage, + RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobStats stats) { - super(job, slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, stats); + super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, sliceDatabaseAccessor, stats); this.slice = slice; this.stats = stats; } - void unzipAndImport(Promise<RestoreSlice> event, File file) + @Override + void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit) { 
stats.captureSliceUnzipTime(1, 123L); stats.captureSliceValidationTime(1, 123L); stats.captureSliceImportTime(1, 123L); + slice.completeImportPhase(); event.tryComplete(slice); + if (onSuccessCommit != null) + { + onSuccessCommit.run(); + } + } + + @Override + void unzipAndImport(Promise<RestoreSlice> event, File file) + { + unzipAndImport(event, file, null); } } } diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTest.java index 7ef7e48..52f19fd 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTest.java @@ -45,7 +45,7 @@ class RestoreSliceTest .startToken(BigInteger.ONE).endToken(BigInteger.valueOf(2)) .replicaStatus(Collections.singletonMap("replica1", RestoreSliceStatus.COMMITTING)) .replicas(Collections.singleton("replica1")) - .targetPathInStaging(path, "uploadId") + .stageDirectory(path, "uploadId") .build(); RestoreSlice slice2 = slice1.unbuild().build(); assertThat(slice1).isEqualTo(slice2); diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java index 2453a96..7047122 100644 --- a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java +++ b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java @@ -184,7 +184,6 @@ public abstract class BaseRestoreJobTests @Override public RestoreJob update(UpdateRestoreJobRequestPayload payload, - QualifiedTableName qualifiedTableName, UUID jobId) { return updateFunc.apply(payload); diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandlerTest.java index c62d4ae..24fd12f 100644 --- 
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandlerTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandlerTest.java @@ -52,10 +52,14 @@ class RestoreJobSummaryHandlerTest extends BaseRestoreJobTests mockLookupRestoreJob(x -> { UUID id = UUID.fromString(jobId); // keyspace name is different - - return RestoreJob.create(LocalDate.fromMillisSinceEpoch(UUIDs.unixTimestamp(id)), id, - "ks", "table", "job agent", - RestoreJobStatus.CREATED, SECRETS, SSTableImportOptions.defaults()); + return RestoreJob.builder() + .createdAt(LocalDate.fromMillisSinceEpoch(UUIDs.unixTimestamp(id))) + .jobId(id).jobAgent("job agent") + .keyspace("ks").table("table") + .jobStatus(RestoreJobStatus.CREATED) + .jobSecrets(SECRETS) + .sstableImportOptions(SSTableImportOptions.defaults()) + .build(); }); sendGetRestoreJobSummaryRequestAndVerify("ks", "table", jobId, context, HttpResponseStatus.OK.code()); } @@ -80,8 +84,12 @@ class RestoreJobSummaryHandlerTest extends BaseRestoreJobTests String jobId = "7cd82ff9-d276-11ed-93e5-7fce0df1306f"; mockLookupRestoreJob(x -> { // keyspace name is different - return RestoreJob.create(null, UUID.fromString(jobId), "ks", - "table", null, RestoreJobStatus.CREATED, null, null); + return RestoreJob.builder() + .createdAt(null) + .jobId(UUID.fromString(jobId)) + .keyspace("ks").table("table") + .jobStatus(RestoreJobStatus.CREATED) + .build(); }); sendGetRestoreJobSummaryRequestAndVerify("ks1", "table", "7cd82ff9-d276-11ed-93e5-7fce0df1306f", context, HttpResponseStatus.NOT_FOUND.code()); @@ -111,9 +119,12 @@ class RestoreJobSummaryHandlerTest extends BaseRestoreJobTests { mockLookupRestoreJob(x -> { UUID jobId = UUID.fromString("7cd82ff9-d276-11ed-93e5-7fce0df1306f"); - return RestoreJob.create(LocalDate.fromMillisSinceEpoch(UUIDs.unixTimestamp(jobId)), jobId, - "ks", "table", "job agent", - RestoreJobStatus.CREATED, null, null); + return RestoreJob.builder() + 
.createdAt(LocalDate.fromMillisSinceEpoch(UUIDs.unixTimestamp(jobId))) + .jobId(jobId).jobAgent("job agent") + .keyspace("ks").table("table") + .jobStatus(RestoreJobStatus.CREATED) + .build(); }); sendGetRestoreJobSummaryRequestAndVerify("ks", "table", "7cd82ff9-d276-11ed-93e5-7fce0df1306f", context, HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandlerTest.java index c0bbe26..0a3cf93 100644 --- a/src/test/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandlerTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandlerTest.java @@ -143,8 +143,14 @@ class UpdateRestoreJobHandlerTest extends BaseRestoreJobTests private RestoreJob createTestNewJob(String jobId) { - return RestoreJob.create(null, UUID.fromString(jobId), "ks", "table", - "agent", RestoreJobStatus.SUCCEEDED, SECRETS, SSTableImportOptions.defaults()); + return RestoreJob.builder() + .jobId(UUID.fromString(jobId)) + .keyspace("ks").table("table") + .jobAgent("agent") + .jobStatus(RestoreJobStatus.SUCCEEDED) + .jobSecrets(SECRETS) + .sstableImportOptions(SSTableImportOptions.defaults()) + .build(); } private JsonObject getRequestPayload() diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/AsyncFileSystemUtilsTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/AsyncFileSystemUtilsTest.java new file mode 100644 index 0000000..fc4d354 --- /dev/null +++ b/src/test/java/org/apache/cassandra/sidecar/utils/AsyncFileSystemUtilsTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.utils; + +import java.util.UUID; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.AssertionUtils; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl; +import org.apache.cassandra.sidecar.exceptions.InsufficientStorageException; +import org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.FileStoreProps; + +import static org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.ensureSufficientStorage; +import static org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.fileStoreProps; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class AsyncFileSystemUtilsTest +{ + private ExecutorPools executorPools; + + @BeforeEach + void setup() + { + executorPools = new ExecutorPools(Vertx.vertx(), new ServiceConfigurationImpl()); + } + + @AfterEach + void teardown() + { + executorPools.close(); + } + + @Test + void testReadFileStoreProps() + { + FileStoreProps props = AssertionUtils.getBlocking(fileStoreProps(".", executorPools.internal())); + assertThat(props.name).isNotBlank(); + + long total = props.totalSpace; + long usable = props.usableSpace; + 
long unallocated = props.unallocatedSpace; + assertThat(total) + .isGreaterThan(usable) + .isGreaterThan(unallocated) + .isGreaterThan(0L); + + assertThat(unallocated) + .isGreaterThanOrEqualTo(usable) + .isGreaterThan(0L); + + assertThat(usable).isGreaterThan(0L); + } + + @Test + void testEnsureSufficientStorage() throws Exception + { + // this check should pass (hopefully), as the required usable percentage is 0.0001 + AssertionUtils.getBlocking(ensureSufficientStorage(".", 0L, 0.0001, executorPools.internal())); + + // requesting half of the usable space should pass + FileStoreProps props = AssertionUtils.getBlocking(fileStoreProps(".", executorPools.internal())); + AssertionUtils.getBlocking(ensureSufficientStorage(".", props.usableSpace / 2, + 0, executorPools.internal())); + + assertThatThrownBy(() -> AssertionUtils.getBlocking(ensureSufficientStorage(".", Long.MAX_VALUE, + 0.0001, + executorPools.internal()))) + .describedAs("Request Long.MAX_VALUE on the local file store should fail") + .hasRootCauseExactlyInstanceOf(InsufficientStorageException.class) + .hasMessageContaining("FileStore has insufficient space"); + + assertThatThrownBy(() -> AssertionUtils.getBlocking(ensureSufficientStorage(".", 123L, + 1.0, executorPools.internal()))) + .describedAs("Require 100% usable disk of the local file store should fail") + .hasRootCauseExactlyInstanceOf(InsufficientStorageException.class) + .hasMessageContaining("FileStore has insufficient space"); + } + + @Test + void testEnsureSufficientStorageWithNonexistingFilePath() + { + // The input path `./non-existing + uuid` does not exist. 
+ // `ensureSufficientStorage` should navigate to parent paths until finding an existing path + // to be used for checking + // The test expects no exception is thrown + AssertionUtils.getBlocking(ensureSufficientStorage("./non-existing" + UUID.randomUUID(), 0L, + 0.0001, executorPools.internal())); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org