This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

The following commit(s) were added to refs/heads/main by this push:
     new 2e33dec0d1 IGNITE-22974 Fix ItThinClientComputeTest.testCancelOnSpecificNodeAsync (#4216)
2e33dec0d1 is described below

commit 2e33dec0d1029d7cadc610f543919df22c72849c
Author: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com>
AuthorDate: Tue Aug 13 20:24:08 2024 +0300

    IGNITE-22974 Fix ItThinClientComputeTest.testCancelOnSpecificNodeAsync (#4216)
---
 .../app/client/ItAbstractThinClientTest.java | 4 +--
 .../runner/app/client/ItThinClientComputeTest.java | 31 +++++++++++++---------
 2 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
index afcd0bd403..e5c3ee4c46 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
@@ -79,7 +79,7 @@ public abstract class ItAbstractThinClientTest extends BaseIgniteAbstractTest {
                 node0Name,
                 "{\n"
                         + " network.port: 3344,\n"
-                        + " network.nodeFinder.netClusterNodes: [ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+                        + " network.nodeFinder.netClusterNodes: [ \"localhost:3344\", \"localhost:3345\" ]\n"
                         + " clientConnector.port: 10800,\n"
                         + " rest.port: 10300\n"
                         + " compute.threadPoolSize: 1\n"
@@ -90,7 +90,7 @@ public abstract class ItAbstractThinClientTest extends BaseIgniteAbstractTest {
                 node1Name,
                 "{\n"
                         + " network.port: 3345,\n"
-                        + " network.nodeFinder.netClusterNodes: [ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+                        + " network.nodeFinder.netClusterNodes: [ \"localhost:3344\", \"localhost:3345\" ]\n"
                         + " clientConnector.sendServerExceptionStackTraceToClient: true\n"
                         + " clientConnector.metricsEnabled: true\n"
                         + " clientConnector.port: 10801,\n"
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index d152c7aeaa..f45f447015 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -182,12 +182,12 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest {
     @ValueSource(booleans = {true, false})
     void testCancelOnSpecificNodeAsync(boolean asyncJob) {
         int sleepMs = 1_000_000;
-        JobDescriptor sleepJob = JobDescriptor
+        JobDescriptor<Integer, Void> sleepJob = JobDescriptor
                 .builder(asyncJob ? AsyncSleepJob.class : SleepJob.class)
                 .build();
 
-        JobExecution<String> execution1 = client().compute().submit(JobTarget.node(node(0)), sleepJob, sleepMs);
-        JobExecution<String> execution2 = client().compute().submit(JobTarget.node(node(1)), sleepJob, sleepMs);
+        JobExecution<Void> execution1 = client().compute().submit(JobTarget.node(node(0)), sleepJob, sleepMs);
+        JobExecution<Void> execution2 = client().compute().submit(JobTarget.node(node(1)), sleepJob, sleepMs);
 
         await().until(execution1::stateAsync, willBe(jobStateWithStatus(EXECUTING)));
         await().until(execution2::stateAsync, willBe(jobStateWithStatus(EXECUTING)));
@@ -202,19 +202,19 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest {
     @Test
     void changeJobPriority() {
         int sleepMs = 1_000_000;
-        JobDescriptor sleepJob = JobDescriptor.builder(SleepJob.class).build();
+        JobDescriptor<Integer, Void> sleepJob = JobDescriptor.builder(SleepJob.class).build();
         JobTarget target = JobTarget.node(node(0));
 
         // Start 1 task in executor with 1 thread
-        JobExecution<String> execution1 = client().compute().submit(target, sleepJob, sleepMs);
+        JobExecution<Void> execution1 = client().compute().submit(target, sleepJob, sleepMs);
         await().until(execution1::stateAsync, willBe(jobStateWithStatus(EXECUTING)));
 
         // Start one more long lasting task
-        JobExecution<String> execution2 = client().compute().submit(target, sleepJob, sleepMs);
+        JobExecution<Void> execution2 = client().compute().submit(target, sleepJob, sleepMs);
         await().until(execution2::stateAsync, willBe(jobStateWithStatus(QUEUED)));
 
         // Start third task
-        JobExecution<String> execution3 = client().compute().submit(target, sleepJob, sleepMs);
+        JobExecution<Void> execution3 = client().compute().submit(target, sleepJob, sleepMs);
         await().until(execution3::stateAsync, willBe(jobStateWithStatus(QUEUED)));
 
         // Task 2 and 3 are not completed, in queue state
@@ -801,7 +801,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest {
 
             return completedFuture(
                     Arrays.stream(args.split(":"))
-                            .map(o -> o == null ? "null" : o.toString())
+                            .map(o -> o == null ? "null" : o)
                             .collect(Collectors.joining("_")));
         }
     }
@@ -846,9 +846,9 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest {
 
     private static class SleepJob implements ComputeJob<Integer, Void> {
         @Override
-        public @Nullable CompletableFuture<Void> executeAsync(JobExecutionContext context, Integer args) {
+        public @Nullable CompletableFuture<Void> executeAsync(JobExecutionContext context, Integer sleepMs) {
             try {
-                Thread.sleep(args);
+                Thread.sleep(sleepMs);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
@@ -859,10 +859,17 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest {
 
     private static class AsyncSleepJob implements ComputeJob<Integer, Void> {
         @Override
-        public @Nullable CompletableFuture<Void> executeAsync(JobExecutionContext context, Integer args) {
+        public @Nullable CompletableFuture<Void> executeAsync(JobExecutionContext context, Integer sleepMs) {
             return CompletableFuture.runAsync(() -> {
                 try {
-                    Thread.sleep(args);
+                    int limit = sleepMs;
+                    while (limit > 0) {
+                        if (context.isCancelled()) {
+                            return;
+                        }
+                        Thread.sleep(100);
+                        limit -= 100;
+                    }
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }