Repository: ignite Updated Branches: refs/heads/master a2a902684 -> 740a9b5bd
ignite-6214 resolve problem with concurrent metadata updates Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/740a9b5b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/740a9b5b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/740a9b5b Branch: refs/heads/master Commit: 740a9b5bd7344ae099e6359f175759bb1a75835a Parents: a2a9026 Author: Denis Mekhanikov <[email protected]> Authored: Thu Aug 31 10:32:25 2017 +0300 Committer: Andrey Gura <[email protected]> Committed: Mon Sep 4 19:08:41 2017 +0300 ---------------------------------------------------------------------- .../cache/binary/BinaryMetadataTransport.java | 14 +++++- .../binary/BinaryMetadataUpdatesFlowTest.java | 48 ++++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/740a9b5b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java index 77190a4..010ab0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java @@ -30,6 +30,7 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryMetadata; import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -41,6 +42,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -392,7 +394,7 @@ final class BinaryMetadataTransport { * @param pendingVer Pending version. * @param fut Future. */ - private void initSyncFor(int typeId, int pendingVer, MetadataUpdateResultFuture fut) { + private void initSyncFor(int typeId, int pendingVer, final MetadataUpdateResultFuture fut) { if (stopping) { fut.onDone(MetadataUpdateResult.createUpdateDisabledResult()); @@ -401,7 +403,15 @@ final class BinaryMetadataTransport { SyncKey key = new SyncKey(typeId, pendingVer); - syncMap.put(key, fut); + MetadataUpdateResultFuture oldFut = syncMap.putIfAbsent(key, fut); + + if (oldFut != null) { + oldFut.listen(new IgniteInClosure<IgniteInternalFuture<MetadataUpdateResult>>() { + @Override public void apply(IgniteInternalFuture<MetadataUpdateResult> doneFut) { + fut.onDone(doneFut.result(), doneFut.error()); + } + }); + } fut.key(key); } http://git-wip-us.apache.org/repos/asf/ignite/blob/740a9b5b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java index 9ec48cf..3ee51c8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java @@ -16,6 +16,8 @@ */ package org.apache.ignite.internal.processors.cache.binary; +import java.util.ArrayList; +import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingDeque; @@ -25,6 +27,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectBuilder; @@ -36,6 +39,7 @@ import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.binary.BinaryObjectImpl; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; @@ -52,6 +56,8 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.eclipse.jetty.util.ConcurrentHashSet; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; + /** * */ @@ -343,6 +349,48 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testConcurrentMetadataUpdates() throws Exception { + startGrid(0); + + final Ignite client = startGrid(getConfiguration("client").setClientMode(true)); + + final IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME).withKeepBinary(); + + int threadsNum = 10; + final int updatesNum = 2000; + + List<IgniteInternalFuture> futs = new ArrayList<>(); + + for (int i = 0; i < threadsNum; i++) { + final int threadId = i; + + IgniteInternalFuture fut = runAsync(new Runnable() { + @Override public void run() { + try { + for (int j = 0; j < updatesNum; j++) { + BinaryObjectBuilder bob = client.binary().builder(BINARY_TYPE_NAME); + + bob.setField("field" + j, threadId); + + cache.put(threadId, bob.build()); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } + }, "updater-" + i); + + futs.add(fut); + } + + for (IgniteInternalFuture fut : futs) + fut.get(); + } + + /** * Runnable responsible for stopping (gracefully) server nodes during metadata updates process. */ private final class ServerNodeKiller implements Runnable {
