Repository: ignite Updated Branches: refs/heads/master e269e21cc -> 4074a9b9b
IGNITE-9622: MVCC: Prohibit non PESSIMISTIC REPEATABLE_READ transactions on MVCC-enabled caches. This closes #5074. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4074a9b9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4074a9b9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4074a9b9 Branch: refs/heads/master Commit: 4074a9b9b8cce04528337e0694966c09a9cdc049 Parents: e269e21 Author: ipavlukhin <vololo...@gmail.com> Authored: Wed Nov 21 11:43:16 2018 +0300 Committer: Igor Seliverstov <gvvinbl...@gmail.com> Committed: Wed Nov 21 11:43:53 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 20 +- .../dht/colocated/GridDhtColocatedCache.java | 6 +- .../cache/distributed/near/GridNearTxLocal.java | 30 +- .../processors/cache/mvcc/MvccUtils.java | 48 ++- .../cache/mvcc/MvccUnsupportedTxModesTest.java | 316 +++++++++++++++++++ .../testsuites/IgniteCacheMvccTestSuite.java | 3 + 6 files changed, 382 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4074a9b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index feb609a..465f1f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -147,6 +147,7 @@ import org.apache.ignite.resources.JobContextResource; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionException; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; @@ -1975,7 +1976,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return new GridFinishedFuture<>(e); } - tx = ctx.tm().threadLocalTx(ctx); + tx = checkCurrentTx(); } if (tx == null || tx.implicit()) { @@ -2317,6 +2318,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } } + /** */ + protected GridNearTxLocal checkCurrentTx() { + if (!ctx.mvccEnabled()) + return ctx.tm().threadLocalTx(ctx); + + try { + return MvccUtils.currentTx(ctx.kernalContext(), null); + } + catch (MvccUtils.UnsupportedTxModeException | MvccUtils.NonMvccTransactionException e) { + throw new TransactionException(e.getMessage()); + } + } + /** * @param topVer Affinity topology version for which load was performed. * @param loadKeys Keys to load. @@ -4241,7 +4255,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V awaitLastFut(); - GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = checkCurrentTx(); if (tx == null || tx.implicit()) { TransactionConfiguration tCfg = CU.transactionConfiguration(ctx, ctx.kernalContext().config()); @@ -4343,7 +4357,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (log.isDebugEnabled()) log.debug("Performing async op: " + op); - GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = checkCurrentTx(); CacheOperationContext opCtx = ctx.operationContextPerCall(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4074a9b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 8061362..8fc353c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -189,7 +189,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (keyCheck) validateCacheKey(key); - GridNearTxLocal tx = ctx.mvccEnabled() ? MvccUtils.tx(ctx.kernalContext()) : ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = checkCurrentTx(); final CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -306,7 +306,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (keyCheck) validateCacheKeys(keys); - GridNearTxLocal tx = (ctx.mvccEnabled()) ? MvccUtils.tx(ctx.kernalContext()) : ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = checkCurrentTx(); final CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -346,7 +346,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte assert mvccSnapshot != null; } catch (IgniteCheckedException ex) { - return new GridFinishedFuture(ex); + return new GridFinishedFuture<>(ex); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4074a9b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index f7c6000..9f1f86d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -702,19 +702,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** - * Validate Tx mode. - * - * @param ctx Cache context. - * @throws IgniteCheckedException If tx mode is not supported. - */ - protected void validateTxMode(GridCacheContext ctx) throws IgniteCheckedException { - if(!ctx.mvccEnabled() || pessimistic() && repeatableRead()) - return; - - throw new IgniteCheckedException("Only pessimistic repeatable read transactions are supported at the moment."); - } - - /** * Internal method for put and transform operations in Mvcc mode. * Note: Only one of {@code map}, {@code transformMap} maps must be non-null. * @@ -735,8 +722,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou @Nullable final CacheEntryPredicate filter ) { try { - validateTxMode(cacheCtx); - MvccUtils.requestSnapshot(cacheCtx, this); beforePut(cacheCtx, retval, true); @@ -1919,13 +1904,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou @Nullable final CacheEntryPredicate filter ) { try { - validateTxMode(cacheCtx); - - if (mvccSnapshot == null) { - MvccUtils.mvccTracker(cacheCtx, this); - - assert mvccSnapshot != null; - } + MvccUtils.requestSnapshot(cacheCtx, this); beforeRemove(cacheCtx, retval, true); } @@ -2186,13 +2165,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); - try { - validateTxMode(cacheCtx); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture(e); - } - if (cacheCtx.mvccEnabled() && !isOperationAllowed(true)) return txTypeMismatchFinishFuture(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4074a9b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java index a9bb540..0ceed09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java @@ -677,6 +677,26 @@ public class MvccUtils { * @return Currently started user transaction, or {@code null} if none started. */ @Nullable public static GridNearTxLocal tx(GridKernalContext ctx, @Nullable GridCacheVersion txId) { + try { + return currentTx(ctx, txId); + } + catch (UnsupportedTxModeException e) { + throw new IgniteSQLException(e.getMessage(), IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + } + catch (NonMvccTransactionException e) { + throw new IgniteSQLException(e.getMessage(), IgniteQueryErrorCode.TRANSACTION_TYPE_MISMATCH); + } + } + + /** + * @param ctx Grid kernal context. + * @param txId Transaction ID. + * @return Currently started user transaction, or {@code null} if none started. + * @throws UnsupportedTxModeException If transaction mode is not supported when MVCC is enabled. + * @throws NonMvccTransactionException If started transaction spans non MVCC caches. + */ + @Nullable public static GridNearTxLocal currentTx(GridKernalContext ctx, + @Nullable GridCacheVersion txId) throws UnsupportedTxModeException, NonMvccTransactionException { IgniteTxManager tm = ctx.cache().context().tm(); IgniteInternalTx tx0 = txId == null ? tm.tx() : tm.tx(txId); @@ -687,23 +707,19 @@ public class MvccUtils { if (!tx.pessimistic() || !tx.repeatableRead()) { tx.setRollbackOnly(); - throw new IgniteSQLException("Only pessimistic repeatable read transactions are supported at the moment.", - IgniteQueryErrorCode.UNSUPPORTED_OPERATION); - + throw new UnsupportedTxModeException(); } if (!tx.isOperationAllowed(true)) { tx.setRollbackOnly(); - throw new IgniteSQLException("SQL queries and cache operations " + - "may not be used in the same transaction.", IgniteQueryErrorCode.TRANSACTION_TYPE_MISMATCH); + throw new NonMvccTransactionException(); } } return tx; } - /** * @param ctx Grid kernal context. * @param timeout Transaction timeout. @@ -925,4 +941,24 @@ public class MvccUtils { return newMvccCrd == MVCC_CRD_COUNTER_NA ? null : mvccVersion(newMvccCrd, newMvccCntr, newMvccOpCntr); } } + + /** */ + public static class UnsupportedTxModeException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + /** */ + private UnsupportedTxModeException() { + super("Only pessimistic repeatable read transactions are supported when MVCC is enabled."); + } + } + + /** */ + public static class NonMvccTransactionException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + /** */ + private NonMvccTransactionException() { + super("Operations on MVCC caches are not permitted in transactions spanning non MVCC caches."); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4074a9b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUnsupportedTxModesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUnsupportedTxModesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUnsupportedTxModesTest.java new file mode 100644 index 0000000..bd82405 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUnsupportedTxModesTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionException; +import org.apache.ignite.transactions.TransactionIsolation; + +import static java.util.Collections.singleton; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** */ +public class MvccUnsupportedTxModesTest extends GridCommonAbstractTest { + /** */ + private static IgniteCache<Object, Object> cache; + /** */ + private static final CacheEntryProcessor<Object, Object, Object> testEntryProcessor = (entry, arguments) -> null; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + IgniteEx ign = startGrid(0); + + cache = ign.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAtomicityMode(TRANSACTIONAL_SNAPSHOT)); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** */ + public void testGetAndPutIfAbsent() { + checkOperation(() -> cache.getAndPutIfAbsent(1, 1)); + } + + /** */ + public void testGetAndPutIfAbsentAsync() { + checkOperation(() -> cache.getAndPutIfAbsentAsync(1, 1)); + } + + /** */ + public void testGet() { + checkOperation(() -> cache.get(1)); + } + + /** */ + public void testGetAsync() { + checkOperation(() -> cache.getAsync(1)); + } + + /** */ + public void testGetEntry() { + checkOperation(() -> cache.getEntry(1)); + } + + /** */ + public void testGetEntryAsync() { + checkOperation(() -> cache.getEntryAsync(1)); + } + + /** */ + public void testGetAll() { + checkOperation(() -> cache.getAll(singleton(1))); + } + + /** */ + public void testGetAllAsync() { + checkOperation(() -> cache.getAllAsync(singleton(1))); + } + + /** */ + public void testGetEntries() { + checkOperation(() -> cache.getEntries(singleton(1))); + } + + /** */ + public void testGetEntriesAsync() { + checkOperation(() -> cache.getEntriesAsync(singleton(1))); + } + + /** */ + public void testContainsKey() { + checkOperation(() -> cache.containsKey(1)); + } + + /** */ + public void testContainsKeyAsync() { + checkOperation(() -> cache.containsKeyAsync(1)); + } + + /** */ + public void testContainsKeys() { + checkOperation(() -> cache.containsKeys(singleton(1))); + } + + /** */ + public void testContainsKeysAsync() { + checkOperation(() -> cache.containsKeysAsync(singleton(1))); + } + + /** */ + public void testPut() { + checkOperation(() -> cache.put(1, 1)); + } + + /** */ + public void testPutAsync() { + checkOperation(() -> cache.putAsync(1, 1)); + } + + /** */ + public void testGetAndPut() { + checkOperation(() -> cache.getAndPut(1, 1)); + } + + /** */ + public void testGetAndPutAsync() { + checkOperation(() -> cache.getAndPutAsync(1, 1)); + } + + /** */ + public void testPutAll() { + checkOperation(() -> cache.putAll(ImmutableMap.of(1, 1))); + } + + /** */ + public void testPutAllAsync() { + checkOperation(() -> cache.putAllAsync(ImmutableMap.of(1, 1))); + } + + /** */ + public void testPutIfAbsent() { + checkOperation(() -> cache.putIfAbsent(1, 1)); + } + + /** */ + public void testPutIfAbsentAsync() { + checkOperation(() -> cache.putIfAbsentAsync(1, 1)); + } + + /** */ + public void testRemove1() { + checkOperation(() -> cache.remove(1)); + } + + /** */ + public void testRemoveAsync1() { + checkOperation(() -> cache.removeAsync(1)); + } + + /** */ + public void testRemove2() { + checkOperation(() -> cache.remove(1, 1)); + } + + /** */ + public void testRemoveAsync2() { + checkOperation(() -> cache.removeAsync(1, 1)); + } + + /** */ + public void testGetAndRemove() { + checkOperation(() -> cache.getAndRemove(1)); + } + + /** */ + public void testGetAndRemoveAsync() { + checkOperation(() -> cache.getAndRemoveAsync(1)); + } + + /** */ + public void testReplace1() { + checkOperation(() -> cache.replace(1, 1, 1)); + } + + /** */ + public void testReplaceAsync1() { + checkOperation(() -> cache.replaceAsync(1, 1, 1)); + } + + /** */ + public void testReplace2() { + checkOperation(() -> cache.replace(1, 1)); + } + + /** */ + public void testReplaceAsync2() { + checkOperation(() -> cache.replaceAsync(1, 1)); + } + + /** */ + public void testGetAndReplace() { + checkOperation(() -> cache.getAndReplace(1, 1)); + } + + /** */ + public void testGetAndReplaceAsync() { + checkOperation(() -> cache.getAndReplaceAsync(1, 1)); + } + + /** */ + public void testRemoveAll1() { + checkOperation(() -> cache.removeAll(singleton(1))); + } + + /** */ + public void testRemoveAllAsync1() { + checkOperation(() -> cache.removeAllAsync(singleton(1))); + } + + /** */ + public void testInvoke1() { + checkOperation(() -> cache.invoke(1, testEntryProcessor)); + } + + /** */ + public void testInvokeAsync1() { + checkOperation(() -> cache.invokeAsync(1, testEntryProcessor)); + } + + /** */ + public void testInvoke2() { + checkOperation(() -> cache.invoke(1, testEntryProcessor)); + } + + /** */ + public void testInvokeAsync2() { + checkOperation(() -> cache.invokeAsync(1, testEntryProcessor)); + } + + /** */ + public void testInvokeAll1() { + checkOperation(() -> cache.invokeAll(singleton(1), testEntryProcessor)); + } + + /** */ + public void testInvokeAllAsync1() { + checkOperation(() -> cache.invokeAllAsync(singleton(1), testEntryProcessor)); + } + + /** */ + public void testInvokeAll2() { + checkOperation(() -> cache.invokeAll(singleton(1), testEntryProcessor)); + } + + /** */ + public void testInvokeAllAsync2() { + checkOperation(() -> cache.invokeAllAsync(singleton(1), testEntryProcessor)); + } + + /** */ + public void testInvokeAll3() { + checkOperation(() -> cache.invokeAll(Collections.singletonMap(1, testEntryProcessor))); + } + + /** */ + public void testInvokeAllAsync3() { + checkOperation(() -> cache.invokeAllAsync(Collections.singletonMap(1, testEntryProcessor))); + } + + /** + * @param action Action. + */ + private void checkOperation(Runnable action) { + assertNotSupportedInTx(action, OPTIMISTIC, READ_COMMITTED); + assertNotSupportedInTx(action, OPTIMISTIC, REPEATABLE_READ); + assertNotSupportedInTx(action, OPTIMISTIC, SERIALIZABLE); + assertNotSupportedInTx(action, PESSIMISTIC, READ_COMMITTED); + assertNotSupportedInTx(action, PESSIMISTIC, SERIALIZABLE); + } + + /** */ + private void assertNotSupportedInTx(Runnable action, TransactionConcurrency conc, TransactionIsolation iso) { + try (Transaction ignored = grid(0).transactions().txStart(conc, iso)) { + action.run(); + + fail("Action failure is expected."); + } + catch (TransactionException e) { + assertEquals("Only pessimistic repeatable read transactions are supported when MVCC is enabled.", e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4074a9b9/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java index a522cbd..d4b837c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTransactionsTes import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxFailoverTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccVacuumTest; import org.apache.ignite.internal.processors.cache.mvcc.MvccCachePeekTest; +import org.apache.ignite.internal.processors.cache.mvcc.MvccUnsupportedTxModesTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccSelfTest; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; @@ -62,6 +63,8 @@ public class IgniteCacheMvccTestSuite extends TestSuite { suite.addTestSuite(CacheMvccRemoteTxOnNearNodeStartTest.class); + suite.addTestSuite(MvccUnsupportedTxModesTest.class); + suite.addTestSuite(MvccCachePeekTest.class); suite.addTestSuite(MvccIgniteCacheTxPeekModesTest.class);