This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e2594a72ee7 IGNITE-26331 Introduce compute execution context (#6508)
e2594a72ee7 is described below
commit e2594a72ee70d4e40d91d25c7028b7e31fec4ea9
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Fri Aug 29 12:43:05 2025 +0300
IGNITE-26331 Introduce compute execution context (#6508)
---
.../ClientComputeExecuteColocatedRequest.java | 17 +--
.../compute/ClientComputeExecuteRequest.java | 3 +-
.../apache/ignite/client/fakes/FakeCompute.java | 31 +++---
.../ignite/internal/compute/ComputeComponent.java | 36 ++-----
.../internal/compute/ComputeComponentImpl.java | 56 +++-------
.../internal/compute/ComputeJobFailover.java | 38 ++-----
.../ignite/internal/compute/ExecutionContext.java | 118 +++++++++++++++++++++
.../ignite/internal/compute/IgniteComputeImpl.java | 108 ++++++-------------
.../internal/compute/IgniteComputeInternal.java | 24 +----
.../apache/ignite/internal/compute/JobStarter.java | 18 +---
.../internal/compute/RemoteExecutionContext.java | 72 -------------
.../compute/messaging/ComputeMessaging.java | 29 ++---
.../internal/compute/ComputeComponentImplTest.java | 23 ++--
.../internal/compute/IgniteComputeImplTest.java | 34 +++---
14 files changed, 260 insertions(+), 347 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
index a4f6926df47..f2bb27b1e57 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.client.handler.requests.compute.ClientComputeExe
import static
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.sendResultAndState;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
+import static
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackJob;
import static
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackTaskId;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_TASK_ID;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
@@ -31,10 +32,10 @@ import org.apache.ignite.client.handler.ClientContext;
import org.apache.ignite.client.handler.NotificationSender;
import org.apache.ignite.client.handler.ResponseWriter;
import org.apache.ignite.compute.JobExecution;
-import org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker;
import org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.Job;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
+import org.apache.ignite.internal.compute.ExecutionContext;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
@@ -71,7 +72,7 @@ public class ClientComputeExecuteColocatedRequest {
BitSet noValueSet = in.unpackBitSet();
byte[] tupleBytes = in.readBinary();
- Job job = ClientComputeJobUnpacker.unpackJob(in,
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
+ Job job = unpackJob(in,
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
unpackTaskId(in, clientContext.hasFeature(COMPUTE_TASK_ID)); //
Placeholder for a possible future usage
return readTableAsync(tableId, tables).thenCompose(table ->
readTuple(schemaId, noValueSet, tupleBytes, table, true)
@@ -84,11 +85,13 @@ public class ClientComputeExecuteColocatedRequest {
CompletableFuture<JobExecution<ComputeJobDataHolder>>
jobExecutionFut = compute.submitColocatedInternal(
table,
keyTuple,
- job.deploymentUnits(),
- job.jobClassName(),
- job.options(),
- metadataBuilder,
- job.arg(),
+ new ExecutionContext(
+ job.options(),
+ job.deploymentUnits(),
+ job.jobClassName(),
+ metadataBuilder,
+ job.arg()
+ ),
null
);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index 8c5eb87bb76..02f496299b3 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -40,6 +40,7 @@ import
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.Job;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
+import org.apache.ignite.internal.compute.ExecutionContext;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.compute.MarshallerProvider;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
@@ -86,7 +87,7 @@ public class ClientComputeExecuteRequest {
.clientAddress(clientContext.remoteAddress().toString());
CompletableFuture<JobExecution<ComputeJobDataHolder>> executionFut =
compute.executeAsyncWithFailover(
- candidates, job.deploymentUnits(), job.jobClassName(),
job.options(), metadataBuilder, job.arg(), null
+ candidates, new ExecutionContext(job.options(),
job.deploymentUnits(), job.jobClassName(), metadataBuilder, job.arg()), null
);
sendResultAndState(executionFut, notificationSender);
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 934085e4736..8368b46b091 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -57,6 +57,7 @@ import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.ComputeUtils;
+import org.apache.ignite.internal.compute.ExecutionContext;
import org.apache.ignite.internal.compute.HybridTimestampProvider;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.compute.JobExecutionContextImpl;
@@ -107,14 +108,12 @@ public class FakeCompute implements IgniteComputeInternal
{
@Override
public CompletableFuture<JobExecution<ComputeJobDataHolder>>
executeAsyncWithFailover(
Set<ClusterNode> nodes,
- List<DeploymentUnit> units,
- String jobClassName,
- JobExecutionOptions options,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg,
- @Nullable CancellationToken cancellationToken) {
+ ExecutionContext executionContext,
+ @Nullable CancellationToken cancellationToken
+ ) {
+ String jobClassName = executionContext.jobClassName();
if (Objects.equals(jobClassName, GET_UNITS)) {
- String unitString =
units.stream().map(DeploymentUnit::render).collect(Collectors.joining(","));
+ String unitString =
executionContext.units().stream().map(DeploymentUnit::render).collect(Collectors.joining(","));
return completedExecution(unitString);
}
@@ -135,7 +134,7 @@ public class FakeCompute implements IgniteComputeInternal {
ComputeJob<Object, Object> job =
ComputeUtils.instantiateJob(jobClass);
CompletableFuture<Object> jobFut = job.executeAsync(
new JobExecutionContextImpl(ignite, new AtomicBoolean(),
jobClassLoader, null),
- SharedComputeUtils.unmarshalArgOrResult(arg, null, null));
+
SharedComputeUtils.unmarshalArgOrResult(executionContext.arg(), null, null));
return jobExecution(jobFut != null ? jobFut :
nullCompletedFuture());
}
@@ -154,11 +153,7 @@ public class FakeCompute implements IgniteComputeInternal {
public CompletableFuture<JobExecution<ComputeJobDataHolder>>
submitColocatedInternal(
TableViewInternal table,
Tuple key,
- List<DeploymentUnit> units,
- String jobClassName,
- JobExecutionOptions options,
- ComputeEventMetadataBuilder metadataBuilder,
- ComputeJobDataHolder args,
+ ExecutionContext executionContext,
@Nullable CancellationToken cancellationToken
) {
return jobExecution(future != null
@@ -202,11 +197,11 @@ public class FakeCompute implements IgniteComputeInternal
{
return executeAsyncWithFailover(
nodes,
- descriptor.units(),
- descriptor.jobClassName(),
- descriptor.options(),
- ComputeEventMetadata.builder(),
- SharedComputeUtils.marshalArgOrResult(arg, null,
observableTimestamp.longValue()),
+ new ExecutionContext(
+ descriptor,
+ ComputeEventMetadata.builder(),
+ SharedComputeUtils.marshalArgOrResult(arg, null,
observableTimestamp.longValue())
+ ),
cancellationToken
).thenApply(internalExecution -> unmarshalingExecution(descriptor,
internalExecution));
} else if (target instanceof ColocatedJobTarget) {
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
index 5f8091c5bb3..a8f4da83a62 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
@@ -38,20 +38,12 @@ public interface ComputeComponent extends IgniteComponent {
/**
* Executes a job of the given class on the current node.
*
- * @param options Job execution options.
- * @param units Deployment units which will be loaded for execution.
- * @param jobClassName Name of the job class.
- * @param metadataBuilder Event metadata builder.
- * @param arg Job argument.
+ * @param executionContext Execution context.
* @param cancellationToken Cancellation token or {@code null}.
* @return Future of the job execution object which will be completed when
the job is submitted.
*/
CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>>
executeLocally(
- ExecutionOptions options,
- List<DeploymentUnit> units,
- String jobClassName,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg,
+ ExecutionContext executionContext,
@Nullable CancellationToken cancellationToken
);
@@ -59,21 +51,13 @@ public interface ComputeComponent extends IgniteComponent {
* Executes a job of the given class on a remote node.
*
* @param remoteNode Remote node name.
- * @param options Job execution options.
- * @param units Deployment units which will be loaded for execution.
- * @param jobClassName Name of the job class.
- * @param metadataBuilder Event metadata builder.
- * @param arg Job argument.
+ * @param executionContext Execution context.
* @param cancellationToken Cancellation token or {@code null}.
* @return Future of the job execution object which will be completed when
the job is submitted.
*/
CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>>
executeRemotely(
ClusterNode remoteNode,
- ExecutionOptions options,
- List<DeploymentUnit> units,
- String jobClassName,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg,
+ ExecutionContext executionContext,
@Nullable CancellationToken cancellationToken
);
@@ -83,22 +67,14 @@ public interface ComputeComponent extends IgniteComponent {
*
* @param remoteNode Remote node name.
* @param nextWorkerSelector The selector that returns the next worker to
execute job on.
- * @param options Job execution options.
- * @param units Deployment units which will be loaded for execution.
- * @param jobClassName Name of the job class.
- * @param metadataBuilder Event metadata builder.
- * @param arg Job argument.
+ * @param executionContext Execution context.
* @param cancellationToken Cancellation token or {@code null}.
* @return Future of the job execution object which will be completed when
the job is submitted.
*/
CompletableFuture<JobExecution<ComputeJobDataHolder>>
executeRemotelyWithFailover(
ClusterNode remoteNode,
NextWorkerSelector nextWorkerSelector,
- ExecutionOptions options,
- List<DeploymentUnit> units,
- String jobClassName,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg,
+ ExecutionContext executionContext,
@Nullable CancellationToken cancellationToken
);
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 635beab11e8..47feca02b11 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -126,11 +126,7 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
@Override
public CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>>
executeLocally(
- ExecutionOptions options,
- List<DeploymentUnit> units,
- String jobClassName,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg,
+ ExecutionContext executionContext,
@Nullable CancellationToken cancellationToken
) {
if (!busyLock.enterBusy()) {
@@ -138,14 +134,12 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
}
try {
- CompletableFuture<JobContext> classLoaderFut =
jobContextManager.acquireClassLoader(units);
+ CompletableFuture<JobContext> classLoaderFut =
jobContextManager.acquireClassLoader(executionContext.units());
CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>>
future =
- mapClassLoaderExceptions(classLoaderFut, jobClassName)
+ mapClassLoaderExceptions(classLoaderFut,
executionContext.jobClassName())
.thenApply(context -> {
- JobExecutionInternal<ComputeJobDataHolder>
execution = execJob(
- context, options, jobClassName,
metadataBuilder, arg
- );
+ JobExecutionInternal<ComputeJobDataHolder>
execution = execJob(context, executionContext);
execution.resultAsync().whenComplete((result,
e) -> context.close());
inFlightFutures.registerFuture(execution.resultAsync());
@@ -222,11 +216,7 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
@Override
public CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>>
executeRemotely(
ClusterNode remoteNode,
- ExecutionOptions options,
- List<DeploymentUnit> units,
- String jobClassName,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg,
+ ExecutionContext executionContext,
@Nullable CancellationToken cancellationToken
) {
if (!busyLock.enterBusy()) {
@@ -234,14 +224,7 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
}
try {
- CompletableFuture<UUID> jobIdFuture =
messaging.remoteExecuteRequestAsync(
- remoteNode,
- options,
- units,
- jobClassName,
- metadataBuilder,
- arg
- );
+ CompletableFuture<UUID> jobIdFuture =
messaging.remoteExecuteRequestAsync(remoteNode, executionContext);
inFlightFutures.registerFuture(jobIdFuture);
@@ -267,16 +250,12 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
public CompletableFuture<JobExecution<ComputeJobDataHolder>>
executeRemotelyWithFailover(
ClusterNode remoteNode,
NextWorkerSelector nextWorkerSelector,
- ExecutionOptions options,
- List<DeploymentUnit> units,
- String jobClassName,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg,
+ ExecutionContext executionContext,
@Nullable CancellationToken cancellationToken
) {
return ComputeJobFailover.failSafeExecute(
this, logicalTopologyService, topologyService,
failoverExecutor, eventLog,
- remoteNode, nextWorkerSelector, options, units,
jobClassName, metadataBuilder, arg
+ remoteNode, nextWorkerSelector, executionContext
)
.thenApply(execution -> {
// Do not add cancel action to the underlying jobs, let
the FailSafeJobExecution handle it.
@@ -329,8 +308,7 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
executor.start();
- messaging.start((options, units, jobClassName, metadataBuilder, arg) ->
- executeLocally(options, units, jobClassName, metadataBuilder,
arg, null));
+ messaging.start(executionContext -> executeLocally(executionContext,
null));
executionManager.start();
computeViewProvider.init(executionManager);
@@ -356,15 +334,15 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
return nullCompletedFuture();
}
- private JobExecutionInternal<ComputeJobDataHolder> execJob(
- JobContext context,
- ExecutionOptions options,
- String jobClassName,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg
- ) {
+ private JobExecutionInternal<ComputeJobDataHolder> execJob(JobContext
context, ExecutionContext executionContext) {
try {
- return executor.executeJob(options, jobClassName,
context.classLoader(), metadataBuilder, arg);
+ return executor.executeJob(
+ executionContext.options(),
+ executionContext.jobClassName(),
+ context.classLoader(),
+ executionContext.metadataBuilder(),
+ executionContext.arg()
+ );
} catch (Throwable e) {
context.close();
throw e;
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
index 2646411ea8c..293667baa50 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
@@ -17,18 +17,15 @@
package org.apache.ignite.internal.compute;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
-import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
import org.apache.ignite.internal.compute.events.ComputeEventsFactory;
import org.apache.ignite.internal.eventlog.api.EventLog;
import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -37,7 +34,6 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.lang.ErrorGroups.Compute;
import org.apache.ignite.network.ClusterNode;
-import org.jetbrains.annotations.Nullable;
/**
* This is a helper class for {@link ComputeComponent} to handle job failures.
You can think about this class as a "retryable compute job
@@ -87,7 +83,7 @@ class ComputeJobFailover {
/**
* Context of the called job. Captures deployment units, jobClassName and
arguments.
*/
- private final RemoteExecutionContext jobContext;
+ private final ExecutionContext jobContext;
/**
* Job id of the execution.
@@ -120,11 +116,7 @@ class ComputeJobFailover {
EventLog eventLog,
ClusterNode workerNode,
NextWorkerSelector nextWorkerSelector,
- ExecutionOptions executionOptions,
- List<DeploymentUnit> units,
- String jobClassName,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg
+ ExecutionContext executionContext
) {
this.computeComponent = computeComponent;
this.logicalTopologyService = logicalTopologyService;
@@ -135,9 +127,9 @@ class ComputeJobFailover {
this.nextWorkerSelector = nextWorkerSelector;
// Assign failover job id so that it is consistent for any remote job.
- metadataBuilder.jobId(jobId);
+ executionContext.metadataBuilder().jobId(jobId);
- this.jobContext = new RemoteExecutionContext(executionOptions, units,
jobClassName, metadataBuilder, arg);
+ this.jobContext = executionContext;
}
static CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>>
failSafeExecute(
@@ -148,11 +140,7 @@ class ComputeJobFailover {
EventLog eventLog,
ClusterNode workerNode,
NextWorkerSelector nextWorkerSelector,
- ExecutionOptions executionOptions,
- List<DeploymentUnit> units,
- String jobClassName,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg
+ ExecutionContext executionContext
) {
return new ComputeJobFailover(
computeComponent,
@@ -162,11 +150,7 @@ class ComputeJobFailover {
eventLog,
workerNode,
nextWorkerSelector,
- executionOptions,
- units,
- jobClassName,
- metadataBuilder,
- arg
+ executionContext
).execute();
}
@@ -191,15 +175,9 @@ class ComputeJobFailover {
private CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>>
launchJobOn(ClusterNode runningWorkerNode) {
if
(runningWorkerNode.name().equals(topologyService.localMember().name())) {
- return computeComponent.executeLocally(
- jobContext.executionOptions(), jobContext.units(),
jobContext.jobClassName(),
- jobContext.metadataBuilder(), jobContext.arg(), null
- );
+ return computeComponent.executeLocally(jobContext, null);
} else {
- return computeComponent.executeRemotely(
- runningWorkerNode, jobContext.executionOptions(),
jobContext.units(), jobContext.jobClassName(),
- jobContext.metadataBuilder(), jobContext.arg(), null
- );
+ return computeComponent.executeRemotely(runningWorkerNode,
jobContext, null);
}
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
new file mode 100644
index 00000000000..5a97a13d26d
--- /dev/null
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.util.List;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecutionOptions;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Captures the context of a job execution.
+ */
+public class ExecutionContext {
+ private final ExecutionOptions options;
+
+ private final List<DeploymentUnit> units;
+
+ private final String jobClassName;
+
+ private final ComputeEventMetadataBuilder metadataBuilder;
+
+ private final ComputeJobDataHolder arg;
+
+ /**
+ * Creates new execution context.
+ *
+ * @param options Job execution options.
+ * @param units Deployment units which will be loaded for execution.
+ * @param jobClassName Name of the job class.
+ * @param metadataBuilder Event metadata builder.
+ * @param arg Job argument.
+ */
+ public ExecutionContext(
+ ExecutionOptions options,
+ List<DeploymentUnit> units,
+ String jobClassName,
+ ComputeEventMetadataBuilder metadataBuilder,
+ @Nullable ComputeJobDataHolder arg
+ ) {
+ this.options = options;
+ this.units = units;
+ this.jobClassName = jobClassName;
+ this.metadataBuilder = metadataBuilder;
+ this.arg = arg;
+ }
+
+ /**
+ * Creates new execution context.
+ *
+ * @param jobExecutionOptions Job execution options.
+ * @param units Deployment units which will be loaded for execution.
+ * @param jobClassName Name of the job class.
+ * @param metadataBuilder Event metadata builder.
+ * @param arg Job argument.
+ */
+ public ExecutionContext(
+ JobExecutionOptions jobExecutionOptions,
+ List<DeploymentUnit> units,
+ String jobClassName,
+ ComputeEventMetadataBuilder metadataBuilder,
+ @Nullable ComputeJobDataHolder arg
+ ) {
+ this(ExecutionOptions.from(jobExecutionOptions), units, jobClassName,
metadataBuilder, arg);
+ }
+
+ /**
+ * Creates new execution context. Takes execution options, deployment
units and job class name from a job descriptor.
+ *
+ * @param descriptor Job descriptor.
+ * @param metadataBuilder Event metadata builder.
+ * @param arg Job argument.
+ */
+ public <T, R> ExecutionContext(
+ JobDescriptor<T, R> descriptor,
+ ComputeEventMetadataBuilder metadataBuilder,
+ @Nullable ComputeJobDataHolder arg
+ ) {
+ this(descriptor.options(), descriptor.units(),
descriptor.jobClassName(), metadataBuilder, arg);
+ }
+
+ public ExecutionOptions options() {
+ return options;
+ }
+
+ public List<DeploymentUnit> units() {
+ return units;
+ }
+
+ public String jobClassName() {
+ return jobClassName;
+ }
+
+ public ComputeEventMetadataBuilder metadataBuilder() {
+ return metadataBuilder;
+ }
+
+ @Nullable
+ public ComputeJobDataHolder arg() {
+ return arg;
+ }
+}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index cdb08ae8949..a54f82f6de0 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -163,15 +163,13 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
Objects.requireNonNull(descriptor);
ComputeJobDataHolder argHolder =
SharedComputeUtils.marshalArgOrResult(arg, descriptor.argumentMarshaller());
+ ExecutionContext executionContext = new ExecutionContext(descriptor,
metadataBuilder, argHolder);
if (target instanceof AnyNodeJobTarget) {
Set<ClusterNode> nodes = ((AnyNodeJobTarget) target).nodes();
return unmarshalResult(
- executeAsyncWithFailover(
- nodes, descriptor.units(),
descriptor.jobClassName(), descriptor.options(), metadataBuilder,
- argHolder, cancellationToken
- ),
+ executeAsyncWithFailover(nodes, executionContext,
cancellationToken),
descriptor,
observableTimestampTracker
);
@@ -183,7 +181,7 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
QualifiedName tableName = colocatedTarget.tableName();
Object key = colocatedTarget.key();
- metadataBuilder.tableName(tableName.toCanonicalForm());
+
executionContext.metadataBuilder().tableName(tableName.toCanonicalForm());
CompletableFuture<JobExecution<ComputeJobDataHolder>> jobFut;
if (mapper != null) {
@@ -200,11 +198,7 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
key,
mapper
),
- descriptor.units(),
- descriptor.jobClassName(),
- descriptor.options(),
- metadataBuilder,
- argHolder,
+ executionContext,
cancellationToken
)));
@@ -213,11 +207,7 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
.thenCompose(table -> submitColocatedInternal(
table,
(Tuple) key,
- descriptor.units(),
- descriptor.jobClassName(),
- descriptor.options(),
- metadataBuilder,
- argHolder,
+ executionContext,
cancellationToken
));
}
@@ -360,11 +350,7 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
executeOnOneNodeWithFailover(
node,
nextWorkerSelector,
- descriptor.units(),
- descriptor.jobClassName(),
- options,
- metadataBuilder,
- argHolder,
+ new ExecutionContext(options, descriptor.units(),
descriptor.jobClassName(), metadataBuilder, argHolder),
cancellationToken
),
descriptor,
@@ -408,11 +394,7 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
@Override
public CompletableFuture<JobExecution<ComputeJobDataHolder>>
executeAsyncWithFailover(
Set<ClusterNode> nodes,
- List<DeploymentUnit> units,
- String jobClassName,
- JobExecutionOptions options,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg,
+ ExecutionContext executionContext,
@Nullable CancellationToken cancellationToken
) {
Set<ClusterNode> candidates = new HashSet<>();
@@ -431,16 +413,7 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
NextWorkerSelector selector = new DeqNextWorkerSelector(new
ConcurrentLinkedDeque<>(candidates));
- return executeOnOneNodeWithFailover(
- targetNode,
- selector,
- units,
- jobClassName,
- options,
- metadataBuilder,
- arg,
- cancellationToken
- );
+ return executeOnOneNodeWithFailover(targetNode, selector,
executionContext, cancellationToken);
}
private static ClusterNode randomNode(Set<ClusterNode> nodes) {
@@ -457,41 +430,26 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
private CompletableFuture<JobExecution<ComputeJobDataHolder>>
executeOnOneNodeWithFailover(
ClusterNode targetNode,
NextWorkerSelector nextWorkerSelector,
- List<DeploymentUnit> units,
- String jobClassName,
- JobExecutionOptions jobExecutionOptions,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg,
+ ExecutionContext executionContext,
@Nullable CancellationToken cancellationToken
) {
- ExecutionOptions options = ExecutionOptions.from(jobExecutionOptions);
-
- return executeOnOneNodeWithFailover(
- targetNode, nextWorkerSelector, units, jobClassName, options,
metadataBuilder,
- arg, cancellationToken
- );
+ return convertToComputeFuture(
+ executeOnOneNodeWithFailoverInternal(targetNode,
nextWorkerSelector, executionContext, cancellationToken)
+ ).thenApply(JobExecutionWrapper::new);
}
- private CompletableFuture<JobExecution<ComputeJobDataHolder>>
executeOnOneNodeWithFailover(
+ private CompletableFuture<? extends JobExecution<ComputeJobDataHolder>>
executeOnOneNodeWithFailoverInternal(
ClusterNode targetNode,
NextWorkerSelector nextWorkerSelector,
- List<DeploymentUnit> units,
- String jobClassName,
- ExecutionOptions options,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg,
+ ExecutionContext executionContext,
@Nullable CancellationToken cancellationToken
) {
- metadataBuilder.initiatorNode(nodeName);
+ executionContext.metadataBuilder().initiatorNode(nodeName);
if (isLocal(targetNode)) {
- return convertToComputeFuture(computeComponent.executeLocally(
- options, units, jobClassName, metadataBuilder, arg,
cancellationToken
- )).thenApply(JobExecutionWrapper::new);
+ return computeComponent.executeLocally(executionContext,
cancellationToken);
} else {
- return
convertToComputeFuture(computeComponent.executeRemotelyWithFailover(
- targetNode, nextWorkerSelector, options, units,
jobClassName, metadataBuilder, arg, cancellationToken
- )).thenApply(JobExecutionWrapper::new);
+ return computeComponent.executeRemotelyWithFailover(targetNode,
nextWorkerSelector, executionContext, cancellationToken);
}
}
@@ -520,18 +478,15 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
public CompletableFuture<JobExecution<ComputeJobDataHolder>>
submitColocatedInternal(
TableViewInternal table,
Tuple key,
- List<DeploymentUnit> units,
- String jobClassName,
- JobExecutionOptions options,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg,
+ ExecutionContext executionContext,
@Nullable CancellationToken cancellationToken
) {
return primaryReplicaForPartitionByTupleKey(table, key)
.thenCompose(primaryNode -> executeOnOneNodeWithFailover(
primaryNode,
new NextColocatedWorkerSelector<>(placementDriver,
topologyService, clock, nodeProperties, table, key),
- units, jobClassName, options, metadataBuilder, arg,
cancellationToken
+ executionContext,
+ cancellationToken
));
}
@@ -560,7 +515,8 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
new PartitionNextWorkerSelector(
placementDriver, topologyService, clock,
nodeProperties,
table.zoneId(), table.tableId(), partition),
- units, jobClassName, options, metadataBuilder, arg,
cancellationToken
+ new ExecutionContext(options, units, jobClassName,
metadataBuilder, arg),
+ cancellationToken
));
}
@@ -709,23 +665,25 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
byte[] payload,
ClusterNode node,
List<DeploymentUnit> deploymentUnits,
- ReceiverExecutionOptions options) {
- JobExecutionOptions jobOptions = JobExecutionOptions.builder()
+ ReceiverExecutionOptions options
+ ) {
+ ExecutionOptions jobOptions = ExecutionOptions.builder()
.priority(options.priority())
.maxRetries(options.maxRetries())
.executorType(options.executorType())
.build();
- // Use Compute to execute receiver on the target node with failover,
class loading, scheduling.
- return executeAsyncWithFailover(
- Set.of(node),
+ ExecutionContext executionContext = new ExecutionContext(
+ jobOptions,
deploymentUnits,
getReceiverJobClassName(options.executorType()),
- jobOptions,
ComputeEventMetadata.builder(Type.DATA_RECEIVER),
- SharedComputeUtils.marshalArgOrResult(payload, null),
- null
- ).thenCompose(JobExecution::resultAsync)
+ SharedComputeUtils.marshalArgOrResult(payload, null)
+ );
+
+ // Use Compute to execute receiver on the target node with failover,
class loading, scheduling.
+ return executeAsyncWithFailover(Set.of(node), executionContext, null)
+ .thenCompose(JobExecution::resultAsync)
.handle((res, err) -> {
if (err != null) {
if (err.getCause() instanceof ComputeException) {
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
index 5702442d123..868654c19dc 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
@@ -47,21 +47,13 @@ public interface IgniteComputeInternal extends
IgniteCompute {
* candidate nodes.
*
* @param nodes Candidate nodes; In case target node left the cluster, the
job will be restarted on one of them.
- * @param units Deployment units. Can be empty.
- * @param jobClassName Name of the job class to execute.
- * @param options Job execution options.
- * @param metadataBuilder Event metadata builder.
- * @param arg Argument of the job.
+ * @param executionContext Execution context.
* @param cancellationToken Cancellation token or {@code null}.
* @return CompletableFuture Job result.
*/
CompletableFuture<JobExecution<ComputeJobDataHolder>>
executeAsyncWithFailover(
Set<ClusterNode> nodes,
- List<DeploymentUnit> units,
- String jobClassName,
- JobExecutionOptions options,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg,
+ ExecutionContext executionContext,
@Nullable CancellationToken cancellationToken
);
@@ -71,22 +63,14 @@ public interface IgniteComputeInternal extends
IgniteCompute {
*
* @param table Table whose key is used to determine the node to execute
the job on.
* @param key Key that identifies the node to execute the job on.
- * @param units Deployment units. Can be empty.
- * @param jobClassName Name of the job class to execute.
- * @param options job execution options (priority, max retries).
- * @param metadataBuilder Event metadata builder.
- * @param arg Argument of the job.
+ * @param executionContext Execution context.
* @param cancellationToken Cancellation token or {@code null}.
* @return Job execution object.
*/
CompletableFuture<JobExecution<ComputeJobDataHolder>>
submitColocatedInternal(
TableViewInternal table,
Tuple key,
- List<DeploymentUnit> units,
- String jobClassName,
- JobExecutionOptions options,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg,
+ ExecutionContext executionContext,
@Nullable CancellationToken cancellationToken
);
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java
index 554f38c573a..ea1654db06a 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java
@@ -17,11 +17,7 @@
package org.apache.ignite.internal.compute;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.deployment.DeploymentUnit;
-import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
-import org.jetbrains.annotations.Nullable;
/**
* Compute job starter interface.
@@ -31,18 +27,8 @@ public interface JobStarter {
/**
* Start compute job.
*
- * @param options Compute job execution options.
- * @param units Deployment units. Can be empty.
- * @param jobClassName Name of the job class to execute.
- * @param metadataBuilder Event metadata builder.
- * @param args Arguments of the job.
+ * @param executionContext Execution context.
* @return Future of the job execution object.
*/
- CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> start(
- ExecutionOptions options,
- List<DeploymentUnit> units,
- String jobClassName,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder args
- );
+ CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>>
start(ExecutionContext executionContext);
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java
deleted file mode 100644
index d1170579fa2..00000000000
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.compute;
-
-import java.util.List;
-import org.apache.ignite.deployment.DeploymentUnit;
-import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Captures the context of a remote job execution.
- */
-class RemoteExecutionContext {
- private final ExecutionOptions executionOptions;
-
- private final List<DeploymentUnit> units;
-
- private final String jobClassName;
-
- private final ComputeEventMetadataBuilder metadataBuilder;
-
- private final ComputeJobDataHolder arg;
-
- RemoteExecutionContext(
- ExecutionOptions executionOptions,
- List<DeploymentUnit> units,
- String jobClassName,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder arg
- ) {
- this.executionOptions = executionOptions;
- this.units = units;
- this.jobClassName = jobClassName;
- this.metadataBuilder = metadataBuilder;
- this.arg = arg;
- }
-
- ExecutionOptions executionOptions() {
- return executionOptions;
- }
-
- List<DeploymentUnit> units() {
- return units;
- }
-
- String jobClassName() {
- return jobClassName;
- }
-
- ComputeEventMetadataBuilder metadataBuilder() {
- return metadataBuilder;
- }
-
- @Nullable ComputeJobDataHolder arg() {
- return arg;
- }
-}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
index 5ea525a33ab..1e3fc0edd3a 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
@@ -46,8 +46,8 @@ import
org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.ComputeMessageTypes;
import org.apache.ignite.internal.compute.ComputeMessagesFactory;
import org.apache.ignite.internal.compute.ComputeUtils;
+import org.apache.ignite.internal.compute.ExecutionContext;
import org.apache.ignite.internal.compute.ExecutionManager;
-import org.apache.ignite.internal.compute.ExecutionOptions;
import org.apache.ignite.internal.compute.JobStarter;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
@@ -171,31 +171,20 @@ public class ComputeMessaging {
* Submit Compute job to execution on remote node.
*
* @param remoteNode The job will be executed on this node.
- * @param options Job execution options.
- * @param units Deployment units. Can be empty.
- * @param jobClassName Name of the job class to execute.
- * @param metadataBuilder Event metadata builder.
- * @param input Arguments of the job.
+ * @param executionContext Execution context.
* @return Job id future that will be completed when the job is submitted
on the remote node.
*/
- public CompletableFuture<UUID> remoteExecuteRequestAsync(
- ClusterNode remoteNode,
- ExecutionOptions options,
- List<DeploymentUnit> units,
- String jobClassName,
- ComputeEventMetadataBuilder metadataBuilder,
- @Nullable ComputeJobDataHolder input
- ) {
- List<DeploymentUnitMsg> deploymentUnitMsgs = units.stream()
+ public CompletableFuture<UUID> remoteExecuteRequestAsync(ClusterNode
remoteNode, ExecutionContext executionContext) {
+ List<DeploymentUnitMsg> deploymentUnitMsgs =
executionContext.units().stream()
.map(ComputeUtils::toDeploymentUnitMsg)
.collect(toList());
ExecuteRequestV2 executeRequest = messagesFactory.executeRequestV2()
- .executeOptions(options)
+ .executeOptions(executionContext.options())
.deploymentUnits(deploymentUnitMsgs)
- .jobClassName(jobClassName)
- .metadataBuilder(metadataBuilder)
- .input(input)
+ .jobClassName(executionContext.jobClassName())
+ .metadataBuilder(executionContext.metadataBuilder())
+ .input(executionContext.arg())
.build();
return invoke(remoteNode, executeRequest)
@@ -209,7 +198,7 @@ public class ComputeMessaging {
? ((ExecuteRequestV2) request).metadataBuilder()
: ComputeEventMetadata.builder();
- starter.start(request.executeOptions(), units, request.jobClassName(),
metadataBuilder, request.input())
+ starter.start(new ExecutionContext(request.executeOptions(), units,
request.jobClassName(), metadataBuilder, request.input()))
.whenComplete((execution, err) -> {
if (err != null) {
sendExecuteResponse(null, err, sender,
correlationId);
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 441a51601b6..3ea5379a0d0 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -220,7 +220,8 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
CancelHandle cancelHandle = CancelHandle.create();
CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>>
executionFut = computeComponent.executeLocally(
- DEFAULT, List.of(), SimpleJob.class.getName(),
ComputeEventMetadata.builder(), null, cancelHandle.token()
+ new ExecutionContext(DEFAULT, List.of(),
SimpleJob.class.getName(), ComputeEventMetadata.builder(), null),
+ cancelHandle.token()
);
assertFalse(infiniteFuture.isDone());
@@ -724,13 +725,16 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
}
private CompletableFuture<String> executeLocally(List<DeploymentUnit>
units, String jobClassName) {
- return computeComponent.executeLocally(DEFAULT, units, jobClassName,
ComputeEventMetadata.builder(), null, null)
- .thenCompose(ComputeComponentImplTest::unwrapResult);
+ return computeComponent.executeLocally(
+ new ExecutionContext(DEFAULT, units, jobClassName,
ComputeEventMetadata.builder(), null),
+ null
+ ).thenCompose(ComputeComponentImplTest::unwrapResult);
}
private JobExecution<ComputeJobDataHolder> executeLocally(String
jobClassName, @Nullable CancellationToken cancellationToken) {
CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>>
executionFut = computeComponent.executeLocally(
- DEFAULT, List.of(), jobClassName,
ComputeEventMetadata.builder(), null, cancellationToken
+ new ExecutionContext(DEFAULT, List.of(), jobClassName,
ComputeEventMetadata.builder(), null),
+ cancellationToken
);
assertThat(executionFut, willCompleteSuccessfully());
return executionFut.join();
@@ -742,15 +746,20 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
@Nullable CancellationToken cancellationToken
) {
CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>>
executionFut = computeComponent.executeRemotely(
- remoteNode, DEFAULT, List.of(), jobClassName,
ComputeEventMetadata.builder(), arg, cancellationToken
+ remoteNode,
+ new ExecutionContext(DEFAULT, List.of(), jobClassName,
ComputeEventMetadata.builder(), arg),
+ cancellationToken
);
assertThat(executionFut, willCompleteSuccessfully());
return executionFut.join();
}
private CompletableFuture<String> executeRemotely(String jobClassName) {
- return computeComponent.executeRemotely(remoteNode, DEFAULT,
List.of(), jobClassName, ComputeEventMetadata.builder(), null, null)
- .thenCompose(ComputeComponentImplTest::unwrapResult);
+ return computeComponent.executeRemotely(
+ remoteNode,
+ new ExecutionContext(DEFAULT, List.of(), jobClassName,
ComputeEventMetadata.builder(), null),
+ null
+ ).thenCompose(ComputeComponentImplTest::unwrapResult);
}
private static CompletableFuture<String>
unwrapResult(JobExecution<ComputeJobDataHolder> execution) {
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
index 788c71fa0cb..7f6f5479f41 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
@@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doReturn;
@@ -42,6 +43,7 @@ import static org.mockito.Mockito.when;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.BroadcastExecution;
import org.apache.ignite.compute.BroadcastJobTarget;
@@ -75,6 +77,7 @@ import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentMatcher;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
@@ -143,8 +146,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest {
willBe("jobResponse")
);
- verify(computeComponent)
- .executeLocally(eq(ExecutionOptions.DEFAULT),
eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull());
+ verifyExecuteLocally(ExecutionOptions.DEFAULT);
assertEquals(jobTimestamp, observableTimestampTracker.get());
}
@@ -185,8 +187,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest {
willBe("jobResponse")
);
- verify(computeComponent)
- .executeLocally(eq(ExecutionOptions.DEFAULT),
eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull());
+ verifyExecuteLocally(ExecutionOptions.DEFAULT);
assertEquals(jobTimestamp, observableTimestampTracker.get());
}
@@ -222,8 +223,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest {
willBe("jobResponse")
);
- verify(computeComponent)
- .executeLocally(eq(expectedOptions), eq(testDeploymentUnits),
eq(JOB_CLASS_NAME), any(), any(), isNull());
+ verifyExecuteLocally(expectedOptions);
}
@Test
@@ -308,28 +308,38 @@ class IgniteComputeImplTest extends
BaseIgniteAbstractTest {
}
private void respondWhenExecutingSimpleJobLocally(ExecutionOptions
executionOptions) {
- when(computeComponent.executeLocally(eq(executionOptions),
eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull()))
+ when(computeComponent.executeLocally(argThat(ctxEq(executionOptions)),
isNull()))
.thenReturn(completedFuture(completedExecution(SharedComputeUtils.marshalArgOrResult(
"jobResponse", null, jobTimestamp.longValue()),
localNode)));
}
private void respondWhenExecutingSimpleJobLocally(ExecutionOptions
executionOptions, CancellationToken token) {
- when(computeComponent.executeLocally(eq(executionOptions),
eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), eq(token)))
+ when(computeComponent.executeLocally(argThat(ctxEq(executionOptions)),
eq(token)))
.thenReturn(completedFuture(completedExecution(SharedComputeUtils.marshalArgOrResult(
"jobResponse", null, jobTimestamp.longValue()),
localNode)));
}
private void respondWhenExecutingSimpleJobRemotely(ExecutionOptions
options) {
when(computeComponent.executeRemotelyWithFailover(
- eq(remoteNode), any(), eq(options), eq(testDeploymentUnits),
eq(JOB_CLASS_NAME), any(), any(), isNull()
+ eq(remoteNode), any(), argThat(ctxEq(options)), isNull()
)).thenReturn(completedFuture(completedExecution(SharedComputeUtils.marshalArgOrResult(
"remoteResponse", null, jobTimestamp.longValue()),
remoteNode)));
}
+ private void verifyExecuteLocally(ExecutionOptions options) {
+ verify(computeComponent)
+ .executeLocally(argThat(ctxEq(options)), isNull());
+ }
+
private void verifyExecuteRemotelyWithFailover(ExecutionOptions options) {
- verify(computeComponent).executeRemotelyWithFailover(
- eq(remoteNode), any(), eq(options), eq(testDeploymentUnits),
eq(JOB_CLASS_NAME), any(), any(), isNull()
- );
+ verify(computeComponent)
+ .executeRemotelyWithFailover(eq(remoteNode), any(),
argThat(ctxEq(options)), isNull());
+ }
+
+ private ArgumentMatcher<ExecutionContext> ctxEq(ExecutionOptions options) {
+ return ctx -> Objects.equals(ctx.options(), options)
+ && Objects.equals(ctx.units(), testDeploymentUnits)
+ && Objects.equals(ctx.jobClassName(), JOB_CLASS_NAME);
}
private static <R> CancellableJobExecution<R> completedExecution(@Nullable
R result, ClusterNode node) {