This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 4984f181e4f IGNITE-27871 Fix local classes deployment lookup 
contention when peerClassLoadingEnabled=true - Fixes #12760.
4984f181e4f is described below

commit 4984f181e4fabde51d6888dbba9b339381782263
Author: Valuyskiy.O.Y <[email protected]>
AuthorDate: Tue Apr 7 09:59:44 2026 +0300

    IGNITE-27871 Fix local classes deployment lookup contention when 
peerClassLoadingEnabled=true - Fixes #12760.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../deployment/GridDeploymentLocalStore.java       | 196 ++++++++++-----------
 .../GridDeploymentLocalStoreReuseTest.java         | 114 ++++++++++++
 .../GridDifferentLocalDeploymentSelfTest.java      |   6 +-
 .../ignite/testsuites/IgniteP2PSelfTestSuite.java  |   4 +-
 4 files changed, 217 insertions(+), 103 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
index 06e6d2653bb..d9972a36bd8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
@@ -17,19 +17,13 @@
 
 package org.apache.ignite.internal.managers.deployment;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Deque;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.compute.ComputeTask;
 import org.apache.ignite.compute.ComputeTaskName;
@@ -39,7 +33,6 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.util.GridAnnotationsCache;
 import org.apache.ignite.internal.util.GridClassLoaderCache;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -60,8 +53,11 @@ import static 
org.apache.ignite.events.EventType.EVT_TASK_UNDEPLOYED;
  * Storage for local deployments.
  */
 class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
-    /** Deployment cache by class name. */
-    private final ConcurrentMap<String, Deque<GridDeployment>> cache = new 
ConcurrentHashMap<>();
+    /** Primary index: deployment by class loader. Not thread-safe, access 
must be guarded by {@link #mux}. */
+    private final Map<ClassLoader, GridDeployment> depByLdr = new 
IdentityHashMap<>();
+
+    /** Secondary index: deployments by alias or class name. Not thread-safe, 
access must be guarded by {@link #mux}. */
+    private final Map<String, Map<ClassLoader, GridDeployment>> depsByAlias = 
new HashMap<>();
 
     /** Mutex. */
     private final Object mux = new Object();
@@ -87,19 +83,14 @@ class GridDeploymentLocalStore extends 
GridDeploymentStoreAdapter {
     @Override public void stop() {
         spi.setListener(null);
 
-        Map<String, Collection<GridDeployment>> cp;
+        Set<ClassLoader> ldrs = U.newIdentityHashSet();
 
         synchronized (mux) {
-            cp = new HashMap<>(cache);
-
-            for (Entry<String, Collection<GridDeployment>> entry : 
cp.entrySet())
-                entry.setValue(new ArrayList<>(entry.getValue()));
+            ldrs.addAll(depByLdr.keySet());
         }
 
-        for (Collection<GridDeployment> deps : cp.values()) {
-            for (GridDeployment cls : deps)
-                undeploy(cls.classLoader());
-        }
+        for (ClassLoader ldr : ldrs)
+            undeploy(ldr);
 
         if (log.isDebugEnabled())
             log.debug(stopInfo());
@@ -110,11 +101,10 @@ class GridDeploymentLocalStore extends 
GridDeploymentStoreAdapter {
         Set<ClassLoader> obsoleteClsLdrs = U.newIdentityHashSet();
 
         synchronized (mux) {
-            // There can be obsolete class loaders in cache after client node 
reconnect with the new node id.
-            for (Entry<String, Deque<GridDeployment>> entry : cache.entrySet())
-                for (GridDeployment dep : entry.getValue())
-                    if 
(!dep.classLoaderId().globalId().equals(ctx.localNodeId()))
-                        obsoleteClsLdrs.add(dep.classLoader());
+            // There can be obsolete class loaders in deployment indexes after 
client node reconnect with the new node id.
+            for (GridDeployment dep : depByLdr.values())
+                if (!dep.classLoaderId().globalId().equals(ctx.localNodeId()))
+                    obsoleteClsLdrs.add(dep.classLoader());
         }
 
         for (ClassLoader clsLdr : obsoleteClsLdrs)
@@ -123,13 +113,10 @@ class GridDeploymentLocalStore extends 
GridDeploymentStoreAdapter {
 
     /** {@inheritDoc} */
     @Override public Collection<GridDeployment> getDeployments() {
-        Collection<GridDeployment> deps = new ArrayList<>();
+        Collection<GridDeployment> deps = U.newIdentityHashSet();
 
         synchronized (mux) {
-            for (Deque<GridDeployment> depList : cache.values())
-                for (GridDeployment d : depList)
-                    if (!deps.contains(d))
-                        deps.add(d);
+            deps.addAll(depByLdr.values());
         }
 
         return deps;
@@ -138,10 +125,9 @@ class GridDeploymentLocalStore extends 
GridDeploymentStoreAdapter {
     /** {@inheritDoc} */
     @Nullable @Override public GridDeployment getDeployment(IgniteUuid ldrId) {
         synchronized (mux) {
-            for (Deque<GridDeployment> deps : cache.values())
-                for (GridDeployment dep : deps)
-                    if (dep.classLoaderId().equals(ldrId))
-                        return dep;
+            for (GridDeployment dep : depByLdr.values())
+                if (dep.classLoaderId().equals(ldrId))
+                    return dep;
         }
 
         for (GridDeployment dep : ctx.task().getUsedDeployments())
@@ -248,22 +234,55 @@ class GridDeploymentLocalStore extends 
GridDeploymentStoreAdapter {
      * @return Deployment.
      */
     @Nullable private GridDeployment deployment(final GridDeploymentMetadata 
meta) {
-        Deque<GridDeployment> deps = cache.get(meta.alias());
+        Map<ClassLoader, GridDeployment> deps;
+
+        synchronized (mux) {
+            Map<ClassLoader, GridDeployment> cached = 
depsByAlias.get(meta.alias());
+
+            deps = cached == null ? null : new IdentityHashMap<>(cached);
+        }
 
         if (deps != null) {
-            for (GridDeployment dep : deps) {
-                if (dep.undeployed())
-                    continue;
+            GridDeployment dep = null;
 
-                // local or remote deployment.
-                if (dep.classLoaderId() == meta.classLoaderId() || 
dep.classLoader() == meta.classLoader()) {
-                    if (log.isTraceEnabled())
-                        log.trace("Deployment was found for class with 
specific class loader [alias=" + meta.alias() +
-                            ", clsLdrId=" + meta.classLoaderId() + "]");
+            if (meta.classLoader() != null)
+                dep = deps.get(meta.classLoader());
 
-                    return dep;
+            if ((dep == null || dep.undeployed()) && meta.classLoaderId() != 
null) {
+                for (GridDeployment d : deps.values()) {
+                    if (d.classLoaderId().equals(meta.classLoaderId()) && 
!d.undeployed()) {
+                        dep = d;
+
+                        break;
+                    }
                 }
             }
+
+            if (dep != null && !dep.undeployed()) {
+                if (log.isTraceEnabled())
+                    log.trace("Deployment was found for class with specific 
class loader [alias=" + meta.alias() +
+                        ", clsLdrId=" + meta.classLoaderId() + "]");
+
+                return dep;
+            }
+
+            ClassLoader appLdr = 
Thread.currentThread().getContextClassLoader();
+
+            if (appLdr == null)
+                appLdr = U.resolveClassLoader(ctx.config());
+
+            appLdr = (appLdr instanceof GridDeploymentClassLoader) ? null : 
appLdr;
+
+            if (appLdr != null)
+                dep = deps.get(appLdr);
+
+            if (dep != null && !dep.undeployed()) {
+                if (log.isTraceEnabled())
+                    log.trace("Deployment was found for class with the local 
app class loader [alias="
+                        + meta.alias() + "]");
+
+                return dep;
+            }
         }
 
         if (log.isDebugEnabled())
@@ -288,42 +307,21 @@ class GridDeploymentLocalStore extends 
GridDeploymentStoreAdapter {
         String alias,
         boolean recordEvt
     ) {
-        GridDeployment dep = null;
+        GridDeployment dep;
 
         synchronized (mux) {
             boolean fireEvt = false;
 
             try {
-                Deque<GridDeployment> cachedDeps = null;
-
-                // Find existing class loader info.
-                for (Deque<GridDeployment> deps : cache.values()) {
-                    for (GridDeployment d : deps) {
-                        if (d.classLoader() == ldr) {
-                            // Cache class and alias.
-                            fireEvt = d.addDeployedClass(cls, alias);
-
-                            cachedDeps = deps;
-
-                            dep = d;
-
-                            break;
-                        }
-                    }
-
-                    if (cachedDeps != null)
-                        break;
-                }
+                dep = depByLdr.get(ldr);
 
-                if (cachedDeps != null) {
-                    assert dep != null;
+                if (dep != null) {
+                    fireEvt = dep.addDeployedClass(cls, alias);
 
-                    cache.put(alias, cachedDeps);
+                    addAliasMapping(alias, ldr, dep);
 
-                    if (!cls.getName().equals(alias)) {
-                        // Cache by class name as well.
-                        cache.put(cls.getName(), cachedDeps);
-                    }
+                    if (!cls.getName().equals(alias))
+                        addAliasMapping(cls.getName(), ldr, dep);
 
                     return dep;
                 }
@@ -339,19 +337,12 @@ class GridDeploymentLocalStore extends 
GridDeploymentStoreAdapter {
                 assert fireEvt : "Class was not added to newly created 
deployment [cls=" + cls +
                     ", depMode=" + depMode + ", dep=" + dep + ']';
 
-                Deque<GridDeployment> deps = F.<String, 
Deque<GridDeployment>>addIfAbsent(
-                    cache,
-                    alias,
-                    ConcurrentLinkedDeque::new
-                );
+                depByLdr.put(ldr, dep);
 
-                // Add at the beginning of the list for future fast access.
-                deps.addFirst(dep);
+                addAliasMapping(alias, ldr, dep);
 
-                if (!cls.getName().equals(alias)) {
-                    // Cache by class name as well.
-                    cache.put(cls.getName(), deps);
-                }
+                if (!cls.getName().equals(alias))
+                    addAliasMapping(cls.getName(), ldr, dep);
 
                 if (log.isDebugEnabled())
                     log.debug("Created new deployment: " + dep);
@@ -543,33 +534,29 @@ class GridDeploymentLocalStore extends 
GridDeploymentStoreAdapter {
      * @param ldr Class loader to undeploy.
      */
     private void undeploy(ClassLoader ldr) {
-        Collection<GridDeployment> doomed = new HashSet<>();
+        GridDeployment dep;
 
         synchronized (mux) {
-            for (Iterator<Deque<GridDeployment>> i1 = 
cache.values().iterator(); i1.hasNext();) {
-                Deque<GridDeployment> deps = i1.next();
+            dep = depByLdr.remove(ldr);
 
-                for (Iterator<GridDeployment> i2 = deps.iterator(); 
i2.hasNext();) {
-                    GridDeployment dep = i2.next();
+            if (dep != null) {
+                dep.undeploy();
 
-                    if (dep.classLoader() == ldr) {
-                        dep.undeploy();
+                if (log.isInfoEnabled())
+                    log.info("Removed undeployed class: " + dep);
 
-                        i2.remove();
+                for (Iterator<Map<ClassLoader, GridDeployment>> it = 
depsByAlias.values().iterator(); it.hasNext();) {
+                    Map<ClassLoader, GridDeployment> deps = it.next();
 
-                        doomed.add(dep);
+                    deps.remove(ldr);
 
-                        if (log.isInfoEnabled())
-                            log.info("Removed undeployed class: " + dep);
-                    }
+                    if (deps.isEmpty())
+                        it.remove();
                 }
-
-                if (deps.isEmpty())
-                    i1.remove();
             }
         }
 
-        for (GridDeployment dep : doomed) {
+        if (dep != null) {
             if (dep.obsolete()) {
                 // Resource cleanup.
                 ctx.resource().onUndeployed(dep);
@@ -588,6 +575,19 @@ class GridDeploymentLocalStore extends 
GridDeploymentStoreAdapter {
         }
     }
 
+    /**
+     * Adds deployment to the alias-based index. Must be called under {@link 
#mux}.
+     *
+     * @param key Alias or classname.
+     * @param ldr Class loader.
+     * @param dep Deployment.
+     */
+    private void addAliasMapping(String key, ClassLoader ldr, GridDeployment 
dep) {
+        Map<ClassLoader, GridDeployment> deps = 
depsByAlias.computeIfAbsent(key, k -> new IdentityHashMap<>());
+
+        deps.put(ldr, dep);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDeploymentLocalStore.class, this);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStoreReuseTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStoreReuseTest.java
new file mode 100644
index 00000000000..450fced40aa
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStoreReuseTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.deployment;
+
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.ThinClientConfiguration;
+import org.apache.ignite.internal.client.thin.AbstractThinClientTest;
+import org.apache.ignite.internal.client.thin.TestTask;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.junit.Test;
+
+/** */
+public class GridDeploymentLocalStoreReuseTest extends AbstractThinClientTest {
+    /** */
+    private static final String LOG_NAME = "org.apache.ignite";
+
+    /** */
+    private static final int EXEC_CNT = 3;
+
+    /** */
+    private static Level initLogLevel;
+
+    /** */
+    private static ListeningTestLogger testLog;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setClientConnectorConfiguration(
+                new ClientConnectorConfiguration().setThinClientConfiguration(
+                    new 
ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(1000)))
+            .setGridLogger(testLog)
+            .setPeerClassLoadingEnabled(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        LoggerConfig logCfg = 
LoggerContext.getContext(false).getConfiguration().getLoggerConfig(LOG_NAME);
+
+        initLogLevel = logCfg.getLevel();
+
+        Configurator.setLevel(LOG_NAME, Level.TRACE);
+
+        testLog = new ListeningTestLogger(log);
+
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        Configurator.setLevel(LOG_NAME, initLogLevel);
+
+        initLogLevel = null;
+
+        super.afterTest();
+    }
+
+    /**
+     * Verifies that multiple task executions do not cause excessive local 
deployment cache misses. The "deployment not
+     * found ... clsLdrId=null" message is allowed only once upon initial task 
execution. The trace-level "deployment
+     * was found for class with the local app class loader" messages must be 
present in the output marking the subsequent
+     * task executions without cache misses.
+     */
+    @Test
+    public void testNoExcessiveLocalDeploymentCacheMisses() throws Exception {
+        String taskClsName = TestTask.class.getName();
+
+        String notFoundMsg = String.format(
+            "Deployment was not found for class with specific class loader 
[alias=%s, clsLdrId=null]", taskClsName);
+
+        String foundLocLdrMsg = String.format(
+            "Deployment was found for class with the local app class loader 
[alias=%s]", taskClsName);
+
+        LogListener lsnr0 = LogListener.matches(notFoundMsg).times(1).build();
+        LogListener lsnr1 = 
LogListener.matches(foundLocLdrMsg).atLeast(EXEC_CNT - 1).build();
+
+        testLog.registerListener(lsnr0);
+        testLog.registerListener(lsnr1);
+
+        try (IgniteClient client = startClient(0)) {
+            for (int i = 0; i < EXEC_CNT; i++)
+                client.compute().execute(TestTask.class.getName(), null);
+        }
+
+        assertTrue(lsnr0.check());
+        assertTrue(lsnr1.check());
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDifferentLocalDeploymentSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDifferentLocalDeploymentSelfTest.java
index b11923fab41..da40645ed2d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDifferentLocalDeploymentSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDifferentLocalDeploymentSelfTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.internal.managers.deployment;
 
-import java.util.Deque;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.compute.ComputeTask;
 import org.apache.ignite.configuration.DeploymentMode;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -143,9 +141,9 @@ public class GridDifferentLocalDeploymentSelfTest extends 
GridCommonAbstractTest
 
         GridDeploymentStore store = GridTestUtils.getFieldValue(deploymentMgr, 
"locStore");
 
-        ConcurrentMap<String, Deque<GridDeployment>> cache = 
GridTestUtils.getFieldValue(store, "cache");
+        Map<String, Map<ClassLoader, GridDeployment>> depsByAlias = 
GridTestUtils.getFieldValue(store, "depsByAlias");
 
-        assertEquals(2, cache.get(TASK_NAME1).size());
+        assertEquals(2, depsByAlias.get(TASK_NAME1).size());
 
         deploymentMgr = server.context().deploy();
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
index fffa334aca2..c167e16de97 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import org.apache.ignite.internal.GridP2PAffinitySelfTest;
+import 
org.apache.ignite.internal.managers.deployment.GridDeploymentLocalStoreReuseTest;
 import 
org.apache.ignite.internal.managers.deployment.GridDeploymentMessageCountSelfTest;
 import 
org.apache.ignite.internal.managers.deployment.GridDifferentLocalDeploymentSelfTest;
 import 
org.apache.ignite.internal.managers.deployment.P2PCacheOperationIntoComputeTest;
@@ -84,7 +85,8 @@ import org.junit.runners.Suite;
     P2PUnsupportedClassVersionTest.class,
     P2PClassLoadingFailureHandlingTest.class,
     P2PClassLoadingIssuesTest.class,
-    GridP2PComputeExceptionTest.class
+    GridP2PComputeExceptionTest.class,
+    GridDeploymentLocalStoreReuseTest.class,
 })
 public class IgniteP2PSelfTestSuite {
 }

Reply via email to