This is an automated email from the ASF dual-hosted git repository. apkhmv pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 13aacdd24f IGNITE-21250 Client compute should pass all candidate nodes (#3104) 13aacdd24f is described below commit 13aacdd24fb8597fdf03655e1c3d765ed0d827ef Author: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com> AuthorDate: Thu Feb 1 21:44:43 2024 +0300 IGNITE-21250 Client compute should pass all candidate nodes (#3104) --- .../compute/ClientComputeExecuteRequest.java | 39 ++++++++----- .../internal/client/compute/ClientCompute.java | 28 ++++++---- .../apache/ignite/client/fakes/FakeCompute.java | 10 ++++ modules/compute/build.gradle | 1 + .../compute/ItEmbeddedWorkerShutdownTest.java | 28 ++++++++++ .../compute/ItThinClientWorkerShutdownTest.java | 45 +++++++++++++++ .../internal/compute/ItWorkerShutdownTest.java | 11 ++-- .../ignite/internal/compute/ComputeComponent.java | 22 ++++++++ .../internal/compute/ComputeComponentImpl.java | 43 +++++++++++++++ .../internal/compute/ComputeJobFailover.java | 1 + .../ignite/internal/compute/ExecutionManager.java | 2 +- .../ignite/internal/compute/IgniteComputeImpl.java | 49 +++++++---------- .../internal/compute/IgniteComputeInternal.java | 27 +++++++++ .../internal/compute/ComputeComponentImplTest.java | 8 +++ .../internal/compute/IgniteComputeImplTest.java | 19 +++++-- .../cpp/ignite/client/compute/compute.cpp | 22 +------- .../ignite/client/detail/compute/compute_impl.cpp | 10 +++- .../ignite/client/detail/compute/compute_impl.h | 7 ++- .../dotnet/Apache.Ignite.Tests/FakeServer.cs | 15 ++++- .../Apache.Ignite/Internal/Compute/Compute.cs | 64 ++++++++++++++-------- .../org/apache/ignite/internal/app/IgniteImpl.java | 3 +- 21 files changed, 339 insertions(+), 115 deletions(-) diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java index 
71f0d71c29..5b54080139 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java @@ -20,17 +20,19 @@ package org.apache.ignite.client.handler.requests.compute; import static org.apache.ignite.client.handler.requests.compute.ClientComputeGetStatusRequest.packJobStatus; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.NotificationSender; import org.apache.ignite.compute.DeploymentUnit; -import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionOptions; import org.apache.ignite.internal.client.proto.ClientMessagePacker; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; +import org.apache.ignite.internal.compute.IgniteComputeInternal; import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; /** @@ -50,29 +52,40 @@ public class ClientComputeExecuteRequest { public static CompletableFuture<Void> process( ClientMessageUnpacker in, ClientMessagePacker out, - IgniteCompute compute, + IgniteComputeInternal compute, ClusterService cluster, - NotificationSender notificationSender) { - var nodeName = in.tryUnpackNil() ? null : in.unpackString(); - - var node = nodeName == null - ? 
cluster.topologyService().localMember() - : cluster.topologyService().getByConsistentId(nodeName); - - if (node == null) { - throw new IgniteException("Specified node is not present in the cluster: " + nodeName); - } + NotificationSender notificationSender + ) { + Set<ClusterNode> candidates = unpackCandidateNodes(in, cluster); List<DeploymentUnit> deploymentUnits = unpackDeploymentUnits(in); String jobClassName = in.unpackString(); JobExecutionOptions options = JobExecutionOptions.builder().priority(in.unpackInt()).maxRetries(in.unpackInt()).build(); Object[] args = unpackArgs(in); - JobExecution<Object> execution = compute.executeAsync(Set.of(node), deploymentUnits, jobClassName, options, args); + JobExecution<Object> execution = compute.executeAsyncWithFailover(candidates, deploymentUnits, jobClassName, options, args); sendResultAndStatus(execution, notificationSender); return execution.idAsync().thenAccept(out::packUuid); } + private static Set<ClusterNode> unpackCandidateNodes(ClientMessageUnpacker in, ClusterService cluster) { + int size = in.unpackInt(); + Set<ClusterNode> nodes = new HashSet<>(size); + + for (int i = 0; i < size; i++) { + String nodeName = in.unpackString(); + ClusterNode node = cluster.topologyService().getByConsistentId(nodeName); + + if (node == null) { + throw new IgniteException("Specified node is not present in the cluster: " + nodeName); + } + + nodes.add(node); + } + + return nodes; + } + static void sendResultAndStatus(JobExecution<Object> execution, NotificationSender notificationSender) { execution.resultAsync().whenComplete((val, err) -> execution.statusAsync().whenComplete((status, errStatus) -> diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java index 0b0341f298..bf621dbab0 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java +++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java @@ -100,9 +100,7 @@ public class ClientCompute implements IgniteCompute { throw new IllegalArgumentException("nodes must not be empty."); } - ClusterNode node = randomNode(nodes); - - return new ClientJobExecution<>(ch, executeOnOneNode(node, units, jobClassName, options, args)); + return new ClientJobExecution<>(ch, executeOnNodesAsync(nodes, units, jobClassName, options, args)); } /** {@inheritDoc} */ @@ -252,7 +250,9 @@ public class ClientCompute implements IgniteCompute { Map<ClusterNode, JobExecution<R>> map = new HashMap<>(nodes.size()); for (ClusterNode node : nodes) { - ClientJobExecution<R> execution = new ClientJobExecution<>(ch, executeOnOneNode(node, units, jobClassName, options, args)); + JobExecution<R> execution = new ClientJobExecution<>(ch, executeOnNodesAsync( + Set.of(node), units, jobClassName, options, args + )); if (map.put(node, execution) != null) { throw new IllegalStateException("Node can't be specified more than once: " + node); } @@ -261,22 +261,19 @@ public class ClientCompute implements IgniteCompute { return map; } - private CompletableFuture<PayloadInputChannel> executeOnOneNode( - ClusterNode node, + private CompletableFuture<PayloadInputChannel> executeOnNodesAsync( + Set<ClusterNode> nodes, List<DeploymentUnit> units, String jobClassName, JobExecutionOptions options, Object[] args ) { + ClusterNode node = randomNode(nodes); + return ch.serviceAsync( ClientOp.COMPUTE_EXECUTE, w -> { - if (w.clientChannel().protocolContext().clusterNode().name().equals(node.name())) { - w.out().packNil(); - } else { - w.out().packString(node.name()); - } - + packNodeNames(w.out(), nodes); packJob(w.out(), units, jobClassName, options, args); }, ch -> ch, @@ -408,6 +405,13 @@ public class ClientCompute implements IgniteCompute { return completedFuture(res); } + private static void packNodeNames(ClientMessagePacker w, Set<ClusterNode> nodes) { + 
w.packInt(nodes.size()); + for (ClusterNode node : nodes) { + w.packString(node.name()); + } + } + private static void packJob(ClientMessagePacker w, List<DeploymentUnit> units, String jobClassName, diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java index c3e69d688c..067d9dbc62 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java @@ -75,6 +75,16 @@ public class FakeCompute implements IgniteComputeInternal { String jobClassName, JobExecutionOptions options, Object... args) { + return executeAsyncWithFailover(nodes, units, jobClassName, options, args); + } + + @Override + public <R> JobExecution<R> executeAsyncWithFailover( + Set<ClusterNode> nodes, + List<DeploymentUnit> units, + String jobClassName, + JobExecutionOptions options, + Object... args) { if (Objects.equals(jobClassName, GET_UNITS)) { String unitString = units.stream().map(DeploymentUnit::render).collect(Collectors.joining(",")); return completedExecution((R) unitString); diff --git a/modules/compute/build.gradle b/modules/compute/build.gradle index b7a26d3821..5e3fc9041c 100644 --- a/modules/compute/build.gradle +++ b/modules/compute/build.gradle @@ -54,6 +54,7 @@ dependencies { integrationTestImplementation project(':ignite-cluster-management') integrationTestImplementation project(':ignite-table') integrationTestImplementation project(':ignite-placement-driver-api') + integrationTestImplementation project(':ignite-client') integrationTestImplementation testFixtures(project(':ignite-core')) integrationTestImplementation testFixtures(project(':ignite-runner')) } diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedWorkerShutdownTest.java 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedWorkerShutdownTest.java new file mode 100644 index 0000000000..6d44bb4e3c --- /dev/null +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedWorkerShutdownTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute; + +import org.apache.ignite.compute.IgniteCompute; +import org.apache.ignite.internal.app.IgniteImpl; + +class ItEmbeddedWorkerShutdownTest extends ItWorkerShutdownTest { + @Override + IgniteCompute compute(IgniteImpl entryNode) { + return entryNode.compute(); + } +} diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientWorkerShutdownTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientWorkerShutdownTest.java new file mode 100644 index 0000000000..401a298220 --- /dev/null +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientWorkerShutdownTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute; + +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.compute.IgniteCompute; +import org.apache.ignite.internal.app.IgniteImpl; +import org.junit.jupiter.api.AfterEach; + +class ItThinClientWorkerShutdownTest extends ItWorkerShutdownTest { + private final Map<String, IgniteClient> clients = new HashMap<>(); + + @AfterEach + void cleanup() throws Exception { + for (IgniteClient igniteClient : clients.values()) { + igniteClient.close(); + } + clients.clear(); + } + + @Override + IgniteCompute compute(IgniteImpl entryNode) { + String address = "127.0.0.1:" + entryNode.clientAddress().port(); + IgniteClient client = IgniteClient.builder().addresses(address).build(); + clients.put(address, client); + return client.compute(); + } +} diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java index c76b6c0870..9befd4dbb7 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java +++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java @@ -34,6 +34,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.internal.ClusterPerTestIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; @@ -55,7 +56,7 @@ import org.junit.jupiter.api.Test; * another node. This is not true for broadcast and local jobs. They should not be restarted. */ @SuppressWarnings("resource") -public class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest { +public abstract class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest { /** * Map from node name to node index in {@link super#cluster}. */ @@ -208,7 +209,7 @@ public class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest { InteractiveJobs.initChannels(allNodeNames()); // When start broadcast job. - Map<ClusterNode, JobExecution<Object>> executions = entryNode.compute().broadcastAsync( + Map<ClusterNode, JobExecution<Object>> executions = compute(entryNode).broadcastAsync( clusterNodesByNames(workerCandidates(node(0), node(1), node(2))), List.of(), InteractiveJobs.interactiveJobName() @@ -286,7 +287,7 @@ public class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest { // When start colocated job on node that is not primary replica. 
IgniteImpl entryNode = anyNodeExcept(primaryReplica); - TestingJobExecution<Object> execution = new TestingJobExecution<>(entryNode.compute().executeColocatedAsync( + TestingJobExecution<Object> execution = new TestingJobExecution<>(compute(entryNode).executeColocatedAsync( TABLE_NAME, Tuple.create(1).set("K", 1), List.of(), @@ -360,10 +361,12 @@ public class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest { private TestingJobExecution<String> executeGlobalInteractiveJob(IgniteImpl entryNode, Set<String> nodes) { return new TestingJobExecution<>( - entryNode.compute().executeAsync(clusterNodesByNames(nodes), List.of(), InteractiveJobs.globalJob().name()) + compute(entryNode).executeAsync(clusterNodesByNames(nodes), List.of(), InteractiveJobs.globalJob().name()) ); } + abstract IgniteCompute compute(IgniteImpl entryNode); + private void createReplicatedTestTableWithOneRow() { // Number of replicas == number of nodes and number of partitions == 1. This gives us the majority on primary replica stop. // After the primary replica is stopped we still be able to select new primary replica selected. diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java index 679000389e..3f88353d1f 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java @@ -104,6 +104,28 @@ public interface ComputeComponent extends IgniteComponent { return executeRemotely(ExecutionOptions.DEFAULT, remoteNode, units, jobClassName, args); } + /** + * Executes a job of the given class on a remote node. If the node leaves the cluster, it will be restarted on the node given by the + * {@code nextWorkerSelector}. + * + * @param remoteNode Name of the job class. 
+ * @param nextWorkerSelector The selector that returns the next worker to execute job on. + * @param options Job execution options. + * @param units Deployment units which will be loaded for execution. + * @param jobClassName Name of the job class. + * @param args Job args. + * @param <R> Job result type. + * @return Future execution result. + */ + <R> JobExecution<R> executeRemotelyWithFailover( + ClusterNode remoteNode, + NextWorkerSelector nextWorkerSelector, + List<DeploymentUnit> units, + String jobClassName, + ExecutionOptions options, + Object... args + ); + /** * Retrieves the current status of all jobs on all nodes in the cluster. * diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java index 19762aa0fc..df5ff79840 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java @@ -26,10 +26,14 @@ import java.util.Collection; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobStatus; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.compute.configuration.ComputeConfiguration; import org.apache.ignite.internal.compute.executor.ComputeExecutor; import org.apache.ignite.internal.compute.executor.JobExecutionInternal; @@ -40,7 +44,11 @@ import org.apache.ignite.internal.compute.messaging.RemoteJobExecution; import org.apache.ignite.internal.future.InFlightFutures; 
import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.MessagingService; import org.apache.ignite.network.TopologyService; @@ -50,6 +58,8 @@ import org.jetbrains.annotations.Nullable; * Implementation of {@link ComputeComponent}. */ public class ComputeComponentImpl implements ComputeComponent { + private static final IgniteLogger LOG = Loggers.forClass(ComputeComponentImpl.class); + /** Busy lock to stop synchronously. */ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); @@ -58,6 +68,10 @@ public class ComputeComponentImpl implements ComputeComponent { private final InFlightFutures inFlightFutures = new InFlightFutures(); + private final TopologyService topologyService; + + private final LogicalTopologyService logicalTopologyService; + private final JobContextManager jobContextManager; private final ComputeExecutor executor; @@ -66,20 +80,29 @@ public class ComputeComponentImpl implements ComputeComponent { private final ExecutionManager executionManager; + private final ExecutorService failoverExecutor; + /** * Creates a new instance. 
*/ public ComputeComponentImpl( + String nodeName, MessagingService messagingService, TopologyService topologyService, + LogicalTopologyService logicalTopologyService, JobContextManager jobContextManager, ComputeExecutor executor, ComputeConfiguration computeConfiguration ) { + this.topologyService = topologyService; + this.logicalTopologyService = logicalTopologyService; this.jobContextManager = jobContextManager; this.executor = executor; executionManager = new ExecutionManager(computeConfiguration, topologyService); messaging = new ComputeMessaging(executionManager, messagingService, topologyService); + failoverExecutor = Executors.newSingleThreadExecutor( + NamedThreadFactory.create(nodeName, "compute-job-failover", LOG) + ); } /** {@inheritDoc} */ @@ -146,6 +169,25 @@ public class ComputeComponentImpl implements ComputeComponent { } } + @Override + public <R> JobExecution<R> executeRemotelyWithFailover( + ClusterNode remoteNode, + NextWorkerSelector nextWorkerSelector, + List<DeploymentUnit> units, + String jobClassName, + ExecutionOptions options, + Object... 
args + ) { + JobExecution<R> result = new ComputeJobFailover<R>( + this, logicalTopologyService, topologyService, + remoteNode, nextWorkerSelector, failoverExecutor, units, + jobClassName, options, args + ).failSafeExecute(); + + result.idAsync().thenAccept(jobId -> executionManager.addExecution(jobId, result)); + return result; + } + @Override public CompletableFuture<Collection<JobStatus>> statusesAsync() { return messaging.broadcastStatusesAsync(); @@ -189,6 +231,7 @@ public class ComputeComponentImpl implements ComputeComponent { executionManager.stop(); messaging.stop(); executor.stop(); + IgniteUtils.shutdownAndAwaitTermination(failoverExecutor, 10, TimeUnit.SECONDS); } private <R> JobExecutionInternal<R> exec(JobContext context, ExecutionOptions options, String jobClassName, Object[] args) { diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java index 5a3f96d3ba..ad01f19dd4 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java @@ -91,6 +91,7 @@ class ComputeJobFailover<T> { * @param logicalTopologyService logical topology service. * @param topologyService physical topology service. * @param workerNode the node to execute the job on. + * @param nextWorkerSelector the selector that returns the next worker to execute job on. * @param executor the thread pool where the failover should run on. * @param units deployment units. * @param jobClassName the name of the job class. 
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java index 064f9fa3ac..8d361aaddc 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java @@ -102,7 +102,7 @@ public class ExecutionManager { */ public CompletableFuture<Set<JobStatus>> localStatusesAsync() { CompletableFuture<JobStatus>[] statuses = executions.values().stream() - .filter(it -> !(it instanceof RemoteJobExecution)) + .filter(it -> !(it instanceof RemoteJobExecution) && !(it instanceof FailSafeJobExecution)) .map(JobExecution::statusAsync) .toArray(CompletableFuture[]::new); diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java index 5fdeb55292..cd6ff37165 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java @@ -33,8 +33,6 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.ignite.compute.ComputeException; @@ -43,15 +41,11 @@ import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionOptions; import org.apache.ignite.compute.JobStatus; -import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.hlc.HybridClock; -import 
org.apache.ignite.internal.logger.IgniteLogger; -import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.table.IgniteTablesInternal; import org.apache.ignite.internal.table.TableViewInternal; -import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.CompletableFutures; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.lang.ErrorGroups.Compute; @@ -67,8 +61,6 @@ import org.jetbrains.annotations.Nullable; * Implementation of {@link IgniteCompute}. */ public class IgniteComputeImpl implements IgniteComputeInternal { - private static final IgniteLogger LOG = Loggers.forClass(IgniteComputeImpl.class); - private static final String DEFAULT_SCHEMA_NAME = "PUBLIC"; private final TopologyService topologyService; @@ -79,39 +71,32 @@ public class IgniteComputeImpl implements IgniteComputeInternal { private final ThreadLocalRandom random = ThreadLocalRandom.current(); - private final LogicalTopologyService logicalTopologyService; - private final PlacementDriver placementDriver; private final HybridClock clock; - private final Executor failoverExecutor; - /** * Create new instance. 
*/ public IgniteComputeImpl(PlacementDriver placementDriver, TopologyService topologyService, - LogicalTopologyService logicalTopologyService, IgniteTablesInternal tables, ComputeComponent computeComponent, + IgniteTablesInternal tables, ComputeComponent computeComponent, HybridClock clock) { this.placementDriver = placementDriver; this.topologyService = topologyService; this.tables = tables; this.computeComponent = computeComponent; - this.logicalTopologyService = logicalTopologyService; this.clock = clock; - this.failoverExecutor = Executors.newFixedThreadPool( - 1, - new NamedThreadFactory("compute-job-failover", LOG) - ); } /** {@inheritDoc} */ @Override - public <R> JobExecution<R> executeAsync(Set<ClusterNode> nodes, + public <R> JobExecution<R> executeAsync( + Set<ClusterNode> nodes, List<DeploymentUnit> units, String jobClassName, JobExecutionOptions options, - Object... args) { + Object... args + ) { Objects.requireNonNull(nodes); Objects.requireNonNull(units); Objects.requireNonNull(jobClassName); @@ -121,9 +106,21 @@ public class IgniteComputeImpl implements IgniteComputeInternal { throw new IllegalArgumentException("nodes must not be empty."); } + return executeAsyncWithFailover(nodes, units, jobClassName, options, args); + } + + @Override + public <R> JobExecution<R> executeAsyncWithFailover( + Set<ClusterNode> nodes, + List<DeploymentUnit> units, + String jobClassName, + JobExecutionOptions options, + Object... 
args + ) { Set<ClusterNode> candidates = new HashSet<>(nodes); ClusterNode targetNode = randomNode(candidates); candidates.remove(targetNode); + NextWorkerSelector selector = new DeqNextWorkerSelector(new ConcurrentLinkedDeque<>(candidates)); return new JobExecutionWrapper<>( @@ -169,18 +166,14 @@ public class IgniteComputeImpl implements IgniteComputeInternal { NextWorkerSelector nextWorkerSelector, List<DeploymentUnit> units, String jobClassName, - JobExecutionOptions options, + JobExecutionOptions jobExecutionOptions, Object[] args ) { - ExecutionOptions executionOptions = ExecutionOptions.from(options); + ExecutionOptions options = ExecutionOptions.from(jobExecutionOptions); if (isLocal(targetNode)) { - return computeComponent.executeLocally(executionOptions, units, jobClassName, args); + return computeComponent.executeLocally(options, units, jobClassName, args); } else { - return new ComputeJobFailover<R>( - computeComponent, logicalTopologyService, topologyService, - targetNode, nextWorkerSelector, failoverExecutor, units, - jobClassName, executionOptions, args - ).failSafeExecute(); + return computeComponent.executeRemotelyWithFailover(targetNode, nextWorkerSelector, units, jobClassName, options, args); } } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java index 417e07c236..32c7517afc 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java @@ -17,16 +17,43 @@ package org.apache.ignite.internal.compute; +import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.IgniteCompute; +import 
org.apache.ignite.compute.JobExecution; +import org.apache.ignite.compute.JobExecutionOptions; import org.apache.ignite.compute.JobStatus; +import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.Nullable; /** * Internal compute facade. */ public interface IgniteComputeInternal extends IgniteCompute { + /** + * Executes a {@link ComputeJob} of the given class on a single node. If the node leaves the cluster, it will be restarted on one of the + * candidate nodes. + * + * @param <R> Job result type. + * @param nodes Candidate nodes; In case target node left the cluster, the job will be restarted on one of them. + * @param units Deployment units. Can be empty. + * @param jobClassName Name of the job class to execute. + * @param options Job execution options. + * @param args Arguments of the job. + * @return CompletableFuture Job result. + */ + <R> JobExecution<R> executeAsyncWithFailover( + Set<ClusterNode> nodes, + List<DeploymentUnit> units, + String jobClassName, + JobExecutionOptions options, + Object... args + ); + /** * Gets job status by id. 
* diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java index efe77210d8..08414159e2 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java @@ -73,6 +73,7 @@ import org.apache.ignite.compute.JobExecutionContext; import org.apache.ignite.compute.JobState; import org.apache.ignite.compute.JobStatus; import org.apache.ignite.compute.version.Version; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.compute.configuration.ComputeConfiguration; import org.apache.ignite.internal.compute.executor.ComputeExecutor; import org.apache.ignite.internal.compute.executor.ComputeExecutorImpl; @@ -131,6 +132,9 @@ class ComputeComponentImplTest extends BaseIgniteAbstractTest { @Mock(answer = RETURNS_DEEP_STUBS) private TopologyService topologyService; + @Mock + private LogicalTopologyService logicalTopologyService; + @InjectConfiguration private ComputeConfiguration computeConfiguration; @@ -201,8 +205,10 @@ class ComputeComponentImplTest extends BaseIgniteAbstractTest { computeExecutor = new ComputeExecutorImpl(ignite, stateMachine, computeConfiguration); computeComponent = new ComputeComponentImpl( + INSTANCE_NAME, messagingService, topologyService, + logicalTopologyService, jobContextManager, computeExecutor, computeConfiguration @@ -703,8 +709,10 @@ class ComputeComponentImplTest extends BaseIgniteAbstractTest { willCompleteSuccessfully()); computeComponent = new ComputeComponentImpl( + INSTANCE_NAME, messagingService, topologyService, + logicalTopologyService, jobContextManager, computeExecutor, computeConfiguration diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java index eea0394208..67673b91b9 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java @@ -23,8 +23,10 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.AdditionalMatchers.aryEq; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; @@ -119,7 +121,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest { willBe("remoteResponse") ); - verify(computeComponent).executeRemotely(ExecutionOptions.DEFAULT, remoteNode, testDeploymentUnits, JOB_CLASS_NAME, "a", 42); + verifyExecuteRemotelyWithFailover(ExecutionOptions.DEFAULT); } @Test @@ -148,7 +150,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest { willBe("remoteResponse") ); - verify(computeComponent).executeRemotely(expectedOptions, remoteNode, testDeploymentUnits, JOB_CLASS_NAME, "a", 42); + verifyExecuteRemotelyWithFailover(expectedOptions); } @Test @@ -200,9 +202,16 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest { .thenReturn(completedExecution("jobResponse")); } - private void respondWhenExecutingSimpleJobRemotely(ExecutionOptions executionOptions) { - when(computeComponent.executeRemotely(executionOptions, remoteNode, testDeploymentUnits, JOB_CLASS_NAME, 
"a", 42)) - .thenReturn(completedExecution("remoteResponse")); + private void respondWhenExecutingSimpleJobRemotely(ExecutionOptions options) { + when(computeComponent.executeRemotelyWithFailover( + eq(remoteNode), any(), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq(options), aryEq(new Object[]{"a", 42}) + )).thenReturn(completedExecution("remoteResponse")); + } + + private void verifyExecuteRemotelyWithFailover(ExecutionOptions options) { + verify(computeComponent).executeRemotelyWithFailover( + eq(remoteNode), any(), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq(options), aryEq(new Object[]{"a", 42}) + ); } private static <R> JobExecution<R> completedExecution(R result) { diff --git a/modules/platforms/cpp/ignite/client/compute/compute.cpp b/modules/platforms/cpp/ignite/client/compute/compute.cpp index 2887dffa91..e7c0a18900 100644 --- a/modules/platforms/cpp/ignite/client/compute/compute.cpp +++ b/modules/platforms/cpp/ignite/client/compute/compute.cpp @@ -19,32 +19,15 @@ #include "ignite/client/detail/argument_check_utils.h" #include "ignite/client/detail/compute/compute_impl.h" -#include <random> - namespace ignite { -template<typename T> -typename T::value_type get_random_element(const T &cont) { - static std::mutex randomMutex; - static std::random_device rd; - static std::mt19937 gen(rd()); - - assert(!cont.empty()); - - std::uniform_int_distribution<size_t> distrib(0, cont.size() - 1); - - std::lock_guard<std::mutex> lock(randomMutex); - - return cont[distrib(gen)]; -} - void compute::execute_async(const std::vector<cluster_node> &nodes, const std::vector<deployment_unit> &units, std::string_view job_class_name, const std::vector<primitive> &args, ignite_callback<std::optional<primitive>> callback) { detail::arg_check::container_non_empty(nodes, "Nodes container"); detail::arg_check::container_non_empty(job_class_name, "Job class name"); - m_impl->execute_on_one_node(get_random_element(nodes), units, job_class_name, args, std::move(callback)); + 
m_impl->execute_on_nodes(nodes, units, job_class_name, args, std::move(callback)); } void compute::broadcast_async(const std::set<cluster_node> &nodes, const std::vector<deployment_unit> &units, @@ -69,7 +52,8 @@ void compute::broadcast_async(const std::set<cluster_node> &nodes, const std::ve auto shared_res = std::make_shared<result_group>(std::int32_t(nodes.size()), std::move(callback)); for (const auto &node : nodes) { - m_impl->execute_on_one_node(node, units, job_class_name, args, [node, shared_res](auto &&res) { + std::vector<cluster_node> candidates = { node }; + m_impl->execute_on_nodes(candidates, units, job_class_name, args, [node, shared_res](auto &&res) { auto &val = *shared_res; std::lock_guard<std::mutex> lock(val.m_mutex); diff --git a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp index f30a623a78..e2233849e5 100644 --- a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp +++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp @@ -83,12 +83,16 @@ void write_units(protocol::writer &writer, const std::vector<deployment_unit> &u } } -void compute_impl::execute_on_one_node(cluster_node node, const std::vector<deployment_unit> &units, +void compute_impl::execute_on_nodes(const std::vector<cluster_node> &nodes, const std::vector<deployment_unit> &units, std::string_view job_class_name, const std::vector<primitive> &args, ignite_callback<std::optional<primitive>> callback) { - auto writer_func = [&node, job_class_name, &units, args](protocol::writer &writer) { - writer.write(node.get_name()); + auto writer_func = [&nodes, job_class_name, &units, args](protocol::writer &writer) { + auto nodes_num = std::int32_t(nodes.size()); + writer.write(nodes_num); + for (const auto &node : nodes) { + writer.write(node.get_name()); + } write_units(writer, units); writer.write(job_class_name); diff --git 
a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h index e2f701e73d..65c4b8a75b 100644 --- a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h +++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h @@ -47,15 +47,16 @@ public: , m_tables(std::move(tables)) {} /** - * Executes a compute job represented by the given class on the specified node asynchronously. + * Executes a compute job represented by the given class on one of the specified nodes asynchronously. If the node leaves the cluster, + * the job will be restarted on one of the candidate nodes. * - * @param node Node to use for the job execution. + * @param nodes Candidate nodes to use for the job execution. * @param units Deployment units. Can be empty. * @param job_class_name Java class name of the job to execute. * @param args Job arguments. * @param callback A callback called on operation completion with job execution result. */ - void execute_on_one_node(cluster_node node, const std::vector<deployment_unit> &units, + void execute_on_nodes(const std::vector<cluster_node> &nodes, const std::vector<deployment_unit> &units, std::string_view job_class_name, const std::vector<primitive> &args, ignite_callback<std::optional<primitive>> callback); diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs index 7a2ee27cf4..493a6ab897 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs @@ -616,8 +616,19 @@ namespace Apache.Ignite.Tests private PooledArrayBuffer ComputeExecute(MsgPackReader reader, bool colocated = false) { // Colocated: table id, schema version, key. - // Else: node name. - reader.Skip(colocated ? 4 : 1); + // Else: node names. 
+ if (colocated) + { + reader.Skip(4); + } + else + { + var namesCount = reader.ReadInt32(); + for (int i = 0; i < namesCount; i++) + { + reader.ReadString(); + } + } var unitsCount = reader.TryReadNil() ? 0 : reader.ReadInt32(); var units = new List<DeploymentUnit>(unitsCount); diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs index 6292e78f69..5d938590d0 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs @@ -69,7 +69,10 @@ namespace Apache.Ignite.Internal.Compute IgniteArgumentCheck.NotNull(nodes); IgniteArgumentCheck.NotNull(jobClassName); - return await ExecuteOnOneNode<T>(GetRandomNode(nodes), units, jobClassName, args).ConfigureAwait(false); + var nodesCol = GetNodesCollection(nodes); + IgniteArgumentCheck.Ensure(nodesCol.Count > 0, nameof(nodes), "Nodes can't be empty."); + + return await ExecuteOnNodes<T>(nodesCol, units, jobClassName, args).ConfigureAwait(false); } /// <inheritdoc/> @@ -121,7 +124,7 @@ namespace Apache.Ignite.Internal.Compute foreach (var node in nodes) { - var task = ExecuteOnOneNode<T>(node, units0, jobClassName, args); + var task = ExecuteOnNodes<T>(new[] { node }, units0, jobClassName, args); res[node] = task; } @@ -133,34 +136,26 @@ namespace Apache.Ignite.Internal.Compute public override string ToString() => IgniteToStringBuilder.Build(GetType()); [SuppressMessage("Security", "CA5394:Do not use insecure randomness", Justification = "Secure random is not required here.")] - private static IClusterNode GetRandomNode(IEnumerable<IClusterNode> nodes) + private static IClusterNode GetRandomNode(ICollection<IClusterNode> nodes) { - var nodesCol = GetNodesCollection(nodes); - - IgniteArgumentCheck.Ensure(nodesCol.Count > 0, nameof(nodes), "Nodes can't be empty."); - - var idx = Random.Shared.Next(0, nodesCol.Count); + var idx = 
Random.Shared.Next(0, nodes.Count); - return nodesCol.ElementAt(idx); + return nodes.ElementAt(idx); } private static ICollection<IClusterNode> GetNodesCollection(IEnumerable<IClusterNode> nodes) => nodes as ICollection<IClusterNode> ?? nodes.ToList(); - private static void WriteUnits(IEnumerable<DeploymentUnit> units, PooledArrayBuffer buf) + private static void WriteEnumerable<T>(IEnumerable<T> items, PooledArrayBuffer buf, Action<T> writerFunc) { var w = buf.MessageWriter; - if (units.TryGetNonEnumeratedCount(out var count)) + if (items.TryGetNonEnumeratedCount(out var count)) { w.Write(count); - foreach (var unit in units) + foreach (var item in items) { - IgniteArgumentCheck.NotNullOrEmpty(unit.Name); - IgniteArgumentCheck.NotNullOrEmpty(unit.Version); - - w.Write(unit.Name); - w.Write(unit.Version); + writerFunc(item); } return; @@ -171,24 +166,45 @@ namespace Apache.Ignite.Internal.Compute var countSpan = buf.GetSpan(5); buf.Advance(5); - foreach (var unit in units) + foreach (var item in items) { count++; - w.Write(unit.Name); - w.Write(unit.Version); + writerFunc(item); } countSpan[0] = MsgPackCode.Array32; BinaryPrimitives.WriteInt32BigEndian(countSpan[1..], count); } - private async Task<T> ExecuteOnOneNode<T>( - IClusterNode node, + private static void WriteUnits(IEnumerable<DeploymentUnit> units, PooledArrayBuffer buf) + { + WriteEnumerable(units, buf, writerFunc: unit => + { + IgniteArgumentCheck.NotNullOrEmpty(unit.Name); + IgniteArgumentCheck.NotNullOrEmpty(unit.Version); + + var w = buf.MessageWriter; + w.Write(unit.Name); + w.Write(unit.Version); + }); + } + + private static void WriteNodeNames(IEnumerable<IClusterNode> nodes, PooledArrayBuffer buf) + { + WriteEnumerable(nodes, buf, writerFunc: node => + { + var w = buf.MessageWriter; + w.Write(node.Name); + }); + } + + private async Task<T> ExecuteOnNodes<T>( + ICollection<IClusterNode> nodes, IEnumerable<DeploymentUnit> units, string jobClassName, object?[]? 
args) { - IgniteArgumentCheck.NotNull(node); + IClusterNode node = GetRandomNode(nodes); using var writer = ProtoCommon.GetMessageWriter(); Write(); @@ -205,7 +221,7 @@ namespace Apache.Ignite.Internal.Compute { var w = writer.MessageWriter; - w.Write(node.Name); + WriteNodeNames(nodes, writer); WriteUnits(units, writer); w.Write(jobClassName); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index d23bc0dce7..ed2f4efe7c 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -703,8 +703,10 @@ public class IgniteImpl implements Ignite { ComputeConfiguration computeCfg = nodeConfigRegistry.getConfiguration(ComputeConfiguration.KEY); InMemoryComputeStateMachine stateMachine = new InMemoryComputeStateMachine(computeCfg, name); computeComponent = new ComputeComponentImpl( + name, clusterSvc.messagingService(), clusterSvc.topologyService(), + logicalTopologyService, new JobContextManager(deploymentManagerImpl, deploymentManagerImpl.deploymentUnitAccessor(), new JobClassLoaderFactory()), new ComputeExecutorImpl(this, stateMachine, computeCfg), computeCfg @@ -713,7 +715,6 @@ public class IgniteImpl implements Ignite { compute = new IgniteComputeImpl( placementDriverMgr.placementDriver(), clusterSvc.topologyService(), - logicalTopologyService, distributedTblMgr, computeComponent, clock