8446 Signed-off-by: Anton Vinogradov <a...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a43df35 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a43df35 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a43df35 Branch: refs/heads/ignite-8446 Commit: 3a43df35025182af67d1c6f47fb939bf68f5fd94 Parents: 9ab5a0f Author: Anton Vinogradov <a...@apache.org> Authored: Wed Jun 27 17:15:54 2018 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Wed Jun 27 17:15:54 2018 +0300 ---------------------------------------------------------------------- .../transactions/TransactionEventProxyImpl.java | 2 +- .../TxRollbackOnIncorrectParamsTest.java | 99 +++++++++++++++++--- 2 files changed, 87 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3a43df35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java index 55ec513..c9d5165 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java @@ -190,7 +190,7 @@ public class TransactionEventProxyImpl implements TransactionProxy, Externalizab */ private Transaction tx() throws IgniteException { if (tx == null) - throw new IgniteException("Operation allowed only inside remote listener or " + + throw new IgniteException("Operation allowed only inside remote filter or " + "inside local listener registered on originating node. " + "Only xid() operation allowed in other cases. "); http://git-wip-us.apache.org/repos/asf/ignite/blob/3a43df35/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnIncorrectParamsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnIncorrectParamsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnIncorrectParamsTest.java index 5587ff0..b71429d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnIncorrectParamsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnIncorrectParamsTest.java @@ -17,12 +17,17 @@ package org.apache.ignite.internal.processors.cache.transactions; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteEvents; +import org.apache.ignite.IgniteException; import org.apache.ignite.events.Event; import org.apache.ignite.events.TransactionStateChangedEvent; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; @@ -63,14 +68,14 @@ public class TxRollbackOnIncorrectParamsTest extends GridCommonAbstractTest { try (Transaction tx = ignite.transactions().txStart( TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ, 200, 2)) { - cache.put(1,1); + cache.put(1, 1); tx.commit(); } try (Transaction tx = ignite.transactions().txStart( TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ, 100, 2)) { - cache.put(1,2); + cache.put(1, 2); tx.commit(); @@ -81,7 +86,7 @@ public class TxRollbackOnIncorrectParamsTest extends GridCommonAbstractTest { } try (Transaction tx = ignite.transactions().txStart()) { - cache.put(1,3); + cache.put(1, 3); tx.commit(); @@ -118,13 +123,13 @@ public class TxRollbackOnIncorrectParamsTest extends GridCommonAbstractTest { IgniteCache cache = ignite.getOrCreateCache(DEFAULT_CACHE_NAME); try (Transaction tx = ignite.transactions().withLabel("test").txStart()) { - cache.put(1,1); + cache.put(1, 1); tx.commit(); } try (Transaction tx = ignite.transactions().txStart()) { - cache.put(1,2); + cache.put(1, 2); tx.commit(); @@ -164,19 +169,19 @@ public class TxRollbackOnIncorrectParamsTest extends GridCommonAbstractTest { IgniteCache cache = ignite.getOrCreateCache(DEFAULT_CACHE_NAME); try (Transaction tx = ignite.transactions().withLabel("test").txStart()) { - cache.put(1,1); + cache.put(1, 1); tx.commit(); } try (Transaction tx = remote.transactions().withLabel("test").txStart()) { - cache.put(1,2); + cache.put(1, 2); tx.commit(); } try (Transaction tx = ignite.transactions().txStart()) { - cache.put(1,3); + cache.put(1, 3); tx.commit(); @@ -187,7 +192,7 @@ public class TxRollbackOnIncorrectParamsTest extends GridCommonAbstractTest { } try (Transaction tx = remote.transactions().txStart()) { - cache.put(1,4); + cache.put(1, 4); tx.commit(); @@ -228,20 +233,20 @@ public class TxRollbackOnIncorrectParamsTest extends GridCommonAbstractTest { try (Transaction tx = ignite.transactions().txStart( TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ, 100, 2)) { - cache.put(1,1); + cache.put(1, 1); tx.commit(); } try (Transaction tx = remote.transactions().txStart( TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ, 100, 2)) { - cache.put(1,2); + cache.put(1, 2); tx.commit(); } try (Transaction tx = ignite.transactions().txStart()) { - cache.put(1,3); + cache.put(1, 3); tx.commit(); @@ -252,7 +257,7 @@ public class TxRollbackOnIncorrectParamsTest extends GridCommonAbstractTest { } try (Transaction tx = remote.transactions().txStart()) { - cache.put(1,4); + cache.put(1, 4); tx.commit(); @@ -263,6 +268,74 @@ public class TxRollbackOnIncorrectParamsTest extends GridCommonAbstractTest { } } + /** + * + */ + public void testRollbackInsideLocalListenerAfterRemoteFilter() throws Exception { + Ignite ignite = startGrid(0); + Ignite remote = startGrid(1); + + final IgniteEvents evts = ignite.events(); + + evts.enableLocal(EVTS_TX); + + AtomicBoolean rollbackFailed = new AtomicBoolean(); + AtomicBoolean alreadyRolledBack = new AtomicBoolean(); + + evts.remoteListen( + (IgniteBiPredicate<UUID, Event>)(uuid, e) -> { + assert e instanceof TransactionStateChangedEvent; + + TransactionStateChangedEvent evt = (TransactionStateChangedEvent)e; + + Transaction tx = evt.tx(); + + try { + tx.rollback(); + } + catch (IgniteException ignored) { + alreadyRolledBack.set(rollbackFailed.getAndSet(true)); + } + + return true; + }, + (IgnitePredicate<Event>)e -> { + assert e instanceof TransactionStateChangedEvent; + + return true; + }, + EVT_TX_STARTED); + + IgniteCache cache = ignite.getOrCreateCache(DEFAULT_CACHE_NAME); + + assertFalse(rollbackFailed.get()); + assertFalse(alreadyRolledBack.get()); + + try (Transaction tx = ignite.transactions().txStart()) { + cache.put(1, 1); + + tx.commit(); + + fail("Should fail prior this line."); + } + catch (TransactionRollbackException ignored) { + // No-op. + } + + assertFalse(rollbackFailed.get()); + assertFalse(alreadyRolledBack.get()); + + try (Transaction tx = remote.transactions().txStart()) { + cache.put(1, 2); + + tx.commit(); + } + + assertTrue(GridTestUtils.waitForCondition(rollbackFailed::get, 5_000)); + + assertFalse(alreadyRolledBack.get()); + } + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest();