IGNITE-2313 Need to add a mode to fail atomic operations within a transaction. - Fixes #1709.
Signed-off-by: Dmitriy Pavlov <dpav...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1dcb540 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1dcb540 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1dcb540 Branch: refs/heads/ignite-8446 Commit: a1dcb540b2356c713f0da954a08125ee4c478944 Parents: f790165 Author: Dmitrii Ryabov <somefire...@gmail.com> Authored: Tue Jun 26 16:49:12 2018 +0300 Committer: Dmitriy Pavlov <dpav...@apache.org> Committed: Tue Jun 26 16:49:49 2018 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 7 + .../apache/ignite/IgniteSystemProperties.java | 5 + .../processors/cache/CacheOperationContext.java | 73 ++++-- .../cache/GatewayProtectedCacheProxy.java | 26 ++- .../processors/cache/GridCacheAdapter.java | 66 +++++- .../processors/cache/GridCacheGateway.java | 29 ++- .../processors/cache/GridCacheProxyImpl.java | 25 +- .../processors/cache/IgniteCacheProxy.java | 3 + .../processors/cache/IgniteCacheProxyImpl.java | 7 +- .../processors/cache/IgniteInternalCache.java | 5 + .../IgniteClientReconnectCacheTest.java | 8 +- .../cache/CacheReadThroughRestartSelfTest.java | 2 +- ...idAbstractCacheInterceptorRebalanceTest.java | 2 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 20 +- .../GridCacheAbstractLocalStoreSelfTest.java | 25 +- .../cache/GridCacheAbstractSelfTest.java | 2 +- ...idCacheValueConsistencyAbstractSelfTest.java | 6 +- .../IgniteCacheConfigVariationsFullApiTest.java | 2 +- .../IgniteStartCacheInTransactionSelfTest.java | 25 +- ...plicatedAtomicCacheGetsDistributionTest.java | 6 +- .../CacheGetInsideLockChangingTopologyTest.java | 10 +- .../GridCacheAbstractNodeRestartSelfTest.java | 4 +- .../IgniteCacheTxIteratorSelfTest.java | 2 +- ...idCacheNearOnlyMultiNodeFullApiSelfTest.java | 5 +- .../near/GridNearCacheStoreUpdateTest.java | 8 +- 
...lientAffinityAssignmentWithBaselineTest.java | 2 +- .../db/wal/reader/IgniteWalReaderTest.java | 3 +- .../transactions/AtomicOperationsInTxTest.java | 227 +++++++++++++++++++ .../TxOptimisticDeadlockDetectionTest.java | 2 +- .../TxPessimisticDeadlockDetectionTest.java | 2 +- .../testframework/junits/GridAbstractTest.java | 1 + .../junits/common/GridCommonAbstractTest.java | 4 +- .../multijvm/IgniteCacheProcessProxy.java | 5 + .../ignite/testsuites/IgniteBasicTestSuite.java | 3 + .../cache/hibernate/HibernateCacheProxy.java | 5 + .../query/h2/DmlStatementsProcessor.java | 2 +- .../cache/IgniteCacheGroupsSqlTest.java | 4 +- .../ExpiryCacheHolderTest.cs | 7 + .../Cache/CacheTestAsyncWrapper.cs | 13 +- .../dotnet/Apache.Ignite.Core/Cache/ICache.cs | 12 + .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 34 ++- .../dotnet/Apache.Ignite.Core/Impl/Ignite.cs | 2 +- 42 files changed, 605 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index b8d1aae..7de830a 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -177,6 +177,13 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public <K1, V1> IgniteCache<K1, V1> withKeepBinary(); /** + * Reasonable only for atomic caches. + * + * @return Cache with atomic operations allowed in transactions. + */ + public <K1, V1> IgniteCache<K1, V1> withAllowAtomicOpsInTx(); + + /** * Executes {@link #localLoadCache(IgniteBiPredicate, Object...)} on all cache nodes. * * @param p Optional predicate (may be {@code null}). 
If provided, will be used to http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 70b35bf..a7d9199 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -335,6 +335,11 @@ public final class IgniteSystemProperties { public static final String IGNITE_PERFORMANCE_SUGGESTIONS_DISABLED = "IGNITE_PERFORMANCE_SUGGESTIONS_DISABLED"; /** + * Flag indicating whether atomic operations allowed for use inside transactions. + */ + public static final String IGNITE_ALLOW_ATOMIC_OPS_IN_TX = "IGNITE_ALLOW_ATOMIC_OPS_IN_TX"; + + /** * Atomic cache deferred update response buffer size. 
*/ public static final String IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE = "IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE"; http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java index eb2b902..8a7afe7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java @@ -20,15 +20,23 @@ package org.apache.ignite.internal.processors.cache; import java.io.Serializable; import java.util.UUID; import javax.cache.expiry.ExpiryPolicy; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_ALLOW_ATOMIC_OPS_IN_TX; + /** * Cache operation context. */ public class CacheOperationContext implements Serializable { /** */ + //TODO IGNITE-8801 remove this and set default as `false`. + public static final boolean DFLT_ALLOW_ATOMIC_OPS_IN_TX = + IgniteSystemProperties.getBoolean(IGNITE_ALLOW_ATOMIC_OPS_IN_TX, true); + + /** */ private static final long serialVersionUID = 0L; /** Skip store. */ @@ -48,6 +56,9 @@ public class CacheOperationContext implements Serializable { /** Keep binary flag. */ private final boolean keepBinary; + /** Allow atomic cache in transaction. */ + private final boolean allowAtomicOpsInTx; + /** Expiry policy. 
*/ private final ExpiryPolicy expiryPlc; @@ -71,6 +82,8 @@ public class CacheOperationContext implements Serializable { recovery = false; dataCenterId = null; + + allowAtomicOpsInTx = DFLT_ALLOW_ATOMIC_OPS_IN_TX; } /** @@ -87,7 +100,8 @@ public class CacheOperationContext implements Serializable { @Nullable ExpiryPolicy expiryPlc, boolean noRetries, @Nullable Byte dataCenterId, - boolean recovery + boolean recovery, + boolean allowAtomicOpsInTx ) { this.skipStore = skipStore; @@ -102,6 +116,8 @@ public class CacheOperationContext implements Serializable { this.dataCenterId = dataCenterId; this.recovery = recovery; + + this.allowAtomicOpsInTx = allowAtomicOpsInTx; } /** @@ -131,7 +147,8 @@ public class CacheOperationContext implements Serializable { expiryPlc, noRetries, dataCenterId, - recovery); + recovery, + allowAtomicOpsInTx); } /** @@ -166,7 +183,8 @@ public class CacheOperationContext implements Serializable { expiryPlc, noRetries, dataCenterId, - recovery); + recovery, + allowAtomicOpsInTx); } /** @@ -190,7 +208,8 @@ public class CacheOperationContext implements Serializable { expiryPlc, noRetries, dataCenterId, - recovery); + recovery, + allowAtomicOpsInTx); } /** @@ -214,7 +233,8 @@ public class CacheOperationContext implements Serializable { plc, noRetries, dataCenterId, - recovery); + recovery, + allowAtomicOpsInTx); } /** @@ -229,7 +249,8 @@ public class CacheOperationContext implements Serializable { expiryPlc, noRetries, dataCenterId, - recovery); + recovery, + allowAtomicOpsInTx); } /** @@ -238,13 +259,14 @@ public class CacheOperationContext implements Serializable { */ public CacheOperationContext setDataCenterId(byte dataCenterId) { return new CacheOperationContext( - skipStore, - subjId, - keepBinary, - expiryPlc, - noRetries, - dataCenterId, - recovery); + skipStore, + subjId, + keepBinary, + expiryPlc, + noRetries, + dataCenterId, + recovery, + allowAtomicOpsInTx); } /** @@ -259,7 +281,8 @@ public class CacheOperationContext implements 
Serializable { expiryPlc, noRetries, dataCenterId, - recovery); + recovery, + allowAtomicOpsInTx); } /** @@ -276,6 +299,28 @@ public class CacheOperationContext implements Serializable { return noRetries; } + /** + * @return Operation context. + */ + public CacheOperationContext setAllowAtomicOpsInTx() { + return new CacheOperationContext( + skipStore, + subjId, + keepBinary, + expiryPlc, + noRetries, + dataCenterId, + recovery, + true); + } + + /** + * @return Allow in transactions flag. + */ + public boolean allowedAtomicOpsInTx() { + return allowAtomicOpsInTx; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheOperationContext.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java index 94c6012..33ea048 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java @@ -175,6 +175,23 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite } /** {@inheritDoc} */ + @Override public IgniteCache<K, V> withAllowAtomicOpsInTx() { + CacheOperationGate opGate = onEnter(); + + try { + boolean allowed = opCtx.allowedAtomicOpsInTx(); + + if (allowed) + return this; + + return new GatewayProtectedCacheProxy<>(delegate, opCtx.setAllowAtomicOpsInTx(), lock); + } + finally { + onLeave(opGate); + } + } + + /** {@inheritDoc} */ @Override public GatewayProtectedCacheProxy<K, V> withNoRetries() { CacheOperationGate opGate = onEnter(); @@ -316,7 +333,14 
@@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite /** {@inheritDoc} */ @Override public Lock lock(K key) { - return delegate.lock(key); + CacheOperationGate opGate = onEnter(); + + try { + return delegate.lock(key); + } + finally { + onLeave(opGate); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index b6679df..1f0d270 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -147,6 +147,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_KEY_VALIDATION_DISABLED; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_RETRIES_COUNT; import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST; +import static org.apache.ignite.internal.processors.cache.CacheOperationContext.DFLT_ALLOW_ATOMIC_OPS_IN_TX; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER; @@ -485,7 +486,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public final GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) { - CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false, null, false); + CacheOperationContext opCtx = new CacheOperationContext( 
+ false, + subjId, + false, + null, + false, + null, + false, + DFLT_ALLOW_ATOMIC_OPS_IN_TX); return new GridCacheProxyImpl<>(ctx, this, opCtx); } @@ -497,14 +506,30 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public final GridCacheProxyImpl<K, V> setSkipStore(boolean skipStore) { - CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false, null, false); + CacheOperationContext opCtx = new CacheOperationContext( + true, + null, + false, + null, + false, + null, + false, + DFLT_ALLOW_ATOMIC_OPS_IN_TX); return new GridCacheProxyImpl<>(ctx, this, opCtx); } /** {@inheritDoc} */ @Override public final <K1, V1> GridCacheProxyImpl<K1, V1> keepBinary() { - CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false, null, false); + CacheOperationContext opCtx = new CacheOperationContext( + false, + null, + true, + null, + false, + null, + false, + DFLT_ALLOW_ATOMIC_OPS_IN_TX); return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)this, opCtx); } @@ -518,14 +543,45 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Override public final GridCacheProxyImpl<K, V> withExpiryPolicy(ExpiryPolicy plc) { assert !CU.isUtilityCache(ctx.name()); - CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false, null, false); + CacheOperationContext opCtx = new CacheOperationContext( + false, + null, + false, + plc, + false, + null, + false, + DFLT_ALLOW_ATOMIC_OPS_IN_TX); return new GridCacheProxyImpl<>(ctx, this, opCtx); } /** {@inheritDoc} */ @Override public final IgniteInternalCache<K, V> withNoRetries() { - CacheOperationContext opCtx = new CacheOperationContext(false, null, false, null, true, null, false); + CacheOperationContext opCtx = new CacheOperationContext( + false, + null, + false, + null, + true, + null, + false, + DFLT_ALLOW_ATOMIC_OPS_IN_TX); + + 
return new GridCacheProxyImpl<>(ctx, this, opCtx); + } + + /** {@inheritDoc} */ + @Override public final IgniteInternalCache<K, V> withAllowAtomicOpsInTx() { + CacheOperationContext opCtx = new CacheOperationContext( + false, + null, + false, + null, + false, + null, + false, + DFLT_ALLOW_ATOMIC_OPS_IN_TX); return new GridCacheProxyImpl<>(ctx, this, opCtx); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index 658ca2a..9a8ce7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -107,7 +107,7 @@ public class GridCacheGateway<K, V> { * @return {@code True} if enter successful, {@code false} if the cache or the node was stopped. */ public boolean enterIfNotStopped() { - onEnter(); + onEnter(null); // Must unlock in case of unexpected errors to avoid deadlocks during kernal stop. rwLock.readLock().lock(); @@ -121,7 +121,7 @@ public class GridCacheGateway<K, V> { * @return {@code True} if enter successful, {@code false} if the cache or the node was stopped. */ public boolean enterIfNotStoppedNoLock() { - onEnter(); + onEnter(null); return checkState(false, false); } @@ -170,7 +170,7 @@ public class GridCacheGateway<K, V> { ctx.name() + "]", e); } - onEnter(); + onEnter(opCtx); Lock lock = rwLock.readLock(); @@ -195,7 +195,7 @@ public class GridCacheGateway<K, V> { * @return Previous operation context set on this thread. 
*/ @Nullable public CacheOperationContext enterNoLock(@Nullable CacheOperationContext opCtx) { - onEnter(); + onEnter(opCtx); checkState(false, false); @@ -244,13 +244,16 @@ public class GridCacheGateway<K, V> { } /** - * + * @param opCtx Cache operation context. */ - private void onEnter() { + private void onEnter(CacheOperationContext opCtx) { ctx.itHolder().checkWeakQueue(); if (ctx.deploymentEnabled()) ctx.deploy().onEnter(); + + if (opCtx != null) + checkAtomicOpsInTx(opCtx); } /** @@ -343,4 +346,18 @@ public class GridCacheGateway<K, V> { /** */ STOPPED } + + /** + * Checks if this operation is available to be used in transaction. + * + * @throws IgniteException - in case of atomic operation inside transaction without permission. + */ + private void checkAtomicOpsInTx(CacheOperationContext opCtx) throws IgniteException { + if (ctx.atomic() && !opCtx.allowedAtomicOpsInTx()) { + if (ctx.grid().transactions().tx() != null) { + throw new IgniteException("Transaction spans operations on atomic cache " + + "(don't use atomic cache inside transaction or set up flag by cache.allowedAtomicOpsInTx())."); + } + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 30edbea..fb0a556 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -52,6 +52,8 @@ import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; +import static 
org.apache.ignite.internal.processors.cache.CacheOperationContext.DFLT_ALLOW_ATOMIC_OPS_IN_TX; + /** * Cache proxy. */ @@ -237,7 +239,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte @Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) { return new GridCacheProxyImpl<>(ctx, delegate, opCtx != null ? opCtx.forSubjectId(subjId) : - new CacheOperationContext(false, subjId, false, null, false, null, false)); + new CacheOperationContext(false, subjId, false, null, false, null, false, DFLT_ALLOW_ATOMIC_OPS_IN_TX)); } /** {@inheritDoc} */ @@ -250,7 +252,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte return new GridCacheProxyImpl<>(ctx, delegate, opCtx != null ? opCtx.setSkipStore(skipStore) : - new CacheOperationContext(true, null, false, null, false, null, false)); + new CacheOperationContext(true, null, false, null, false, null, false, DFLT_ALLOW_ATOMIC_OPS_IN_TX)); } finally { gate.leave(prev); @@ -265,7 +267,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)delegate, - opCtx != null ? opCtx.keepBinary() : new CacheOperationContext(false, null, true, null, false, null, false)); + opCtx != null ? opCtx.keepBinary() : + new CacheOperationContext(false, null, true, null, false, null, false, DFLT_ALLOW_ATOMIC_OPS_IN_TX)); } /** {@inheritDoc} */ @@ -1556,7 +1559,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte try { return new GridCacheProxyImpl<>(ctx, delegate, opCtx != null ? 
opCtx.withExpiryPolicy(plc) : - new CacheOperationContext(false, null, false, plc, false, null, false)); + new CacheOperationContext(false, null, false, plc, false, null, false, DFLT_ALLOW_ATOMIC_OPS_IN_TX)); } finally { gate.leave(prev); @@ -1569,7 +1572,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte try { return new GridCacheProxyImpl<>(ctx, delegate, - new CacheOperationContext(false, null, false, null, true, null, false)); + new CacheOperationContext(false, null, false, null, true, null, false, DFLT_ALLOW_ATOMIC_OPS_IN_TX)); } finally { gate.leave(prev); @@ -1589,6 +1592,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Override public IgniteInternalCache<K, V> withAllowAtomicOpsInTx() { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return new GridCacheProxyImpl<>(ctx, delegate, opCtx.setAllowAtomicOpsInTx()); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx); out.writeObject(delegate); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index c333125..ccf5cb4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -88,6 +88,9 @@ public interface IgniteCacheProxy<K, V> extends IgniteCache<K, V>, Externalizabl */ public IgniteCache<K, V> skipStore(); + /** {@inheritDoc} */ + @Override public IgniteCache<K, V> 
withAllowAtomicOpsInTx(); + /** * @return Internal proxy. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java index 2a82848..28553b9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -347,7 +347,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public Lock lockAll(final Collection<? extends K> keys) { - return new CacheLockImpl<>(ctx.gate(), delegate, new CacheOperationContext(), keys); + return new CacheLockImpl<>(ctx.gate(), delegate, ctx.operationContextPerCall(), keys); } /** {@inheritDoc} */ @@ -1709,6 +1709,11 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< throw new UnsupportedOperationException(); } + /** {@inheritDoc} */ + @Override public IgniteCache<K, V> withAllowAtomicOpsInTx() { + throw new UnsupportedOperationException(); + } + /** * Method converts exception to IgniteCacheRestartingException in case of cache restarting * or to CacheException in other cases. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index d01d536..bbedef8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -1697,6 +1697,11 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public IgniteInternalCache<K, V> withNoRetries(); /** + * @return New projection based on this one, but with atomic cache operations allowed to be used. + */ + public <K1, V1> IgniteInternalCache<K1, V1> withAllowAtomicOpsInTx(); + + /** * @param key Key. * @param entryProcessor Entry processor. * @param args Arguments. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 3b578db..a975101 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -165,9 +165,10 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac DiscoverySpi srvSpi = ignite(0).configuration().getDiscoverySpi(); - final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)) + .withAllowAtomicOpsInTx(); - final IgniteCache<Object, Object> staticCache = client.cache(STATIC_CACHE); + final IgniteCache<Object, Object> staticCache = client.cache(STATIC_CACHE).withAllowAtomicOpsInTx(); staticCache.put(1, 1); @@ -178,7 +179,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac ccfg.setWriteSynchronizationMode(FULL_SYNC); ccfg.setName("nearCache"); - final IgniteCache<Object, Object> nearCache = client.getOrCreateCache(ccfg, new NearCacheConfiguration<>()); + final IgniteCache<Object, Object> nearCache = client.getOrCreateCache(ccfg, new NearCacheConfiguration<>()) + .withAllowAtomicOpsInTx(); nearCache.put(1, 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java index 8d7af84..422ed58 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java @@ -112,7 +112,7 @@ public class CacheReadThroughRestartSelfTest extends GridCacheAbstractSelfTest { Ignite ignite = grid(1); - cache = ignite.cache(DEFAULT_CACHE_NAME); + cache = ignite.cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); for (TransactionConcurrency txConcurrency : TransactionConcurrency.values()) { for (TransactionIsolation txIsolation : TransactionIsolation.values()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java index 99cf1f1..611f42b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java @@ -204,7 +204,7 @@ public abstract class GridAbstractCacheInterceptorRebalanceTest extends GridComm final IgniteEx ignite = startGrid(1); - final IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME); + final IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME).withAllowAtomicOpsInTx(); for (int i = 0; i < CNT; i++) cache.put(i, 
i); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index e5df2c8..9ae9f8c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -310,9 +310,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract storeStgy.removeFromStore(key); - try (final Transaction transaction = grid(0).transactions().txStart()) { - IgniteCache<String, Integer> cache = jcache(0); + IgniteCache<String, Integer> cache = jcache(0); + try (final Transaction transaction = grid(0).transactions().txStart()) { // retrieve market type from the grid Integer old = cache.withSkipStore().get(key); @@ -4029,7 +4029,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract private void checkPeekTxRemove(TransactionConcurrency concurrency) throws Exception { if (txShouldBeUsed()) { Ignite ignite = primaryIgnite("key"); - IgniteCache<String, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); + IgniteCache<String, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); cache.put("key", 1); @@ -5310,7 +5310,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * @throws Exception If failed. 
*/ public void testWithSkipStore() throws Exception { - IgniteCache<String, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME); + IgniteCache<String, Integer> cache = jcache(0); IgniteCache<String, Integer> cacheSkipStore = cache.withSkipStore(); @@ -5522,7 +5522,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract if (atomicityMode() == TRANSACTIONAL || (atomicityMode() == ATOMIC && nearEnabled())) // TODO IGNITE-373. return; - IgniteCache<String, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME); + IgniteCache<String, Integer> cache = jcache(0); IgniteCache<String, Integer> cacheSkipStore = cache.withSkipStore(); @@ -5887,7 +5887,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract }; try { - IgniteCache<String, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME); + IgniteCache<String, Integer> cache = jcache(0); List<String> keys = primaryKeysForCache(cache, 2); @@ -5972,8 +5972,10 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract log, new Callable<Object>() { @Override public Object call() throws Exception { + IgniteCache<String, Integer> cache = jcache(0); + try (Transaction tx = ignite(0).transactions().txStart()) { - jcache(0).lock("key").lock(); + cache.lock("key").lock(); } return null; @@ -5987,8 +5989,10 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract log, new Callable<Object>() { @Override public Object call() throws Exception { + IgniteCache<String, Integer> cache = jcache(0); + try (Transaction tx = ignite(0).transactions().txStart()) { - jcache(0).lockAll(Arrays.asList("key1", "key2")).lock(); + cache.lockAll(Arrays.asList("key1", "key2")).lock(); } return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java index 9fe714f..bc1996b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java @@ -341,16 +341,18 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst assertEquals(0, LOCAL_STORE_1.map.size()); assertEquals(0, LOCAL_STORE_2.map.size()); + IgniteCache<Integer, Integer> cache = ignite1.cache(name).withAllowAtomicOpsInTx(); + try (Transaction tx = ignite1.transactions().txStart()) { - ignite1.cache(name).put(key1, key1); - ignite1.cache(name).put(key2, key2); + cache.put(key1, key1); + cache.put(key2, key2); Map<Integer, Integer> m = new HashMap<>(); for (int i = KEYS; i < KEYS + 100; i++) m.put(i, i); - ignite1.cache(name).putAll(m); + cache.putAll(m); tx.commit(); } @@ -446,11 +448,13 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst for (int i = 0; i < KEYS; i++) { Ignite ignite = grid(rn.nextInt(6) + 1); + IgniteCache<Integer, Integer> cache = ignite.cache(name).withAllowAtomicOpsInTx(); + try (Transaction tx = ignite.transactions().txStart()) { - ignite.cache(name).put(i, i); + cache.put(i, i); for (int j = 0; j < 5; j++) - ignite.cache(name).get(rn.nextInt(KEYS)); + cache.get(rn.nextInt(KEYS)); Map<Integer, Integer> m = new HashMap<>(5); @@ -460,7 +464,7 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst m.put(key, key); } - ignite.cache(name).putAll(m); + cache.putAll(m); tx.commit(); } @@ -567,8 +571,10 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst m.put(kB, kB); m.put(kN, kN); + IgniteCache<Integer, Integer> cache = 
grid(i).cache(BACKUP_CACHE_1).withAllowAtomicOpsInTx(); + try (Transaction tx = grid(i).transactions().txStart()) { - grid(i).cache(BACKUP_CACHE_1).putAll(m); + cache.putAll(m); tx.commit(); } @@ -585,6 +591,9 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst Random rn = new Random(); for (int i = 1; i <= 3; i++) { + IgniteCache<Integer, Integer> cache = grid(i).cache(BACKUP_CACHE_1) + .withSkipStore().withAllowAtomicOpsInTx(); + try (Transaction tx = grid(i).transactions().txStart()) { Map<Integer, Integer> m = new HashMap<>(3); @@ -592,7 +601,7 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst m.put(rn.nextInt(1000), 1000); } - grid(i).cache(BACKUP_CACHE_1).withSkipStore().putAll(m); + cache.putAll(m); tx.commit(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java index 7a35999..16e7de4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java @@ -369,7 +369,7 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest { */ @SuppressWarnings({"unchecked"}) @Override protected IgniteCache<String, Integer> jcache(int idx) { - return ignite(idx).cache(DEFAULT_CACHE_NAME); + return ignite(idx).cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); } /** 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java index e068252..19f98ff 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java @@ -124,7 +124,8 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach info("Node is reported as NOT affinity node for key [key=" + key + ", nodeId=" + locNode.id() + ']'); - if (nearEnabled() && cache.equals(cache0)) + if (nearEnabled() && + ((IgniteCacheProxy)cache).context().equals(((IgniteCacheProxy)cache0).context())) assertEquals((Integer)i, cache0.localPeek(key)); else assertNull(cache0.localPeek(key)); @@ -184,7 +185,8 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach info("Node is reported as NOT affinity node for key [key=" + key + ", nodeId=" + locNode.id() + ']'); - if (nearEnabled() && cache.equals(cache0)) + if (nearEnabled() && + ((IgniteCacheProxy)cache).context().equals(((IgniteCacheProxy)cache0).context())) assertEquals((Integer)i, cache0.localPeek(key)); else assertNull(cache0.localPeek(key)); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java index 8d5462d..3ffdd65 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java @@ -5885,7 +5885,7 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar }; try { - IgniteCache<String, Integer> cache = grid(0).cache(cacheName()); + IgniteCache<String, Integer> cache = grid(0).cache(cacheName()).withAllowAtomicOpsInTx(); List<String> keys = primaryKeysForCache(0, 2, 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteStartCacheInTransactionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteStartCacheInTransactionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteStartCacheInTransactionSelfTest.java index 3fa81e0..b037a7b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteStartCacheInTransactionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteStartCacheInTransactionSelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.concurrent.Callable; import java.util.concurrent.locks.Lock; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.configuration.CacheConfiguration; @@ -95,8 +96,10 @@ public class 
IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes final String key = "key"; final String val = "val"; + IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)){ - ignite.cache(DEFAULT_CACHE_NAME).put(key, val); + cache.put(key, val); GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { @@ -119,8 +122,10 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes final String key = "key"; final String val = "val"; + IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)){ - ignite.cache(DEFAULT_CACHE_NAME).put(key, val); + cache.put(key, val); GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { @@ -143,8 +148,10 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes final String key = "key"; final String val = "val"; + IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)){ - ignite.cache(DEFAULT_CACHE_NAME).put(key, val); + cache.put(key, val); GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { @@ -167,8 +174,10 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes final String key = "key"; final String val = "val"; + IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)){ - ignite.cache(DEFAULT_CACHE_NAME).put(key, val); + cache.put(key, val); GridTestUtils.assertThrows(log, new Callable<Object>() { @Override 
public Object call() throws Exception { @@ -191,8 +200,10 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes final String key = "key"; final String val = "val"; + IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)){ - ignite.cache(DEFAULT_CACHE_NAME).put(key, val); + cache.put(key, val); GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { @@ -215,8 +226,10 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes final String key = "key"; final String val = "val"; + IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)){ - ignite.cache(DEFAULT_CACHE_NAME).put(key, val); + cache.put(key, val); GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedAtomicCacheGetsDistributionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedAtomicCacheGetsDistributionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedAtomicCacheGetsDistributionTest.java index 1d0c6de..1aaea76 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedAtomicCacheGetsDistributionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedAtomicCacheGetsDistributionTest.java @@ -145,7 +145,8 @@ public class ReplicatedAtomicCacheGetsDistributionTest extends GridCacheAbstract for (Integer key : keys) cache.put(key, 
VAL_PREFIX + key); - IgniteCache<Integer, String> clientCache = grid(CLIENT_NAME).getOrCreateCache(CACHE_NAME); + IgniteCache<Integer, String> clientCache = grid(CLIENT_NAME).getOrCreateCache(CACHE_NAME) + .withAllowAtomicOpsInTx(); assertTrue(GridTestUtils.waitForCondition( new GridAbsPredicate() { @@ -246,7 +247,8 @@ public class ReplicatedAtomicCacheGetsDistributionTest extends GridCacheAbstract for (Integer key : keys) cache.put(key, VAL_PREFIX + key); - IgniteCache<Integer, String> clientCache = grid(CLIENT_NAME).getOrCreateCache(CACHE_NAME); + IgniteCache<Integer, String> clientCache = grid(CLIENT_NAME).getOrCreateCache(CACHE_NAME) + .withAllowAtomicOpsInTx(); try (Transaction tx = grid(CLIENT_NAME).transactions().txStart()) { if (batchMode) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java index 944b8b9..80aa9ee 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java @@ -317,9 +317,9 @@ public class CacheGetInsideLockChangingTopologyTest extends GridCommonAbstractTe * @throws Exception If failed. 
*/ private void getInsideTxStopPrimary(Ignite ignite, String cacheName) throws Exception { - IgniteCache<Integer, Integer> txCache = ignite.cache(TX_CACHE1); + IgniteCache<Integer, Integer> txCache = ignite.cache(TX_CACHE1).withAllowAtomicOpsInTx(); - IgniteCache<Integer, Integer> getCache = ignite.cache(cacheName); + IgniteCache<Integer, Integer> getCache = ignite.cache(cacheName).withAllowAtomicOpsInTx(); final int NEW_NODE = SRVS + CLIENTS; @@ -407,9 +407,9 @@ public class CacheGetInsideLockChangingTopologyTest extends GridCommonAbstractTe Ignite ignite = ignite(node); - IgniteCache<Integer, Integer> txCache1 = ignite.cache(TX_CACHE1); - IgniteCache<Integer, Integer> txCache2 = ignite.cache(TX_CACHE2); - IgniteCache<Integer, Integer> atomicCache = ignite.cache(ATOMIC_CACHE); + IgniteCache<Integer, Integer> txCache1 = ignite.cache(TX_CACHE1).withAllowAtomicOpsInTx(); + IgniteCache<Integer, Integer> txCache2 = ignite.cache(TX_CACHE2).withAllowAtomicOpsInTx(); + IgniteCache<Integer, Integer> atomicCache = ignite.cache(ATOMIC_CACHE).withAllowAtomicOpsInTx(); ThreadLocalRandom rnd = ThreadLocalRandom.current(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java index b3cac1c..8e69e43 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java @@ -737,7 +737,7 @@ public abstract class 
GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs UUID locNodeId = ignite.cluster().localNode().id(); - IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME); + IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME).withAllowAtomicOpsInTx(); List<Integer> keys = new ArrayList<>(txKeys); @@ -894,7 +894,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs UUID locNodeId = ignite.cluster().localNode().id(); - IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME); + IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME).withAllowAtomicOpsInTx(); List<Integer> keys = new ArrayList<>(txKeys); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java index 605f2a8..dfc8b05 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java @@ -137,7 +137,7 @@ public class IgniteCacheTxIteratorSelfTest extends GridCommonAbstractTest { nearEnabled, useEvicPlc); - final IgniteCache<String, TestClass> cache = ignite.createCache(ccfg); + final IgniteCache<String, TestClass> cache = ignite.createCache(ccfg).withAllowAtomicOpsInTx(); info("Checking cache [mode=" + mode + ", atomMode=" + atomMode + ", near=" + nearEnabled + ", evict=" + useEvicPlc + ']'); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java index c21d706..ed436d0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java @@ -39,6 +39,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.events.Event; +import org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.util.lang.GridAbsPredicateX; @@ -165,7 +166,9 @@ public class GridCacheNearOnlyMultiNodeFullApiSelfTest extends GridCachePartitio /** {@inheritDoc} */ @Override protected List<String> primaryKeysForCache(IgniteCache<String, Integer> cache, int cnt) throws IgniteCheckedException { - if (cache.equals(jcache())) + if (cache instanceof GatewayProtectedCacheProxy && + ((GatewayProtectedCacheProxy) cache).internalProxy().delegate().equals( + ((GatewayProtectedCacheProxy) jcache()).internalProxy().delegate())) return super.primaryKeysForCache(fullCache(), cnt); return super.primaryKeysForCache(cache, cnt); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheStoreUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheStoreUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheStoreUpdateTest.java index 183b9ca..bf8ad78 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheStoreUpdateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheStoreUpdateTest.java @@ -150,8 +150,8 @@ public class GridNearCacheStoreUpdateTest extends GridCommonAbstractTest { boolean tx = txConc != null && txIsolation != null; - final IgniteCache<String, String> clientCache = this.cache; - final IgniteCache<String, String> srvCache = srv.<String, String>cache(CACHE_NAME); + final IgniteCache<String, String> clientCache = this.cache.withAllowAtomicOpsInTx(); + final IgniteCache<String, String> srvCache = srv.<String, String>cache(CACHE_NAME).withAllowAtomicOpsInTx(); if (tx) { doInTransaction(client, txConc, txIsolation, new Callable<Object>() { @@ -278,8 +278,8 @@ public class GridNearCacheStoreUpdateTest extends GridCommonAbstractTest { data2.put(String.valueOf(i), "other"); } - final IgniteCache<String, String> clientCache = this.cache; - final IgniteCache<String, String> srvCache = srv.cache(CACHE_NAME); + final IgniteCache<String, String> clientCache = this.cache.withAllowAtomicOpsInTx(); + final IgniteCache<String, String> srvCache = srv.cache(CACHE_NAME).withAllowAtomicOpsInTx(); boolean tx = txConc != null && txIsolation != null; 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java index 15ec415..7071f65 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java @@ -782,7 +782,7 @@ public class ClientAffinityAssignmentWithBaselineTest extends GridCommonAbstract @Override public void run() { ThreadLocalRandom r = ThreadLocalRandom.current(); - IgniteCache<Integer, String> cache = ig.cache(cacheName); + IgniteCache<Integer, String> cache = ig.cache(cacheName).withAllowAtomicOpsInTx(); boolean pessimistic = r.nextBoolean(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java index 619cc00..72ee0c8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java @@ -828,7 +828,8 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { final CacheConfiguration<Integer, Organization> cfg = new CacheConfiguration<>("Org" + "11"); cfg.setAtomicityMode(mode); - final IgniteCache<Integer, Organization> cache = ig.getOrCreateCache(cfg).withKeepBinary(); + final IgniteCache<Integer, Organization> cache = ig.getOrCreateCache(cfg).withKeepBinary() + .withAllowAtomicOpsInTx(); try (Transaction tx = ig.transactions().txStart()) { for (int i = 0; i < 10; i++) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicOperationsInTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicOperationsInTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicOperationsInTxTest.java new file mode 100644 index 0000000..f12d790 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicOperationsInTxTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.HashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.locks.Lock; +import java.util.function.Consumer; +import javax.cache.CacheException; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CachePeekMode.ALL; + +/** + * Checks how operations on an atomic cache work inside a transaction. 
+ */ +public class AtomicOperationsInTxTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setAtomicityMode(ATOMIC); + ccfg.setName(DEFAULT_CACHE_NAME); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrid(0); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testAllowedAtomicOperations() throws Exception { + checkOperations(true); + } + + /** + * @throws Exception If failed. + */ + public void testNotAllowedAtomicOperations() throws Exception { + checkOperations(false); + } + + /** + * @param isAtomicCacheAllowedInTx If true - atomic operation allowed. + * Otherwise - it should throw exception. 
+ */ + private void checkOperations(boolean isAtomicCacheAllowedInTx) { + HashMap<Integer, Integer> map = new HashMap<>(); + + map.put(1, 1); + map.put(2, 1); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.put(1, 1)); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.putAsync(1, 1).get()); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.putAll(map)); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.putAllAsync(map).get()); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.putIfAbsent(1, 1)); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.putIfAbsentAsync(1, 1).get()); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.get(1)); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.getAll(map.keySet())); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.getAllAsync(map.keySet()).get()); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.getAndPut(1, 2)); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.getAndPutAsync(1, 2).get()); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.getAndPutIfAbsent(1, 2)); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.getAndPutIfAbsentAsync(1, 2).get()); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.getAndRemove(1)); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.getAndRemoveAsync(1)); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.getAndReplace(1, 2)); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.getAndReplaceAsync(1, 2).get()); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.remove(1, 1)); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.removeAsync(1, 1).get()); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.removeAll(map.keySet())); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.removeAllAsync(map.keySet()).get()); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.containsKey(1)); + 
+ checkOperation(isAtomicCacheAllowedInTx, cache -> cache.containsKeyAsync(1).get()); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.containsKeys(map.keySet())); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.containsKeysAsync(map.keySet()).get()); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.invoke(1, new SetEntryProcessor())); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.invokeAsync(1, new SetEntryProcessor()).get()); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.invokeAll(map.keySet(), new SetEntryProcessor())); + + checkOperation(isAtomicCacheAllowedInTx, cache -> cache.invokeAllAsync(map.keySet(), + new SetEntryProcessor()).get()); + + checkLock(isAtomicCacheAllowedInTx); + } + + /** + * @param isAtomicCacheAllowedInTx If true - atomic operation allowed. + * Otherwise - it should throw exception. + * @param op Operation. + */ + private void checkOperation(boolean isAtomicCacheAllowedInTx, Consumer<IgniteCache<Integer, Integer>> op) { + IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME); + + if (isAtomicCacheAllowedInTx) + cache = cache.withAllowAtomicOpsInTx(); + + cache.clear(); + + assertEquals(0, cache.size(ALL)); + + IgniteException err = null; + + try (Transaction tx = grid(0).transactions().txStart()) { + op.accept(cache); + } catch (IgniteException e) { + err = e; + } + + if (isAtomicCacheAllowedInTx) + assertNull(err); + else + assertTrue(err != null && err.getMessage() + .startsWith("Transaction spans operations on atomic cache")); + } + + /** + * @param isAtomicCacheAllowedInTx If true - atomic operation allowed. + * Otherwise - it should throw exception. + */ + private void checkLock(boolean isAtomicCacheAllowedInTx) { + IgniteCache<Integer, Integer> cache; + Class<? 
extends Throwable> eCls; + String eMsg; + + if (isAtomicCacheAllowedInTx) { + cache = grid(0).cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); + eCls = CacheException.class; + eMsg = "Explicit lock can't be acquired within a transaction."; + } else { + cache = grid(0).cache(DEFAULT_CACHE_NAME); + eCls = IgniteException.class; + eMsg = "Transaction spans operations on atomic cache"; + } + + Lock lock = cache.lock(1); + + GridTestUtils.assertThrows(log, (Callable<Void>)() -> { + try (Transaction tx = grid(0).transactions().txStart()) { + lock.lock(); + } + + return null; + }, eCls, eMsg); + } + + /** */ + private class SetEntryProcessor implements EntryProcessor<Integer, Integer, Object> { + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<Integer, Integer> entry, Object... objects) + throws EntryProcessorException { + entry.setValue(entry.getKey()); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java index 1dae853..4e47fa6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java @@ -296,7 +296,7 @@ public class TxOptimisticDeadlockDetectionTest extends AbstractDeadlockDetection Ignite ignite = ignite(clientTx ? 
threadNum - 1 + txCnt : threadNum - 1); - IgniteCache<Object, Integer> cache = ignite.cache(CACHE_NAME); + IgniteCache<Object, Integer> cache = ignite.cache(CACHE_NAME).withAllowAtomicOpsInTx(); List<Object> keys = keySets.get(threadNum - 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java index 32dbdc5..46651cc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java @@ -262,7 +262,7 @@ public class TxPessimisticDeadlockDetectionTest extends AbstractDeadlockDetectio Ignite ignite = loc ? ignite(0) : ignite(clientTx ? 
threadNum - 1 + txCnt : threadNum - 1); - IgniteCache<Object, Integer> cache = ignite.cache(CACHE_NAME); + IgniteCache<Object, Integer> cache = ignite.cache(CACHE_NAME).withAllowAtomicOpsInTx(); List<Object> keys = keySets.get(threadNum - 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 86f7866..728c88f 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -199,6 +199,7 @@ public abstract class GridAbstractTest extends TestCase { * */ static { + System.setProperty(IgniteSystemProperties.IGNITE_ALLOW_ATOMIC_OPS_IN_TX, "false"); System.setProperty(IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "10000"); System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, "false"); System.setProperty(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY, "1"); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 6803ab3..967bdc1 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -144,7 +144,7 @@ 
public abstract class GridCommonAbstractTest extends GridAbstractTest { * @return Cache. */ protected <K, V> IgniteCache<K, V> jcache(int idx) { - return grid(idx).cache(DEFAULT_CACHE_NAME); + return grid(idx).cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); } /** @@ -275,7 +275,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { * @return Cache. */ protected <K, V> IgniteCache<K, V> jcache() { - return grid().cache(DEFAULT_CACHE_NAME); + return grid().cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java index 68363c2..b46231b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java @@ -682,6 +682,11 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { throw new UnsupportedOperationException("Method should be supported."); } + /** {@inheritDoc} */ + @Override public IgniteCache<K, V> withAllowAtomicOpsInTx() { + return this; + } + /** * */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index a1a45dc..08a77d9 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheConcurre import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheFSRestoreTest; import org.apache.ignite.internal.processors.cache.SetTxTimeoutOnPartitionMapExchangeTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteRejectConnectOnNodeStopTest; +import org.apache.ignite.internal.processors.cache.transactions.AtomicOperationsInTxTest; import org.apache.ignite.internal.processors.closure.GridClosureProcessorRemoteTest; import org.apache.ignite.internal.processors.closure.GridClosureProcessorSelfTest; import org.apache.ignite.internal.processors.closure.GridClosureSerializationTest; @@ -210,6 +211,8 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTestSuite(OomFailureHandlerTest.class); suite.addTestSuite(AccountTransferTransactionTest.class); + suite.addTestSuite(AtomicOperationsInTxTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/hibernate-core/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/hibernate-core/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java b/modules/hibernate-core/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java index 0fc2c2d..70e55de 100644 --- a/modules/hibernate-core/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java +++ b/modules/hibernate-core/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java @@ -627,6 +627,11 @@ public class HibernateCacheProxy implements IgniteInternalCache<Object, Object> } /** {@inheritDoc} */ + @Override public <K1, V1> IgniteInternalCache<K1, V1> 
withAllowAtomicOpsInTx() { + return delegate.withAllowAtomicOpsInTx(); + } + + /** {@inheritDoc} */ @Override public GridCacheContext context() { return delegate.context(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 62dbd50..5270e7f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -299,7 +299,7 @@ public class DmlStatementsProcessor { if (opCtx == null) // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary - newOpCtx = new CacheOperationContext(false, null, true, null, false, null, false); + newOpCtx = new CacheOperationContext(false, null, true, null, false, null, false, true); else if (!opCtx.isKeepBinary()) newOpCtx = opCtx.keepBinary(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java index a7c81fd..617909d 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java +++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java @@ -175,12 +175,12 @@ public class IgniteCacheGroupsSqlTest extends GridCommonAbstractTest { IgniteCache pers = srv0.createCache(personCacheConfiguration(grp1, "pers") .setAffinity(new RendezvousAffinityFunction().setPartitions(10)) .setCacheMode(cm1) - .setAtomicityMode(cam1)); + .setAtomicityMode(cam1)).withAllowAtomicOpsInTx(); IgniteCache acc = srv0.createCache(accountCacheConfiguration(grp2, "acc") .setAffinity(new RendezvousAffinityFunction().setPartitions(10)) .setCacheMode(cm2) - .setAtomicityMode(cam2)); + .setAtomicityMode(cam2)).withAllowAtomicOpsInTx(); try(Transaction tx = cam1 == TRANSACTIONAL || cam2 == TRANSACTIONAL ? srv0.transactions().txStart() : null) { for (int i = 0; i < keys; i++) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/platforms/dotnet/Apache.Ignite.AspNet.Tests/ExpiryCacheHolderTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.AspNet.Tests/ExpiryCacheHolderTest.cs b/modules/platforms/dotnet/Apache.Ignite.AspNet.Tests/ExpiryCacheHolderTest.cs index 34d4b4d..b3d5228 100644 --- a/modules/platforms/dotnet/Apache.Ignite.AspNet.Tests/ExpiryCacheHolderTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.AspNet.Tests/ExpiryCacheHolderTest.cs @@ -108,6 +108,8 @@ namespace Apache.Ignite.AspNet.Tests public bool IsKeepBinary { get; private set; } + public bool IsAllowAtomicOpsInTx { get; private set; } + public ICache<int, int> WithSkipStore() { throw new NotImplementedException(); @@ -123,6 +125,11 @@ namespace Apache.Ignite.AspNet.Tests throw new NotImplementedException(); } + public ICache<int, int> WithAllowAtomicOpsInTx() + { + throw new NotImplementedException(); + } + public void LoadCache(ICacheEntryFilter<int, int> p, params object[] args) { throw new NotImplementedException(); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs index 0fe16a5..ce2d6c0 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs @@ -78,6 +78,11 @@ namespace Apache.Ignite.Core.Tests.Cache } /** <inheritDoc /> */ + public bool IsAllowAtomicOpsInTx { + get { return _cache.IsAllowAtomicOpsInTx; } + } + + /** <inheritDoc /> */ public ICache<TK, TV> WithSkipStore() { return _cache.WithSkipStore().WrapAsync(); @@ -94,7 +99,13 @@ namespace Apache.Ignite.Core.Tests.Cache { return _cache.WithKeepBinary<TK1, TV1>().WrapAsync(); } - + + /** <inheritDoc /> */ + public ICache<TK, TV> WithAllowAtomicOpsInTx() + { + return _cache.WithAllowAtomicOpsInTx().WrapAsync(); + } + /** <inheritDoc /> */ public void LoadCache(ICacheEntryFilter<TK, TV> p, params object[] args) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a1dcb540/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs index 8f52e03..43ce3c6 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs @@ -85,6 +85,11 @@ namespace Apache.Ignite.Core.Cache bool IsKeepBinary { get; } /// <summary> + /// Gets a value indicating whether atomic operations are allowed in transactions.
+ /// </summary> + bool IsAllowAtomicOpsInTx { get; } + + /// <summary> /// Get another cache instance with read-through and write-through behavior disabled. /// </summary> /// <returns>Cache with read-through and write-through behavior disabled.</returns> @@ -112,6 +117,13 @@ namespace Apache.Ignite.Core.Cache ICache<TK1, TV1> WithKeepBinary<TK1, TV1>(); /// <summary> + /// Get another cache instance with atomic operations allowed in transactions. + /// Only atomic caches need this. Transactional caches are already available for transactions. + /// </summary> + /// <returns>Cache that is allowed to be used in transactions.</returns> + ICache<TK, TV> WithAllowAtomicOpsInTx(); + + /// <summary> /// Executes <see cref="LocalLoadCache"/> on all cache nodes. /// </summary> /// <param name="p">