This is an automated email from the ASF dual-hosted git repository. apkhmv pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new e02967e095 IGNITE-20848 Introduce management REST API for Compute (#3062) e02967e095 is described below commit e02967e095d2468a54903674ff1484e573941113 Author: Ivan Gagarkin <gagarkin....@gmail.com> AuthorDate: Wed Jan 31 12:55:58 2024 +0100 IGNITE-20848 Introduce management REST API for Compute (#3062) --- .../ignite/internal/compute/ComputeComponent.java | 8 + .../internal/compute/ComputeComponentImpl.java | 6 + .../internal/compute/ComputeMessageTypes.java | 10 + .../ignite/internal/compute/ComputeUtils.java | 17 + .../ignite/internal/compute/ExecutionManager.java | 22 ++ .../compute/message/JobStatusesRequest.java | 29 ++ .../compute/message/JobStatusesResponse.java | 40 ++ .../compute/messaging/ComputeMessaging.java | 84 ++++- modules/rest-api/openapi/openapi.yaml | 169 +++++++++ .../internal/rest/api/compute/ComputeApi.java | 136 +++++++ .../ignite/internal/rest/api/compute/JobState.java | 53 +++ .../internal/rest/api/compute/JobStatus.java | 115 ++++++ .../rest/api/compute/UpdateJobPriorityBody.java | 46 +++ modules/rest/build.gradle | 7 + .../rest/compute/ItComputeControllerTest.java | 410 +++++++++++++++++++++ .../internal/rest/compute/ComputeController.java | 101 +++++ .../internal/rest/compute/ComputeRestFactory.java | 47 +++ .../exception/ComputeJobNotFoundException.java | 36 ++ .../exception/ComputeJobStateException.java | 37 ++ .../ComputeJobNotFoundExceptionHandler.java | 44 +++ .../handler/ComputeJobStateExceptionHandler.java | 43 +++ .../rest/matcher/MicronautHttpResponseMatcher.java | 119 ++++++ .../internal/rest/matcher/ProblemMatcher.java | 169 +++++++++ .../rest/matcher/RestJobStatusMatcher.java | 220 +++++++++++ .../org/apache/ignite/internal/app/IgniteImpl.java | 7 +- 25 files changed, 1972 insertions(+), 3 deletions(-) diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java index 22a0737a6a..679000389e 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.compute; +import java.util.Collection; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -103,6 +104,13 @@ public interface ComputeComponent extends IgniteComponent { return executeRemotely(ExecutionOptions.DEFAULT, remoteNode, units, jobClassName, args); } + /** + * Retrieves the current status of all jobs on all nodes in the cluster. + * + * @return The collection of job statuses. + */ + CompletableFuture<Collection<JobStatus>> statusesAsync(); + /** * Retrieves the current status of the job on any node in the cluster. The job status may be deleted and thus return {@code null} if the * time for retaining job status has been exceeded. 
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java index 1dc4598500..19762aa0fc 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java @@ -22,6 +22,7 @@ import static org.apache.ignite.internal.compute.ClassLoaderExceptionsMapper.map import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; +import java.util.Collection; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -145,6 +146,11 @@ public class ComputeComponentImpl implements ComputeComponent { } } + @Override + public CompletableFuture<Collection<JobStatus>> statusesAsync() { + return messaging.broadcastStatusesAsync(); + } + @Override public CompletableFuture<@Nullable JobStatus> statusAsync(UUID jobId) { return messaging.broadcastStatusAsync(jobId); diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java index 8a38cb32e4..d3da0d352f 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java @@ -20,12 +20,16 @@ package org.apache.ignite.internal.compute; import org.apache.ignite.internal.compute.message.DeploymentUnitMsg; import org.apache.ignite.internal.compute.message.ExecuteRequest; import org.apache.ignite.internal.compute.message.ExecuteResponse; +import org.apache.ignite.internal.compute.message.JobCancelRequest; +import org.apache.ignite.internal.compute.message.JobCancelResponse; import 
org.apache.ignite.internal.compute.message.JobChangePriorityRequest; import org.apache.ignite.internal.compute.message.JobChangePriorityResponse; import org.apache.ignite.internal.compute.message.JobResultRequest; import org.apache.ignite.internal.compute.message.JobResultResponse; import org.apache.ignite.internal.compute.message.JobStatusRequest; import org.apache.ignite.internal.compute.message.JobStatusResponse; +import org.apache.ignite.internal.compute.message.JobStatusesRequest; +import org.apache.ignite.internal.compute.message.JobStatusesResponse; import org.apache.ignite.network.annotations.MessageGroup; /** @@ -71,4 +75,10 @@ public class ComputeMessageTypes { /** Type for {@link JobChangePriorityResponse}. */ public static final short JOB_CHANGE_PRIORITY_RESPONSE = 10; + + /** Type for {@link JobStatusesRequest}. */ + public static final short JOB_STATUSES_REQUEST = 11; + + /** Type for {@link JobStatusesResponse}. */ + public static final short JOB_STATUSES_RESPONSE = 12; } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java index 5d1f62d420..cf63bab9cc 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java @@ -22,6 +22,7 @@ import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR; import java.lang.reflect.Constructor; +import java.util.Collection; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -37,6 +38,7 @@ import org.apache.ignite.internal.compute.message.JobCancelResponse; import org.apache.ignite.internal.compute.message.JobChangePriorityResponse; import org.apache.ignite.internal.compute.message.JobResultResponse; import 
org.apache.ignite.internal.compute.message.JobStatusResponse; +import org.apache.ignite.internal.compute.message.JobStatusesResponse; import org.jetbrains.annotations.Nullable; /** @@ -141,6 +143,21 @@ public class ComputeUtils { return completedFuture((R) jobResultResponse.result()); } + /** + * Extract compute job statuses from statuses response. + * + * @param jobStatusesResponse Job statuses result message response. + * @return Completable future with result. + */ + public static CompletableFuture<Collection<JobStatus>> statusesFromJobStatusesResponse(JobStatusesResponse jobStatusesResponse) { + Throwable throwable = jobStatusesResponse.throwable(); + if (throwable != null) { + return failedFuture(throwable); + } + + return completedFuture(jobStatusesResponse.statuses()); + } + /** * Extract compute job status from status response. * diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java index 4a7d20423f..064f9fa3ac 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java @@ -18,10 +18,14 @@ package org.apache.ignite.internal.compute; import static java.util.concurrent.CompletableFuture.failedFuture; +import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.lang.ErrorGroups.Compute.RESULT_NOT_FOUND_ERR; +import java.util.Arrays; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -29,6 +33,7 @@ import org.apache.ignite.compute.ComputeException; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobStatus; import 
org.apache.ignite.internal.compute.configuration.ComputeConfiguration; +import org.apache.ignite.internal.compute.messaging.RemoteJobExecution; import org.apache.ignite.network.TopologyService; import org.jetbrains.annotations.Nullable; @@ -90,6 +95,23 @@ public class ExecutionManager { return failedFuture(new ComputeException(RESULT_NOT_FOUND_ERR, "Job result not found for the job with ID: " + jobId)); } + /** + * Retrieves the current status of all jobs in the local node. + * + * @return The set of all job statuses. + */ + public CompletableFuture<Set<JobStatus>> localStatusesAsync() { + CompletableFuture<JobStatus>[] statuses = executions.values().stream() + .filter(it -> !(it instanceof RemoteJobExecution)) + .map(JobExecution::statusAsync) + .toArray(CompletableFuture[]::new); + + return CompletableFuture.allOf(statuses) + .thenApply(ignored -> Arrays.stream(statuses).map(CompletableFuture::join) + .filter(Objects::nonNull) + .collect(toSet())); + } + /** * Retrieves the current status of the job. * diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobStatusesRequest.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobStatusesRequest.java new file mode 100644 index 0000000000..94dab7502d --- /dev/null +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobStatusesRequest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute.message; + +import org.apache.ignite.internal.compute.ComputeMessageTypes; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.annotations.Transferable; + +/** + * Remote job statuses request. + */ +@Transferable(ComputeMessageTypes.JOB_STATUSES_REQUEST) +public interface JobStatusesRequest extends NetworkMessage { +} diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobStatusesResponse.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobStatusesResponse.java new file mode 100644 index 0000000000..cd8c98ac31 --- /dev/null +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobStatusesResponse.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute.message; + +import java.util.Collection; +import org.apache.ignite.compute.JobStatus; +import org.apache.ignite.internal.compute.ComputeMessageTypes; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.annotations.Marshallable; +import org.apache.ignite.network.annotations.Transferable; +import org.jetbrains.annotations.Nullable; + +/** + * Remote job statuses response. + */ +@Transferable(ComputeMessageTypes.JOB_STATUSES_RESPONSE) +public interface JobStatusesResponse extends NetworkMessage { + @Nullable + @Marshallable + Collection<JobStatus> statuses(); + + @Nullable + @Marshallable + Throwable throwable(); +} diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java index cbe1f2285c..03d821746b 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java @@ -18,22 +18,25 @@ package org.apache.ignite.internal.compute.messaging; import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.compute.ComputeUtils.cancelFromJobCancelResponse; import static org.apache.ignite.internal.compute.ComputeUtils.changePriorityFromJobChangePriorityResponse; import static org.apache.ignite.internal.compute.ComputeUtils.jobIdFromExecuteResponse; import static org.apache.ignite.internal.compute.ComputeUtils.resultFromJobResultResponse; import static org.apache.ignite.internal.compute.ComputeUtils.statusFromJobStatusResponse; +import static 
org.apache.ignite.internal.compute.ComputeUtils.statusesFromJobStatusesResponse; import static org.apache.ignite.internal.compute.ComputeUtils.toDeploymentUnit; import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; import static org.apache.ignite.lang.ErrorGroups.Compute.CANCELLING_ERR; import static org.apache.ignite.lang.ErrorGroups.Compute.CHANGE_JOB_PRIORITY_ERR; import static org.apache.ignite.lang.ErrorGroups.Compute.FAIL_TO_GET_JOB_STATUS_ERR; +import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.ignite.compute.ComputeException; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.JobExecution; @@ -55,8 +58,11 @@ import org.apache.ignite.internal.compute.message.JobResultRequest; import org.apache.ignite.internal.compute.message.JobResultResponse; import org.apache.ignite.internal.compute.message.JobStatusRequest; import org.apache.ignite.internal.compute.message.JobStatusResponse; +import org.apache.ignite.internal.compute.message.JobStatusesRequest; +import org.apache.ignite.internal.compute.message.JobStatusesResponse; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.util.CompletableFutures; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.MessagingService; @@ -128,6 +134,8 @@ public class ComputeMessaging { sendExecuteResponse(null, ex, senderConsistentId, correlationId); } else if (message instanceof JobResultRequest) { sendJobResultResponse(null, ex, senderConsistentId, correlationId); + } else if (message instanceof JobStatusesRequest) { + sendJobStatusesResponse(null, ex, senderConsistentId, correlationId); } else if 
(message instanceof JobStatusRequest) { sendJobStatusResponse(null, ex, senderConsistentId, correlationId); } else if (message instanceof JobCancelRequest) { @@ -147,6 +155,8 @@ public class ComputeMessaging { processExecuteRequest(starter, (ExecuteRequest) message, senderConsistentId, correlationId); } else if (message instanceof JobResultRequest) { processJobResultRequest((JobResultRequest) message, senderConsistentId, correlationId); + } else if (message instanceof JobStatusesRequest) { + processJobStatusesRequest((JobStatusesRequest) message, senderConsistentId, correlationId); } else if (message instanceof JobStatusRequest) { processJobStatusRequest((JobStatusRequest) message, senderConsistentId, correlationId); } else if (message instanceof JobCancelRequest) { @@ -182,7 +192,7 @@ public class ComputeMessaging { ) { List<DeploymentUnitMsg> deploymentUnitMsgs = units.stream() .map(ComputeUtils::toDeploymentUnitMsg) - .collect(Collectors.toList()); + .collect(toList()); ExecuteRequest executeRequest = messagesFactory.executeRequest() .executeOptions(options) @@ -247,6 +257,33 @@ public class ComputeMessaging { messagingService.respond(senderConsistentId, jobResultResponse, correlationId); } + CompletableFuture<Collection<JobStatus>> remoteStatusesAsync(ClusterNode remoteNode) { + JobStatusesRequest jobStatusRequest = messagesFactory.jobStatusesRequest() + .build(); + + return messagingService.invoke(remoteNode, jobStatusRequest, NETWORK_TIMEOUT_MILLIS) + .thenCompose(networkMessage -> statusesFromJobStatusesResponse((JobStatusesResponse) networkMessage)); + } + + private void processJobStatusesRequest(JobStatusesRequest message, String senderConsistentId, long correlationId) { + executionManager.localStatusesAsync() + .whenComplete((statuses, throwable) -> sendJobStatusesResponse(statuses, throwable, senderConsistentId, correlationId)); + } + + private void sendJobStatusesResponse( + @Nullable Collection<JobStatus> statuses, + @Nullable Throwable throwable, + 
String senderConsistentId, + Long correlationId + ) { + JobStatusesResponse jobStatusResponse = messagesFactory.jobStatusesResponse() + .statuses(statuses) + .throwable(throwable) + .build(); + + messagingService.respond(senderConsistentId, jobStatusResponse, correlationId); + } + /** * Gets compute job status from the remote node. * @@ -360,6 +397,26 @@ public class ComputeMessaging { messagingService.respond(senderConsistentId, jobChangePriorityResponse, correlationId); } + /** + * Broadcasts job statuses request to all nodes in the cluster. + * + * @return The future which will be completed with the collection of statuses from all nodes. + */ + public CompletableFuture<Collection<JobStatus>> broadcastStatusesAsync() { + return broadcastAsyncAndCollect( + node -> remoteStatusesAsync(node), + throwable -> new ComputeException( + FAIL_TO_GET_JOB_STATUS_ERR, + "Failed to retrieve statuses", + throwable + )).thenApply(statuses -> { + return statuses.stream() + .flatMap(Collection::stream) + .filter(Objects::nonNull) + .collect(toList()); + }); + } + /** * Broadcasts job status request to all nodes in the cluster. 
* @@ -448,6 +505,29 @@ public class ComputeMessaging { result.completeExceptionally(error.apply(throwable)); } }); + + return result; + } + + private <R> CompletableFuture<Collection<R>> broadcastAsyncAndCollect( + Function<ClusterNode, CompletableFuture<@Nullable R>> request, + Function<Throwable, Throwable> error + ) { + CompletableFuture<Collection<R>> result = new CompletableFuture<>(); + + CompletableFuture<R>[] futures = topologyService.allMembers() + .stream() + .map(request::apply) + .toArray(CompletableFuture[]::new); + + CompletableFutures.allOf(futures).whenComplete((collection, throwable) -> { + if (throwable == null) { + result.complete(collection); + } else { + result.completeExceptionally(error.apply(throwable)); + } + }); + return result; } } diff --git a/modules/rest-api/openapi/openapi.yaml b/modules/rest-api/openapi/openapi.yaml index 0e691aacf7..2cd3bc2e4d 100644 --- a/modules/rest-api/openapi/openapi.yaml +++ b/modules/rest-api/openapi/openapi.yaml @@ -118,6 +118,124 @@ paths: application/problem+json: schema: $ref: '#/components/schemas/Problem' + /management/v1/compute/jobs: + get: + tags: + - compute + summary: Retrieve all job statuses + description: Fetches the current statuses of all compute jobs. + operationId: jobStatuses + responses: + "200": + description: Successful retrieval of job statuses. + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/JobStatus' + /management/v1/compute/jobs/{jobId}: + get: + tags: + - compute + summary: Retrieve a job status + description: Fetches the current status of a specific compute job identified + by jobId. + operationId: jobStatus + parameters: + - name: jobId + in: path + description: The unique identifier of the compute job. + required: true + schema: + type: string + description: The unique identifier of the compute job. + format: uuid + responses: + "200": + description: Successful retrieval of the job status. 
+ content: + application/json: + schema: + $ref: '#/components/schemas/JobStatus' + "404": + description: Compute job not found. + content: + application/json: + schema: + $ref: '#/components/schemas/Problem' + delete: + tags: + - compute + summary: Cancel a job + description: Cancels a specific compute job identified by jobId. + operationId: cancelJob + parameters: + - name: jobId + in: path + description: The unique identifier of the compute job. + required: true + schema: + type: string + description: The unique identifier of the compute job. + format: uuid + responses: + "200": + description: Successful cancellation of the job. + content: + application/json: {} + "404": + description: Compute job not found. + content: + application/json: + schema: + $ref: '#/components/schemas/Problem' + "409": + description: Compute job is in an illegal state. + content: + application/json: + schema: + $ref: '#/components/schemas/Problem' + /management/v1/compute/jobs/{jobId}/priority: + put: + tags: + - compute + summary: Update a job's priority + description: Updates the priority of a specific compute job identified by jobId. + operationId: updatePriority + parameters: + - name: jobId + in: path + description: The unique identifier of the compute job. + required: true + schema: + type: string + description: The unique identifier of the compute job. + format: uuid + requestBody: + description: The new priority data for the job. + content: + application/json: + schema: + $ref: '#/components/schemas/UpdateJobPriorityBody' + required: true + responses: + "200": + description: Successful update of the job priority. + content: + application/json: {} + "404": + description: Compute job not found. + content: + application/json: + schema: + $ref: '#/components/schemas/Problem' + "409": + description: Compute job is in an illegal state. 
+ content: + application/json: + schema: + $ref: '#/components/schemas/Problem' /management/v1/configuration/cluster: get: tags: @@ -818,6 +936,47 @@ components: type: string description: The issue with the parameter. description: Information about invalid request parameter. + JobState: + type: string + description: Job state. + enum: + - QUEUED + - EXECUTING + - FAILED + - COMPLETED + - CANCELING + - CANCELED + JobStatus: + required: + - createTime + - id + - state + type: object + properties: + id: + type: string + description: Job ID. + format: uuid + state: + description: Job state. + allOf: + - $ref: '#/components/schemas/JobState' + - {} + createTime: + type: string + description: Job create time. + format: date-time + startTime: + type: string + description: Job start time. + format: date-time + nullable: true + finishTime: + type: string + description: Job finish time. + format: date-time + nullable: true + description: Rest representation of org.apache.ignite.compute.JobStatus. Metric: type: object properties: @@ -982,6 +1141,16 @@ components: - $ref: '#/components/schemas/DeploymentStatus' - description: Unit status. description: Unit version and status. + UpdateJobPriorityBody: + required: + - priority + type: object + properties: + priority: + type: integer + description: Priority. + format: int32 + description: DTO of update job priority request body. deployMode: type: string description: Initial set of nodes to deploy. diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/ComputeApi.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/ComputeApi.java new file mode 100644 index 0000000000..41f9b08fac --- /dev/null +++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/ComputeApi.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.api.compute; + +import static io.swagger.v3.oas.annotations.media.Schema.RequiredMode.REQUIRED; +import static org.apache.ignite.internal.rest.constants.MediaType.APPLICATION_JSON; + +import io.micronaut.http.annotation.Body; +import io.micronaut.http.annotation.Controller; +import io.micronaut.http.annotation.Delete; +import io.micronaut.http.annotation.Get; +import io.micronaut.http.annotation.Put; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.ArraySchema; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.rest.api.Problem; + +/** + * API for managing compute tasks. + */ +@Controller("/management/v1/compute/") +@Tag(name = "compute") +public interface ComputeApi { + /** + * Retrieves the statuses of all compute jobs. + * + * @return A collection of compute job statuses. 
+ */ + @Operation(summary = "Retrieve all job statuses", description = "Fetches the current statuses of all compute jobs.") + @ApiResponse( + responseCode = "200", + description = "Successful retrieval of job statuses.", + content = @Content(mediaType = APPLICATION_JSON, array = @ArraySchema(schema = @Schema(implementation = JobStatus.class))) + ) + @Get("jobs") + CompletableFuture<Collection<JobStatus>> jobStatuses(); + + /** + * Retrieves the status of a specific compute job. + * + * @param jobId The unique identifier of the compute job. + * @return The status of the specified compute job. + */ + @Operation(summary = "Retrieve a job status", description = "Fetches the current status of a specific compute job identified by jobId.") + @ApiResponse( + responseCode = "200", + description = "Successful retrieval of the job status.", + content = @Content(mediaType = APPLICATION_JSON, schema = @Schema(implementation = JobStatus.class)) + ) + @ApiResponse( + responseCode = "404", + description = "Compute job not found.", + content = @Content(mediaType = APPLICATION_JSON, schema = @Schema(implementation = Problem.class)) + ) + @Get("jobs/{jobId}") + CompletableFuture<JobStatus> jobStatus( + @Schema(name = "jobId", description = "The unique identifier of the compute job.", requiredMode = REQUIRED) UUID jobId + ); + + /** + * Updates the priority of a compute job. + * + * @param jobId The unique identifier of the compute job. + * @param updateJobPriorityBody The new priority data for the job. + * @return The result of the operation. 
+ */ + @Operation(summary = "Update a job's priority", description = "Updates the priority of a specific compute job identified by jobId.") + @ApiResponse( + responseCode = "200", + description = "Successful update of the job priority.", + content = @Content(mediaType = APPLICATION_JSON) + ) + @ApiResponse( + responseCode = "404", + description = "Compute job not found.", + content = @Content(mediaType = APPLICATION_JSON, schema = @Schema(implementation = Problem.class)) + ) + @ApiResponse( + responseCode = "409", + description = "Compute job is in an illegal state.", + content = @Content(mediaType = APPLICATION_JSON, schema = @Schema(implementation = Problem.class)) + ) + @Put("jobs/{jobId}/priority") + CompletableFuture<Void> updatePriority( + @Schema(name = "jobId", description = "The unique identifier of the compute job.", requiredMode = REQUIRED) UUID jobId, + @Body UpdateJobPriorityBody updateJobPriorityBody + ); + + /** + * Cancels a specific compute job. + * + * @param jobId The unique identifier of the compute job. + * @return The result of the cancellation operation. 
+ */ + @Operation(summary = "Cancel a job", description = "Cancels a specific compute job identified by jobId.") + @ApiResponse( + responseCode = "200", + description = "Successful cancellation of the job.", + content = @Content(mediaType = APPLICATION_JSON) + ) + @ApiResponse( + responseCode = "404", + description = "Compute job not found.", + content = @Content(mediaType = APPLICATION_JSON, schema = @Schema(implementation = Problem.class)) + ) + @ApiResponse( + responseCode = "409", + description = "Compute job is in an illegal state.", + content = @Content(mediaType = APPLICATION_JSON, schema = @Schema(implementation = Problem.class)) + ) + @Delete("jobs/{jobId}") + CompletableFuture<Void> cancelJob( + @Schema(name = "jobId", description = "The unique identifier of the compute job.", requiredMode = REQUIRED) UUID jobId + ); +} diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/JobState.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/JobState.java new file mode 100644 index 0000000000..531e14bf5b --- /dev/null +++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/JobState.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.api.compute; + +/** + * Rest representation of {@link org.apache.ignite.compute.JobState}. + */ +public enum JobState { + /** + * The job is submitted and waiting for an execution start. + */ + QUEUED, + + /** + * The job is being executed. + */ + EXECUTING, + + /** + * The job was unexpectedly terminated during execution. + */ + FAILED, + + /** + * The job was executed successfully and the execution result was returned. + */ + COMPLETED, + + /** + * The job has received the cancel command, but it is still running. + */ + CANCELING, + + /** + * The job was successfully cancelled. + */ + CANCELED; +} diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/JobStatus.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/JobStatus.java new file mode 100644 index 0000000000..f7d17008dd --- /dev/null +++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/JobStatus.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.rest.api.compute; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.media.Schema.RequiredMode; +import java.time.Instant; +import java.util.UUID; +import org.jetbrains.annotations.Nullable; + +/** + * Rest representation of {@link org.apache.ignite.compute.JobStatus}. + */ +@Schema(name = "JobStatus") +public class JobStatus { + /** + * Job ID. + */ + @Schema(description = "Job ID.", requiredMode = RequiredMode.REQUIRED) + private final UUID id; + + /** + * Job state. + */ + @Schema(description = "Job state.", requiredMode = RequiredMode.REQUIRED) + private final JobState state; + + /** + * Job create time. + */ + @Schema(description = "Job create time.", requiredMode = RequiredMode.REQUIRED) + private final Instant createTime; + + /** + * Job start time. + */ + @Schema(description = "Job start time.", requiredMode = RequiredMode.NOT_REQUIRED) + @Nullable + private final Instant startTime; + + /** + * Job finish time. + */ + @Schema(description = "Job finish time.", requiredMode = RequiredMode.NOT_REQUIRED) + @Nullable + private final Instant finishTime; + + /** + * Constructor. + * + * @param id Job ID. + * @param state Job state. + * @param createTime Job create time. + * @param startTime Job start time. + * @param finishTime Job finish time. 
+ */ + @JsonCreator + public JobStatus( + @JsonProperty("id") UUID id, + @JsonProperty("state") JobState state, + @JsonProperty("createTime") Instant createTime, + @JsonProperty("startTime") @Nullable Instant startTime, + @JsonProperty("finishTime") @Nullable Instant finishTime + ) { + this.id = id; + this.state = state; + this.createTime = createTime; + this.startTime = startTime; + this.finishTime = finishTime; + } + + @JsonProperty("id") + public UUID id() { + return id; + } + + @JsonProperty("state") + public JobState state() { + return state; + } + + @JsonProperty("createTime") + public Instant createTime() { + return createTime; + } + + @Nullable + @JsonProperty("startTime") + public Instant startTime() { + return startTime; + } + + @Nullable + @JsonProperty("finishTime") + public Instant finishTime() { + return finishTime; + } +} diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/UpdateJobPriorityBody.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/UpdateJobPriorityBody.java new file mode 100644 index 0000000000..fc05fe86cb --- /dev/null +++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/UpdateJobPriorityBody.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.api.compute; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.media.Schema.RequiredMode; + +/** + * DTO of update job priority request body. + */ +@Schema(name = "UpdateJobPriorityBody") +public class UpdateJobPriorityBody { + /** + * Priority. + */ + @Schema(description = "Priority.", requiredMode = RequiredMode.REQUIRED) + private final int priority; + + @JsonCreator + public UpdateJobPriorityBody(@JsonProperty("priority") int priority) { + this.priority = priority; + } + + @JsonGetter("priority") + public int priority() { + return priority; + } +} diff --git a/modules/rest/build.gradle b/modules/rest/build.gradle index 4810db7969..f156360b91 100644 --- a/modules/rest/build.gradle +++ b/modules/rest/build.gradle @@ -38,6 +38,7 @@ dependencies { implementation project(':ignite-metrics') implementation project(':ignite-code-deployment') implementation project(':ignite-security-api') + implementation project(':ignite-compute') implementation libs.jetbrains.annotations implementation libs.micronaut.inject implementation libs.micronaut.http.server.netty @@ -78,6 +79,7 @@ dependencies { integrationTestImplementation testFixtures(project(':ignite-cluster-management')) integrationTestImplementation testFixtures(project(':ignite-configuration')) integrationTestImplementation testFixtures(project(":ignite-api")) + integrationTestImplementation testFixtures(project(":ignite-rest")) integrationTestImplementation libs.awaitility integrationTestImplementation libs.micronaut.junit5 integrationTestImplementation libs.micronaut.test @@ -90,4 +92,9 @@ dependencies { //So, exclude asm-core transitive dependency to protect of 
jar-hell. exclude group: 'org.ow2.asm', module: 'asm' } + + testFixturesImplementation(project(":ignite-rest-api")) + testFixturesImplementation testFixtures(project(":ignite-core")) + testFixturesImplementation libs.hamcrest.core + testFixturesImplementation libs.micronaut.http.core } diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java new file mode 100644 index 0000000000..023e4c8dc0 --- /dev/null +++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.rest.compute; + +import static io.micronaut.http.HttpRequest.DELETE; +import static io.micronaut.http.HttpRequest.PUT; +import static org.apache.ignite.internal.rest.matcher.ProblemMatcher.isProblem; +import static org.apache.ignite.internal.rest.matcher.RestJobStatusMatcher.canceled; +import static org.apache.ignite.internal.rest.matcher.RestJobStatusMatcher.completed; +import static org.apache.ignite.internal.rest.matcher.RestJobStatusMatcher.executing; +import static org.apache.ignite.internal.rest.matcher.RestJobStatusMatcher.queued; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import io.micronaut.core.type.Argument; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.client.HttpClient; +import io.micronaut.http.client.annotation.Client; +import io.micronaut.http.client.exceptions.HttpClientResponseException; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.JobExecution; +import org.apache.ignite.compute.JobExecutionContext; +import org.apache.ignite.internal.ClusterPerClassIntegrationTest; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.rest.api.Problem; +import org.apache.ignite.internal.rest.api.compute.JobStatus; +import org.apache.ignite.internal.rest.api.compute.UpdateJobPriorityBody; +import org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher; +import org.apache.ignite.network.ClusterNode; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +/** + * Integration tests for {@link ComputeController}. 
+ */ +@MicronautTest +public class ItComputeControllerTest extends ClusterPerClassIntegrationTest { + private static final String COMPUTE_URL = "/management/v1/compute/"; + + private static final Object LOCK = new Object(); + + @Inject + @Client("http://localhost:10300" + COMPUTE_URL) + HttpClient client0; + + @Inject + @Client("http://localhost:10301" + COMPUTE_URL) + HttpClient client1; + + // NOTE(review): port follows the 10300, 10301 sequence used by client0/client1 - confirm against the cluster's REST port assignment. + @Inject + @Client("http://localhost:10302" + COMPUTE_URL) + HttpClient client2; + + @Override + protected String getNodeBootstrapConfigTemplate() { + return "{\n" + + " network: {\n" + + " port: {},\n" + + " nodeFinder: {\n" + + " netClusterNodes: [ {} ]\n" + + " }\n" + + " },\n" + + " clientConnector: { port:{} },\n" + + " rest.port: {},\n" + + " compute.threadPoolSize: 1 \n" + + "}"; + } + + @AfterEach + void tearDown() { + // Cancel all jobs. + getJobStatuses(client0).values().stream() + .filter(it -> it.finishTime() == null) + .map(JobStatus::id) + .forEach(jobId -> cancelJob(client0, jobId)); + + // Wait for all jobs to complete. 
+ await().until(() -> { + Collection<JobStatus> statuses = getJobStatuses(client0).values(); + + for (JobStatus status : statuses) { + if (status.finishTime() == null) { + return false; + } + } + + return true; + }); + } + + @Test + void shouldReturnStatusesOfAllJobs() { + IgniteImpl entryNode = CLUSTER.node(0); + + JobExecution<String> localExecution = runBlockingJob(entryNode, Set.of(entryNode.node())); + + JobExecution<String> remoteExecution = runBlockingJob(entryNode, Set.of(CLUSTER.node(1).node())); + + UUID localJobId = localExecution.idAsync().join(); + UUID remoteJobId = remoteExecution.idAsync().join(); + + await().untilAsserted(() -> { + Map<UUID, JobStatus> statuses = getJobStatuses(client0); + + assertThat(statuses.get(localJobId), executing(localJobId)); + assertThat(statuses.get(remoteJobId), executing(remoteJobId)); + }); + } + + @Test + void shouldReturnStatusOfLocalJob() { + IgniteImpl entryNode = CLUSTER.node(0); + + JobExecution<String> execution = runBlockingJob(entryNode, Set.of(entryNode.node())); + + UUID jobId = execution.idAsync().join(); + + await().until(() -> getJobStatus(client0, jobId), executing(jobId)); + + unblockJob(); + + await().until(() -> getJobStatus(client0, jobId), completed(jobId)); + } + + @Test + void shouldReturnStatusOfRemoteJob() { + IgniteImpl entryNode = CLUSTER.node(0); + + JobExecution<String> execution = runBlockingJob(entryNode, Set.of(CLUSTER.node(1).node())); + + UUID jobId = execution.idAsync().join(); + + await().until(() -> getJobStatus(client0, jobId), executing(jobId)); + + unblockJob(); + + await().until(() -> getJobStatus(client0, jobId), completed(jobId)); + } + + @Test + void shouldReturnProblemIfStatusOfNonExistingJob() { + UUID jobId = UUID.randomUUID(); + + HttpClientResponseException httpClientResponseException = assertThrows( + HttpClientResponseException.class, + () -> getJobStatus(client0, jobId) + ); + + assertThat( + httpClientResponseException.getResponse(), + 
MicronautHttpResponseMatcher.<Problem>hasStatusCode(404) + .withBody(isProblem().withStatus(404).withDetail("Compute job not found [jobId=" + jobId + "]"), Problem.class) + ); + } + + @Test + void shouldCancelJobLocally() { + IgniteImpl entryNode = CLUSTER.node(0); + + JobExecution<String> execution = runBlockingJob(entryNode, Set.of(entryNode.node())); + + UUID jobId = execution.idAsync().join(); + + await().until(() -> getJobStatus(client0, jobId), executing(jobId)); + + cancelJob(client0, jobId); + + await().until(() -> getJobStatus(client0, jobId), canceled(jobId, true)); + } + + @Test + void shouldCancelJobRemotely() { + IgniteImpl entryNode = CLUSTER.node(0); + + JobExecution<String> execution = runBlockingJob(entryNode, Set.of(CLUSTER.node(1).node())); + + UUID jobId = execution.idAsync().join(); + + await().until(() -> getJobStatus(client0, jobId), executing(jobId)); + + cancelJob(client0, jobId); + + await().until(() -> getJobStatus(client0, jobId), canceled(jobId, true)); + } + + @Test + void shouldReturnProblemIfCancelNonExistingJob() { + UUID jobId = UUID.randomUUID(); + + HttpClientResponseException httpClientResponseException = assertThrows( + HttpClientResponseException.class, + () -> cancelJob(client0, jobId) + ); + + assertThat( + httpClientResponseException.getResponse(), + MicronautHttpResponseMatcher.<Problem>hasStatusCode(404) + .withBody(isProblem().withStatus(404).withDetail("Compute job not found [jobId=" + jobId + "]"), Problem.class) + ); + } + + @Test + void shouldReturnFalseIfCancelCompletedJob() { + IgniteImpl entryNode = CLUSTER.node(0); + + JobExecution<String> execution = runBlockingJob(entryNode, Set.of(entryNode.node())); + + UUID jobId = execution.idAsync().join(); + + await().until(() -> getJobStatus(client0, jobId), executing(jobId)); + + unblockJob(); + + await().until(() -> getJobStatus(client0, jobId), completed(jobId)); + + HttpClientResponseException httpClientResponseException = assertThrows( + 
HttpClientResponseException.class, + () -> cancelJob(client0, jobId) + ); + + assertThat( + httpClientResponseException.getResponse(), + MicronautHttpResponseMatcher.<Problem>hasStatusCode(409) + .withBody(isProblem().withStatus(409) + .withDetail("Compute job is in illegal state [jobId=" + jobId + ", state=COMPLETED]"), Problem.class) + ); + } + + @Test + void shouldUpdatePriorityLocally() { + IgniteImpl entryNode = CLUSTER.node(0); + + Set<ClusterNode> nodes = Set.of(entryNode.node()); + + JobExecution<String> execution = runBlockingJob(entryNode, nodes); + + UUID jobId = execution.idAsync().join(); + + await().until(() -> getJobStatus(client0, jobId), executing(jobId)); + + JobExecution<String> execution2 = runBlockingJob(entryNode, nodes); + + UUID jobId2 = execution2.idAsync().join(); + + await().until(() -> getJobStatus(client0, jobId2), queued(jobId2)); + + updatePriority(client0, jobId2, 1); + } + + @Test + void shouldUpdatePriorityRemotely() { + IgniteImpl entryNode = CLUSTER.node(0); + + Set<ClusterNode> nodes = Set.of(CLUSTER.node(1).node()); + + JobExecution<String> execution = runBlockingJob(entryNode, nodes); + + UUID jobId = execution.idAsync().join(); + + await().until(() -> getJobStatus(client0, jobId), executing(jobId)); + + JobExecution<String> execution2 = runBlockingJob(entryNode, nodes); + + UUID jobId2 = execution2.idAsync().join(); + + await().until(() -> getJobStatus(client0, jobId2), queued(jobId2)); + + updatePriority(client0, jobId2, 1); + } + + @Test + void shouldReturnProblemIfUpdatePriorityOfNonExistingJob() { + UUID jobId = UUID.randomUUID(); + + HttpClientResponseException httpClientResponseException = assertThrows( + HttpClientResponseException.class, + () -> updatePriority(client0, jobId, 1) + ); + + assertThat( + httpClientResponseException.getResponse(), + MicronautHttpResponseMatcher.<Problem>hasStatusCode(404) + .withBody(isProblem().withStatus(404).withDetail("Compute job not found [jobId=" + jobId + "]"), Problem.class) + ); 
+ } + + @Test + void shouldReturnFalseIfUpdatePriorityOfRunningJob() { + IgniteImpl entryNode = CLUSTER.node(0); + + Set<ClusterNode> nodes = Set.of(entryNode.node()); + + JobExecution<String> execution = runBlockingJob(entryNode, nodes); + + UUID jobId = execution.idAsync().join(); + + await().until(() -> getJobStatus(client0, jobId), executing(jobId)); + + HttpClientResponseException httpClientResponseException = assertThrows( + HttpClientResponseException.class, + () -> updatePriority(client0, jobId, 1) + ); + + assertThat( + httpClientResponseException.getResponse(), + MicronautHttpResponseMatcher.<Problem>hasStatusCode(409) + .withBody(isProblem().withStatus(409) + .withDetail("Compute job is in illegal state [jobId=" + jobId + ", state=EXECUTING]"), Problem.class) + ); + } + + @Test + void shouldReturnFalseIfUpdatePriorityOfCompletedJob() { + IgniteImpl entryNode = CLUSTER.node(0); + + Set<ClusterNode> nodes = Set.of(entryNode.node()); + + JobExecution<String> execution = runBlockingJob(entryNode, nodes); + + UUID jobId = execution.idAsync().join(); + + await().until(() -> getJobStatus(client0, jobId), executing(jobId)); + + unblockJob(); + + await().until(() -> getJobStatus(client0, jobId), completed(jobId)); + + HttpClientResponseException httpClientResponseException = assertThrows( + HttpClientResponseException.class, + () -> updatePriority(client0, jobId, 1) + ); + + assertThat( + httpClientResponseException.getResponse(), + MicronautHttpResponseMatcher.<Problem>hasStatusCode(409) + .withBody(isProblem().withStatus(409) + .withDetail("Compute job is in illegal state [jobId=" + jobId + ", state=COMPLETED]"), Problem.class) + ); + } + + private static JobExecution<String> runBlockingJob(IgniteImpl entryNode, Set<ClusterNode> nodes) { + return entryNode.compute().executeAsync(nodes, List.of(), BlockingJob.class.getName()); + } + + private static void unblockJob() { + synchronized (LOCK) { + LOCK.notifyAll(); + } + } + + private static Map<UUID, JobStatus> 
getJobStatuses(HttpClient client) { + List<JobStatus> statuses = client.toBlocking() + .retrieve(HttpRequest.GET("/jobs"), Argument.listOf(JobStatus.class)); + + return statuses.stream().collect(Collectors.toMap(JobStatus::id, s -> s)); + } + + private static JobStatus getJobStatus(HttpClient client, UUID jobId) { + return client.toBlocking().retrieve("/jobs/" + jobId, JobStatus.class); + } + + private static void updatePriority(HttpClient client, UUID jobId, int priority) { + client.toBlocking() + .exchange(PUT("/jobs/" + jobId + "/priority", new UpdateJobPriorityBody(priority))); + } + + private static void cancelJob(HttpClient client, UUID jobId) { + client.toBlocking().exchange(DELETE("/jobs/" + jobId)); + } + + private static class BlockingJob implements ComputeJob<String> { + /** {@inheritDoc} */ + @Override + public String execute(JobExecutionContext context, Object... args) { + synchronized (LOCK) { + try { + LOCK.wait(); + } catch (InterruptedException e) { + // No-op. + } + } + + return null; + } + } +} diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/ComputeController.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/ComputeController.java new file mode 100644 index 0000000000..0909f41a75 --- /dev/null +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/ComputeController.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.compute; + +import static java.util.concurrent.CompletableFuture.failedFuture; +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import io.micronaut.http.annotation.Controller; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.compute.ComputeComponent; +import org.apache.ignite.internal.rest.api.compute.ComputeApi; +import org.apache.ignite.internal.rest.api.compute.JobState; +import org.apache.ignite.internal.rest.api.compute.JobStatus; +import org.apache.ignite.internal.rest.api.compute.UpdateJobPriorityBody; +import org.apache.ignite.internal.rest.compute.exception.ComputeJobNotFoundException; +import org.apache.ignite.internal.rest.compute.exception.ComputeJobStateException; +import org.jetbrains.annotations.Nullable; + +/** + * REST controller for compute operations. 
+ */ +@Controller +public class ComputeController implements ComputeApi { + private final ComputeComponent computeComponent; + + public ComputeController(ComputeComponent computeComponent) { + this.computeComponent = computeComponent; + } + + @Override + public CompletableFuture<Collection<JobStatus>> jobStatuses() { + return computeComponent.statusesAsync() + .thenApply(statuses -> statuses.stream().map(ComputeController::toJobStatus).collect(toList())); + } + + @Override + public CompletableFuture<JobStatus> jobStatus(UUID jobId) { + return jobStatus0(jobId); + } + + @Override + public CompletableFuture<Void> updatePriority(UUID jobId, UpdateJobPriorityBody updateJobPriorityBody) { + return computeComponent.changePriorityAsync(jobId, updateJobPriorityBody.priority()) + .thenCompose(result -> handleOperationResult(jobId, result)); + } + + @Override + public CompletableFuture<Void> cancelJob(UUID jobId) { + return computeComponent.cancelAsync(jobId) + .thenCompose(result -> handleOperationResult(jobId, result)); + } + + private CompletableFuture<Void> handleOperationResult(UUID jobId, @Nullable Boolean result) { + if (result == null) { + return failedFuture(new ComputeJobNotFoundException(jobId.toString())); + } else if (!result) { + return jobStatus0(jobId).thenCompose(status -> failedFuture(new ComputeJobStateException(jobId.toString(), status.state()))); + } else { + return nullCompletedFuture(); + } + } + + private CompletableFuture<JobStatus> jobStatus0(UUID jobId) { + return computeComponent.statusAsync(jobId) + .thenApply(status -> { + if (status == null) { + throw new ComputeJobNotFoundException(jobId.toString()); + } else { + return toJobStatus(status); + } + }); + } + + private static JobStatus toJobStatus(org.apache.ignite.compute.JobStatus jobStatus) { + return new JobStatus( + jobStatus.id(), + JobState.valueOf(jobStatus.state().toString()), + jobStatus.createTime(), + jobStatus.startTime(), + jobStatus.finishTime() + ); + } +} diff --git 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/ComputeRestFactory.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/ComputeRestFactory.java new file mode 100644 index 0000000000..97878eb356 --- /dev/null +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/ComputeRestFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.compute; + +import io.micronaut.context.annotation.Bean; +import io.micronaut.context.annotation.Factory; +import jakarta.inject.Singleton; +import org.apache.ignite.internal.compute.ComputeComponent; +import org.apache.ignite.internal.rest.RestFactory; + +/** + * Factory that creates beans that are needed for {@link ComputeController}. 
+ */ +@Factory +public class ComputeRestFactory implements RestFactory { + private ComputeComponent computeComponent; + + public ComputeRestFactory(ComputeComponent computeComponent) { + this.computeComponent = computeComponent; + } + + @Bean + @Singleton + public ComputeComponent computeComponent() { + return computeComponent; + } + + @Override + public void cleanResources() { + computeComponent = null; + } +} diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/ComputeJobNotFoundException.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/ComputeJobNotFoundException.java new file mode 100644 index 0000000000..d43cb47214 --- /dev/null +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/ComputeJobNotFoundException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.compute.exception; + +import static org.apache.ignite.lang.ErrorGroups.Common.ILLEGAL_ARGUMENT_ERR; + +import org.apache.ignite.internal.lang.IgniteInternalException; + +/** + * Thrown when job not found. 
+ */ +public class ComputeJobNotFoundException extends IgniteInternalException { + /** + * Constructor. + * + * @param jobId Job ID. + */ + public ComputeJobNotFoundException(String jobId) { + super(ILLEGAL_ARGUMENT_ERR, "Compute job not found [jobId=" + jobId + ']'); + } +} diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/ComputeJobStateException.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/ComputeJobStateException.java new file mode 100644 index 0000000000..2d0b1b009e --- /dev/null +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/ComputeJobStateException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.compute.exception; + +import static org.apache.ignite.lang.ErrorGroups.Common.ILLEGAL_ARGUMENT_ERR; + +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.rest.api.compute.JobState; + +/** + * Thrown when compute job is in illegal state. + */ +public class ComputeJobStateException extends IgniteInternalException { + /** + * Constructor. + * + * @param jobId Job ID. + * @param state Job state reported in the exception message. 
+ */ + public ComputeJobStateException(String jobId, JobState state) { + super(ILLEGAL_ARGUMENT_ERR, "Compute job is in illegal state [jobId=" + jobId + ", state=" + state + ']'); + } +} diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/handler/ComputeJobNotFoundExceptionHandler.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/handler/ComputeJobNotFoundExceptionHandler.java new file mode 100644 index 0000000000..c667e59b66 --- /dev/null +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/handler/ComputeJobNotFoundExceptionHandler.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.rest.compute.exception.handler; + +import io.micronaut.context.annotation.Requires; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.server.exceptions.ExceptionHandler; +import jakarta.inject.Singleton; +import org.apache.ignite.internal.rest.api.Problem; +import org.apache.ignite.internal.rest.compute.exception.ComputeJobNotFoundException; +import org.apache.ignite.internal.rest.constants.HttpCode; +import org.apache.ignite.internal.rest.problem.HttpProblemResponse; + +/** + * REST exception handler for {@link ComputeJobNotFoundException}. Responds with a {@link Problem} built from + * {@link HttpCode#NOT_FOUND} whose detail is the exception message. + */ +@Singleton +@Requires(classes = {ComputeJobNotFoundException.class, ExceptionHandler.class}) +public class ComputeJobNotFoundExceptionHandler implements + ExceptionHandler<ComputeJobNotFoundException, HttpResponse<? extends Problem>> { + @Override + public HttpResponse<? extends Problem> handle(HttpRequest request, ComputeJobNotFoundException exception) { + return HttpProblemResponse.from( + Problem.fromHttpCode(HttpCode.NOT_FOUND) + .detail(exception.getMessage()).build() + ); + } +} diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/handler/ComputeJobStateExceptionHandler.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/handler/ComputeJobStateExceptionHandler.java new file mode 100644 index 0000000000..f5e345c4e6 --- /dev/null +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/handler/ComputeJobStateExceptionHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.compute.exception.handler; + +import io.micronaut.context.annotation.Requires; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.server.exceptions.ExceptionHandler; +import jakarta.inject.Singleton; +import org.apache.ignite.internal.rest.api.Problem; +import org.apache.ignite.internal.rest.compute.exception.ComputeJobStateException; +import org.apache.ignite.internal.rest.constants.HttpCode; +import org.apache.ignite.internal.rest.problem.HttpProblemResponse; + +/** + * REST exception handler for {@link ComputeJobStateException}. + */ +@Singleton +@Requires(classes = {ComputeJobStateException.class, ExceptionHandler.class}) +public class ComputeJobStateExceptionHandler implements ExceptionHandler<ComputeJobStateException, HttpResponse<? extends Problem>> { + @Override + public HttpResponse<? 
extends Problem> handle(HttpRequest request, ComputeJobStateException exception) { + return HttpProblemResponse.from( + Problem.fromHttpCode(HttpCode.CONFLICT) + .detail(exception.getMessage()).build() + ); + } +} diff --git a/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/MicronautHttpResponseMatcher.java b/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/MicronautHttpResponseMatcher.java new file mode 100644 index 0000000000..d1163b55b7 --- /dev/null +++ b/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/MicronautHttpResponseMatcher.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.matcher; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +import io.micronaut.http.HttpResponse; +import io.micronaut.http.HttpStatus; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Matcher for {@link HttpResponse}. 
+ */ +public class MicronautHttpResponseMatcher<T> extends TypeSafeMatcher<HttpResponse<?>> { + private final Matcher<Integer> statusCodeMatcher; + + private Matcher<T> bodyMatcher; + + private Class<T> body; + + private MicronautHttpResponseMatcher(Matcher<Integer> statusCodeMatcher) { + this.statusCodeMatcher = statusCodeMatcher; + } + + /** + * Creates a matcher that matches when the examined {@link HttpResponse} has a status that matches the specified status. + * + * @param status Expected status. + * @return Matcher. + */ + public static <T> MicronautHttpResponseMatcher<T> hasStatus(HttpStatus status) { + return new MicronautHttpResponseMatcher<>(is(status.getCode())); + } + + /** + * Creates a matcher that matches when the examined {@link HttpResponse} has a status code that matches the specified status code. + * + * @param statusCode Expected status code. + * @return Matcher. + */ + public static <T> MicronautHttpResponseMatcher<T> hasStatusCode(int statusCode) { + return new MicronautHttpResponseMatcher<>(is(statusCode)); + } + + /** + * Sets the expected body. + * + * @param body Body to match. + * @return Matcher. + */ + public MicronautHttpResponseMatcher<T> withBody(T body) { + this.bodyMatcher = equalTo(body); + this.body = (Class<T>) body.getClass(); + return this; + } + + /** + * Sets the body matcher. + * + * @param bodyMatcher Body matcher. + * @param body Body class. + * @return Matcher. 
+ */ + public MicronautHttpResponseMatcher<T> withBody(Matcher<T> bodyMatcher, Class<T> body) { + this.bodyMatcher = bodyMatcher; + this.body = body; + return this; + } + + @Override + protected boolean matchesSafely(HttpResponse<?> httpResponse) { + if (!statusCodeMatcher.matches(httpResponse.code())) { + return false; + } + + if (bodyMatcher != null && !bodyMatcher.matches(httpResponse.getBody(body).get())) { + return false; + } + + return true; + } + + @Override + public void describeTo(Description description) { + if (statusCodeMatcher != null) { + description.appendText("an HttpResponse with status code matching ").appendDescriptionOf(statusCodeMatcher); + } + + if (bodyMatcher != null) { + description.appendText(" and body ").appendDescriptionOf(bodyMatcher); + } + } + + @Override + protected void describeMismatchSafely(HttpResponse<?> item, Description mismatchDescription) { + mismatchDescription.appendText("status code was ") + .appendValue(item.code()) + .appendText(" and body was ") + .appendValue(item.getBody(String.class)); + } +} diff --git a/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/ProblemMatcher.java b/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/ProblemMatcher.java new file mode 100644 index 0000000000..4faf8040db --- /dev/null +++ b/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/ProblemMatcher.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.matcher; + +import static org.hamcrest.Matchers.equalTo; + +import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.internal.rest.api.InvalidParam; +import org.apache.ignite.internal.rest.api.Problem; +import org.apache.ignite.internal.testframework.matchers.AnythingMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Matcher for {@link Problem}. + */ +public class ProblemMatcher extends TypeSafeMatcher<Problem> { + private Matcher<String> titleMatcher = AnythingMatcher.anything(); + + private Matcher<Integer> statusMatcher = AnythingMatcher.anything(); + + private Matcher<String> codeMatcher = AnythingMatcher.anything(); + + private Matcher<String> typeMatcher = AnythingMatcher.anything(); + + private Matcher<String> detailMatcher = AnythingMatcher.anything(); + + private Matcher<String> nodeMatcher = AnythingMatcher.anything(); + + private Matcher<UUID> traceIdMatcher = AnythingMatcher.anything(); + + private Matcher<Collection<InvalidParam>> invalidParamsMatcher = AnythingMatcher.anything(); + + /** + * Creates a matcher for {@link Problem}. + * + * @return Matcher. 
+ */ + public static ProblemMatcher isProblem() { + return new ProblemMatcher(); + } + + public ProblemMatcher withTitle(String title) { + return withTitle(equalTo(title)); + } + + public ProblemMatcher withTitle(Matcher<String> matcher) { + this.titleMatcher = matcher; + return this; + } + + public ProblemMatcher withStatus(Integer status) { + return withStatus(equalTo(status)); + } + + public ProblemMatcher withStatus(Matcher<Integer> matcher) { + this.statusMatcher = matcher; + return this; + } + + public ProblemMatcher withCode(String code) { + return withCode(equalTo(code)); + } + + public ProblemMatcher withCode(Matcher<String> matcher) { + this.codeMatcher = matcher; + return this; + } + + public ProblemMatcher withType(String type) { + return withType(equalTo(type)); + } + + public ProblemMatcher withType(Matcher<String> matcher) { + this.typeMatcher = matcher; + return this; + } + + public ProblemMatcher withDetail(String detail) { + return withDetail(equalTo(detail)); + } + + public ProblemMatcher withDetail(Matcher<String> matcher) { + this.detailMatcher = matcher; + return this; + } + + public ProblemMatcher withNode(String node) { + return withNode(equalTo(node)); + } + + public ProblemMatcher withNode(Matcher<String> matcher) { + this.nodeMatcher = matcher; + return this; + } + + public ProblemMatcher withTraceId(UUID traceId) { + return withTraceId(equalTo(traceId)); + } + + public ProblemMatcher withTraceId(Matcher<UUID> matcher) { + this.traceIdMatcher = matcher; + return this; + } + + public ProblemMatcher withInvalidParams(Collection<InvalidParam> invalidParams) { + return withInvalidParams(equalTo(invalidParams)); + } + + public ProblemMatcher withInvalidParams(Matcher<Collection<InvalidParam>> matcher) { + this.invalidParamsMatcher = matcher; + return this; + } + + @Override + protected boolean matchesSafely(Problem problem) { + return titleMatcher.matches(problem.title()) + && statusMatcher.matches(problem.status()) + && 
codeMatcher.matches(problem.code()) + && typeMatcher.matches(problem.type()) + && detailMatcher.matches(problem.detail()) + && nodeMatcher.matches(problem.node()) + && traceIdMatcher.matches(problem.traceId()) + && invalidParamsMatcher.matches(problem.invalidParams()); + } + + @Override + protected void describeMismatchSafely(Problem item, Description mismatchDescription) { + mismatchDescription.appendText("was a Problem with ") + .appendText("title: ").appendValue(item.title()) + .appendText(", status: ").appendValue(item.status()) + .appendText(", code: ").appendValue(item.code()) + .appendText(", type: ").appendValue(item.type()) + .appendText(", detail: ").appendValue(item.detail()) + .appendText(", node: ").appendValue(item.node()) + .appendText(", traceId: ").appendValue(item.traceId()) + .appendText(", invalidParams: ").appendValue(item.invalidParams()); + } + + @Override + public void describeTo(Description description) { + description.appendText("a Problem with ") + .appendText("title ").appendDescriptionOf(titleMatcher) + .appendText(", status ").appendDescriptionOf(statusMatcher) + .appendText(", code ").appendDescriptionOf(codeMatcher) + .appendText(", type ").appendDescriptionOf(typeMatcher) + .appendText(", detail ").appendDescriptionOf(detailMatcher) + .appendText(", node ").appendDescriptionOf(nodeMatcher) + .appendText(", traceId ").appendDescriptionOf(traceIdMatcher) + .appendText(" and invalidParams ").appendDescriptionOf(invalidParamsMatcher); + } +} diff --git a/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/RestJobStatusMatcher.java b/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/RestJobStatusMatcher.java new file mode 100644 index 0000000000..1d044f2673 --- /dev/null +++ b/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/RestJobStatusMatcher.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.matcher; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +import java.time.Instant; +import java.util.UUID; +import org.apache.ignite.internal.rest.api.compute.JobState; +import org.apache.ignite.internal.rest.api.compute.JobStatus; +import org.apache.ignite.internal.testframework.matchers.AnythingMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Matcher for {@link JobStatus}. 
+ */ +public class RestJobStatusMatcher extends TypeSafeMatcher<JobStatus> { + private Matcher<JobState> stateMatcher = AnythingMatcher.anything(); + private Matcher<UUID> idMatcher = AnythingMatcher.anything(); + private Matcher<Instant> createTimeMatcher = AnythingMatcher.anything(); + private Matcher<Instant> startTimeMatcher = AnythingMatcher.anything(); + private Matcher<Instant> finishTimeMatcher = AnythingMatcher.anything(); + + public static RestJobStatusMatcher isJobStatus() { + return new RestJobStatusMatcher(); + } + + public static RestJobStatusMatcher queued(UUID id) { + return queued(equalTo(id)); + } + + /** + * Creates a matcher that matches when the examined {@link JobStatus} has a state of {@link JobState#QUEUED}. + * + * @param idMatcher Id matcher. + * @return Matcher. + */ + public static RestJobStatusMatcher queued(Matcher<UUID> idMatcher) { + return isJobStatus().withId(idMatcher) + .withState(JobState.QUEUED) + .withCreateTime(notNullValue(Instant.class)) + .withStartTime(nullValue(Instant.class)); + } + + /** + * Creates a matcher that matches when the examined {@link JobStatus} has a state of {@link JobState#EXECUTING}. + * + * @param id Id. + * @return Matcher. + */ + public static RestJobStatusMatcher executing(UUID id) { + return isJobStatus().withId(id) + .withState(JobState.EXECUTING) + .withCreateTime(notNullValue(Instant.class)) + .withStartTime(notNullValue(Instant.class)) + .withFinishTime(nullValue(Instant.class)); + } + + /** + * Creates a matcher that matches when the examined {@link JobStatus} has a state of {@link JobState#FAILED}. + * + * @param id Id. + * @param wasRunning Whether the job was running before it failed. + * @return Matcher. + */ + public static RestJobStatusMatcher failed(UUID id, boolean wasRunning) { + return isJobStatus().withId(id) + .withState(JobState.FAILED) + .withCreateTime(notNullValue(Instant.class)) + .withStartTime(wasRunning ? 
notNullValue(Instant.class) : AnythingMatcher.anything()) + .withFinishTime(notNullValue(Instant.class)); + } + + /** + * Creates a matcher that matches when the examined {@link JobStatus} has a state of {@link JobState#COMPLETED}. + * + * @param id Id. + * @return Matcher. + */ + public static RestJobStatusMatcher completed(UUID id) { + return isJobStatus().withId(id) + .withState(JobState.COMPLETED) + .withCreateTime(notNullValue(Instant.class)) + .withStartTime(notNullValue(Instant.class)) + .withFinishTime(notNullValue(Instant.class)); + } + + /** + * Creates a matcher that matches when the examined {@link JobStatus} has a state of {@link JobState#CANCELING}. + * + * @param id Id. + * @param wasRunning Whether the job was running before it was canceled. + * @return Matcher. + */ + public static RestJobStatusMatcher canceling(UUID id, boolean wasRunning) { + return isJobStatus().withId(id) + .withState(JobState.CANCELING) + .withCreateTime(notNullValue(Instant.class)) + .withStartTime(wasRunning ? notNullValue(Instant.class) : AnythingMatcher.anything()) + .withFinishTime(notNullValue(Instant.class)); + } + + /** + * Creates a matcher that matches when the examined {@link JobStatus} has a state of {@link JobState#CANCELED}. + * + * @param id Id. + * @param wasRunning Whether the job was running before it was canceled. + * @return Matcher. + */ + public static RestJobStatusMatcher canceled(UUID id, boolean wasRunning) { + return isJobStatus().withId(id) + .withState(JobState.CANCELED) + .withCreateTime(notNullValue(Instant.class)) + .withStartTime(wasRunning ? 
notNullValue(Instant.class) : AnythingMatcher.anything()) + .withFinishTime(notNullValue(Instant.class)); + } + + + public RestJobStatusMatcher withState(JobState state) { + return withState(equalTo(state)); + } + + public RestJobStatusMatcher withState(Matcher<JobState> stateMatcher) { + this.stateMatcher = stateMatcher; + return this; + } + + + public RestJobStatusMatcher withId(UUID id) { + return withId(equalTo(id)); + } + + public RestJobStatusMatcher withId(Matcher<UUID> idMatcher) { + this.idMatcher = idMatcher; + return this; + } + + public RestJobStatusMatcher withCreateTime(Instant createTime) { + return withCreateTime(equalTo(createTime)); + } + + public RestJobStatusMatcher withCreateTime(Matcher<Instant> createTimeMatcher) { + this.createTimeMatcher = createTimeMatcher; + return this; + } + + public RestJobStatusMatcher withStartTime(Instant startTime) { + return withStartTime(equalTo(startTime)); + } + + public RestJobStatusMatcher withStartTime(Matcher<Instant> startTimeMatcher) { + this.startTimeMatcher = startTimeMatcher; + return this; + } + + public RestJobStatusMatcher withFinishTime(Instant finishTime) { + return withFinishTime(equalTo(finishTime)); + } + + public RestJobStatusMatcher withFinishTime(Matcher<Instant> finishTimeMatcher) { + this.finishTimeMatcher = finishTimeMatcher; + return this; + } + + @Override + protected boolean matchesSafely(JobStatus status) { + return idMatcher.matches(status.id()) + && stateMatcher.matches(status.state()) + && createTimeMatcher.matches(status.createTime()) + && startTimeMatcher.matches(status.startTime()) + && finishTimeMatcher.matches(status.finishTime()); + } + + @Override + protected void describeMismatchSafely(JobStatus status, Description mismatchDescription) { + mismatchDescription.appendText("was a JobStatus with id ") + .appendValue(status.id()) + .appendText(", state ") + .appendValue(status.state()) + .appendText(", create time ") + .appendValue(status.createTime()) + .appendText(", start 
time ") + .appendValue(status.startTime()) + .appendText(" and finish time ") + .appendValue(status.finishTime()); + } + + @Override + public void describeTo(Description description) { + description.appendText("a JobStatus with id ") + .appendDescriptionOf(idMatcher) + .appendText(", state ") + .appendDescriptionOf(stateMatcher) + .appendText(", create time ") + .appendDescriptionOf(createTimeMatcher) + .appendText(", start time ") + .appendDescriptionOf(startTimeMatcher) + .appendText(" and finish time ") + .appendDescriptionOf(finishTimeMatcher); + } +} diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 683bc1bb74..d23bc0dce7 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -127,6 +127,7 @@ import org.apache.ignite.internal.rest.RestManager; import org.apache.ignite.internal.rest.RestManagerFactory; import org.apache.ignite.internal.rest.authentication.AuthenticationProviderFactory; import org.apache.ignite.internal.rest.cluster.ClusterManagementRestFactory; +import org.apache.ignite.internal.rest.compute.ComputeRestFactory; import org.apache.ignite.internal.rest.configuration.PresentationsFactory; import org.apache.ignite.internal.rest.configuration.RestConfiguration; import org.apache.ignite.internal.rest.deployment.CodeDeploymentRestFactory; @@ -767,6 +768,8 @@ public class IgniteImpl implements Ignite { Supplier<RestFactory> authProviderFactory = () -> new AuthenticationProviderFactory(authenticationManager); Supplier<RestFactory> deploymentCodeRestFactory = () -> new CodeDeploymentRestFactory(deploymentManager); Supplier<RestFactory> restManagerFactory = () -> new RestManagerFactory(restManager); + Supplier<RestFactory> computeRestFactory = () -> new ComputeRestFactory(computeComponent); + RestConfiguration 
restConfiguration = nodeCfgMgr.configurationRegistry().getConfiguration(RestConfiguration.KEY); return new RestComponent( @@ -776,7 +779,9 @@ public class IgniteImpl implements Ignite { nodeMetricRestFactory, deploymentCodeRestFactory, authProviderFactory, - restManagerFactory), + restManagerFactory, + computeRestFactory + ), restManager, restConfiguration );