nvharikrishna commented on code in PR #231: URL: https://github.com/apache/cassandra-sidecar/pull/231#discussion_r2211089247
########## server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.adapters.base.RepairOptions; +import org.apache.cassandra.sidecar.common.request.data.RepairPayload; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.config.RepairConfiguration; +import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam; + +import static java.util.Objects.requireNonNull; + +/** + * Implementation of {@link OperationalJob} to perform repair operation. 
+ */ +public class RepairJob extends OperationalJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(RepairJob.class); + private static final String OPERATION = "repair"; + private static final String PREVIEW_KIND_REPAIRED = "REPAIRED"; + + private final RepairRequestParam repairParams; + private final Vertx vertx; + private final RepairConfiguration repairConfiguration; + protected StorageOperations storageOperations; + + /** + * Enum representing the status of a parent repair session + */ + public enum ParentRepairStatus + { + IN_PROGRESS, COMPLETED, FAILED + } + + /** + * Constructs a job with a unique UUID, in Pending state + * + * @param vertx + * @param repairConfiguration + * @param jobId UUID representing the Job to be created + * @param storageOps + * @param repairParams + */ + public RepairJob(Vertx vertx, RepairConfiguration repairConfiguration, UUID jobId, StorageOperations storageOps, RepairRequestParam repairParams) + { + super(jobId); + this.vertx = vertx; + this.repairConfiguration = repairConfiguration; + this.storageOperations = storageOps; + this.repairParams = repairParams; + } + + @Override + public boolean isRunningOnCassandra() + { + // TODO: Leverage repair vtables to fail-fast on conflicting repairs (overlapping token-ranges or replica-sets) + // Currently does not check for concurrent repairs + return false; + } + + @Override + protected void executeInternal() + { + Map<String, String> options = generateRepairOptions(repairParams.requestpayload()); + String keyspace = repairParams.keyspace().name(); + + LOGGER.info("Executing repair operation for keyspace {} jobId={} maxRuntime={}", + keyspace, this.jobId(), repairConfiguration.maxRepairJobRuntimeMillis()); + + int cmd = storageOperations.repair(keyspace, options); + if (cmd <= 0) + { + // repairAsync can only return 0 for replication factor 1. + LOGGER.info("Replication factor is 1. 
No repair is needed for keyspace '{}'", keyspace); + } + else + { + // complete the max wait time promise either when exceeding the wait time, or the result is available + Promise<Boolean> maxWaitTimePromise = Promise.promise(); + vertx.setTimer(repairConfiguration.maxRepairJobRuntimeMillis(), d -> { + LOGGER.info("Timer Poll"); + maxWaitTimePromise.tryComplete(true); + }); + + // Promise for completion of repair operation + Promise<Void> promise = Promise.promise(); + Future<Void> resultFut = promise.future(); + + // main event loop checks periodically (10s) for completion + vertx.setPeriodic(repairConfiguration.repairPollIntervalMillis(), id -> queryForCompletedRepair(promise, cmd)); + resultFut.onComplete(res -> maxWaitTimePromise.tryComplete(false)); + Future<Boolean> maxWaitTimeFut = maxWaitTimePromise.future(); + + Future<Void> compositeFut = Future.any(maxWaitTimeFut, resultFut) + // If this lambda below is evaluated, either one of the futures have completed; + // In either case, the future corresponding to the job execution is returned + .compose(f -> { + LOGGER.info("One of the futures ended waitStatus={} resultStatus={}", + maxWaitTimeFut.isComplete(), resultFut.isComplete()); + boolean isTimeout = (maxWaitTimeFut.succeeded()) ? maxWaitTimeFut.result() : false; + if (isTimeout) + { + LOGGER.error("Timer ran out before the repair job completed. Repair took too long"); + // TODO: Cancel repair? 
(Nice to have) + // We free up the thread (job fails) and stop polling for completion + return Future.failedFuture("Repair job taking too long"); + } + // otherwise, the result of the job is available + return resultFut; + }); + try + { + compositeFut.toCompletionStage().toCompletableFuture().get(); // wait + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public String name() + { + return OPERATION; + } + + private Map<String, String> generateRepairOptions(RepairPayload repairPayload) + { + Map<String, String> options = new HashMap<>(); + + List<String> tables = repairPayload.tables(); + if (tables != null && !tables.isEmpty()) + { + options.put(RepairOptions.COLUMNFAMILIES.getValue(), String.join(",", tables)); + } + + Boolean isPrimaryRange = repairPayload.isPrimaryRange(); + if (isPrimaryRange != null) + { + options.put(RepairOptions.PRIMARY_RANGE.getValue(), String.valueOf(isPrimaryRange)); + } + // TODO: Verify use-cases involving multiple DCs + + String dc = repairPayload.datacenter(); + if (dc != null) + { + options.put(RepairOptions.DATACENTERS.getValue(), dc); + } + + List<String> hosts = repairPayload.hosts(); + if (hosts != null && !hosts.isEmpty()) + { + options.put(RepairOptions.HOSTS.getValue(), String.join(",", requireNonNull(hosts))); + } + + if (repairPayload.startToken() != null && repairPayload.endToken() != null) + { + options.put(RepairOptions.RANGES.getValue(), repairPayload.startToken() + ":" + repairPayload.endToken()); + } + + if (repairPayload.repairType() == RepairPayload.RepairType.INCREMENTAL) + { + options.put(RepairOptions.INCREMENTAL.getValue(), Boolean.TRUE.toString()); + } + + if (repairPayload.force() != null) + { + options.put(RepairOptions.FORCE_REPAIR.getValue(), String.valueOf(repairPayload.force())); + } + + if (repairPayload.isValidate() != null) + { + options.put(RepairOptions.PREVIEW.getValue(), PREVIEW_KIND_REPAIRED); + } + return options; + } + + 
private void queryForCompletedRepair(Promise<Void> promise, int cmd) + { + LOGGER.info("Polling repair operation for jobId={} status={}", this.jobId(), promise.future().isComplete()); + List<String> status = storageOperations.getParentRepairStatus(cmd); + String queriedString = "queried for parent session status and"; + if (status == null) + { + LOGGER.error("{} couldn't find repair status for cmd: {}", Review Comment: If the status is not available, most likely it won't be available later either (it may have been evicted, or the node may have been bounced, etc.). In that case, the promise needs to be failed, right? ########## adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/RepairOptions.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.adapters.base; + +/** + * Enum representing the repair options supported + */ +public enum RepairOptions +{ + /** + * Whether to repair only the primary range of the node (true/false) + */ + PRIMARY_RANGE("primaryRange"), + /** + * Whether to perform an incremental repair (true/false) + * If false, a full repair is performed + */ + INCREMENTAL("incremental"), + /** + * Specific token ranges to repair + */ + RANGES("ranges"), + /** + * List of column families (tables) to repair (comma-separated) + */ + COLUMNFAMILIES("columnFamilies"), + /** + * Restrict repair to specific data centers (comma-separated) + */ + DATACENTERS("dataCenters"), + /** + * Restrict repair to specific hosts (comma-separated IPs or hostnames) + */ + HOSTS("hosts"), + /** + * force the repair operation + */ + FORCE_REPAIR("forceRepair"), + /** + * Type of preview repair to run before actual repair + * Options: "none", "auto", "running", "full" + * Mainly used to assess data consistency without actually repairing + */ + PREVIEW("previewKind"); + + private final String value; + + RepairOptions(String value) + { + this.value = value; + } + + /** + * @return Value corresponding to the repair option + */ + public String getValue() Review Comment: ```suggestion public String optionName() ``` Would it sound better if it were called optionName instead of value? ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/RepairPayload.java: ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.request.data; + +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.cassandra.sidecar.common.DataObjectBuilder; + +/** + * Request payload for a repair job + */ +@JsonDeserialize(builder = RepairPayload.Builder.class) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class RepairPayload +{ + private static final String TABLES = "tables"; + private static final String IS_PRIMARY_RANGE = "primaryRange"; + private static final String DATACENTER = "datacenter"; + private static final String HOSTS = "hosts"; + private static final String START_TOKEN = "startToken"; + private static final String END_TOKEN = "endToken"; + private static final String REPAIR_TYPE = "repairType"; + private static final String FORCE = "force"; + private static final String VALIDATE = "validate"; + + private final List<String> tables; + private final Boolean isPrimaryRange; + private final String datacenter; + private final List<String> hosts; + private final Long startToken; + private final Long endToken; + private RepairType repairType; + private final Boolean force; + private final Boolean validate; + + /** + * Constructs a new {@link RepairPayload}. 
+ */ + public RepairPayload() + { + this(builder()); + } + + public static RepairPayload.Builder builder() + { + return new RepairPayload.Builder(); + } + + /** + * Constructs a new {@link RepairPayload} from the configured {@link RepairPayload.Builder}. + * + * @param builder the builder used to create this object + */ + protected RepairPayload(RepairPayload.Builder builder) + { + tables = builder.tables; + repairType = builder.repairType; + isPrimaryRange = builder.isPrimaryRange; + datacenter = builder.datacenter; + hosts = builder.hosts; + startToken = builder.startToken; + endToken = builder.endToken; + force = builder.force; + validate = builder.validate; + } + + @JsonProperty(TABLES) + public List<String> tables() Review Comment: Can you add `@Nullable` to specify that it can be null explicitly? ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/RepairPayload.java: ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.common.request.data; + +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.cassandra.sidecar.common.DataObjectBuilder; + +/** + * Request payload for a repair job + */ +@JsonDeserialize(builder = RepairPayload.Builder.class) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class RepairPayload +{ + private static final String TABLES = "tables"; + private static final String IS_PRIMARY_RANGE = "primaryRange"; + private static final String DATACENTER = "datacenter"; + private static final String HOSTS = "hosts"; + private static final String START_TOKEN = "startToken"; + private static final String END_TOKEN = "endToken"; + private static final String REPAIR_TYPE = "repairType"; + private static final String FORCE = "force"; + private static final String VALIDATE = "validate"; + + private final List<String> tables; + private final Boolean isPrimaryRange; + private final String datacenter; + private final List<String> hosts; + private final Long startToken; + private final Long endToken; + private RepairType repairType; + private final Boolean force; + private final Boolean validate; + + /** + * Constructs a new {@link RepairPayload}. + */ + public RepairPayload() + { + this(builder()); + } + + public static RepairPayload.Builder builder() + { + return new RepairPayload.Builder(); + } + + /** + * Constructs a new {@link RepairPayload} from the configured {@link RepairPayload.Builder}. 
+ * + * @param builder the builder used to create this object + */ + protected RepairPayload(RepairPayload.Builder builder) + { + tables = builder.tables; + repairType = builder.repairType; + isPrimaryRange = builder.isPrimaryRange; + datacenter = builder.datacenter; + hosts = builder.hosts; + startToken = builder.startToken; + endToken = builder.endToken; + force = builder.force; + validate = builder.validate; + } + + @JsonProperty(TABLES) + public List<String> tables() + { + return tables; + } + + @JsonProperty(IS_PRIMARY_RANGE) + public Boolean isPrimaryRange() + { + return isPrimaryRange; + } + + @JsonProperty(DATACENTER) + public String datacenter() + { + return datacenter; + } + + @JsonProperty(HOSTS) + public List<String> hosts() + { + return hosts; + } + + @JsonProperty(START_TOKEN) + public Long startToken() + { + return startToken; + } + + @JsonProperty(END_TOKEN) + public Long endToken() + { + return endToken; + } + + @JsonProperty(REPAIR_TYPE) + public RepairType repairType() + { + return repairType; + } + + @JsonProperty(FORCE) + public Boolean force() + { + return force; + } + + /** + * {@code NodeSettings} builder static inner class. Review Comment: Is it relevant here? ########## server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.adapters.base.RepairOptions; +import org.apache.cassandra.sidecar.common.request.data.RepairPayload; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.config.RepairConfiguration; +import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam; + +import static java.util.Objects.requireNonNull; + +/** + * Implementation of {@link OperationalJob} to perform repair operation. 
+ */ +public class RepairJob extends OperationalJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(RepairJob.class); + private static final String OPERATION = "repair"; + private static final String PREVIEW_KIND_REPAIRED = "REPAIRED"; + + private final RepairRequestParam repairParams; + private final Vertx vertx; + private final RepairConfiguration repairConfiguration; + protected StorageOperations storageOperations; + + /** + * Enum representing the status of a parent repair session + */ + public enum ParentRepairStatus + { + IN_PROGRESS, COMPLETED, FAILED + } + + /** + * Constructs a job with a unique UUID, in Pending state + * + * @param vertx + * @param repairConfiguration + * @param jobId UUID representing the Job to be created + * @param storageOps + * @param repairParams + */ + public RepairJob(Vertx vertx, RepairConfiguration repairConfiguration, UUID jobId, StorageOperations storageOps, RepairRequestParam repairParams) + { + super(jobId); + this.vertx = vertx; + this.repairConfiguration = repairConfiguration; + this.storageOperations = storageOps; + this.repairParams = repairParams; + } + + @Override + public boolean isRunningOnCassandra() + { + // TODO: Leverage repair vtables to fail-fast on conflicting repairs (overlapping token-ranges or replica-sets) + // Currently does not check for concurrent repairs + return false; + } + + @Override + protected void executeInternal() + { + Map<String, String> options = generateRepairOptions(repairParams.requestpayload()); + String keyspace = repairParams.keyspace().name(); + + LOGGER.info("Executing repair operation for keyspace {} jobId={} maxRuntime={}", + keyspace, this.jobId(), repairConfiguration.maxRepairJobRuntimeMillis()); + + int cmd = storageOperations.repair(keyspace, options); + if (cmd <= 0) + { + // repairAsync can only return 0 for replication factor 1. + LOGGER.info("Replication factor is 1. 
No repair is needed for keyspace '{}'", keyspace); + } + else + { + // complete the max wait time promise either when exceeding the wait time, or the result is available + Promise<Boolean> maxWaitTimePromise = Promise.promise(); + vertx.setTimer(repairConfiguration.maxRepairJobRuntimeMillis(), d -> { + LOGGER.info("Timer Poll"); + maxWaitTimePromise.tryComplete(true); + }); + + // Promise for completion of repair operation + Promise<Void> promise = Promise.promise(); + Future<Void> resultFut = promise.future(); + + // main event loop checks periodically (10s) for completion + vertx.setPeriodic(repairConfiguration.repairPollIntervalMillis(), id -> queryForCompletedRepair(promise, cmd)); Review Comment: Timer id is not captured. I think we need to cancel the timer when repair job completes so that queryForCompletedRepair won't get triggered after repair completion. ########## server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.job; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.adapters.base.RepairOptions; +import org.apache.cassandra.sidecar.common.request.data.RepairPayload; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.config.RepairConfiguration; +import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam; + +import static java.util.Objects.requireNonNull; + +/** + * Implementation of {@link OperationalJob} to perform repair operation. + */ +public class RepairJob extends OperationalJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(RepairJob.class); + private static final String OPERATION = "repair"; + private static final String PREVIEW_KIND_REPAIRED = "REPAIRED"; + + private final RepairRequestParam repairParams; + private final Vertx vertx; + private final RepairConfiguration repairConfiguration; + protected StorageOperations storageOperations; + + /** + * Enum representing the status of a parent repair session + */ + public enum ParentRepairStatus + { + IN_PROGRESS, COMPLETED, FAILED + } + + /** + * Constructs a job with a unique UUID, in Pending state + * + * @param vertx + * @param repairConfiguration + * @param jobId UUID representing the Job to be created + * @param storageOps + * @param repairParams + */ + public RepairJob(Vertx vertx, RepairConfiguration repairConfiguration, UUID jobId, StorageOperations storageOps, RepairRequestParam repairParams) + { + super(jobId); + this.vertx = vertx; + this.repairConfiguration = repairConfiguration; + this.storageOperations = storageOps; + this.repairParams = repairParams; + } + + @Override + public boolean isRunningOnCassandra() + { + 
// TODO: Leverage repair vtables to fail-fast on conflicting repairs (overlapping token-ranges or replica-sets) + // Currently does not check for concurrent repairs + return false; + } + + @Override + protected void executeInternal() + { + Map<String, String> options = generateRepairOptions(repairParams.requestpayload()); + String keyspace = repairParams.keyspace().name(); + + LOGGER.info("Executing repair operation for keyspace {} jobId={} maxRuntime={}", + keyspace, this.jobId(), repairConfiguration.maxRepairJobRuntimeMillis()); + + int cmd = storageOperations.repair(keyspace, options); + if (cmd <= 0) + { + // repairAsync can only return 0 for replication factor 1. + LOGGER.info("Replication factor is 1. No repair is needed for keyspace '{}'", keyspace); + } + else + { + // complete the max wait time promise either when exceeding the wait time, or the result is available + Promise<Boolean> maxWaitTimePromise = Promise.promise(); + vertx.setTimer(repairConfiguration.maxRepairJobRuntimeMillis(), d -> { + LOGGER.info("Timer Poll"); + maxWaitTimePromise.tryComplete(true); + }); + + // Promise for completion of repair operation + Promise<Void> promise = Promise.promise(); + Future<Void> resultFut = promise.future(); + + // main event loop checks periodically (10s) for completion + vertx.setPeriodic(repairConfiguration.repairPollIntervalMillis(), id -> queryForCompletedRepair(promise, cmd)); + resultFut.onComplete(res -> maxWaitTimePromise.tryComplete(false)); + Future<Boolean> maxWaitTimeFut = maxWaitTimePromise.future(); + + Future<Void> compositeFut = Future.any(maxWaitTimeFut, resultFut) + // If this lambda below is evaluated, either one of the futures have completed; + // In either case, the future corresponding to the job execution is returned + .compose(f -> { + LOGGER.info("One of the futures ended waitStatus={} resultStatus={}", + maxWaitTimeFut.isComplete(), resultFut.isComplete()); + boolean isTimeout = (maxWaitTimeFut.succeeded()) ? 
maxWaitTimeFut.result() : false; + if (isTimeout) + { + LOGGER.error("Timer ran out before the repair job completed. Repair took too long"); + // TODO: Cancel repair? (Nice to have) + // We free up the thread (job fails) and stop polling for completion + return Future.failedFuture("Repair job taking too long"); + } + // otherwise, the result of the job is available + return resultFut; + }); + try + { + compositeFut.toCompletionStage().toCompletableFuture().get(); // wait + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public String name() + { + return OPERATION; + } + + private Map<String, String> generateRepairOptions(RepairPayload repairPayload) + { + Map<String, String> options = new HashMap<>(); + + List<String> tables = repairPayload.tables(); + if (tables != null && !tables.isEmpty()) + { + options.put(RepairOptions.COLUMNFAMILIES.getValue(), String.join(",", tables)); + } + + Boolean isPrimaryRange = repairPayload.isPrimaryRange(); + if (isPrimaryRange != null) + { + options.put(RepairOptions.PRIMARY_RANGE.getValue(), String.valueOf(isPrimaryRange)); + } + // TODO: Verify use-cases involving multiple DCs + + String dc = repairPayload.datacenter(); + if (dc != null) + { + options.put(RepairOptions.DATACENTERS.getValue(), dc); + } + + List<String> hosts = repairPayload.hosts(); + if (hosts != null && !hosts.isEmpty()) + { + options.put(RepairOptions.HOSTS.getValue(), String.join(",", requireNonNull(hosts))); + } + + if (repairPayload.startToken() != null && repairPayload.endToken() != null) + { + options.put(RepairOptions.RANGES.getValue(), repairPayload.startToken() + ":" + repairPayload.endToken()); + } + + if (repairPayload.repairType() == RepairPayload.RepairType.INCREMENTAL) + { + options.put(RepairOptions.INCREMENTAL.getValue(), Boolean.TRUE.toString()); + } + + if (repairPayload.force() != null) + { + options.put(RepairOptions.FORCE_REPAIR.getValue(), 
String.valueOf(repairPayload.force())); + } + + if (repairPayload.isValidate() != null) + { + options.put(RepairOptions.PREVIEW.getValue(), PREVIEW_KIND_REPAIRED); + } + return options; + } + + private void queryForCompletedRepair(Promise<Void> promise, int cmd) + { + LOGGER.info("Polling repair operation for jobId={} status={}", this.jobId(), promise.future().isComplete()); + List<String> status = storageOperations.getParentRepairStatus(cmd); + String queriedString = "queried for parent session status and"; + if (status == null) + { + LOGGER.error("{} couldn't find repair status for cmd: {}", + queriedString, cmd); + } + else + { + ParentRepairStatus parentRepairStatus = ParentRepairStatus.valueOf(status.get(0)); + LOGGER.info("Status from polling: {}", parentRepairStatus); + List<String> messages = status.subList(1, status.size()); + switch (parentRepairStatus) + { + case COMPLETED: + case FAILED: + LOGGER.info("{} discovered repair {}", queriedString, parentRepairStatus.name().toLowerCase()); + if (parentRepairStatus == ParentRepairStatus.FAILED) + { + promise.fail(new IOException(messages.get(0))); + } + LOGGER.info("Messages:\n{}", String.join("\n", messages)); + promise.tryComplete(); + vertx.cancelTimer(cmd); Review Comment: It should be the timer ID, not the repair's cmd id, right? I think the timer needs to be cancelled whenever the promise is resolved. ########## server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.adapters.base.RepairOptions; +import org.apache.cassandra.sidecar.common.request.data.RepairPayload; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.config.RepairConfiguration; +import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam; + +import static java.util.Objects.requireNonNull; + +/** + * Implementation of {@link OperationalJob} to perform repair operation. 
+ */ +public class RepairJob extends OperationalJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(RepairJob.class); + private static final String OPERATION = "repair"; + private static final String PREVIEW_KIND_REPAIRED = "REPAIRED"; + + private final RepairRequestParam repairParams; + private final Vertx vertx; + private final RepairConfiguration repairConfiguration; + protected StorageOperations storageOperations; + + /** + * Enum representing the status of a parent repair session + */ + public enum ParentRepairStatus + { + IN_PROGRESS, COMPLETED, FAILED + } + + /** + * Constructs a job with a unique UUID, in Pending state + * + * @param vertx + * @param repairConfiguration + * @param jobId UUID representing the Job to be created + * @param storageOps + * @param repairParams + */ + public RepairJob(Vertx vertx, RepairConfiguration repairConfiguration, UUID jobId, StorageOperations storageOps, RepairRequestParam repairParams) + { + super(jobId); + this.vertx = vertx; + this.repairConfiguration = repairConfiguration; + this.storageOperations = storageOps; + this.repairParams = repairParams; + } + + @Override + public boolean isRunningOnCassandra() + { + // TODO: Leverage repair vtables to fail-fast on conflicting repairs (overlapping token-ranges or replica-sets) + // Currently does not check for concurrent repairs + return false; + } + + @Override + protected void executeInternal() + { + Map<String, String> options = generateRepairOptions(repairParams.requestpayload()); + String keyspace = repairParams.keyspace().name(); + + LOGGER.info("Executing repair operation for keyspace {} jobId={} maxRuntime={}", + keyspace, this.jobId(), repairConfiguration.maxRepairJobRuntimeMillis()); + + int cmd = storageOperations.repair(keyspace, options); + if (cmd <= 0) + { + // repairAsync can only return 0 for replication factor 1. + LOGGER.info("Replication factor is 1. 
No repair is needed for keyspace '{}'", keyspace); Review Comment: Based on https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/service/StorageService.java#L3076, it looks like 0 is returned not just when RF is 1. Generalize the message a bit? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

