This is an automated email from the ASF dual-hosted git repository. mmuzaf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 6e1db89f79a IGNITE-16136 Fix deadlock on system thread pool during marshaller mapping and binary metadata requests (#10204) 6e1db89f79a is described below commit 6e1db89f79a8837f5db23d86008cdc968d619442 Author: Maxim Muzafarov <maxmu...@gmail.com> AuthorDate: Wed Aug 31 22:09:41 2022 +0300 IGNITE-16136 Fix deadlock on system thread pool during marshaller mapping and binary metadata requests (#10204) --- .../ignite/internal/MarshallerContextImpl.java | 32 +- .../cache/binary/BinaryMetadataTransport.java | 113 +++---- .../processors/marshaller/ClientRequestFuture.java | 29 +- .../marshaller/GridMarshallerMappingProcessor.java | 7 +- .../internal/processors/marshaller/MappedName.java | 19 ++ .../marshaller/MarshallerMappingItem.java | 3 +- .../marshaller/MarshallerMappingTransport.java | 27 +- ...teMarshallerCacheClientRequestsMappingTest.java | 332 +++++++++++++++++++++ .../testframework/junits/GridAbstractTest.java | 30 +- .../ignite/testsuites/IgniteBasicTestSuite2.java | 2 + .../apache/ignite/tests/p2p/compute/Result.java | 31 ++ .../apache/ignite/tests/p2p/compute/ResultV0.java | 28 ++ .../apache/ignite/tests/p2p/compute/ResultV1.java | 28 ++ .../apache/ignite/tests/p2p/compute/ResultV2.java | 28 ++ .../apache/ignite/tests/p2p/compute/ResultV3.java | 28 ++ 15 files changed, 604 insertions(+), 133 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index ceac3efbe77..327e40d307a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -363,7 +363,13 @@ public class MarshallerContextImpl implements MarshallerContext { public void onMappingAccepted(final MarshallerMappingItem item) { ConcurrentMap<Integer, MappedName> cache = 
getCacheFor(item.platformId()); - cache.replace(item.typeId(), new MappedName(item.className(), true)); + MappedName oldMappedName = cache.put(item.typeId(), new MappedName(item.className(), true)); + + assert oldMappedName == null || item.className().equals(oldMappedName.className()) : + "Class name resolved from cluster: " + + item.className() + + ", class name from local cache: " + + oldMappedName.className(); closProc.runLocalSafe(new MappingStoreTask(fileStore, item.platformId(), item.typeId(), item.className())); } @@ -498,30 +504,6 @@ public class MarshallerContextImpl implements MarshallerContext { return null; } - /** - * @param item Item. - * @param resolvedClsName Resolved class name. - */ - public void onMissedMappingResolved(final MarshallerMappingItem item, String resolvedClsName) { - ConcurrentMap<Integer, MappedName> cache = getCacheFor(item.platformId()); - - int typeId = item.typeId(); - MappedName mappedName = cache.get(typeId); - - if (mappedName != null) - assert resolvedClsName.equals(mappedName.className()) : - "Class name resolved from cluster: " - + resolvedClsName - + ", class name from local cache: " - + mappedName.className(); - else { - mappedName = new MappedName(resolvedClsName, true); - cache.putIfAbsent(typeId, mappedName); - - closProc.runLocalSafe(new MappingStoreTask(fileStore, item.platformId(), item.typeId(), resolvedClsName)); - } - } - /** {@inheritDoc} */ @Override public boolean isSystemType(String typeName) { return sysTypesSet.contains(typeName); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java index d96b502b930..cc6c01452d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java @@ -53,7 +53,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; - import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.binary.BinaryUtils.mergeMetadata; @@ -354,13 +353,15 @@ final class BinaryMetadataTransport { } /** - * Allows client node to request latest version of binary metadata for a given typeId from the cluster in case + * Allows client node to request the latest version of binary metadata for a given typeId from the cluster in case * client is able to detect that it has obsolete metadata in its local cache. * * @param typeId ID of binary type. * @return future to wait for request arrival on. */ GridFutureAdapter<MetadataUpdateResult> requestUpToDateMetadata(int typeId) { + assert ctx.clientNode(); + ClientMetadataRequestFuture newFut = new ClientMetadataRequestFuture(ctx, typeId, clientReqSyncMap); ClientMetadataRequestFuture oldFut = clientReqSyncMap.putIfAbsent(typeId, newFut); @@ -604,35 +605,12 @@ final class BinaryMetadataTransport { fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionError())); else { if (clientNode) { - BinaryMetadataHolder newHolder = new BinaryMetadataHolder(msg.metadata(), pendingVer, acceptedVer); - - holder = metaLocCache.putIfAbsent(typeId, newHolder); - - if (holder != null) { - boolean obsoleteUpd = false; + boolean success = casBinaryMetadata(typeId, new BinaryMetadataHolder(msg.metadata(), pendingVer, acceptedVer)); - do { - holder = metaLocCache.get(typeId); - - if (obsoleteUpdate( - holder.pendingVersion(), - holder.acceptedVersion(), - pendingVer, - acceptedVer)) { - obsoleteUpd = true; - - 
fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1)); - - break; - } - } - while (!metaLocCache.replace(typeId, holder, newHolder)); - - if (!obsoleteUpd) - initSyncFor(typeId, pendingVer, fut); - } - else + if (success) initSyncFor(typeId, pendingVer, fut); + else + fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1)); } else { initSyncFor(typeId, pendingVer, fut); @@ -659,24 +637,8 @@ final class BinaryMetadataTransport { BinaryMetadataHolder newHolder = new BinaryMetadataHolder(mergedMeta, pendingVer, acceptedVer); - if (clientNode) { - holder = metaLocCache.putIfAbsent(typeId, newHolder); - - if (holder != null) { - do { - holder = metaLocCache.get(typeId); - - if (obsoleteUpdate( - holder.pendingVersion(), - holder.acceptedVersion(), - pendingVer, - acceptedVer)) - break; - - } - while (!metaLocCache.replace(typeId, holder, newHolder)); - } - } + if (clientNode) + casBinaryMetadata(typeId, newHolder); else { if (log.isDebugEnabled()) log.debug("Updated metadata on server node [holder=" + newHolder + @@ -717,18 +679,13 @@ final class BinaryMetadataTransport { int newAcceptedVer = msg.acceptedVersion(); if (clientNode) { - BinaryMetadataHolder newHolder = new BinaryMetadataHolder(holder.metadata(), - holder.pendingVersion(), newAcceptedVer); - - do { - holder = metaLocCache.get(typeId); + boolean success = casBinaryMetadata(typeId, + new BinaryMetadataHolder(holder.metadata(), holder.pendingVersion(), newAcceptedVer)); - int oldAcceptedVer = holder.acceptedVersion(); + ClientMetadataRequestFuture fut = clientReqSyncMap.get(typeId); - if (oldAcceptedVer > newAcceptedVer) - break; - } - while (!metaLocCache.replace(typeId, holder, newHolder)); + if (success && fut != null) + fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1)); } else { int oldAcceptedVer = holder.acceptedVersion(); @@ -783,6 +740,29 @@ final class BinaryMetadataTransport { } } + /** + * @param typeId Type id. + * @param newHolder New binary metadata holder. 
+ * @return {@code true} if new holder was added successfully. + */ + private boolean casBinaryMetadata(int typeId, BinaryMetadataHolder newHolder) { + BinaryMetadataHolder oldHolder; + + do { + oldHolder = metaLocCache.putIfAbsent(typeId, newHolder); + + if (oldHolder == null) + return true; + + if (obsoleteUpdate(oldHolder.pendingVersion(), oldHolder.acceptedVersion(), + newHolder.pendingVersion(), newHolder.acceptedVersion())) + return false; + } + while (!metaLocCache.replace(typeId, oldHolder, newHolder)); + + return true; + } + /** * Future class responsible for blocking threads until particular events with metadata updates happen, e.g. arriving * {@link MetadataUpdateAcceptedMessage} acknowledgment or {@link MetadataResponseMessage} response. @@ -915,24 +895,7 @@ final class BinaryMetadataTransport { } try { - BinaryMetadataHolder newHolder = U.unmarshal(ctx, binMetaBytes, U.resolveClassLoader(ctx.config())); - - BinaryMetadataHolder oldHolder = metaLocCache.putIfAbsent(typeId, newHolder); - - if (oldHolder != null) { - do { - oldHolder = metaLocCache.get(typeId); - - // typeId metadata cannot be removed after initialization. 
- if (obsoleteUpdate( - oldHolder.pendingVersion(), - oldHolder.acceptedVersion(), - newHolder.pendingVersion(), - newHolder.acceptedVersion())) - break; - } - while (!metaLocCache.replace(typeId, oldHolder, newHolder)); - } + casBinaryMetadata(typeId, U.unmarshal(ctx, binMetaBytes, U.resolveClassLoader(ctx.config()))); fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1)); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java index 0be4e094dbc..f7d97328ead 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java @@ -97,12 +97,12 @@ final class ClientRequestFuture extends GridFutureAdapter<MappingExchangeResult> try { ioMgr.sendToGridTopic( - srvNode, - GridTopic.TOPIC_MAPPING_MARSH, - new MissingMappingRequestMessage( - item.platformId(), - item.typeId()), - GridIoPolicy.SYSTEM_POOL); + srvNode, + GridTopic.TOPIC_MAPPING_MARSH, + new MissingMappingRequestMessage( + item.platformId(), + item.typeId()), + GridIoPolicy.SYSTEM_POOL); if (discoMgr.node(srvNode.id()) == null) continue; @@ -113,21 +113,22 @@ final class ClientRequestFuture extends GridFutureAdapter<MappingExchangeResult> } catch (IgniteCheckedException ignored) { U.warn(log, - "Failed to request marshaller mapping from remote node (proceeding with the next one): " - + srvNode); + "Failed to request marshaller mapping from remote node (proceeding with the next one): " + + srvNode); } } noSrvsInCluster = pendingNode == null; } - if (noSrvsInCluster) + if (noSrvsInCluster) { onDone(MappingExchangeResult.createFailureResult( - new IgniteCheckedException( - "All server nodes have left grid, cannot request mapping [platformId: " - + item.platformId() - + "; typeId: " - + 
item.typeId() + "]"))); + new IgniteCheckedException( + "All server nodes have left grid, cannot request mapping [platformId: " + + item.platformId() + + "; typeId: " + + item.typeId() + "]"))); + } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index 6231da604a7..f6895734545 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@ -216,7 +216,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter { if (fut != null) { if (resolvedClsName != null) { - marshallerCtx.onMissedMappingResolved(item, resolvedClsName); + marshallerCtx.onMappingAccepted(new MarshallerMappingItem(platformId, typeId, resolvedClsName)); fut.onDone(MappingExchangeResult.createSuccessfulResult(resolvedClsName)); } @@ -312,6 +312,11 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter { } }); + ClientRequestFuture rqFut = clientReqSyncMap.get(new MarshallerMappingItem(item.platformId(), item.typeId(), null)); + + if (rqFut != null) + rqFut.onDone(MappingExchangeResult.createSuccessfulResult(item.className())); + GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(item); if (fut != null) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java index c13c48eb835..eae07d2d01b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java @@ -18,6 +18,7 @@ package 
org.apache.ignite.internal.processors.marshaller; import java.io.Serializable; +import java.util.Objects; import org.apache.ignite.internal.util.typedef.internal.S; /** @@ -56,6 +57,24 @@ public final class MappedName implements Serializable { return accepted; } + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + MappedName name = (MappedName)o; + + return accepted == name.accepted && Objects.equals(clsName, name.clsName); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Objects.hash(clsName, accepted); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(MappedName.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java index 609c5370c34..f715eb2fa7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.marshaller; import java.io.Serializable; +import java.util.Objects; import org.jetbrains.annotations.Nullable; /** @@ -84,7 +85,7 @@ public final class MarshallerMappingItem implements Serializable { return platformId == that.platformId && typeId == that.typeId - && (clsName != null ? 
clsName.equals(that.clsName) : that.clsName == null); + && (Objects.equals(clsName, that.clsName)); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java index eb50850a87c..bd0f9b0b3d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java @@ -135,27 +135,28 @@ public final class MarshallerMappingTransport { * @param cache Cache. */ public GridFutureAdapter<MappingExchangeResult> requestMapping( - MarshallerMappingItem item, - ConcurrentMap<Integer, MappedName> cache + MarshallerMappingItem item, + ConcurrentMap<Integer, MappedName> cache ) { - ClientRequestFuture newFut = new ClientRequestFuture(ctx, item, clientReqSyncMap); + assert ctx.clientNode(); + assert item.className() == null; + ClientRequestFuture newFut = new ClientRequestFuture(ctx, item, clientReqSyncMap); ClientRequestFuture oldFut = clientReqSyncMap.putIfAbsent(item, newFut); - - if (oldFut != null) - return oldFut; + ClientRequestFuture resFut = oldFut == null ? 
newFut : oldFut; MappedName mappedName = cache.get(item.typeId()); - if (mappedName != null) { - newFut.onDone(MappingExchangeResult.createSuccessfulResult(mappedName.className())); - - return newFut; + if (mappedName == null) { + if (oldFut == null) + newFut.requestMapping(); + } + else { + if (mappedName.accepted()) + resFut.onDone(MappingExchangeResult.createSuccessfulResult(mappedName.className())); } - newFut.requestMapping(); - - return newFut; + return resFut; } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java new file mode 100644 index 00000000000..5f19e41ca94 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.File; +import java.lang.reflect.Constructor; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.UnaryOperator; +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage; +import org.apache.ignite.internal.processors.marshaller.MappingAcceptedMessage; +import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage; +import org.apache.ignite.internal.processors.marshaller.MarshallerMappingItem; +import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.events.EventType.EVTS_CACHE; +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; + +/** */ +public class IgniteMarshallerCacheClientRequestsMappingTest extends GridCommonAbstractTest { + /** Waiting timeout. */ + private static final long AWAIT_PROCESSING_TIMEOUT_MS = 10_000L; + + /** Limited thread pool size. */ + private static final int LIMITED_SYSTEM_THREAD_POOL = 4; + + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** External class loader. */ + private static final ClassLoader extClsLdr = getExternalClassLoader(); + + /** Person class name. */ + private static final String PERSON_CLASS_NAME = "org.apache.ignite.tests.p2p.cache.Person"; + + /** Organization class name. */ + private static final String ORGANIZATION_CLASS_NAME = "org.apache.ignite.tests.p2p.cache.Organization"; + + /** Address class name. */ + private static final String ADDRESS_CLASS_NAME = "org.apache.ignite.tests.p2p.cache.Address"; + + /** Compute job result class name. */ + private static final String JOB_RESULT_CLASS_NAME_PREFIX = "org.apache.ignite.tests.p2p.compute.ResultV"; + + /** Client work directory absolute path. 
*/ + private String clntWorkDir; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (cfg.isClientMode()) + cfg.setWorkDirectory(clntWorkDir); + + cfg.setClassLoader(extClsLdr); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setWriteSynchronizationMode(FULL_SYNC)); + cfg.setIncludeEventTypes(EVTS_CACHE); + + cfg.setSystemThreadPoolSize(LIMITED_SYSTEM_THREAD_POOL); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + clntWorkDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "clnt", true).getAbsolutePath(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + Path path = Paths.get(clntWorkDir, DataStorageConfiguration.DFLT_MARSHALLER_PATH); + + for (File file : Objects.requireNonNull(path.toFile().listFiles())) + Files.delete(file.toPath()); + + Files.deleteIfExists(path); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testDiscoeryMarshallerDelayedWithOverfloodThreadPool() throws Exception { + doTestMarshallingBinaryMappingsLoadedFromClient(true); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testDiscoeryBinaryMetaDelayedWithOverfloodThreadPool() throws Exception { + doTestMarshallingBinaryMappingsLoadedFromClient(false); + } + + /** + * @param receiveMetadataOnClientJoin If {@code true} then binary metadata will exist on the server node and be loaded + * by the client node on the node join exchange, otherwise it will be requested by client peer-2-peer through the TcpCommunicationSpi. + * @throws Exception If fails.
+ */ + private void doTestMarshallingBinaryMappingsLoadedFromClient(boolean receiveMetadataOnClientJoin) throws Exception { + CountDownLatch delayMappingLatch = new CountDownLatch(1); + AtomicInteger loadKeys = new AtomicInteger(100); + CountDownLatch evtReceiveLatch = new CountDownLatch(1); + int initialKeys = receiveMetadataOnClientJoin ? 10 : 0; + + IgniteEx srv1 = startGrid(0); + + TestRecordingCommunicationSpi.spi(srv1) + .blockMessages((IgniteBiPredicate<ClusterNode, Message>)(node, msg) -> msg instanceof MissingMappingResponseMessage || + msg instanceof MetadataResponseMessage); + + // Load data prior to the client node start, so the client will receive the binary metadata on the client node join. + for (int i = 0; i < initialKeys; i++) + srv1.cache(DEFAULT_CACHE_NAME).put(i, createOrganization(extClsLdr, i)); + + Ignite cl1 = startClientGrid(1, + (UnaryOperator<IgniteConfiguration>)cfg -> cfg.setDiscoverySpi(new TcpDiscoverySpi() { + @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { + if (msg instanceof TcpDiscoveryCustomEventMessage) { + try { + DiscoverySpiCustomMessage custom = + ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader()); + + if (custom instanceof CustomMessageWrapper) { + DiscoveryCustomMessage delegate = ((CustomMessageWrapper)custom).delegate(); + + if (delegate instanceof MappingAcceptedMessage) { + MarshallerMappingItem item = GridTestUtils.getFieldValue(delegate, "item"); + + if (item.className().equals(PERSON_CLASS_NAME) || + item.className().equals(ORGANIZATION_CLASS_NAME) || + item.className().equals(ADDRESS_CLASS_NAME) + ) { + try { + U.await(delayMappingLatch, AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } + catch (Exception e) { + fail("Mapping proposed message must be released."); + } + } + } + } + } + catch (Throwable e) { + throw new RuntimeException(e); + } + } + + super.startMessageProcess(msg); + } + }.setIpFinder(IP_FINDER))); + +
awaitPartitionMapExchange(); + + cl1.events().remoteListen( + (IgniteBiPredicate<UUID, Event>)(uuid, evt) -> { + info("Event [" + evt.shortDisplay() + ']'); + + evtReceiveLatch.countDown(); + + return true; + }, + t -> true, + EVT_CACHE_OBJECT_PUT); + + // Flood system thread pool with cache events. + GridTestUtils.runMultiThreadedAsync((Callable<Boolean>)() -> { + int key; + + while ((key = loadKeys.decrementAndGet()) > initialKeys && !Thread.currentThread().isInterrupted()) + srv1.cache(DEFAULT_CACHE_NAME).put(key, createOrganization(extClsLdr, key)); + + return true; + }, 8, "cache-adder-thread").get(); + + assertTrue(GridTestUtils.waitForCondition(() -> TestRecordingCommunicationSpi.spi(srv1).hasBlockedMessages(), + AWAIT_PROCESSING_TIMEOUT_MS)); + + delayMappingLatch.countDown(); + + assertTrue(U.await(evtReceiveLatch, AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS)); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testBinaryMetaDelayedForComputeJobResult() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + + startGrid(0); + + Ignite cl1 = startClientGrid(1, (UnaryOperator<IgniteConfiguration>)cfg -> + cfg.setDiscoverySpi(new TcpDiscoverySpi() { + @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { + if (msg instanceof TcpDiscoveryCustomEventMessage) { + try { + DiscoverySpiCustomMessage custom = + ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader()); + + if (custom instanceof CustomMessageWrapper) { + DiscoveryCustomMessage delegate = ((CustomMessageWrapper)custom).delegate(); + + if (delegate instanceof MappingProposedMessage) { + MarshallerMappingItem item = GridTestUtils.getFieldValue(delegate, "mappingItem"); + + if (item.className().contains(JOB_RESULT_CLASS_NAME_PREFIX)) { + try { + U.await(latch, AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } + catch (Exception e) { + fail("Exception must never be thrown: " + e.getMessage()); + } + } + } + } 
+ } + catch (Throwable e) { + throw new RuntimeException(e); + } + } + + super.startMessageProcess(msg); + } + }.setIpFinder(IP_FINDER))); + + AtomicInteger results = new AtomicInteger(4); + + // Flood system thread pool with task results. + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync((Callable<Boolean>)() -> { + int v; + + while ((v = results.decrementAndGet()) >= 0) { + int v0 = v; + + Object ignore = cl1.compute().call(() -> createResult(extClsLdr, v0)); + } + + return true; + }, LIMITED_SYSTEM_THREAD_POOL, "compute-thread"); + + latch.countDown(); + + fut.get(AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } + + /** + * @param extClsLdr Class loader. + * @param key Cache key. + * @return Organization. + * @throws Exception If failed. + */ + private static Object createOrganization(ClassLoader extClsLdr, int key) throws Exception { + Class<?> personCls = extClsLdr.loadClass(PERSON_CLASS_NAME); + Class<?> addrCls = extClsLdr.loadClass(ADDRESS_CLASS_NAME); + + Constructor<?> personConstructor = personCls.getConstructor(String.class); + Constructor<?> addrConstructor = addrCls.getConstructor(String.class, Integer.TYPE); + Constructor<?> organizationConstructor = extClsLdr.loadClass(ORGANIZATION_CLASS_NAME) + .getConstructor(String.class, personCls, addrCls); + + return organizationConstructor.newInstance("Organization " + key, + personConstructor.newInstance("Persone name " + key), + addrConstructor.newInstance("Street " + key, key)); + } + + /** + * @param extClsLdr Class loader. + * @param ver Class type version. + * @return Result. + * @throws Exception If fails. 
+ */ + public static Object createResult(ClassLoader extClsLdr, int ver) throws Exception { + Class<?> resCls = extClsLdr.loadClass(JOB_RESULT_CLASS_NAME_PREFIX + ver); + + return resCls.getConstructor(int.class).newInstance(ver); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index cf53ea150e2..d61a62e982e 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -1002,7 +1002,7 @@ public abstract class GridAbstractTest extends JUnitAssertAware { * @throws Exception If anything failed. */ protected IgniteEx startClientGrid() throws Exception { - return startClientGrid(getTestIgniteInstanceName()); + return startClientGrid(getTestIgniteInstanceName(), (UnaryOperator<IgniteConfiguration>)null); } /** @@ -1060,6 +1060,16 @@ public abstract class GridAbstractTest extends JUnitAssertAware { return startGrid(getTestIgniteInstanceName(idx), cfgOp); } + /** + * @param idx Index of the grid to start. + * @param cfgOp Configuration mutator. Can be used to avoid overcomplification of {@link #getConfiguration()}. + * @return Started grid. + * @throws Exception If anything failed. + */ + protected IgniteEx startClientGrid(int idx, UnaryOperator<IgniteConfiguration> cfgOp) throws Exception { + return startClientGrid(getTestIgniteInstanceName(idx), cfgOp); + } + /** * Starts new client grid with given index. * @@ -1068,7 +1078,7 @@ public abstract class GridAbstractTest extends JUnitAssertAware { * @throws Exception If anything failed. 
*/ protected IgniteEx startClientGrid(int idx) throws Exception { - return startClientGrid(getTestIgniteInstanceName(idx)); + return startClientGrid(getTestIgniteInstanceName(idx), (UnaryOperator<IgniteConfiguration>)null); } /** @@ -1083,7 +1093,7 @@ public abstract class GridAbstractTest extends JUnitAssertAware { IgnitionEx.dependencyResolver(rslvr); try { - return startClientGrid(getTestIgniteInstanceName(idx)); + return startClientGrid(getTestIgniteInstanceName(idx), (UnaryOperator<IgniteConfiguration>)null); } finally { IgnitionEx.dependencyResolver(null); @@ -1098,10 +1108,22 @@ public abstract class GridAbstractTest extends JUnitAssertAware { * @throws Exception If anything failed. */ protected IgniteEx startClientGrid(String igniteInstanceName) throws Exception { + return startClientGrid(igniteInstanceName, (UnaryOperator<IgniteConfiguration>)null); + } + + /** + * Starts new client grid with given name. + * + * @param igniteInstanceName Ignite instance name. + * @param cfgOp Configuration mutator. Can be used to avoid overcomplification of {@link #getConfiguration()}. + * @return Started grid. + * @throws Exception If anything failed. 
+ */ + protected IgniteEx startClientGrid(String igniteInstanceName, UnaryOperator<IgniteConfiguration> cfgOp) throws Exception { IgnitionEx.setClientMode(true); try { - return (IgniteEx)startGrid(igniteInstanceName, (GridSpringResourceContext)null); + return (IgniteEx)startGrid(igniteInstanceName, cfgOp, null); } finally { IgnitionEx.setClientMode(false); diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java index e2419dc0839..8492bd34f0b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.CacheLockCandidatesThreadTest import org.apache.ignite.internal.processors.cache.GridLongRunningInitNewCrdFutureDiagnosticsTest; import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClassNameConflictTest; import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClientRequestsMappingOnMissTest; +import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClientRequestsMappingTest; import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheFSRestoreTest; import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheSeparateDirectoryTest; import org.apache.ignite.internal.processors.cache.RebalanceWithDifferentThreadPoolSizeTest; @@ -143,6 +144,7 @@ import org.junit.runners.Suite; IgniteMarshallerCacheFSRestoreTest.class, IgniteMarshallerCacheClassNameConflictTest.class, IgniteMarshallerCacheClientRequestsMappingOnMissTest.class, + IgniteMarshallerCacheClientRequestsMappingTest.class, IgniteMarshallerCacheSeparateDirectoryTest.class, IgniteDiagnosticMessagesTest.class, diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/Result.java 
b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/Result.java new file mode 100644 index 00000000000..cf680e7b272 --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/Result.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p.compute; + +/** */ +public class Result { + /** Param. */ + final int v; + + /** + * @param v Param. + */ + Result(int v) { + this.v = v; + } +} diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV0.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV0.java new file mode 100644 index 00000000000..1495d07a512 --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV0.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p.compute; + +/** */ +public class ResultV0 extends Result { + /** + * @param v Param. + */ + public ResultV0(int v) { + super(v); + } +} diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV1.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV1.java new file mode 100644 index 00000000000..d56a31e449a --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV1.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p.compute; + +/** */ +public class ResultV1 extends Result { + /** + * @param v Param. 
+ */ + public ResultV1(int v) { + super(v); + } +} diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV2.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV2.java new file mode 100644 index 00000000000..eb7e5313358 --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV2.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p.compute; + +/** */ +public class ResultV2 extends Result { + /** + * @param v Param. + */ + public ResultV2(int v) { + super(v); + } +} diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV3.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV3.java new file mode 100644 index 00000000000..b0080d1184c --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV3.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p.compute; + +/** */ +public class ResultV3 extends Result { + /** + * @param v Param. + */ + public ResultV3(int v) { + super(v); + } +}