This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new b703159 IGNITE-16616 Implement execute method of IgniteCompute interface b703159 is described below commit b703159a6a4ac492f8cbd0535c194b85fb0d10ff Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Wed Mar 16 10:46:32 2022 +0400 IGNITE-16616 Implement execute method of IgniteCompute interface --- modules/api/pom.xml | 4 +- .../java/org/apache/ignite/compute/ComputeJob.java | 0 .../org/apache/ignite/compute/IgniteCompute.java | 11 + .../apache/ignite/compute/JobExecutionContext.java | 3 + .../compute/ComputeConfigurationSchema.java | 42 ++ modules/compute-api/pom.xml | 60 --- modules/compute/pom.xml | 45 +- .../ignite/internal/compute/ComputeComponent.java | 70 ++++ .../internal/compute/ComputeComponentImpl.java | 254 ++++++++++++ .../internal/compute/ComputeMessageTypes.java} | 26 +- .../ignite/internal/compute/IgniteComputeImpl.java | 49 ++- .../internal/compute/JobExecutionContextImpl.java} | 20 +- .../internal/compute/message/ExecuteRequest.java} | 34 +- .../internal/compute/message/ExecuteResponse.java | 46 +++ .../internal/compute/ComputeComponentImplTest.java | 451 +++++++++++++++++++++ .../internal/compute/IgniteComputeImplTest.java | 90 ++++ .../compute/JobExecutionContextImplTest.java} | 34 +- .../internal/metastorage/MetaStorageManager.java | 2 +- .../internal/AbstractClusterIntegrationTest.java | 141 +++++++ .../ignite/internal/compute/ItComputeTest.java | 170 ++++++++ .../org/apache/ignite/internal/app/IgniteImpl.java | 24 +- .../CoreLocalConfigurationModule.java | 4 +- .../CoreLocalConfigurationModuleTest.java | 6 + parent/pom.xml | 6 - pom.xml | 1 - 25 files changed, 1473 insertions(+), 120 deletions(-) diff --git a/modules/api/pom.xml b/modules/api/pom.xml index 25b3eba..0075a29 100644 --- a/modules/api/pom.xml +++ b/modules/api/pom.xml @@ -35,12 +35,12 @@ <dependencies> <dependency> <groupId>org.apache.ignite</groupId> - <artifactId>ignite-compute-api</artifactId> + <artifactId>ignite-configuration-api</artifactId> </dependency> <dependency> <groupId>org.apache.ignite</groupId> - <artifactId>ignite-configuration-api</artifactId> + <artifactId>ignite-network-api</artifactId> </dependency> <!-- 3rd party dependencies --> diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java b/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java similarity index 100% copy from modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java copy to modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java similarity index 79% copy from modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java copy to modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java index 2ffcaf8..59ebdc9 100644 --- a/modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java @@ -38,4 +38,15 @@ public interface IgniteCompute { * @return future job result */ <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args); + + /** + * Executes a {@link ComputeJob}. + * + * @param nodes nodes on which to execute the job + * @param jobClassName name of the job class to execute + * @param args arguments of the job + * @param <R> job result type + * @return future job result + */ + <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args); } diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java similarity index 94% copy from modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java copy to modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java index ee190e2..eade5e1 100644 --- a/modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java @@ -17,8 +17,11 @@ package org.apache.ignite.compute; +import org.apache.ignite.Ignite; + /** * Context of {@link ComputeJob} execution. */ public interface JobExecutionContext { + Ignite ignite(); } diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/compute/ComputeConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/compute/ComputeConfigurationSchema.java new file mode 100644 index 0000000..9146f57 --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/compute/ComputeConfigurationSchema.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration.schemas.compute; + +import static java.lang.Math.max; + +import org.apache.ignite.configuration.annotation.ConfigurationRoot; +import org.apache.ignite.configuration.annotation.ConfigurationType; +import org.apache.ignite.configuration.annotation.Value; +import org.apache.ignite.configuration.validation.Min; + +/** + * Configuration schema for Compute functionality. + */ +@SuppressWarnings("PMD.UnusedPrivateField") +@ConfigurationRoot(rootName = "compute", type = ConfigurationType.LOCAL) +public class ComputeConfigurationSchema { + /** Job thread pool size. */ + @Min(1) + @Value(hasDefault = true) + public final int threadPoolSize = max(Runtime.getRuntime().availableProcessors(), 8); + + /** Job thread pool stop timeout (milliseconds). */ + @Min(1) + @Value(hasDefault = true) + public final long threadPoolStopTimeoutMillis = 10_000; +} diff --git a/modules/compute-api/pom.xml b/modules/compute-api/pom.xml deleted file mode 100644 index bd742a4..0000000 --- a/modules/compute-api/pom.xml +++ /dev/null @@ -1,60 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> - -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.ignite</groupId> - <artifactId>ignite-parent</artifactId> - <version>1</version> - <relativePath>../../parent/pom.xml</relativePath> - </parent> - - <artifactId>ignite-compute-api</artifactId> - <version>3.0.0-SNAPSHOT</version> - - <dependencies> - <dependency> - <groupId>org.apache.ignite</groupId> - <artifactId>ignite-network-api</artifactId> - </dependency> - - <!-- Test dependencies --> - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter-engine</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <scope>test</scope> - </dependency> - </dependencies> -</project> diff --git a/modules/compute/pom.xml b/modules/compute/pom.xml index e4fd50e..ffe5b6d 100644 --- a/modules/compute/pom.xml +++ b/modules/compute/pom.xml @@ -35,7 +35,12 @@ <dependencies> <dependency> <groupId>org.apache.ignite</groupId> - <artifactId>ignite-compute-api</artifactId> + <artifactId>ignite-api</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-network</artifactId> </dependency> <!-- Test dependencies --> @@ -56,5 +61,43 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-junit-jupiter</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-network-annotation-processor</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + <configuration> + <annotationProcessorPaths> + <path> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-network-annotation-processor</artifactId> + <version>${project.version}</version> + </path> + </annotationProcessorPaths> + </configuration> + </plugin> + </plugins> + </build> </project> diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java new file mode 100644 index 0000000..81bb36c --- /dev/null +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.network.ClusterNode; + +/** + * Compute functionality. + */ +public interface ComputeComponent extends IgniteComponent { + /** + * Executes a job of the given class on the current node. + * + * @param jobClass job class + * @param args job args + * @param <R> result type + * @return future execution result + */ + <R> CompletableFuture<R> executeLocally(Class<? extends ComputeJob<R>> jobClass, Object... args); + + /** + * Executes a job of the given class on the current node. + * + * @param jobClassName name of the job class + * @param args job args + * @param <R> result type + * @return future execution result + */ + <R> CompletableFuture<R> executeLocally(String jobClassName, Object... args); + + /** + * Executes a job of the given class on a remote node. + * + * @param remoteNode name of the job class + * @param jobClass job class + * @param args job args + * @param <R> result type + * @return future execution result + */ + <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, Class<? extends ComputeJob<R>> jobClass, Object... args); + + /** + * Executes a job of the given class on a remote node. + * + * @param remoteNode name of the job class + * @param jobClassName name of the job class + * @param args job args + * @param <R> result type + * @return future execution result + */ + <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, String jobClassName, Object... args); +} diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java new file mode 100644 index 0000000..3b7e8a4 --- /dev/null +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import java.lang.reflect.Constructor; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.JobExecutionContext; +import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration; +import org.apache.ignite.internal.compute.message.ExecuteRequest; +import org.apache.ignite.internal.compute.message.ExecuteResponse; +import org.apache.ignite.internal.thread.NamedThreadFactory; +import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.lang.NodeStoppingException; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.MessagingService; +import org.apache.ignite.network.NetworkAddress; +import org.jetbrains.annotations.Nullable; + +/** + * Implementation of {@link ComputeComponent}. + */ +public class ComputeComponentImpl implements ComputeComponent { + private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE; + + private static final long THREAD_KEEP_ALIVE_SECONDS = 60; + + private final Ignite ignite; + private final MessagingService messagingService; + private final ComputeConfiguration configuration; + + private ExecutorService jobExecutorService; + + private final ClassLoader jobClassLoader = Thread.currentThread().getContextClassLoader(); + + private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory(); + + /** Busy lock to stop synchronously. */ + private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); + + /** Prevents double stopping the component. */ + private final AtomicBoolean stopGuard = new AtomicBoolean(); + + /** + * Creates a new instance. + */ + public ComputeComponentImpl(Ignite ignite, MessagingService messagingService, ComputeConfiguration configuration) { + this.ignite = ignite; + this.messagingService = messagingService; + this.configuration = configuration; + } + + /** {@inheritDoc} */ + @Override + public <R> CompletableFuture<R> executeLocally(Class<? extends ComputeJob<R>> jobClass, Object... args) { + if (!busyLock.enterBusy()) { + return CompletableFuture.failedFuture(new NodeStoppingException()); + } + + try { + return doExecuteLocally(jobClass, args); + } finally { + busyLock.leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override + public <R> CompletableFuture<R> executeLocally(String jobClassName, Object... args) { + return completedFuture(null).thenCompose(ignore -> executeLocally(jobClass(jobClassName), args)); + } + + private <R> CompletableFuture<R> doExecuteLocally(Class<? extends ComputeJob<R>> jobClass, Object[] args) { + assert jobExecutorService != null : "Not started yet!"; + + try { + return CompletableFuture.supplyAsync(() -> executeJob(jobClass, args), jobExecutorService); + } catch (RejectedExecutionException e) { + return CompletableFuture.failedFuture(e); + } + } + + private <R> R executeJob(Class<? extends ComputeJob<R>> jobClass, Object[] args) { + ComputeJob<R> job = instantiateJob(jobClass); + JobExecutionContext context = new JobExecutionContextImpl(ignite); + return job.execute(context, args); + } + + private <R> ComputeJob<R> instantiateJob(Class<? extends ComputeJob<R>> jobClass) { + if (!(ComputeJob.class.isAssignableFrom(jobClass))) { + throw new IgniteInternalException("'" + jobClass.getName() + "' does not implement ComputeJob interface"); + } + + try { + Constructor<? extends ComputeJob<R>> constructor = jobClass.getDeclaredConstructor(); + + if (!constructor.canAccess(null)) { + constructor.setAccessible(true); + } + + return constructor.newInstance(); + } catch (ReflectiveOperationException e) { + throw new IgniteInternalException("Cannot instantiate job", e); + } + } + + /** {@inheritDoc} */ + @Override + public <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, Class<? extends ComputeJob<R>> jobClass, Object... args) { + if (!busyLock.enterBusy()) { + return CompletableFuture.failedFuture(new NodeStoppingException()); + } + + try { + return doExecuteRemotely(remoteNode, jobClass, args); + } finally { + busyLock.leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override + public <R> CompletableFuture<R> executeRemotely(ClusterNode remoteNode, String jobClassName, Object... args) { + return completedFuture(null).thenCompose(ignored -> executeRemotely(remoteNode, jobClass(jobClassName), args)); + } + + private <R> CompletableFuture<R> doExecuteRemotely(ClusterNode remoteNode, Class<? extends ComputeJob<R>> jobClass, Object[] args) { + ExecuteRequest executeRequest = messagesFactory.executeRequest() + .jobClassName(jobClass.getName()) + .args(args) + .build(); + + return messagingService.invoke(remoteNode, executeRequest, NETWORK_TIMEOUT_MILLIS) + .thenCompose(message -> resultFromExecuteResponse((ExecuteResponse) message)); + } + + @SuppressWarnings("unchecked") + private <R> CompletableFuture<R> resultFromExecuteResponse(ExecuteResponse executeResponse) { + if (executeResponse.throwable() != null) { + return CompletableFuture.failedFuture(executeResponse.throwable()); + } + + return completedFuture((R) executeResponse.result()); + } + + /** {@inheritDoc} */ + @Override + public synchronized void start() { + jobExecutorService = new ThreadPoolExecutor( + configuration.threadPoolSize().value(), + configuration.threadPoolSize().value(), + THREAD_KEEP_ALIVE_SECONDS, + TimeUnit.SECONDS, + newExecutorServiceTaskQueue(), + new NamedThreadFactory("[" + ignite.name() + "] Compute-") + ); + + messagingService.addMessageHandler(ComputeMessageTypes.class, (message, senderAddr, correlationId) -> { + assert correlationId != null; + + if (message instanceof ExecuteRequest) { + processExecuteRequest((ExecuteRequest) message, senderAddr, correlationId); + + return; + } + + throw new IgniteInternalException("Unexpected message type " + message.getClass()); + }); + } + + BlockingQueue<Runnable> newExecutorServiceTaskQueue() { + return new LinkedBlockingQueue<>(); + } + + private void processExecuteRequest(ExecuteRequest executeRequest, NetworkAddress senderAddr, long correlationId) { + if (!busyLock.enterBusy()) { + sendExecuteResponse(null, new NodeStoppingException(), senderAddr, correlationId); + return; + } + + try { + Class<ComputeJob<Object>> jobClass = jobClass(executeRequest.jobClassName()); + + doExecuteLocally(jobClass, executeRequest.args()) + .handle((result, ex) -> sendExecuteResponse(result, ex, senderAddr, correlationId)); + } finally { + busyLock.leaveBusy(); + } + } + + @Nullable + private Object sendExecuteResponse(Object result, Throwable ex, NetworkAddress senderAddr, Long correlationId) { + ExecuteResponse executeResponse = messagesFactory.executeResponse() + .result(result) + .throwable(ex) + .build(); + + messagingService.respond(senderAddr, executeResponse, correlationId); + + return null; + } + + @SuppressWarnings("unchecked") + private <R, J extends ComputeJob<R>> Class<J> jobClass(String jobClassName) { + try { + return (Class<J>) Class.forName(jobClassName, true, jobClassLoader); + } catch (ClassNotFoundException e) { + throw new IgniteInternalException("Cannot load job class by name '" + jobClassName + "'", e); + } + } + + /** {@inheritDoc} */ + @Override + public void stop() throws Exception { + if (!stopGuard.compareAndSet(false, true)) { + return; + } + + busyLock.block(); + + IgniteUtils.shutdownAndAwaitTermination(jobExecutorService, stopTimeoutMillis(), TimeUnit.MILLISECONDS); + } + + long stopTimeoutMillis() { + return configuration.threadPoolStopTimeoutMillis().value(); + } +} diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java similarity index 59% rename from modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java rename to modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java index 4ce517a..8f684ba 100644 --- a/modules/compute-api/src/main/java/org/apache/ignite/compute/ComputeJob.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java @@ -15,20 +15,24 @@ * limitations under the License. */ -package org.apache.ignite.compute; +package org.apache.ignite.internal.compute; + +import org.apache.ignite.internal.compute.message.ExecuteRequest; +import org.apache.ignite.internal.compute.message.ExecuteResponse; +import org.apache.ignite.network.annotations.MessageGroup; /** - * A Compute job that may be executed on an Ignite node (or a few nodes, or on the whole cluster). - * - * @param <R> job result type + * Message types for the Compute module. */ -public interface ComputeJob<R> { +@MessageGroup(groupName = "ComputeMessages", groupType = 6) +public class ComputeMessageTypes { + /** + * Type for {@link ExecuteRequest}. + */ + public static final short EXECUTE_REQUEST = 0; + /** - * Executes the job on an Ignite node. - * - * @param context context - * @param args job arguments - * @return job result + * Type for {@link ExecuteResponse}. */ - R execute(JobExecutionContext context, Object... args); + public static final short EXECUTE_RESPONSE = 1; } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java index 5ba9061..9a5b43a 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java @@ -17,20 +17,65 @@ package org.apache.ignite.internal.compute; +import java.util.Iterator; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.TopologyService; /** * Implementation of {@link IgniteCompute}. */ public class IgniteComputeImpl implements IgniteCompute { + private final TopologyService topologyService; + private final ComputeComponent computeComponent; + + private final ThreadLocalRandom random = ThreadLocalRandom.current(); + + public IgniteComputeImpl(TopologyService topologyService, ComputeComponent computeComponent) { + this.topologyService = topologyService; + this.computeComponent = computeComponent; + } + /** {@inheritDoc} */ @Override public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args) { - // TODO: IGNITE-16616 - implement this method - throw new UnsupportedOperationException("Not implemented yet"); + ClusterNode targetNode = randomNode(nodes); + + if (isLocal(targetNode)) { + return computeComponent.executeLocally(jobClass, args); + } else { + return computeComponent.executeRemotely(targetNode, jobClass, args); + } + } + + /** {@inheritDoc} */ + @Override + public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args) { + ClusterNode targetNode = randomNode(nodes); + + if (isLocal(targetNode)) { + return computeComponent.executeLocally(jobClassName, args); + } else { + return computeComponent.executeRemotely(targetNode, jobClassName, args); + } + } + + private boolean isLocal(ClusterNode targetNode) { + return targetNode.equals(topologyService.localMember()); + } + + private ClusterNode randomNode(Set<ClusterNode> nodes) { + int nodesToSkip = random.nextInt(nodes.size()); + + Iterator<ClusterNode> iterator = nodes.iterator(); + for (int i = 0; i < nodesToSkip; i++) { + iterator.next(); + } + + return iterator.next(); } } diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java similarity index 63% rename from modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java rename to modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java index ee190e2..aa81b1b 100644 --- a/modules/compute-api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java @@ -15,10 +15,24 @@ * limitations under the License. */ -package org.apache.ignite.compute; +package org.apache.ignite.internal.compute; + +import org.apache.ignite.Ignite; +import org.apache.ignite.compute.JobExecutionContext; /** - * Context of {@link ComputeJob} execution. + * Implementation of {@link JobExecutionContext}. */ -public interface JobExecutionContext { +public class JobExecutionContextImpl implements JobExecutionContext { + private final Ignite ignite; + + public JobExecutionContextImpl(Ignite ignite) { + this.ignite = ignite; + } + + /** {@inheritDoc} */ + @Override + public Ignite ignite() { + return ignite; + } } diff --git a/modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteRequest.java similarity index 54% rename from modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java rename to modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteRequest.java index 2ffcaf8..b6c9cf4 100644 --- a/modules/compute-api/src/main/java/org/apache/ignite/compute/IgniteCompute.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteRequest.java @@ -15,27 +15,31 @@ * limitations under the License. */ -package org.apache.ignite.compute; +package org.apache.ignite.internal.compute.message; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.internal.compute.ComputeMessageTypes; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.annotations.Marshallable; +import org.apache.ignite.network.annotations.Transferable; /** - * Provides access to the Compute functionality: the ability to execute compute jobs. - * - * @see ComputeJob - * @see ComputeJob#execute(JobExecutionContext, Object...) + * Used to implement remote job execution in {@link org.apache.ignite.compute.IgniteCompute#execute(Set, Class, Object...)}. */ -public interface IgniteCompute { +@Transferable(value = ComputeMessageTypes.EXECUTE_REQUEST) +public interface ExecuteRequest extends NetworkMessage { + /** + * Returns job class name. + * + * @return job class name + */ + String jobClassName(); + /** - * Executes a {@link ComputeJob}. + * Returns job arguments. * - * @param nodes nodes on which to execute the job - * @param jobClass class of the job to execute - * @param args arguments of the job - * @param <R> job result type - * @return future job result + * @return arguments */ - <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args); + @Marshallable + Object[] args(); } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteResponse.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteResponse.java new file mode 100644 index 0000000..3601925 --- /dev/null +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/ExecuteResponse.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute.message; + +import java.util.Set; +import org.apache.ignite.internal.compute.ComputeMessageTypes; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.annotations.Marshallable; +import org.apache.ignite.network.annotations.Transferable; + +/** + * Used to implement remote job execution in {@link org.apache.ignite.compute.IgniteCompute#execute(Set, Class, Object...)}. + */ +@Transferable(value = ComputeMessageTypes.EXECUTE_RESPONSE) +public interface ExecuteResponse extends NetworkMessage { + /** + * Returns job execution result ({@code null} if the execution has failed). + * + * @return result ({@code null} if the execution has failed) + */ + @Marshallable + Object result(); + + /** + * Returns a {@link Throwable} that was thrown during job execution ({@code null} if the execution was successful). + * + * @return {@link Throwable} that was thrown during job execution ({@code null} if the execution was successful) + */ + @Marshallable + Throwable throwable(); +} diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java new file mode 100644 index 0000000..b9e7cbd --- /dev/null +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.Ignite; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.JobExecutionContext; +import org.apache.ignite.configuration.ConfigurationValue; +import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration; +import org.apache.ignite.internal.compute.message.ExecuteRequest; +import org.apache.ignite.internal.compute.message.ExecuteResponse; +import org.apache.ignite.internal.testframework.IgniteTestUtils; +import org.apache.ignite.lang.NodeStoppingException; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.MessagingService; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.NetworkMessageHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +@Timeout(10) +class ComputeComponentImplTest { + private static final String INSTANCE_NAME = "Ignite-0"; + + @Mock + private Ignite ignite; + + @Mock + private MessagingService messagingService; + + @Mock + private ComputeConfiguration computeConfiguration; + + @Mock + private ConfigurationValue<Integer> threadPoolSizeValue; + @Mock + private ConfigurationValue<Long> threadPoolStopTimeoutMillisValue; + + @InjectMocks + private ComputeComponentImpl computeComponent; + + @Captor + private ArgumentCaptor<ExecuteRequest> executeRequestCaptor; + @Captor + private ArgumentCaptor<ExecuteResponse> executeResponseCaptor; + + private final ClusterNode remoteNode = new ClusterNode("remote", "remote", new NetworkAddress("remote-host", 1, "remote")); + + private final AtomicReference<NetworkMessageHandler> computeMessageHandlerRef = new AtomicReference<>(); + + private final AtomicBoolean responseSent = new AtomicBoolean(false); + + @BeforeEach + void setUp() { + lenient().when(computeConfiguration.threadPoolSize()).thenReturn(threadPoolSizeValue); + lenient().when(threadPoolSizeValue.value()).thenReturn(8); + lenient().when(computeConfiguration.threadPoolStopTimeoutMillis()).thenReturn(threadPoolStopTimeoutMillisValue); + lenient().when(threadPoolStopTimeoutMillisValue.value()).thenReturn(10_000L); + + lenient().when(ignite.name()).thenReturn(INSTANCE_NAME); + + doAnswer(invocation -> { + computeMessageHandlerRef.set(invocation.getArgument(1)); + return null; + }).when(messagingService).addMessageHandler(eq(ComputeMessageTypes.class), any()); + + computeComponent.start(); + } + + @AfterEach + void cleanup() throws Exception { + computeComponent.stop(); + } + + @Test + void executesLocally() throws Exception { + String result = computeComponent.executeLocally(SimpleJob.class, "a", 42).get(); + + assertThat(result, is("jobResponse")); + + assertThatExecuteRequestWasNotSent(); + } + + private void assertThatExecuteRequestWasNotSent() { + verify(messagingService, never()).invoke(any(ClusterNode.class), any(), anyLong()); + verify(messagingService, never()).invoke(any(NetworkAddress.class), any(), anyLong()); + } + + @Test + void executesLocallyWithException() { + ExecutionException ex = assertThrows(ExecutionException.class, () -> computeComponent.executeLocally(FailingJob.class).get()); + + assertThat(ex.getCause(), is(instanceOf(JobException.class))); + assertThat(ex.getCause().getMessage(), is("Oops")); + assertThat(ex.getCause().getCause(), is(notNullValue())); + } + + @Test + void executesLocallyByClassName() throws Exception { + String result = computeComponent.<String>executeLocally(SimpleJob.class.getName(), "a", 42).get(); + + assertThat(result, is("jobResponse")); + + assertThatExecuteRequestWasNotSent(); + } + + @Test + void executesRemotelyUsingNetworkCommunication() throws Exception { + respondWithExecuteResponseWhenExecuteRequestIsSent(); + + String result = computeComponent.executeRemotely(remoteNode, SimpleJob.class, "a", 42).get(); + + assertThat(result, is("remoteResponse")); + + assertThatExecuteRequestWasSent(); + } + + private void respondWithExecuteResponseWhenExecuteRequestIsSent() { + ExecuteResponse executeResponse = new ComputeMessagesFactory().executeResponse() + .result("remoteResponse") + .build(); + when(messagingService.invoke(any(ClusterNode.class), any(ExecuteRequest.class), anyLong())) + .thenReturn(CompletableFuture.completedFuture(executeResponse)); + } + + private void assertThatExecuteRequestWasSent() { + verify(messagingService).invoke(eq(remoteNode), executeRequestCaptor.capture(), anyLong()); + + ExecuteRequest capturedRequest = executeRequestCaptor.getValue(); + + assertThat(capturedRequest.jobClassName(), is(SimpleJob.class.getName())); + assertThat(capturedRequest.args(), is(equalTo(new Object[]{"a", 42}))); + } + + @Test + void executesRemotelyWithException() { + ExecuteResponse executeResponse = new ComputeMessagesFactory().executeResponse() + .throwable(new JobException("Oops", new Exception())) + .build(); + when(messagingService.invoke(any(ClusterNode.class), any(ExecuteRequest.class), anyLong())) + .thenReturn(CompletableFuture.completedFuture(executeResponse)); + + ExecutionException ex = assertThrows( + ExecutionException.class, + () -> computeComponent.executeRemotely(remoteNode, FailingJob.class).get() + ); + + assertThat(ex.getCause(), is(instanceOf(JobException.class))); + assertThat(ex.getCause().getMessage(), is("Oops")); + assertThat(ex.getCause().getCause(), is(notNullValue())); + } + + @Test + void executesRemotelyByClassNameUsingNetworkCommunication() throws Exception { + respondWithExecuteResponseWhenExecuteRequestIsSent(); + + String result = computeComponent.<String>executeRemotely(remoteNode, SimpleJob.class.getName(), "a", 42).get(); + + assertThat(result, is("remoteResponse")); + + assertThatExecuteRequestWasSent(); + } + + @Test + void executesJobAndRespondsWhenGetsExecuteRequest() throws Exception { + markResponseSentOnResponseSend(); + assertThat(computeMessageHandlerRef.get(), is(notNullValue())); + + NetworkAddress senderAddress = new NetworkAddress("some-host", 1); + + ExecuteRequest request = new ComputeMessagesFactory().executeRequest() + .jobClassName(SimpleJob.class.getName()) + .args(new Object[]{"a", 42}) + .build(); + computeMessageHandlerRef.get().onReceived(request, senderAddress, 123L); + + assertThatExecuteResponseIsSentTo(senderAddress); + } + + private void markResponseSentOnResponseSend() { + when(messagingService.respond(any(NetworkAddress.class), any(), anyLong())) + .thenAnswer(invocation -> { + responseSent.set(true); + return null; + }); + } + + private void assertThatExecuteResponseIsSentTo(NetworkAddress senderAddress) throws InterruptedException { + assertTrue(IgniteTestUtils.waitForCondition(responseSent::get, 1000), "No response sent"); + + verify(messagingService).respond(eq(senderAddress), executeResponseCaptor.capture(), eq(123L)); + ExecuteResponse response = executeResponseCaptor.getValue(); + + assertThat(response.result(), is("jobResponse")); + assertThat(response.throwable(), is(nullValue())); + } + + @Test + void stoppedComponentReturnsExceptionOnLocalExecutionAttempt() throws Exception { + computeComponent.stop(); + + Object result = computeComponent.executeLocally(SimpleJob.class) + .handle((s, ex) -> ex != null ? ex : s) + .get(); + + assertThat(result, is(instanceOf(NodeStoppingException.class))); + } + + @Test + void localExecutionReleasesStopLock() throws Exception { + computeComponent.executeLocally(SimpleJob.class).get(); + + assertTimeoutPreemptively(Duration.ofSeconds(3), () -> computeComponent.stop()); + } + + @Test + void stoppedComponentReturnsExceptionOnRemoteExecutionAttempt() throws Exception { + computeComponent.stop(); + + Object result = computeComponent.executeRemotely(remoteNode, SimpleJob.class) + .handle((s, ex) -> ex != null ? ex : s) + .get(); + + assertThat(result, is(instanceOf(NodeStoppingException.class))); + } + + @Test + void remoteExecutionReleasesStopLock() throws Exception { + respondWithExecuteResponseWhenExecuteRequestIsSent(); + + computeComponent.executeRemotely(remoteNode, SimpleJob.class).get(); + + assertTimeoutPreemptively(Duration.ofSeconds(3), () -> computeComponent.stop()); + } + + @Test + void stoppedComponentReturnsExceptionOnExecuteRequestAttempt() throws Exception { + computeComponent.stop(); + + markResponseSentOnResponseSend(); + assertThat(computeMessageHandlerRef.get(), is(notNullValue())); + + NetworkAddress senderAddress = new NetworkAddress("some-host", 1); + + ExecuteRequest request = new ComputeMessagesFactory().executeRequest() + .jobClassName(SimpleJob.class.getName()) + .args(new Object[]{"a", 42}) + .build(); + computeMessageHandlerRef.get().onReceived(request, senderAddress, 123L); + + assertThatNodeStoppingExceptionIsSentTo(senderAddress); + } + + private void assertThatNodeStoppingExceptionIsSentTo(NetworkAddress senderAddress) throws InterruptedException { + assertTrue(IgniteTestUtils.waitForCondition(responseSent::get, 1000), "No response sent"); + + verify(messagingService).respond(eq(senderAddress), executeResponseCaptor.capture(), eq(123L)); + ExecuteResponse response = executeResponseCaptor.getValue(); + + assertThat(response.result(), is(nullValue())); + assertThat(response.throwable(), is(instanceOf(NodeStoppingException.class))); + } + + @Test + void executorThreadsAreNamedAccordingly() throws Exception { + String threadName = computeComponent.executeLocally(GetThreadNameJob.class).get(); + + assertThat(threadName, startsWith("[" + INSTANCE_NAME + "] Compute-")); + } + + @Test + void executionRejectionCausesExceptionToBeReturnedViaFuture() throws Exception { + restrictPoolSizeTo1(); + + computeComponent = new ComputeComponentImpl(ignite, messagingService, computeConfiguration) { + @Override + BlockingQueue<Runnable> newExecutorServiceTaskQueue() { + return new SynchronousQueue<>(); + } + + @Override + long stopTimeoutMillis() { + return 100; + } + }; + computeComponent.start(); + + // take the only executor thread + computeComponent.executeLocally(LongJob.class); + + Object result = computeComponent.executeLocally(SimpleJob.class) + .handle((res, ex) -> ex != null ? ex : res) + .get(); + + assertThat(result, is(instanceOf(RejectedExecutionException.class))); + } + + private void restrictPoolSizeTo1() { + when(threadPoolSizeValue.value()).thenReturn(1); + } + + // TODO: IGNITE-16705 - enable this test + @Test + @Disabled("IGNITE-16705") + void taskDropByExecutorServiceDueToStopCausesCancellationExceptionToBeReturnedViaFuture() throws Exception { + restrictPoolSizeTo1(); + + computeComponent = new ComputeComponentImpl(ignite, messagingService, computeConfiguration) { + @Override + long stopTimeoutMillis() { + return 100; + } + }; + computeComponent.start(); + + // take the only executor thread + computeComponent.executeLocally(LongJob.class); + + // the corresponding task goes to work queue + CompletableFuture<Object> resultFuture = computeComponent.executeLocally(SimpleJob.class) + .handle((res, ex) -> ex != null ? ex : res); + + computeComponent.stop(); + + // now work queue is dropped to the floor, so the future should be resolved with a cancellation + + Object result = resultFuture.get(3, TimeUnit.SECONDS); + + assertThat(result, is(instanceOf(CancellationException.class))); + assertThat(((CancellationException) result).getMessage(), is("Cancelled due to node stop")); + } + + @Test + void executionOfJobOfNonExistentClassResultsInException() throws Exception { + Object result = computeComponent.executeLocally("no-such-class") + .handle((res, ex) -> ex != null ? ex : res) + .get(); + + assertThat(result, is(instanceOf(Exception.class))); + assertThat(((Exception) result).getMessage(), containsString("Cannot load job class by name 'no-such-class'")); + } + + @Test + void executionOfNonJobClassResultsInException() throws Exception { + Object result = computeComponent.executeLocally(Object.class.getName()) + .handle((res, ex) -> ex != null ? ex : res) + .get(); + + assertThat(result, is(instanceOf(Exception.class))); + assertThat(((Exception) result).getMessage(), containsString("'java.lang.Object' does not implement ComputeJob interface")); + } + + private static class SimpleJob implements ComputeJob<String> { + /** {@inheritDoc} */ + @Override + public String execute(JobExecutionContext context, Object... args) { + return "jobResponse"; + } + } + + private static class FailingJob implements ComputeJob<String> { + /** {@inheritDoc} */ + @Override + public String execute(JobExecutionContext context, Object... args) { + throw new JobException("Oops", new Exception()); + } + } + + private static class JobException extends RuntimeException { + public JobException(String message, Throwable cause) { + super(message, cause); + } + } + + private static class GetThreadNameJob implements ComputeJob<String> { + /** {@inheritDoc} */ + @Override + public String execute(JobExecutionContext context, Object... args) { + return Thread.currentThread().getName(); + } + } + + private static class LongJob implements ComputeJob<String> { + /** {@inheritDoc} */ + @Override + public String execute(JobExecutionContext context, Object... args) { + try { + Thread.sleep(1_000_000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return null; + } + } +} diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java new file mode 100644 index 0000000..42ceb2b --- /dev/null +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute; + +import static java.util.Collections.singleton; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.JobExecutionContext; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.TopologyService; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class IgniteComputeImplTest { + @Mock + private TopologyService topologyService; + + @Mock + private ComputeComponent computeComponent; + + @InjectMocks + private IgniteComputeImpl compute; + + private final ClusterNode localNode = new ClusterNode("local", "local", new NetworkAddress("local-host", 1, "local")); + private final ClusterNode remoteNode = new ClusterNode("remote", "remote", new NetworkAddress("remote-host", 1, "remote")); + + @BeforeEach + void setupMocks() { + lenient().when(topologyService.localMember()).thenReturn(localNode); + } + + @Test + void whenNodeIsLocalThenExecutesLocally() throws Exception { + when(computeComponent.executeLocally(SimpleJob.class, "a", 42)) + .thenReturn(CompletableFuture.completedFuture("jobResponse")); + + String result = compute.execute(singleton(localNode), SimpleJob.class, "a", 42).get(); + + assertThat(result, is("jobResponse")); + + verify(computeComponent).executeLocally(SimpleJob.class, "a", 42); + } + + @Test + void whenNodeIsRemoteThenExecutesRemotely() throws Exception { + when(computeComponent.executeRemotely(remoteNode, SimpleJob.class, "a", 42)) + .thenReturn(CompletableFuture.completedFuture("remoteResponse")); + + String result = compute.execute(singleton(remoteNode), SimpleJob.class, "a", 42).get(); + + assertThat(result, is("remoteResponse")); + + verify(computeComponent).executeRemotely(remoteNode, SimpleJob.class, "a", 42); + } + + private static class SimpleJob implements ComputeJob<String> { + /** {@inheritDoc} */ + @Override + public String execute(JobExecutionContext context, Object... args) { + return "jobResponse"; + } + } +} diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java similarity index 55% copy from modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java copy to modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java index 5ba9061..70f24d2 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java @@ -17,20 +17,26 @@ package org.apache.ignite.internal.compute; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import org.apache.ignite.compute.ComputeJob; -import org.apache.ignite.compute.IgniteCompute; -import org.apache.ignite.network.ClusterNode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; -/** - * Implementation of {@link IgniteCompute}. - */ -public class IgniteComputeImpl implements IgniteCompute { - /** {@inheritDoc} */ - @Override - public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args) { - // TODO: IGNITE-16616 - implement this method - throw new UnsupportedOperationException("Not implemented yet"); +import org.apache.ignite.Ignite; +import org.apache.ignite.compute.JobExecutionContext; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class JobExecutionContextImplTest { + @Mock + private Ignite ignite; + + @Test + void returnsIgnite() { + JobExecutionContext context = new JobExecutionContextImpl(ignite); + + assertThat(context.ignite(), is(sameInstance(ignite))); } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java index aad0360..e027665 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java @@ -143,7 +143,7 @@ public class MetaStorageManager implements IgniteComponent { private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); /** Prevents double stopping the component. */ - AtomicBoolean stopGuard = new AtomicBoolean(); + private final AtomicBoolean stopGuard = new AtomicBoolean(); /** * The constructor. diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java new file mode 100644 index 0000000..a3d5ef5 --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgnitionManager; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.testframework.IgniteTestUtils; +import org.apache.ignite.internal.testframework.WorkDirectory; +import org.apache.ignite.internal.testframework.WorkDirectoryExtension; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.lang.IgniteLogger; +import org.apache.ignite.lang.IgniteStringFormatter; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Abstract integration test that starts and stops a cluster. + */ +@ExtendWith(WorkDirectoryExtension.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractTest { + private static final IgniteLogger LOG = IgniteLogger.forClass(AbstractClusterIntegrationTest.class); + + /** Base port number. */ + private static final int BASE_PORT = 3344; + + /** Nodes bootstrap configuration pattern. */ + private static final String NODE_BOOTSTRAP_CFG = "{\n" + + " \"node\": {\n" + + " \"metastorageNodes\":[ {} ]\n" + + " },\n" + + " \"network\": {\n" + + " \"port\":{},\n" + + " \"nodeFinder\":{\n" + + " \"netClusterNodes\": [ {} ]\n" + + " }\n" + + " }\n" + + "}"; + + /** Cluster nodes. */ + protected final List<Ignite> clusterNodes = new ArrayList<>(); + + /** Work directory. */ + @WorkDirectory + private static Path WORK_DIR; + + /** + * Before all. + * + * @param testInfo Test information oject. + */ + @BeforeEach + void startNodes(TestInfo testInfo) { + //TODO: IGNITE-16034 Here we assume that Metastore consists of one node, and it starts first. + String metastorageNodes = '\"' + IgniteTestUtils.testNodeName(testInfo, 0) + '\"'; + + String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"'; + + for (int i = 0; i < nodes(); i++) { + String curNodeName = IgniteTestUtils.testNodeName(testInfo, i); + + clusterNodes.add(IgnitionManager.start(curNodeName, IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, + metastorageNodes, + BASE_PORT + i, + connectNodeAddr + ), WORK_DIR.resolve(curNodeName))); + } + } + + /** + * Get a count of nodes in the Ignite cluster. + * + * @return Count of nodes. + */ + protected int nodes() { + return 3; + } + + /** + * After all. + */ + @AfterEach + void stopNodes() throws Exception { + LOG.info("Start tearDown()"); + + IgniteUtils.closeAll(ItUtils.reverse(clusterNodes)); + + clusterNodes.clear(); + + LOG.info("End tearDown()"); + } + + /** + * Invokes before the test will start. + * + * @param testInfo Test information oject. + * @throws Exception If failed. + */ + @BeforeEach + public void setup(TestInfo testInfo) throws Exception { + setupBase(testInfo, WORK_DIR); + } + + /** + * Invokes after the test has finished. + * + * @param testInfo Test information oject. + * @throws Exception If failed. + */ + @AfterEach + public void tearDown(TestInfo testInfo) throws Exception { + tearDownBase(testInfo); + } + + protected final IgniteImpl node(int index) { + return (IgniteImpl) clusterNodes.get(index); + } +} diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java new file mode 100644 index 0000000..21be9fb --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute; + +import static java.util.stream.Collectors.joining; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Arrays; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.JobExecutionContext; +import org.apache.ignite.internal.AbstractClusterIntegrationTest; +import org.apache.ignite.internal.app.IgniteImpl; +import org.junit.jupiter.api.Test; + +/** + * Integration tests for Compute functionality. + */ +class ItComputeTest extends AbstractClusterIntegrationTest { + @Test + void executesJobLocally() throws Exception { + IgniteImpl entryNode = node(0); + + String result = entryNode.compute() + .execute(Set.of(entryNode.node()), ConcatJob.class, "a", 42) + .get(1, TimeUnit.SECONDS); + + assertThat(result, is("a42")); + } + + @Test + void executesJobLocallyByClassName() throws Exception { + IgniteImpl entryNode = node(0); + + String result = entryNode.compute() + .<String>execute(Set.of(entryNode.node()), ConcatJob.class.getName(), "a", 42) + .get(1, TimeUnit.SECONDS); + + assertThat(result, is("a42")); + } + + @Test + void executesJobOnRemoteNodes() throws Exception { + Ignite entryNode = node(0); + + String result = entryNode.compute() + .execute(Set.of(node(1).node(), node(2).node()), ConcatJob.class, "a", 42) + .get(1, TimeUnit.SECONDS); + + assertThat(result, is("a42")); + } + + @Test + void executesJobByClassNameOnRemoteNodes() throws Exception { + Ignite entryNode = node(0); + + String result = entryNode.compute() + .<String>execute(Set.of(node(1).node(), node(2).node()), ConcatJob.class.getName(), "a", 42) + .get(1, TimeUnit.SECONDS); + + assertThat(result, is("a42")); + } + + @Test + void localExecutionActuallyUsesLocalNode() throws Exception { + IgniteImpl entryNode = node(0); + + String result = entryNode.compute() + .execute(Set.of(entryNode.node()), GetNodeNameJob.class) + .get(1, TimeUnit.SECONDS); + + assertThat(result, is(entryNode.name())); + } + + @Test + void remoteExecutionActuallyUsesRemoteNode() throws Exception { + IgniteImpl entryNode = node(0); + IgniteImpl remoteNode = node(1); + + String result = entryNode.compute() + .execute(Set.of(remoteNode.node()), GetNodeNameJob.class) + .get(1, TimeUnit.SECONDS); + + assertThat(result, is(remoteNode.name())); + } + + @Test + void executesFailingJobLocally() { + IgniteImpl entryNode = node(0); + + ExecutionException ex = assertThrows(ExecutionException.class, () -> { + entryNode.compute() + .execute(Set.of(entryNode.node()), FailingJob.class) + .get(1, TimeUnit.SECONDS); + }); + + assertThat(ex.getCause(), is(instanceOf(JobException.class))); + assertThat(ex.getCause().getMessage(), is("Oops")); + assertThat(ex.getCause().getCause(), is(notNullValue())); + } + + @Test + void executesFailingJobOnRemoteNodes() { + Ignite entryNode = node(0); + + ExecutionException ex = assertThrows(ExecutionException.class, () -> { + entryNode.compute() + .execute(Set.of(node(1).node(), node(2).node()), FailingJob.class) + .get(1, TimeUnit.SECONDS); + }); + + assertThat(ex.getCause(), is(instanceOf(JobException.class))); + assertThat(ex.getCause().getMessage(), is("Oops")); + assertThat(ex.getCause().getCause(), is(notNullValue())); + } + + private static class ConcatJob implements ComputeJob<String> { + /** {@inheritDoc} */ + @Override + public String execute(JobExecutionContext context, Object... args) { + return Arrays.stream(args) + .map(Object::toString) + .collect(joining()); + } + } + + private static class GetNodeNameJob implements ComputeJob<String> { + /** {@inheritDoc} */ + @Override + public String execute(JobExecutionContext context, Object... args) { + return context.ignite().name(); + } + } + + private static class FailingJob implements ComputeJob<String> { + /** {@inheritDoc} */ + @Override + public String execute(JobExecutionContext context, Object... args) { + throw new JobException("Oops", new Exception()); + } + } + + private static class JobException extends RuntimeException { + public JobException(String message, Throwable cause) { + super(message, cause); + } + } +} diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index f641354..38bc296 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -32,11 +32,15 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgnitionManager; import org.apache.ignite.client.handler.ClientHandlerModule; import org.apache.ignite.compute.IgniteCompute; +import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration; import org.apache.ignite.configuration.schemas.network.NetworkConfiguration; import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration; import org.apache.ignite.configuration.schemas.table.TablesConfiguration; import org.apache.ignite.internal.baseline.BaselineManager; import org.apache.ignite.internal.components.LongJvmPauseDetector; +import org.apache.ignite.internal.compute.ComputeComponent; +import org.apache.ignite.internal.compute.ComputeComponentImpl; +import org.apache.ignite.internal.compute.ComputeMessagesSerializationRegistryInitializer; import org.apache.ignite.internal.compute.IgniteComputeImpl; import org.apache.ignite.internal.configuration.ConfigurationManager; import org.apache.ignite.internal.configuration.ConfigurationModule; @@ -69,6 +73,7 @@ import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.lang.IgniteLogger; import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.network.ClusterLocalConfiguration; +import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.MessageSerializationRegistryImpl; import org.apache.ignite.network.NettyBootstrapFactory; @@ -78,6 +83,7 @@ import org.apache.ignite.table.manager.IgniteTables; import org.apache.ignite.tx.IgniteTransactions; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; /** * Ignite internal implementation. @@ -116,6 +122,8 @@ public class IgniteImpl implements Ignite { /** Cluster service (cluster network manager). */ private final ClusterService clusterSvc; + private final ComputeComponent computeComponent; + /** Netty bootstrap factory. */ private final NettyBootstrapFactory nettyBootstrapFactory; @@ -186,6 +194,7 @@ public class IgniteImpl implements Ignite { RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry); SqlQueryMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry); TxMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry); + ComputeMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry); var clusterLocalConfiguration = new ClusterLocalConfiguration(name, serializationRegistry); @@ -199,6 +208,9 @@ public class IgniteImpl implements Ignite { nettyBootstrapFactory ); + computeComponent = new ComputeComponentImpl(this, clusterSvc.messagingService(), + nodeCfgMgr.configurationRegistry().getConfiguration(ComputeConfiguration.KEY)); + raftMgr = new Loza(clusterSvc, workDir); txManager = new TableTxManagerImpl(clusterSvc, new HeapLockManager()); @@ -346,6 +358,7 @@ public class IgniteImpl implements Ignite { List<IgniteComponent> otherComponents = List.of( nettyBootstrapFactory, clusterSvc, + computeComponent, raftMgr, txManager, metaStorageMgr, @@ -385,8 +398,8 @@ public class IgniteImpl implements Ignite { */ public void stop() { if (status.getAndSet(Status.STOPPING) == Status.STARTED) { - doStopNode(List.of(longJvmPauseDetector, vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, txManager, metaStorageMgr, clusterCfgMgr, - baselineMgr, distributedTblMgr, qryEngine, restComponent, clientHandlerModule, nettyBootstrapFactory)); + doStopNode(List.of(longJvmPauseDetector, vaultMgr, nodeCfgMgr, clusterSvc, computeComponent, raftMgr, txManager, metaStorageMgr, + clusterCfgMgr, baselineMgr, distributedTblMgr, qryEngine, restComponent, clientHandlerModule, nettyBootstrapFactory)); } } @@ -432,7 +445,7 @@ public class IgniteImpl implements Ignite { @Override public IgniteCompute compute() { if (compute == null) { - compute = new IgniteComputeImpl(); + compute = new IgniteComputeImpl(clusterSvc.topologyService(), computeComponent); } return compute; } @@ -570,6 +583,11 @@ public class IgniteImpl implements Ignite { return partitionsStore; } + @TestOnly + public ClusterNode node() { + return clusterSvc.topologyService().localMember(); + } + /** * Node state. */ diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java index 63a8f7d..8ef320e 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModule.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.ignite.configuration.RootKey; import org.apache.ignite.configuration.annotation.ConfigurationType; import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration; +import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration; import org.apache.ignite.configuration.schemas.network.NetworkConfiguration; import org.apache.ignite.configuration.schemas.rest.RestConfiguration; import org.apache.ignite.configuration.schemas.runner.NodeConfiguration; @@ -43,7 +44,8 @@ public class CoreLocalConfigurationModule implements ConfigurationModule { NetworkConfiguration.KEY, NodeConfiguration.KEY, RestConfiguration.KEY, - ClientConnectorConfiguration.KEY + ClientConnectorConfiguration.KEY, + ComputeConfiguration.KEY ); } } diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java index 21f2796..75c87d7 100644 --- a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java +++ b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/CoreLocalConfigurationModuleTest.java @@ -29,6 +29,7 @@ import java.util.Optional; import java.util.ServiceLoader; import java.util.ServiceLoader.Provider; import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration; +import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration; import org.apache.ignite.configuration.schemas.network.NetworkConfiguration; import org.apache.ignite.configuration.schemas.rest.RestConfiguration; import org.apache.ignite.configuration.schemas.runner.NodeConfiguration; @@ -66,6 +67,11 @@ class CoreLocalConfigurationModuleTest { } @Test + void hasComputeConfigurationRoot() { + assertThat(module.rootKeys(), hasItem(ComputeConfiguration.KEY)); + } + + @Test void providesNoValidators() { assertThat(module.validators(), is(anEmptyMap())); } diff --git a/parent/pom.xml b/parent/pom.xml index 12f5e0b..911c381 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -161,12 +161,6 @@ <dependency> <groupId>org.apache.ignite</groupId> - <artifactId>ignite-compute-api</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.ignite</groupId> <artifactId>ignite-sql-engine</artifactId> <version>${project.version}</version> </dependency> diff --git a/pom.xml b/pom.xml index 1dbc17d..233be44 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,6 @@ <module>modules/client-common</module> <module>modules/client-handler</module> <module>modules/compute</module> - <module>modules/compute-api</module> <module>modules/configuration</module> <module>modules/configuration-annotation-processor</module> <module>modules/configuration-api</module>