This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 4984f181e4f IGNITE-27871 Fix local classes deployment lookup
contention when peerClassLoadingEnabled=true - Fixes #12760.
4984f181e4f is described below
commit 4984f181e4fabde51d6888dbba9b339381782263
Author: Valuyskiy.O.Y <[email protected]>
AuthorDate: Tue Apr 7 09:59:44 2026 +0300
IGNITE-27871 Fix local classes deployment lookup contention when
peerClassLoadingEnabled=true - Fixes #12760.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../deployment/GridDeploymentLocalStore.java | 196 ++++++++++-----------
.../GridDeploymentLocalStoreReuseTest.java | 114 ++++++++++++
.../GridDifferentLocalDeploymentSelfTest.java | 6 +-
.../ignite/testsuites/IgniteP2PSelfTestSuite.java | 4 +-
4 files changed, 217 insertions(+), 103 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
index 06e6d2653bb..d9972a36bd8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
@@ -17,19 +17,13 @@
package org.apache.ignite.internal.managers.deployment;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Deque;
import java.util.HashMap;
-import java.util.HashSet;
+import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskName;
@@ -39,7 +33,6 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.util.GridAnnotationsCache;
import org.apache.ignite.internal.util.GridClassLoaderCache;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -60,8 +53,11 @@ import static
org.apache.ignite.events.EventType.EVT_TASK_UNDEPLOYED;
* Storage for local deployments.
*/
class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
- /** Deployment cache by class name. */
- private final ConcurrentMap<String, Deque<GridDeployment>> cache = new
ConcurrentHashMap<>();
+ /** Primary index: deployment by class loader. Not thread-safe, access
must be guarded by {@link #mux}. */
+ private final Map<ClassLoader, GridDeployment> depByLdr = new
IdentityHashMap<>();
+
+ /** Secondary index: deployments by alias or class name. Not thread-safe,
access must be guarded by {@link #mux}. */
+ private final Map<String, Map<ClassLoader, GridDeployment>> depsByAlias =
new HashMap<>();
/** Mutex. */
private final Object mux = new Object();
@@ -87,19 +83,14 @@ class GridDeploymentLocalStore extends
GridDeploymentStoreAdapter {
@Override public void stop() {
spi.setListener(null);
- Map<String, Collection<GridDeployment>> cp;
+ Set<ClassLoader> ldrs = U.newIdentityHashSet();
synchronized (mux) {
- cp = new HashMap<>(cache);
-
- for (Entry<String, Collection<GridDeployment>> entry :
cp.entrySet())
- entry.setValue(new ArrayList<>(entry.getValue()));
+ ldrs.addAll(depByLdr.keySet());
}
- for (Collection<GridDeployment> deps : cp.values()) {
- for (GridDeployment cls : deps)
- undeploy(cls.classLoader());
- }
+ for (ClassLoader ldr : ldrs)
+ undeploy(ldr);
if (log.isDebugEnabled())
log.debug(stopInfo());
@@ -110,11 +101,10 @@ class GridDeploymentLocalStore extends
GridDeploymentStoreAdapter {
Set<ClassLoader> obsoleteClsLdrs = U.newIdentityHashSet();
synchronized (mux) {
- // There can be obsolete class loaders in cache after client node
reconnect with the new node id.
- for (Entry<String, Deque<GridDeployment>> entry : cache.entrySet())
- for (GridDeployment dep : entry.getValue())
- if
(!dep.classLoaderId().globalId().equals(ctx.localNodeId()))
- obsoleteClsLdrs.add(dep.classLoader());
+ // There can be obsolete class loaders in deployment indexes after
client node reconnect with the new node id.
+ for (GridDeployment dep : depByLdr.values())
+ if (!dep.classLoaderId().globalId().equals(ctx.localNodeId()))
+ obsoleteClsLdrs.add(dep.classLoader());
}
for (ClassLoader clsLdr : obsoleteClsLdrs)
@@ -123,13 +113,10 @@ class GridDeploymentLocalStore extends
GridDeploymentStoreAdapter {
/** {@inheritDoc} */
@Override public Collection<GridDeployment> getDeployments() {
- Collection<GridDeployment> deps = new ArrayList<>();
+ Collection<GridDeployment> deps = U.newIdentityHashSet();
synchronized (mux) {
- for (Deque<GridDeployment> depList : cache.values())
- for (GridDeployment d : depList)
- if (!deps.contains(d))
- deps.add(d);
+ deps.addAll(depByLdr.values());
}
return deps;
@@ -138,10 +125,9 @@ class GridDeploymentLocalStore extends
GridDeploymentStoreAdapter {
/** {@inheritDoc} */
@Nullable @Override public GridDeployment getDeployment(IgniteUuid ldrId) {
synchronized (mux) {
- for (Deque<GridDeployment> deps : cache.values())
- for (GridDeployment dep : deps)
- if (dep.classLoaderId().equals(ldrId))
- return dep;
+ for (GridDeployment dep : depByLdr.values())
+ if (dep.classLoaderId().equals(ldrId))
+ return dep;
}
for (GridDeployment dep : ctx.task().getUsedDeployments())
@@ -248,22 +234,55 @@ class GridDeploymentLocalStore extends
GridDeploymentStoreAdapter {
* @return Deployment.
*/
@Nullable private GridDeployment deployment(final GridDeploymentMetadata
meta) {
- Deque<GridDeployment> deps = cache.get(meta.alias());
+ Map<ClassLoader, GridDeployment> deps;
+
+ synchronized (mux) {
+ Map<ClassLoader, GridDeployment> cached =
depsByAlias.get(meta.alias());
+
+ deps = cached == null ? null : new IdentityHashMap<>(cached);
+ }
if (deps != null) {
- for (GridDeployment dep : deps) {
- if (dep.undeployed())
- continue;
+ GridDeployment dep = null;
- // local or remote deployment.
- if (dep.classLoaderId() == meta.classLoaderId() ||
dep.classLoader() == meta.classLoader()) {
- if (log.isTraceEnabled())
- log.trace("Deployment was found for class with
specific class loader [alias=" + meta.alias() +
- ", clsLdrId=" + meta.classLoaderId() + "]");
+ if (meta.classLoader() != null)
+ dep = deps.get(meta.classLoader());
- return dep;
+ if ((dep == null || dep.undeployed()) && meta.classLoaderId() !=
null) {
+ for (GridDeployment d : deps.values()) {
+ if (d.classLoaderId().equals(meta.classLoaderId()) &&
!d.undeployed()) {
+ dep = d;
+
+ break;
+ }
}
}
+
+ if (dep != null && !dep.undeployed()) {
+ if (log.isTraceEnabled())
+ log.trace("Deployment was found for class with specific
class loader [alias=" + meta.alias() +
+ ", clsLdrId=" + meta.classLoaderId() + "]");
+
+ return dep;
+ }
+
+ ClassLoader appLdr =
Thread.currentThread().getContextClassLoader();
+
+ if (appLdr == null)
+ appLdr = U.resolveClassLoader(ctx.config());
+
+ appLdr = (appLdr instanceof GridDeploymentClassLoader) ? null :
appLdr;
+
+ if (appLdr != null)
+ dep = deps.get(appLdr);
+
+ if (dep != null && !dep.undeployed()) {
+ if (log.isTraceEnabled())
+ log.trace("Deployment was found for class with the local
app class loader [alias="
+ + meta.alias() + "]");
+
+ return dep;
+ }
}
if (log.isDebugEnabled())
@@ -288,42 +307,21 @@ class GridDeploymentLocalStore extends
GridDeploymentStoreAdapter {
String alias,
boolean recordEvt
) {
- GridDeployment dep = null;
+ GridDeployment dep;
synchronized (mux) {
boolean fireEvt = false;
try {
- Deque<GridDeployment> cachedDeps = null;
-
- // Find existing class loader info.
- for (Deque<GridDeployment> deps : cache.values()) {
- for (GridDeployment d : deps) {
- if (d.classLoader() == ldr) {
- // Cache class and alias.
- fireEvt = d.addDeployedClass(cls, alias);
-
- cachedDeps = deps;
-
- dep = d;
-
- break;
- }
- }
-
- if (cachedDeps != null)
- break;
- }
+ dep = depByLdr.get(ldr);
- if (cachedDeps != null) {
- assert dep != null;
+ if (dep != null) {
+ fireEvt = dep.addDeployedClass(cls, alias);
- cache.put(alias, cachedDeps);
+ addAliasMapping(alias, ldr, dep);
- if (!cls.getName().equals(alias)) {
- // Cache by class name as well.
- cache.put(cls.getName(), cachedDeps);
- }
+ if (!cls.getName().equals(alias))
+ addAliasMapping(cls.getName(), ldr, dep);
return dep;
}
@@ -339,19 +337,12 @@ class GridDeploymentLocalStore extends
GridDeploymentStoreAdapter {
assert fireEvt : "Class was not added to newly created
deployment [cls=" + cls +
", depMode=" + depMode + ", dep=" + dep + ']';
- Deque<GridDeployment> deps = F.<String,
Deque<GridDeployment>>addIfAbsent(
- cache,
- alias,
- ConcurrentLinkedDeque::new
- );
+ depByLdr.put(ldr, dep);
- // Add at the beginning of the list for future fast access.
- deps.addFirst(dep);
+ addAliasMapping(alias, ldr, dep);
- if (!cls.getName().equals(alias)) {
- // Cache by class name as well.
- cache.put(cls.getName(), deps);
- }
+ if (!cls.getName().equals(alias))
+ addAliasMapping(cls.getName(), ldr, dep);
if (log.isDebugEnabled())
log.debug("Created new deployment: " + dep);
@@ -543,33 +534,29 @@ class GridDeploymentLocalStore extends
GridDeploymentStoreAdapter {
* @param ldr Class loader to undeploy.
*/
private void undeploy(ClassLoader ldr) {
- Collection<GridDeployment> doomed = new HashSet<>();
+ GridDeployment dep;
synchronized (mux) {
- for (Iterator<Deque<GridDeployment>> i1 =
cache.values().iterator(); i1.hasNext();) {
- Deque<GridDeployment> deps = i1.next();
+ dep = depByLdr.remove(ldr);
- for (Iterator<GridDeployment> i2 = deps.iterator();
i2.hasNext();) {
- GridDeployment dep = i2.next();
+ if (dep != null) {
+ dep.undeploy();
- if (dep.classLoader() == ldr) {
- dep.undeploy();
+ if (log.isInfoEnabled())
+ log.info("Removed undeployed class: " + dep);
- i2.remove();
+ for (Iterator<Map<ClassLoader, GridDeployment>> it =
depsByAlias.values().iterator(); it.hasNext();) {
+ Map<ClassLoader, GridDeployment> deps = it.next();
- doomed.add(dep);
+ deps.remove(ldr);
- if (log.isInfoEnabled())
- log.info("Removed undeployed class: " + dep);
- }
+ if (deps.isEmpty())
+ it.remove();
}
-
- if (deps.isEmpty())
- i1.remove();
}
}
- for (GridDeployment dep : doomed) {
+ if (dep != null) {
if (dep.obsolete()) {
// Resource cleanup.
ctx.resource().onUndeployed(dep);
@@ -588,6 +575,19 @@ class GridDeploymentLocalStore extends
GridDeploymentStoreAdapter {
}
}
+ /**
+ * Registers the deployment in the {@link #depsByAlias} index under the
+ * given key, creating the per-key map on first use. The per-key map is an
+ * {@link IdentityHashMap} keyed by class loader, so a repeated call with
+ * the same key and class loader replaces the previous entry instead of
+ * adding a duplicate. Must be called under {@link #mux}.
+ *
+ * @param key Alias or class name.
+ * @param ldr Class loader of the deployment.
+ * @param dep Deployment to register.
+ */
+ private void addAliasMapping(String key, ClassLoader ldr, GridDeployment
dep) {
+ Map<ClassLoader, GridDeployment> deps =
depsByAlias.computeIfAbsent(key, k -> new IdentityHashMap<>());
+
+ deps.put(ldr, dep);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDeploymentLocalStore.class, this);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStoreReuseTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStoreReuseTest.java
new file mode 100644
index 00000000000..450fced40aa
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStoreReuseTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.deployment;
+
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.ThinClientConfiguration;
+import org.apache.ignite.internal.client.thin.AbstractThinClientTest;
+import org.apache.ignite.internal.client.thin.TestTask;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.junit.Test;
+
+/**
+ * Checks that repeated compute task executions started from a thin client
+ * find the already created local deployment instead of missing the local
+ * deployment cache on every execution. Nodes are started with peer class
+ * loading enabled.
+ */
+public class GridDeploymentLocalStoreReuseTest extends AbstractThinClientTest {
+ /** Name of the logger whose level is switched to TRACE for the test. */
+ private static final String LOG_NAME = "org.apache.ignite";
+
+ /** Number of task executions performed by the test. */
+ private static final int EXEC_CNT = 3;
+
+ /** Level of the {@link #LOG_NAME} logger config captured before the test. */
+ private static Level initLogLevel;
+
+ /** Listening logger installed as the grid logger of the started nodes. */
+ private static ListeningTestLogger testLog;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Additionally sets the per-connection limit of active thin client compute
+ * tasks to 1000, installs {@link #testLog} as the grid logger and enables
+ * peer class loading.
+ */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setClientConnectorConfiguration(
+ new ClientConnectorConfiguration().setThinClientConfiguration(
+ new
ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(1000)))
+ .setGridLogger(testLog)
+ .setPeerClassLoadingEnabled(true);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Saves the current logger level, switches the logger to TRACE, creates
+ * the listening logger and starts two grids.
+ */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ LoggerConfig logCfg =
LoggerContext.getContext(false).getConfiguration().getLoggerConfig(LOG_NAME);
+
+ // Remember the current level so that afterTest() can restore it.
+ initLogLevel = logCfg.getLevel();
+
+ // The "found ... local app class loader" message is logged at trace level.
+ Configurator.setLevel(LOG_NAME, Level.TRACE);
+
+ // Must be created before the grids start: getConfiguration() reads it.
+ testLog = new ListeningTestLogger(log);
+
+ startGrids(2);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Stops all grids and restores the logger level saved in beforeTest().
+ */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ Configurator.setLevel(LOG_NAME, initLogLevel);
+
+ initLogLevel = null;
+
+ super.afterTest();
+ }
+
+ /**
+ * Verifies that multiple task executions do not cause excessive local
+ * deployment cache misses. The task is executed {@link #EXEC_CNT} times
+ * through a thin client.
+ * <p>
+ * The "Deployment was not found for class with specific class loader
+ * [alias=..., clsLdrId=null]" message is allowed only once, upon the
+ * initial task execution. The trace-level "Deployment was found for class
+ * with the local app class loader [alias=...]" message must be present at
+ * least {@code EXEC_CNT - 1} times, marking the subsequent task executions
+ * without cache misses.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNoExcessiveLocalDeploymentCacheMisses() throws Exception {
+ String taskClsName = TestTask.class.getName();
+
+ String notFoundMsg = String.format(
+ "Deployment was not found for class with specific class loader
[alias=%s, clsLdrId=null]", taskClsName);
+
+ String foundLocLdrMsg = String.format(
+ "Deployment was found for class with the local app class loader
[alias=%s]", taskClsName);
+
+ // Cache miss message: expected a single time.
+ LogListener lsnr0 = LogListener.matches(notFoundMsg).times(1).build();
+ // Cache hit message: expected for every execution after the first one.
+ LogListener lsnr1 =
LogListener.matches(foundLocLdrMsg).atLeast(EXEC_CNT - 1).build();
+
+ testLog.registerListener(lsnr0);
+ testLog.registerListener(lsnr1);
+
+ try (IgniteClient client = startClient(0)) {
+ // NOTE(review): the task name passed here equals taskClsName above.
+ for (int i = 0; i < EXEC_CNT; i++)
+ client.compute().execute(TestTask.class.getName(), null);
+ }
+
+ assertTrue(lsnr0.check());
+ assertTrue(lsnr1.check());
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDifferentLocalDeploymentSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDifferentLocalDeploymentSelfTest.java
index b11923fab41..da40645ed2d 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDifferentLocalDeploymentSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDifferentLocalDeploymentSelfTest.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.managers.deployment;
-import java.util.Deque;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -143,9 +141,9 @@ public class GridDifferentLocalDeploymentSelfTest extends
GridCommonAbstractTest
GridDeploymentStore store = GridTestUtils.getFieldValue(deploymentMgr,
"locStore");
- ConcurrentMap<String, Deque<GridDeployment>> cache =
GridTestUtils.getFieldValue(store, "cache");
+ Map<String, Map<ClassLoader, GridDeployment>> depsByAlias =
GridTestUtils.getFieldValue(store, "depsByAlias");
- assertEquals(2, cache.get(TASK_NAME1).size());
+ assertEquals(2, depsByAlias.get(TASK_NAME1).size());
deploymentMgr = server.context().deploy();
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
index fffa334aca2..c167e16de97 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.testsuites;
import org.apache.ignite.internal.GridP2PAffinitySelfTest;
+import
org.apache.ignite.internal.managers.deployment.GridDeploymentLocalStoreReuseTest;
import
org.apache.ignite.internal.managers.deployment.GridDeploymentMessageCountSelfTest;
import
org.apache.ignite.internal.managers.deployment.GridDifferentLocalDeploymentSelfTest;
import
org.apache.ignite.internal.managers.deployment.P2PCacheOperationIntoComputeTest;
@@ -84,7 +85,8 @@ import org.junit.runners.Suite;
P2PUnsupportedClassVersionTest.class,
P2PClassLoadingFailureHandlingTest.class,
P2PClassLoadingIssuesTest.class,
- GridP2PComputeExceptionTest.class
+ GridP2PComputeExceptionTest.class,
+ GridDeploymentLocalStoreReuseTest.class,
})
public class IgniteP2PSelfTestSuite {
}