Repository: ignite Updated Branches: refs/heads/ignite-1192 e6be4206c -> 9017ab483
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index 35d0776..77272e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -17,6 +17,25 @@ package org.apache.ignite.internal.processors.igfs; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; @@ -47,7 +66,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.processors.cache.GridCacheInternal; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.igfs.client.IgfsClientAbstractCallable; import org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaIdsForPathCallable; import org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaInfoForPathCallable; @@ -76,26 +95,6 @@ import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.EntryProcessorResult; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.CountDownLatch; - import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_RENAMED; @@ -604,7 +603,7 @@ public class IgfsMetaManager extends IgfsManager { assert fileId != null; - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { // Lock file ID for this transaction. IgfsEntryInfo oldInfo = info(fileId); @@ -1005,7 +1004,7 @@ public class IgfsMetaManager extends IgfsManager { srcPathIds.addExistingIds(lockIds, relaxed); dstPathIds.addExistingIds(lockIds, relaxed); - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { // Obtain the locks. final Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds); @@ -1145,7 +1144,7 @@ public class IgfsMetaManager extends IgfsManager { IgniteUuid trashId = IgfsUtils.randomTrashId(); - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { // NB: We may lock root because its id is less than any other id: final IgfsEntryInfo rootInfo = lockIds(IgfsUtils.ROOT_ID, trashId).get(IgfsUtils.ROOT_ID); @@ -1268,7 +1267,7 @@ public class IgfsMetaManager extends IgfsManager { allIds.add(trashId); - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { // Lock participants. Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(allIds); @@ -1345,7 +1344,7 @@ public class IgfsMetaManager extends IgfsManager { assert listing != null; validTxState(false); - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { Collection<IgniteUuid> res = new HashSet<>(); // Obtain all necessary locks in one hop. @@ -1431,7 +1430,7 @@ public class IgfsMetaManager extends IgfsManager { try { validTxState(false); - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { Map<IgniteUuid, IgfsEntryInfo> infos = lockIds(parentId, id); IgfsEntryInfo victim = infos.get(id); @@ -1517,7 +1516,7 @@ public class IgfsMetaManager extends IgfsManager { try { validTxState(false); - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { IgfsEntryInfo info = updatePropertiesNonTx(fileId, props); tx.commit(); @@ -1551,7 +1550,7 @@ public class IgfsMetaManager extends IgfsManager { if (log.isDebugEnabled()) log.debug("Reserve file space: " + fileId); - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { // Lock file ID for this transaction. IgfsEntryInfo oldInfo = info(fileId); @@ -1596,7 +1595,7 @@ public class IgfsMetaManager extends IgfsManager { if (log.isDebugEnabled()) log.debug("Update file info [fileId=" + fileId + ", proc=" + proc + ']'); - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { // Lock file ID for this transaction. IgfsEntryInfo oldInfo = info(fileId); @@ -1658,7 +1657,7 @@ public class IgfsMetaManager extends IgfsManager { pathIds.addSurrogateIds(lockIds); // Start TX. - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { final Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds); if (!pathIds.verifyIntegrity(lockInfos, relaxed)) @@ -1709,7 +1708,7 @@ public class IgfsMetaManager extends IgfsManager { try { validTxState(false); - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { Object prev = val != null ? metaCache.getAndPut(sampling, val) : metaCache.getAndRemove(sampling); tx.commit(); @@ -2602,7 +2601,7 @@ public class IgfsMetaManager extends IgfsManager { pathIds.add(idsForPath(path)); // Start pessimistic. - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { // Lock the very first existing parents and possibly the leaf as well. Map<IgfsPath, IgfsPath> pathToParent = new HashMap<>(); @@ -2788,7 +2787,7 @@ public class IgfsMetaManager extends IgfsManager { * * @return Transaction. */ - private IgniteInternalTx startTx() { + private GridNearTxLocal startTx() { return metaCache.txStartEx(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ); } @@ -2817,7 +2816,7 @@ public class IgfsMetaManager extends IgfsManager { pathIds.addExistingIds(lockIds, relaxed); // Start TX. - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds); if (secondaryFs != null && isRetryForSecondary(pathIds, lockInfos)) @@ -2927,7 +2926,7 @@ public class IgfsMetaManager extends IgfsManager { pathIds.addSurrogateIds(lockIds); // Start TX. - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds); if (!pathIds.verifyIntegrity(lockInfos, relaxed)) @@ -3034,7 +3033,7 @@ public class IgfsMetaManager extends IgfsManager { } // Start TX. - try (IgniteInternalTx tx = startTx()) { + try (GridNearTxLocal tx = startTx()) { Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds); if (secondaryCtx != null && isRetryForSecondary(pathIds, lockInfos)) http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 41dbdd0..a680a88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -66,15 +66,16 @@ import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheIteratorConverter; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.query.CacheQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.marshaller.MappingAcceptedMessage; import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridEmptyIterator; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.SerializableTransient; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -92,7 +93,6 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.internal.util.SerializableTransient; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.JobContextResource; import org.apache.ignite.resources.LoggerResource; @@ -945,7 +945,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { else nodes = null; - try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(cfg.getName()); GridServiceAssignments oldAssigns = (GridServiceAssignments)cache.get(key); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index 53e6add..c4d8a79 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -88,6 +88,8 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { blockedMsgs.add(new T2<>(node, ioMsg)); + notifyAll(); + return; } } @@ -137,6 +139,33 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { } /** + * @param cls Message class. + * @param nodeName Node name. + * @throws InterruptedException If interrupted. + */ + public void waitForMessage(Class<?> cls, String nodeName) throws InterruptedException { + synchronized (this) { + while (!hasMessage(cls, nodeName)) + wait(); + } + } + + /** + * @param cls Message class. + * @param nodeName Node name. + * @return {@code True} if has blocked message. + */ + private boolean hasMessage(Class<?> cls, String nodeName) { + for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) { + if (msg.get2().message().getClass() == cls && + nodeName.equals(msg.get1().attribute(ATTR_IGNITE_INSTANCE_NAME))) + return true; + } + + return false; + } + + /** * @param blockP Message block predicate. */ public void blockMessages(IgnitePredicate<GridIoMessage> blockP) { http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java index 3622964..eb30927 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java @@ -338,8 +338,6 @@ public final class GridCacheTestStore implements CacheStore<Integer, String> { txs.add(tx); - assertTrue("Unexpected tx class: " + tx.getClass(), tx instanceof TransactionProxy); - IgniteInternalTx tx0 = GridTestUtils.getFieldValue(tx, "tx"); if (!tx0.local()) http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java index 6e8c2a1..4fd4989 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java @@ -33,8 +33,8 @@ import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; @@ -210,7 +210,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ protected void checkStartTxSuccess(final IgniteInternalCache<Object, Object> cache) throws Exception { - try (final IgniteInternalTx tx = CU.txStartInternal(cache.context(), cache, PESSIMISTIC, READ_COMMITTED)) { + try (final GridNearTxLocal tx = CU.txStartInternal(cache.context(), cache, PESSIMISTIC, READ_COMMITTED)) { assert tx != null; sleepForTxFailure(); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java index 17570aa..f821a45 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java @@ -18,21 +18,18 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.util.Map; -import java.util.concurrent.Callable; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -84,7 +81,7 @@ public class IgniteCacheSystemTransactionsSelfTest extends GridCacheAbstractSelf utilityCache.getAndPutIfAbsent("2", "2"); - try (IgniteInternalTx itx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal itx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { assertEquals(null, utilityCache.get("1")); assertEquals("2", utilityCache.get("2")); assertEquals(null, utilityCache.get("3")); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java index b0aa67a..9ac9e31 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java @@ -105,6 +105,11 @@ public class IgniteTxCachePrimarySyncTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 15 * 60_000; + } + + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java index 702a883..91e3b26 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java @@ -37,7 +37,7 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; @@ -169,14 +169,14 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri TransactionProxyImpl tx = (TransactionProxyImpl)txIgniteNode.transactions().txStart(); - IgniteInternalTx txEx = tx.tx(); + GridNearTxLocal txEx = tx.tx(); assertTrue(txEx.optimistic()); cache.putAll(map); try { - txEx.prepareAsync().get(3, TimeUnit.SECONDS); + txEx.prepareNearTxLocal().get(3, TimeUnit.SECONDS); } catch (IgniteFutureTimeoutCheckedException ignored) { info("Failed to wait for prepare future completion: " + partial); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java index 3c1ae8e..4997b20 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java @@ -42,7 +42,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -349,7 +349,7 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest TransactionProxyImpl txProxy = (TransactionProxyImpl)tx; - IgniteInternalTx txEx = txProxy.tx(); + GridNearTxLocal txEx = txProxy.tx(); assertTrue(txEx.pessimistic()); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java index a0a5627..7ca3914 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; @@ -210,11 +211,11 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends log.info("Start prepare."); - IgniteInternalTx txEx = ((TransactionProxyImpl)tx).tx(); + GridNearTxLocal txEx = ((TransactionProxyImpl)tx).tx(); commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2. - IgniteInternalFuture<?> prepFut = txEx.prepareAsync(); + IgniteInternalFuture<?> prepFut = txEx.prepareNearTxLocal(); waitPrepared(ignite(1)); @@ -371,11 +372,11 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends log.info("Start prepare."); - IgniteInternalTx txEx = ((TransactionProxyImpl)tx).tx(); + GridNearTxLocal txEx = ((TransactionProxyImpl)tx).tx(); commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2. - IgniteInternalFuture<?> prepFut = txEx.prepareAsync(); + IgniteInternalFuture<?> prepFut = txEx.prepareNearTxLocal(); waitPrepared(ignite(1)); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java new file mode 100644 index 0000000..cfe9029 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static ConcurrentHashMap<Object, Object> storeMap = new ConcurrentHashMap<>(); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setConsistentId(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + try { + for (Ignite node : G.allGrids()) { + Collection<IgniteInternalTx> txs = ((IgniteKernal)node).context().cache().context().tm().txs(); + + assertTrue("Unfinished txs [node=" + node.name() + ", txs=" + txs + ']', txs.isEmpty()); + } + } + finally { + stopAllGrids(); + + storeMap.clear(); + + super.afterTest(); + } + } + + /** + * @throws Exception If failed. + */ + public void testNearTx1Implicit() throws Exception { + nearTx1(null); + } + + /** + * @throws Exception If failed. + */ + public void testNearTx1Optimistic() throws Exception { + nearTx1(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testNearTx1Pessimistic() throws Exception { + nearTx1(PESSIMISTIC); + } + + /** + * Stop tx near node (client2), near cache tx on client1 is either committed + * by primary or invalidated. + * + * @param concurrency Tx concurrency or {@code null} for implicit transaction. + * @throws Exception If failed. + */ + private void nearTx1(final TransactionConcurrency concurrency) throws Exception { + startGrids(4); + + Ignite srv0 = grid(0); + + final IgniteCache<Integer, Integer> srvCache = srv0.createCache(cacheConfiguration(2, false, false)); + + awaitPartitionMapExchange(); + + client = true; + + Ignite client1 = startGrid(4); + final Ignite client2 = startGrid(5); + + final Integer key = primaryKey(srv0.cache(null)); + + final IgniteCache<Integer, Integer> cache1 = + client1.createNearCache(null, new NearCacheConfiguration<Integer, Integer>()); + + final IgniteCache<Integer, Integer> cache2 = + client2.createNearCache(null, new NearCacheConfiguration<Integer, Integer>()); + + cache1.put(key, 1); + + final Integer newVal = 2; + + testSpi(client2).blockMessages(GridNearTxFinishRequest.class, srv0.name()); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + log.info("Start put, concurrency: " + concurrency); + + if (concurrency != null) { + try (Transaction tx = client2.transactions().txStart(concurrency, REPEATABLE_READ)) { + cache2.put(key, newVal); + + tx.commit(); + } + } + else + cache2.put(key, newVal); + + return null; + } + }); + + U.sleep(500); + + assertFalse(fut.isDone()); + + testSpi(client2).waitForMessage(GridNearTxFinishRequest.class, srv0.name()); + + stopGrid(client2.name()); + + try { + fut.get(); + } + catch (IgniteCheckedException ignore) { + // No-op. + } + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return newVal.equals(srvCache.get(key)) && newVal.equals(cache1.get(key)); + } + }, 5000); + + checkData(F.asMap(key, newVal)); + } + + /** + * @throws Exception If failed. + */ + public void testNearTx2Implicit() throws Exception { + nearTx2(null); + } + + /** + * @throws Exception If failed. + */ + public void testNearTx2Optimistic() throws Exception { + nearTx2(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testNearTx2Pessimistic() throws Exception { + nearTx2(PESSIMISTIC); + } + + /** + * Stop both tx near node (client2) and primary node, near cache tx on client1 is invalidated. + * + * @param concurrency Tx concurrency or {@code null} for implicit transaction. + * @throws Exception If failed. + */ + private void nearTx2(final TransactionConcurrency concurrency) throws Exception { + startGrids(4); + + Ignite srv0 = grid(0); + + srv0.createCache(cacheConfiguration(2, false, false)); + + awaitPartitionMapExchange(); + + client = true; + + Ignite client1 = startGrid(4); + final Ignite client2 = startGrid(5); + + final Integer key = primaryKey(srv0.cache(null)); + + final IgniteCache<Integer, Integer> cache1 = + client1.createNearCache(null, new NearCacheConfiguration<Integer, Integer>()); + + final IgniteCache<Integer, Integer> cache2 = + client2.createNearCache(null, new NearCacheConfiguration<Integer, Integer>()); + + cache1.put(key, 1); + + final Integer newVal = 2; + + testSpi(client2).blockMessages(GridNearTxFinishRequest.class, srv0.name()); + + testSpi(srv0).blockMessages(new IgnitePredicate<GridIoMessage>() { + @Override public boolean apply(GridIoMessage msg) { + return msg.message() instanceof GridDhtTxFinishRequest; + } + }); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + log.info("Start put, concurrency: " + concurrency); + + if (concurrency != null) { + try (Transaction tx = client2.transactions().txStart(concurrency, REPEATABLE_READ)) { + cache2.put(key, newVal); + + tx.commit(); + } + } + else + cache2.put(key, newVal); + + return null; + } + }); + + U.sleep(500); + + assertFalse(fut.isDone()); + + testSpi(client2).waitForMessage(GridNearTxFinishRequest.class, srv0.name()); + + stopGrid(client2.name()); + stopGrid(srv0.name()); + + try { + fut.get(); + } + catch (IgniteCheckedException ignore) { + // No-op. + } + + final IgniteCache<Integer, Integer> srvCache = grid(1).cache(null); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return newVal.equals(srvCache.get(key)) && newVal.equals(cache1.get(key)); + } + }, 5000); + + checkData(F.asMap(key, newVal)); + } + + /** + * @throws Exception If failed. + */ + public void testTxWithStoreImplicit() throws Exception { + txWithStore(null, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxWithStoreOptimistic() throws Exception { + txWithStore(OPTIMISTIC, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxWithStorePessimistic() throws Exception { + txWithStore(PESSIMISTIC, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxWithStoreNoWriteThroughImplicit() throws Exception { + txWithStore(null, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxWithStoreNoWriteThroughOptimistic() throws Exception { + txWithStore(OPTIMISTIC, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxWithStoreNoWriteThroughPessimistic() throws Exception { + txWithStore(PESSIMISTIC, false); + } + + /** + * @param concurrency Tx concurrency or {@code null} for implicit transaction. + * @param writeThrough Store write through flag. + * @throws Exception If failed. + */ + private void txWithStore(final TransactionConcurrency concurrency, boolean writeThrough) throws Exception { + startGrids(4); + + Ignite srv0 = grid(0); + + IgniteCache<Integer, Integer> srv0Cache = srv0.createCache(cacheConfiguration(1, true, writeThrough)); + + awaitPartitionMapExchange(); + + final Integer key = primaryKey(srv0Cache); + + srv0Cache.put(key, 1); + + client = true; + + Ignite client = startGrid(4); + + testSpi(srv0).blockMessages(GridNearTxPrepareResponse.class, client.name()); + + final IgniteCache<Integer, Integer> clientCache = client.cache(null); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + log.info("Start put"); + + clientCache.put(key, 2); + + return null; + } + }); + + U.sleep(500); + + assertFalse(fut.isDone()); + + testSpi(srv0).waitForMessage(GridNearTxPrepareResponse.class, client.name()); + + stopGrid(client.name()); + + try { + fut.get(); + } + catch (IgniteCheckedException ignore) { + // No-op. + } + + U.sleep(1000); + + if (writeThrough) + checkData(F.asMap(key, 1)); + else + checkData(F.asMap(key, 2)); + } + + /** + * @param node Node. + * @return Node communication SPI. + */ + private TestRecordingCommunicationSpi testSpi(Ignite node) { + return (TestRecordingCommunicationSpi)node.configuration().getCommunicationSpi(); + } + + /** + * @param backups Number of backups. + * @param store Cache store flag. + * @param writeThrough Store write through flag. + * @return Cache configuration. + */ + private CacheConfiguration<Integer, Integer> cacheConfiguration(int backups, boolean store, boolean writeThrough) { + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setBackups(backups); + ccfg.setRebalanceMode(ASYNC); + + if (store) { + ccfg.setWriteThrough(writeThrough); + + ccfg.setCacheStoreFactory(new TestStoreFactory()); + } + + return ccfg; + } + + /** + * @param expData Expected cache data. + */ + private void checkData(Map<Integer, Integer> expData) { + assert !expData.isEmpty(); + + List<Ignite> nodes = G.allGrids(); + + assertFalse(nodes.isEmpty()); + + for (Ignite node : nodes) { + IgniteCache<Integer, Integer> cache = node.cache(null); + + for (Map.Entry<Integer, Integer> e : expData.entrySet()) { + assertEquals("Invalid value [key=" + e.getKey() + ", node=" + node.name() + ']', + e.getValue(), + cache.get(e.getKey())); + } + } + } + + /** + * + */ + private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> { + /** */ + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public CacheStore<Object, Object> create() { + return new CacheStoreAdapter() { + @Override public Object load(Object key) throws CacheLoaderException { + return storeMap.get(key); + } + + @Override public void write(Cache.Entry entry) throws CacheWriterException { + log.info("Store write [key=" + entry.getKey() + ", val=" + entry.getValue() + ']'); + + storeMap.put(entry.getKey(), entry.getValue()); + } + + @Override public void delete(Object key) throws CacheWriterException { + log.info("Store delete [key=" + key + ']'); + + storeMap.remove(key); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java index 1517672..3e56b00 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java @@ -28,7 +28,6 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -153,7 +152,7 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes * Check whether caches has no transactions after salvage timeout. * * @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC). - * @param prepare Whether to prepare transaction state (i.e. call {@link IgniteInternalTx#prepare()}). + * @param prepare Whether to prepare transaction state (i.e. call {@link GridNearTxLocal#prepare()}). * @throws Exception If failed. */ private void checkSalvageAfterTimeout(TransactionConcurrency mode, boolean prepare) throws Exception { @@ -172,7 +171,7 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes * * @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC). * @param prepare Whether to prepare transaction state - * (i.e. call {@link IgniteInternalTx#prepare()}). + * (i.e. call {@link GridNearTxLocal#prepare()}). * @throws Exception If failed. */ private void checkSalvageBeforeTimeout(TransactionConcurrency mode, boolean prepare) throws Exception { @@ -198,7 +197,7 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes * Start new transaction on the grid(0) and put some keys to it. * * @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC). - * @param prepare Whether to prepare transaction state (i.e. call {@link IgniteInternalTx#prepare()}). + * @param prepare Whether to prepare transaction state (i.e. call {@link GridNearTxLocal#prepare()}). * @throws Exception If failed. */ private void startTxAndPutKeys(final TransactionConcurrency mode, final boolean prepare) throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java index 005e4a5..212675b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -111,9 +112,6 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest { cfg.setClientMode(client); - // Test spi blocks message send, this can cause hang with striped pool. - cfg.setStripedPoolSize(-1); - return cfg; } @@ -555,15 +553,25 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest { GridCacheVersion txId = req.version(); - if (TX_IDS.contains(txId)) { - while (TX_IDS.size() < TX_CNT) { - try { - U.sleep(50); - } - catch (IgniteInterruptedCheckedException e) { - e.printStackTrace(); + if (TX_IDS.contains(txId) && TX_IDS.size() < TX_CNT) { + GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (TX_IDS.size() < TX_CNT) { + try { + U.sleep(50); + } + catch (IgniteInterruptedCheckedException e) { + e.printStackTrace(); + } + } + + TestCommunicationSpi.super.sendMessage(node, msg, ackC); + + return null; } - } + }); + + return; } } else if (msg0 instanceof GridNearTxPrepareResponse) { http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java index fba78c8..16596ed 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java @@ -34,7 +34,7 @@ import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.lang.GridAbsPredicateX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -118,7 +118,7 @@ public class IgfsBlockMessageSystemPoolStarvationSelfTest extends IgfsCommonAbst @Override public Void call() throws Exception { GridCacheAdapter dataCache = dataCache(attacker); - try (IgniteInternalTx tx = dataCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = dataCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { dataCache.put(DATA_KEY, 0); txStartLatch.countDown(); @@ -257,7 +257,7 @@ public class IgfsBlockMessageSystemPoolStarvationSelfTest extends IgfsCommonAbst cfg.setLocalHost("127.0.0.1"); cfg.setConnectorConfiguration(null); - cfg.setStripedPoolSize(0); + cfg.setStripedPoolSize(2); cfg.setSystemThreadPoolSize(2); cfg.setRebalanceThreadPoolSize(1); cfg.setPublicThreadPoolSize(1); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java index 0513786..1901283 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java @@ -44,7 +44,8 @@ public class IgniteCacheRestartTestSuite2 extends TestSuite { suite.addTestSuite(IgniteCachePutAllRestartTest.class); suite.addTestSuite(GridCachePutAllFailoverSelfTest.class); - suite.addTestSuite(IgniteBinaryMetadataUpdateNodeRestartTest.class); + // TODO IGNITE-4768. + //suite.addTestSuite(IgniteBinaryMetadataUpdateNodeRestartTest.class); suite.addTestSuite(IgniteCacheGetRestartTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java index 7363c7c..7bd7797 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheCo import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedPrimaryNodeFailureRecoveryTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheTxRecoveryRollbackTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearTxPessimisticOriginatingNodeFailureSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxOriginatingNodeFailureSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest; @@ -54,6 +55,8 @@ public class IgniteCacheTxRecoverySelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheNearTxPessimisticOriginatingNodeFailureSelfTest.class); suite.addTestSuite(GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest.class); + suite.addTestSuite(IgniteCacheTxRecoveryRollbackTest.class); + return suite; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java ---------------------------------------------------------------------- diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java index bbb1d4e..1e2aded 100644 --- a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java +++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateReadWriteAccessStrategy.java @@ -22,8 +22,8 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.GridLeanSet; -import org.apache.ignite.transactions.Transaction; import org.hibernate.cache.CacheException; import org.hibernate.cache.spi.access.AccessType; import org.hibernate.cache.spi.access.SoftLock; @@ -250,15 +250,15 @@ public class HibernateReadWriteAccessStrategy extends HibernateAccessStrategyAda if (ctx.unlocked(key)) { // Finish transaction if last key is unlocked. txCtx.remove(); - Transaction tx = cache.tx(); + GridNearTxLocal tx = cache.tx(); assert tx != null; try { - tx.commit(); + tx.proxy().commit(); } finally { - tx.close(); + tx.proxy().close(); } assert cache.tx() == null; @@ -275,10 +275,10 @@ public class HibernateReadWriteAccessStrategy extends HibernateAccessStrategyAda if (ctx != null) { txCtx.remove(); - Transaction tx = cache.tx(); + GridNearTxLocal tx = cache.tx(); if (tx != null) - tx.rollback(); + tx.proxy().rollback(); } } catch (IgniteException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java ---------------------------------------------------------------------- diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java index f581ebb..5047491 100644 --- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java +++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java @@ -27,6 +27,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.jta.CacheTmLookup; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.lifecycle.LifecycleAware; import org.jetbrains.annotations.Nullable; @@ -151,7 +152,7 @@ public class CacheJtaManager extends CacheJtaManagerAdapter { Transaction jtaTx = jtaTm.getTransaction(); if (jtaTx != null) { - IgniteInternalTx tx = cctx.tm().userTx(); + GridNearTxLocal tx = cctx.tm().userTx(); if (tx == null) { TransactionConfiguration tCfg = cctx.kernalContext().config() http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java ---------------------------------------------------------------------- diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java index f43981e..649f7c4 100644 --- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java +++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java @@ -27,7 +27,7 @@ import javax.transaction.xa.Xid; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.transactions.TransactionState; @@ -51,7 +51,7 @@ final class CacheJtaResource implements XAResource, Synchronization { private static final Xid[] NO_XID = new Xid[] {}; /** Cache transaction. */ - private IgniteInternalTx cacheTx; + private GridNearTxLocal cacheTx; /** */ private Xid xid; @@ -60,7 +60,7 @@ final class CacheJtaResource implements XAResource, Synchronization { * @param cacheTx Cache jta. * @param ctx Kernal context. */ - public CacheJtaResource(IgniteInternalTx cacheTx, GridKernalContext ctx) { + CacheJtaResource(GridNearTxLocal cacheTx, GridKernalContext ctx) { assert cacheTx != null; assert ctx != null; @@ -291,7 +291,7 @@ final class CacheJtaResource implements XAResource, Synchronization { * * @return {@code true} if jta was already committed or rolled back. */ - public boolean isFinished() { + boolean isFinished() { TransactionState state = cacheTx.state(); return state == COMMITTED || state == ROLLED_BACK;