Repository: ignite Updated Branches: refs/heads/master 8beb18ab0 -> e375ef937
IGNITE-8075 .NET: Transaction API parity setRollbackOnTopologyChangeTimeout, withLabel, localActiveTransactions, setTxTimeoutOnPartitionMapExchange This closes #3902 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e375ef93 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e375ef93 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e375ef93 Branch: refs/heads/master Commit: e375ef9370e0a4207017ad3b6af3150aa16f866d Parents: 8beb18a Author: Ivan Daschinskiy <[email protected]> Authored: Sat Apr 28 10:15:11 2018 +0300 Committer: Pavel Tupitsyn <[email protected]> Committed: Sat Apr 28 10:15:11 2018 +0300 ---------------------------------------------------------------------- .../platform/PlatformProcessorImpl.java | 14 +++ .../transactions/PlatformTransactions.java | 36 ++++++ .../utils/PlatformConfigurationUtils.java | 2 + .../Apache.Ignite.Core.Tests.csproj | 1 + .../ApiParity/ClusterParityTest.cs | 3 +- .../ApiParity/TransactionsParityTest.cs | 6 +- .../Cache/CacheAbstractTest.cs | 2 +- .../Cache/CacheAbstractTransactionalTest.cs | 3 +- .../Cache/CacheTimeoutOnPmeTransactionalTest.cs | 108 ++++++++++++++++++ .../Cache/Store/CacheStoreTest.cs | 1 + .../IgniteConfigurationTest.cs | 4 +- .../Apache.Ignite.Core.csproj | 2 + .../Apache.Ignite.Core/Cluster/ICluster.cs | 8 +- .../Apache.Ignite.Core/IgniteConfiguration.cs | 4 +- .../IgniteConfigurationSection.xsd | 7 ++ .../dotnet/Apache.Ignite.Core/Impl/Ignite.cs | 36 ++++-- .../Transactions/TransactionCollectionImpl.cs | 112 +++++++++++++++++++ .../Impl/Transactions/TransactionImpl.cs | 6 +- .../Impl/Transactions/TransactionsImpl.cs | 68 ++++++++++- .../Transactions/ITransactionCollection.cs | 29 +++++ .../Transactions/ITransactions.cs | 20 +++- .../Transactions/TransactionConfiguration.cs | 11 ++ 22 files changed, 456 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java index 87ec97c..1c0878c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java @@ -164,6 +164,9 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf /** */ private static final int OP_IS_WAL_ENABLED = 29; + /** */ + private static final int OP_SET_TX_TIMEOUT_ON_PME = 30; + /** Start latch. */ private final CountDownLatch startLatch = new CountDownLatch(1); @@ -491,6 +494,11 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf return 0; + case OP_SET_TX_TIMEOUT_ON_PME: + ctx.grid().cluster().setTxTimeoutOnPartitionMapExchange(reader.readLong()); + + return 0; + case OP_IS_WAL_ENABLED: return ctx.grid().cluster().isWalEnabled(reader.readString()) ? TRUE : FALSE; } @@ -641,6 +649,12 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf return createPlatformCache(cache); } + + case OP_GET_TRANSACTIONS: { + String lbl = reader.readString(); + + return new PlatformTransactions(platformCtx, lbl); + } } return PlatformAbstractTarget.throwUnsupported(type); http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java index 8baca9b..24c0f62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java @@ -25,6 +25,8 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure; import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.lang.IgniteFuture; @@ -34,6 +36,7 @@ import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionMetrics; import java.sql.Timestamp; +import java.util.Collection; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; @@ -79,6 +82,9 @@ public class PlatformTransactions extends PlatformAbstractTarget { public static final int OP_PREPARE = 12; /** */ + public static final int OP_LOCAL_ACTIVE_TX = 13; + + /** */ private final IgniteTransactions txs; /** Map with currently active transactions. */ @@ -99,6 +105,18 @@ public class PlatformTransactions extends PlatformAbstractTarget { } /** + * Constructor. + * + * @param platformCtx Context. + * @param lbl Label. + */ + public PlatformTransactions(PlatformContext platformCtx, String lbl) { + super(platformCtx); + + txs = platformCtx.kernalContext().grid().transactions().withLabel(lbl); + } + + /** * Register transaction. * * @param tx Transaction. @@ -261,6 +279,7 @@ public class PlatformTransactions extends PlatformAbstractTarget { writer.writeInt(txCfg.getDefaultTxConcurrency().ordinal()); writer.writeInt(txCfg.getDefaultTxIsolation().ordinal()); writer.writeLong(txCfg.getDefaultTxTimeout()); + writer.writeLong(txCfg.getTxTimeoutOnPartitionMapExchange()); break; @@ -274,6 +293,23 @@ public class PlatformTransactions extends PlatformAbstractTarget { break; + case OP_LOCAL_ACTIVE_TX: + Collection<Transaction> activeTxs = txs.localActiveTransactions(); + + PlatformUtils.writeCollection(writer, activeTxs, new PlatformWriterClosure<Transaction>() { + @Override public void write(BinaryRawWriterEx writer, Transaction tx) { + writer.writeLong(registerTx(tx)); + + writer.writeInt(tx.concurrency().ordinal()); + + writer.writeInt(tx.isolation().ordinal()); + + writer.writeLong(tx.timeout()); + } + }); + + break; + default: super.processOutStream(type, writer); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index fa8e509..3cf14677 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@ -734,6 +734,7 @@ public class PlatformConfigurationUtils { tx.setDefaultTxIsolation(TransactionIsolation.fromOrdinal(in.readInt())); tx.setDefaultTxTimeout(in.readLong()); tx.setPessimisticTxLogLinger(in.readInt()); + tx.setTxTimeoutOnPartitionMapExchange(in.readLong()); cfg.setTransactionConfiguration(tx); } @@ -1251,6 +1252,7 @@ public class PlatformConfigurationUtils { writeEnumInt(w, tx.getDefaultTxIsolation(), TransactionConfiguration.DFLT_TX_ISOLATION); w.writeLong(tx.getDefaultTxTimeout()); w.writeInt(tx.getPessimisticTxLogLinger()); + w.writeLong(tx.getTxTimeoutOnPartitionMapExchange()); } else w.writeBoolean(false); http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj index a04509a..fe7781d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj @@ -107,6 +107,7 @@ <Compile Include="Cache\Affinity\AffinityAttributeTest.cs" /> <Compile Include="Cache\CacheCreateTest.cs" /> <Compile Include="Cache\CacheQueryMetricsTest.cs" /> + <Compile Include="Cache\CacheTimeoutOnPmeTransactionalTest.cs" /> <Compile Include="Cache\DataRegionMetricsTest.cs" /> <Compile Include="Cache\DataStorageMetricsTest.cs" /> <Compile Include="Cache\PersistenceTest.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs index 2267540..b940d09 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs @@ -38,8 +38,7 @@ /** Members that are missing on .NET side and should be added in future. */ private static readonly string[] MissingMembers = { - "enableStatistics", // IGNITE-7276 - "setTxTimeoutOnPartitionMapExchange" // IGNITE-8075 + "enableStatistics" // IGNITE-7276 }; /// <summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/TransactionsParityTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/TransactionsParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/TransactionsParityTest.cs index e519ab3..8707c63 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/TransactionsParityTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/TransactionsParityTest.cs @@ -26,11 +26,7 @@ namespace Apache.Ignite.Core.Tests.ApiParity public class TransactionsParityTest { /** Members that are missing on .NET side and should be added in future. */ - private static readonly string[] MissingMembers = - { - "localActiveTransactions", // IGNITE-8075 - "withLabel" // IGNITE-8075 - }; + private static readonly string[] MissingMembers = {}; /// <summary> /// Tests the API parity. http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs index fea37f7..9fee2b9 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs @@ -144,7 +144,7 @@ namespace Apache.Ignite.Core.Tests.Cache return GetIgnite(0).GetAffinity(CacheName()); } - protected ITransactions Transactions + protected virtual ITransactions Transactions { get { return GetIgnite(0).GetTransactions(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs index a414a2f..a70d9b7 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs @@ -346,7 +346,8 @@ namespace Apache.Ignite.Core.Tests.Cache Assert.AreEqual(TransactionState.Active, tx.State); Assert.IsTrue(tx.StartTime.Ticks > 0); Assert.AreEqual(tx.NodeId, GetIgnite(0).GetCluster().GetLocalNode().Id); - + Assert.AreEqual(Transactions.DefaultTimeoutOnPartitionMapExchange, TimeSpan.Zero); + DateTime startTime1 = tx.StartTime; tx.Commit(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTimeoutOnPmeTransactionalTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTimeoutOnPmeTransactionalTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTimeoutOnPmeTransactionalTest.cs new file mode 100644 index 0000000..c0bade0 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTimeoutOnPmeTransactionalTest.cs @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache +{ + using System; + using System.Linq; + using Apache.Ignite.Core.Transactions; + using NUnit.Framework; + + [Category(TestUtils.CategoryIntensive)] + public class CacheTimeoutOnPmeTransactionalTest : CacheAbstractTransactionalTest + { + /// <summary> + /// Tests that setting transaction PME timeout works and changes are propagated to Transactions. + /// </summary> + [Test] + public void TestSettingPartitionMapExchangeTimeout() + { + IIgnite ignite = GetIgnite(0); + + ignite.GetCluster().SetTxTimeoutOnPartitionMapExchange(TimeSpan.FromSeconds(12)); + + Assert.AreEqual(ignite.GetTransactions().DefaultTimeoutOnPartitionMapExchange, TimeSpan.FromSeconds(12)); + } + + /// <summary> + /// Tests local active transactions management. + /// </summary> + [Test] + public void TestLocalActiveTransactions() + { + IIgnite ignite = GetIgnite(0); + + using (var tx = ignite.GetTransactions().TxStart(TransactionConcurrency.Optimistic, + TransactionIsolation.ReadCommitted, TimeSpan.FromSeconds(20), 1)) + using (var activeTxCollection = ignite.GetTransactions().GetLocalActiveTransactions()) + { + Assert.IsNotEmpty(activeTxCollection); + + var testTx = activeTxCollection.ElementAt(0); + + Assert.AreEqual(testTx.Concurrency, tx.Concurrency); + + Assert.AreEqual(testTx.Isolation, tx.Isolation); + + Assert.AreEqual(testTx.Timeout, tx.Timeout); + + tx.Commit(); + + Assert.AreEqual(testTx.State, TransactionState.Committed); + + } + + using (var tx = ignite.GetTransactions().TxStart(TransactionConcurrency.Optimistic, + TransactionIsolation.ReadCommitted, TimeSpan.FromSeconds(20), 1)) + using (var activeTxCollection = ignite.GetTransactions().GetLocalActiveTransactions()) + { + Assert.IsNotEmpty(activeTxCollection); + + var testTx = activeTxCollection.ElementAt(0); + + testTx.Rollback(); + + Assert.AreEqual(tx.State, TransactionState.RolledBack); + } + } + + protected override ITransactions Transactions + { + get { return GetIgnite(0).GetTransactions().WithLabel("test-tx"); } + } + + protected override int GridCount() + { + return 3; + } + + protected override string CacheName() + { + return "partitioned"; + } + + protected override bool NearEnabled() + { + return false; + } + + protected override int Backups() + { + return 1; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs index 436a45f..07c3cfa 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs @@ -251,6 +251,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store /// Tests that exceptions from CacheStoreFactory are propagated properly. /// </summary> [Test] + [Ignore("IGNITE-8070")] public void TestFailedCacheStoreException() { var ccfg = new CacheConfiguration("CacheWithFailedStore") http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs index ec4a3fd..6c772f4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs @@ -170,6 +170,7 @@ namespace Apache.Ignite.Core.Tests Assert.AreEqual(tx.DefaultTransactionIsolation, resTx.DefaultTransactionIsolation); Assert.AreEqual(tx.PessimisticTransactionLogLinger, resTx.PessimisticTransactionLogLinger); Assert.AreEqual(tx.PessimisticTransactionLogSize, resTx.PessimisticTransactionLogSize); + Assert.AreEqual(tx.DefaultTimeoutOnPartitionMapExchange, resTx.DefaultTimeoutOnPartitionMapExchange); var com = (TcpCommunicationSpi) cfg.CommunicationSpi; var resCom = (TcpCommunicationSpi) resCfg.CommunicationSpi; @@ -699,7 +700,8 @@ namespace Apache.Ignite.Core.Tests DefaultTimeout = TimeSpan.FromSeconds(25), DefaultTransactionIsolation = TransactionIsolation.Serializable, PessimisticTransactionLogLinger = TimeSpan.FromHours(1), - PessimisticTransactionLogSize = 240 + PessimisticTransactionLogSize = 240, + DefaultTimeoutOnPartitionMapExchange = TimeSpan.FromSeconds(25) }, CommunicationSpi = new TcpCommunicationSpi { http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index f217a0d..db9ac85 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -75,6 +75,7 @@ <Compile Include="Common\ExceptionFactory.cs" /> <Compile Include="Impl\Cache\QueryMetricsImpl.cs" /> <Compile Include="Impl\Common\TaskRunner.cs" /> + <Compile Include="Impl\Transactions\TransactionCollectionImpl.cs" /> <Compile Include="Ssl\ISslContextFactory.cs" /> <Compile Include="Configuration\Package-Info.cs" /> <Compile Include="Configuration\ClientConnectorConfiguration.cs" /> @@ -554,6 +555,7 @@ <Compile Include="Services\ServiceDeploymentException.cs" /> <Compile Include="Services\ServiceInvocationException.cs" /> <Compile Include="Transactions\ITransaction.cs" /> + <Compile Include="Transactions\ITransactionCollection.cs" /> <Compile Include="Transactions\ITransactionMetrics.cs" /> <Compile Include="Transactions\ITransactions.cs" /> <Compile Include="Transactions\Package-Info.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core/Cluster/ICluster.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cluster/ICluster.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cluster/ICluster.cs index 45e22e9..a7ef919 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cluster/ICluster.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cluster/ICluster.cs @@ -155,5 +155,11 @@ namespace Apache.Ignite.Core.Cluster /// </summary> /// <param name="cacheName">Name of the cache.</param> bool IsWalEnabled(string cacheName); + + /// <summary> + /// Set transaction timeout on partition map exchange + /// </summary> + /// <param name="timeout"></param> + void SetTxTimeoutOnPartitionMapExchange(TimeSpan timeout); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs index 6c8a429..059bea4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs @@ -416,6 +416,7 @@ namespace Apache.Ignite.Core writer.WriteInt((int) TransactionConfiguration.DefaultTransactionIsolation); writer.WriteLong((long) TransactionConfiguration.DefaultTimeout.TotalMilliseconds); writer.WriteInt((int) TransactionConfiguration.PessimisticTransactionLogLinger.TotalMilliseconds); + writer.WriteLong((long) TransactionConfiguration.DefaultDefaultTimeoutOnPartitionMapExchange.TotalMilliseconds); } else writer.WriteBoolean(false); @@ -681,7 +682,8 @@ namespace Apache.Ignite.Core DefaultTransactionConcurrency = (TransactionConcurrency) r.ReadInt(), DefaultTransactionIsolation = (TransactionIsolation) r.ReadInt(), DefaultTimeout = TimeSpan.FromMilliseconds(r.ReadLong()), - PessimisticTransactionLogLinger = TimeSpan.FromMilliseconds(r.ReadInt()) + PessimisticTransactionLogLinger = TimeSpan.FromMilliseconds(r.ReadInt()), + DefaultTimeoutOnPartitionMapExchange = TimeSpan.FromMilliseconds(r.ReadLong()) }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd index 8707272..7d3d8ad 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd @@ -1234,6 +1234,13 @@ <xs:documentation>Delay after which pessimistic recovery entries will be cleaned up for failed node.</xs:documentation> </xs:annotation> </xs:attribute> + <xs:attribute name="defaultTimeoutOnPartitionMapExchange" type="xs:string"> + <xs:annotation> + <xs:documentation> + Transaction timeout for partition map synchronization. TimeSpan.Zero for infinite timeout. + </xs:documentation> + </xs:annotation> + </xs:attribute> </xs:complexType> </xs:element> <xs:element name="logger" minOccurs="0"> http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs index ffab09f..f4c683e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs @@ -90,7 +90,8 @@ namespace Apache.Ignite.Core.Impl GetBaselineTopology = 26, DisableWal = 27, EnableWal = 28, - IsWalEnabled = 29 + IsWalEnabled = 29, + SetTxTimeoutOnPartitionMapExchange = 30 } /** */ @@ -120,9 +121,6 @@ namespace Apache.Ignite.Core.Impl /** Local node. */ private IClusterNode _locNode; - /** Transactions facade. */ - private readonly Lazy<TransactionsImpl> _transactions; - /** Callbacks */ private readonly UnmanagedCallbacks _cbs; @@ -172,10 +170,6 @@ namespace Apache.Ignite.Core.Impl cbs.Initialize(this); - // Grid is not completely started here, can't initialize interop transactions right away. - _transactions = new Lazy<TransactionsImpl>( - () => new TransactionsImpl(DoOutOpObject((int) Op.GetTransactions), GetLocalNode().Id)); - // Set reconnected task to completed state for convenience. _clientReconnectTaskCompletionSource.SetResult(false); @@ -613,7 +607,7 @@ namespace Apache.Ignite.Core.Impl /** <inheritdoc /> */ public ITransactions GetTransactions() { - return _transactions.Value; + return new TransactionsImpl(this, DoOutOpObject((int) Op.GetTransactions), GetLocalNode().Id); } /** <inheritdoc /> */ @@ -844,6 +838,12 @@ namespace Apache.Ignite.Core.Impl return DoOutOp((int) Op.IsWalEnabled, w => w.WriteString(cacheName)) == True; } + public void SetTxTimeoutOnPartitionMapExchange(TimeSpan timeout) + { + DoOutOp((int) Op.SetTxTimeoutOnPartitionMapExchange, + (BinaryWriter w) => w.WriteLong((long) timeout.TotalMilliseconds)); + } + /** <inheritdoc /> */ #pragma warning disable 618 public IPersistentStoreMetrics GetPersistentStoreMetrics() @@ -945,6 +945,24 @@ namespace Apache.Ignite.Core.Impl _nodes[node.Id] = node; } + + /// <summary> + /// Returns instance of Ignite Transactions to mark a transaction with a special label. + /// </summary> + /// <param name="label"></param> + /// <returns><see cref="ITransactions"/></returns> + internal ITransactions GetTransactionsWithLabel(string label) + { + Debug.Assert(label != null); + + var platformTargetInternal = DoOutOpObject((int) Op.GetTransactions, s => + { + var w = BinaryUtils.Marshaller.StartMarshal(s); + w.WriteString(label); + }); + + return new TransactionsImpl(this, platformTargetInternal, GetLocalNode().Id); + } /// <summary> /// Gets the node from cache. http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionCollectionImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionCollectionImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionCollectionImpl.cs new file mode 100644 index 0000000..500665f --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionCollectionImpl.cs @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Transactions +{ + using System; + using System.Collections; + using System.Collections.Generic; + using Apache.Ignite.Core.Transactions; + + /// <summary> + /// Internal transaction read only disposable collection. + /// </summary> + internal sealed class TransactionCollectionImpl : ITransactionCollection + { + /** */ + private readonly ICollection<ITransaction> _col; + + ///<summary> + /// Initialize <see cref="TransactionCollectionImpl"/> by wrapping. + /// </summary> + public TransactionCollectionImpl(ICollection<ITransaction> col) + { + _col = col; + } + + /** <inheritdoc /> */ + public IEnumerator<ITransaction> GetEnumerator() + { + return _col.GetEnumerator(); + } + + /** <inheritdoc /> */ + IEnumerator IEnumerable.GetEnumerator() + { + return ((IEnumerable) _col).GetEnumerator(); + } + + /** <inheritdoc /> */ + public void Add(ITransaction item) + { + throw GetReadOnlyException(); + } + + /** <inheritdoc /> */ + public void Clear() + { + throw GetReadOnlyException(); + } + + /** <inheritdoc /> */ + public bool Contains(ITransaction item) + { + return _col.Contains(item); + } + + /** <inheritdoc /> */ + public void CopyTo(ITransaction[] array, int arrayIndex) + { + _col.CopyTo(array, arrayIndex); + } + + /** <inheritdoc /> */ + public bool Remove(ITransaction item) + { + throw GetReadOnlyException(); + } + + /** <inheritdoc /> */ + public int Count + { + get { return _col.Count; } + } + + /** <inheritdoc /> */ + public bool IsReadOnly + { + get { return true; } + } + + /// <summary> + /// Gets the readonly exception. + /// </summary> + private static Exception GetReadOnlyException() + { + return new NotSupportedException("Collection is read-only."); + } + + /** <inheritdoc /> */ + public void Dispose() + { + foreach (var tx in _col) + { + tx.Dispose(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs index c800859..396171c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs @@ -73,8 +73,9 @@ namespace Apache.Ignite.Core.Impl.Transactions /// <param name="isolation">TX isolation.</param> /// <param name="timeout">Timeout.</param> /// <param name="nodeId">The originating node identifier.</param> + /// <param name="bindToThread">Bind transaction to current thread or not.</param> public TransactionImpl(long id, TransactionsImpl txs, TransactionConcurrency concurrency, - TransactionIsolation isolation, TimeSpan timeout, Guid nodeId) { + TransactionIsolation isolation, TimeSpan timeout, Guid nodeId, bool bindToThread = true) { _id = id; _txs = txs; _concurrency = concurrency; @@ -86,7 +87,8 @@ namespace Apache.Ignite.Core.Impl.Transactions _threadId = Thread.CurrentThread.ManagedThreadId; - THREAD_TX = this; + if (bindToThread) + THREAD_TX = this; } /// <summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs index 4dd7f9f..99cb837 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs @@ -18,10 +18,13 @@ namespace Apache.Ignite.Core.Impl.Transactions { using System; + using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Threading.Tasks; using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Impl; using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Transactions; /// <summary> @@ -66,6 +69,9 @@ namespace Apache.Ignite.Core.Impl.Transactions private const int OpPrepare = 12; /** */ + private const int OpLocalActiveTransactions = 13; + + /** */ private readonly TransactionConcurrency _dfltConcurrency; /** */ @@ -75,25 +81,36 @@ namespace Apache.Ignite.Core.Impl.Transactions private readonly TimeSpan _dfltTimeout; /** */ + private readonly TimeSpan _dfltTimeoutOnPartitionMapExchange; + + /** */ private readonly Guid _localNodeId; + /** */ + private readonly Ignite _ignite; + /// <summary> /// Initializes a new instance of the <see cref="TransactionsImpl" /> class. /// </summary> + /// <param name="ignite">Parent target, actually <see cref="Ignite"/> (used for withLabel)</param> /// <param name="target">Target.</param> /// <param name="localNodeId">Local node id.</param> - public TransactionsImpl(IPlatformTargetInternal target, Guid localNodeId) : base(target) + public TransactionsImpl(Ignite ignite, IPlatformTargetInternal target, Guid localNodeId) : base(target) { _localNodeId = localNodeId; var res = target.OutStream(OpCacheConfigParameters, reader => Tuple.Create( (TransactionConcurrency) reader.ReadInt(), (TransactionIsolation) reader.ReadInt(), - reader.ReadLongAsTimespan())); + reader.ReadLongAsTimespan(), + reader.ReadLongAsTimespan() + )); _dfltConcurrency = res.Item1; _dfltIsolation = res.Item2; _dfltTimeout = res.Item3; + _dfltTimeoutOnPartitionMapExchange = res.Item4; + _ignite = ignite; } /** <inheritDoc /> */ @@ -122,7 +139,7 @@ namespace Apache.Ignite.Core.Impl.Transactions }, s => s.ReadLong()); var innerTx = new TransactionImpl(id, this, concurrency, isolation, timeout, _localNodeId); - + return new Transaction(innerTx); } @@ -149,6 +166,45 @@ namespace Apache.Ignite.Core.Impl.Transactions DoOutInOp(OpResetMetrics); } + /** <inhertiDoc /> */ + public ITransactions WithLabel(string label) + { + IgniteArgumentCheck.NotNullOrEmpty(label, "label"); + + return _ignite.GetTransactionsWithLabel(label); + } + + /** <inheritDoc /> */ + public ITransactionCollection GetLocalActiveTransactions() + { + return DoInOp(OpLocalActiveTransactions, stream => + { + var reader = Marshaller.StartUnmarshal(stream); + + var size = reader.ReadInt(); + + var result = new List<ITransaction>(size); + + for (var i = 0; i < size; i++) + { + var id = reader.ReadLong(); + + var concurrency = reader.ReadInt(); + + var isolation = reader.ReadInt(); + + var timeout = reader.ReadLongAsTimespan(); + + var innerTx = new TransactionImpl(id, this, (TransactionConcurrency) concurrency, + (TransactionIsolation) isolation, timeout, _localNodeId, false); + + result.Add(new Transaction(innerTx)); + } + + return new TransactionCollectionImpl(result); + }); + } + /** <inheritDoc /> */ public TransactionConcurrency DefaultTransactionConcurrency { @@ -167,6 +223,12 @@ namespace Apache.Ignite.Core.Impl.Transactions get { return _dfltTimeout; } } + /** <inheritDoc /> */ + public TimeSpan DefaultTimeoutOnPartitionMapExchange + { + get { return _dfltTimeoutOnPartitionMapExchange; } + } + /// <summary> /// Executes prepare step of the two phase commit. /// </summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/ITransactionCollection.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/ITransactionCollection.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/ITransactionCollection.cs new file mode 100644 index 0000000..7c39262 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/ITransactionCollection.cs @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Transactions +{ + using System; + using System.Collections.Generic; + + /// <summary> + /// Disposable readonly collection of <see cref="ITransaction"/> + /// </summary> + public interface ITransactionCollection : ICollection<ITransaction>, IDisposable + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/ITransactions.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/ITransactions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/ITransactions.cs index d3b98da..0760f36 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/ITransactions.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/ITransactions.cs @@ -77,6 +77,11 @@ namespace Apache.Ignite.Core.Transactions /// Gets the default transaction timeout. /// </summary> TimeSpan DefaultTimeout { get; } + + /// <summary> + /// Gets the default transaction timeout on partition map exchange. + /// </summary> + TimeSpan DefaultTimeoutOnPartitionMapExchange { get; } /// <summary> /// Gets the metrics. @@ -89,5 +94,18 @@ namespace Apache.Ignite.Core.Transactions /// Resets the metrics. /// </summary> void ResetMetrics(); + + /// <summary> + /// Returns instance of Ignite Transactions to mark a transaction with a special label. + /// </summary> + /// <param name="label"></param> + /// <returns><see cref="ITransactions"/></returns> + ITransactions WithLabel(string label); + + /// <summary> + /// Returns a list of active transactions initiated by this node. + /// </summary> + /// <returns>Collection of <see cref="ITransactionCollection"/></returns> + ITransactionCollection GetLocalActiveTransactions(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e375ef93/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/TransactionConfiguration.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/TransactionConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/TransactionConfiguration.cs index 5fe37f8..e7f62f7 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/TransactionConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/TransactionConfiguration.cs @@ -40,6 +40,9 @@ namespace Apache.Ignite.Core.Transactions /// <summary> The default value for <see cref="PessimisticTransactionLogLinger"/> property. </summary> public static readonly TimeSpan DefaultPessimisticTransactionLogLinger = TimeSpan.FromMilliseconds(10000); + /// <summary> The default value for <see cref="DefaultTimeoutOnPartitionMapExchange"/></summary> + public static readonly TimeSpan DefaultDefaultTimeoutOnPartitionMapExchange = TimeSpan.Zero; + /// <summary> /// Gets or sets the cache transaction concurrency to use when one is not explicitly specified. /// </summary> @@ -74,6 +77,13 @@ namespace Apache.Ignite.Core.Transactions public TimeSpan PessimisticTransactionLogLinger { get; set; } /// <summary> + /// Gets or sets transaction timeout for partition map synchronization. + /// <see cref="TimeSpan.Zero"/> for infinite timeout. + /// </summary> + [DefaultValue(typeof(TimeSpan), "00:00:00")] + public TimeSpan DefaultTimeoutOnPartitionMapExchange { get; set; } + + /// <summary> /// Initializes a new instance of the <see cref="TransactionConfiguration" /> class. /// </summary> public TransactionConfiguration() @@ -83,6 +93,7 @@ namespace Apache.Ignite.Core.Transactions DefaultTimeout = DefaultDefaultTimeout; PessimisticTransactionLogSize = DefaultPessimisticTransactionLogSize; PessimisticTransactionLogLinger = DefaultPessimisticTransactionLogLinger; + DefaultTimeoutOnPartitionMapExchange = DefaultDefaultTimeoutOnPartitionMapExchange; } } }
