This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 4928ca5 IGNITE-14131 IgniteCompute tasks with same name, running from one node and different ClassLoaders can lead to OOM - Fixes #9020. 4928ca5 is described below commit 4928ca57955a4ab9dd3bbcbad8adf29ddaa36cf2 Author: zstan <stanilov...@gmail.com> AuthorDate: Tue May 11 11:32:46 2021 +0300 IGNITE-14131 IgniteCompute tasks with same name, running from one node and different ClassLoaders can lead to OOM - Fixes #9020. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../main/java/org/apache/ignite/IgniteCompute.java | 4 + .../deployment/GridDeploymentLocalStore.java | 174 ++++++++++---------- .../deployment/GridDeploymentPerLoaderStore.java | 2 +- .../apache/ignite/internal/util/IgniteUtils.java | 14 ++ .../spi/deployment/local/LocalDeploymentSpi.java | 100 +++++++----- .../GridMultipleVersionsDeploymentSelfTest.java | 2 +- .../ignite/internal/GridSpiExceptionSelfTest.java | 3 +- .../IgniteExplicitImplicitDeploymentSelfTest.java | 178 ++++++++++++--------- .../RaceOnDeployClassesWithSameAliases.java | 147 ----------------- .../GridDeploymentManagerStopSelfTest.java | 4 +- .../GridDifferentLocalDeploymentSelfTest.java | 162 +++++++++++++++++++ .../distributed/IgniteCacheSizeFailoverTest.java | 2 +- .../apache/ignite/p2p/GridP2PUndeploySelfTest.java | 41 ++++- .../local/GridLocalDeploymentSpiSelfTest.java | 54 +++++-- .../ignite/testsuites/IgniteP2PSelfTestSuite.java | 6 +- 15 files changed, 523 insertions(+), 370 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java index fb2db2f..f686d77 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java @@ -371,6 +371,10 @@ public interface IgniteCompute extends IgniteAsyncSupport { * <p> * If task for given name has not been deployed yet, then 
{@code taskName} will be * used as task class name to auto-deploy the task (see {@link #localDeployTask(Class, ClassLoader)} method). + * <p> + * If class with the same name was deployed more than once, the last deployed version is used. + * If method is called when other threads are deploying other versions of class with the same name there are no + * guarantees which version of the class will be executed. * * @param taskName Name of the task to execute. * @param arg Optional argument of task execution, can be {@code null}. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java index 89d7bcf..ecf9bf8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java @@ -90,7 +90,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { Map<String, Collection<GridDeployment>> cp; synchronized (mux) { - cp = new HashMap<String, Collection<GridDeployment>>(cache); + cp = new HashMap<>(cache); for (Entry<String, Collection<GridDeployment>> entry : cp.entrySet()) entry.setValue(new ArrayList<>(entry.getValue())); @@ -145,7 +145,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { // Validate metadata. 
assert alias != null : "Meta is invalid: " + meta; - GridDeployment dep = deployment(alias); + GridDeployment dep = deployment(meta); if (dep != null) { if (log.isDebugEnabled()) @@ -154,74 +154,66 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { return dep; } - DeploymentResource rsrc = spi.findResource(alias); + if (meta.classLoader() == null) { + DeploymentResource rsrc = spi.findResource(alias); - if (rsrc != null) { - dep = deploy(ctx.config().getDeploymentMode(), rsrc.getClassLoader(), rsrc.getResourceClass(), alias, - meta.record()); + if (rsrc != null) { + dep = deploy(ctx.config().getDeploymentMode(), rsrc.getClassLoader(), rsrc.getResourceClass(), alias, + meta.record()); - assert dep != null; + assert dep != null; - if (log.isDebugEnabled()) - log.debug("Acquired deployment class from SPI: " + dep); - } - // Auto-deploy. - else { - ClassLoader ldr = meta.classLoader(); - - if (ldr == null) { - ldr = Thread.currentThread().getContextClassLoader(); - - // Safety. - if (ldr == null) - ldr = U.resolveClassLoader(ctx.config()); - } - - if (ldr instanceof GridDeploymentClassLoader) { if (log.isDebugEnabled()) - log.debug("Skipping local auto-deploy (nested execution) [ldr=" + ldr + ", meta=" + meta + ']'); + log.debug("Acquired deployment class from SPI: " + dep); - return null; + return dep; } + } - while (true) { - try { - // Check that class can be loaded. - String clsName = meta.className(); + // Auto-deploy. + ClassLoader ldr = meta.classLoader(); - Class<?> cls = U.forName(clsName != null ? clsName : alias, ldr); + if (ldr == null) { + ldr = Thread.currentThread().getContextClassLoader(); - spi.register(ldr, cls); + // Safety. 
+ if (ldr == null) + ldr = U.resolveClassLoader(ctx.config()); + } - rsrc = spi.findResource(cls.getName()); + if (ldr instanceof GridDeploymentClassLoader) { + if (log.isDebugEnabled()) + log.debug("Skipping local auto-deploy (nested execution) [ldr=" + ldr + ", meta=" + meta + ']'); - if (rsrc != null && rsrc.getResourceClass().equals(cls)) { - if (log.isDebugEnabled()) - log.debug("Retrieved auto-loaded resource from spi: " + rsrc); + return null; + } - dep = deploy(ctx.config().getDeploymentMode(), ldr, cls, meta.alias(), meta.record()); + try { + // Check that class can be loaded. + String clsName = meta.className(); - if (dep != null) - return dep; - } - else { - U.warn(log, "Failed to find resource from deployment SPI even after registering: " + meta); + Class<?> cls = U.forName(clsName != null ? clsName : alias, ldr); - return null; - } + if (spi.register(ldr, cls)) { + if (log.isDebugEnabled()) { + log.debug("Resource registered automatically: [name=" + U.getResourceName(cls) + + ", class=" + cls.getName() + + ", ldr=" + ldr + ']'); } - catch (ClassNotFoundException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to load class for local auto-deployment [ldr=" + ldr + ", meta=" + meta + ']'); + } - return null; - } - catch (IgniteSpiException e) { - U.error(log, "Failed to deploy local class with meta: " + meta, e); + dep = deploy(ctx.config().getDeploymentMode(), ldr, cls, alias, meta.record()); + } + catch (ClassNotFoundException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to load class for local auto-deployment [ldr=" + ldr + ", meta=" + meta + ']'); - return null; - } - } + return null; + } + catch (IgniteSpiException e) { + U.error(log, "Failed to deploy local class with meta: " + meta, e); + + return null; } if (log.isDebugEnabled()) @@ -232,23 +224,36 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { /** {@inheritDoc} */ @Override public GridDeployment searchDeploymentCache(GridDeploymentMetadata meta) { 
- return deployment(meta.alias()); + return deployment(meta); } /** - * @param alias Class alias. + * @param meta Deployment meta. * @return Deployment. */ - @Nullable private GridDeployment deployment(String alias) { - Deque<GridDeployment> deps = cache.get(alias); + @Nullable private GridDeployment deployment(final GridDeploymentMetadata meta) { + Deque<GridDeployment> deps = cache.get(meta.alias()); if (deps != null) { - GridDeployment dep = deps.peekFirst(); + for (GridDeployment dep : deps) { + if (dep.undeployed()) + continue; - if (dep != null && !dep.undeployed()) - return dep; + // local or remote deployment. + if (dep.classLoaderId() == meta.classLoaderId() || dep.classLoader() == meta.classLoader()) { + if (log.isTraceEnabled()) + log.trace("Deployment was found for class with specific class loader [alias=" + meta.alias() + + ", clsLdrId=" + meta.classLoaderId() + "]"); + + return dep; + } + } } + if (log.isDebugEnabled()) + log.debug("Deployment was not found for class with specific class loader [alias=" + meta.alias() + + ", clsLdrId=" + meta.classLoaderId() + "]"); + return null; } @@ -260,8 +265,13 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { * @param recordEvt {@code True} to record event. * @return Deployment. */ - private GridDeployment deploy(DeploymentMode depMode, ClassLoader ldr, Class<?> cls, String alias, - boolean recordEvt) { + private GridDeployment deploy( + DeploymentMode depMode, + ClassLoader ldr, + Class<?> cls, + String alias, + boolean recordEvt + ) { GridDeployment dep = null; synchronized (mux) { @@ -294,9 +304,10 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { cache.put(alias, cachedDeps); - if (!cls.getName().equals(alias)) + if (!cls.getName().equals(alias)) { // Cache by class name as well. 
cache.put(cls.getName(), cachedDeps); + } return dep; } @@ -318,23 +329,13 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { ConcurrentLinkedDeque::new ); - if (!deps.isEmpty()) { - for (GridDeployment d : deps) { - if (!d.undeployed()) { - U.error(log, "Found more than one active deployment for the same resource " + - "[cls=" + cls + ", depMode=" + depMode + ", dep=" + d + ']'); - - return null; - } - } - } - // Add at the beginning of the list for future fast access. deps.addFirst(dep); - if (!cls.getName().equals(alias)) + if (!cls.getName().equals(alias)) { // Cache by class name as well. cache.put(cls.getName(), deps); + } if (log.isDebugEnabled()) log.debug("Created new deployment: " + dep); @@ -356,20 +357,23 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { if (clsLdr.getClass().equals(GridDeploymentClassLoader.class)) clsLdr = clsLdr.getParent(); - GridDeployment dep = null; + if (spi.register(clsLdr, cls)) { + if (log.isDebugEnabled()) { + log.debug("Resource registered automatically: [name=" + U.getResourceName(cls) + + ", class=" + cls.getName() + ", ldr=" + clsLdr + ']'); + } + } - while (dep == null) { - spi.register(clsLdr, cls); + GridDeploymentMetadata meta = new GridDeploymentMetadata(); - dep = deployment(cls.getName()); + meta.alias(cls.getName()); + meta.classLoader(clsLdr); - if (dep == null) { - DeploymentResource rsrc = spi.findResource(cls.getName()); + GridDeployment dep = deployment(meta); - if (rsrc != null && rsrc.getClassLoader() == clsLdr) - dep = deploy(ctx.config().getDeploymentMode(), rsrc.getClassLoader(), - rsrc.getResourceClass(), rsrc.getName(), true); - } + if (dep == null) { + dep = deploy(ctx.config().getDeploymentMode(), clsLdr, + cls, U.getResourceName(cls), true); } return dep; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java index 0477523..ba68e1bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java @@ -183,7 +183,7 @@ public class GridDeploymentPerLoaderStore extends GridDeploymentStoreAdapter { /** {@inheritDoc} */ @Override public Collection<GridDeployment> getDeployments() { synchronized (mux) { - return new LinkedList<GridDeployment>(cache.values()); + return new LinkedList<>(cache.values()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index de79bf8..37d6505 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -6024,6 +6024,20 @@ public abstract class IgniteUtils { } /** + * Gets resource name. + * Returns a task name if it is a Compute task or a class name otherwise. + * + * @param rscCls Class of resource. + * @return Name of resource. + */ + public static String getResourceName(Class rscCls) { + if (ComputeTask.class.isAssignableFrom(rscCls)) + return getTaskName(rscCls); + + return rscCls.getName(); + } + + /** * Creates SPI attribute name by adding prefix to the attribute name. * Prefix is an SPI name + '.'. 
* diff --git a/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java index d726baf..1748506 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; @@ -82,8 +81,7 @@ public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSp private IgniteLogger log; /** Map of all resources. */ - private ConcurrentLinkedHashMap<ClassLoader, ConcurrentMap<String, String>> ldrRsrcs = - new ConcurrentLinkedHashMap<>(16, 0.75f, 64); + private volatile ConcurrentLinkedHashMap<ClassLoader, ConcurrentMap<String, String>> ldrRsrcs = new ConcurrentLinkedHashMap<>(); /** Deployment SPI listener. */ private volatile DeploymentListener lsnr; @@ -110,36 +108,55 @@ public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSp log.debug(stopInfo()); } - /** {@inheritDoc} */ + /** + * Finds class loader for the given class. + * + * @param rsrcName Class name or class alias to find class loader for. + * @return Deployed class loader, or {@code null} if not deployed. + */ @Nullable @Override public DeploymentResource findResource(String rsrcName) { - assert rsrcName != null; - - // Last updated class loader has highest priority in search. 
for (Entry<ClassLoader, ConcurrentMap<String, String>> e : ldrRsrcs.descendingEntrySet()) { ClassLoader ldr = e.getKey(); ConcurrentMap<String, String> rsrcs = e.getValue(); - String clsName = rsrcs.get(rsrcName); + DeploymentResourceAdapter res = findResource0(rsrcs, rsrcName, ldr); - // Return class if it was found in resources map. - if (clsName != null) { - // Recalculate resource name in case if access is performed by - // class name and not the resource name. - rsrcName = getResourceName(clsName, rsrcs); + if (res != null) + return res; + } - assert clsName != null; + return null; + } - try { - Class<?> cls = Class.forName(clsName, true, ldr); + /** + * Finds appropriate resource. + * + * @param rsrcs Resources. + * @param rsrcName Class name or class alias to find class loader for. + * @param clsLdr desired class loader. + * @return Deployed class loader, or {@code null} if not deployed. + */ + @Nullable private DeploymentResourceAdapter findResource0(Map<String, String> rsrcs, String rsrcName, ClassLoader clsLdr) { + String clsName = rsrcs.get(rsrcName); - assert cls != null; + // Return class if it was found in resources map. + if (clsName != null) { + // Recalculate resource name in case if access is performed by + // class name and not the resource name. + rsrcName = getResourceName(clsName, rsrcs); - // Return resource. - return new DeploymentResourceAdapter(rsrcName, cls, ldr); - } - catch (ClassNotFoundException ignored) { - // No-op. - } + assert clsName != null; + + try { + Class<?> cls = U.forName(clsName, clsLdr); + + assert cls != null; + + // Return resource. + return new DeploymentResourceAdapter(rsrcName, cls, clsLdr); + } + catch (ClassNotFoundException e) { + log.warning("Can`t find appropriate class. 
", e); } } @@ -175,31 +192,31 @@ public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSp if (log.isDebugEnabled()) log.debug("Registering [ldrRsrcs=" + ldrRsrcs + ", ldr=" + ldr + ", rsrc=" + rsrc + ']'); - ConcurrentMap<String, String> clsLdrRsrcs = ldrRsrcs.getSafe(ldr); - - if (clsLdrRsrcs == null) { - ConcurrentMap<String, String> old = ldrRsrcs.putIfAbsent(ldr, - clsLdrRsrcs = new ConcurrentHashMap<>()); - - if (old != null) - clsLdrRsrcs = old; - } + Map<String, String> newRsrcs; - Map<String, String> newRsrcs = addResource(ldr, clsLdrRsrcs, rsrc); + ConcurrentLinkedHashMap<ClassLoader, ConcurrentMap<String, String>> ldrRsrcs0 = + new ConcurrentLinkedHashMap<>(ldrRsrcs); - Collection<ClassLoader> rmvClsLdrs = null; + ConcurrentMap<String, String> clsLdrRsrcs = ldrRsrcs0.getSafe(ldr); - if (!F.isEmpty(newRsrcs)) { - rmvClsLdrs = new LinkedList<>(); + // move forward, localDeployTask compatibility issue. + if (clsLdrRsrcs != null) { + if (ldrRsrcs0.remove(ldr) != null) { + ldrRsrcs0.put(ldr, clsLdrRsrcs); - removeResources(ldr, newRsrcs, rmvClsLdrs); + ldrRsrcs = ldrRsrcs0; + } } + else { + ConcurrentMap<String, String> old = ldrRsrcs.putIfAbsent(ldr, + clsLdrRsrcs == null ? 
clsLdrRsrcs = new ConcurrentLinkedHashMap<>() : clsLdrRsrcs); - if (rmvClsLdrs != null) { - for (ClassLoader cldLdr : rmvClsLdrs) - onClassLoaderReleased(cldLdr); + if (old != null) + clsLdrRsrcs = old; } + newRsrcs = addResource(ldr, clsLdrRsrcs, rsrc); + return !F.isEmpty(newRsrcs); } @@ -267,11 +284,12 @@ public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSp String oldCls = ldrRsrcs.putIfAbsent(entry.getKey(), entry.getValue()); if (oldCls != null) { - if (!oldCls.equals(entry.getValue())) + if (!oldCls.equals(entry.getValue())) { throw new IgniteSpiException("Failed to register resources with given task name " + "(found another class with same task name in the same class loader) " + "[taskName=" + entry.getKey() + ", existingCls=" + oldCls + ", newCls=" + entry.getValue() + ", ldr=" + ldr + ']'); + } } else { // New resource was added. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleVersionsDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleVersionsDeploymentSelfTest.java index 95258cc..64f0742 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleVersionsDeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridMultipleVersionsDeploymentSelfTest.java @@ -211,7 +211,7 @@ public class GridMultipleVersionsDeploymentSelfTest extends GridCommonAbstractTe // Since we loader task/job classes with different class loaders we cannot // use any kind of mutex because of the illegal state exception. // We have to use timer here. DO NOT CHANGE 2 seconds here. - Thread.sleep(2000); + Thread.sleep(1000); // Deploy new one - this should move first task to the obsolete list. 
g1.compute().localDeployTask(taskCls2, ldr2); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java index 0c8bf18..c7002fe 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java @@ -143,8 +143,7 @@ public class GridSpiExceptionSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Nullable @Override public DeploymentResource findResource(String rsrcName) { - // No-op. + @Override public DeploymentResource findResource(String rsrcName) { return null; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteExplicitImplicitDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteExplicitImplicitDeploymentSelfTest.java index b38ede2..31839eb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteExplicitImplicitDeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteExplicitImplicitDeploymentSelfTest.java @@ -39,11 +39,11 @@ import org.apache.ignite.compute.ComputeTaskAdapter; import org.apache.ignite.compute.ComputeTaskName; import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.testframework.GridTestClassLoader; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.testframework.junits.common.GridCommonTest; import org.jetbrains.annotations.NotNull; @@ -104,17 +104,59 @@ public class 
IgniteExplicitImplicitDeploymentSelfTest extends GridCommonAbstract execExplicitDeployP2P(true, true, true); } - /** - * @param ignite Grid. + /** Calls async compute execution with Class of the task. + * + * @param ignite Ignite server instance. + * @param client Ignite client instance. + * @param taskCls Class to compute. + * @param expected Expected result. */ - private void stopGrid(Ignite ignite) { - try { - if (ignite != null) - G.stop(ignite.name(), true); - } - catch (Throwable e) { - error("Got error when stopping grid.", e); - } + private IgniteInternalFuture runAsyncByClass( + final IgniteEx ignite, + final IgniteEx client, + Class<? extends ComputeTask<String, Integer>> taskCls, + int expected + ) { + IgniteInternalFuture f = GridTestUtils.runAsync(() -> { + for (int i = 0; i < 10; ++i) { + Integer res1 = ignite.compute().execute(taskCls, null); + assertNotNull(res1); + assertEquals("Invalid res1: ", expected, (int)res1); + + res1 = client.compute(ignite.compute().clusterGroup().forNodeId(ignite.localNode().id())).execute(taskCls, null); + assertNotNull(res1); + assertEquals("Invalid res1: ", expected, (int)res1); + } + }); + + return f; + } + + /** Calls async compute execution with class instance. + * @param ignite Ignite server instance. + * @param client Ignite client instance. + * @param taskCls Instance to compute. + * @param expected Expected result. 
+ */ + private IgniteInternalFuture runAsyncByInstance( + final IgniteEx ignite, + final IgniteEx client, + ComputeTask<String, Integer> taskCls, + int expected + ) { + IgniteInternalFuture f = GridTestUtils.runAsync(() -> { + for (int i = 0; i < 10; ++i) { + Integer res1 = ignite.compute().execute(taskCls, null); + assertNotNull(res1); + assertEquals("Invalid res: ", expected, (int)res1); + + res1 = client.compute(ignite.compute().clusterGroup().forNodeId(ignite.localNode().id())).execute(taskCls, null); + assertNotNull(res1); + assertEquals("Invalid res: ", expected, (int)res1); + } + }); + + return f; } /** @@ -124,7 +166,7 @@ public class IgniteExplicitImplicitDeploymentSelfTest extends GridCommonAbstract * @throws Exception If test failed. */ private void execExplicitDeployLocally(boolean byCls, boolean byTask, boolean byName) throws Exception { - Ignite ignite = null; + Ignite ignite; try { ignite = startGrid(); @@ -171,14 +213,14 @@ public class IgniteExplicitImplicitDeploymentSelfTest extends GridCommonAbstract if (byName) { ignite.compute().localDeployTask(taskCls, ldr1); - Integer res = (Integer) ignite.compute().execute(taskCls.getName(), null); + Integer res = ignite.compute().execute(taskCls.getName(), null); assert res != null; assert res == 1 : "Invalid response: " + res; } } finally { - stopGrid(ignite); + stopAllGrids(); } } @@ -189,11 +231,11 @@ public class IgniteExplicitImplicitDeploymentSelfTest extends GridCommonAbstract * @throws Exception If test failed. */ private void execImplicitDeployLocally(boolean byCls, boolean byTask, boolean byName) throws Exception { - Ignite ignite = null; + final IgniteEx ignite = startGrids(1); - try { - ignite = startGrid(); + final IgniteEx client = startClientGrid(1); + try { // First task class loader. 
ClassLoader ldr1 = new GridTestClassLoader( Collections.singletonMap("testResource", "1"), @@ -218,45 +260,44 @@ public class IgniteExplicitImplicitDeploymentSelfTest extends GridCommonAbstract ldr2.loadClass(GridDeploymentResourceTestTask.class.getName()); if (byCls) { - Integer res1 = ignite.compute().execute(taskCls1, null); - Integer res2 = ignite.compute().execute(taskCls2, null); + IgniteInternalFuture f1 = runAsyncByClass(ignite, client, taskCls1, 1); - assert res1 != null; - assert res2 != null; + IgniteInternalFuture f2 = runAsyncByClass(ignite, client, taskCls2, 2); - assert res1 == 1 : "Invalid res1: " + res1; - assert res2 == 2 : "Invalid res2: " + res2; + f1.get(); f2.get(); } if (byTask) { - Integer res1 = ignite.compute().execute(taskCls1.newInstance(), null); - Integer res2 = ignite.compute().execute(taskCls2.newInstance(), null); + final ComputeTask<String, Integer> tc1 = taskCls1.newInstance(); + final ComputeTask<String, Integer> tc2 = taskCls2.newInstance(); - assert res1 != null; - assert res2 != null; + IgniteInternalFuture f1 = runAsyncByInstance(ignite, client, tc1, 1); - assert res1 == 1 : "Invalid res1: " + res1; - assert res2 == 2 : "Invalid res2: " + res2; + IgniteInternalFuture f2 = runAsyncByInstance(ignite, client, tc2, 2); + + f1.get(); f2.get(); } if (byName) { ignite.compute().localDeployTask(taskCls1, ldr1); - Integer res1 = (Integer) ignite.compute().execute(taskCls1.getName(), null); + Integer res1 = ignite.compute().execute(taskCls1.getName(), null); + + assert res1 != null; + + assert res1 == 1 : "Invalid res1: " + res1; ignite.compute().localDeployTask(taskCls2, ldr2); - Integer res2 = (Integer) ignite.compute().execute(taskCls2.getName(), null); + Integer res2 = ignite.compute().execute(taskCls2.getName(), null); - assert res1 != null; assert res2 != null; - assert res1 == 1 : "Invalid res1: " + res1; assert res2 == 2 : "Invalid res2: " + res2; } } finally { - stopGrid(ignite); + stopAllGrids(); } } @@ -267,12 +308,8 @@ 
public class IgniteExplicitImplicitDeploymentSelfTest extends GridCommonAbstract * @throws Exception If test failed. */ private void execExplicitDeployP2P(boolean byCls, boolean byTask, boolean byName) throws Exception { - Ignite ignite1 = null; - Ignite ignite2 = null; - try { - ignite1 = startGrid(1); - ignite2 = startGrid(2); + IgniteEx ignite = startGrids(2); ClassLoader ldr1 = new GridTestClassLoader( Collections.singletonMap("testResource", "1"), @@ -292,41 +329,40 @@ public class IgniteExplicitImplicitDeploymentSelfTest extends GridCommonAbstract ldr2.loadClass(GridDeploymentResourceTestTask.class.getName()); if (byCls) { - ignite1.compute().localDeployTask(taskCls, ldr1); + ignite.compute().localDeployTask(taskCls, ldr1); // Even though the task is deployed with resource class loader, // when we execute it, it will be redeployed with task class-loader. - Integer res = ignite1.compute().execute(taskCls, null); + Integer res = ignite.compute().execute(taskCls, null); assert res != null; assert res == 2 : "Invalid response: " + res; } if (byTask) { - ignite1.compute().localDeployTask(taskCls, ldr1); + ignite.compute().localDeployTask(taskCls, ldr1); // Even though the task is deployed with resource class loader, // when we execute it, it will be redeployed with task class-loader. - Integer res = ignite1.compute().execute(taskCls.newInstance(), null); + Integer res = ignite.compute().execute(taskCls.newInstance(), null); assert res != null; assert res == 2 : "Invalid response: " + res; } if (byName) { - ignite1.compute().localDeployTask(taskCls, ldr1); + ignite.compute().localDeployTask(taskCls, ldr1); // Even though the task is deployed with resource class loader, // when we execute it, it will be redeployed with task class-loader. 
- Integer res = (Integer) ignite1.compute().execute(taskCls.getName(), null); + Integer res = ignite.compute().execute(taskCls.getName(), null); assert res != null; assert res == 1 : "Invalid response: " + res; } } finally { - stopGrid(ignite2); - stopGrid(ignite1); + stopAllGrids(); } } @@ -337,12 +373,10 @@ public class IgniteExplicitImplicitDeploymentSelfTest extends GridCommonAbstract * @throws Exception If test failed. */ private void execImplicitDeployP2P(boolean byCls, boolean byTask, boolean byName) throws Exception { - Ignite ignite1 = null; - Ignite ignite2 = null; - try { - ignite1 = startGrid(1); - ignite2 = startGrid(2); + IgniteEx ignite = startGrids(1); + + final IgniteEx client = startClientGrid(1); ClassLoader ldr1 = new GridTestClassLoader( Collections.singletonMap("testResource", "1"), @@ -365,46 +399,40 @@ public class IgniteExplicitImplicitDeploymentSelfTest extends GridCommonAbstract ldr2.loadClass(GridDeploymentResourceTestTask.class.getName()); if (byCls) { - Integer res1 = ignite1.compute().execute(taskCls1, null); - Integer res2 = ignite1.compute().execute(taskCls2, null); + IgniteInternalFuture f1 = runAsyncByClass(ignite, client, taskCls1, 1); - assert res1 != null; - assert res2 != null; + IgniteInternalFuture f2 = runAsyncByClass(ignite, client, taskCls2, 2); - assert res1 == 1 : "Invalid res1: " + res1; - assert res2 == 2 : "Invalid res2: " + res2; + f1.get(); f2.get(); } if (byTask) { - Integer res1 = ignite1.compute().execute(taskCls1.newInstance(), null); - Integer res2 = ignite1.compute().execute(taskCls2.newInstance(), null); + final ComputeTask<String, Integer> tc1 = taskCls1.newInstance(); + final ComputeTask<String, Integer> tc2 = taskCls2.newInstance(); - assert res1 != null; - assert res2 != null; + IgniteInternalFuture f1 = runAsyncByInstance(ignite, client, tc1, 1); - assert res1 == 1 : "Invalid res1: " + res1; - assert res2 == 2 : "Invalid res2: " + res2; + IgniteInternalFuture f2 = runAsyncByInstance(ignite, client, tc2, 
2); + + f1.get(); f2.get(); } if (byName) { - ignite1.compute().localDeployTask(taskCls1, ldr1); - - Integer res1 = (Integer) ignite1.compute().execute(taskCls1.getName(), null); + ignite.compute().localDeployTask(taskCls1, ldr1); + Integer res1 = ignite.compute().execute(taskCls1.getName(), null); - ignite1.compute().localDeployTask(taskCls2, ldr2); + assert res1 != null; + assertEquals("Invalid res1: ", 1, (int)res1); - Integer res2 = (Integer) ignite1.compute().execute(taskCls2.getName(), null); + ignite.compute().localDeployTask(taskCls2, ldr2); + Integer res2 = ignite.compute().execute(taskCls2.getName(), null); - assert res1 != null; assert res2 != null; - - assert res1 == 1 : "Invalid res1: " + res1; - assert res2 == 2 : "Invalid res2: " + res2; + assertEquals("Invalid res2: ", 2, (int)res2); } } finally { - stopGrid(ignite1); - stopGrid(ignite2); + stopAllGrids(); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/RaceOnDeployClassesWithSameAliases.java b/modules/core/src/test/java/org/apache/ignite/internal/RaceOnDeployClassesWithSameAliases.java deleted file mode 100644 index 3f9b923..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/RaceOnDeployClassesWithSameAliases.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.MutableEntry; -import org.apache.ignite.Ignite; -import org.apache.ignite.cache.CacheEntryProcessor; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.compute.ComputeJob; -import org.apache.ignite.compute.ComputeJobResult; -import org.apache.ignite.compute.ComputeTaskAdapter; -import org.apache.ignite.compute.ComputeTaskName; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.failure.StopNodeFailureHandler; -import org.apache.ignite.testframework.ListeningTestLogger; -import org.apache.ignite.testframework.LogListener; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.ignite.testframework.junits.common.GridCommonTest; -import org.junit.Test; - -/** - * Tets reproduces issue which can happens deploing classes to local store from difference class loaders. - */ -@GridCommonTest(group = "P2P") -public class RaceOnDeployClassesWithSameAliases extends GridCommonAbstractTest { - /** Listening logger. 
*/ - private final ListeningTestLogger listeningLog = new ListeningTestLogger(true, log); - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - super.afterTest(); - - listeningLog.clearListeners(); - - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - return super.getConfiguration(igniteInstanceName) - .setGridLogger(listeningLog) - .setPeerClassLoadingEnabled(true) - .setFailureHandler(new StopNodeFailureHandler()) - .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); - } - - /** - * Test loads class with same alias in time another class deploying. - * - * @throws Exception If failed. - */ - @Test - public void test() throws Exception { - setRootLoggerDebugLevel(); - - IgniteEx crd = startGrids(1); - - Ignite client = startClientGrid("client"); - - awaitPartitionMapExchange(); - - AtomicBoolean deployed = new AtomicBoolean(); - - LogListener logLsnr = LogListener.matches(logStr -> { - if (logStr.startsWith("Retrieved auto-loaded resource from spi:") && - logStr.contains(TestCacheEntryProcessor.class.getSimpleName())) { - - if (deployed.compareAndSet(false, true)) { - System.out.println("dirty getting a breakdown location " + logStr); - - crd.compute().localDeployTask(TestTask.class, new TestClassLoader()); - } - return true; - } - - return false; - }).build(); - - listeningLog.registerListener(logLsnr); - - client.cache(DEFAULT_CACHE_NAME).invoke(1, new TestCacheEntryProcessor()); - - assertTrue(logLsnr.check()); - } - - /** - * Test entry processor. - */ - private static class TestCacheEntryProcessor implements CacheEntryProcessor<Object, Object, Object> { - /** {@inheritDoc} */ - @Override public Object process( - MutableEntry<Object, Object> entry, - Object... 
objects - ) throws EntryProcessorException { - return 2; - } - } - - /** - * That is a compute task with same aliase as entry processor {@link TestCacheEntryProcessor}. - */ - @ComputeTaskName("org.apache.ignite.internal.RaceOnDeployClassesWithSameAliases$TestCacheEntryProcessor") - private static class TestTask extends ComputeTaskAdapter<Object, Object> { - /** {@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) { - assert false; - - return Collections.emptyMap(); - } - - /** {@inheritDoc} */ - @Override public Object reduce(List<ComputeJobResult> results) { - return new Object(); - } - } - - /** - * Test {@link ClassLoader}. - */ - private static class TestClassLoader extends ClassLoader { - /** {@inheritDoc} */ - @Override protected Class<?> findClass(String name) throws ClassNotFoundException { - return Thread.currentThread().getContextClassLoader().loadClass(name); - } - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java index 29ce8ad..73bb3e7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java @@ -94,7 +94,9 @@ public class GridDeploymentManagerStopSelfTest extends GridCommonAbstractTest { @Override public String getName() { return getClass().getSimpleName(); } /** {@inheritDoc} */ - @Override public DeploymentResource findResource(String rsrcName) { return null; } + @Override public DeploymentResource findResource(String rsrcName) { + return null; + } /** {@inheritDoc} */ @Override public boolean register(ClassLoader ldr, Class<?> rsrc) throws IgniteSpiException { return false; } diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDifferentLocalDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDifferentLocalDeploymentSelfTest.java new file mode 100644 index 0000000..bc02380 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDifferentLocalDeploymentSelfTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.deployment; + +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.configuration.DeploymentMode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** Multiple local deployments. */ +public class GridDifferentLocalDeploymentSelfTest extends GridCommonAbstractTest { + /** Task name. 
*/ + private static final String TASK_NAME1 = "org.apache.ignite.tests.p2p.P2PTestTaskExternalPath1"; + + /** Task name. */ + private static final String TASK_NAME2 = "org.apache.ignite.tests.p2p.P2PTestTaskExternalPath2"; + + /** */ + private DeploymentMode depMode = DeploymentMode.PRIVATE; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDeploymentMode(depMode); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * Tests DeploymentMode.SHARED mode. + * + * @throws Exception If an error occurs. + */ + @Test + public void testCheckTaskClassloaderCacheSharedMode() throws Exception { + testCheckTaskClassloaderCache(DeploymentMode.SHARED); + } + + /** + * Tests DeploymentMode.PRIVATE mode. + * + * @throws Exception If an error occurs. + */ + @Test + public void testCheckTaskClassloaderCachePrivateMode() throws Exception { + testCheckTaskClassloaderCache(DeploymentMode.PRIVATE); + } + + /** + * Tests DeploymentMode.ISOLATED mode. + * + * @throws Exception If an error occurs. + */ + @Test + public void testCheckTaskClassloaderCacheIsolatedMode() throws Exception { + testCheckTaskClassloaderCache(DeploymentMode.ISOLATED); + } + + /** + * Tests DeploymentMode.CONTINUOUS mode. + * + * @throws Exception If an error occurs. 
+ */ + @Test + public void testCheckTaskClassloaderCacheContinuousMode() throws Exception { + testCheckTaskClassloaderCache(DeploymentMode.CONTINUOUS); + } + + /** */ + public void testCheckTaskClassloaderCache(DeploymentMode depMode) throws Exception { + this.depMode = depMode; + + IgniteEx server = startGrid(0); + + IgniteEx client = startClientGrid(1); + + ClassLoader clsLdr1 = getExternalClassLoader(); + + ClassLoader clsLdr2 = getExternalClassLoader(); + + Class<ComputeTask> taskCls11 = (Class<ComputeTask>) clsLdr1.loadClass(TASK_NAME1); + Class<ComputeTask> taskCls12 = (Class<ComputeTask>) clsLdr2.loadClass(TASK_NAME1); + Class<ComputeTask> taskCls21 = (Class<ComputeTask>) clsLdr2.loadClass(TASK_NAME2); + + IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> { + for (int i = 0; i < 10; ++i) { + try { + client.compute().execute(taskCls11.newInstance(), server.localNode().id()); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }); + + IgniteInternalFuture f2 = GridTestUtils.runAsync(() -> { + for (int i = 0; i < 10; ++i) { + try { + client.compute().execute(taskCls12.newInstance(), server.localNode().id()); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }); + + f1.get(); f2.get(); + + client.compute().execute(taskCls21.newInstance(), server.localNode().id()); + + GridDeploymentManager deploymentMgr = client.context().deploy(); + + GridDeploymentStore store = GridTestUtils.getFieldValue(deploymentMgr, "locStore"); + + ConcurrentMap<String, Deque<GridDeployment>> cache = GridTestUtils.getFieldValue(store, "cache"); + + assertEquals(2, cache.get(TASK_NAME1).size()); + + deploymentMgr = server.context().deploy(); + + GridDeploymentStore verStore = GridTestUtils.getFieldValue(deploymentMgr, "verStore"); + + // deployments per userVer map. 
+ Map<String, List<Object>> varCache = GridTestUtils.getFieldValue(verStore, "cache"); + + if (depMode == DeploymentMode.CONTINUOUS || depMode == DeploymentMode.SHARED) { + for (List<Object> deps : varCache.values()) + assertEquals(2, deps.size()); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java index 99c2b7d..c8c17e5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java @@ -75,7 +75,7 @@ public class IgniteCacheSizeFailoverTest extends GridCommonAbstractTest { final AtomicInteger cntr = new AtomicInteger(); IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { + @Override public Object call() { int idx = cntr.getAndIncrement() % 2; IgniteCache<Object, Object> cache = ignite(idx).cache(DEFAULT_CACHE_NAME); diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PUndeploySelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PUndeploySelfTest.java index 17ed730..7c8c490 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PUndeploySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PUndeploySelfTest.java @@ -21,16 +21,19 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentMap; import org.apache.ignite.Ignite; import org.apache.ignite.compute.ComputeTask; import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; 
import org.apache.ignite.spi.deployment.local.LocalDeploymentSpi; import org.apache.ignite.testframework.GridTestClassLoader; import org.apache.ignite.testframework.config.GridTestProperties; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.testframework.junits.common.GridCommonTest; +import org.jsr166.ConcurrentLinkedHashMap; import org.junit.Test; /** @@ -95,6 +98,9 @@ public class GridP2PUndeploySelfTest extends GridCommonAbstractTest { assert spi1.findResource(task1.getName()) != null; assert spi2.findResource(task1.getName()) != null; + checkResourceRegisteredInSpi(tstClsLdr, task1, spi1, true); + checkResourceRegisteredInSpi(tstClsLdr, task1, spi2, true); + assert ignite1.compute().localTasks().containsKey(task1.getName()); assert ignite2.compute().localTasks().containsKey(task1.getName()); @@ -106,6 +112,9 @@ public class GridP2PUndeploySelfTest extends GridCommonAbstractTest { assert spi1.findResource(task1.getName()) == null; assert spi2.findResource(task1.getName()) == null; + checkResourceRegisteredInSpi(tstClsLdr, task1, spi1, false); + checkResourceRegisteredInSpi(tstClsLdr, task1, spi2, false); + assert !ignite1.compute().localTasks().containsKey(task1.getName()); assert !ignite2.compute().localTasks().containsKey(task1.getName()); } @@ -116,6 +125,30 @@ public class GridP2PUndeploySelfTest extends GridCommonAbstractTest { } /** + * Checks whether the resource is registered in the SPI. + * + * @param tstClsLdr Class loader. + * @param task Task resource. + * @param spi Deployment SPI. + * @param registered True if the resource is expected to be registered, false otherwise. + */ + private void checkResourceRegisteredInSpi(ClassLoader tstClsLdr, Class<? 
extends ComputeTask<?, ?>> task, + LocalDeploymentSpi spi, boolean registered) { + ConcurrentLinkedHashMap<ClassLoader, ConcurrentMap<String, String>> ldrRsrcs = U.field(spi, "ldrRsrcs"); + + ConcurrentMap<String, String> rcsAliasMap = ldrRsrcs.get(tstClsLdr); + + if (registered) { + assertNotNull(rcsAliasMap.get(U.getResourceName(task))); + assertNotNull(rcsAliasMap.get(task.getName())); + } + else { + assertTrue(rcsAliasMap == null || rcsAliasMap.get(U.getResourceName(task)) == null); + assertTrue(rcsAliasMap == null || rcsAliasMap.get(task.getName()) == null); + } + } + + /** * @param depMode deployment mode. * @throws Exception If failed. */ @@ -139,20 +172,20 @@ public class GridP2PUndeploySelfTest extends GridCommonAbstractTest { LocalDeploymentSpi spi1 = spis.get(ignite1.name()); LocalDeploymentSpi spi2 = spis.get(ignite2.name()); - assert spi1.findResource(task1.getName()) != null; + checkResourceRegisteredInSpi(ldr, task1, spi1, true); assert ignite1.compute().localTasks().containsKey(task1.getName()); // P2P deployment will not deploy task into the SPI. - assert spi2.findResource(task1.getName()) == null; + checkResourceRegisteredInSpi(ldr, task1, spi2, false); ignite1.compute().undeployTask(task1.getName()); // Wait for undeploy. 
Thread.sleep(1000); - assert spi1.findResource(task1.getName()) == null; - assert spi2.findResource(task1.getName()) == null; + checkResourceRegisteredInSpi(ldr, task1, spi1, false); + checkResourceRegisteredInSpi(ldr, task1, spi2, false); assert !ignite1.compute().localTasks().containsKey(task1.getName()); assert !ignite2.compute().localTasks().containsKey(task1.getName()); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/deployment/local/GridLocalDeploymentSpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/deployment/local/GridLocalDeploymentSpiSelfTest.java index 0972898..e0fede4 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/deployment/local/GridLocalDeploymentSpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/deployment/local/GridLocalDeploymentSpiSelfTest.java @@ -25,16 +25,19 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.compute.ComputeTask; import org.apache.ignite.compute.ComputeTaskName; import org.apache.ignite.compute.ComputeTaskSplitAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.deployment.DeploymentListener; import org.apache.ignite.spi.deployment.DeploymentResource; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; import org.apache.ignite.testframework.junits.spi.GridSpiTest; +import org.jsr166.ConcurrentLinkedHashMap; import org.junit.Test; /** @@ -96,20 +99,34 @@ public class GridLocalDeploymentSpiSelfTest extends GridSpiAbstractTest<LocalDep deploy(task); + checkResourceRegisteredInSpi(task.getClassLoader(), taskName, getSpi(), true); + + Map<String, String> rcsMap = new HashMap<>(2); + rcsMap.put(taskName, task.getName()); + rcsMap.put(task.getName(), 
task.getName()); + // Note we use task name instead of class name. - DeploymentResource t1 = getSpi().findResource(taskName); + DeploymentResource t1 = U.invoke( + getSpi().getClass(), + getSpi(), + "findResource0", + rcsMap, + taskName, + task.getClassLoader() + ); + + assertNotNull(t1); - assert t1 != null; + assertSame(t1.getResourceClass(), task); - assert t1.getResourceClass().equals(task); - assert t1.getName().equals(taskName); + assertEquals(t1.getName(), taskName); getSpi().unregister(taskName); checkUndeployed(task); - assert getSpi().findResource(taskName) == null; - assert getSpi().findResource(task.getName()) == null; + checkResourceRegisteredInSpi(task.getClassLoader(), taskName, getSpi(), false); + checkResourceRegisteredInSpi(task.getClassLoader(), task.getName(), getSpi(), false); } /** @@ -139,7 +156,7 @@ public class GridLocalDeploymentSpiSelfTest extends GridSpiAbstractTest<LocalDep checkUndeployed(t1); - assert getSpi().findResource("GridDeploymentTestTask") == null; + checkResourceRegisteredInSpi(t1.getClassLoader(), "GridDeploymentTestTask", getSpi(), false); tasks.clear(); @@ -158,8 +175,27 @@ public class GridLocalDeploymentSpiSelfTest extends GridSpiAbstractTest<LocalDep checkUndeployed(t1); - assert getSpi().findResource(taskName) == null; - assert getSpi().findResource(t1.getName()) == null; + checkResourceRegisteredInSpi(t1.getClassLoader(), taskName, getSpi(), false); + checkResourceRegisteredInSpi(t1.getClassLoader(), t1.getName(), getSpi(), false); + } + + /** + * Checks whether the resource is registered in the SPI. + * + * @param tstClsLdr Class loader. + * @param taskName Name of resource. + * @param spi Deployment SPI. + * @param registered True if the resource is expected to be registered, false otherwise. 
+ */ + private void checkResourceRegisteredInSpi(ClassLoader tstClsLdr, String taskName, LocalDeploymentSpi spi, boolean registered) { + ConcurrentLinkedHashMap<ClassLoader, ConcurrentMap<String, String>> ldrRsrcs = U.field(spi, "ldrRsrcs"); + + ConcurrentMap<String, String> rcsAliasMap = ldrRsrcs.get(tstClsLdr); + + if (registered) + assertNotNull(rcsAliasMap.get(taskName)); + else + assertTrue(rcsAliasMap == null || rcsAliasMap.get(taskName) == null); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java index abee2b2..e788aea 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java @@ -18,8 +18,8 @@ package org.apache.ignite.testsuites; import org.apache.ignite.internal.GridP2PAffinitySelfTest; -import org.apache.ignite.internal.RaceOnDeployClassesWithSameAliases; import org.apache.ignite.internal.managers.deployment.GridDeploymentMessageCountSelfTest; +import org.apache.ignite.internal.managers.deployment.GridDifferentLocalDeploymentSelfTest; import org.apache.ignite.internal.managers.deployment.P2PCacheOperationIntoComputeTest; import org.apache.ignite.p2p.DeploymentClassLoaderCallableTest; import org.apache.ignite.p2p.GridP2PClassLoadingSelfTest; @@ -73,10 +73,10 @@ import org.junit.runners.Suite; GridDeploymentMessageCountSelfTest.class, GridP2PComputeWithNestedEntryProcessorTest.class, GridP2PCountTiesLoadClassDirectlyFromClassLoaderTest.class, - RaceOnDeployClassesWithSameAliases.class, GridP2PScanQueryWithTransformerTest.class, P2PCacheOperationIntoComputeTest.class, - GridP2PContinuousDeploymentClientDisconnectTest.class + GridP2PContinuousDeploymentClientDisconnectTest.class, + GridDifferentLocalDeploymentSelfTest.class, }) public class IgniteP2PSelfTestSuite { }