Repository: ignite
Updated Branches:
  refs/heads/master d9f2b2040 -> 5e9710abc


IGNITE-9411: MVCC: better handling of TX timeouts. This closes #4745.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5e9710ab
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5e9710ab
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5e9710ab

Branch: refs/heads/master
Commit: 5e9710abc4caf031923e2371dbfb27a6e9b4b2ec
Parents: d9f2b20
Author: ipavlukhin <vololo...@gmail.com>
Authored: Mon Sep 24 09:02:17 2018 +0300
Committer: devozerov <ppoze...@gmail.com>
Committed: Mon Sep 24 09:02:17 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/mvcc/MvccUtils.java        |   3 +-
 .../query/h2/DmlStatementsProcessor.java        |  12 +-
 .../processors/query/h2/IgniteH2Indexing.java   |  46 ++-
 .../query/h2/twostep/GridMapQueryExecutor.java  |   6 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |   4 +-
 ...cheMvccSelectForUpdateQueryAbstractTest.java |   8 +-
 .../cache/mvcc/CacheMvccSqlLockTimeoutTest.java | 353 +++++++++++++++++++
 .../mvcc/CacheMvccSqlTxQueriesAbstractTest.java |  31 +-
 ...MvccSqlTxQueriesWithReducerAbstractTest.java |  38 +-
 .../testsuites/IgniteCacheMvccSqlTestSuite.java |   2 +
 10 files changed, 465 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index 0422459..c3f9a6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -685,8 +685,7 @@ public class MvccUtils {
      */
     private static GridNearTxLocal txStart(GridKernalContext ctx, @Nullable GridCacheContext cctx, long timeout) {
         if (timeout == 0) {
-            TransactionConfiguration tcfg = cctx != null ?
-                CU.transactionConfiguration(cctx, ctx.config()) : null;
+            TransactionConfiguration tcfg = CU.transactionConfiguration(cctx, ctx.config());
 
             if (tcfg != null)
                 timeout = tcfg.getDefaultTxTimeout();

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 6ce43dd..31715f1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -508,15 +508,9 @@ public class DmlStatementsProcessor {
             requestSnapshot(cctx, checkActive(tx));
 
             try (GridNearTxLocal toCommit = commit ? tx : null) {
-                long timeout;
-
-                if (implicit)
-                    timeout = tx.remainingTime();
-                else {
-                    long tm1 = tx.remainingTime(), tm2 = fieldsQry.getTimeout();
-
-                    timeout = tm1 > 0 && tm2 > 0 ? Math.min(tm1, tm2) : Math.max(tm1, tm2);
-                }
+                long timeout = implicit
+                    ? tx.remainingTime()
+                    : IgniteH2Indexing.operationTimeout(fieldsQry.getTimeout(), tx);
 
                 if (cctx.isReplicated() || distributedPlan == null || ((plan.mode() == UpdateMode.INSERT
                     || plan.mode() == UpdateMode.MERGE) && !plan.isLocalSubquery())) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 79c431f..96d864d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -84,6 +84,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
 import org.apache.ignite.internal.processors.query.CacheQueryObjectValueContext;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -1062,7 +1063,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param filter Cache name and key filter.
      * @param enforceJoinOrder Enforce join order of tables in the query.
      * @param startTx Start transaction flag.
-     * @param timeout Query timeout in milliseconds.
+     * @param qryTimeout Query timeout in milliseconds.
      * @param cancel Query cancel.
      * @param mvccTracker Query tracker.
      * @return Query result.
@@ -1071,7 +1072,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     @SuppressWarnings("unchecked")
     GridQueryFieldsResult queryLocalSqlFields(final String schemaName, String qry,
         @Nullable final Collection<Object> params, final IndexingQueryFilter filter, boolean enforceJoinOrder,
-        boolean startTx, int timeout, final GridQueryCancel cancel,
+        boolean startTx, int qryTimeout, final GridQueryCancel cancel,
         MvccQueryTracker mvccTracker) throws IgniteCheckedException {
 
         GridNearTxLocal tx = null;
@@ -1099,7 +1100,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     fldsQry.setArgs(params.toArray());
 
                 fldsQry.setEnforceJoinOrder(enforceJoinOrder);
-                fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
+                fldsQry.setTimeout(qryTimeout, TimeUnit.MILLISECONDS);
 
                 return dmlProc.updateSqlFieldsLocal(schemaName, conn, p, fldsQry, filter, cancel);
             }
@@ -1119,6 +1120,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             GridNearTxSelectForUpdateFuture sfuFut = null;
 
+            int opTimeout = qryTimeout;
+
             if (mvccEnabled) {
                 if (mvccTracker == null)
                     mvccTracker = mvccTracker(stmt, startTx);
@@ -1126,11 +1129,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 if (mvccTracker != null) {
                     ctx.mvccSnapshot(mvccTracker.snapshot());
 
-                    if ((tx = checkActive(tx(this.ctx))) != null) {
-                        int tm1 = (int)tx.remainingTime(), tm2 = timeout;
+                    tx = checkActive(tx(this.ctx));
 
-                        timeout = tm1 > 0 && tm2 > 0 ? Math.min(tm1, tm2) : Math.max(tm1, tm2);
-                    }
+                    opTimeout = operationTimeout(opTimeout, tx);
                 }
 
                 if (forUpdate) {
@@ -1155,7 +1156,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                             throw new IgniteSQLException("Failed to lock topology for SELECT FOR UPDATE query.", e);
                         }
 
-                        sfuFut = new GridNearTxSelectForUpdateFuture(cctx, tx, timeout);
+                        sfuFut = new GridNearTxSelectForUpdateFuture(cctx, tx, opTimeout);
 
                         sfuFut.initLocal();
                     }
@@ -1182,7 +1183,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             GridNearTxSelectForUpdateFuture sfuFut0 = sfuFut;
             PreparedStatement stmt0 = stmt;
             String qry0 = qry;
-            int timeout0 = timeout;
+            int timeout0 = opTimeout;
 
             return new GridQueryFieldsResultAdapter(meta, null) {
                 @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException {
@@ -1268,6 +1269,21 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
     }
 
+    /**
+     * @param qryTimeout Query timeout in milliseconds.
+     * @param tx Transaction.
+     * @return Timeout for operation in milliseconds based on query and tx timeouts.
+     */
+    public static int operationTimeout(int qryTimeout, IgniteTxAdapter tx) {
+        if (tx != null) {
+            int tm1 = (int)tx.remainingTime(), tm2 = qryTimeout;
+
+            return tm1 > 0 && tm2 > 0 ? Math.min(tm1, tm2) : Math.max(tm1, tm2);
+        }
+
+        return qryTimeout;
+    }
+
     /** {@inheritDoc} */
     @Override public long streamUpdateQuery(String schemaName, String qry,
         @Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException {
@@ -1744,7 +1760,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param keepCacheObj Flag to keep cache object.
      * @param enforceJoinOrder Enforce join order of tables.
      * @param startTx Start transaction flag.
-     * @param timeoutMillis Query timeout.
+     * @param qryTimeout Query timeout.
      * @param cancel Cancel object.
      * @param params Query parameters.
      * @param parts Partitions.
@@ -1758,7 +1774,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         final boolean keepCacheObj,
         final boolean enforceJoinOrder,
         boolean startTx,
-        final int timeoutMillis,
+        final int qryTimeout,
         final GridQueryCancel cancel,
         final Object[] params,
         final int[] parts,
@@ -1770,13 +1786,17 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             final MvccQueryTracker tracker = mvccTracker == null && qry.mvccEnabled() ?
                 MvccUtils.mvccTracker(ctx.cache().context().cacheContext(qry.cacheIds().get(0)), startTx) : mvccTracker;
 
+            GridNearTxLocal tx = tx(ctx);
+
             if (qry.forUpdate())
-                qry.forUpdate(checkActive(tx(ctx)) != null);
+                qry.forUpdate(checkActive(tx) != null);
+
+            int opTimeout = operationTimeout(qryTimeout, tx);
 
             return new Iterable<List<?>>() {
                 @SuppressWarnings("NullableProblems")
                 @Override public Iterator<List<?>> iterator() {
-                    return rdcQryExec.query(schemaName, qry, keepCacheObj, enforceJoinOrder, timeoutMillis,
+                    return rdcQryExec.query(schemaName, qry, keepCacheObj, enforceJoinOrder, opTimeout,
                         cancel, params, parts, lazy, tracker);
                 }
             };

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 9166604..ab60746 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -925,7 +925,9 @@ public class GridMapQueryExecutor {
 
                         h2.bindParameters(stmt, params0);
 
-                        rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, timeout, qr.queryCancel(qryIdx));
+                        int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx);
+
+                        rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qr.queryCancel(qryIdx));
 
                         if (inTx) {
                             ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future(
@@ -937,7 +939,7 @@ public class GridMapQueryExecutor {
                                 parts,
                                 tx,
-                                timeout,
+                                opTimeout,
                                 mainCctx,
                                 rs
                             );

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 96c88ff..6474d55 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -113,7 +113,9 @@ import org.jetbrains.annotations.Nullable;
 import static java.util.Collections.singletonList;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT;
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
-import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.*;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
 import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
index 739aaf8..5c81974 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
@@ -101,22 +101,20 @@ public abstract class CacheMvccSelectForUpdateQueryAbstractTest extends CacheMvc
 
 
     /**
-     *
+     * @throws Exception If failed.
      */
     public void testSelectForUpdateLocal() throws Exception {
         doTestSelectForUpdateLocal("Person", false);
     }
 
     /**
-     *
      * @throws Exception If failed.
      */
-    public void testSelectForUpdateOutsideTx() throws Exception {
+    public void testSelectForUpdateOutsideTxDistributed() throws Exception {
         doTestSelectForUpdateDistributed("Person", true);
     }
 
     /**
-     *
      * @throws Exception If failed.
      */
     public void testSelectForUpdateOutsideTxLocal() throws Exception {
@@ -261,6 +259,8 @@ public abstract class CacheMvccSelectForUpdateQueryAbstractTest extends CacheMvc
             checkLocks("Person", keys, true);
 
             tx.rollback();
+
+            checkLocks("Person", keys, false);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlLockTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlLockTimeoutTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlLockTimeoutTest.java
new file mode 100644
index 0000000..eae79a5
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlLockTimeoutTest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.UnaryOperator;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/** */
+public class CacheMvccSqlLockTimeoutTest extends CacheMvccAbstractTest {
+    /** */
+    private static final int TIMEOUT_MILLIS = 200;
+
+    /** */
+    private UnaryOperator<IgniteConfiguration> cfgCustomizer = UnaryOperator.identity();
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        throw new RuntimeException("Is not used in current test");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        return cfgCustomizer.apply(super.getConfiguration(gridName));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testLockTimeoutsForPartitionedCache() throws Exception {
+        checkLockTimeouts(partitionedCacheConfig());
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testLockTimeoutsForReplicatedCache() throws Exception {
+        checkLockTimeouts(replicatedCacheConfig());
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testLockTimeoutsAfterDefaultTxTimeoutForPartitionedCache() throws Exception {
+        checkLockTimeoutsAfterDefaultTxTimeout(partitionedCacheConfig());
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testLockTimeoutsAfterDefaultTxTimeoutForReplicatedCache() throws Exception {
+        checkLockTimeoutsAfterDefaultTxTimeout(replicatedCacheConfig());
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testConcurrentForPartitionedCache() throws Exception {
+        checkTimeoutsConcurrent(partitionedCacheConfig());
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testConcurrentForReplicatedCache() throws Exception {
+        checkTimeoutsConcurrent(replicatedCacheConfig());
+    }
+
+    /** */
+    private CacheConfiguration<?, ?> partitionedCacheConfig() {
+        return baseCacheConfig()
+            .setCacheMode(PARTITIONED)
+            .setBackups(1);
+    }
+
+    /** */
+    private CacheConfiguration<?, ?> replicatedCacheConfig() {
+        return baseCacheConfig().setCacheMode(REPLICATED);
+    }
+
+    /** */
+    private CacheConfiguration<?, ?> baseCacheConfig() {
+        return new CacheConfiguration<>("test")
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT)
+            .setSqlSchema("PUBLIC")
+            .setIndexedTypes(Integer.class, Integer.class);
+    }
+
+    /** */
+    private void checkLockTimeouts(CacheConfiguration<?, ?> ccfg) throws Exception {
+        startGridsMultiThreaded(2);
+
+        IgniteEx ignite = grid(0);
+
+        ignite.createCache(ccfg);
+
+        AtomicInteger keyCntr = new AtomicInteger();
+
+        int nearKey = keyForNode(ignite.affinity("test"), keyCntr, ignite.localNode());
+        int otherKey = keyForNode(ignite.affinity("test"), keyCntr, grid(1).localNode());
+
+        TimeoutChecker timeoutChecker = new TimeoutChecker(ignite, "test");
+
+        timeoutChecker.checkScenario(TimeoutMode.STMT, TxStartMode.EXPLICIT, nearKey);
+
+        timeoutChecker.checkScenario(TimeoutMode.STMT, TxStartMode.EXPLICIT, otherKey);
+
+        timeoutChecker.checkScenario(TimeoutMode.STMT, TxStartMode.IMPLICIT, nearKey);
+
+        timeoutChecker.checkScenario(TimeoutMode.STMT, TxStartMode.IMPLICIT, otherKey);
+
+        // explicit tx timeout has no sense for implicit transaction
+        timeoutChecker.checkScenario(TimeoutMode.TX, TxStartMode.EXPLICIT, nearKey);
+
+        timeoutChecker.checkScenario(TimeoutMode.TX, TxStartMode.EXPLICIT, otherKey);
+    }
+
+    /** */
+    private void checkLockTimeoutsAfterDefaultTxTimeout(CacheConfiguration<?, ?> ccfg) throws Exception {
+        cfgCustomizer = cfg ->
+            cfg.setTransactionConfiguration(new TransactionConfiguration().setDefaultTxTimeout(TIMEOUT_MILLIS));
+
+        startGridsMultiThreaded(2);
+
+        IgniteEx ignite = grid(0);
+
+        ignite.createCache(ccfg);
+
+        AtomicInteger keyCntr = new AtomicInteger();
+
+        int nearKey = keyForNode(ignite.affinity("test"), keyCntr, ignite.localNode());
+        int otherKey = keyForNode(ignite.affinity("test"), keyCntr, grid(1).localNode());
+
+        TimeoutChecker timeoutChecker = new TimeoutChecker(ignite, "test");
+
+        timeoutChecker.checkScenario(TimeoutMode.TX_DEFAULT, TxStartMode.EXPLICIT, nearKey);
+
+        timeoutChecker.checkScenario(TimeoutMode.TX_DEFAULT, TxStartMode.EXPLICIT, otherKey);
+
+        timeoutChecker.checkScenario(TimeoutMode.TX_DEFAULT, TxStartMode.IMPLICIT, nearKey);
+
+        timeoutChecker.checkScenario(TimeoutMode.TX_DEFAULT, TxStartMode.IMPLICIT, otherKey);
+    }
+
+    /** */
+    private static class TimeoutChecker {
+        /** */
+        final IgniteEx ignite;
+        /** */
+        final String cacheName;
+
+        /** */
+        TimeoutChecker(IgniteEx ignite, String cacheName) {
+            this.ignite = ignite;
+            this.cacheName = cacheName;
+        }
+
+        /** */
+        void checkScenario(TimeoutMode timeoutMode, TxStartMode txStartMode, int key) throws Exception {
+            // 999 is used as bound to enforce query execution with obtaining cursor before enlist
+            assert key <= 999;
+
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 60_000, 1)) {
+                ignite.cache(cacheName).query(new SqlFieldsQuery("merge into Integer(_key, _val) values(?, 1)")
+                    .setArgs(key));
+
+                tx.commit();
+            }
+
+            ensureTimeIsOut("insert into Integer(_key, _val) values(?, 42)", key, timeoutMode, txStartMode);
+            ensureTimeIsOut("merge into Integer(_key, _val) values(?, 42)", key, timeoutMode, txStartMode);
+            ensureTimeIsOut("update Integer set _val = 42 where _key = ?", key, timeoutMode, txStartMode);
+            ensureTimeIsOut("update Integer set _val = 42 where _key = ? or _key > 999", key, timeoutMode, txStartMode);
+            ensureTimeIsOut("delete from Integer where _key = ?", key, timeoutMode, txStartMode);
+            ensureTimeIsOut("delete from Integer where _key = ? or _key > 999", key, timeoutMode, txStartMode);
+
+            // SELECT ... FOR UPDATE locking entries has no meaning for implicit transaction
+            if (txStartMode != TxStartMode.IMPLICIT) {
+                ensureTimeIsOut("select * from Integer where _key = ? for update", key, timeoutMode, txStartMode);
+                ensureTimeIsOut(
+                    "select * from Integer where _key = ? or _key > 999 for update", key, timeoutMode, txStartMode);
+            }
+        }
+
+        /** */
+        void ensureTimeIsOut(String sql, int key, TimeoutMode timeoutMode, TxStartMode txStartMode) throws Exception {
+            assert txStartMode == TxStartMode.EXPLICIT || timeoutMode != TimeoutMode.TX;
+
+            IgniteCache<?, ?> cache = ignite.cache(cacheName);
+
+            int oldVal = (Integer)cache
+                .query(new SqlFieldsQuery("select _val from Integer where _key = ?").setArgs(key))
+                .getAll().get(0).get(0);
+
+            try (Transaction tx1 = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 6_000, 1)) {
+                cache.query(new SqlFieldsQuery("update Integer set _val = 42 where _key = ?").setArgs(key));
+
+                try {
+                    CompletableFuture.runAsync(() -> {
+                        SqlFieldsQuery qry = new SqlFieldsQuery(sql).setArgs(key);
+
+                        try (Transaction tx2 = txStartMode == TxStartMode.EXPLICIT ? startTx(timeoutMode): null) {
+                            if (timeoutMode == TimeoutMode.STMT)
+                                qry.setTimeout(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+                            cache.query(qry).getAll();
+
+                            if (tx2 != null)
+                                tx2.commit();
+                        }
+                        finally {
+                            ignite.context().cache().context().tm().resetContext();
+                        }
+                    }).get();
+
+                    fail("Timeout exception should be thrown");
+                }
+                catch (ExecutionException e) {
+                    assertTrue(msgContains(e, "Failed to acquire lock within provided timeout for transaction")
+                        || msgContains(e, "Failed to finish transaction because it has been rolled back"));
+                }
+
+                // assert that outer tx has not timed out
+                cache.query(new SqlFieldsQuery("update Integer set _val = 42 where _key = ?").setArgs(key));
+
+                tx1.rollback();
+            }
+
+            int newVal = (Integer)cache
+                .query(new SqlFieldsQuery("select _val from Integer where _key = ?").setArgs(key))
+                .getAll().get(0).get(0);
+
+            assertEquals(oldVal, newVal);
+        }
+
+        /** */
+        private Transaction startTx(TimeoutMode timeoutMode) {
+            return timeoutMode == TimeoutMode.TX
+                ? ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, TIMEOUT_MILLIS, 1)
+                : ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+        }
+    }
+
+    /** */
+    private static boolean msgContains(Throwable e, String str) {
+        return e.getMessage() != null && e.getMessage().contains(str);
+    }
+
+    /** */
+    private enum TimeoutMode {
+        /** */
+        TX,
+        /** */
+        TX_DEFAULT,
+        /** */
+        STMT
+    }
+
+    /** */
+    private enum TxStartMode {
+        /** */
+        EXPLICIT,
+        /** */
+        IMPLICIT
+    }
+
+    /** */
+    private void checkTimeoutsConcurrent(CacheConfiguration<?, ?> ccfg) throws Exception {
+        startGridsMultiThreaded(2);
+
+        IgniteEx ignite = grid(0);
+
+        IgniteCache<?, ?> cache = ignite.createCache(ccfg);
+
+        AtomicInteger keyCntr = new AtomicInteger();
+
+        List<Integer> keys = new ArrayList<>();
+
+        for (int i = 0; i < 5; i++)
+            keys.add(keyForNode(grid(0).affinity("test"), keyCntr, ignite.localNode()));
+
+        for (int i = 0; i < 5; i++)
+            keys.add(keyForNode(grid(1).affinity("test"), keyCntr, ignite.localNode()));
+
+        CompletableFuture.allOf(
+            CompletableFuture.runAsync(() -> mergeInRandomOrder(ignite, cache, keys)),
+            CompletableFuture.runAsync(() -> mergeInRandomOrder(ignite, cache, keys)),
+            CompletableFuture.runAsync(() -> mergeInRandomOrder(ignite, cache, keys))
+        ).join();
+    }
+
+    /** */
+    private void mergeInRandomOrder(IgniteEx ignite, IgniteCache<?, ?> cache, List<Integer> keys) {
+        List<Integer> keys0 = new ArrayList<>(keys);
+
+        for (int i = 0; i < 100; i++) {
+            Collections.shuffle(keys0);
+
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                SqlFieldsQuery qry = new SqlFieldsQuery("merge into Integer(_key, _val) values(?, ?)")
+                    .setTimeout(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+                int op = 0;
+
+                for (Integer key : keys0)
+                    cache.query(qry.setArgs(key, op++));
+
+                tx.commit();
+            }
+            catch (Exception e) {
+                assertTrue(msgContains(e, "Failed to acquire lock within provided timeout for transaction")
+                    || msgContains(e, "Mvcc version mismatch"));
+            }
+            finally {
+                ignite.context().cache().context().tm().resetContext();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
index 2aad2d4..b881f02 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
@@ -649,7 +649,27 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest 
extends CacheMvccAbstrac
     /**
      * @throws Exception If failed.
      */
-    public void testQueryDeadlock() throws Exception {
+    public void testQueryDeadlockWithTxTimeout() throws Exception {
+        checkQueryDeadlock(TimeoutMode.TX);
+    }
+
+    /**
+     * Runs the query deadlock check with the timeout mode set to {@code TimeoutMode.STMT}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testQueryDeadlockWithStmtTimeout() throws Exception {
+        checkQueryDeadlock(TimeoutMode.STMT);
+    }
+
+    /** Selects where the query deadlock check configures its timeout. */
+    private enum TimeoutMode {
+        /** Timeout is set on the transaction via {@code tx.timeout(TX_TIMEOUT)}. */
+        TX,
+        /** Timeout is set on each SQL statement via {@code qry.setTimeout(TX_TIMEOUT, TimeUnit.MILLISECONDS)}. */
+        STMT
+    }
+
+    /** */
+    private void checkQueryDeadlock(TimeoutMode timeoutMode) throws Exception {
         ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, 
DFLT_PARTITION_COUNT)
             .setIndexedTypes(Integer.class, Integer.class);
 
@@ -671,7 +691,8 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest 
extends CacheMvccAbstrac
 
                 try {
                     try (Transaction tx = 
node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                        tx.timeout(TX_TIMEOUT);
+                        if (timeoutMode == TimeoutMode.TX)
+                            tx.timeout(TX_TIMEOUT);
 
                         IgniteCache<Object, Object> cache0 = 
node.cache(DEFAULT_CACHE_NAME);
 
@@ -680,6 +701,9 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest 
extends CacheMvccAbstrac
 
                         SqlFieldsQuery qry = new SqlFieldsQuery((id % 2) == 0 
? qry1 : qry2);
 
+                        if (timeoutMode == TimeoutMode.STMT)
+                            qry.setTimeout(TX_TIMEOUT, TimeUnit.MILLISECONDS);
+
                         try (FieldsQueryCursor<List<?>> cur = 
cache0.query(qry)) {
                             cur.getAll();
                         }
@@ -688,6 +712,9 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest 
extends CacheMvccAbstrac
 
                         qry = new SqlFieldsQuery((id % 2) == 0 ? qry2 : qry1);
 
+                        if (timeoutMode == TimeoutMode.STMT)
+                            qry.setTimeout(TX_TIMEOUT, TimeUnit.MILLISECONDS);
+
                         try (FieldsQueryCursor<List<?>> cur = 
cache0.query(qry)) {
                             cur.getAll();
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java
index 69cf108..a7cf292 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.transactions.Transaction;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.testframework.GridTestUtils.runMultiThreaded;
 import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -505,14 +506,33 @@ public abstract class 
CacheMvccSqlTxQueriesWithReducerAbstractTest extends Cache
     /**
      * @throws Exception If failed.
      */
-    public void testQueryReducerDeadlockInsert() throws Exception {
+    public void testQueryReducerDeadlockInsertWithTxTimeout() throws Exception 
{
+        checkQueryReducerDeadlockInsert(TimeoutMode.TX);
+    }
+
+    /**
+     * Runs the reducer insert deadlock check with the timeout mode set to {@code TimeoutMode.STMT}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testQueryReducerDeadlockInsertWithStmtTimeout() throws 
Exception {
+        checkQueryReducerDeadlockInsert(TimeoutMode.STMT);
+    }
+
+    /** Selects where the reducer insert deadlock check configures its timeout. */
+    private enum TimeoutMode {
+        /** Timeout is set on the transaction via {@code tx.timeout(TIMEOUT)}. */
+        TX,
+        /** Timeout is set on each SQL statement via {@code qry.setTimeout(TIMEOUT, TimeUnit.MILLISECONDS)}. */
+        STMT
+    }
+
+    /** */
+    public void checkQueryReducerDeadlockInsert(TimeoutMode timeoutMode) 
throws Exception {
         ccfgs = new CacheConfiguration[] {
             cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT)
                 .setName("int")
                 .setIndexedTypes(Integer.class, Integer.class),
             cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT)
-                .setIndexedTypes(Integer.class,
-                CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class),
+                .setIndexedTypes(Integer.class, 
CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class),
         };
 
         startGridsMultiThreaded(2);
@@ -544,7 +564,8 @@ public abstract class 
CacheMvccSqlTxQueriesWithReducerAbstractTest extends Cache
 
                 try {
                     try (Transaction tx = 
node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                        tx.timeout(TIMEOUT);
+                        if (timeoutMode == TimeoutMode.TX)
+                            tx.timeout(TIMEOUT);
 
                         String sqlText = "INSERT INTO MvccTestSqlIndexValue 
(_key, idxVal1) " +
                             "SELECT DISTINCT _key, _val FROM \"int\".Integer 
ORDER BY _key";
@@ -554,6 +575,9 @@ public abstract class 
CacheMvccSqlTxQueriesWithReducerAbstractTest extends Cache
 
                         SqlFieldsQuery qry = new SqlFieldsQuery((id % 2) == 0 
? sqlAsc : sqlDesc);
 
+                        if (timeoutMode == TimeoutMode.STMT)
+                            qry.setTimeout(TIMEOUT, TimeUnit.MILLISECONDS);
+
                         IgniteCache<Object, Object> cache0 = 
node.cache(DEFAULT_CACHE_NAME);
 
                         cache0.query(qry).getAll();
@@ -562,6 +586,9 @@ public abstract class 
CacheMvccSqlTxQueriesWithReducerAbstractTest extends Cache
 
                         qry = new SqlFieldsQuery((id % 2) == 0 ? sqlDesc : 
sqlAsc);
 
+                        if (timeoutMode == TimeoutMode.STMT)
+                            qry.setTimeout(TIMEOUT, TimeUnit.MILLISECONDS);
+
                         cache0.query(qry).getAll();
 
                         tx.commit();
@@ -577,8 +604,9 @@ public abstract class 
CacheMvccSqlTxQueriesWithReducerAbstractTest extends Cache
 
         assertNotNull(ex0);
 
-        if (!X.hasCause(ex0, IgniteTxTimeoutCheckedException.class))
+        assertThrowsWithCause(() -> {
             throw ex0;
+        }, IgniteTxTimeoutCheckedException.class);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e9710ab/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
index ec60596..888b1ba 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
@@ -39,6 +39,7 @@ import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccScanQueryWithCo
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeTest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeWithConcurrentJdbcTransactionTest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlConfigurationValidationTest;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlLockTimeoutTest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest;
 import 
org.apache.ignite.internal.processors.query.h2.GridIndexRebuildWithMvccEnabledSelfTest;
 
@@ -57,6 +58,7 @@ public class IgniteCacheMvccSqlTestSuite extends TestSuite {
         suite.addTestSuite(CacheMvccDmlSimpleTest.class);
         
suite.addTestSuite(SqlTransactionsCommandsWithMvccEnabledSelfTest.class);
         suite.addTestSuite(CacheMvccSizeTest.class);
+        suite.addTestSuite(CacheMvccSqlLockTimeoutTest.class);
 
         suite.addTestSuite(GridIndexRebuildWithMvccEnabledSelfTest.class);
 

Reply via email to