This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 22da0aa7c7e IGNITE-26413 Add DeploymentUnitInfo to JobExecutionContext
(#6600)
22da0aa7c7e is described below
commit 22da0aa7c7ebe13e9ff95ac35af3b9ccbfb23163
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Sep 18 10:59:38 2025 +0300
IGNITE-26413 Add DeploymentUnitInfo to JobExecutionContext (#6600)
* Add `DeploymentUnitInfo` class with unit name, version, and path
* Add `JobExecutionContext.deploymentUnits` method to allow compute jobs to
access a collection of `DeploymentUnitInfo` associated with their execution
* Use new API in platform executor
---
.../apache/ignite/compute/JobExecutionContext.java | 9 +++
.../ignite/deployment/DeploymentUnitInfo.java | 80 ++++++++++++++++++++++
.../handler/ClientInboundMessageHandler.java | 34 ++++++---
.../internal/compute/ItComputeTestStandalone.java | 27 ++++++++
.../internal/compute/DeploymentUnitContentJob.java | 46 +++++++++++++
.../internal/compute/DeploymentUnitInfoJob.java | 37 ++++++++++
.../internal/compute/JobExecutionContextImpl.java | 26 +++++++
.../compute/executor/ComputeExecutorImpl.java | 18 +----
.../platform/PlatformComputeConnection.java | 5 +-
.../platform/dotnet/DotNetComputeExecutor.java | 7 +-
10 files changed, 256 insertions(+), 33 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
index 5f271b2861c..5f275ea15ac 100644
---
a/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
+++
b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
@@ -17,7 +17,9 @@
package org.apache.ignite.compute;
+import java.util.Collection;
import org.apache.ignite.Ignite;
+import org.apache.ignite.deployment.DeploymentUnitInfo;
import org.apache.ignite.table.partition.Partition;
import org.jetbrains.annotations.Nullable;
@@ -46,4 +48,11 @@ public interface JobExecutionContext {
* @return Partition associated with this job.
*/
@Nullable Partition partition();
+
+ /**
+ * Collection of deployment units associated with this job execution.
+ *
+ * @return Collection of deployment units for this job execution.
+ */
+ Collection<DeploymentUnitInfo> deploymentUnits();
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/deployment/DeploymentUnitInfo.java
b/modules/api/src/main/java/org/apache/ignite/deployment/DeploymentUnitInfo.java
new file mode 100644
index 00000000000..315259801a9
--- /dev/null
+++
b/modules/api/src/main/java/org/apache/ignite/deployment/DeploymentUnitInfo.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.deployment;
+
+import java.nio.file.Path;
+import java.util.Objects;
+import org.apache.ignite.deployment.version.Version;
+
+/**
+ * Information about a deployment unit used by a job.
+ */
+public final class DeploymentUnitInfo {
+ private final String name;
+ private final Version version;
+ private final Path path;
+
+ /**
+ * Creates deployment unit info.
+ *
+ * @param name Name of the deployment unit.
+ * @param version Version of the deployment unit.
+ * @param path Path to the deployment unit resources.
+ */
+ public DeploymentUnitInfo(String name, Version version, Path path) {
+ this.name = Objects.requireNonNull(name, "name");
+ this.version = Objects.requireNonNull(version, "version");
+ this.path = Objects.requireNonNull(path, "path");
+ }
+
+ /**
+ * Returns the name of the deployment unit.
+ *
+ * @return Name of the deployment unit.
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Returns the version of the deployment unit.
+ *
+ * @return Version of the deployment unit.
+ */
+ public Version version() {
+ return version;
+ }
+
+ /**
+ * Returns the path to the deployment unit resources.
+ *
+ * @return Path to the deployment unit resources.
+ */
+ public Path path() {
+ return path;
+ }
+
+ @Override
+ public String toString() {
+ return "DeploymentUnitInfo{"
+ + "name='" + name + '\''
+ + ", version=" + version
+ + ", path=" + path
+ + '}';
+ }
+}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 7b27fca3973..5b86b2e261d 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -27,6 +27,7 @@ import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_
import static
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import static org.apache.ignite.internal.util.IgniteUtils.firstNotNull;
import static org.apache.ignite.lang.ErrorGroups.Client.HANDSHAKE_HEADER_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_COMPATIBILITY_ERR;
@@ -40,7 +41,9 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.DecoderException;
+import java.io.IOException;
import java.util.BitSet;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -112,6 +115,8 @@ import
org.apache.ignite.client.handler.requests.table.partition.ClientTablePart
import
org.apache.ignite.client.handler.requests.tx.ClientTransactionBeginRequest;
import
org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest;
import
org.apache.ignite.client.handler.requests.tx.ClientTransactionRollbackRequest;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.deployment.DeploymentUnitInfo;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.client.proto.ClientComputeJobPacker;
import org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker;
@@ -1328,19 +1333,37 @@ public class ClientInboundMessageHandler
return new IgniteException(traceId, code, message, cause);
}
+ private static void packDeploymentUnitPaths(List<String>
deploymentUnitPaths, ClientMessagePacker packer) {
+ packer.packInt(deploymentUnitPaths.size());
+ for (String path : deploymentUnitPaths) {
+ packer.packString(path);
+ }
+ }
+
+ private static void packDeploymentUnitPaths(Collection<DeploymentUnitInfo>
deploymentUnits, ClientMessagePacker packer) {
+ packer.packInt(deploymentUnits.size());
+ for (DeploymentUnitInfo unit : deploymentUnits) {
+ try {
+ packer.packString(unit.path().toRealPath().toString());
+ } catch (IOException e) {
+ throw sneakyThrow(e);
+ }
+ }
+ }
+
private class ComputeConnection implements PlatformComputeConnection {
@Override
public CompletableFuture<ComputeJobDataHolder> executeJobAsync(
long jobId,
- List<String> deploymentUnitPaths,
String jobClassName,
+ JobExecutionContext ctx,
@Nullable ComputeJobDataHolder arg
) {
return sendServerToClientRequest(ServerOp.COMPUTE_JOB_EXEC,
packer -> {
packer.packLong(jobId);
packer.packString(jobClassName);
- packDeploymentUnitPaths(deploymentUnitPaths, packer);
+ packDeploymentUnitPaths(ctx.deploymentUnits(), packer);
packer.packBoolean(false); // Retain deployment units
in cache.
ClientComputeJobPacker.packJobArgument(arg, null,
packer);
})
@@ -1383,12 +1406,5 @@ public class ClientInboundMessageHandler
ChannelHandlerContext ctx = channelHandlerContext;
return ctx != null && ctx.channel().isActive();
}
-
- private void packDeploymentUnitPaths(List<String> deploymentUnitPaths,
ClientMessagePacker packer) {
- packer.packInt(deploymentUnitPaths.size());
- for (String path : deploymentUnitPaths) {
- packer.packString(path);
- }
- }
}
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
index 6c7964a8ff2..c2090bcecb4 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
@@ -29,10 +29,12 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.oneOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.io.InputStream;
@@ -222,6 +224,31 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
assertThat(successJob, willCompleteSuccessfully());
}
+ @Test
+ void jobContextProvidesDeploymentUnitInfo() {
+ var units0 = List.of(
+ new DeploymentUnit(unit.name(), Version.LATEST),
+ unit);
+
+ JobDescriptor<Void, String> job =
JobDescriptor.builder(DeploymentUnitInfoJob.class).units(units0).build();
+
+ String jobRes = compute().execute(JobTarget.node(clusterNode(0)), job,
null);
+
+ assertThat(jobRes, containsString("name='jobs'"));
+ assertThat(jobRes, containsString("version=1.0.0"));
+ assertThat(jobRes, containsString("path="));
+ assertThat(jobRes.split(";").length, equalTo(2));
+ }
+
+ @Test
+ void jobContextProvidesDeploymentUnitPath() {
+ JobDescriptor<Void, String> job =
JobDescriptor.builder(DeploymentUnitContentJob.class).units(units).build();
+
+ String jobRes = compute().execute(JobTarget.node(clusterNode(0)), job,
null);
+
+ assertEquals("ignite-integration-test-jobs-1.0-SNAPSHOT.jar", jobRes);
+ }
+
private static void deployJar(String unitId, Version unitVersion, String
jarName) throws IOException {
IgniteDeployment deployment = deployment(0);
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/DeploymentUnitContentJob.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/DeploymentUnitContentJob.java
new file mode 100644
index 00000000000..aaaa2dc72f2
--- /dev/null
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/DeploymentUnitContentJob.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/** Compute job. */
+public class DeploymentUnitContentJob implements ComputeJob<Void, String> {
+ @Override
+ public CompletableFuture<String> executeAsync(JobExecutionContext context,
Void input) {
+ String paths = context.deploymentUnits().stream()
+ .flatMap(x -> {
+ try {
+ return Files.list(x.path());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .map(x -> x.getFileName().toString())
+ .reduce((x, y) -> x + ";" + y)
+ .orElse("");
+
+ return completedFuture(paths);
+ }
+}
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/DeploymentUnitInfoJob.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/DeploymentUnitInfoJob.java
new file mode 100644
index 00000000000..1074f43179b
--- /dev/null
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/DeploymentUnitInfoJob.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/** Compute job. */
+public class DeploymentUnitInfoJob implements ComputeJob<Void, String> {
+ @Override
+ public CompletableFuture<String> executeAsync(JobExecutionContext context,
Void input) {
+ String paths = context.deploymentUnits().stream()
+ .map(Object::toString)
+ .reduce((x, y) -> x + ";" + y)
+ .orElse("");
+
+ return completedFuture(paths);
+ }
+}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
index 16ce3614cd2..44935641848 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
@@ -17,10 +17,16 @@
package org.apache.ignite.internal.compute;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.deployment.DeploymentUnitInfo;
import org.apache.ignite.internal.compute.loader.JobClassLoader;
+import org.apache.ignite.internal.deployunit.DisposableDeploymentUnit;
+import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.table.partition.Partition;
import org.jetbrains.annotations.Nullable;
@@ -36,6 +42,8 @@ public class JobExecutionContextImpl implements
JobExecutionContext {
private final @Nullable Partition partition;
+ private final Lazy<Collection<DeploymentUnitInfo>> deploymentUnits;
+
/**
* Constructor.
*
@@ -49,6 +57,7 @@ public class JobExecutionContextImpl implements
JobExecutionContext {
this.isInterrupted = isInterrupted;
this.classLoader = classLoader;
this.partition = partition;
+ this.deploymentUnits = new Lazy<>(this::initDeploymentUnits);
}
@Override
@@ -66,6 +75,12 @@ public class JobExecutionContextImpl implements
JobExecutionContext {
return partition;
}
+ @Override
+ public Collection<DeploymentUnitInfo> deploymentUnits() {
+ //noinspection DataFlowIssue
+ return deploymentUnits.get();
+ }
+
/**
* Gets the job class loader.
*
@@ -74,4 +89,15 @@ public class JobExecutionContextImpl implements
JobExecutionContext {
public JobClassLoader classLoader() {
return classLoader;
}
+
+ private Collection<DeploymentUnitInfo> initDeploymentUnits() {
+ List<DisposableDeploymentUnit> units = classLoader.units();
+ ArrayList<DeploymentUnitInfo> result = new ArrayList<>(units.size());
+
+ for (DisposableDeploymentUnit unit : units) {
+ result.add(new DeploymentUnitInfo(unit.unit().name(),
unit.unit().version(), unit.path()));
+ }
+
+ return result;
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index f6afa008775..605aa598516 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -24,9 +24,7 @@ import static
org.apache.ignite.internal.compute.ComputeUtils.unmarshalOrNotIfNu
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
-import java.io.IOException;
import java.nio.file.Path;
-import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,7 +51,6 @@ import
org.apache.ignite.internal.compute.state.ComputeStateMachine;
import org.apache.ignite.internal.compute.task.JobSubmitter;
import org.apache.ignite.internal.compute.task.TaskExecutionContextImpl;
import org.apache.ignite.internal.compute.task.TaskExecutionInternal;
-import org.apache.ignite.internal.deployunit.DisposableDeploymentUnit;
import org.apache.ignite.internal.eventlog.api.EventLog;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -187,26 +184,13 @@ public class ComputeExecutorImpl implements
ComputeExecutor {
throw new IllegalStateException("DotNetComputeExecutor is
not set");
}
- return
dotNetExec.getJobCallable(getDeploymentUnitPaths(classLoader), jobClassName,
arg, context);
+ return dotNetExec.getJobCallable(jobClassName, arg, context);
default:
throw new IllegalArgumentException("Unsupported executor type:
" + executorType);
}
}
- private static ArrayList<String> getDeploymentUnitPaths(JobClassLoader
classLoader) {
- ArrayList<String> unitPaths = new ArrayList<>();
-
- for (DisposableDeploymentUnit unit : classLoader.units()) {
- try {
- unitPaths.add(unit.path().toRealPath().toString());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- return unitPaths;
- }
-
private static Callable<CompletableFuture<ComputeJobDataHolder>>
getJavaJobCallable(
String jobClassName,
JobClassLoader classLoader,
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/platform/PlatformComputeConnection.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/platform/PlatformComputeConnection.java
index 53f04c4e10d..cbc014b2e71 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/platform/PlatformComputeConnection.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/platform/PlatformComputeConnection.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.compute.executor.platform;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.jetbrains.annotations.Nullable;
@@ -30,15 +31,15 @@ public interface PlatformComputeConnection {
* Executes a job asynchronously.
*
* @param jobId Job id (for cancellation).
- * @param deploymentUnitPaths Paths to deployment units.
* @param jobClassName Name of the job class.
+ * @param ctx Job execution context.
* @param arg Arguments for the job.
* @return A CompletableFuture that will be completed with the result of
the job execution.
*/
CompletableFuture<ComputeJobDataHolder> executeJobAsync(
long jobId,
- List<String> deploymentUnitPaths,
String jobClassName,
+ JobExecutionContext ctx,
@Nullable ComputeJobDataHolder arg
);
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/platform/dotnet/DotNetComputeExecutor.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/platform/dotnet/DotNetComputeExecutor.java
index fcf5ec4d04c..39a51b16849 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/platform/dotnet/DotNetComputeExecutor.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/platform/dotnet/DotNetComputeExecutor.java
@@ -82,19 +82,17 @@ public class DotNetComputeExecutor {
/**
* Creates a callable for executing a job.
*
- * @param deploymentUnitPaths Paths to deployment units.
* @param jobClassName Name of the job class.
* @param arg Job argument.
* @param context Job execution context.
* @return Callable that executes the job.
*/
public Callable<CompletableFuture<ComputeJobDataHolder>> getJobCallable(
- List<String> deploymentUnitPaths,
String jobClassName,
@Nullable ComputeJobDataHolder arg,
JobExecutionContext context
) {
- return () -> executeJobAsync(deploymentUnitPaths, jobClassName, arg,
context);
+ return () -> executeJobAsync(jobClassName, arg, context);
}
/**
@@ -143,7 +141,6 @@ public class DotNetComputeExecutor {
}
private CompletableFuture<ComputeJobDataHolder> executeJobAsync(
- List<String> deploymentUnitPaths,
String jobClassName,
@Nullable ComputeJobDataHolder arg,
JobExecutionContext context
@@ -157,7 +154,7 @@ public class DotNetComputeExecutor {
return getPlatformComputeConnectionWithRetryAsync()
.thenCompose(conn -> conn.connectionFut()
- .thenCompose(c -> c.executeJobAsync(jobId,
deploymentUnitPaths, jobClassName, arg))
+ .thenCompose(c -> c.executeJobAsync(jobId,
jobClassName, context, arg))
.exceptionally(e -> {
var cause = unwrapCause(e);