This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch ignite-2.9.1 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.9.1 by this push: new 7821365 IGNITE-13760 .NET: Fix NullPointerException in GetAffinity on client nodes 7821365 is described below commit 7821365b05a22dcea17f1885d33874a22231f8a0 Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Thu Nov 26 18:42:24 2020 +0300 IGNITE-13760 .NET: Fix NullPointerException in GetAffinity on client nodes Cache context does not exist on client nodes unless `GetCache` was called, so `GridCacheAffinityManager` retrieval in `PlatformAffinity` constructor can cause NPE. `GridCacheAffinityManager` was added to `PlatformAffinity` solely for Platform Cache needs and is not used otherwise, so it makes sense to move this functionality to a dedicated `PlatformAffinityManager` class. This fixes the bug, because cache context always exists by the time Platform Cache is created. (cherry picked from commit 1d2d76051cbf36ebdb8cbfeca0695078234d55fb) --- .../processors/platform/PlatformProcessorImpl.java | 12 +++- .../platform/cache/affinity/PlatformAffinity.java | 48 ++----------- .../cache/affinity/PlatformAffinityManager.java | 78 ++++++++++++++++++++++ .../Cache/Affinity/AffinityTest.cs | 40 ++++++++--- .../Apache.Ignite.Core/Apache.Ignite.Core.csproj | 3 +- .../Impl/Cache/CacheAffinityImpl.cs | 23 +------ .../Impl/Cache/CacheAffinityManager.cs | 53 +++++++++++++++ .../Impl/Cache/Platform/PlatformCache.cs | 28 ++++---- .../Impl/Cache/Platform/PlatformCacheManager.cs | 36 +++++----- .../Apache.Ignite.Core/Impl/Client/IgniteClient.cs | 6 ++ .../Apache.Ignite.Core/Impl/IIgniteInternal.cs | 7 ++ .../dotnet/Apache.Ignite.Core/Impl/Ignite.cs | 47 ++++++++----- 12 files changed, 257 insertions(+), 124 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java index 61dec3a..266b9de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.platform.cache.PlatformCache; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheExtension; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheManager; import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinity; +import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityManager; import org.apache.ignite.internal.processors.platform.cache.store.PlatformCacheStore; import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterGroup; import org.apache.ignite.internal.processors.platform.datastreamer.PlatformDataStreamer; @@ -193,6 +194,9 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf /** */ private static final int OP_GET_OR_CREATE_LOCK = 38; + /** */ + private static final int OP_GET_AFFINITY_MANAGER = 39; + /** Start latch. */ private final CountDownLatch startLatch = new CountDownLatch(1); @@ -637,7 +641,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf } case OP_GET_AFFINITY: { - return new PlatformAffinity(platformCtx, ctx, reader.readString()); + return new PlatformAffinity(platformCtx, reader.readString()); } case OP_GET_DATA_STREAMER: { @@ -739,6 +743,12 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf return lock == null ? null : new PlatformLock(platformCtx, lock); } + + case OP_GET_AFFINITY_MANAGER: { + int cacheId = reader.readInt(); + + return new PlatformAffinityManager(platformCtx, cacheId); + } } return PlatformAbstractTarget.throwUnsupported(type); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java index e18be64..a0d79f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java @@ -23,22 +23,17 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; -import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; -import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; /** - * Native cache wrapper implementation. + * Affinity wrapper for platforms. */ public class PlatformAffinity extends PlatformAbstractTarget { /** */ @@ -86,45 +81,28 @@ public class PlatformAffinity extends PlatformAbstractTarget { /** */ public static final int OP_PARTITIONS = 15; - /** */ - public static final int OP_IS_ASSIGNMENT_VALID = 16; - - /** */ - private static final C1<ClusterNode, UUID> TO_NODE_ID = new C1<ClusterNode, UUID>() { - @Nullable @Override public UUID apply(ClusterNode node) { - return node != null ? node.id() : null; - } - }; - /** Underlying cache affinity. */ private final Affinity<Object> aff; /** Discovery manager */ private final GridDiscoveryManager discovery; - /** Affinity manager. */ - private final GridCacheAffinityManager affMgr; - /** * Constructor. * * @param platformCtx Context. - * @param igniteCtx Ignite context. * @param name Cache name. */ - public PlatformAffinity(PlatformContext platformCtx, GridKernalContext igniteCtx, @Nullable String name) + public PlatformAffinity(PlatformContext platformCtx, @Nullable String name) throws IgniteCheckedException { super(platformCtx); - this.aff = igniteCtx.grid().affinity(name); + aff = platformCtx.kernalContext().grid().affinity(name); if (aff == null) throw new IgniteCheckedException("Cache with the given name doesn't exist: " + name); - this.affMgr = this.platformCtx.kernalContext().cache().context().cacheContext(GridCacheUtils.cacheId(name)) - .affinity(); - - discovery = igniteCtx.discovery(); + discovery = platformCtx.kernalContext().discovery(); } /** {@inheritDoc} */ @@ -172,24 +150,6 @@ public class PlatformAffinity extends PlatformAbstractTarget { return aff.isPrimaryOrBackup(node, key) ? TRUE : FALSE; } - case OP_IS_ASSIGNMENT_VALID: { - AffinityTopologyVersion ver = new AffinityTopologyVersion(reader.readLong(), reader.readInt()); - int part = reader.readInt(); - AffinityTopologyVersion endVer = affMgr.affinityTopologyVersion(); - - if (!affMgr.primaryChanged(part, ver, endVer)) { - return TRUE; - } - - if (!affMgr.partitionLocalNode(part, endVer)) { - return FALSE; - } - - // Special case: late affinity assignment when primary changes to local node due to a node join. - // Specified partition is local, and near cache entries are valid for primary keys. - return ver.topologyVersion() == endVer.topologyVersion() ? TRUE : FALSE; - } - default: return super.processInStreamOutLong(type, reader); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityManager.java new file mode 100644 index 0000000..92306a63 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityManager.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.affinity; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.binary.BinaryRawReaderEx; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformContext; + +/** + * AffinityManager wrapper for platforms. + */ +public class PlatformAffinityManager extends PlatformAbstractTarget { + /** */ + public static final int OP_IS_ASSIGNMENT_VALID = 1; + + /** Affinity manager. */ + private final GridCacheAffinityManager affMgr; + + /** + * Constructor. + * + * @param platformCtx Context. + */ + public PlatformAffinityManager(PlatformContext platformCtx, int cacheId) { + super(platformCtx); + + GridCacheContext<Object, Object> ctx = platformCtx.kernalContext().cache().context().cacheContext(cacheId); + + if (ctx == null) + throw new IgniteException("Cache doesn't exist: " + cacheId); + + affMgr = ctx.affinity(); + } + + /** {@inheritDoc} */ + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + if (type == OP_IS_ASSIGNMENT_VALID) + { + AffinityTopologyVersion ver = new AffinityTopologyVersion(reader.readLong(), reader.readInt()); + int part = reader.readInt(); + AffinityTopologyVersion endVer = affMgr.affinityTopologyVersion(); + + if (!affMgr.primaryChanged(part, ver, endVer)) { + return TRUE; + } + + if (!affMgr.partitionLocalNode(part, endVer)) { + return FALSE; + } + + // Special case: late affinity assignment when primary changes to local node due to a node join. + // Specified partition is local, and near cache entries are valid for primary keys. + return ver.topologyVersion() == endVer.topologyVersion() ? TRUE : FALSE; + } + + return super.processInStreamOutLong(type, reader); + } +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs index d62c0b1..b336b99 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs @@ -34,15 +34,9 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity [TestFixtureSetUp] public void StartGrids() { - for (int i = 0; i < 3; i++) + for (var i = 0; i < 3; i++) { - var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration()) - { - SpringConfigUrl = Path.Combine("Config", "native-client-test-cache-affinity.xml"), - IgniteInstanceName = "grid-" + i - }; - - Ignition.Start(cfg); + Ignition.Start(GetConfig(i, client: i == 2)); } } @@ -72,6 +66,21 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity } /// <summary> + /// Tests that affinity can be retrieved from client node right after the cache has been started on server node. + /// </summary> + [Test] + public void TestAffinityRetrievalForNewCache() + { + var server = Ignition.GetIgnite("grid-0"); + var client = Ignition.GetIgnite("grid-2"); + + var serverCache = server.CreateCache<int, int>(TestUtils.TestName); + var clientAff = client.GetAffinity(serverCache.Name); + + Assert.IsNotNull(clientAff); + } + + /// <summary> /// Test affinity with binary flag. /// </summary> [Test] @@ -79,7 +88,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity { IIgnite g = Ignition.GetIgnite("grid-0"); - ICacheAffinity aff = g.GetAffinity("default"); + ICacheAffinity aff = g.GetAffinity("default"); IBinaryObject affKey = g.GetBinary().ToBinary<IBinaryObject>(new AffinityTestKey(0, 1)); @@ -95,6 +104,19 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity } /// <summary> + /// Gets Ignite config. + /// </summary> + private static IgniteConfiguration GetConfig(int idx, bool client = false) + { + return new IgniteConfiguration(TestUtils.GetTestConfiguration()) + { + SpringConfigUrl = Path.Combine("Config", "native-client-test-cache-affinity.xml"), + IgniteInstanceName = "grid-" + idx, + ClientMode = client + }; + } + + /// <summary> /// Affinity key. /// </summary> private class AffinityTestKey diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index d110c85..fdd8ebd 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -92,6 +92,7 @@ <Compile Include="IIgniteLock.cs" /> <Compile Include="Impl\Binary\BinaryHashCodeUtils.cs" /> <Compile Include="Impl\Binary\IgniteBiTuple.cs" /> + <Compile Include="Impl\Cache\CacheAffinityManager.cs" /> <Compile Include="Impl\Cache\Platform\IPlatformCache.cs" /> <Compile Include="Impl\Cache\Platform\PlatformCache.cs" /> <Compile Include="Impl\Cache\Platform\PlatformCacheEntry.cs" /> @@ -618,4 +619,4 @@ <Target Name="AfterBuild"> </Target> --> -</Project> \ No newline at end of file +</Project> diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs index 869518e..a49074b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs @@ -20,7 +20,6 @@ namespace Apache.Ignite.Core.Impl.Cache using System; using System.Collections.Generic; using Apache.Ignite.Core.Cache; - using Apache.Ignite.Core.Cache.Affinity; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Binary.IO; @@ -77,11 +76,8 @@ namespace Apache.Ignite.Core.Impl.Cache private const int OpPartitions = 15; /** */ - private const int OpIsAssignmentValid = 16; - - /** */ private readonly bool _keepBinary; - + /** Grid. */ private readonly IIgniteInternal _ignite; @@ -115,7 +111,7 @@ namespace Apache.Ignite.Core.Impl.Cache public bool IsPrimary<TK>(IClusterNode n, TK key) { IgniteArgumentCheck.NotNull(n, "n"); - + IgniteArgumentCheck.NotNull(key, "key"); return DoOutOp(OpIsPrimary, n.Id, key) == True; @@ -220,19 +216,6 @@ namespace Apache.Ignite.Core.Impl.Cache return DoOutInOp(OpMapPartitionToPrimaryAndBackups, w => w.WriteObject(part), r => ReadNodes(r)); } - /// <summary> - /// Checks whether given partition is still assigned to the same node as in specified version. - /// </summary> - internal bool IsAssignmentValid(AffinityTopologyVersion version, int partition) - { - return DoOutOp(OpIsAssignmentValid, (IBinaryStream s) => - { - s.WriteLong(version.Version); - s.WriteInt(version.MinorVersion); - s.WriteInt(partition); - }) != 0; - } - /** <inheritDoc /> */ protected override T Unmarshal<T>(IBinaryStream stream) { @@ -283,4 +266,4 @@ namespace Apache.Ignite.Core.Impl.Cache return dict; } } -} \ No newline at end of file +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityManager.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityManager.cs new file mode 100644 index 0000000..e09a09c --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityManager.cs @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache +{ + using Apache.Ignite.Core.Cache.Affinity; + using Apache.Ignite.Core.Impl.Binary.IO; + + /// <summary> + /// Affinity manager. + /// </summary> + internal class CacheAffinityManager : PlatformTargetAdapter + { + /** */ + private const int OpIsAssignmentValid = 1; + + /// <summary> + /// Initializes a new instance of <see cref="CacheAffinityManager"/> class. + /// </summary> + /// <param name="target">Target.</param> + internal CacheAffinityManager(IPlatformTargetInternal target) : base(target) + { + // No-op. + } + + /// <summary> + /// Checks whether given partition is still assigned to the same node as in specified version. + /// </summary> + internal bool IsAssignmentValid(AffinityTopologyVersion version, int partition) + { + return DoOutOp(OpIsAssignmentValid, (IBinaryStream s) => + { + s.WriteLong(version.Version); + s.WriteInt(version.MinorVersion); + s.WriteInt(partition); + }) != 0; + } + } +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Platform/PlatformCache.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Platform/PlatformCache.cs index d18aad9..c7bb09f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Platform/PlatformCache.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Platform/PlatformCache.cs @@ -32,8 +32,8 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform internal sealed class PlatformCache<TK, TV> : IPlatformCache { /** Affinity. */ - private readonly CacheAffinityImpl _affinity; - + private readonly CacheAffinityManager _affinity; + /** Keep binary flag. */ private readonly bool _keepBinary; @@ -44,7 +44,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform private readonly Func<object> _affinityTopologyVersionFunc; /** Underlying map. */ - private readonly ConcurrentDictionary<TK, PlatformCacheEntry<TV>> _map = + private readonly ConcurrentDictionary<TK, PlatformCacheEntry<TV>> _map = new ConcurrentDictionary<TK, PlatformCacheEntry<TV>>(); /** Stopped flag. */ @@ -52,9 +52,9 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform /// <summary> /// Initializes a new instance of the <see cref="PlatformCache{TK,TV}"/> class. - /// Called via reflection from <see cref="PlatformCacheManager.CreatePlatformCache"/>. + /// Called via reflection from <see cref="PlatformCacheManager.CreatePlatformCache"/>. /// </summary> - public PlatformCache(Func<object> affinityTopologyVersionFunc, CacheAffinityImpl affinity, bool keepBinary) + public PlatformCache(Func<object> affinityTopologyVersionFunc, CacheAffinityManager affinity, bool keepBinary) { _affinityTopologyVersionFunc = affinityTopologyVersionFunc; _affinity = affinity; @@ -78,7 +78,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform PlatformCacheEntry<TV> entry; var key0 = (TK) (object) key; - + if (_map.TryGetValue(key0, out entry)) { if (IsValid(entry)) @@ -106,7 +106,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform } var count = 0; - + foreach (var e in _map) { if (!IsValid(e.Value)) @@ -118,7 +118,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform { continue; } - + count++; } @@ -179,7 +179,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform _stopped = true; Clear(); } - + /** <inheritdoc /> */ public void Clear() { @@ -227,19 +227,19 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform /// When primary node changes for a key, GridNearCacheEntry stops receiving updates for that key, /// because reader ("subscription") on new primary is not yet established. /// <para /> - /// This method is similar to GridNearCacheEntry.valid(). + /// This method is similar to GridNearCacheEntry.valid(). /// </summary> /// <param name="entry">Entry to validate.</param> /// <typeparam name="TVal">Value type.</typeparam> /// <returns>True if entry is valid and can be returned to the user; false otherwise.</returns> private bool IsValid<TVal>(PlatformCacheEntry<TVal> entry) { - // See comments on _affinityTopologyVersionFunc about boxed copy approach. + // See comments on _affinityTopologyVersionFunc about boxed copy approach. var currentVerBoxed = _affinityTopologyVersionFunc(); var entryVerBoxed = entry.Version; - + Debug.Assert(currentVerBoxed != null); - + if (ReferenceEquals(currentVerBoxed, entryVerBoxed)) { // Happy path: true on stable topology. @@ -267,7 +267,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform return valid; } - + /// <summary> /// Gets boxed affinity version. Reuses existing boxing copy to reduce allocations. /// </summary> diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Platform/PlatformCacheManager.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Platform/PlatformCacheManager.cs index f1fe492..e7f9130 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Platform/PlatformCacheManager.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Platform/PlatformCacheManager.cs @@ -38,12 +38,12 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform /// Holds thread-local key/val pair to be used for updating platform cache. /// </summary> internal static readonly ThreadLocal<object> ThreadLocalPair = new ThreadLocal<object>(); - + /// <summary> /// Platform caches per cache id. /// Multiple <see cref="CacheImpl{TK,TV}"/> instances can point to the same Ignite cache, - /// and share one <see cref="PlatformCache{TK,TV}"/> instance. - /// </summary> + /// and share one <see cref="PlatformCache{TK,TV}"/> instance. + /// </summary> private readonly CopyOnWriteConcurrentDictionary<int, IPlatformCache> _caches = new CopyOnWriteConcurrentDictionary<int, IPlatformCache>(); @@ -56,9 +56,9 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform /// Current topology version. Store as object for atomic updates. /// </summary> private volatile object _affinityTopologyVersion; - + /// <summary> - /// Initializes a new instance of the <see cref="PlatformCacheManager"/> class. + /// Initializes a new instance of the <see cref="PlatformCacheManager"/> class. /// </summary> /// <param name="ignite">Ignite.</param> public PlatformCacheManager(IIgniteInternal ignite) @@ -77,7 +77,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform Debug.Assert(cacheConfiguration != null); var cacheId = BinaryUtils.GetCacheId(cacheConfiguration.Name); - + return _caches.GetOrAdd(cacheId, _ => CreatePlatformCache(cacheConfiguration)); } @@ -89,15 +89,15 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform IPlatformCache platformCache; return _caches.TryGetValue(cacheId, out platformCache) ? platformCache : null; } - + /// <summary> /// Reads cache entry from a stream and updates the platform cache. /// </summary> public void Update(int cacheId, IBinaryStream stream, Marshaller marshaller) { - var cache = _caches.GetOrAdd(cacheId, + var cache = _caches.GetOrAdd(cacheId, _ => CreatePlatformCache(_ignite.GetCacheConfiguration(cacheId))); - + cache.Update(stream, marshaller); } @@ -133,7 +133,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform { _affinityTopologyVersion = affinityTopologyVersion; } - + /// <summary> /// Creates platform cache. /// </summary> @@ -141,9 +141,9 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform { var platformCfg = cacheConfiguration.PlatformCacheConfiguration; Debug.Assert(platformCfg != null); - + Func<object> affinityTopologyVersionFunc = () => _affinityTopologyVersion; - var affinity = _ignite.GetAffinity(cacheConfiguration.Name); + var affinity = _ignite.GetAffinityManager(cacheConfiguration.Name); var keepBinary = platformCfg.KeepBinary; TypeResolver resolver = null; @@ -164,7 +164,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform if (resolved == null) { throw new InvalidOperationException(string.Format( - "Can not create .NET Platform Cache: {0}.{1} is invalid. Failed to resolve type: '{2}'", + "Can not create .NET Platform Cache: {0}.{1} is invalid. Failed to resolve type: '{2}'", typeof(PlatformCacheConfiguration).Name, fieldName, typeName)); } @@ -174,16 +174,16 @@ namespace Apache.Ignite.Core.Impl.Cache.Platform var keyType = resolve(platformCfg.KeyTypeName, "KeyTypeName"); var valType = resolve(platformCfg.ValueTypeName, "ValueTypeName"); var cacheType = typeof(PlatformCache<,>).MakeGenericType(keyType, valType); - + var platformCache = Activator.CreateInstance( - cacheType, - affinityTopologyVersionFunc, + cacheType, + affinityTopologyVersionFunc, affinity, keepBinary); - + return (IPlatformCache) platformCache; } - + /// <summary> /// Handles client disconnect. /// </summary> diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs index 0815b65..4102e1a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs @@ -203,6 +203,12 @@ namespace Apache.Ignite.Core.Impl.Client } /** <inheritDoc /> */ + public CacheAffinityManager GetAffinityManager(string cacheName) + { + throw GetClientNotSupportedException(); + } + + /** <inheritDoc /> */ public CacheConfiguration GetCacheConfiguration(int cacheId) { throw GetClientNotSupportedException(); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IIgniteInternal.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IIgniteInternal.cs index e62b89c..d727908 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IIgniteInternal.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IIgniteInternal.cs @@ -93,6 +93,13 @@ namespace Apache.Ignite.Core.Impl CacheAffinityImpl GetAffinity(string cacheName); /// <summary> + /// Gets internal affinity manager for a given cache. + /// </summary> + /// <param name="cacheName">Cache name.</param> + /// <returns>Cache affinity manager.</returns> + CacheAffinityManager GetAffinityManager(string cacheName); + + /// <summary> /// Gets cache name by id. /// </summary> /// <param name="cacheId">Cache id.</param> diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs index 9dfa40b..640f472 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs @@ -35,6 +35,7 @@ namespace Apache.Ignite.Core.Impl using Apache.Ignite.Core.DataStructures; using Apache.Ignite.Core.Events; using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Binary.IO; using Apache.Ignite.Core.Impl.Cache; using Apache.Ignite.Core.Impl.Cache.Platform; using Apache.Ignite.Core.Impl.Cluster; @@ -101,7 +102,8 @@ namespace Apache.Ignite.Core.Impl SetBaselineAutoAdjustTimeout = 35, GetCacheConfig = 36, GetThreadLocal = 37, - GetOrCreateLock = 38 + GetOrCreateLock = 38, + GetAffinityManager = 39, } /** */ @@ -139,7 +141,7 @@ namespace Apache.Ignite.Core.Impl new ConcurrentDictionary<Guid, ClusterNodeImpl>(); /** Client reconnect task completion source. */ - private volatile TaskCompletionSource<bool> _clientReconnectTaskCompletionSource = + private volatile TaskCompletionSource<bool> _clientReconnectTaskCompletionSource = new TaskCompletionSource<bool>(); /** Plugin processor. */ @@ -189,7 +191,7 @@ namespace Apache.Ignite.Core.Impl SetCompactFooter(); _pluginProcessor = new PluginProcessor(this); - + _platformCacheManager = new PlatformCacheManager(this); } @@ -470,7 +472,7 @@ namespace Apache.Ignite.Core.Impl public ICache<TK, TV> GetOrCreateCache<TK, TV>(CacheConfiguration configuration, NearCacheConfiguration nearConfiguration, PlatformCacheConfiguration platformCacheConfiguration) { - return GetOrCreateCache<TK, TV>(configuration, nearConfiguration, platformCacheConfiguration, + return GetOrCreateCache<TK, TV>(configuration, nearConfiguration, platformCacheConfiguration, Op.GetOrCreateCacheFromConfig); } @@ -491,7 +493,7 @@ namespace Apache.Ignite.Core.Impl } /** <inheritdoc /> */ - public ICache<TK, TV> CreateCache<TK, TV>(CacheConfiguration configuration, + public ICache<TK, TV> CreateCache<TK, TV>(CacheConfiguration configuration, NearCacheConfiguration nearConfiguration) { return CreateCache<TK, TV>(configuration, nearConfiguration, null); @@ -501,14 +503,14 @@ namespace Apache.Ignite.Core.Impl public ICache<TK, TV> CreateCache<TK, TV>(CacheConfiguration configuration, NearCacheConfiguration nearConfiguration, PlatformCacheConfiguration platformCacheConfiguration) { - return GetOrCreateCache<TK, TV>(configuration, nearConfiguration, platformCacheConfiguration, + return GetOrCreateCache<TK, TV>(configuration, nearConfiguration, platformCacheConfiguration, Op.CreateCacheFromConfig); } /// <summary> /// Gets or creates the cache. /// </summary> - private ICache<TK, TV> GetOrCreateCache<TK, TV>(CacheConfiguration configuration, + private ICache<TK, TV> GetOrCreateCache<TK, TV>(CacheConfiguration configuration, NearCacheConfiguration nearConfiguration, PlatformCacheConfiguration platformCacheConfiguration, Op op) { IgniteArgumentCheck.NotNull(configuration, "configuration"); @@ -646,11 +648,22 @@ namespace Apache.Ignite.Core.Impl IgniteArgumentCheck.NotNull(cacheName, "cacheName"); var aff = DoOutOpObject((int) Op.GetAffinity, w => w.WriteString(cacheName)); - + return new CacheAffinityImpl(aff, false); } /** <inheritdoc /> */ + public CacheAffinityManager GetAffinityManager(string cacheName) + { + IgniteArgumentCheck.NotNull(cacheName, "cacheName"); + + var mgr = DoOutOpObject((int) Op.GetAffinityManager, + (IBinaryStream s) => s.WriteInt(BinaryUtils.GetCacheId(cacheName))); + + return new CacheAffinityManager(mgr); + } + + /** <inheritdoc /> */ public ICacheAffinity GetAffinity(string cacheName) { return ((IIgniteInternal) this).GetAffinity(cacheName); @@ -918,7 +931,7 @@ namespace Apache.Ignite.Core.Impl public void EnableWal(string cacheName) { IgniteArgumentCheck.NotNull(cacheName, "cacheName"); - + DoOutOp((int) Op.EnableWal, w => w.WriteString(cacheName)); } @@ -933,7 +946,7 @@ namespace Apache.Ignite.Core.Impl /** <inheritdoc /> */ public void SetTxTimeoutOnPartitionMapExchange(TimeSpan timeout) { - DoOutOp((int) Op.SetTxTimeoutOnPartitionMapExchange, + DoOutOp((int) Op.SetTxTimeoutOnPartitionMapExchange, (BinaryWriter w) => w.WriteLong((long) timeout.TotalMilliseconds)); } @@ -1005,7 +1018,7 @@ namespace Apache.Ignite.Core.Impl { Name = name }; - + return GetOrCreateLock(configuration, true); } @@ -1014,7 +1027,7 @@ namespace Apache.Ignite.Core.Impl { IgniteArgumentCheck.NotNull(configuration, "configuration"); IgniteArgumentCheck.NotNullOrEmpty(configuration.Name, "configuration.Name"); - + // Create a copy to ignore modifications from outside. var cfg = new LockConfiguration(configuration); @@ -1025,7 +1038,7 @@ namespace Apache.Ignite.Core.Impl w.WriteBoolean(configuration.IsFair); w.WriteBoolean(create); }); - + return target == null ? null : new IgniteLock(target, cfg); } @@ -1123,13 +1136,13 @@ namespace Apache.Ignite.Core.Impl internal ITransactions GetTransactionsWithLabel(string label) { Debug.Assert(label != null); - + var platformTargetInternal = DoOutOpObject((int) Op.GetTransactions, s => { var w = BinaryUtils.Marshaller.StartMarshal(s); w.WriteString(label); }); - + return new TransactionsImpl(this, platformTargetInternal, GetLocalNode().Id, label); } @@ -1163,7 +1176,7 @@ namespace Apache.Ignite.Core.Impl // Raise events. _clientReconnectTaskCompletionSource = new TaskCompletionSource<bool>(); - + var handler = ClientDisconnected; if (handler != null) handler.Invoke(this, EventArgs.Empty); @@ -1176,7 +1189,7 @@ namespace Apache.Ignite.Core.Impl internal void OnClientReconnected(bool clusterRestarted) { _marsh.OnClientReconnected(clusterRestarted); - + _clientReconnectTaskCompletionSource.TrySetResult(clusterRestarted); var handler = ClientReconnected;