IGNITE-3057 - Optimization for transactions that do not acquire locks
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b476fdf4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b476fdf4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b476fdf4 Branch: refs/heads/ignite-3163 Commit: b476fdf48151edd7be0b1911a4e004faad2aeb67 Parents: 7b0edfb Author: sboikov <[email protected]> Authored: Wed Apr 27 14:04:24 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Apr 27 16:07:34 2016 +0300 ---------------------------------------------------------------------- .../cache/distributed/near/GridNearTxLocal.java | 32 +++ .../cache/transactions/IgniteTxManager.java | 40 +++ .../processors/cache/CacheTxFastFinishTest.java | 253 +++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 3 + 4 files changed, 328 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b476fdf4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index ae4972e..515d284 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -71,6 +71,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; +import static org.apache.ignite.transactions.TransactionState.PREPARED; import static 
org.apache.ignite.transactions.TransactionState.PREPARING; import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; @@ -816,6 +817,18 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (log.isDebugEnabled()) log.debug("Committing near local tx: " + this); + if (fastFinish()) { + state(PREPARING); + state(PREPARED); + state(COMMITTING); + + cctx.tm().fastFinishTx(this, true); + + state(COMMITTED); + + return new GridFinishedFuture<>((IgniteInternalTx)this); + } + prepareAsync(); GridNearTxFinishFuture fut = commitFut.get(); @@ -860,6 +873,18 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (log.isDebugEnabled()) log.debug("Rolling back near tx: " + this); + if (fastFinish()) { + state(PREPARING); + state(PREPARED); + state(ROLLING_BACK); + + cctx.tm().fastFinishTx(this, false); + + state(ROLLED_BACK); + + return new GridFinishedFuture<>((IgniteInternalTx)this); + } + GridNearTxFinishFuture fut = rollbackFut.get(); if (fut != null) @@ -908,6 +933,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** + * @return {@code True} if 'fast finish' path can be used for transaction completion. + */ + private boolean fastFinish() { + return writeMap().isEmpty() && ((optimistic() && !serializable()) || readMap().isEmpty()); + } + + /** * Prepares next batch of entries in dht transaction. * * @param reads Read entries. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b476fdf4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index e96a472..5dcd53d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1199,6 +1199,46 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * Fast finish transaction. Can be used only if no locks were acquired. + * + * @param tx Transaction to finish. + * @param commit {@code True} if transaction is committed, {@code false} if rolled back. + */ + public void fastFinishTx(IgniteInternalTx tx, boolean commit) { + assert tx != null; + assert tx.writeMap().isEmpty(); + assert tx.optimistic() || tx.readMap().isEmpty(); + + ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx); + + if (txIdMap.remove(tx.xidVersion(), tx)) { + // 1. Notify evictions. + notifyEvitions(tx); + + // 2. Remove obsolete entries. + removeObsolete(tx); + + // 3. Remove from per-thread storage. + clearThreadMap(tx); + + // 4. Clear context. + resetContext(); + + // 5. Update metrics. + if (!tx.dht() && tx.local()) { + if (!tx.system()) { + if (commit) + cctx.txMetrics().onTxCommit(); + else + cctx.txMetrics().onTxRollback(); + } + + tx.txState().onTxEnd(cctx, tx, commit); + } + } + } + + /** * Tries to minimize damage from partially-committed transaction. * * @param tx Tx to uncommit. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b476fdf4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java new file mode 100644 index 0000000..35b1405 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class CacheTxFastFinishTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** */ + private boolean 
nearCache; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + if (nearCache) + ccfg.setNearConfiguration(new NearCacheConfiguration()); + + cfg.setCacheConfiguration(ccfg); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testFastFinishTxNearCache() throws Exception { + nearCache = true; + + fastFinishTx(); + } + + /** + * @throws Exception If failed. + */ + public void testFastFinishTx() throws Exception { + fastFinishTx(); + } + + /** + * @throws Exception If failed. + */ + private void fastFinishTx() throws Exception { + startGrid(0); + + fastFinishTx(ignite(0)); + + client = true; + + startGrid(1); + + for (int i = 0; i < 2; i++) + fastFinishTx(ignite(i)); + + client = false; + + startGrid(2); + + for (int i = 0; i < 3; i++) + fastFinishTx(ignite(i)); + + startGrid(3); + + for (int i = 0; i < 4; i++) + fastFinishTx(ignite(i)); + + stopGrid(1); + + for (int i = 0; i < 4; i++) { + if (i != 1) + fastFinishTx(ignite(i)); + } + } + + /** + * @param ignite Node. 
+ */ + private void fastFinishTx(Ignite ignite) { + IgniteTransactions txs = ignite.transactions(); + + IgniteCache cache = ignite.cache(null); + + for (boolean commit : new boolean[]{true, false}) { + for (TransactionConcurrency c : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + try (Transaction tx = txs.txStart(c, isolation)) { + checkFastTxFinish(tx, commit); + } + } + } + + for (int i = 0; i < 100; i++) { + try (Transaction tx = txs.txStart(OPTIMISTIC, REPEATABLE_READ)) { + cache.get(i); + + checkFastTxFinish(tx, commit); + } + + try (Transaction tx = txs.txStart(OPTIMISTIC, READ_COMMITTED)) { + cache.get(i); + + checkFastTxFinish(tx, commit); + } + } + + for (int i = 0; i < 100; i++) { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.get(i); + + checkNormalTxFinish(tx, commit); + } + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.get(i); + + checkNormalTxFinish(tx, commit); + } + } + + for (int i = 0; i < 100; i++) { + for (TransactionConcurrency c : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + try (Transaction tx = txs.txStart(c, isolation)) { + cache.put(i, i); + + checkNormalTxFinish(tx, commit); + } + } + } + } + } + } + + /** + * @param tx Transaction. + * @param commit Commit flag. + */ + private void checkFastTxFinish(Transaction tx, boolean commit) { + if (commit) + tx.commit(); + else + tx.rollback(); + + IgniteInternalTx tx0 = ((TransactionProxyImpl)tx).tx(); + + assertNull(fieldValue(tx0, "prepFut")); + assertNull(fieldValue(tx0, "commitFut")); + assertNull(fieldValue(tx0, "rollbackFut")); + } + + /** + * @param tx Transaction. + * @param commit Commit flag. 
+ */ + private void checkNormalTxFinish(Transaction tx, boolean commit) { + IgniteInternalTx tx0 = ((TransactionProxyImpl)tx).tx(); + + if (commit) { + tx.commit(); + + assertNotNull(fieldValue(tx0, "prepFut")); + assertNotNull(fieldValue(tx0, "commitFut")); + } + else { + tx.rollback(); + + assertNotNull(fieldValue(tx0, "rollbackFut")); + } + } + + /** + * @param obj Object. + * @param fieldName Field name. + * @return Field value. + */ + private Object fieldValue(Object obj, String fieldName) { + Object val = GridTestUtils.getFieldValue(obj, fieldName); + + if (val == null) + return null; + + if (val instanceof AtomicReference) + return ((AtomicReference)val).get(); + + return val; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b476fdf4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 7f1d7df..003b12c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryProcessorCopySelfTe import org.apache.ignite.internal.processors.cache.CacheFutureExceptionSelfTest; import org.apache.ignite.internal.processors.cache.CacheNamesSelfTest; import org.apache.ignite.internal.processors.cache.CachePutEventListenerErrorSelfTest; +import org.apache.ignite.internal.processors.cache.CacheTxFastFinishTest; import org.apache.ignite.internal.processors.cache.GridCacheAffinityApiSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheAffinityMapperSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheAffinityRoutingSelfTest; @@ -298,6 +299,8 @@ public class
IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(IgniteTxConfigCacheSelfTest.class); + suite.addTestSuite(CacheTxFastFinishTest.class); + return suite; } }
