This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new c824d11 CASSANDRASC-106: Add restore task watcher to report long running tasks (#104) c824d11 is described below commit c824d112de2c92d180a90a1830108d225f45dc23 Author: Doug Rohrer <682821+jeetkund...@users.noreply.github.com> AuthorDate: Thu Mar 21 13:27:31 2024 -0400 CASSANDRASC-106: Add restore task watcher to report long running tasks (#104) Patch by Doug Rohrer; Reviewed by Yifan Cai, Francisco Guerrero for CASSANDRASC-106 --- CHANGES.txt | 3 +- checkstyle.xml | 4 +- .../cassandra/sidecar/client/RequestExecutor.java | 1 + .../selection/OrderedInstanceSelectionPolicy.java | 1 + .../sidecar/cluster/CQLSessionProviderImpl.java | 1 + .../sidecar/cluster/CassandraAdapterDelegate.java | 2 + .../sidecar/config/RestoreJobConfiguration.java | 5 ++ .../config/yaml/DriverConfigurationImpl.java | 3 + .../sidecar/config/yaml/JmxConfigurationImpl.java | 2 + .../config/yaml/RestoreJobConfigurationImpl.java | 32 ++++++++++ .../apache/cassandra/sidecar/db/RestoreSlice.java | 29 +++++---- .../sidecar/db/schema/RestoreSlicesSchema.java | 1 + .../cassandra/sidecar/restore/RestoreJobUtil.java | 10 ++- .../sidecar/restore/RestoreProcessor.java | 56 ++++++++++++++--- .../sidecar/restore/RestoreSliceHandler.java} | 28 +++++---- .../sidecar/restore/RestoreSliceTask.java | 66 +++++++++++++++++++- .../cassandra/sidecar/stats/RestoreJobStats.java | 23 ++++++- .../cluster/SidecarLoadBalancingPolicyTest.java | 1 + .../sidecar/testing/IntegrationTestModule.java | 2 + .../testing/SharedExecutorNettyOptions.java | 3 + .../cassandra/testing/SimpleCassandraVersion.java | 1 + .../cassandra/sidecar/HealthServiceSslTest.java | 1 + .../cassandra/sidecar/HealthServiceTest.java | 1 + .../org/apache/cassandra/sidecar/TestModule.java | 1 + .../sidecar/restore/RestoreProcessorTest.java | 72 +++++++++++++++++++++- .../sidecar/restore/RestoreSliceTaskTest.java | 38 +++++++++--- .../sstableuploads/BaseUploadsHandlerTest.java | 1 + .../sidecar/stats/TestRestoreJobStats.java | 9 +++ 28 files changed, 343 insertions(+), 54 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e625cde..ce39b66 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Add restore task watcher to report long running tasks (CASSANDRASC-106) * RestoreSliceTask could be stuck due to missing exception handling (CASSANDRASC-105) * Make hash algorithm implementation pluggable (CASSANDRASC-114) * Fix ClosedChannelException when downloading from S3 (CASSANDRASC-112) @@ -81,4 +82,4 @@ * Add integration tests task (CASSANDRA-15031) * Add support for SSL and bindable address (CASSANDRA-15030) * Autogenerate API docs for sidecar (CASSANDRA-15028) - * C* Management process (CASSANDRA-14395) + * C* Management process (CASSANDRA-14395) \ No newline at end of file diff --git a/checkstyle.xml b/checkstyle.xml index fd7c1b0..cea9a21 100644 --- a/checkstyle.xml +++ b/checkstyle.xml @@ -244,8 +244,8 @@ page at http://checkstyle.sourceforge.net/config.html --> <module name="LineLength"> <!-- Checks if a line is too long. --> - <property name="max" value="120" default="120" /> - <property name="severity" value="error" /> + <property name="max" value="160" /> + <property name="severity" value="warning" /> <!-- The default ignore pattern exempts the following elements: diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java index 0dd9e52..3480dca 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java @@ -155,6 +155,7 @@ public class RequestExecutor implements AutoCloseable /** * Closes the underlying HTTP client */ + @Override public void close() throws Exception { httpClient.close(); diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java b/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java index 62df635..ecdca71 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java @@ -47,6 +47,7 @@ public class OrderedInstanceSelectionPolicy implements InstanceSelectionPolicy * * @return an iterator of {@link SidecarInstance instances} */ + @Override @NotNull public Iterator<SidecarInstance> iterator() { diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java b/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java index f394ca2..6bc30a0 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java +++ b/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java @@ -119,6 +119,7 @@ public class CQLSessionProviderImpl implements CQLSessionProvider * * @return Session */ + @Override @Nullable public synchronized Session get() { diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java index 7bc3a64..8ce81e2 100644 --- a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java +++ b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java @@ -373,11 +373,13 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi return nodeSettingsFromJmx; } + @Override public ResultSet executeLocal(Statement statement) { return fromAdapter(adapter -> adapter.executeLocal(statement)); } + @Override public InetSocketAddress localNativeTransportPort() { return fromAdapter(ICassandraAdapter::localNativeTransportPort); diff --git a/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java b/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java index 3231f17..ec59165 100644 --- a/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java +++ b/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java @@ -48,4 +48,9 @@ public interface RestoreJobConfiguration * @return time to live for restore job tables: restore_job and restore_slice */ long restoreJobTablesTtlSeconds(); + + /** + * @return the number of seconds above which a restore handler is considered "long-running" + */ + long restoreJobLongRunningHandlerThresholdSeconds(); } diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java index 49334ed..c424728 100644 --- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java +++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java @@ -34,18 +34,21 @@ public class DriverConfigurationImpl implements DriverConfiguration private String localDc; private int numConnections; + @Override @JsonProperty("contact_points") public List<InetSocketAddress> contactPoints() { return contactPoints; } + @Override @JsonProperty("num_connections") public int numConnections() { return numConnections; } + @Override @JsonProperty("local_dc") public String localDc() { diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java index 022d4ec..8349011 100644 --- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java +++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java @@ -47,6 +47,7 @@ public class JmxConfigurationImpl implements JmxConfiguration /** * @return the maximum number of connection retry attempts to make before failing */ + @Override @JsonProperty("max_retries") public int maxRetries() { @@ -56,6 +57,7 @@ public class JmxConfigurationImpl implements JmxConfiguration /** * @return the delay, in milliseconds, between retry attempts */ + @Override @JsonProperty("retry_delay_millis") public long retryDelayMillis() { diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java index 3475f91..336340c 100644 --- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java +++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java @@ -36,6 +36,10 @@ public class RestoreJobConfigurationImpl implements RestoreJobConfiguration public static final int DEFAULT_JOB_DISCOVERY_RECENCY_DAYS = 5; public static final int DEFAULT_PROCESS_MAX_CONCURRENCY = 20; // process at most 20 slices concurrently public static final long DEFAULT_RESTORE_JOB_TABLES_TTL_SECONDS = TimeUnit.DAYS.toSeconds(90); + public static final String RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS = + "restore_job_long_running_threshold_seconds"; + // A restore job handler is considered long-running if it has been in the "active" list for 10 minutes. + private static final long DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS = 600; @JsonProperty(value = "job_discovery_active_loop_delay_millis") protected final long jobDiscoveryActiveLoopDelayMillis; @@ -52,6 +56,11 @@ public class RestoreJobConfigurationImpl implements RestoreJobConfiguration @JsonProperty(value = "restore_job_tables_ttl_seconds") protected final long restoreJobTablesTtlSeconds; + + @JsonProperty(value = RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS, + defaultValue = DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS + "") + private final long restoreJobLongRunningThresholdSeconds; + protected RestoreJobConfigurationImpl() { this(builder()); @@ -64,6 +73,7 @@ public class RestoreJobConfigurationImpl implements RestoreJobConfiguration this.jobDiscoveryRecencyDays = builder.jobDiscoveryRecencyDays; this.processMaxConcurrency = builder.processMaxConcurrency; this.restoreJobTablesTtlSeconds = builder.restoreJobTablesTtlSeconds; + this.restoreJobLongRunningThresholdSeconds = builder.restoreJobLongRunningThresholdSeconds; validate(); } @@ -132,6 +142,14 @@ public class RestoreJobConfigurationImpl implements RestoreJobConfiguration return restoreJobTablesTtlSeconds; } + @Override + @JsonProperty(value = RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS, + defaultValue = DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS + "") + public long restoreJobLongRunningHandlerThresholdSeconds() + { + return restoreJobLongRunningThresholdSeconds; + } + public static Builder builder() { return new Builder(); @@ -142,6 +160,8 @@ public class RestoreJobConfigurationImpl implements RestoreJobConfiguration */ public static class Builder implements DataObjectBuilder<Builder, RestoreJobConfigurationImpl> { + protected long restoreJobLongRunningThresholdSeconds = + DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS; private long jobDiscoveryActiveLoopDelayMillis = DEFAULT_JOB_DISCOVERY_ACTIVE_LOOP_DELAY_MILLIS; private long jobDiscoveryIdleLoopDelayMillis = DEFAULT_JOB_DISCOVERY_IDLE_LOOP_DELAY_MILLIS; private int jobDiscoveryRecencyDays = DEFAULT_JOB_DISCOVERY_RECENCY_DAYS; @@ -218,6 +238,18 @@ public class RestoreJobConfigurationImpl implements RestoreJobConfiguration return update(b -> b.restoreJobTablesTtlSeconds = restoreJobTablesTtlSeconds); } + /** + * Sets the {@code restoreJobLongRunningThresholdSeconds} and returns a reference to this Builder enabling + * method chaining. + * + * @param restoreJobLongRunningThresholdSeconds the {@code restoreJobLongRunningThresholdSeconds} to set + * @return a reference to this Builder + */ + public Builder restoreJobLongRunningThresholdSeconds(long restoreJobLongRunningThresholdSeconds) + { + return update(b -> b.restoreJobLongRunningThresholdSeconds = restoreJobLongRunningThresholdSeconds); + } + @Override public RestoreJobConfigurationImpl build() { diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java index 7563e93..c4c7100 100644 --- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java +++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java @@ -29,8 +29,6 @@ import java.util.Set; import java.util.UUID; import com.datastax.driver.core.Row; -import io.vertx.core.Handler; -import io.vertx.core.Promise; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.DataObjectBuilder; import org.apache.cassandra.sidecar.common.data.CreateSliceRequestPayload; @@ -40,6 +38,7 @@ import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions; import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; import org.apache.cassandra.sidecar.restore.RestoreJobUtil; +import org.apache.cassandra.sidecar.restore.RestoreSliceHandler; import org.apache.cassandra.sidecar.restore.RestoreSliceTask; import org.apache.cassandra.sidecar.restore.RestoreSliceTracker; import org.apache.cassandra.sidecar.restore.StorageClient; @@ -229,17 +228,17 @@ public class RestoreSlice /** * @return {@link RestoreSliceTask} of the restore slice. See {@link RestoreSliceTask} for the steps. */ - public Handler<Promise<RestoreSlice>> toAsyncTask(StorageClientPool s3ClientPool, - ExecutorPools.TaskExecutorPool executorPool, - SSTableImporter importer, - double requiredUsableSpacePercentage, - RestoreSliceDatabaseAccessor sliceDatabaseAccessor, - RestoreJobStats stats, - RestoreJobUtil restoreJobUtil) + public RestoreSliceHandler toAsyncTask(StorageClientPool s3ClientPool, + ExecutorPools.TaskExecutorPool executorPool, + SSTableImporter importer, + double requiredUsableSpacePercentage, + RestoreSliceDatabaseAccessor sliceDatabaseAccessor, + RestoreJobStats stats, + RestoreJobUtil restoreJobUtil) { if (isCancelled) - return promise -> promise.tryFail(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled", - this, null)); + return RestoreSliceTask.failed(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled", + this, null), this); try { @@ -254,13 +253,13 @@ public class RestoreSlice catch (IllegalStateException illegalState) { // The slice is not registered with a tracker, retry later. - return promise -> promise.tryFail(RestoreJobExceptions.ofSlice("Restore slice is not started", - this, illegalState)); + return RestoreSliceTask.failed(RestoreJobExceptions.ofSlice("Restore slice is not started", + this, illegalState), this); } catch (Exception cause) { - return promise -> promise.tryFail(RestoreJobExceptions.ofFatalSlice("Restore slice is failed", - this, cause)); + return RestoreSliceTask.failed(RestoreJobExceptions.ofFatalSlice("Restore slice is failed", + this, cause), this); } } diff --git a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java index 0602a9e..3c9e2f1 100644 --- a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java +++ b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java @@ -48,6 +48,7 @@ public class RestoreSlicesSchema extends AbstractSchema.TableSchema this.tableTtlSeconds = tableTtlSeconds; } + @Override protected void prepareStatements(@NotNull Session session) { insertSlice = prepare(insertSlice, session, CqlLiterals.insertSlice(keyspaceConfig)); diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java index ef36316..8651d53 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java @@ -59,7 +59,7 @@ public class RestoreJobUtil private static final int RESTORE_JOB_PREFIX_LEN = RESTORE_JOB_PREFIX.length(); private static final int RESTORE_JOB_DEFAULT_HASH_SEED = 0; - private DigestAlgorithmProvider digestAlgorithmProvider; + private final DigestAlgorithmProvider digestAlgorithmProvider; @Inject public RestoreJobUtil(@Named("xxhash32") DigestAlgorithmProvider digestAlgorithmProvider) @@ -223,4 +223,12 @@ public class RestoreJobUtil } } } + + /** + * @return the current time in nanoseconds + */ + public long currentTimeNanos() + { + return System.nanoTime(); + } } diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java index 1a438dc..edded10 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java @@ -21,8 +21,10 @@ package org.apache.cassandra.sidecar.restore; import java.util.HashMap; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.annotations.VisibleForTesting; @@ -62,6 +64,8 @@ public class RestoreProcessor implements PeriodicTask private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor; private final RestoreJobStats stats; private final RestoreJobUtil restoreJobUtil; + private final Set<RestoreSliceHandler> activeTasks = ConcurrentHashMap.newKeySet(); + private final long longRunningHandlerThresholdInSeconds; private volatile boolean isClosed = false; // OK to run close twice, so relax the control to volatile @@ -82,6 +86,8 @@ public class RestoreProcessor implements PeriodicTask .processMaxConcurrency()); this.requiredUsableSpacePercentage = config.serviceConfiguration().ssTableUploadConfiguration().minimumSpacePercentageRequired() / 100.0; + this.longRunningHandlerThresholdInSeconds = config.restoreJobConfiguration() + .restoreJobLongRunningHandlerThresholdSeconds(); this.importer = importer; this.sliceDatabaseAccessor = sliceDatabaseAccessor; this.stats = stats; @@ -128,26 +134,28 @@ public class RestoreProcessor implements PeriodicTask if (slice == null) // it should never happen, and is only to make ide happy { processMaxConcurrency.releasePermit(); - return; + break; } // capture the new queue length after polling sliceQueue.captureImportQueueLength(); - pool.executeBlocking(slice.toAsyncTask(s3ClientPool, pool, importer, - requiredUsableSpacePercentage, - sliceDatabaseAccessor, stats, - restoreJobUtil), - false) // unordered + RestoreSliceHandler task = slice.toAsyncTask(s3ClientPool, pool, importer, + requiredUsableSpacePercentage, + sliceDatabaseAccessor, stats, + restoreJobUtil); + activeTasks.add(task); + pool.executeBlocking(task, false) // unordered; run in parallel .onSuccess(restoreSlice -> { + int instanceId = slice.owner().id(); if (slice.hasImported()) { - stats.captureSliceCompletionTime(slice.owner().id(), System.nanoTime() - slice.creationTimeNanos()); + stats.captureSliceCompletionTime(instanceId, System.nanoTime() - slice.creationTimeNanos()); LOGGER.info("Slice completes successfully. sliceKey={}", slice.key()); slice.complete(); } else if (slice.hasStaged()) { - // todo: report stat of time taken to stage + stats.captureSliceStageTime(instanceId, task.elapsedInNanos()); LOGGER.info("Slice has been staged successfully. sliceKey={}", slice.key()); // the slice is not fully complete yet. Re-enqueue the slice. sliceQueue.offer(slice); @@ -186,12 +194,38 @@ public class RestoreProcessor implements PeriodicTask // decrement the active slices and capture the new queue length sliceQueue.decrementActiveSliceCount(slice); sliceQueue.captureImportQueueLength(); + activeTasks.remove(task); }); } promise.tryComplete(); + checkForLongRunningTasks(); sliceQueue.capturePendingSliceCount(); } + private void checkForLongRunningTasks() + { + for (RestoreSliceHandler task : activeTasks) + { + long elapsedInNanos = task.elapsedInNanos(); + if (elapsedInNanos == -1) + { + continue; + } + long elapsedInSeconds = TimeUnit.SECONDS.convert(elapsedInNanos, TimeUnit.NANOSECONDS); + if (elapsedInSeconds > longRunningHandlerThresholdInSeconds) + { + LOGGER.warn("Long-running restore slice task detected. " + + "elapsedSeconds={} thresholdSeconds={} sliceKey={} jobId={} status={}", + elapsedInSeconds, + longRunningHandlerThresholdInSeconds, + task.slice().key(), + task.slice().jobId(), + task.slice().job().status); + stats.captureLongRunningRestoreHandler(task.slice().owner().id(), elapsedInNanos); + } + } + } + @Override public void close() { @@ -206,6 +240,12 @@ public class RestoreProcessor implements PeriodicTask return sliceQueue.activeSliceCount(); } + @VisibleForTesting + int activeTasks() + { + return activeTasks.size(); + } + @VisibleForTesting int pendingStartSlices() { diff --git a/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceHandler.java similarity index 62% copy from src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java copy to src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceHandler.java index ef088db..fdcf390 100644 --- a/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceHandler.java @@ -16,22 +16,24 @@ * limitations under the License. */ -package org.apache.cassandra.sidecar; +package org.apache.cassandra.sidecar.restore; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.extension.ExtendWith; - -import io.vertx.junit5.VertxExtension; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import org.apache.cassandra.sidecar.db.RestoreSlice; /** - * Health Service Tests + * A handler that processes a restore slice */ -@DisplayName("Health Service Test") -@ExtendWith(VertxExtension.class) -public class HealthServiceTest extends AbstractHealthServiceTest +public interface RestoreSliceHandler extends Handler<Promise<RestoreSlice>> { - public boolean isSslEnabled() - { - return false; - } + /** + * @return slice the handler processes + */ + RestoreSlice slice(); + + /** + * @return the elapsed time in nanoseconds if the task has started processing, -1 otherwise + */ + long elapsedInNanos(); } diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java index 4e942c7..ed85d3f 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java @@ -28,7 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.vertx.core.Future; -import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.ext.web.handler.HttpException; import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; @@ -59,7 +58,7 @@ import static org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.ensureSuff * * Note that the class is package private, and it is not intended to be referenced by other packages. */ -public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> +public class RestoreSliceTask implements RestoreSliceHandler { private static final Logger LOGGER = LoggerFactory.getLogger(RestoreSliceTask.class); @@ -71,6 +70,7 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor; private final RestoreJobStats stats; private final RestoreJobUtil restoreJobUtil; + private long taskStartTimeNanos = -1; public RestoreSliceTask(RestoreSlice slice, StorageClient s3Client, @@ -94,9 +94,15 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> this.restoreJobUtil = restoreJobUtil; } + public static RestoreSliceHandler failed(RestoreJobException cause, RestoreSlice slice) + { + return new Failed(cause, slice); + } + @Override public void handle(Promise<RestoreSlice> event) { + this.taskStartTimeNanos = restoreJobUtil.currentTimeNanos(); if (failOnCancelled(event)) return; @@ -192,7 +198,7 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> .whenComplete((resp, cause) -> { if (cause == null) { - stats.captureSliceReplicationTime(System.nanoTime() - slice.creationTimeNanos()); + stats.captureSliceReplicationTime(currentTimeInNanos() - slice.creationTimeNanos()); slice.setExistsOnS3(); return; } @@ -244,6 +250,11 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> }); } + private long currentTimeInNanos() + { + return restoreJobUtil.currentTimeNanos(); + } + private CompletableFuture<File> downloadSlice(Promise<RestoreSlice> event) { if (slice.isCancelled()) @@ -521,4 +532,53 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> LOGGER.warn("Committing slice failed with HttpException. slice={} statusCode={} exceptionPayload={}", slice.sliceId(), httpException.getStatusCode(), httpException.getPayload(), httpException); } + + @Override + public long elapsedInNanos() + { + return taskStartTimeNanos == -1 ? -1 : + currentTimeInNanos() - taskStartTimeNanos; + } + + @Override + public RestoreSlice slice() + { + return slice; + } + + /** + * A RestoreSliceHandler that immediately fails the slice/promise. + * Used when the processor already knows that a slice should not be processed for some reason + * as indicated in cause field. + */ + public static class Failed implements RestoreSliceHandler + { + private final RestoreJobException cause; + private final RestoreSlice slice; + + public Failed(RestoreJobException cause, RestoreSlice slice) + { + this.cause = cause; + this.slice = slice; + } + + @Override + public void handle(Promise<RestoreSlice> promise) + { + promise.tryFail(cause); + } + + @Override + public long elapsedInNanos() + { + // it fails immediately + return 0; + } + + @Override + public RestoreSlice slice() + { + return slice; + } + } } diff --git a/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java b/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java index 1a44bd8..27ca5fe 100644 --- a/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java +++ b/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java @@ -26,7 +26,7 @@ public interface RestoreJobStats /** * Captures the total time taken to complete a slice successfully * - * @param instanceId instance that contains the slice + * @param instanceId instance that is processing the slice * @param durationNanos duration in nanoseconds */ default void captureSliceCompletionTime(int instanceId, long durationNanos) @@ -34,6 +34,17 @@ public interface RestoreJobStats } + /** + * Captures the total time taken to stage the slice + * + * @param instanceId instance that contains the slice + * @param durationNanos duration in nanoseconds + */ + default void captureSliceStageTime(int instanceId, long durationNanos) + { + + } + /** * Captures the time taken to import SSTable(s) in a slice * @@ -217,4 +228,14 @@ public interface RestoreJobStats { } + + /** + * Captures a long-running restore job handler + * @param instanceId instance that is processing the slice + * @param handlerDuration restore job current duration in nanoseconds + */ + default void captureLongRunningRestoreHandler(int instanceId, long handlerDuration) + { + + } } diff --git a/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java b/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java index 4628a14..5b23031 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java +++ b/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java @@ -55,6 +55,7 @@ public class SidecarLoadBalancingPolicyTest extends IntegrationTestBase .collect(Collectors.toList()); } + @Override protected int getNumInstancesToManage(int clusterSize) { return SIDECAR_MANAGED_INSTANCES; // we only want to manage the first 2 instances in the "cluster" diff --git a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java index ef76ffa..d0c069d 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java +++ b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java @@ -118,6 +118,7 @@ public class IntegrationTestModule extends AbstractModule * @return instance meta information * @throws NoSuchElementException when the instance with {@code id} does not exist */ + @Override public InstanceMetadata instanceFromId(int id) throws NoSuchElementException { return cassandraTestContext.instancesConfig().instanceFromId(id); @@ -130,6 +131,7 @@ public class IntegrationTestModule extends AbstractModule * @return instance meta information * @throws NoSuchElementException when the instance for {@code host} does not exist */ + @Override public InstanceMetadata instanceFromHost(String host) throws NoSuchElementException { return cassandraTestContext.instancesConfig().instanceFromHost(host); diff --git a/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java b/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java index 3bdeead..2dbf616 100644 --- a/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java +++ b/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java @@ -48,11 +48,13 @@ class SharedExecutorNettyOptions extends NettyOptions private final HashedWheelTimer sharedHWT = new HashedWheelTimer(threadFactory); private final EventLoopGroup sharedEventLoopGroup = new NioEventLoopGroup(0, threadFactory); + @Override public EventLoopGroup eventLoopGroup(ThreadFactory threadFactory) { return sharedEventLoopGroup; } + @Override public void onClusterClose(EventLoopGroup eventLoopGroup) { } @@ -63,6 +65,7 @@ class SharedExecutorNettyOptions extends NettyOptions return sharedHWT; } + @Override public void onClusterClose(Timer timer) { } diff --git a/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java b/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java index d7cd14b..0e90ae8 100644 --- a/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java +++ b/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java @@ -100,6 +100,7 @@ public class SimpleCassandraVersion implements Comparable<SimpleCassandraVersion } + @Override public int compareTo(SimpleCassandraVersion other) { if (major < other.major) diff --git a/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java b/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java index 641ba55..2939d45 100644 --- a/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java @@ -30,6 +30,7 @@ import io.vertx.junit5.VertxExtension; @ExtendWith(VertxExtension.class) public class HealthServiceSslTest extends AbstractHealthServiceTest { + @Override public boolean isSslEnabled() { return true; diff --git a/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java b/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java index ef088db..ac4a942 100644 --- a/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java @@ -30,6 +30,7 @@ import io.vertx.junit5.VertxExtension; @ExtendWith(VertxExtension.class) public class HealthServiceTest extends AbstractHealthServiceTest { + @Override public boolean isSslEnabled() { return false; diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java b/src/test/java/org/apache/cassandra/sidecar/TestModule.java index 518fc70..68bb4d8 100644 --- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java +++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java @@ -112,6 +112,7 @@ public class TestModule extends AbstractModule RestoreJobConfigurationImpl.builder() .restoreJobTablesTtlSeconds(TimeUnit.DAYS.toSeconds(14) + 1) .processMaxConcurrency(RESTORE_MAX_CONCURRENCY) + .restoreJobLongRunningThresholdSeconds(1) .build(); HealthCheckConfiguration healthCheckConfiguration = new HealthCheckConfigurationImpl(200, 1000); return SidecarConfigurationImpl.builder() diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java index c3fafc8..beea547 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java @@ -19,6 +19,9 @@ package org.apache.cassandra.sidecar.restore; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import com.google.common.util.concurrent.Uninterruptibles; import org.junit.jupiter.api.BeforeEach; @@ -28,7 +31,10 @@ import com.datastax.driver.core.utils.UUIDs; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.util.Modules; +import io.vertx.core.Promise; import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; +import org.apache.cassandra.sidecar.db.RestoreJob; import org.apache.cassandra.sidecar.db.RestoreSlice; import org.apache.cassandra.sidecar.db.schema.SidecarSchema; import org.apache.cassandra.sidecar.server.MainModule; @@ -170,14 +176,74 @@ class RestoreProcessorTest }); } + @Test + public void testLongRunningHandlerDetection() + { + + when(sidecarSchema.isInitialized()).thenReturn(true); + periodicTaskExecutor.schedule(processor); + + CountDownLatch latch = new CountDownLatch(1); + AtomicLong currentTime = new AtomicLong(0); + RestoreSlice slice = mockSlowSlice(latch, currentTime::get); // Sets the start time + long fiveMinutesInNanos = TimeUnit.NANOSECONDS.convert(5, TimeUnit.MINUTES); + currentTime.set(fiveMinutesInNanos); + processor.submit(slice); + loopAssert(3, () -> { + assertThat(stats.longRunningRestoreHandlers.size()).isEqualTo(1); + Long handlerTimeInNanos = stats.longRunningRestoreHandlers.get(slice.owner().id()); + assertThat(handlerTimeInNanos).isNotNull(); + assertThat(handlerTimeInNanos).isEqualTo(fiveMinutesInNanos); + assertThat(processor.activeTasks()).isOne(); + }); + + // Make slice completable. + latch.countDown(); + + // Make sure when the slice completes the active handler is removed + loopAssert(3, () -> { + assertThat(processor.activeTasks()).isZero(); + }); + } + private RestoreSlice mockSlowSlice(CountDownLatch latch) + { + return mockSlowSlice(latch, System::nanoTime); + } + + private RestoreSlice mockSlowSlice(CountDownLatch latch, Supplier<Long> timeInNanosSupplier) { RestoreSlice slice = mock(RestoreSlice.class, Mockito.RETURNS_DEEP_STUBS); when(slice.jobId()).thenReturn(UUIDs.timeBased()); when(slice.owner().id()).thenReturn(1); - when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any(), any())).thenReturn(promise -> { - Uninterruptibles.awaitUninterruptibly(latch); - promise.complete(slice); + when(slice.key()).thenReturn("SliceKey"); + RestoreJob job = RestoreJob.builder() + .jobStatus(RestoreJobStatus.CREATED) + .build(); + when(slice.job()).thenReturn(job); + when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any(), any())).thenReturn( + new RestoreSliceHandler() + { + private Long startTime = timeInNanosSupplier.get(); + + @Override + public void handle(Promise<RestoreSlice> promise) + { + Uninterruptibles.awaitUninterruptibly(latch); + promise.complete(slice); + } + + @Override + public long elapsedInNanos() + { + return timeInNanosSupplier.get() - startTime; + } + + @Override + public RestoreSlice slice() + { + return slice; + } }); when(slice.hasImported()).thenReturn(true); return slice; diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java index 6e9e1d7..dcfd6a1 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java @@ -25,6 +25,8 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -67,6 +69,7 @@ class RestoreSliceTaskTest private TaskExecutorPool executorPool; private TestRestoreJobStats stats; private TestRestoreSliceAccessor sliceDatabaseAccessor; + private RestoreJobUtil util; @BeforeEach void setup() @@ -82,6 +85,7 @@ class RestoreSliceTaskTest mockSSTableImporter = mock(SSTableImporter.class); executorPool = new ExecutorPools(Vertx.vertx(), new ServiceConfigurationImpl()).internal(); stats = new TestRestoreJobStats(); + util = mock(RestoreJobUtil.class); sliceDatabaseAccessor = new TestRestoreSliceAccessor(); } @@ -302,14 +306,33 @@ class RestoreSliceTaskTest .hasMessageContaining("Random exception"); } + @Test + void testSliceDuration() + { + RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.STAGED, "QUORUM"); + AtomicLong currentNanos = new AtomicLong(0); + RestoreSliceTask task = createTask(mockSlice, job, currentNanos::get); + Promise<RestoreSlice> promise = Promise.promise(); + task.handle(promise); // Task isn't considered started until it `handle` is called + currentNanos.set(123L); + assertThat(task.elapsedInNanos()).isEqualTo(123L); + } + private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job) + { + return createTask(slice, job, System::nanoTime); + } + + private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job, Supplier<Long> currentNanoTimeSupplier) { when(slice.job()).thenReturn(job); assertThat(slice.job()).isSameAs(job); assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar()); assertThat(slice.job().status).isEqualTo(job.status); + RestoreJobUtil util = mock(RestoreJobUtil.class); + when(util.currentTimeNanos()).thenAnswer(invok -> currentNanoTimeSupplier.get()); return new TestRestoreSliceTask(slice, mockStorageClient, executorPool, mockSSTableImporter, - 0, sliceDatabaseAccessor, stats); + 0, sliceDatabaseAccessor, stats, util); } private RestoreSliceTask createTaskWithExceptions(RestoreSlice slice, RestoreJob job) @@ -319,7 +342,7 @@ class RestoreSliceTaskTest assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar()); assertThat(slice.job().status).isEqualTo(job.status); return new TestUnexpectedExceptionInRestoreSliceTask(slice, mockStorageClient, executorPool, - mockSSTableImporter, 0, sliceDatabaseAccessor, stats); + mockSSTableImporter, 0, sliceDatabaseAccessor, stats, util); } static class TestRestoreSliceAccessor extends RestoreSliceDatabaseAccessor @@ -346,10 +369,11 @@ class RestoreSliceTaskTest public TestRestoreSliceTask(RestoreSlice slice, StorageClient s3Client, TaskExecutorPool executorPool, SSTableImporter importer, double requiredUsableSpacePercentage, - RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobStats stats) + RestoreSliceDatabaseAccessor sliceDatabaseAccessor, RestoreJobStats stats, + RestoreJobUtil restoreJobUtil) { - super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, sliceDatabaseAccessor, stats, - null); + super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, + sliceDatabaseAccessor, stats, restoreJobUtil); this.slice = slice; this.stats = stats; } @@ -382,10 +406,10 @@ class RestoreSliceTaskTest TaskExecutorPool executorPool, SSTableImporter importer, double requiredUsableSpacePercentage, RestoreSliceDatabaseAccessor sliceDatabaseAccessor, - RestoreJobStats stats) + RestoreJobStats stats, RestoreJobUtil util) { super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, sliceDatabaseAccessor, stats, - null); + util); } @Override diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java index 30bf04b..6cf72c1 100644 --- a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java @@ -240,6 +240,7 @@ class BaseUploadsHandlerTest super(Vertx.vertx(), 1, null, null, null, null, null, "localhost", 9043); } + @Override protected JmxNotificationListener initializeJmxListener() { return null; diff --git a/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java b/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java index 713c33c..0a2cd9e 100644 --- a/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java +++ b/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java @@ -19,7 +19,9 @@ package org.apache.cassandra.sidecar.stats; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Test implementation for testing restore job related stats captured @@ -39,6 +41,7 @@ public class TestRestoreJobStats implements RestoreJobStats public long failedJobCount; public long activeJobCount; public long tokenRefreshCount; + public Map<Integer, Long> longRunningRestoreHandlers = new HashMap<>(); @Override public void captureSliceCompletionTime(int instanceId, long durationNanos) @@ -118,4 +121,10 @@ public class TestRestoreJobStats implements RestoreJobStats { tokenRefreshCount += 1; } + + @Override + public void captureLongRunningRestoreHandler(int instanceId, long handlerDuration) + { + longRunningRestoreHandlers.put(instanceId, handlerDuration); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org