This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8478994e5c7 IGNITE-27451 Fix AssertionError on concurrent services
undeployment (#12610)
8478994e5c7 is described below
commit 8478994e5c75735a68adf3e1a7cb4891736530e7
Author: Nikolay <[email protected]>
AuthorDate: Fri Dec 26 17:16:03 2025 +0300
IGNITE-27451 Fix AssertionError on concurrent services undeployment (#12610)
---
.../processors/service/IgniteServiceProcessor.java | 15 ++-
.../service/ServiceConcurrentUndeployTest.java | 111 +++++++++++++++++++++
.../testsuites/IgniteServiceGridTestSuite.java | 2 +
3 files changed, 119 insertions(+), 9 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
index 7bf646d8d18..d1b458b94e2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
@@ -1641,11 +1641,8 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
}
});
- depActions.servicesToUndeploy().forEach((srvcId, desc) -> {
- ServiceInfo rmv = removeFromServicesMap(deployedServices,
deployedServicesByName, srvcId);
-
- assert rmv == desc : "Concurrent map modification.";
- });
+ depActions.servicesToUndeploy().forEach((srvcId, desc) ->
+ removeFromServicesMap(deployedServices,
deployedServicesByName, srvcId));
}
finally {
leaveBusy();
@@ -1873,9 +1870,8 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
else if (req instanceof ServiceUndeploymentRequest) {
ServiceInfo rmv = removeFromServicesMap(registeredServices,
registeredServicesByName, reqSrvcId);
- assert oldDesc == rmv : "Concurrent map modification.";
-
- toUndeploy.put(reqSrvcId, rmv);
+ if (oldDesc == rmv && rmv != null)
+ toUndeploy.put(reqSrvcId, rmv);
}
}
@@ -2038,7 +2034,8 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
) {
ServiceInfo desc = srvcsMap.remove(srvcId);
- assert desc != null : "Concurrent map modification.";
+ if (desc == null)
+ return null;
srvcsByNameMap.remove(desc.name());
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceConcurrentUndeployTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceConcurrentUndeployTest.java
new file mode 100644
index 00000000000..a040aca7cb1
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceConcurrentUndeployTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.DiscoverySpiTestListener;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import
org.apache.ignite.internal.processors.service.inner.LongInitializedTestService;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/**
+ * Tests concurrent undeployment of the same service from several client nodes.
+ */
+public class ServiceConcurrentUndeployTest extends GridCommonAbstractTest {
+ /** */
+ private final CountDownLatch waitLatch = new CountDownLatch(2);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+
+ disco.setInternalListener(new DiscoverySpiTestListener() {
+ @Override public boolean beforeSendCustomEvent(DiscoverySpi spi,
IgniteLogger log, DiscoverySpiCustomMessage msg) {
+ if (spi.isClientMode()) {
+ boolean isUndeployMsg = msg instanceof CustomMessageWrapper
+ && ((CustomMessageWrapper)msg).delegate() instanceof
ServiceChangeBatchRequest;
+
+ if (isUndeployMsg) {
+ ServiceChangeBatchRequest batch =
(ServiceChangeBatchRequest)((CustomMessageWrapper)msg).delegate();
+
+ long undeployReqCnt = batch.requests().stream()
+ .filter(r -> r instanceof
ServiceUndeploymentRequest)
+ .count();
+
+ if (undeployReqCnt > 0) {
+ assertEquals(1, undeployReqCnt);
+ assertTrue(waitLatch.getCount() > 0);
+
+ waitLatch.countDown();
+
+ try {
+ assertTrue(waitLatch.await(1,
TimeUnit.MINUTES));
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+ }
+
+ return super.beforeSendCustomEvent(spi, log, msg);
+ }
+ });
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /** */
+ @Test
+ public void test() throws Exception {
+ try (IgniteEx ignite = startGrid(0); IgniteEx client0 =
startClientGrid(1); IgniteEx client1 = startClientGrid(2)) {
+ client0.services().deployNodeSingletonAsync(
+ "myservice",
+ new
LongInitializedTestService(ThreadLocalRandom.current().nextLong(1001))
+ ).get(1, TimeUnit.MINUTES);
+
+ // 1. Each client sees deployed service.
+ // 2. Each client sends request to undeploy service.
+ // 3. Before the fix, processing the second undeploy request threw an AssertionError.
+ IgniteInternalFuture<Void> fut0 = runAsync(() ->
client0.services().cancelAllAsync().get());
+ IgniteInternalFuture<Void> fut1 = runAsync(() ->
client1.services().cancelAllAsync().get());
+
+ fut0.get(1, TimeUnit.MINUTES);
+ fut1.get(1, TimeUnit.MINUTES);
+
+ assertEquals(0, waitLatch.getCount());
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
index 3a0753b4eb8..d6855825f3d 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
@@ -46,6 +46,7 @@ import
org.apache.ignite.internal.processors.service.IgniteServiceDeploymentFail
import
org.apache.ignite.internal.processors.service.IgniteServiceDynamicCachesSelfTest;
import
org.apache.ignite.internal.processors.service.IgniteServiceProxyTimeoutInitializedTest;
import
org.apache.ignite.internal.processors.service.IgniteServiceReassignmentTest;
+import
org.apache.ignite.internal.processors.service.ServiceConcurrentUndeployTest;
import
org.apache.ignite.internal.processors.service.ServiceDeploymentDiscoveryListenerNotificationOrderTest;
import
org.apache.ignite.internal.processors.service.ServiceDeploymentNonSerializableStaticConfigurationTest;
import
org.apache.ignite.internal.processors.service.ServiceDeploymentOnActivationTest;
@@ -113,6 +114,7 @@ import org.junit.runners.Suite;
ServiceInfoSelfTest.class,
ServiceDeploymentProcessIdSelfTest.class,
ServiceHotRedeploymentViaDeploymentSpiTest.class,
+ ServiceConcurrentUndeployTest.class,
GridServiceProxyTopologyInitializationTest.class,
GridServiceDeployClusterReadOnlyModeTest.class,
GridServiceClusterReadOnlyModeTest.class,