Repository: ignite
Updated Branches:
  refs/heads/ignite-2.5 44401882d -> 111592e2c


IGNITE-8435 StorageException is handled like NodeStoppingException during 
failing transaction commit

Signed-off-by: Andrey Gura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/111592e2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/111592e2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/111592e2

Branch: refs/heads/ignite-2.5
Commit: 111592e2ccd3af466433f78626c1d06d4b5ece4c
Parents: 4440188
Author: Anton Kalashnikov <[email protected]>
Authored: Tue May 8 16:35:05 2018 +0300
Committer: Andrey Gura <[email protected]>
Committed: Tue May 8 17:02:38 2018 +0300

----------------------------------------------------------------------
 .../internal/InvalidEnvironmentException.java   |  25 ++
 .../ignite/internal/NodeStoppingException.java  |   2 +-
 .../internal/pagemem/wal/StorageException.java  |   3 +-
 .../GridDistributedTxRemoteAdapter.java         |   8 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   8 +-
 .../transactions/IgniteTxLocalAdapter.java      |   8 +-
 .../apache/ignite/internal/util/typedef/X.java  |   4 +-
 .../failure/AccountTransferTransactionTest.java | 331 +++++++++++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |   2 +
 9 files changed, 375 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/main/java/org/apache/ignite/internal/InvalidEnvironmentException.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/InvalidEnvironmentException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/InvalidEnvironmentException.java
new file mode 100644
index 0000000..d45a443
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/InvalidEnvironmentException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+/**
+ * Marker interface for exceptions that signal an invalid node environment (e.g. node stopping or storage failure).
+ */
+public interface InvalidEnvironmentException {
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java
index 75447a1..cc39b14 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java
@@ -22,7 +22,7 @@ import org.apache.ignite.IgniteCheckedException;
 /**
  *
  */
-public class NodeStoppingException extends IgniteCheckedException {
+public class NodeStoppingException extends IgniteCheckedException implements 
InvalidEnvironmentException {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java
index 2da08b9..debc391 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java
@@ -19,12 +19,13 @@ package org.apache.ignite.internal.pagemem.wal;
 
 import java.io.IOException;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.InvalidEnvironmentException;
 import org.jetbrains.annotations.NotNull;
 
 /**
  * Exception is needed to distinguish WAL manager & page store critical I/O 
errors.
  */
-public class StorageException extends IgniteCheckedException {
+public class StorageException extends IgniteCheckedException implements 
InvalidEnvironmentException {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index a692b2e..5e3111c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -28,8 +28,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.InvalidEnvironmentException;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.pagemem.wal.StorageException;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
@@ -734,21 +734,21 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                                     }
                                 }
                                 catch (Throwable ex) {
-                                    boolean nodeStopping = X.hasCause(ex, 
NodeStoppingException.class);
+                                    boolean hasIOIssue = X.hasCause(ex, 
InvalidEnvironmentException.class);
 
                                     // In case of error, we still make the 
best effort to commit,
                                     // as there is no way to rollback at this 
point.
                                     err = new 
IgniteTxHeuristicCheckedException("Commit produced a runtime exception " +
                                         "(all transaction entries will be 
invalidated): " + CU.txString(this), ex);
 
-                                    if (nodeStopping) {
+                                    if (hasIOIssue) {
                                         U.warn(log, "Failed to commit 
transaction, node is stopping [tx=" + this +
                                             ", err=" + ex + ']');
                                     }
                                     else
                                         U.error(log, "Commit failed.", err);
 
-                                    uncommit(nodeStopping);
+                                    uncommit(hasIOIssue);
 
                                     state(UNKNOWN);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 6380710..0ed8419 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -26,10 +26,10 @@ import 
java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.InvalidEnvironmentException;
 import org.apache.ignite.internal.IgniteDiagnosticAware;
 import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import 
org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
@@ -168,7 +168,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
         if (ERR_UPD.compareAndSet(this, null, e)) {
             tx.setRollbackOnly();
 
-            if (X.hasCause(e, NodeStoppingException.class))
+            if (X.hasCause(e, InvalidEnvironmentException.class))
                 onComplete();
             else
                 finish(false);
@@ -225,9 +225,9 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
 
             if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) {
                 try {
-                    boolean nodeStop = err != null && X.hasCause(err, 
NodeStoppingException.class);
+                    boolean hasInvalidEnvironmentIssue = X.hasCause(err, 
InvalidEnvironmentException.class);
 
-                    this.tx.tmFinish(err == null, nodeStop, false);
+                    this.tx.tmFinish(err == null, hasInvalidEnvironmentIssue, 
false);
                 }
                 catch (IgniteCheckedException finishErr) {
                     U.error(log, "Failed to finish tx: " + tx, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 61650cc..6f11a57 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -30,8 +30,8 @@ import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.InvalidEnvironmentException;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.pagemem.wal.StorageException;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
@@ -846,13 +846,13 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
                             throw ex;
                         }
                         else {
-                            boolean nodeStopping = X.hasCause(ex, 
NodeStoppingException.class);
+                            boolean hasInvalidEnvironmentIssue = 
X.hasCause(ex, InvalidEnvironmentException.class);
 
                             IgniteCheckedException err = new 
IgniteTxHeuristicCheckedException("Failed to locally write to cache " +
                                 "(all transaction entries will be invalidated, 
however there was a window when " +
                                 "entries for this transaction were visible to 
others): " + this, ex);
 
-                            if (nodeStopping) {
+                            if (hasInvalidEnvironmentIssue) {
                                 U.warn(log, "Failed to commit transaction, 
node is stopping " +
                                     "[tx=" + this + ", err=" + ex + ']');
                             }
@@ -865,7 +865,7 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
 
                             try {
                                 // Courtesy to minimize damage.
-                                uncommit(nodeStopping);
+                                uncommit(hasInvalidEnvironmentIssue);
                             }
                             catch (Throwable ex1) {
                                 U.error(log, "Failed to uncommit transaction: 
" + this, ex1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
index 1a43daa..49732b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
@@ -433,14 +433,14 @@ public final class X {
      *      {@code false} otherwise.
      */
     @SafeVarargs
-    public static boolean hasCause(@Nullable Throwable t, @Nullable Class<? 
extends Throwable>... cls) {
+    public static boolean hasCause(@Nullable Throwable t, @Nullable 
Class<?>... cls) {
         if (t == null || F.isEmpty(cls))
             return false;
 
         assert cls != null;
 
         for (Throwable th = t; th != null; th = th.getCause()) {
-            for (Class<? extends Throwable> c : cls) {
+            for (Class<?> c : cls) {
                 if (c.isAssignableFrom(th.getClass()))
                     return true;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java
new file mode 100644
index 0000000..8d7cf15
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.failure;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.worker.WorkersControlMXBeanImpl;
+import org.apache.ignite.mxbean.WorkersControlMXBean;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests transferring amounts between accounts with {@link 
StopNodeFailureHandler} enabled.
+ */
+public class AccountTransferTransactionTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+    /** Count of accounts in each cache (each thread works with one cache). */
+    private static final int ACCOUNTS_CNT = 20;
+    /** Count of threads and caches. */
+    private static final int THREADS_CNT = 20;
+    /** Count of nodes to start. */
+    private static final int NODES_CNT = 3;
+    /** Count of transfer transactions performed on each cache. */
+    private static final int TRANSACTION_CNT = 10;
+
+    /** {@inheritDoc} */
+    @Override protected FailureHandler getFailureHandler(String 
igniteInstanceName) {
+        return new StopNodeFailureHandler();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) 
throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(name);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+        cfg.setLocalHost("127.0.0.1");
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setMaxSize(50 * 1024 * 1024)
+                .setPersistenceEnabled(true))
+        );
+
+        CacheConfiguration[] cacheConfigurations = new 
CacheConfiguration[THREADS_CNT];
+        for (int i = 0; i < THREADS_CNT; i++) {
+            cacheConfigurations[i] = new CacheConfiguration()
+                .setName(cacheName(i))
+                .setAffinity(new RendezvousAffinityFunction(false, 32))
+                .setBackups(1)
+                .setAtomicityMode(TRANSACTIONAL)
+                .setCacheMode(CacheMode.PARTITIONED)
+                .setWriteSynchronizationMode(FULL_SYNC)
+                .setEvictionPolicy(new FifoEvictionPolicy(1000))
+                .setOnheapCacheEnabled(true);
+        }
+
+        cfg.setCacheConfiguration(cacheConfigurations);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Test transfer amount.
+     */
+    public void testTransferAmount() throws Exception {
+        //given: started some nodes with client.
+        startGrids(NODES_CNT);
+
+        IgniteEx igniteClient = startGrid(getClientConfiguration(NODES_CNT));
+
+        igniteClient.cluster().active(true);
+
+        Random random = new Random();
+
+        long[] initAmount = new long[THREADS_CNT];
+
+        //and: fill all accounts on all caches and calculate total amount for 
every cache.
+        for (int cachePrefixIdx = 0; cachePrefixIdx < THREADS_CNT; 
cachePrefixIdx++) {
+            IgniteCache<Object, Object> cache = 
igniteClient.getOrCreateCache(cacheName(cachePrefixIdx));
+
+            try (Transaction tx = 
igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                for (int accountId = 0; accountId < ACCOUNTS_CNT; accountId++) 
{
+                    Long amount = (long)random.nextInt(1000);
+
+                    cache.put(accountId, amount);
+
+                    initAmount[cachePrefixIdx] += amount;
+                }
+
+                tx.commit();
+            }
+        }
+
+        //when: start transfer amount from account to account in different 
threads.
+        CountDownLatch firstTransactionDone = new CountDownLatch(THREADS_CNT);
+
+        ArrayList<Thread> transferThreads = new ArrayList<>();
+
+        for (int i = 0; i < THREADS_CNT; i++) {
+            transferThreads.add(new 
TransferAmountTxThread(firstTransactionDone, igniteClient, cacheName(i)));
+
+            transferThreads.get(i).start();
+        }
+
+        firstTransactionDone.await(10, TimeUnit.SECONDS);
+
+        //and: terminate disco-event-worker thread on one node.
+        WorkersControlMXBean bean = workersMXBean(1);
+
+        bean.terminateWorker(
+            bean.getWorkerNames().stream()
+                .filter(name -> name.startsWith("disco-event-worker"))
+                .findFirst()
+                .orElse(null)
+        );
+
+        for (Thread thread : transferThreads) {
+            thread.join();
+        }
+
+        long[] resultAmount = new long[THREADS_CNT];
+
+        //then: calculate total amount for every thread.
+        for (int j = 0; j < THREADS_CNT; j++) {
+            String cacheName = cacheName(j);
+
+            IgniteCache<Object, Object> cache = 
igniteClient.getOrCreateCache(cacheName);
+
+            try (Transaction tx = 
igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+
+                for (int i = 0; i < ACCOUNTS_CNT; i++)
+                    resultAmount[j] += getNotNullValue(cache, i);
+                tx.commit();
+            }
+
+            long diffAmount = initAmount[j] - resultAmount[j];
+
+            //and: check that the resulting amount equals the initial amount.
+            assertTrue(
+                String.format("Total amount before and after transfer is not 
same: diff=%s, cache=%s",
+                    diffAmount, cacheName),
+                diffAmount == 0
+            );
+        }
+    }
+
+    /**
+     * Builds the test cache name from the given cache index.
+     */
+    @NotNull private String cacheName(int cachePrefixIdx) {
+        return "cache" + cachePrefixIdx;
+    }
+
+    /**
+     * Ignite configuration for client.
+     */
+    @NotNull private IgniteConfiguration getClientConfiguration(int 
nodesPrefix) throws Exception {
+        IgniteConfiguration clientConf = 
getConfiguration(getTestIgniteInstanceName(nodesPrefix));
+
+        clientConf.setClientMode(true);
+
+        return clientConf;
+    }
+
+    /**
+     * Reads the value for the given key from the cache, treating a missing value as zero.
+     */
+    private long getNotNullValue(IgniteCache<Object, Object> cache, int i) {
+        Object value = cache.get(i);
+
+        return value == null ? 0 : ((Long)value);
+    }
+
+    /**
+     * Looks up the workers control MX bean of the given node and returns a proxy for it.
+     */
+    private WorkersControlMXBean workersMXBean(int igniteInt) throws Exception 
{
+        ObjectName mbeanName = U.makeMBeanName(
+            getTestIgniteInstanceName(igniteInt),
+            "Kernal",
+            WorkersControlMXBeanImpl.class.getSimpleName()
+        );
+
+        MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
+
+        if (!mbeanSrv.isRegistered(mbeanName))
+            fail("MBean is not registered: " + mbeanName.getCanonicalName());
+
+        return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, 
mbeanName, WorkersControlMXBean.class, true);
+    }
+
+    /**
+     * Thread that performs random amount transfers between accounts of a single cache.
+     */
+    private static class TransferAmountTxThread extends Thread {
+        /** */
+        private CountDownLatch firstTransactionLatch;
+        /** */
+        private Ignite ignite;
+        /** */
+        private String cacheName;
+        /** */
+        private Random random = new Random();
+
+        /**
+         * @param ignite Ignite.
+         */
+        private TransferAmountTxThread(CountDownLatch firstTransactionLatch, 
final Ignite ignite, String cacheName) {
+            this.firstTransactionLatch = firstTransactionLatch;
+            this.ignite = ignite;
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            for (int i = 0; i < TRANSACTION_CNT; i++) {
+                try {
+                    updateInTransaction(ignite.cache(cacheName));
+                }
+                finally {
+                    if (i == 0)
+                        firstTransactionLatch.countDown();
+                }
+            }
+        }
+
+        /**
+         * @throws IgniteException if fails
+         */
+        @SuppressWarnings("unchecked")
+        private void updateInTransaction(IgniteCache cache) throws 
IgniteException {
+            int accIdFrom = random.nextInt(ACCOUNTS_CNT);
+            int accIdTo = random.nextInt(ACCOUNTS_CNT);
+
+            if (accIdFrom == accIdTo)
+                accIdTo = (int)getNextAccountId(accIdFrom);
+
+            Long acctFrom;
+            Long acctTo;
+
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                acctFrom = (Long)cache.get(accIdFrom);
+                acctTo = (Long)cache.get(accIdTo);
+
+                long transactionAmount = (long)(random.nextDouble() * 
acctFrom);
+
+                cache.put(accIdFrom, acctFrom - transactionAmount);
+                cache.put(accIdTo, acctTo + transactionAmount);
+
+                tx.commit();
+            }
+        }
+
+        /**
+         * @param curr Current account id.
+         * @return Random account id that differs from {@code curr}.
+         */
+        private long getNextAccountId(long curr) {
+            long randomVal;
+
+            do {
+                randomVal = random.nextInt(ACCOUNTS_CNT);
+            }
+            while (curr == randomVal);
+
+            return randomVal;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/111592e2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index e71a569..21d56b4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import java.util.Set;
 import junit.framework.TestSuite;
 import org.apache.ignite.GridSuppressedExceptionSelfTest;
+import org.apache.ignite.failure.AccountTransferTransactionTest;
 import org.apache.ignite.failure.FailureHandlerTriggeredTest;
 import org.apache.ignite.failure.IoomFailureHandlerTest;
 import org.apache.ignite.failure.OomFailureHandlerTest;
@@ -205,6 +206,7 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTestSuite(StopNodeOrHaltFailureHandlerTest.class);
         suite.addTestSuite(IoomFailureHandlerTest.class);
         suite.addTestSuite(OomFailureHandlerTest.class);
+        suite.addTestSuite(AccountTransferTransactionTest.class);
 
         return suite;
     }

Reply via email to