This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 3d83c35107 IGNITE-22435 Add JobTarget interface (#3950) 3d83c35107 is described below commit 3d83c35107a3d7be882c7d0be2f0d674d8c64482 Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Thu Jun 20 13:00:47 2024 +0300 IGNITE-22435 Add JobTarget interface (#3950) Reduce the number of overloads in IgniteCompute and make the API more readable with the new JobTarget interface. --------- Co-authored-by: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com> Co-authored-by: korlov42 <kor...@gridgain.com> --- .../apache/ignite/compute/AnyNodeJobTarget.java | 43 +++++++ .../apache/ignite/compute/ColocatedJobTarget.java | 59 +++++++++ .../org/apache/ignite/compute/IgniteCompute.java | 136 ++------------------- .../java/org/apache/ignite/compute/JobTarget.java | 109 +++++++++++++++++ .../apache/ignite/compute/task/MapReduceJob.java | 3 +- .../apache/ignite/client/ClientOperationType.java | 4 +- .../internal/client/compute/ClientCompute.java | 112 +++++++---------- .../apache/ignite/client/AbstractClientTest.java | 9 +- .../apache/ignite/client/ClientComputeTest.java | 24 ++-- .../ignite/client/PartitionAwarenessTest.java | 9 +- .../java/org/apache/ignite/client/TestServer.java | 4 +- .../apache/ignite/client/fakes/FakeCompute.java | 63 +++------- .../ignite/internal/compute/ItComputeBaseTest.java | 64 +++++----- .../internal/compute/ItComputeErrorsBaseTest.java | 8 +- .../internal/compute/ItComputeTestEmbedded.java | 36 +++--- .../internal/compute/ItComputeTestStandalone.java | 14 +-- .../internal/compute/ItExecutionsCleanerTest.java | 4 +- .../compute/ItFailoverCandidateNotFoundTest.java | 4 +- .../internal/compute/ItWorkerShutdownTest.java | 10 +- .../threading/ItComputeApiThreadingTest.java | 28 +++-- .../internal/compute/AntiHijackIgniteCompute.java | 53 +------- .../ignite/internal/compute/IgniteComputeImpl.java | 124 ++++++++----------- .../internal/compute/IgniteComputeImplTest.java | 21 ++-- .../rest/compute/ItComputeControllerTest.java | 3 +- .../runner/app/client/ItThinClientComputeTest.java | 110 +++++++++-------- .../client/ItThinClientPartitionAwarenessTest.java | 5 +- 26 files changed, 526 insertions(+), 533 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/compute/AnyNodeJobTarget.java b/modules/api/src/main/java/org/apache/ignite/compute/AnyNodeJobTarget.java new file mode 100644 index 0000000000..2075c0a0f5 --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/compute/AnyNodeJobTarget.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute; + +import java.util.Objects; +import java.util.Set; +import org.apache.ignite.network.ClusterNode; + +/** + * Any node execution target. Indicates any node from the provided set. + */ +public class AnyNodeJobTarget implements JobTarget { + private final Set<ClusterNode> nodes; + + AnyNodeJobTarget(Set<ClusterNode> nodes) { + Objects.requireNonNull(nodes); + + if (nodes.isEmpty()) { + throw new IllegalArgumentException("Nodes collection must not be empty."); + } + + this.nodes = nodes; + } + + public Set<ClusterNode> nodes() { + return nodes; + } +} diff --git a/modules/api/src/main/java/org/apache/ignite/compute/ColocatedJobTarget.java b/modules/api/src/main/java/org/apache/ignite/compute/ColocatedJobTarget.java new file mode 100644 index 0000000000..669452b99b --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/compute/ColocatedJobTarget.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute; + +import java.util.Objects; +import org.apache.ignite.table.Tuple; +import org.apache.ignite.table.mapper.Mapper; +import org.jetbrains.annotations.Nullable; + +/** + * Colocated job execution target. Indicates a node that hosts the data for the specified key in the provided table. + */ +public class ColocatedJobTarget implements JobTarget { + private final String tableName; + + private final Object key; + + private final @Nullable Mapper<?> keyMapper; + + ColocatedJobTarget(String tableName, Object key, @Nullable Mapper<?> keyMapper) { + Objects.requireNonNull(tableName); + Objects.requireNonNull(key); + + if (keyMapper == null && !(key instanceof Tuple)) { + throw new IllegalArgumentException("Key must be an instance of Tuple when keyMapper is not provided."); + } + + this.tableName = tableName; + this.key = key; + this.keyMapper = keyMapper; + } + + public String tableName() { + return tableName; + } + + public Object key() { + return key; + } + + public @Nullable Mapper<?> keyMapper() { + return keyMapper; + } +} diff --git a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java index c0fb02869d..9db34bbe1b 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java @@ -30,8 +30,6 @@ import java.util.concurrent.CompletableFuture; import org.apache.ignite.compute.task.MapReduceTask; import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.network.ClusterNode; -import org.apache.ignite.table.Tuple; -import org.apache.ignite.table.mapper.Mapper; /** * Provides the ability to execute Compute jobs. @@ -44,13 +42,13 @@ public interface IgniteCompute { * Submits a {@link ComputeJob} of the given class for an execution on a single node from a set of candidate nodes. * * @param <R> Job result type. - * @param nodes Candidate nodes; the job will be executed on one of them. + * @param target Execution target. * @param descriptor Job descriptor. * @param args Arguments of the job. * @return Job execution object. */ <R> JobExecution<R> submit( - Set<ClusterNode> nodes, + JobTarget target, JobDescriptor descriptor, Object... args ); @@ -60,151 +58,35 @@ public interface IgniteCompute { * {@code submit(...).resultAsync()}. * * @param <R> Job result type. - * @param nodes Candidate nodes; the job will be executed on one of them. + * @param target Execution target. * @param descriptor Job descriptor. * @param args Arguments of the job. * @return Job result future. */ default <R> CompletableFuture<R> executeAsync( - Set<ClusterNode> nodes, + JobTarget target, JobDescriptor descriptor, Object... args ) { - return this.<R>submit(nodes, descriptor, args).resultAsync(); + return this.<R>submit(target, descriptor, args).resultAsync(); } /** * Executes a {@link ComputeJob} of the given class on a single node from a set of candidate nodes. * * @param <R> Job result type - * @param nodes Candidate nodes; the job will be executed on one of them. + * @param target Execution target. * @param descriptor Job descriptor. * @param args Arguments of the job. * @return Job result. * @throws ComputeException If there is any problem executing the job. */ <R> R execute( - Set<ClusterNode> nodes, - JobDescriptor descriptor, - Object... args - ); - - /** - * Submits a job of the given class for the execution on the node where the given key is located. The node is a leader of the - * corresponding RAFT group. - * - * @param tableName Name of the table whose key is used to determine the node to execute the job on. - * @param key Key that identifies the node to execute the job on. - * @param descriptor Job descriptor. - * @param args Arguments of the job. - * @param <R> Job result type. - * @return Job execution object. - */ - <R> JobExecution<R> submitColocated( - String tableName, - Tuple key, + JobTarget target, JobDescriptor descriptor, Object... args ); - /** - * Submits a job of the given class for the execution on the node where the given key is located. The node is a leader of the - * corresponding RAFT group. - * - * @param tableName Name of the table whose key is used to determine the node to execute the job on. - * @param key Key that identifies the node to execute the job on. - * @param keyMapper Mapper used to map the key to a binary representation. - * @param descriptor Job descriptor. - * @param args Arguments of the job. - * @param <R> Job result type. - * @return Job execution object. - */ - <K, R> JobExecution<R> submitColocated( - String tableName, - K key, - Mapper<K> keyMapper, - JobDescriptor descriptor, - Object... args - ); - - /** - * Submits a job of the given class for the execution on the node where the given key is located. The node is a leader of the - * corresponding RAFT group. A shortcut for {@code submitColocated(...).resultAsync()}. - * - * @param tableName Name of the table whose key is used to determine the node to execute the job on. - * @param key Key that identifies the node to execute the job on. - * @param descriptor Job descriptor. - * @param args Arguments of the job. - * @param <R> Job result type. - * @return Job result future. - */ - default <R> CompletableFuture<R> executeColocatedAsync( - String tableName, - Tuple key, - JobDescriptor descriptor, - Object... args - ) { - return this.<R>submitColocated(tableName, key, descriptor, args).resultAsync(); - } - - /** - * Submits a job of the given class for the execution on the node where the given key is located. The node is a leader of the - * corresponding RAFT group. A shortcut for {@code submitColocated(...).resultAsync()}. - * - * @param tableName Name of the table whose key is used to determine the node to execute the job on. - * @param key Key that identifies the node to execute the job on. - * @param keyMapper Mapper used to map the key to a binary representation. - * @param descriptor Job descriptor. - * @param args Arguments of the job. - * @param <R> Job result type. - * @return Job result future. - */ - default <K, R> CompletableFuture<R> executeColocatedAsync( - String tableName, - K key, - Mapper<K> keyMapper, - JobDescriptor descriptor, - Object... args - ) { - return this.<K, R>submitColocated(tableName, key, keyMapper, descriptor, args).resultAsync(); - } - - /** - * Executes a job of the given class on the node where the given key is located. The node is a leader of the corresponding RAFT group. - * - * @param <R> Job result type. - * @param tableName Name of the table whose key is used to determine the node to execute the job on. - * @param key Key that identifies the node to execute the job on. - * @param descriptor Job descriptor. - * @param args Arguments of the job. - * @return Job result. - * @throws ComputeException If there is any problem executing the job. - */ - <R> R executeColocated( - String tableName, - Tuple key, - JobDescriptor descriptor, - Object... args); - - /** - * Executes a job of the given class on the node where the given key is located. The node is a leader of the corresponding RAFT group. - * - * @param <R> Job result type. - * @param tableName Name of the table whose key is used to determine the node to execute the job on. - * @param key Key that identifies the node to execute the job on. - * @param keyMapper Mapper used to map the key to a binary representation. - * @param descriptor Job descriptor. - * @param args Arguments of the job. - * @return Job result. - * @throws ComputeException If there is any problem executing the job. - */ - <K, R> R executeColocated( - String tableName, - K key, - Mapper<K> keyMapper, - JobDescriptor descriptor, - Object... args); - /** * Submits a {@link ComputeJob} of the given class for an execution on all nodes in the given node set. * @@ -235,7 +117,7 @@ public interface IgniteCompute { Object... args ) { Map<ClusterNode, CompletableFuture<R>> futures = nodes.stream() - .collect(toMap(identity(), node -> this.executeAsync(Set.of(node), descriptor, args))); + .collect(toMap(identity(), node -> this.executeAsync(JobTarget.node(node), descriptor, args))); return allOf(futures.values().toArray(CompletableFuture[]::new)) .thenApply(ignored -> { @@ -268,7 +150,7 @@ public interface IgniteCompute { Map<ClusterNode, R> map = new HashMap<>(); for (ClusterNode node : nodes) { - map.put(node, execute(Set.of(node), descriptor, args)); + map.put(node, execute(JobTarget.node(node), descriptor, args)); } return map; diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobTarget.java b/modules/api/src/main/java/org/apache/ignite/compute/JobTarget.java new file mode 100644 index 0000000000..958269430e --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/compute/JobTarget.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.table.Tuple; +import org.apache.ignite.table.mapper.Mapper; + +/** + * Job execution target. + * + * <p>Determines the rules for selecting nodes to execute a job. + */ +public interface JobTarget { + /** + * Creates a job target for a specific node. + * + * <p>This target determines that a job should be executed on a given node. + * + * @param node Node. + * @return Job target. + */ + static JobTarget node(ClusterNode node) { + return new AnyNodeJobTarget(Set.of(node)); + } + + /** + * Creates a job target for any node from the provided collection. + * + * <p>This target determines that a job can be executed on any node in a given collection, but only one of them. + * Which node is chosen is implementation defined. + * + * @param nodes Collection of nodes. + * @return Job target. + */ + static JobTarget anyNode(ClusterNode... nodes) { + return new AnyNodeJobTarget(Set.of(nodes)); + } + + /** + * Creates a job target for any node from the provided collection. + * + * <p>This target determines that a job can be executed on any node in a given collection, but only one of them. + * Which node is chosen is implementation defined. + * + * @param nodes Collection of nodes. + * @return Job target. + */ + static JobTarget anyNode(Collection<ClusterNode> nodes) { + return new AnyNodeJobTarget(new HashSet<>(nodes)); + } + + /** + * Creates a job target for any node from the provided collection. + * + * <p>This target determines that a job can be executed on any node in a given collection, but only one of them. + * Which node is chosen is implementation defined. + * + * @param nodes Collection of nodes. + * @return Job target. + */ + static JobTarget anyNode(Set<ClusterNode> nodes) { + return new AnyNodeJobTarget(nodes); + } + + /** + * Creates a colocated job target for a specific table and key. + * + * <p>This target determines that a job should be executed on the same node that hosts the data for a given key of provided table. + * + * @param tableName Table name. + * @param key Key. + * @return Job target. + */ + static JobTarget colocated(String tableName, Tuple key) { + return new ColocatedJobTarget(tableName, key, null); + } + + /** + * Creates a colocated job target for a specific table and key with mapper. + * + * <p>This target determines that a job should be executed on the same node that hosts the data for a given key of provided table. + * + * @param tableName Table name. + * @param key Key. + * @return Job target. + */ + static <K> JobTarget colocated(String tableName, K key, Mapper<K> keyMapper) { + return new ColocatedJobTarget(tableName, key, keyMapper); + } +} diff --git a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceJob.java b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceJob.java index c3df006c38..8b9e7a90b5 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceJob.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceJob.java @@ -22,11 +22,12 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; import org.apache.ignite.compute.JobDescriptor; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.network.ClusterNode; /** * A description of the job to be submitted as a result of the split step of the {@link MapReduceTask}. Reflects the parameters of the - * {@link org.apache.ignite.compute.IgniteCompute#submit(Set, JobDescriptor, Object...)} method. + * {@link org.apache.ignite.compute.IgniteCompute#submit(JobTarget, JobDescriptor, Object...)} method. */ public class MapReduceJob { private final Set<ClusterNode> nodes; diff --git a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java index 459d88303d..8c6106097e 100644 --- a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java +++ b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java @@ -19,8 +19,8 @@ package org.apache.ignite.client; import java.util.Collection; import java.util.List; -import java.util.Set; import org.apache.ignite.compute.JobDescriptor; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.sql.BatchedArguments; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.async.AsyncResultSet; @@ -130,7 +130,7 @@ public enum ClientOperationType { TUPLE_CONTAINS_KEY, /** - * Compute Execute ({@link org.apache.ignite.compute.IgniteCompute#submit(Set, JobDescriptor, Object...)}). + * Compute Execute ({@link org.apache.ignite.compute.IgniteCompute#submit(JobTarget, JobDescriptor, Object...)}). */ COMPUTE_EXECUTE, diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java index 130652d36a..629dacb61e 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java @@ -35,11 +35,14 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; +import org.apache.ignite.compute.AnyNodeJobTarget; +import org.apache.ignite.compute.ColocatedJobTarget; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionOptions; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.client.ClientUtils; import org.apache.ignite.internal.client.PayloadInputChannel; @@ -87,57 +90,51 @@ public class ClientCompute implements IgniteCompute { } @Override - public <R> JobExecution<R> submit(Set<ClusterNode> nodes, JobDescriptor descriptor, Object... args) { - Objects.requireNonNull(nodes); + public <R> JobExecution<R> submit(JobTarget target, JobDescriptor descriptor, Object... args) { + Objects.requireNonNull(target); Objects.requireNonNull(descriptor); - if (nodes.isEmpty()) { - throw new IllegalArgumentException("nodes must not be empty."); + if (target instanceof AnyNodeJobTarget) { + AnyNodeJobTarget anyNodeJobTarget = (AnyNodeJobTarget) target; + + return new ClientJobExecution<>(ch, executeOnAnyNodeAsync( + anyNodeJobTarget.nodes(), + descriptor.units(), + descriptor.jobClassName(), + descriptor.options(), + args)); } - return new ClientJobExecution<>( - ch, - executeOnNodesAsync(nodes, descriptor.units(), descriptor.jobClassName(), descriptor.options(), args)); - } + if (target instanceof ColocatedJobTarget) { + ColocatedJobTarget colocatedTarget = (ColocatedJobTarget) target; + var mapper = (Mapper<? super Object>) colocatedTarget.keyMapper(); + + if (mapper != null) { + return new ClientJobExecution<>(ch, doExecuteColocatedAsync( + colocatedTarget.tableName(), + colocatedTarget.key(), + mapper, + descriptor.units(), + descriptor.jobClassName(), + descriptor.options(), + args)); + } else { + return new ClientJobExecution<>(ch, doExecuteColocatedAsync( + colocatedTarget.tableName(), + (Tuple) colocatedTarget.key(), + descriptor.units(), + descriptor.jobClassName(), + descriptor.options(), + args)); + } + } - @Override - public <R> R execute(Set<ClusterNode> nodes, JobDescriptor descriptor, Object... args) { - return sync(this.executeAsync(nodes, descriptor, args)); + throw new IllegalArgumentException("Unsupported job target: " + target); } - /** {@inheritDoc} */ @Override - public <R> JobExecution<R> submitColocated( - String tableName, - Tuple key, - JobDescriptor descriptor, - Object... args - ) { - Objects.requireNonNull(tableName); - Objects.requireNonNull(key); - Objects.requireNonNull(descriptor); - - return new ClientJobExecution<>( - ch, - doExecuteColocatedAsync(tableName, key, descriptor.units(), descriptor.jobClassName(), descriptor.options(), args)); - } - - /** {@inheritDoc} */ - @Override - public <K, R> JobExecution<R> submitColocated( - String tableName, - K key, - Mapper<K> keyMapper, - JobDescriptor descriptor, - Object... args - ) { - Objects.requireNonNull(tableName); - Objects.requireNonNull(key); - Objects.requireNonNull(keyMapper); - Objects.requireNonNull(descriptor); - - return new ClientJobExecution<>(ch, doExecuteColocatedAsync( - tableName, key, keyMapper, descriptor.units(), descriptor.jobClassName(), descriptor.options(), args)); + public <R> R execute(JobTarget target, JobDescriptor descriptor, Object... args) { + return sync(executeAsync(target, descriptor, args)); } private CompletableFuture<SubmitResult> doExecuteColocatedAsync( @@ -179,29 +176,6 @@ public class ClientCompute implements IgniteCompute { .thenCompose(Function.identity()); } - /** {@inheritDoc} */ - @Override - public <R> R executeColocated( - String tableName, - Tuple key, - JobDescriptor descriptor, - Object... args - ) { - return sync(this.executeColocatedAsync(tableName, key, descriptor, args)); - } - - /** {@inheritDoc} */ - @Override - public <K, R> R executeColocated( - String tableName, - K key, - Mapper<K> keyMapper, - JobDescriptor descriptor, - Object... args - ) { - return sync(executeColocatedAsync(tableName, key, keyMapper, descriptor, args)); - } - /** {@inheritDoc} */ @Override public <R> Map<ClusterNode, JobExecution<R>> submitBroadcast( @@ -215,7 +189,7 @@ public class ClientCompute implements IgniteCompute { Map<ClusterNode, JobExecution<R>> map = new HashMap<>(nodes.size()); for (ClusterNode node : nodes) { - JobExecution<R> execution = new ClientJobExecution<>(ch, executeOnNodesAsync( + JobExecution<R> execution = new ClientJobExecution<>(ch, executeOnAnyNodeAsync( Set.of(node), descriptor.units(), descriptor.jobClassName(), descriptor.options(), args )); if (map.put(node, execution) != null) { @@ -253,7 +227,7 @@ public class ClientCompute implements IgniteCompute { ); } - private CompletableFuture<SubmitResult> executeOnNodesAsync( + private CompletableFuture<SubmitResult> executeOnAnyNodeAsync( Set<ClusterNode> nodes, List<DeploymentUnit> units, String jobClassName, @@ -436,6 +410,7 @@ public class ClientCompute implements IgniteCompute { * @return Result of the job submission. */ private static SubmitResult unpackSubmitResult(PayloadInputChannel ch) { + //noinspection DataFlowIssue (reviewed) return new SubmitResult(ch.in().unpackUuid(), ch.notificationFuture()); } @@ -456,6 +431,7 @@ public class ClientCompute implements IgniteCompute { jobIds.add(ch.in().unpackUuid()); } + //noinspection DataFlowIssue (reviewed) return new SubmitTaskResult(jobId, jobIds, ch.notificationFuture()); } diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java index baa404d409..3c938a6b87 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java @@ -22,16 +22,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import java.util.Arrays; -import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import org.apache.ignite.Ignite; import org.apache.ignite.client.fakes.FakeIgnite; import org.apache.ignite.client.fakes.FakeIgniteTables; import org.apache.ignite.client.fakes.FakeSchemaRegistry; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.internal.client.ClientClusterNode; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; -import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.table.Tuple; import org.junit.jupiter.api.AfterAll; @@ -186,9 +185,9 @@ public abstract class AbstractClientTest extends BaseIgniteAbstractTest { * @param names Names. * @return Nodes. */ - public static Set<ClusterNode> getClusterNodes(String... names) { - return Arrays.stream(names) + public static JobTarget getClusterNodes(String... names) { + return JobTarget.anyNode(Arrays.stream(names) .map(s -> new ClientClusterNode("id", s, new NetworkAddress("127.0.0.1", 8080))) - .collect(Collectors.toSet()); + .collect(Collectors.toSet())); } } diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java index 3fa2517685..8f4f842eac 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java @@ -39,7 +39,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -51,6 +50,7 @@ import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.compute.version.Version; import org.apache.ignite.internal.client.table.ClientTable; @@ -58,7 +58,6 @@ import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.TableNotFoundException; -import org.apache.ignite.network.ClusterNode; import org.apache.ignite.table.Tuple; import org.apache.ignite.table.mapper.Mapper; import org.junit.jupiter.api.AfterEach; @@ -153,8 +152,8 @@ public class ClientComputeTest extends BaseIgniteAbstractTest { try (var client = getClient(server2)) { JobDescriptor job = JobDescriptor.builder("job").build(); - String res1 = client.compute().executeColocated(TABLE_NAME, Tuple.create().set("key", "k"), job); - String res2 = client.compute().executeColocated(TABLE_NAME, 1L, Mapper.of(Long.class), job); + String res1 = client.compute().execute(JobTarget.colocated(TABLE_NAME, Tuple.create().set("key", "k")), job); + String res2 = client.compute().execute(JobTarget.colocated(TABLE_NAME, 1L, Mapper.of(Long.class)), job); assertEquals("s2", res1); assertEquals("s2", res2); @@ -168,8 +167,8 @@ public class ClientComputeTest extends BaseIgniteAbstractTest { try (var client = getClient(server2)) { JobDescriptor job = JobDescriptor.builder("job").build(); - JobExecution<String> execution1 = client.compute().submitColocated(TABLE_NAME, Tuple.create().set("key", "k"), job); - JobExecution<String> execution2 = client.compute().submitColocated(TABLE_NAME, 1L, Mapper.of(Long.class), job); + JobExecution<String> execution1 = client.compute().submit(JobTarget.colocated(TABLE_NAME, Tuple.create().set("key", "k")), job); + JobExecution<String> execution2 = client.compute().submit(JobTarget.colocated(TABLE_NAME, 1L, Mapper.of(Long.class)), job); assertThat(execution1.resultAsync(), willBe("s2")); assertThat(execution2.resultAsync(), willBe("s2")); @@ -187,7 +186,8 @@ public class ClientComputeTest extends BaseIgniteAbstractTest { Tuple key = Tuple.create().set("key", "k"); var ex = assertThrows(CompletionException.class, - () -> client.compute().executeColocatedAsync("bad-tbl", key, JobDescriptor.builder("job").build()).join()); + () -> client.compute().executeAsync( + JobTarget.colocated("bad-tbl", key), JobDescriptor.builder("job").build()).join()); var tblNotFoundEx = (TableNotFoundException) ex.getCause(); assertThat(tblNotFoundEx.getMessage(), containsString("The table does not exist [name=\"PUBLIC\".\"bad-tbl\"]")); @@ -206,7 +206,7 @@ public class ClientComputeTest extends BaseIgniteAbstractTest { try (var client = getClient(server3)) { Tuple key = Tuple.create().set("key", "k"); - String res1 = client.compute().executeColocated(tableName, key, JobDescriptor.builder("job").build()); + String res1 = client.compute().execute(JobTarget.colocated(tableName, key), JobDescriptor.builder("job").build()); // Drop table and create a new one with a different ID. ((FakeIgniteTables) ignite.tables()).dropTable(tableName); @@ -219,7 +219,9 @@ public class ClientComputeTest extends BaseIgniteAbstractTest { IgniteTestUtils.setFieldValue(table, "partitionAssignment", null); } - String res2 = client.compute().executeColocated(tableName, 1L, Mapper.of(Long.class), JobDescriptor.builder("job").build()); + String res2 = client.compute().execute( + JobTarget.colocated(tableName, 1L, Mapper.of(Long.class)), + JobDescriptor.builder("job").build()); assertEquals("s3", res1); assertEquals("s3", res2); @@ -296,8 +298,8 @@ public class ClientComputeTest extends BaseIgniteAbstractTest { FakeCompute.future = CompletableFuture.failedFuture(new RuntimeException("job failed")); IgniteCompute igniteCompute = client.compute(); - Set<ClusterNode> nodes = getClusterNodes("s1"); - JobExecution<String> execution = igniteCompute.submit(nodes, JobDescriptor.builder("job").build()); + var jobTarget = getClusterNodes("s1"); + JobExecution<String> execution = igniteCompute.submit(jobTarget, JobDescriptor.builder("job").build()); assertThat(execution.resultAsync(), willThrowFast(IgniteException.class)); assertThat(execution.statusAsync(), willBe(jobStatusWithState(FAILED))); diff --git a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java index 984bd67db0..c556b9541c 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java @@ -40,6 +40,7 @@ import org.apache.ignite.client.fakes.FakeInternalTable; import org.apache.ignite.client.handler.FakePlacementDriver; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.internal.client.ReliableChannel; import org.apache.ignite.internal.client.tx.ClientLazyTransaction; import org.apache.ignite.internal.hlc.HybridClockImpl; @@ -446,8 +447,8 @@ public class PartitionAwarenessTest extends AbstractClientTest { JobDescriptor job = JobDescriptor.builder("job").build(); - assertThat(compute().executeColocatedAsync(table.name(), t1, job), willBe(nodeKey1)); - assertThat(compute().executeColocatedAsync(table.name(), t2, job), willBe(nodeKey2)); + assertThat(compute().executeAsync(JobTarget.colocated(table.name(), t1), job), willBe(nodeKey1)); + assertThat(compute().executeAsync(JobTarget.colocated(table.name(), t2), job), willBe(nodeKey2)); } @Test @@ -456,8 +457,8 @@ public class PartitionAwarenessTest extends AbstractClientTest { Table table = defaultTable(); JobDescriptor job = JobDescriptor.builder("job").build(); - assertThat(compute().executeColocatedAsync(table.name(), 1L, mapper, job), willBe(nodeKey1)); - assertThat(compute().executeColocatedAsync(table.name(), 2L, mapper, job), willBe(nodeKey2)); + assertThat(compute().executeAsync(JobTarget.colocated(table.name(), 1L, mapper), job), willBe(nodeKey1)); + assertThat(compute().executeAsync(JobTarget.colocated(table.name(), 2L, mapper), job), willBe(nodeKey2)); } @ParameterizedTest diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java index 036dc130cb..3354db7fac 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java +++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java @@ -97,8 +97,6 @@ public class TestServer implements AutoCloseable { private final FakePlacementDriver placementDriver = new FakePlacementDriver(FakeInternalTable.PARTITIONS); - private final CmgMessagesFactory msgFactory = new CmgMessagesFactory(); - /** * Constructor. * @@ -220,7 +218,7 @@ public class TestServer implements AutoCloseable { assertThat(authenticationManager.startAsync(componentContext), willCompleteSuccessfully()); } - ClusterTag tag = msgFactory.clusterTag() + ClusterTag tag = new CmgMessagesFactory().clusterTag() .clusterName("Test Server") .clusterId(clusterId) .build(); diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java index 0fa44c7007..b491288cba 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java @@ -39,6 +39,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.stream.Collectors; import org.apache.ignite.Ignite; +import org.apache.ignite.compute.AnyNodeJobTarget; +import org.apache.ignite.compute.ColocatedJobTarget; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.IgniteCompute; @@ -47,6 +49,7 @@ import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionOptions; import org.apache.ignite.compute.JobState; import org.apache.ignite.compute.JobStatus; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.compute.ComputeUtils; import org.apache.ignite.internal.compute.IgniteComputeInternal; @@ -56,7 +59,6 @@ import org.apache.ignite.internal.table.TableViewInternal; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.table.Tuple; -import org.apache.ignite.table.mapper.Mapper; import org.jetbrains.annotations.Nullable; /** @@ -127,57 +129,20 @@ public class FakeCompute implements IgniteComputeInternal { } @Override - public <R> JobExecution<R> submit(Set<ClusterNode> nodes, JobDescriptor descriptor, Object... args) { - return executeAsyncWithFailover(nodes, descriptor.units(), descriptor.jobClassName(), descriptor.options(), args); - } - - @Override - public <R> R execute(Set<ClusterNode> nodes, JobDescriptor descriptor, Object... args) { - return sync(this.executeAsync(nodes, descriptor, args)); - } - - @Override - public <R> JobExecution<R> submitColocated( - String tableName, - Tuple key, - JobDescriptor descriptor, - Object... args - ) { - return jobExecution(future != null ? future : completedFuture((R) nodeName)); - } - - @Override - public <K, R> JobExecution<R> submitColocated( - String tableName, - K key, - Mapper<K> keyMapper, - JobDescriptor descriptor, - Object... args - ) { - return jobExecution(future != null ? future : completedFuture((R) nodeName)); - } - - /** {@inheritDoc} */ - @Override - public <R> R executeColocated( - String tableName, - Tuple key, - JobDescriptor descriptor, - Object... args - ) { - return sync(this.executeColocatedAsync(tableName, key, descriptor, args)); + public <R> JobExecution<R> submit(JobTarget target, JobDescriptor descriptor, Object... args) { + if (target instanceof AnyNodeJobTarget) { + Set<ClusterNode> nodes = ((AnyNodeJobTarget) target).nodes(); + return executeAsyncWithFailover(nodes, descriptor.units(), descriptor.jobClassName(), descriptor.options(), args); + } else if (target instanceof ColocatedJobTarget) { + return jobExecution(future != null ? future : completedFuture((R) nodeName)); + } else { + throw new IllegalArgumentException("Unsupported job target: " + target); + } } - /** {@inheritDoc} */ @Override - public <K, R> R executeColocated( - String tableName, - K key, - Mapper<K> keyMapper, - JobDescriptor descriptor, - Object... args - ) { - return sync(executeColocatedAsync(tableName, key, keyMapper, descriptor, args)); + public <R> R execute(JobTarget target, JobDescriptor descriptor, Object... args) { + return sync(executeAsync(target, descriptor, args)); } @Override diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java index 9d11f5bca8..59866d2d62 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java @@ -48,6 +48,7 @@ import org.apache.ignite.compute.ComputeException; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.ClusterPerClassIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; @@ -85,7 +86,7 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { IgniteImpl entryNode = node(0); IgniteException ex = assertThrows(IgniteException.class, () -> - entryNode.compute().execute(Set.of(entryNode.node()), JobDescriptor.builder(jobClassName).units(units()).build())); + entryNode.compute().execute(JobTarget.node(entryNode.node()), JobDescriptor.builder(jobClassName).units(units()).build())); assertTraceableException(ex, ComputeException.class, errorCode, msg); } @@ -96,7 +97,7 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { IgniteImpl entryNode = node(0); ExecutionException ex = assertThrows(ExecutionException.class, () -> entryNode.compute().executeAsync( - Set.of(entryNode.node()), JobDescriptor.builder(jobClassName).units(units()).build()) + JobTarget.node(entryNode.node()), JobDescriptor.builder(jobClassName).units(units()).build()) .get(1, TimeUnit.SECONDS)); assertTraceableException(ex, ComputeException.class, errorCode, msg); @@ -108,7 +109,7 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { Ignite entryNode = node(0); IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute().execute( - Set.of(node(1).node(), node(2).node()), + JobTarget.anyNode(node(1).node(), node(2).node()), JobDescriptor.builder(jobClassName).units(units()).build())); assertTraceableException(ex, ComputeException.class, errorCode, msg); @@ -120,7 +121,7 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { Ignite entryNode = node(0); ExecutionException ex = assertThrows(ExecutionException.class, () -> entryNode.compute().executeAsync( - Set.of(node(1).node(), node(2).node()), JobDescriptor.builder(jobClassName).units(units()).build()) + JobTarget.anyNode(node(1).node(), node(2).node()), JobDescriptor.builder(jobClassName).units(units()).build()) .get(1, TimeUnit.SECONDS)); assertTraceableException(ex, ComputeException.class, errorCode, msg); @@ -131,7 +132,7 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { IgniteImpl entryNode = node(0); String result = entryNode.compute().execute( - Set.of(entryNode.node()), + JobTarget.node(entryNode.node()), JobDescriptor.builder(concatJobClassName()).units(units()).build(), "a", 42); @@ -143,7 +144,7 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { IgniteImpl entryNode = node(0); JobExecution<String> execution = entryNode.compute().submit( - Set.of(entryNode.node()), + JobTarget.node(entryNode.node()), JobDescriptor.builder(concatJobClassName()).units(units()).build(), "a", 42); @@ -157,7 +158,7 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { Ignite entryNode = node(0); String result = entryNode.compute().execute( - Set.of(node(1).node(), node(2).node()), + JobTarget.anyNode(node(1).node(), node(2).node()), JobDescriptor.builder(concatJobClassName()).units(units()).build(), "a", 42); @@ -169,7 +170,7 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { IgniteImpl entryNode = node(0); JobExecution<String> execution = entryNode.compute().submit( - Set.of(node(1).node(), node(2).node()), + JobTarget.anyNode(node(1).node(), node(2).node()), JobDescriptor.builder(concatJobClassName()).units(units()).build(), new Object[]{"a", 42}); @@ -183,7 +184,7 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { IgniteImpl entryNode = node(0); CompletableFuture<String> fut = entryNode.compute().executeAsync( - Set.of(entryNode.node()), + JobTarget.node(entryNode.node()), JobDescriptor.builder(getNodeNameJobClassName()).units(units()).build()); assertThat(fut, willBe(entryNode.name())); @@ -195,7 +196,7 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { IgniteImpl remoteNode = node(1); CompletableFuture<String> fut = entryNode.compute().executeAsync( - Set.of(remoteNode.node()), + JobTarget.node(remoteNode.node()), JobDescriptor.builder(getNodeNameJobClassName()).units(units()).build()); assertThat(fut, willBe(remoteNode.name())); @@ -206,7 +207,7 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { IgniteImpl entryNode = node(0); IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute().execute( - Set.of(entryNode.node()), + JobTarget.node(entryNode.node()), JobDescriptor.builder(failingJobClassName()).units(units()).build())); assertComputeException(ex, "JobException", "Oops"); @@ -217,7 +218,7 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { IgniteImpl entryNode = node(0); JobExecution<String> execution = entryNode.compute().submit( - Set.of(entryNode.node()), + JobTarget.node(entryNode.node()), JobDescriptor.builder(failingJobClassName()).units(units()).build()); ExecutionException ex = assertThrows(ExecutionException.class, () -> execution.resultAsync().get(1, TimeUnit.SECONDS)); @@ -233,7 +234,7 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { Ignite entryNode = node(0); IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute().execute( - Set.of(node(1).node(), node(2).node()), + JobTarget.anyNode(node(1).node(), node(2).node()), JobDescriptor.builder(failingJobClassName()).units(units()).build())); assertComputeException(ex, "JobException", "Oops"); @@ -244,7 +245,7 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { Ignite entryNode = node(0); JobExecution<String> execution = entryNode.compute().submit( - Set.of(node(1).node(), node(2).node()), + JobTarget.anyNode(node(1).node(), node(2).node()), JobDescriptor.builder(failingJobClassName()).units(units()).build()); ExecutionException ex = assertThrows(ExecutionException.class, () -> execution.resultAsync().get(1, TimeUnit.SECONDS)); @@ -321,9 +322,8 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { IgniteImpl entryNode = node(0); - String actualNodeName = entryNode.compute().executeColocated( - "test", - Tuple.create(Map.of("k", 1)), + String actualNodeName = entryNode.compute().execute( + JobTarget.colocated("test", Tuple.create(Map.of("k", 1))), JobDescriptor.builder(getNodeNameJobClassName()).units(units()).build()); assertThat(actualNodeName, in(allNodeNames())); @@ -335,9 +335,8 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { IgniteImpl entryNode = node(0); - JobExecution<String> execution = entryNode.compute().submitColocated( - "test", - Tuple.create(Map.of("k", 1)), + JobExecution<String> execution = entryNode.compute().submit( + JobTarget.colocated("test", Tuple.create(Map.of("k", 1))), JobDescriptor.builder(getNodeNameJobClassName()).units(units()).build()); assertThat(execution.resultAsync(), willBe(in(allNodeNames()))); @@ -351,9 +350,8 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { sql("CREATE TABLE test (k int, key_int int, v int, key_str VARCHAR, CONSTRAINT PK PRIMARY KEY (key_int, key_str))"); sql("INSERT INTO test VALUES (1, 2, 3, '4')"); - String actualNodeName = node(0).compute().executeColocated( - "test", - Tuple.create(Map.of("key_int", 2, "key_str", "4")), + String actualNodeName = node(0).compute().execute( + JobTarget.colocated("test", Tuple.create(Map.of("key_int", 2, "key_str", "4"))), JobDescriptor.builder(getNodeNameJobClassName()).units(units()).build()); assertThat(actualNodeName, in(allNodeNames())); @@ -364,12 +362,9 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { IgniteImpl entryNode = node(0); var ex = assertThrows(CompletionException.class, - () -> { - entryNode.compute().submitColocated( - "\"bad-table\"", - Tuple.create(Map.of("k", 1)), - JobDescriptor.builder(getNodeNameJobClassName()).units(units()).build()).resultAsync().join(); - }); + () -> entryNode.compute().submit( + JobTarget.colocated("\"bad-table\"", Tuple.create(Map.of("k", 1))), + JobDescriptor.builder(getNodeNameJobClassName()).units(units()).build()).resultAsync().join()); assertInstanceOf(TableNotFoundException.class, ex.getCause()); assertThat(ex.getCause().getMessage(), containsString("The table does not exist [name=\"PUBLIC\".\"bad-table\"]")); @@ -394,8 +389,9 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { IgniteImpl entryNode = node(0); - String actualNodeName = entryNode.compute().executeColocated( - "test", 1, Mapper.of(Integer.class), JobDescriptor.builder(getNodeNameJobClassName()).units(units()).build()); + String actualNodeName = entryNode.compute().execute( + JobTarget.colocated("test", 1, Mapper.of(Integer.class)), + JobDescriptor.builder(getNodeNameJobClassName()).units(units()).build()); assertThat(actualNodeName, in(allNodeNames())); } @@ -406,10 +402,8 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { IgniteImpl entryNode = node(0); - JobExecution<String> execution = entryNode.compute().submitColocated( - "test", - 1, - Mapper.of(Integer.class), + JobExecution<String> execution = entryNode.compute().submit( + JobTarget.colocated("test", 1, Mapper.of(Integer.class)), JobDescriptor.builder(getNodeNameJobClassName()).units(units()).build()); assertThat(execution.resultAsync(), willBe(in(allNodeNames()))); diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java index b9e999b683..82a6092e4a 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java @@ -29,6 +29,7 @@ import java.util.stream.Collectors; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.compute.NodeNotFoundException; import org.apache.ignite.internal.ClusterPerClassIntegrationTest; import org.apache.ignite.internal.compute.utils.InteractiveJobs; @@ -89,7 +90,7 @@ abstract class ItComputeErrorsBaseTest extends ClusterPerClassIntegrationTest { // And execute a job String workerNodeName = compute().execute( - nodes, + JobTarget.anyNode(nodes), JobDescriptor.builder(InteractiveJobs.globalJob().name()).build(), RETURN_WORKER_NAME.name()); @@ -100,7 +101,7 @@ abstract class ItComputeErrorsBaseTest extends ClusterPerClassIntegrationTest { @Test void executeFailsWhenNoNodesAreInTheCluster() { // When set of nodes contain only non-existing nodes - Set<ClusterNode> nodes = Set.of(nonExistingNode); + JobTarget nodes = JobTarget.node(nonExistingNode); // Then job fails. assertThrows( @@ -138,6 +139,7 @@ abstract class ItComputeErrorsBaseTest extends ClusterPerClassIntegrationTest { protected abstract IgniteCompute compute(); private TestingJobExecution<String> executeGlobalInteractiveJob(Set<ClusterNode> nodes) { - return new TestingJobExecution<>(compute().submit(nodes, JobDescriptor.builder(InteractiveJobs.globalJob().name()).build())); + return new TestingJobExecution<>( + compute().submit(JobTarget.anyNode(nodes), JobDescriptor.builder(InteractiveJobs.globalJob().name()).build())); } } diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java index 41269a8797..e1d56aab29 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java @@ -44,7 +44,6 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -56,6 +55,7 @@ import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionContext; import org.apache.ignite.compute.JobExecutionOptions; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.lang.IgniteInternalCheckedException; import org.apache.ignite.internal.lang.IgniteInternalException; @@ -86,7 +86,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { IgniteImpl entryNode = node(0); JobDescriptor job = JobDescriptor.builder(WaitLatchJob.class).units(units()).build(); - JobExecution<String> execution = entryNode.compute().submit(Set.of(entryNode.node()), job, new CountDownLatch(1)); + JobExecution<String> execution = entryNode.compute().submit(JobTarget.node(entryNode.node()), job, new CountDownLatch(1)); await().until(execution::statusAsync, willBe(jobStatusWithState(EXECUTING))); @@ -98,7 +98,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { @Test void cancelsQueuedJobLocally() { IgniteImpl entryNode = node(0); - Set<ClusterNode> nodes = Set.of(entryNode.node()); + var nodes = JobTarget.node(entryNode.node()); CountDownLatch countDownLatch = new CountDownLatch(1); JobDescriptor job = JobDescriptor.builder(WaitLatchJob.class).units(units()).build(); @@ -129,7 +129,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { IgniteImpl entryNode = node(0); JobDescriptor job = JobDescriptor.builder(WaitLatchJob.class).units(units()).build(); - JobExecution<String> execution = entryNode.compute().submit(Set.of(node(1).node()), job, new CountDownLatch(1)); + JobExecution<String> execution = entryNode.compute().submit(JobTarget.node(node(1).node()), job, new CountDownLatch(1)); await().until(execution::statusAsync, willBe(jobStatusWithState(EXECUTING))); @@ -143,7 +143,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { IgniteImpl entryNode = node(0); JobDescriptor job = JobDescriptor.builder(WaitLatchJob.class).units(units()).build(); - JobExecution<String> execution = entryNode.compute().submit(Set.of(entryNode.node()), job, new CountDownLatch(1)); + JobExecution<String> execution = entryNode.compute().submit(JobTarget.node(entryNode.node()), job, new CountDownLatch(1)); await().until(execution::statusAsync, willBe(jobStatusWithState(EXECUTING))); assertThat(execution.changePriorityAsync(2), willBe(false)); @@ -155,7 +155,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { IgniteImpl entryNode = node(0); JobDescriptor job = JobDescriptor.builder(WaitLatchJob.class).units(units()).build(); - JobExecution<String> execution = entryNode.compute().submit(Set.of(node(1).node()), job, new CountDownLatch(1)); + JobExecution<String> execution = entryNode.compute().submit(JobTarget.node(node(1).node()), job, new CountDownLatch(1)); await().until(execution::statusAsync, willBe(jobStatusWithState(EXECUTING))); assertThat(execution.changePriorityAsync(2), willBe(false)); @@ -165,21 +165,21 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { @Test void changeJobPriorityLocally() { IgniteImpl entryNode = node(0); - Set<ClusterNode> nodes = Set.of(entryNode.node()); + JobTarget jobTarget = JobTarget.node(entryNode.node()); CountDownLatch countDownLatch = new CountDownLatch(1); JobDescriptor job = JobDescriptor.builder(WaitLatchJob.class).units(units()).build(); // Start 1 task in executor with 1 thread - JobExecution<String> execution1 = entryNode.compute().submit(nodes, job, countDownLatch); + JobExecution<String> execution1 = entryNode.compute().submit(jobTarget, job, countDownLatch); await().until(execution1::statusAsync, willBe(jobStatusWithState(EXECUTING))); // Start one more task - JobExecution<String> execution2 = entryNode.compute().submit(nodes, job, new CountDownLatch(1)); + JobExecution<String> execution2 = entryNode.compute().submit(jobTarget, job, new CountDownLatch(1)); await().until(execution2::statusAsync, willBe(jobStatusWithState(QUEUED))); // Start third task - JobExecution<String> execution3 = entryNode.compute().submit(nodes, job, countDownLatch); + JobExecution<String> execution3 = entryNode.compute().submit(jobTarget, job, countDownLatch); await().until(execution3::statusAsync, willBe(jobStatusWithState(QUEUED))); // Task 2 and 3 are not completed, in queued state @@ -206,23 +206,23 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { @Test void executesJobLocallyWithOptions() { IgniteImpl entryNode = node(0); - Set<ClusterNode> nodes = Set.of(entryNode.node()); + JobTarget jobTarget = JobTarget.node(entryNode.node()); CountDownLatch countDownLatch = new CountDownLatch(1); JobDescriptor job = JobDescriptor.builder(WaitLatchJob.class).units(units()).build(); // Start 1 task in executor with 1 thread - JobExecution<String> execution1 = entryNode.compute().submit(nodes, job, new Object[]{countDownLatch}); + JobExecution<String> execution1 = entryNode.compute().submit(jobTarget, job, new Object[]{countDownLatch}); await().until(execution1::statusAsync, willBe(jobStatusWithState(EXECUTING))); // Start one more task - JobExecution<String> execution2 = entryNode.compute().submit(nodes, job, new Object[]{new CountDownLatch(1)}); + JobExecution<String> execution2 = entryNode.compute().submit(jobTarget, job, new Object[]{new CountDownLatch(1)}); await().until(execution2::statusAsync, willBe(jobStatusWithState(QUEUED))); // Start third task it should be before task2 in the queue due to higher priority in options JobExecutionOptions options = JobExecutionOptions.builder().priority(1).maxRetries(2).build(); JobExecution<String> execution3 = entryNode.compute().submit( - nodes, + jobTarget, JobDescriptor.builder(WaitLatchThrowExceptionOnFirstExecutionJob.class) .units(units()) .options(options) @@ -261,7 +261,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { IgniteException exception = new IgniteException(INTERNAL_ERR, "Test exception"); IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute().execute( - Set.of(entryNode.node()), + JobTarget.node(entryNode.node()), JobDescriptor.builder(CustomFailingJob.class).units(units()).build(), exception)); @@ -275,7 +275,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { IgniteCheckedException exception = new IgniteCheckedException(INTERNAL_ERR, "Test exception"); IgniteCheckedException ex = assertThrows(IgniteCheckedException.class, () -> entryNode.compute().execute( - Set.of(entryNode.node()), + JobTarget.node(entryNode.node()), JobDescriptor.builder(CustomFailingJob.class).units(units()).build(), exception)); @@ -297,7 +297,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { IgniteImpl entryNode = node(0); IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute().execute( - Set.of(entryNode.node()), + JobTarget.node(entryNode.node()), JobDescriptor.builder(CustomFailingJob.class).units(units()).build(), throwable)); @@ -315,7 +315,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { IgniteImpl targetNode = node(targetNodeIndex); assertDoesNotThrow(() -> entryNode.compute().execute( - Set.of(targetNode.node()), + JobTarget.node(targetNode.node()), JobDescriptor.builder(PerformSyncKvGetPutJob.class).build())); } diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java index 546e73c820..3d500298d0 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java @@ -31,11 +31,11 @@ import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.JobDescriptor; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.compute.version.Version; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.deployunit.NodesToDeploy; @@ -105,7 +105,7 @@ class ItComputeTestStandalone extends ItComputeBaseTest { List<DeploymentUnit> nonExistingUnits = List.of(new DeploymentUnit("non-existing", "1.0.0")); CompletableFuture<String> result = entryNode.compute().executeAsync( - Set.of(entryNode.node()), + JobTarget.node(entryNode.node()), JobDescriptor.builder(concatJobClassName()).units(nonExistingUnits).build(), "a", 42); @@ -128,13 +128,13 @@ class ItComputeTestStandalone extends ItComputeBaseTest { deployJar(entryNode, firstVersion.name(), firstVersion.version(), "ignite-unit-test-job1-1.0-SNAPSHOT.jar"); JobDescriptor job = JobDescriptor.builder("org.apache.ignite.internal.compute.UnitJob").units(jobUnits).build(); - CompletableFuture<Integer> result1 = entryNode.compute().executeAsync(Set.of(entryNode.node()), job); + CompletableFuture<Integer> result1 = entryNode.compute().executeAsync(JobTarget.node(entryNode.node()), job); assertThat(result1, willBe(1)); DeploymentUnit secondVersion = new DeploymentUnit("latest-unit", Version.parseVersion("1.0.1")); deployJar(entryNode, secondVersion.name(), secondVersion.version(), "ignite-unit-test-job2-1.0-SNAPSHOT.jar"); - CompletableFuture<String> result2 = entryNode.compute().executeAsync(Set.of(entryNode.node()), job); + CompletableFuture<String> result2 = entryNode.compute().executeAsync(JobTarget.node(entryNode.node()), job); assertThat(result2, willBe("Hello World!")); } @@ -143,7 +143,7 @@ class ItComputeTestStandalone extends ItComputeBaseTest { IgniteImpl entryNode = node(0); CompletableFuture<Void> job = entryNode.compute().executeAsync( - Set.of(entryNode.node()), + JobTarget.node(entryNode.node()), JobDescriptor.builder(SleepJob.class).units(units).build(), 3L); @@ -168,11 +168,11 @@ class ItComputeTestStandalone extends ItComputeBaseTest { IgniteImpl entryNode = node(0); JobDescriptor job = JobDescriptor.builder(SleepJob.class).units(units).build(); - CompletableFuture<Void> successJob = entryNode.compute().executeAsync(Set.of(entryNode.node()), job, 2L); + CompletableFuture<Void> successJob = entryNode.compute().executeAsync(JobTarget.node(entryNode.node()), job, 2L); assertThat(entryNode.deployment().undeployAsync(unit.name(), unit.version()), willCompleteSuccessfully()); - CompletableFuture<Void> failedJob = entryNode.compute().executeAsync(Set.of(entryNode.node()), job, 2L); + CompletableFuture<Void> failedJob = entryNode.compute().executeAsync(JobTarget.node(entryNode.node()), job, 2L); CompletionException ex0 = assertThrows(CompletionException.class, failedJob::join); assertComputeException( diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItExecutionsCleanerTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItExecutionsCleanerTest.java index 2343213f83..e4e66b2604 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItExecutionsCleanerTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItExecutionsCleanerTest.java @@ -30,6 +30,7 @@ import java.util.UUID; import org.apache.ignite.compute.ComputeException; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.internal.ClusterPerClassIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.compute.utils.InteractiveJobs; @@ -245,6 +246,7 @@ class ItExecutionsCleanerTest extends ClusterPerClassIntegrationTest { private static TestingJobExecution<Object> submit(Set<ClusterNode> nodes) { IgniteCompute igniteCompute = CLUSTER.node(0).compute(); - return new TestingJobExecution<>(igniteCompute.submit(nodes, JobDescriptor.builder(InteractiveJobs.globalJob().name()).build())); + return new TestingJobExecution<>( + igniteCompute.submit(JobTarget.anyNode(nodes), JobDescriptor.builder(InteractiveJobs.globalJob().name()).build())); } } diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java index 92b3b468bf..e1cd0e1ce6 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java @@ -26,6 +26,7 @@ import java.util.stream.Collectors; import org.apache.ignite.client.IgniteClient; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.internal.ClusterPerTestIntegrationTest; import org.apache.ignite.internal.compute.utils.InteractiveJobs; import org.apache.ignite.internal.compute.utils.TestingJobExecution; @@ -95,6 +96,7 @@ public class ItFailoverCandidateNotFoundTest extends ClusterPerTestIntegrationTe } private static TestingJobExecution<String> executeGlobalInteractiveJob(IgniteCompute compute, Set<ClusterNode> nodes) { - return new TestingJobExecution<>(compute.submit(nodes, JobDescriptor.builder(InteractiveJobs.globalJob().name()).build())); + return new TestingJobExecution<>( + compute.submit(JobTarget.anyNode(nodes), JobDescriptor.builder(InteractiveJobs.globalJob().name()).build())); } } diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java index 69b3851fa1..a83b9a4fe4 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java @@ -39,6 +39,7 @@ import java.util.stream.Collectors; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.internal.ClusterPerTestIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.compute.utils.InteractiveJobs; @@ -290,9 +291,8 @@ public abstract class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest // When start colocated job on node that is not primary replica. IgniteImpl entryNode = anyNodeExcept(primaryReplica); TestingJobExecution<Object> execution = new TestingJobExecution<>( - compute(entryNode).submitColocated( - TABLE_NAME, - Tuple.create(1).set("K", 1), + compute(entryNode).submit( + JobTarget.colocated(TABLE_NAME, Tuple.create(1).set("K", 1)), JobDescriptor.builder(InteractiveJobs.globalJob().name()).build())); // Then the job is alive. @@ -363,7 +363,7 @@ public abstract class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest private TestingJobExecution<String> executeGlobalInteractiveJob(IgniteImpl entryNode, Set<String> nodes) { return new TestingJobExecution<>( compute(entryNode).submit( - clusterNodesByNames(nodes), + JobTarget.anyNode(clusterNodesByNames(nodes)), JobDescriptor.builder(InteractiveJobs.globalJob().name()).build()) ); } @@ -378,7 +378,7 @@ public abstract class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest executeSql("INSERT INTO test(k, v) VALUES (1, 101)"); } - private List<String> allNodeNames() { + private static List<String> allNodeNames() { return new ArrayList<>(NODES_NAMES_TO_INDEXES.keySet()); } } diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java index d19788588a..739fd8aa3c 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java @@ -34,6 +34,7 @@ import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionContext; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.internal.ClusterPerClassIntegrationTest; import org.apache.ignite.internal.compute.IgniteComputeImpl; import org.apache.ignite.internal.wrapper.Wrappers; @@ -148,11 +149,18 @@ class ItComputeApiThreadingTest extends ClusterPerClassIntegrationTest { } private enum ComputeAsyncOperation { - EXECUTE_ASYNC(compute -> compute.executeAsync(justNonEntryNode(), JobDescriptor.builder(NoOpJob.class).build())), + EXECUTE_ASYNC(compute -> compute.executeAsync(JobTarget.anyNode(justNonEntryNode()), JobDescriptor.builder(NoOpJob.class).build())), + EXECUTE_COLOCATED_BY_TUPLE_ASYNC(compute -> - compute.executeColocatedAsync(TABLE_NAME, KEY_TUPLE, JobDescriptor.builder(NoOpJob.class).build())), + compute.executeAsync( + JobTarget.colocated(TABLE_NAME, KEY_TUPLE), + JobDescriptor.builder(NoOpJob.class).build())), + EXECUTE_COLOCATED_BY_KEY_ASYNC(compute -> - compute.executeColocatedAsync(TABLE_NAME, KEY, Mapper.of(Integer.class), JobDescriptor.builder(NoOpJob.class).build())), + compute.executeAsync( + JobTarget.colocated(TABLE_NAME, KEY, Mapper.of(Integer.class)), + JobDescriptor.builder(NoOpJob.class).build())), + EXECUTE_BROADCAST_ASYNC(compute -> compute.executeBroadcastAsync(justNonEntryNode(), JobDescriptor.builder(NoOpJob.class).build())); private final Function<IgniteCompute, CompletableFuture<?>> action; @@ -167,11 +175,17 @@ class ItComputeApiThreadingTest extends ClusterPerClassIntegrationTest { } private enum ComputeSubmitOperation { - SUBMIT(compute -> compute.submit(justNonEntryNode(), JobDescriptor.builder(NoOpJob.class).build())), - SUBMIT_COLOCATED_BY_TUPLE(compute -> compute.submitColocated(TABLE_NAME, KEY_TUPLE, JobDescriptor.builder(NoOpJob.class).build())), - SUBMIT_COLOCATED_BY_KEY(compute -> compute.submitColocated( - TABLE_NAME, KEY, Mapper.of(Integer.class), JobDescriptor.builder(NoOpJob.class).build()) + SUBMIT(compute -> compute.submit(JobTarget.anyNode(justNonEntryNode()), JobDescriptor.builder(NoOpJob.class).build())), + + SUBMIT_COLOCATED_BY_TUPLE(compute -> compute.submit( + JobTarget.colocated(TABLE_NAME, KEY_TUPLE), + JobDescriptor.builder(NoOpJob.class).build())), + + SUBMIT_COLOCATED_BY_KEY(compute -> compute.submit( + JobTarget.colocated(TABLE_NAME, KEY, Mapper.of(Integer.class)), + JobDescriptor.builder(NoOpJob.class).build()) ), + SUBMIT_BROADCAST(compute -> compute .submitBroadcast(justNonEntryNode(), JobDescriptor.builder(NoOpJob.class).build()) .values().iterator().next() diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java index dadccee035..770915221e 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java @@ -28,12 +28,11 @@ import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.compute.task.AntiHijackTaskExecution; import org.apache.ignite.internal.wrapper.Wrapper; import org.apache.ignite.network.ClusterNode; -import org.apache.ignite.table.Tuple; -import org.apache.ignite.table.mapper.Mapper; /** * Wrapper around {@link IgniteCompute} that adds protection against thread hijacking by users. @@ -51,55 +50,13 @@ public class AntiHijackIgniteCompute implements IgniteCompute, Wrapper { } @Override - public <R> JobExecution<R> submit(Set<ClusterNode> nodes, JobDescriptor descriptor, Object... args) { - return preventThreadHijack(compute.submit(nodes, descriptor, args)); + public <R> JobExecution<R> submit(JobTarget target, JobDescriptor descriptor, Object... args) { + return preventThreadHijack(compute.submit(target, descriptor, args)); } @Override - public <R> R execute(Set<ClusterNode> nodes, JobDescriptor descriptor, Object... args) { - return compute.execute(nodes, descriptor, args); - } - - @Override - public <R> JobExecution<R> submitColocated( - String tableName, - Tuple key, - JobDescriptor descriptor, - Object... args - ) { - return preventThreadHijack(compute.submitColocated(tableName, key, descriptor, args)); - } - - @Override - public <K, R> JobExecution<R> submitColocated( - String tableName, - K key, - Mapper<K> keyMapper, - JobDescriptor descriptor, - Object... args - ) { - return preventThreadHijack(compute.submitColocated(tableName, key, keyMapper, descriptor, args)); - } - - @Override - public <R> R executeColocated( - String tableName, - Tuple key, - JobDescriptor descriptor, - Object... args - ) { - return compute.executeColocated(tableName, key, descriptor, args); - } - - @Override - public <K, R> R executeColocated( - String tableName, - K key, - Mapper<K> keyMapper, - JobDescriptor descriptor, - Object... args - ) { - return compute.executeColocated(tableName, key, keyMapper, descriptor, args); + public <R> R execute(JobTarget target, JobDescriptor descriptor, Object... args) { + return compute.execute(target, descriptor, args); } @Override diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java index 78387092a4..e91ada7979 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java @@ -39,6 +39,8 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.ignite.compute.AnyNodeJobTarget; +import org.apache.ignite.compute.ColocatedJobTarget; import org.apache.ignite.compute.ComputeException; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.IgniteCompute; @@ -46,6 +48,7 @@ import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionOptions; import org.apache.ignite.compute.JobStatus; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.compute.NodeNotFoundException; import org.apache.ignite.compute.task.MapReduceJob; import org.apache.ignite.compute.task.TaskExecution; @@ -95,15 +98,55 @@ public class IgniteComputeImpl implements IgniteComputeInternal { } @Override - public <R> JobExecution<R> submit(Set<ClusterNode> nodes, JobDescriptor descriptor, Object... args) { - Objects.requireNonNull(nodes); + public <R> JobExecution<R> submit(JobTarget target, JobDescriptor descriptor, Object... args) { + Objects.requireNonNull(target); Objects.requireNonNull(descriptor); - if (nodes.isEmpty()) { - throw new IllegalArgumentException("nodes must not be empty."); + if (target instanceof AnyNodeJobTarget) { + Set<ClusterNode> nodes = ((AnyNodeJobTarget) target).nodes(); + + return executeAsyncWithFailover(nodes, descriptor.units(), descriptor.jobClassName(), descriptor.options(), args); } - return executeAsyncWithFailover(nodes, descriptor.units(), descriptor.jobClassName(), descriptor.options(), args); + if (target instanceof ColocatedJobTarget) { + ColocatedJobTarget colocatedTarget = (ColocatedJobTarget) target; + var mapper = (Mapper<? super Object>) colocatedTarget.keyMapper(); + String tableName = colocatedTarget.tableName(); + Object key = colocatedTarget.key(); + + CompletableFuture<JobExecution<R>> jobFut; + if (mapper != null) { + jobFut = requiredTable(tableName) + .thenCompose(table -> primaryReplicaForPartitionByMappedKey(table, key, mapper) + .thenApply(primaryNode -> executeOnOneNodeWithFailover( + primaryNode, + new NextColocatedWorkerSelector<>(placementDriver, topologyService, clock, table, key, mapper), + descriptor.units(), + descriptor.jobClassName(), + descriptor.options(), + args + ))); + + } else { + jobFut = requiredTable(tableName) + .thenCompose(table -> submitColocatedInternal( + table, + (Tuple) key, + descriptor.units(), + descriptor.jobClassName(), + descriptor.options(), + args)); + } + + return new JobExecutionFutureWrapper<>(jobFut); + } + + throw new IllegalArgumentException("Unsupported job target: " + target); + } + + @Override + public <R> R execute(JobTarget target, JobDescriptor descriptor, Object... args) { + return sync(executeAsync(target, descriptor, args)); } @Override @@ -141,12 +184,6 @@ public class IgniteComputeImpl implements IgniteComputeInternal { )); } - @Override - public <R> R execute(Set<ClusterNode> nodes, JobDescriptor descriptor, Object... args) { - return sync(this.executeAsync(nodes, descriptor, args)); - } - - private static ClusterNode randomNode(Set<ClusterNode> nodes) { int nodesToSkip = ThreadLocalRandom.current().nextInt(nodes.size()); @@ -195,69 +232,6 @@ public class IgniteComputeImpl implements IgniteComputeInternal { return targetNode.equals(topologyService.localMember()); } - @Override - public <R> JobExecution<R> submitColocated( - String tableName, - Tuple tuple, - JobDescriptor descriptor, - Object... args - ) { - Objects.requireNonNull(tableName); - Objects.requireNonNull(tuple); - Objects.requireNonNull(descriptor); - - return new JobExecutionFutureWrapper<>( - requiredTable(tableName) - .thenCompose(table -> submitColocatedInternal( - table, tuple, descriptor.units(), descriptor.jobClassName(), descriptor.options(), args)) - ); - } - - @Override - public <K, R> JobExecution<R> submitColocated( - String tableName, - K key, - Mapper<K> keyMapper, - JobDescriptor descriptor, - Object... args - ) { - Objects.requireNonNull(tableName); - Objects.requireNonNull(key); - Objects.requireNonNull(keyMapper); - Objects.requireNonNull(descriptor); - - return new JobExecutionFutureWrapper<>( - requiredTable(tableName) - .thenCompose(table -> primaryReplicaForPartitionByMappedKey(table, key, keyMapper) - .thenApply(primaryNode -> executeOnOneNodeWithFailover( - primaryNode, - new NextColocatedWorkerSelector<>(placementDriver, topologyService, clock, table, key, keyMapper), - descriptor.units(), descriptor.jobClassName(), descriptor.options(), args - ))) - ); - } - - @Override - public <R> R executeColocated( - String tableName, - Tuple key, - JobDescriptor descriptor, - Object... args - ) { - return sync(this.executeColocatedAsync(tableName, key, descriptor, args)); - } - - @Override - public <K, R> R executeColocated( - String tableName, - K key, - Mapper<K> keyMapper, - JobDescriptor descriptor, - Object... args - ) { - return sync(executeColocatedAsync(tableName, key, keyMapper, descriptor, args)); - } - @Override public <R> CompletableFuture<JobExecution<R>> submitColocatedInternal( TableViewInternal table, @@ -347,7 +321,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal { } private JobExecution<Object> submitJob(MapReduceJob runner) { - return submit(runner.nodes(), runner.jobDescriptor(), runner.args()); + return submit(JobTarget.anyNode(runner.nodes()), runner.jobDescriptor(), runner.args()); } @Override diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java index 4cdf6f49c5..d2b03c2b43 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.compute; -import static java.util.Collections.singleton; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; @@ -44,6 +43,7 @@ import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionOptions; import org.apache.ignite.compute.JobStatus; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.network.ClusterNodeImpl; @@ -112,7 +112,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest { assertThat( compute.executeAsync( - singleton(localNode), + JobTarget.node(localNode), JobDescriptor.builder(JOB_CLASS_NAME).units(testDeploymentUnits).build(), "a", 42), willBe("jobResponse") @@ -127,7 +127,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest { assertThat( compute.executeAsync( - singleton(remoteNode), + JobTarget.node(remoteNode), JobDescriptor.builder(JOB_CLASS_NAME).units(testDeploymentUnits).build(), "a", 42), willBe("remoteResponse") @@ -144,7 +144,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest { JobExecutionOptions options = JobExecutionOptions.builder().priority(1).maxRetries(2).build(); assertThat( compute.executeAsync( - singleton(localNode), + JobTarget.node(localNode), JobDescriptor.builder(JOB_CLASS_NAME).units(testDeploymentUnits).options(options).build(), "a", 42), willBe("jobResponse") @@ -162,7 +162,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest { assertThat( compute.executeAsync( - singleton(remoteNode), + JobTarget.node(remoteNode), JobDescriptor.builder(JOB_CLASS_NAME).units(testDeploymentUnits).options(options).build(), "a", 42), willBe("remoteResponse") @@ -177,9 +177,8 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest { respondWhenAskForPrimaryReplica(); assertThat( - compute.executeColocatedAsync( - "test", - Tuple.create(Map.of("k", 1)), + compute.executeAsync( + JobTarget.colocated("test", Tuple.create(Map.of("k", 1))), JobDescriptor.builder(JOB_CLASS_NAME).units(testDeploymentUnits).build(), "a", 42), willBe("remoteResponse") @@ -192,10 +191,8 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest { respondWhenAskForPrimaryReplica(); assertThat( - compute.executeColocatedAsync( - "test", - 1, - Mapper.of(Integer.class), + compute.executeAsync( + JobTarget.colocated("test", 1, Mapper.of(Integer.class)), JobDescriptor.builder(JOB_CLASS_NAME).units(testDeploymentUnits).build(), "a", 42 ), diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java index 767fbc4a15..17851fb0c5 100644 --- a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java +++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java @@ -46,6 +46,7 @@ import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionContext; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.internal.ClusterPerClassIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.rest.api.Problem; @@ -365,7 +366,7 @@ public class ItComputeControllerTest extends ClusterPerClassIntegrationTest { } private static JobExecution<String> runBlockingJob(IgniteImpl entryNode, Set<ClusterNode> nodes) { - return entryNode.compute().submit(nodes, JobDescriptor.builder(BlockingJob.class).build()); + return entryNode.compute().submit(JobTarget.anyNode(nodes), JobDescriptor.builder(BlockingJob.class).build()); } private static void unblockJob() { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java index 262b82bcca..23b19928d0 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java @@ -79,6 +79,7 @@ import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionContext; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.compute.task.MapReduceJob; import org.apache.ignite.compute.task.MapReduceTask; import org.apache.ignite.compute.task.TaskExecution; @@ -120,8 +121,8 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { @Test void testExecuteOnSpecificNode() { - String res1 = client().compute().execute(Set.of(node(0)), JobDescriptor.builder(NodeNameJob.class).build()); - String res2 = client().compute().execute(Set.of(node(1)), JobDescriptor.builder(NodeNameJob.class).build()); + String res1 = client().compute().execute(JobTarget.node(node(0)), JobDescriptor.builder(NodeNameJob.class).build()); + String res2 = client().compute().execute(JobTarget.node(node(1)), JobDescriptor.builder(NodeNameJob.class).build()); assertEquals("itcct_n_3344", res1); assertEquals("itcct_n_3345", res2); @@ -129,8 +130,11 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { @Test void testExecuteOnSpecificNodeAsync() { - JobExecution<String> execution1 = client().compute().submit(Set.of(node(0)), JobDescriptor.builder(NodeNameJob.class).build()); - JobExecution<String> execution2 = client().compute().submit(Set.of(node(1)), JobDescriptor.builder(NodeNameJob.class).build()); + JobExecution<String> execution1 = client().compute().submit( + JobTarget.node(node(0)), JobDescriptor.builder(NodeNameJob.class).build()); + + JobExecution<String> execution2 = client().compute().submit( + JobTarget.node(node(1)), JobDescriptor.builder(NodeNameJob.class).build()); assertThat(execution1.resultAsync(), willBe("itcct_n_3344")); assertThat(execution2.resultAsync(), willBe("itcct_n_3345")); @@ -141,7 +145,9 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { @Test void testCancellingCompletedJob() { - JobExecution<String> execution = client().compute().submit(Set.of(node(0)), JobDescriptor.builder(NodeNameJob.class).build()); + JobExecution<String> execution = client().compute().submit( + JobTarget.node(node(0)), + JobDescriptor.builder(NodeNameJob.class).build()); assertThat(execution.resultAsync(), willBe("itcct_n_3344")); @@ -152,7 +158,9 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { @Test void testChangingPriorityCompletedJob() { - JobExecution<String> execution = client().compute().submit(Set.of(node(0)), JobDescriptor.builder(NodeNameJob.class).build()); + JobExecution<String> execution = client().compute().submit( + JobTarget.node(node(0)), + JobDescriptor.builder(NodeNameJob.class).build()); assertThat(execution.resultAsync(), willBe("itcct_n_3344")); @@ -169,8 +177,8 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { .builder(asyncJob ? AsyncSleepJob.class : SleepJob.class) .build(); - JobExecution<String> execution1 = client().compute().submit(Set.of(node(0)), sleepJob, sleepMs); - JobExecution<String> execution2 = client().compute().submit(Set.of(node(1)), sleepJob, sleepMs); + JobExecution<String> execution1 = client().compute().submit(JobTarget.node(node(0)), sleepJob, sleepMs); + JobExecution<String> execution2 = client().compute().submit(JobTarget.node(node(1)), sleepJob, sleepMs); await().until(execution1::statusAsync, willBe(jobStatusWithState(EXECUTING))); await().until(execution2::statusAsync, willBe(jobStatusWithState(EXECUTING))); @@ -186,18 +194,18 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { void changeJobPriority() { int sleepMs = 1_000_000; JobDescriptor sleepJob = JobDescriptor.builder(SleepJob.class).build(); - Set<ClusterNode> nodes = Set.of(node(0)); + JobTarget target = JobTarget.node(node(0)); // Start 1 task in executor with 1 thread - JobExecution<String> execution1 = client().compute().submit(nodes, sleepJob, sleepMs); + JobExecution<String> execution1 = client().compute().submit(target, sleepJob, sleepMs); await().until(execution1::statusAsync, willBe(jobStatusWithState(EXECUTING))); // Start one more long lasting task - JobExecution<String> execution2 = client().compute().submit(nodes, sleepJob, sleepMs); + JobExecution<String> execution2 = client().compute().submit(target, sleepJob, sleepMs); await().until(execution2::statusAsync, willBe(jobStatusWithState(QUEUED))); // Start third task - JobExecution<String> execution3 = client().compute().submit(nodes, sleepJob, sleepMs); + JobExecution<String> execution3 = client().compute().submit(target, sleepJob, sleepMs); await().until(execution3::statusAsync, willBe(jobStatusWithState(QUEUED))); // Task 2 and 3 are not completed, in queue state @@ -222,7 +230,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { @Test void testExecuteOnRandomNode() { - String res = client().compute().execute(new HashSet<>(sortedNodes()), JobDescriptor.builder(NodeNameJob.class).build()); + String res = client().compute().execute(JobTarget.anyNode(sortedNodes()), JobDescriptor.builder(NodeNameJob.class).build()); assertTrue(Set.of("itcct_n_3344", "itcct_n_3345").contains(res)); } @@ -230,7 +238,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { @Test void testExecuteOnRandomNodeAsync() { JobExecution<String> execution = client().compute().submit( - new HashSet<>(sortedNodes()), JobDescriptor.builder(NodeNameJob.class).build()); + JobTarget.anyNode(sortedNodes()), JobDescriptor.builder(NodeNameJob.class).build()); assertThat( execution.resultAsync(), @@ -301,7 +309,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { @Test void testExecuteWithArgs() { JobExecution<String> execution = client().compute().submit( - new HashSet<>(client().clusterNodes()), JobDescriptor.builder(ConcatJob.class).build(), 1, "2", 3.3); + JobTarget.anyNode(client().clusterNodes()), JobDescriptor.builder(ConcatJob.class).build(), 1, "2", 3.3); assertThat(execution.resultAsync(), willBe("1_2_3.3")); assertThat(execution.statusAsync(), willBe(jobStatusWithState(COMPLETED))); @@ -310,7 +318,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { @Test void testIgniteExceptionInJobPropagatesToClientWithMessageAndCodeAndTraceIdAsync() { IgniteException cause = getExceptionInJobExecutionAsync( - client().compute().submit(Set.of(node(0)), JobDescriptor.builder(IgniteExceptionJob.class).build()) + client().compute().submit(JobTarget.node(node(0)), JobDescriptor.builder(IgniteExceptionJob.class).build()) ); assertThat(cause.getMessage(), containsString("Custom job error")); @@ -323,7 +331,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { @Test void testIgniteExceptionInJobPropagatesToClientWithMessageAndCodeAndTraceIdSync() { IgniteException cause = getExceptionInJobExecutionSync( - () -> client().compute().execute(Set.of(node(0)), JobDescriptor.builder(IgniteExceptionJob.class).build()) + () -> client().compute().execute(JobTarget.node(node(0)), JobDescriptor.builder(IgniteExceptionJob.class).build()) ); assertThat(cause.getMessage(), containsString("Custom job error")); @@ -337,7 +345,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { @ValueSource(booleans = {true, false}) void testExceptionInJobPropagatesToClientWithClassAndMessageAsync(boolean asyncJob) { IgniteException cause = getExceptionInJobExecutionAsync( - client().compute().submit(Set.of(node(0)), JobDescriptor.builder(ExceptionJob.class).build(), asyncJob) + client().compute().submit(JobTarget.node(node(0)), JobDescriptor.builder(ExceptionJob.class).build(), asyncJob) ); assertComputeExceptionWithClassAndMessage(cause); @@ -347,7 +355,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { @ValueSource(booleans = {true, false}) void testExceptionInJobPropagatesToClientWithClassAndMessageSync(boolean asyncJob) { IgniteException cause = getExceptionInJobExecutionSync( - () -> client().compute().execute(Set.of(node(0)), JobDescriptor.builder(ExceptionJob.class).build(), asyncJob) + () -> client().compute().execute(JobTarget.node(node(0)), JobDescriptor.builder(ExceptionJob.class).build(), asyncJob) ); assertComputeExceptionWithClassAndMessage(cause); @@ -357,7 +365,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { void testExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTraceAsync() { // Second node has sendServerExceptionStackTraceToClient enabled. IgniteException cause = getExceptionInJobExecutionAsync( - client().compute().submit(Set.of(node(1)), JobDescriptor.builder(ExceptionJob.class) + client().compute().submit(JobTarget.node(node(1)), JobDescriptor.builder(ExceptionJob.class) .build()) ); @@ -368,7 +376,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { void testExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTraceSync() { // Second node has sendServerExceptionStackTraceToClient enabled. IgniteException cause = getExceptionInJobExecutionSync( - () -> client().compute().execute(Set.of(node(1)), JobDescriptor.builder(ExceptionJob.class).build()) + () -> client().compute().execute(JobTarget.node(node(1)), JobDescriptor.builder(ExceptionJob.class).build()) ); assertComputeExceptionWithStackTrace(cause); @@ -391,7 +399,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { var key = Tuple.create().set(COLUMN_KEY, 1); IgniteException cause = getExceptionInJobExecutionAsync( - client().compute().submitColocated(TABLE_NAME, key, JobDescriptor.builder(ExceptionJob.class).build())); + client().compute().submit(JobTarget.colocated(TABLE_NAME, key), JobDescriptor.builder(ExceptionJob.class).build())); assertComputeExceptionWithClassAndMessage(cause); } @@ -401,7 +409,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { var key = Tuple.create().set(COLUMN_KEY, 1); IgniteException cause = getExceptionInJobExecutionSync( - () -> client().compute().executeColocated(TABLE_NAME, key, JobDescriptor.builder(ExceptionJob.class).build()) + () -> client().compute().execute(JobTarget.colocated(TABLE_NAME, key), JobDescriptor.builder(ExceptionJob.class).build()) ); assertComputeExceptionWithClassAndMessage(cause); @@ -414,7 +422,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { IgniteCompute igniteCompute = client().compute(); IgniteException cause = getExceptionInJobExecutionAsync( - igniteCompute.submitColocated(TABLE_NAME, key, JobDescriptor.builder(ExceptionJob.class).build()) + igniteCompute.submit(JobTarget.colocated(TABLE_NAME, key), JobDescriptor.builder(ExceptionJob.class).build()) ); assertComputeExceptionWithStackTrace(cause); @@ -426,7 +434,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { var key = Tuple.create().set(COLUMN_KEY, 2); IgniteException cause = getExceptionInJobExecutionSync( - () -> client().compute().executeColocated(TABLE_NAME, key, JobDescriptor.builder(ExceptionJob.class).build()) + () -> client().compute().execute(JobTarget.colocated(TABLE_NAME, key), JobDescriptor.builder(ExceptionJob.class).build()) ); assertComputeExceptionWithStackTrace(cause); @@ -439,7 +447,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { IgniteCompute igniteCompute = client().compute(); IgniteException cause = getExceptionInJobExecutionAsync( - igniteCompute.submitColocated(TABLE_NAME, key, mapper, JobDescriptor.builder(ExceptionJob.class) + igniteCompute.submit(JobTarget.colocated(TABLE_NAME, key, mapper), JobDescriptor.builder(ExceptionJob.class) .build()) ); @@ -451,8 +459,9 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { var key = new TestPojo(1); Mapper<TestPojo> mapper = Mapper.of(TestPojo.class); - IgniteException cause = getExceptionInJobExecutionSync( - () -> client().compute().executeColocated(TABLE_NAME, key, mapper, JobDescriptor.builder(ExceptionJob.class).build()) + IgniteException cause = getExceptionInJobExecutionSync(() -> client().compute().execute( + JobTarget.colocated(TABLE_NAME, key, mapper), + JobDescriptor.builder(ExceptionJob.class).build()) ); assertComputeExceptionWithClassAndMessage(cause); @@ -466,7 +475,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { IgniteCompute igniteCompute = client().compute(); IgniteException cause = getExceptionInJobExecutionAsync( - igniteCompute.submitColocated(TABLE_NAME, key, mapper, JobDescriptor.builder(ExceptionJob.class).build()) + igniteCompute.submit(JobTarget.colocated(TABLE_NAME, key, mapper), JobDescriptor.builder(ExceptionJob.class).build()) ); assertComputeExceptionWithStackTrace(cause); @@ -479,7 +488,8 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { Mapper<TestPojo> mapper = Mapper.of(TestPojo.class); IgniteException cause = getExceptionInJobExecutionSync( - () -> client().compute().executeColocated(TABLE_NAME, key, mapper, JobDescriptor.builder(ExceptionJob.class).build()) + () -> client().compute().execute( + JobTarget.colocated(TABLE_NAME, key, mapper), JobDescriptor.builder(ExceptionJob.class).build()) ); assertComputeExceptionWithStackTrace(cause); @@ -528,8 +538,8 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { var keyTuple = Tuple.create().set(COLUMN_KEY, key); IgniteCompute igniteCompute = client().compute(); - JobExecution<String> tupleExecution = igniteCompute.submitColocated( - TABLE_NAME, keyTuple, JobDescriptor.builder(NodeNameJob.class).build()); + JobExecution<String> tupleExecution = igniteCompute.submit( + JobTarget.colocated(TABLE_NAME, keyTuple), JobDescriptor.builder(NodeNameJob.class).build()); String expectedNode = "itcct_n_" + port; assertThat(tupleExecution.resultAsync(), willBe(expectedNode)); @@ -544,8 +554,8 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { IgniteCompute igniteCompute = client().compute(); Mapper<TestPojo> keyMapper = Mapper.of(TestPojo.class); - JobExecution<String> pojoExecution = igniteCompute.submitColocated( - TABLE_NAME, keyPojo, keyMapper, JobDescriptor.builder(NodeNameJob.class).build()); + JobExecution<String> pojoExecution = igniteCompute.submit( + JobTarget.colocated(TABLE_NAME, keyPojo, keyMapper), JobDescriptor.builder(NodeNameJob.class).build()); String expectedNode = "itcct_n_" + port; assertThat(pojoExecution.resultAsync(), willBe(expectedNode)); @@ -560,8 +570,8 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { int sleepMs = 1_000_000; IgniteCompute igniteCompute = client().compute(); - JobExecution<String> tupleExecution = igniteCompute.submitColocated( - TABLE_NAME, keyTuple, JobDescriptor.builder(SleepJob.class).build(), sleepMs); + JobExecution<String> tupleExecution = igniteCompute.submit( + JobTarget.colocated(TABLE_NAME, keyTuple), JobDescriptor.builder(SleepJob.class).build(), sleepMs); await().until(tupleExecution::statusAsync, willBe(jobStatusWithState(EXECUTING))); @@ -578,8 +588,8 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { IgniteCompute igniteCompute = client().compute(); Mapper<TestPojo> keyMapper = Mapper.of(TestPojo.class); - JobExecution<String> pojoExecution = igniteCompute.submitColocated( - TABLE_NAME, keyPojo, keyMapper, JobDescriptor.builder(SleepJob.class).build(), sleepMs); + JobExecution<String> pojoExecution = igniteCompute.submit( + JobTarget.colocated(TABLE_NAME, keyPojo, keyMapper), JobDescriptor.builder(SleepJob.class).build(), sleepMs); await().until(pojoExecution::statusAsync, willBe(jobStatusWithState(EXECUTING))); @@ -594,9 +604,9 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { CompletionException.class, () -> { IgniteCompute igniteCompute = client().compute(); - Set<ClusterNode> nodes = Set.of(node(0)); + JobTarget target = JobTarget.node(node(0)); List<DeploymentUnit> units = List.of(new DeploymentUnit("u", "latest")); - igniteCompute.executeAsync(nodes, JobDescriptor.builder(NodeNameJob.class).units(units).build()).join(); + igniteCompute.executeAsync(target, JobDescriptor.builder(NodeNameJob.class).units(units).build()).join(); }); var cause = (IgniteException) ex.getCause(); @@ -610,9 +620,8 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { void testExecuteColocatedOnUnknownUnitWithLatestVersionThrows() { CompletionException ex = assertThrows( CompletionException.class, - () -> client().compute().executeColocatedAsync( - TABLE_NAME, - Tuple.create().set(COLUMN_KEY, 1), + () -> client().compute().executeAsync( + JobTarget.colocated(TABLE_NAME, Tuple.create().set(COLUMN_KEY, 1)), JobDescriptor.builder(NodeNameJob.class) .units(new DeploymentUnit("u", "latest")) .build()).join()); @@ -630,7 +639,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { try (IgniteClient client = builder.build()) { int delayMs = 3000; CompletableFuture<String> jobFut = client.compute().executeAsync( - Set.of(node(0)), JobDescriptor.builder(SleepJob.class).build(), delayMs); + JobTarget.node(node(0)), JobDescriptor.builder(SleepJob.class).build(), delayMs); // Wait a bit and close the connection. Thread.sleep(10); @@ -668,9 +677,10 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { Mapper<TestPojo> mapper = Mapper.of(TestPojo.class); TestPojo pojoKey = new TestPojo(1); Tuple tupleKey = Tuple.create().set("key", pojoKey.key); + JobDescriptor job = JobDescriptor.builder(NodeNameJob.class).build(); - var tupleRes = client().compute().executeColocated(tableName, tupleKey, JobDescriptor.builder(NodeNameJob.class).build()); - var pojoRes = client().compute().executeColocated(tableName, pojoKey, mapper, JobDescriptor.builder(NodeNameJob.class).build()); + var tupleRes = client().compute().execute(JobTarget.colocated(tableName, tupleKey), job); + var pojoRes = client().compute().execute(JobTarget.colocated(tableName, pojoKey, mapper), job); assertEquals(tupleRes, pojoRes); } @@ -678,7 +688,11 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { @ParameterizedTest @CsvSource({"1E3,-3", "1.12E5,-5", "1.12E5,0", "1.123456789,10", "1.123456789,5"}) void testBigDecimalPropagation(String number, int scale) { - BigDecimal res = client().compute().execute(Set.of(node(0)), JobDescriptor.builder(DecimalJob.class).build(), number, scale); + BigDecimal res = client().compute().execute( + JobTarget.node(node(0)), + JobDescriptor.builder(DecimalJob.class).build(), + number, + scale); var expected = new BigDecimal(number).setScale(scale, RoundingMode.HALF_UP); assertEquals(expected, res); @@ -725,7 +739,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { } private void testEchoArg(Object arg) { - Object res = client().compute().execute(Set.of(node(0)), JobDescriptor.builder(EchoJob.class).build(), arg, arg.toString()); + Object res = client().compute().execute(JobTarget.node(node(0)), JobDescriptor.builder(EchoJob.class).build(), arg, arg.toString()); if (arg instanceof byte[]) { assertArrayEquals((byte[]) arg, (byte[]) res); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java index ded59c2b37..dc9b05ccea 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java @@ -29,6 +29,7 @@ import org.apache.ignite.client.IgniteClient; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecutionContext; +import org.apache.ignite.compute.JobTarget; import org.apache.ignite.internal.runner.app.client.proxy.IgniteClientProxy; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; @@ -87,8 +88,8 @@ public class ItThinClientPartitionAwarenessTest extends ItAbstractThinClientTest for (int key = 0; key < 50; key++) { // Get actual primary node using compute. Tuple keyTuple = Tuple.create().set("key", key); - var primaryNodeName = proxyClient.compute().executeColocated( - TABLE_NAME, keyTuple, JobDescriptor.builder(NodeNameJob.class.getName()).build()); + var primaryNodeName = proxyClient.compute().execute( + JobTarget.colocated(TABLE_NAME, keyTuple), JobDescriptor.builder(NodeNameJob.class.getName()).build()); // Perform request and check routing with proxy. resetRequestCount();