This is an automated email from the ASF dual-hosted git repository. apkhmv pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new bba18a5f99 IGNITE-21390 Inconsistent behavior of Compute APIs when target node does not exist (#3191) bba18a5f99 is described below commit bba18a5f9986b835598da2323190f0f74ff8ea31 Author: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com> AuthorDate: Tue Feb 20 13:40:25 2024 +0300 IGNITE-21390 Inconsistent behavior of Compute APIs when target node does not exist (#3191) --- .../ignite/compute/NodeNotFoundException.java | 37 ++++++ .../java/org/apache/ignite/lang/ErrorGroups.java | 3 + .../compute/ClientComputeExecuteRequest.java | 18 ++- .../internal/compute/ItComputeErrorsBaseTest.java | 141 +++++++++++++++++++++ .../compute/ItEmbeddedComputeErrorsTest.java | 27 ++++ .../compute/ItFailoverCandidateNotFoundTest.java | 100 +++++++++++++++ .../compute/ItThinClientComputeErrorsTest.java | 44 +++++++ .../internal/compute/utils/InteractiveJobs.java | 32 ++++- .../internal/compute/ComputeJobFailover.java | 21 ++- .../ignite/internal/compute/FailedExecution.java | 59 +++++++++ .../ignite/internal/compute/IgniteComputeImpl.java | 23 +++- .../internal/compute/IgniteComputeImplTest.java | 2 + modules/platforms/cpp/ignite/common/error_codes.h | 1 + modules/platforms/cpp/ignite/odbc/common_types.cpp | 1 + .../cpp/tests/client-test/compute_test.cpp | 20 ++- .../Apache.Ignite.Tests/Compute/ComputeTests.cs | 21 ++- .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 3 + 17 files changed, 533 insertions(+), 20 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/compute/NodeNotFoundException.java b/modules/api/src/main/java/org/apache/ignite/compute/NodeNotFoundException.java new file mode 100644 index 0000000000..0c3763740d --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/compute/NodeNotFoundException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute; + +import static org.apache.ignite.lang.ErrorGroups.Compute.NODE_NOT_FOUND_ERR; + +import java.util.Set; +import java.util.UUID; + +/** + * Thrown when compute component can't find the node to run the job on in the cluster. + */ +public class NodeNotFoundException extends ComputeException { + public NodeNotFoundException(Set<String> nodeNames) { + super(NODE_NOT_FOUND_ERR, "None of the specified nodes are present in the cluster: " + nodeNames); + } + + //TODO https://issues.apache.org/jira/browse/IGNITE-20140 + public NodeNotFoundException(UUID traceId, int code, String message, Throwable cause) { + super(traceId, code, message, cause); + } +} diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java index 55923ebe75..146f899b1f 100755 --- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java +++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java @@ -556,6 +556,9 @@ public class ErrorGroups { /** Cannot change job priority. */ public static final int CHANGE_JOB_PRIORITY_ERR = COMPUTE_ERR_GROUP.registerErrorCode((short) 13); + + /** Specified node is not found in the cluster. 
*/ + public static final int NODE_NOT_FOUND_ERR = COMPUTE_ERR_GROUP.registerErrorCode((short) 14); } /** Catalog error group. */ diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java index 3b03095b41..1fcb86812e 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java @@ -28,11 +28,11 @@ import org.apache.ignite.client.handler.NotificationSender; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionOptions; +import org.apache.ignite.compute.NodeNotFoundException; import org.apache.ignite.internal.client.proto.ClientMessagePacker; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; import org.apache.ignite.internal.compute.IgniteComputeInternal; import org.apache.ignite.internal.network.ClusterService; -import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.ClusterNode; /** @@ -70,17 +70,25 @@ public class ClientComputeExecuteRequest { private static Set<ClusterNode> unpackCandidateNodes(ClientMessageUnpacker in, ClusterService cluster) { int size = in.unpackInt(); + + if (size < 1) { + throw new IllegalArgumentException("nodes must not be empty."); + } + + Set<String> nodeNames = new HashSet<>(size); Set<ClusterNode> nodes = new HashSet<>(size); for (int i = 0; i < size; i++) { String nodeName = in.unpackString(); + nodeNames.add(nodeName); ClusterNode node = cluster.topologyService().getByConsistentId(nodeName); - - if (node == null) { - throw new IgniteException("Specified node is not present in the cluster: " + nodeName); + if (node != 
null) { + nodes.add(node); } + } - nodes.add(node); + if (nodes.isEmpty()) { + throw new NodeNotFoundException(nodeNames); } return nodes; diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java new file mode 100644 index 0000000000..60490107f4 --- /dev/null +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.compute; + +import static org.apache.ignite.internal.compute.utils.InteractiveJobs.Signal.RETURN_WORKER_NAME; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.ignite.compute.IgniteCompute; +import org.apache.ignite.compute.JobExecution; +import org.apache.ignite.compute.NodeNotFoundException; +import org.apache.ignite.internal.ClusterPerClassIntegrationTest; +import org.apache.ignite.internal.compute.utils.InteractiveJobs; +import org.apache.ignite.internal.compute.utils.TestingJobExecution; +import org.apache.ignite.internal.network.ClusterNodeImpl; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.NetworkAddress; +import org.junit.jupiter.api.Test; + +/** + * Tests compute API errors. + */ +@SuppressWarnings("ThrowableNotThrown") +abstract class ItComputeErrorsBaseTest extends ClusterPerClassIntegrationTest { + private final ClusterNode nonExistingNode = new ClusterNodeImpl( + "non-existing-id", "non-existing-name", new NetworkAddress("non-existing-host", 1) + ); + + @Test + void executeAsyncSucceedsWhenAtLeastOnNodeIsInTheCluster() throws InterruptedException { + // When set of nodes contain existing and non-existing nodes + ClusterNode existingNode = CLUSTER.node(0).node(); + Set<ClusterNode> nodes = Set.of(existingNode, nonExistingNode); + + // And execute a job + TestingJobExecution<String> execution = executeGlobalInteractiveJob(nodes); + + // Then existing node became a worker and run the job. 
+ String workerNodeName = InteractiveJobs.globalJob().currentWorkerName(); + assertThat(workerNodeName, is(existingNode.name())); + + // And job is running. + InteractiveJobs.globalJob().assertAlive(); + + // Cleanup + InteractiveJobs.globalJob().finish(); + execution.assertCompleted(); + } + + @Test + void executeAsyncFailsWhenNoNodesAreInTheCluster() { + // When set of nodes contain only non-existing nodes + Set<ClusterNode> nodes = Set.of(nonExistingNode); + + // And execute a job + TestingJobExecution<String> execution = executeGlobalInteractiveJob(nodes); + + // Then job fails. + String errorMessageFragment = "None of the specified nodes are present in the cluster: [" + nonExistingNode.name() + "]"; + assertThat(execution.resultAsync(), willThrow(NodeNotFoundException.class, errorMessageFragment)); + } + + @Test + void executeSucceedsWhenAtLeastOnNodeIsInTheCluster() { + // When set of nodes contain existing and non-existing nodes + ClusterNode existingNode = CLUSTER.node(0).node(); + Set<ClusterNode> nodes = Set.of(existingNode, nonExistingNode); + + // And execute a job + String workerNodeName = compute().execute(nodes, List.of(), InteractiveJobs.globalJob().name(), RETURN_WORKER_NAME.name()); + + // Then existing node was a worker and executed the job. + assertThat(workerNodeName, is(existingNode.name())); + } + + @Test + void executeFailsWhenNoNodesAreInTheCluster() { + // When set of nodes contain only non-existing nodes + Set<ClusterNode> nodes = Set.of(nonExistingNode); + + // Then job fails. 
+ assertThrows( + NodeNotFoundException.class, + () -> compute().execute(nodes, List.of(), InteractiveJobs.globalJob().name()), + "None of the specified nodes are present in the cluster: [" + nonExistingNode.name() + "]" + ); + } + + @Test + void broadcastAsync() { + // When set of nodes contain existing and non-existing nodes + ClusterNode existingNode = CLUSTER.node(0).node(); + Set<ClusterNode> nodes = Set.of(existingNode, nonExistingNode); + + // And prepare communication channels. + InteractiveJobs.initChannels(nodes.stream().map(ClusterNode::name).collect(Collectors.toList())); + + // When broadcast a job + Map<ClusterNode, JobExecution<Object>> executions = compute().broadcastAsync( + nodes, List.of(), InteractiveJobs.interactiveJobName() + ); + + // Then one job is alive + assertThat(executions.size(), is(2)); + new TestingJobExecution<>(executions.get(existingNode)).assertExecuting(); + + // And second job failed + String errorMessageFragment = "None of the specified nodes are present in the cluster: [" + nonExistingNode.name() + "]"; + assertThat(executions.get(nonExistingNode).resultAsync(), willThrow(NodeNotFoundException.class, errorMessageFragment)); + + // Cleanup + InteractiveJobs.all().finish(); + } + + protected abstract IgniteCompute compute(); + + private TestingJobExecution<String> executeGlobalInteractiveJob(Set<ClusterNode> nodes) { + return new TestingJobExecution<>(compute().executeAsync(nodes, List.of(), InteractiveJobs.globalJob().name())); + } +} diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedComputeErrorsTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedComputeErrorsTest.java new file mode 100644 index 0000000000..efb76aa34c --- /dev/null +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedComputeErrorsTest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute; + +import org.apache.ignite.compute.IgniteCompute; + +class ItEmbeddedComputeErrorsTest extends ItComputeErrorsBaseTest { + @Override + protected IgniteCompute compute() { + return CLUSTER.node(0).compute(); + } +} diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java new file mode 100644 index 0000000000..969cb83bf8 --- /dev/null +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.compute.IgniteCompute; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.internal.compute.utils.InteractiveJobs; +import org.apache.ignite.internal.compute.utils.TestingJobExecution; +import org.apache.ignite.network.ClusterNode; +import org.junit.jupiter.api.Test; + +/** + * Tests that the failover fails when candidate node is not in the cluster when it is selected for the failover. + */ +public class ItFailoverCandidateNotFoundTest extends ClusterPerTestIntegrationTest { + @Override + protected int initialNodes() { + return 5; + } + + /** + * Make a CMG from non-worker nodes. We wont lose the leader in tests then. 
+ */ + @Override + protected int[] cmgMetastoreNodes() { + return new int[]{0, 3, 4}; + } + + /** Runs the failover scenario through the embedded compute API of node 0. */ + @Test + void embeddedFailoverCandidateLeavesCluster() throws Exception { + failoverCandidateLeavesCluster(node(0).compute()); + } + + /** Runs the failover scenario through a thin client connected to node 0. */ + @Test + void thinClientFailoverCandidateLeavesCluster() throws Exception { + String address = "127.0.0.1:" + node(0).clientAddress().port(); + try (IgniteClient client = IgniteClient.builder().addresses(address).build()) { + failoverCandidateLeavesCluster(client.compute()); + } + } + + private void failoverCandidateLeavesCluster(IgniteCompute compute) throws Exception { + // Given remote candidates to execute a job. + Set<ClusterNode> remoteWorkerCandidates = Set.of(node(1).node(), node(2).node()); + Set<String> remoteWorkerCandidateNames = remoteWorkerCandidates.stream() + .map(ClusterNode::name) + .collect(Collectors.toCollection(HashSet::new)); + + // When the job is executed. + TestingJobExecution<String> execution = executeGlobalInteractiveJob(compute, remoteWorkerCandidates); + + // Then one of the candidates became a worker and ran the job. + String workerNodeName = InteractiveJobs.globalJob().currentWorkerName(); + // And the job is running. + InteractiveJobs.globalJob().assertAlive(); + // And the execution reports that it is executing. + execution.assertExecuting(); + + // Remove the worker node from the candidates, leaving the other node. + remoteWorkerCandidateNames.remove(workerNodeName); + assertThat(remoteWorkerCandidateNames.size(), is(1)); + + // Stop the non-worker candidate node. + String failoverCandidateNodeName = remoteWorkerCandidateNames.stream().findFirst().orElseThrow(); + stopNode(failoverCandidateNodeName); + + // When the worker node is stopped. + stopNode(workerNodeName); + + // Then the job fails, because there are no more failover workers.
+ execution.assertFailed(); + } + + private static TestingJobExecution<String> executeGlobalInteractiveJob(IgniteCompute compute, Set<ClusterNode> nodes) { + return new TestingJobExecution<>(compute.executeAsync(nodes, List.of(), InteractiveJobs.globalJob().name())); + } +} diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientComputeErrorsTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientComputeErrorsTest.java new file mode 100644 index 0000000000..4a2980c41e --- /dev/null +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientComputeErrorsTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.compute; + +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.compute.IgniteCompute; +import org.junit.jupiter.api.AfterEach; + +class ItThinClientComputeErrorsTest extends ItComputeErrorsBaseTest { + private final Map<String, IgniteClient> clients = new HashMap<>(); + + @AfterEach + void cleanup() throws Exception { + for (IgniteClient igniteClient : clients.values()) { + igniteClient.close(); + } + clients.clear(); + } + + @Override + protected IgniteCompute compute() { + String address = "127.0.0.1:" + CLUSTER.node(0).clientAddress().port(); + // Reuse one client per address: building a new one on every call would overwrite (and leak) + // the client already stored under the same key, because cleanup() only closes what is in the map. + IgniteClient client = clients.computeIfAbsent(address, addr -> IgniteClient.builder().addresses(addr).build()); + return client.compute(); + } +} diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java index e55376b0c0..463634860c 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java @@ -123,19 +123,27 @@ public final class InteractiveJobs { /** * Signals that are sent by test code to the jobs. */ - private enum Signal { + public enum Signal { /** * Signal to the job to continue running and send ACK as a response. */ CONTINUE, + /** * Ask job to throw an exception. */ THROW, + /** * Ask job to return result. */ RETURN, + + /** + * Ask job to complete and return worker name. + */ + RETURN_WORKER_NAME, + /** * Signal to the job to continue running and send current worker name to the response channel. 
*/ @@ -158,6 +166,8 @@ public final class InteractiveJobs { public String execute(JobExecutionContext context, Object...
args) { RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet(); + offerArgsAsSignals(args); + try { while (true) { Signal receivedSignal = listenSignal(); @@ -169,6 +179,8 @@ public final class InteractiveJobs { break; case RETURN: return "Done"; + case RETURN_WORKER_NAME: + return context.ignite().name(); case GET_WORKER_NAME: GLOBAL_CHANNEL.add(context.ignite().name()); break; @@ -180,6 +192,24 @@ public final class InteractiveJobs { RUNNING_INTERACTIVE_JOBS_CNT.decrementAndGet(); } } + + /** + * If any of the args are strings, convert them to signals and offer them to the job. + * + * @param args Job args. + */ + private static void offerArgsAsSignals(Object[] args) { + for (Object arg : args) { + if (arg instanceof String) { + String signal = (String) arg; + try { + GLOBAL_SIGNALS.offer(Signal.valueOf(signal)); + } catch (IllegalArgumentException ignored) { + // Ignore non-signal strings + } + } + } + } } /** diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java index ad01f19dd4..14bb35ef44 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java @@ -153,7 +153,12 @@ class ComputeJobFailover<T> { return; } - executor.execute(() -> nextWorkerSelector.next() + LOG.info("Worker node {} has left the cluster.", leftNode.name()); + executor.execute(this::selectNewWorker); + } + + private void selectNewWorker() { + nextWorkerSelector.next() .thenAccept(nextWorker -> { if (nextWorker == null) { LOG.warn("No more worker nodes to restart the job. Failing the job {}.", jobContext.jobClassName()); @@ -165,15 +170,19 @@ class ComputeJobFailover<T> { return; } - LOG.warn( - "Worker node {} has left the cluster. 
Restarting the job {} on node {}.", - leftNode, jobContext.jobClassName(), nextWorker - ); + if (topologyService.getByConsistentId(nextWorker.name()) == null) { + LOG.warn("Worker node {} is not found in the cluster", nextWorker.name()); + // Restart next worker selection + executor.execute(this::selectNewWorker); + return; + } + + LOG.info("Restarting the job {} on node {}.", jobContext.jobClassName(), nextWorker.name()); runningWorkerNode.set(nextWorker); JobExecution<T> jobExecution = launchJobOn(runningWorkerNode.get()); jobContext.updateJobExecution(jobExecution); - })); + }); } } } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailedExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailedExecution.java new file mode 100644 index 0000000000..d80dd2aa32 --- /dev/null +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailedExecution.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.compute; + +import static java.util.concurrent.CompletableFuture.failedFuture; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.compute.JobExecution; +import org.apache.ignite.compute.JobStatus; +import org.jetbrains.annotations.Nullable; + +/** + * Job execution implementation which will return failed future with specified error from all methods. + * + * @param <R> Job result type. + */ +public class FailedExecution<R> implements JobExecution<R> { + + private final Throwable error; + + FailedExecution(Throwable error) { + this.error = error; + } + + @Override + public CompletableFuture<R> resultAsync() { + return failedFuture(error); + } + + @Override + public CompletableFuture<@Nullable JobStatus> statusAsync() { + return failedFuture(error); + } + + @Override + public CompletableFuture<@Nullable Boolean> cancelAsync() { + return failedFuture(error); + } + + @Override + public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) { + return failedFuture(error); + } +} diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java index e7d6c43c03..1eb24e223f 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java @@ -38,12 +38,14 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.ignite.compute.ComputeException; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionOptions; import 
org.apache.ignite.compute.JobStatus; +import org.apache.ignite.compute.NodeNotFoundException; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.replicator.TablePartitionId; @@ -120,7 +122,17 @@ public class IgniteComputeImpl implements IgniteComputeInternal { JobExecutionOptions options, Object... args ) { - Set<ClusterNode> candidates = new HashSet<>(nodes); + Set<ClusterNode> candidates = new HashSet<>(); + for (ClusterNode node : nodes) { + if (topologyService.getByConsistentId(node.name()) != null) { + candidates.add(node); + } + } + if (candidates.isEmpty()) { + Set<String> nodeNames = nodes.stream().map(ClusterNode::name).collect(Collectors.toSet()); + return new FailedExecution<>(new NodeNotFoundException(nodeNames)); + } + ClusterNode targetNode = randomNode(candidates); candidates.remove(targetNode); @@ -348,8 +360,13 @@ public class IgniteComputeImpl implements IgniteComputeInternal { .collect(toUnmodifiableMap(identity(), // No failover nodes for broadcast. We use failover here in order to complete futures with exceptions // if worker node has left the cluster. 
- node -> new JobExecutionWrapper<>(executeOnOneNodeWithFailover(node, - CompletableFutures::nullCompletedFuture, units, jobClassName, options, args)))); + node -> { + if (topologyService.getByConsistentId(node.name()) == null) { + return new FailedExecution<>(new NodeNotFoundException(Set.of(node.name()))); + } + return new JobExecutionWrapper<>(executeOnOneNodeWithFailover(node, + CompletableFutures::nullCompletedFuture, units, jobClassName, options, args)); + })); } @Override diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java index a5ade1709a..6f0156914e 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java @@ -98,6 +98,8 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest { @BeforeEach void setupMocks() { lenient().when(topologyService.localMember()).thenReturn(localNode); + lenient().when(topologyService.getByConsistentId(localNode.name())).thenReturn(localNode); + lenient().when(topologyService.getByConsistentId(remoteNode.name())).thenReturn(remoteNode); } @Test diff --git a/modules/platforms/cpp/ignite/common/error_codes.h b/modules/platforms/cpp/ignite/common/error_codes.h index b5e661512c..75c19a2910 100644 --- a/modules/platforms/cpp/ignite/common/error_codes.h +++ b/modules/platforms/cpp/ignite/common/error_codes.h @@ -189,6 +189,7 @@ enum class code : underlying_t { CHANGE_JOB_PRIORITY_JOB_EXECUTING = 0x10000b, PRIMARY_REPLICA_RESOLVE = 0x10000c, CHANGE_JOB_PRIORITY = 0x10000d, + NODE_NOT_FOUND = 0x10000e, // Catalog group. 
Group code: 17 VALIDATION = 0x110001, diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp b/modules/platforms/cpp/ignite/odbc/common_types.cpp index 95c944753f..6a8a602db9 100644 --- a/modules/platforms/cpp/ignite/odbc/common_types.cpp +++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp @@ -276,6 +276,7 @@ sql_state error_code_to_sql_state(error::code code) { case error::code::PRIMARY_REPLICA_RESOLVE: case error::code::CHANGE_JOB_PRIORITY_JOB_EXECUTING: case error::code::CHANGE_JOB_PRIORITY: + case error::code::NODE_NOT_FOUND: return sql_state::SHY000_GENERAL_ERROR; // Catalog group. Group code: 17 diff --git a/modules/platforms/cpp/tests/client-test/compute_test.cpp b/modules/platforms/cpp/tests/client-test/compute_test.cpp index 5a1e6f6bd5..879275172d 100644 --- a/modules/platforms/cpp/tests/client-test/compute_test.cpp +++ b/modules/platforms/cpp/tests/client-test/compute_test.cpp @@ -183,7 +183,7 @@ TEST_F(compute_test, job_error_propagates_to_client) { ignite_error); } -TEST_F(compute_test, unknown_node_throws) { +TEST_F(compute_test, unknown_node_execute_throws) { auto unknown_node = cluster_node("some", "random", {"127.0.0.1", 1234}); EXPECT_THROW( @@ -191,7 +191,23 @@ TEST_F(compute_test, unknown_node_throws) { try { m_client.get_compute().execute({unknown_node}, {}, ECHO_JOB, {"unused"}); } catch (const ignite_error &e) { - EXPECT_THAT(e.what_str(), testing::HasSubstr("Specified node is not present in the cluster: random")); + EXPECT_THAT(e.what_str(), testing::HasSubstr("None of the specified nodes are present in the cluster: [random]")); + throw; + } + }, + ignite_error); +} + +//TODO https://issues.apache.org/jira/browse/IGNITE-21553 +TEST_F(compute_test, DISABLED_unknown_node_broadcast_throws) { + auto unknown_node = cluster_node("some", "random", {"127.0.0.1", 1234}); + + EXPECT_THROW( + { + try { + m_client.get_compute().broadcast({unknown_node}, {}, ECHO_JOB, {"unused"}); + } catch (const ignite_error &e) { + 
EXPECT_THAT(e.what_str(), testing::HasSubstr("None of the specified nodes are present in the cluster: [random]")); throw; } }, diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs index 5b534d95bb..6b90338351 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs @@ -179,14 +179,29 @@ namespace Apache.Ignite.Tests.Compute } [Test] - public void TestUnknownNodeThrows() + public void TestUnknownNodeExecuteAsyncThrows() { var unknownNode = new ClusterNode("x", "y", new IPEndPoint(IPAddress.Loopback, 0)); - var ex = Assert.ThrowsAsync<IgniteException>(async () => + var ex = Assert.ThrowsAsync<NodeNotFoundException>(async () => await Client.Compute.ExecuteAsync<string>(new[] { unknownNode }, Units, EchoJob, "unused")); - StringAssert.Contains("Specified node is not present in the cluster: y", ex!.Message); + StringAssert.Contains("None of the specified nodes are present in the cluster: [y]", ex!.Message); + Assert.AreEqual(ErrorGroups.Compute.NodeNotFound, ex.Code); + } + + [Test] + public void TestUnknownNodeBroadcastAsyncThrows() + { + var unknownNode = new ClusterNode("x", "y", new IPEndPoint(IPAddress.Loopback, 0)); + + IDictionary<IClusterNode, Task<IJobExecution<string>>> taskMap = + Client.Compute.BroadcastAsync<string>(new[] { unknownNode }, Units, EchoJob, "unused"); + + var ex = Assert.ThrowsAsync<NodeNotFoundException>(async () => await taskMap[unknownNode]); + + StringAssert.Contains("None of the specified nodes are present in the cluster: [y]", ex!.Message); + Assert.AreEqual(ErrorGroups.Compute.NodeNotFound, ex.Code); } [Test] diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs index 1036d86955..23f59b6f8b 100644 --- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs +++ 
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs @@ -510,6 +510,9 @@ namespace Apache.Ignite /// <summary> ChangeJobPriority error. </summary> public const int ChangeJobPriority = (GroupCode << 16) | (13 & 0xFFFF); + + /// <summary> NodeNotFound error. </summary> + public const int NodeNotFound = (GroupCode << 16) | (14 & 0xFFFF); } /// <summary> Catalog errors. </summary>