Repository: ignite Updated Branches: refs/heads/master d3760ed79 -> 309162be3
IGNITE-8446 Ability to check and completely fill transactions on creation Signed-off-by: Anton Vinogradov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/309162be Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/309162be Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/309162be Branch: refs/heads/master Commit: 309162be35d1c0864982ad9cd5d17da107cb464d Parents: d3760ed Author: Anton Vinogradov <[email protected]> Authored: Mon Aug 6 12:56:22 2018 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Mon Aug 6 12:56:22 2018 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/events/EventType.java | 75 +++++ .../events/TransactionStateChangedEvent.java | 56 ++++ .../cache/transactions/IgniteTxAdapter.java | 59 +++- .../cache/transactions/IgniteTxManager.java | 5 +- .../transactions/TransactionEventProxyImpl.java | 220 +++++++++++++ .../TxRollbackOnIncorrectParamsTest.java | 326 +++++++++++++++++++ .../transactions/TxStateChangeEventTest.java | 234 +++++++++++++ .../testsuites/IgniteCacheTestSuite6.java | 4 + 8 files changed, 977 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/main/java/org/apache/ignite/events/EventType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java index b0b410a..a6ab962 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java +++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java @@ -781,6 +781,66 @@ public interface EventType { public static final int EVT_WAL_SEGMENT_ARCHIVED = 128; /** + * Built-in event type: Transaction has been started. + * <p> + * Fired for each started transaction except system transactions. + * <p> + * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for + * internal Ignite events and should not be used by user-defined events. + * + * @see TransactionStateChangedEvent + */ + public static final int EVT_TX_STARTED = 129; + + /** + * Built-in event type: Transaction has been committed. + * <p> + * Fired for each committed transaction except system transactions. + * <p> + * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for + * internal Ignite events and should not be used by user-defined events. + * + * @see TransactionStateChangedEvent + */ + public static final int EVT_TX_COMMITTED = 130; + + /** + * Built-in event type: Transaction has been rolled back. + * <p> + * Fired for each rolled back transaction except system transactions. + * <p> + * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for + * internal Ignite events and should not be used by user-defined events. + * + * @see TransactionStateChangedEvent + */ + public static final int EVT_TX_ROLLED_BACK = 131; + + /** + * Built-in event type: Transaction has been suspended. + * <p> + * Fired for each suspended transaction except system transactions. + * <p> + * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for + * internal Ignite events and should not be used by user-defined events. + * + * @see TransactionStateChangedEvent + */ + public static final int EVT_TX_SUSPENDED = 132; + + /** + * Built-in event type: Transaction has been resumed. + * <p> + * Fired for each resumed transaction except system transactions. + * <p> + * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for + * internal Ignite events and should not be used by user-defined events. + * + * @see TransactionStateChangedEvent + */ + public static final int EVT_TX_RESUMED = 133; + + /** * All checkpoint events. This array can be directly passed into * {@link IgniteEvents#localListen(IgnitePredicate, int...)} method to * subscribe to all checkpoint events. @@ -979,6 +1039,21 @@ public interface EventType { }; /** + * All Transaction events. This array can be directly passed into + * {@link IgniteEvents#localListen(IgnitePredicate, int...)} method to + * subscribe to all transaction events. + * + * @see TransactionStateChangedEvent + */ + public static final int[] EVTS_TX = { + EVT_TX_STARTED, + EVT_TX_COMMITTED, + EVT_TX_ROLLED_BACK, + EVT_TX_SUSPENDED, + EVT_TX_RESUMED + }; + + /** * All Ignite events (<b>including</b> metric update event). */ public static final int[] EVTS_ALL = U.gridEvents(); http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/main/java/org/apache/ignite/events/TransactionStateChangedEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/TransactionStateChangedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/TransactionStateChangedEvent.java new file mode 100644 index 0000000..cf6e099 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/events/TransactionStateChangedEvent.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.events; + +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.transactions.Transaction; + +/** + * Event indicates transaction state change. + * + * @see EventType#EVTS_TX + */ +public class TransactionStateChangedEvent extends EventAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** Tx. */ + private Transaction tx; + + /** + * @param node Node. + * @param msg Message. + * @param type Type. + * @param tx Tx. + */ + public TransactionStateChangedEvent(ClusterNode node, String msg, int type, Transaction tx) { + super(node, msg, type); + + assert tx != null; + + this.tx = tx; + } + + /** + * Provides transaction proxy allows all 'get' operations such as {@link Transaction#label()} + * and also {@link Transaction#setRollbackOnly()} method. + */ + public Transaction tx() { + return tx; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 15df637..6fdb046 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -43,8 +43,10 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.TransactionStateChangedEvent; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.discovery.ConsistentIdMapper; +import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -60,13 +62,14 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; -import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.processors.cluster.BaselineTopology; +import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridSetWrapper; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -88,6 +91,10 @@ import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; +import static org.apache.ignite.events.EventType.EVT_TX_COMMITTED; +import static org.apache.ignite.events.EventType.EVT_TX_RESUMED; +import static org.apache.ignite.events.EventType.EVT_TX_ROLLED_BACK; +import static org.apache.ignite.events.EventType.EVT_TX_SUSPENDED; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP; @@ -1095,6 +1102,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement if (log.isDebugEnabled()) log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']'); + recordStateChangedEvent(state); + notifyAll(); } else { @@ -1160,6 +1169,54 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement return valid; } + /** */ + private void recordStateChangedEvent(TransactionState state){ + if (!near() || !local()) // Covers only GridNearTxLocal's state changes. + return; + + switch (state) { + case ACTIVE: { + recordStateChangedEvent(EVT_TX_RESUMED); + + break; + } + + case COMMITTED: { + recordStateChangedEvent(EVT_TX_COMMITTED); + + break; + } + + case ROLLED_BACK: { + recordStateChangedEvent(EVT_TX_ROLLED_BACK); + + break; + } + + case SUSPENDED: { + recordStateChangedEvent(EVT_TX_SUSPENDED); + + break; + } + } + } + + /** + * @param type Event type. + */ + protected void recordStateChangedEvent(int type){ + assert near() && local(); + + GridEventStorageManager evtMgr = cctx.gridEvents(); + + if (!system() /* ignoring system tx */ && evtMgr.isRecordable(type)) + evtMgr.record(new TransactionStateChangedEvent( + cctx.discovery().localNode(), + "Transaction state changed.", + type, + new TransactionEventProxyImpl((GridNearTxLocal)this))); + } + /** {@inheritDoc} */ @Override public void endVersion(GridCacheVersion endVer) { this.endVer = endVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index ec9a5c4..ffaaf43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -51,9 +51,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; -import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture; import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture; import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender; import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync; @@ -101,6 +101,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_SALVAGE_TIMEOUT import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.events.EventType.EVT_TX_STARTED; import static org.apache.ignite.internal.GridTopic.TOPIC_TX; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ; @@ -532,6 +533,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { else sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx); } + + ((GridNearTxLocal)tx).recordStateChangedEvent(EVT_TX_STARTED); } // Handle mapped versions. http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java new file mode 100644 index 0000000..f6bc668 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.events.TransactionStateChangedEvent; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.lang.IgniteAsyncSupport; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionState; +import org.jetbrains.annotations.Nullable; + +/** + * Transaction proxy used at tx events. + * + * @see TransactionStateChangedEvent + */ +public class TransactionEventProxyImpl implements TransactionProxy, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Unsupported operation message. */ + private static final String UNSUPPORTED_OPERATION_MESSAGE = "All operations changing transaction's params or " + + "state are restricted inside event listener. " + + "The only exception is setRollbackOnly(), use it in case transaction should be rolled back."; + + /** Xid. */ + private IgniteUuid xid; + + /** Tx. */ + private GridNearTxLocal tx; + + /** Proxy. */ + private TransactionProxy proxy; + + /** + * Default constructor (required by Externalizable). + */ + public TransactionEventProxyImpl() { + } + + /** + * @param tx Tx proxy. + */ + public TransactionEventProxyImpl(GridNearTxLocal tx) { + assert tx != null; + + this.tx = tx; + this.xid = tx.xid(); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid xid() { + return xid; + } + + /** {@inheritDoc} */ + @Override public UUID nodeId() { + return tx().nodeId(); + } + + /** {@inheritDoc} */ + @Override public long threadId() { + return tx().threadId(); + } + + /** {@inheritDoc} */ + @Override public long startTime() { + return tx().startTime(); + } + + /** {@inheritDoc} */ + @Override public TransactionIsolation isolation() { + return tx().isolation(); + } + + /** {@inheritDoc} */ + @Override public TransactionConcurrency concurrency() { + return tx().concurrency(); + } + + /** {@inheritDoc} */ + @Override public boolean implicit() { + return tx().implicit(); + } + + /** {@inheritDoc} */ + @Override public boolean isInvalidate() { + return tx().isInvalidate(); + } + + /** {@inheritDoc} */ + @Override public TransactionState state() { + return tx().state(); + } + + /** {@inheritDoc} */ + @Override public long timeout() { + return tx().timeout(); + } + + /** {@inheritDoc} */ + @Override public long timeout(long timeout) { + throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE); + } + + /** {@inheritDoc} */ + @Override public boolean setRollbackOnly() { + return tx().setRollbackOnly(); + } + + /** {@inheritDoc} */ + @Override public boolean isRollbackOnly() { + return tx().isRollbackOnly(); + } + + /** {@inheritDoc} */ + @Override public void commit() throws IgniteException { + throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> commitAsync() throws IgniteException { + throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE); + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteException { + throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE); + } + + /** {@inheritDoc} */ + @Override public void rollback() throws IgniteException { + throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> rollbackAsync() throws IgniteException { + throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE); + } + + /** {@inheritDoc} */ + @Override public void resume() throws IgniteException { + throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE); + } + + /** {@inheritDoc} */ + @Override public void suspend() throws IgniteException { + throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE); + } + + /** {@inheritDoc} */ + @Nullable @Override public String label() { + return tx().label(); + } + + /** {@inheritDoc} */ + @Override public IgniteAsyncSupport withAsync() { + throw new UnsupportedOperationException("Operation deprecated."); + } + + /** {@inheritDoc} */ + @Override public boolean isAsync() { + throw new UnsupportedOperationException("Operation deprecated."); + } + + /** {@inheritDoc} */ + @Override public <R> IgniteFuture<R> future() { + throw new UnsupportedOperationException("Operation deprecated."); + } + + /** + * @return local transaction + * @throws IgniteException in case tx was not found. + */ + private TransactionProxy tx() throws IgniteException { + if (tx == null) + throw new IgniteException("Operation allowed only inside remote filter or " + + "inside local listener registered on originating node. " + + "Only xid() operation allowed in other case. "); + + if (proxy == null) + proxy = tx.proxy(); + + return proxy; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(xid); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + xid = (IgniteUuid)in.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnIncorrectParamsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnIncorrectParamsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnIncorrectParamsTest.java new file mode 100644 index 0000000..8aafa8b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnIncorrectParamsTest.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.TransactionStateChangedEvent; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.events.EventType.EVT_TX_STARTED; + +/** + * Tests transaction rollback on incorrect tx params. + */ +public class TxRollbackOnIncorrectParamsTest extends GridCommonAbstractTest { + /** + * + */ + public void testTimeoutSetLocalGuarantee() throws Exception { + Ignite ignite = startGrid(0); + + ignite.events().localListen((IgnitePredicate<Event>)e -> { + assert e instanceof TransactionStateChangedEvent; + + TransactionStateChangedEvent evt = (TransactionStateChangedEvent)e; + + Transaction tx = evt.tx(); + + if (tx.timeout() < 200) + tx.setRollbackOnly(); + + return true; + }, EVT_TX_STARTED); + + IgniteCache cache = ignite.getOrCreateCache(defaultCacheConfiguration()); + + try (Transaction tx = ignite.transactions().txStart( + TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ, 200, 2)) { + cache.put(1, 1); + + tx.commit(); + } + + try (Transaction tx = ignite.transactions().txStart( + TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ, 100, 2)) { + cache.put(1, 2); + + tx.commit(); + + fail("Should fail prior this line."); + } + catch (CacheException ignored) { + // No-op. + } + + try (Transaction tx = ignite.transactions().txStart()) { + cache.put(1, 3); + + tx.commit(); + + fail("Should fail prior this line."); + } + catch (CacheException ignored) { + // No-op. + } + } + + /** + * + */ + public void testLabelFilledLocalGuarantee() throws Exception { + Ignite ignite = startGrid(0); + + ignite.events().localListen((IgnitePredicate<Event>)e -> { + assert e instanceof TransactionStateChangedEvent; + + TransactionStateChangedEvent evt = (TransactionStateChangedEvent)e; + + Transaction tx = evt.tx(); + + if (tx.label() == null) + tx.setRollbackOnly(); + + return true; + }, EVT_TX_STARTED); + + IgniteCache cache = ignite.getOrCreateCache(defaultCacheConfiguration()); + + try (Transaction tx = ignite.transactions().withLabel("test").txStart()) { + cache.put(1, 1); + + tx.commit(); + } + + try (Transaction tx = ignite.transactions().txStart()) { + cache.put(1, 2); + + tx.commit(); + + fail("Should fail prior this line."); + } + catch (CacheException ignored) { + // No-op. + } + } + + /** + * + */ + public void testLabelFilledRemoteGuarantee() throws Exception { + Ignite ignite = startGrid(0); + Ignite remote = startGrid(1); + + IgniteCache cacheLocal = ignite.getOrCreateCache(defaultCacheConfiguration()); + IgniteCache cacheRemote = remote.getOrCreateCache(defaultCacheConfiguration()); + + ignite.events().remoteListen(null, + (IgnitePredicate<Event>)e -> { + assert e instanceof TransactionStateChangedEvent; + + TransactionStateChangedEvent evt = (TransactionStateChangedEvent)e; + + Transaction tx = evt.tx(); + + if (tx.label() == null) + tx.setRollbackOnly(); + + return true; + }, + EVT_TX_STARTED); + + try (Transaction tx = ignite.transactions().withLabel("test").txStart()) { + cacheLocal.put(1, 1); + + tx.commit(); + } + + try (Transaction tx = remote.transactions().withLabel("test").txStart()) { + cacheRemote.put(1, 2); + + tx.commit(); + } + + try (Transaction tx = ignite.transactions().txStart()) { + cacheLocal.put(1, 3); + + tx.commit(); + + fail("Should fail prior this line."); + } + catch (CacheException ignored) { + // No-op. + } + + try (Transaction tx = remote.transactions().txStart()) { + cacheRemote.put(1, 4); + + tx.commit(); + + fail("Should fail prior this line."); + } + catch (CacheException ignored) { + // No-op. + } + } + + /** + * + */ + public void testTimeoutSetRemoteGuarantee() throws Exception { + Ignite ignite = startGrid(0); + Ignite remote = startGrid(1); + + IgniteCache cacheLocal = ignite.getOrCreateCache(defaultCacheConfiguration()); + IgniteCache cacheRemote = remote.getOrCreateCache(defaultCacheConfiguration()); + + ignite.events().remoteListen(null, + (IgnitePredicate<Event>)e -> { + assert e instanceof TransactionStateChangedEvent; + + TransactionStateChangedEvent evt = (TransactionStateChangedEvent)e; + + Transaction tx = evt.tx(); + + if (tx.timeout() == 0) + tx.setRollbackOnly(); + + return true; + }, + EVT_TX_STARTED); + + try (Transaction tx = ignite.transactions().txStart( + TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ, 100, 2)) { + cacheLocal.put(1, 1); + + tx.commit(); + } + + try (Transaction tx = remote.transactions().txStart( + TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ, 100, 2)) { + cacheRemote.put(1, 2); + + tx.commit(); + } + + try (Transaction tx = ignite.transactions().txStart()) { + cacheLocal.put(1, 3); + + tx.commit(); + + fail("Should fail prior this line."); + } + catch (CacheException ignored) { + // No-op. + } + + try (Transaction tx = remote.transactions().txStart()) { + cacheRemote.put(1, 4); + + tx.commit(); + + fail("Should fail prior this line."); + } + catch (CacheException ignored) { + // No-op. + } + } + + /** + * + */ + public void testRollbackInsideLocalListenerAfterRemoteFilter() throws Exception { + Ignite ignite = startGrid(0); + Ignite remote = startGrid(1); + + IgniteCache cacheLocal = ignite.getOrCreateCache(defaultCacheConfiguration()); + IgniteCache cacheRemote = remote.getOrCreateCache(defaultCacheConfiguration()); + + AtomicBoolean rollbackFailed = new AtomicBoolean(); + AtomicBoolean alreadyRolledBack = new AtomicBoolean(); + + ignite.events().remoteListen( + (IgniteBiPredicate<UUID, Event>)(uuid, e) -> { + assert e instanceof TransactionStateChangedEvent; + + TransactionStateChangedEvent evt = (TransactionStateChangedEvent)e; + + Transaction tx = evt.tx(); + + try { + tx.setRollbackOnly(); + } + catch (IgniteException ignored) { + alreadyRolledBack.set(rollbackFailed.getAndSet(true)); + } + + return true; + }, + (IgnitePredicate<Event>)e -> { + assert e instanceof TransactionStateChangedEvent; + + return true; + }, + EVT_TX_STARTED); + + assertFalse(rollbackFailed.get()); + assertFalse(alreadyRolledBack.get()); + + try (Transaction tx = ignite.transactions().txStart()) { + cacheLocal.put(1, 1); + + tx.commit(); + + fail("Should fail prior this line."); + } + catch (CacheException ignored) { + // No-op. + } + + assertFalse(rollbackFailed.get()); + assertFalse(alreadyRolledBack.get()); + + try (Transaction tx = remote.transactions().txStart()) { + cacheRemote.put(1, 2); + + tx.commit(); + } + + assertTrue(GridTestUtils.waitForCondition(rollbackFailed::get, 5_000)); + + assertFalse(alreadyRolledBack.get()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxStateChangeEventTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxStateChangeEventTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxStateChangeEventTest.java new file mode 100644 index 0000000..01c87ae --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxStateChangeEventTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteEvents; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.TransactionStateChangedEvent; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionState; + +import static org.apache.ignite.events.EventType.EVTS_TX; +import static org.apache.ignite.events.EventType.EVT_TX_COMMITTED; +import static org.apache.ignite.events.EventType.EVT_TX_RESUMED; +import static org.apache.ignite.events.EventType.EVT_TX_ROLLED_BACK; +import static org.apache.ignite.events.EventType.EVT_TX_STARTED; +import static org.apache.ignite.events.EventType.EVT_TX_SUSPENDED; + +/** + * Tests transaction state change event. + */ +public class TxStateChangeEventTest extends GridCommonAbstractTest { + /** Label. */ + private final String lb = "testLabel"; + + /** Timeout. */ + private final long timeout = 404; + + /** Creation. */ + private static AtomicBoolean creation = new AtomicBoolean(); + + /** Commit. */ + private static AtomicBoolean commit = new AtomicBoolean(); + + /** Rollback. */ + private static AtomicBoolean rollback = new AtomicBoolean(); + + /** Suspend. */ + private static AtomicBoolean suspend = new AtomicBoolean(); + + /** Resume. */ + private static AtomicBoolean resume = new AtomicBoolean(); + + /** + * + */ + public void testLocal() throws Exception { + test(true); + } + + /** + * + */ + public void testRemote() throws Exception { + test(false); + } + + /** + * + */ + private void test(boolean loc) throws Exception { + Ignite ignite = startGrids(5); + + final IgniteEvents evts = loc ? ignite.events() : grid(3).events(); + + if (loc) + evts.localListen((IgnitePredicate<Event>)e -> { + assert e instanceof TransactionStateChangedEvent; + + checkEvent((TransactionStateChangedEvent)e); + + return true; + }, EVTS_TX); + else + evts.remoteListen(null, + (IgnitePredicate<Event>)e -> { + assert e instanceof TransactionStateChangedEvent; + + checkEvent((TransactionStateChangedEvent)e); + + return false; + }, + EVTS_TX); + + IgniteCache cache = ignite.getOrCreateCache(defaultCacheConfiguration().setBackups(2)); + + // create & commit + try (Transaction tx = ignite.transactions().withLabel(lb).txStart( + TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE, timeout, 3)) { + cache.put(1, 1); + + tx.commit(); + } + + assertTrue( + creation.get() && + commit.get() && + !rollback.get() && + !suspend.get() && + !resume.get()); + + clear(); + + // create & suspend & resume & commit + try (Transaction tx = ignite.transactions().withLabel(lb).txStart( + TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE, timeout, 3)) { + cache.put(2, 7); + + tx.suspend(); + + U.sleep(100); + + tx.resume(); + + tx.commit(); + } + + assertTrue( + creation.get() && + commit.get() && + !rollback.get() && + suspend.get() && + resume.get()); + + clear(); + + // create & rollback (pessimistic) + try (Transaction tx = ignite.transactions().withLabel(lb).txStart( + TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE, timeout, 3)) { + cache.put(4, 5); + } + + assertTrue( + creation.get() && + !commit.get() && + rollback.get() && + !suspend.get() && + !resume.get()); + } + + /** + * + */ + private void clear() { + creation.set(false); + commit.set(false); + rollback.set(false); + suspend.set(false); + resume.set(false); + } + + /** + * @param evt Event. + */ + private void checkEvent(TransactionStateChangedEvent evt) { + Transaction tx = evt.tx(); + + assertEquals(timeout, tx.timeout()); + assertEquals(lb, tx.label()); + + switch (evt.type()) { + case EVT_TX_STARTED: { + assertEquals(tx.state(), TransactionState.ACTIVE); + + assertFalse(creation.getAndSet(true)); + + break; + } + + case EVT_TX_COMMITTED: { + assertEquals(tx.state(), TransactionState.COMMITTED); + + assertFalse(commit.getAndSet(true)); + + break; + } + + case EVT_TX_ROLLED_BACK: { + assertEquals(tx.state(), TransactionState.ROLLED_BACK); + + assertFalse(rollback.getAndSet(true)); + + break; + } + + case EVT_TX_SUSPENDED: { + assertEquals(tx.state(), TransactionState.SUSPENDED); + + assertFalse(suspend.getAndSet(true)); + + break; + } + + case EVT_TX_RESUMED: { + assertEquals(tx.state(), TransactionState.ACTIVE); + + assertFalse(resume.getAndSet(true)); + + break; + } + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + clear(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java index 7a5aa80..4af9e1f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java @@ -44,6 +44,8 @@ import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTime import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTopologyChangeTest; +import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnIncorrectParamsTest; +import org.apache.ignite.internal.processors.cache.transactions.TxStateChangeEventTest; import org.apache.ignite.testframework.junits.GridAbstractTest; /** @@ -79,6 +81,8 @@ public class IgniteCacheTestSuite6 extends TestSuite { suite.addTestSuite(TxOptimisticPrepareOnUnstableTopologyTest.class); suite.addTestSuite(TxLabelTest.class); + suite.addTestSuite(TxRollbackOnIncorrectParamsTest.class); + suite.addTestSuite(TxStateChangeEventTest.class); suite.addTestSuite(TxMultiCacheAsyncOpsTest.class);
