IGNITE-9114: SQL: fail query after a configurable timeout if it cannot be mapped to topology. This closes #4453.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce30b1e1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce30b1e1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce30b1e1

Branch: refs/heads/ignite-8446
Commit: ce30b1e1d921d9c3e1e065f7c01f1f066d42cba4
Parents: d73211d
Author: devozerov <ppoze...@gmail.com>
Authored: Tue Jul 31 12:53:51 2018 +0300
Committer: devozerov <ppoze...@gmail.com>
Committed: Tue Jul 31 12:53:51 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   3 +
 .../query/h2/opt/GridH2IndexBase.java           |  31 +++--
 .../query/h2/twostep/GridMapQueryExecutor.java  |  75 ++++++++--
 .../h2/twostep/GridReduceQueryExecutor.java     | 136 +++++++++++++++----
 ...butedPartitionQueryNodeRestartsSelfTest.java |  21 ++-
 ...QueryNodeRestartDistributedJoinSelfTest.java |  10 ++
 6 files changed, 230 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
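For context: the patch introduces the IGNITE_SQL_RETRY_TIMEOUT system property, read in GridReduceQueryExecutor with a default of DFLT_RETRY_TIMEOUT (10 seconds). Below is a minimal sketch of overriding it before node start; the class name and the 30-second value are illustrative and not part of the patch.

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.Ignition;

public class SqlRetryTimeoutExample {
    public static void main(String[] args) {
        // Raise the SQL mapping retry timeout from the 10 s default to 30 s (illustrative value).
        System.setProperty(IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT, Long.toString(30_000L));

        try (Ignite ignite = Ignition.start()) {
            // SQL queries that cannot be mapped to a stable topology within the timeout
            // now fail with a CacheException instead of retrying indefinitely.
        }
    }
}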


http://git-wip-us.apache.org/repos/asf/ignite/blob/ce30b1e1/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index eb60371..c4b83e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -480,6 +480,9 @@ public final class IgniteSystemProperties {
     /** Disable SQL system views. */
     public static final String IGNITE_SQL_DISABLE_SYSTEM_VIEWS = 
"IGNITE_SQL_DISABLE_SYSTEM_VIEWS";
 
+    /** SQL retry timeout. */
+    public static final String IGNITE_SQL_RETRY_TIMEOUT = 
"IGNITE_SQL_RETRY_TIMEOUT";
+
     /** Maximum size for affinity assignment history. */
     public static final String IGNITE_AFFINITY_HISTORY_SIZE = 
"IGNITE_AFFINITY_HISTORY_SIZE";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce30b1e1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 1c89213..77bd69a 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -361,7 +361,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
             locNodeHnd,
             GridIoPolicy.IDX_POOL,
             false))
-            throw new GridH2RetryException("Failed to send message to nodes: " 
+ nodes + ".");
+            throw retryException("Failed to send message to nodes: " + nodes);
     }
 
     /**
@@ -554,14 +554,15 @@ public abstract class GridH2IndexBase extends BaseIndex {
                     ClusterNode node = ctx.discovery().node(nodeId);
 
                     if (node == null)
-                        throw new GridH2RetryException("Failed to find node.");
+                        throw retryException("Failed to get node by ID during 
broadcast [nodeId=" + nodeId + ']');
 
                     nodes.add(node);
                 }
             }
 
             if (F.isEmpty(nodes))
-                throw new GridH2RetryException("Failed to collect affinity 
nodes.");
+                throw retryException("Failed to collect affinity nodes during 
broadcast [" +
+                    "cacheName=" + cctx.name() + ']');
         }
 
         int segmentsCount = segmentsCount();
@@ -615,7 +616,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
             node = cctx.affinity().primaryByKey(affKeyObj, 
qctx.topologyVersion());
 
             if (node == null) // Node was not found, probably topology changed 
and we need to retry the whole query.
-                throw new GridH2RetryException("Failed to find node.");
+                throw retryException("Failed to get primary node by key for 
range segment.");
         }
 
         return new SegmentKey(node, segmentForPartition(partition));
@@ -1317,10 +1318,10 @@ public abstract class GridH2IndexBase extends BaseIndex 
{
 
             for (int attempt = 0;; attempt++) {
                 if (qctx.isCleared())
-                    throw new GridH2RetryException("Query is cancelled.");
+                    throw retryException("Query is cancelled.");
 
                 if (kernalContext().isStopping())
-                    throw new GridH2RetryException("Stopping node.");
+                    throw retryException("Local node is stopping.");
 
                 GridH2IndexRangeResponse res;
 
@@ -1328,7 +1329,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
                     res = respQueue.poll(500, TimeUnit.MILLISECONDS);
                 }
                 catch (InterruptedException ignored) {
-                    throw new GridH2RetryException("Interrupted.");
+                    throw retryException("Interrupted while waiting for 
reply.");
                 }
 
                 if (res != null) {
@@ -1355,10 +1356,10 @@ public abstract class GridH2IndexBase extends BaseIndex 
{
 
                         case STATUS_NOT_FOUND:
                             if (req == null || req.bounds() == null) // We have already received the first response.
-                                throw new GridH2RetryException("Failure on remote node.");
+                                throw retryException("Failure on remote node.");
 
                             if (U.currentTimeMillis() - start > 30_000)
-                                throw new GridH2RetryException("Timeout.");
+                                throw retryException("Timeout reached.");
 
                             try {
                                 U.sleep(20 * attempt);
@@ -1381,7 +1382,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
                 }
 
                 if (!kernalContext().discovery().alive(node))
-                    throw new GridH2RetryException("Node left: " + node);
+                    throw retryException("Node has left topology: " + 
node.id());
             }
         }
 
@@ -1585,6 +1586,16 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
+     * Create retry exception for distributed join.
+     *
+     * @param msg Message.
+     * @return Exception.
+     */
+    private GridH2RetryException retryException(String msg) {
+        return new GridH2RetryException(msg);
+    }
+
+    /**
      *
      */
     private static final class CursorIteratorWrapper implements 
Iterator<GridH2Row> {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce30b1e1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 930ada2..216a259 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -53,6 +53,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -293,6 +294,8 @@ public class GridMapQueryExecutor {
      * @param topVer Topology version.
      * @param explicitParts Explicit partitions list.
      * @param reserved Reserved list.
+     * @param nodeId Node ID.
+     * @param reqId Request ID.
      * @return {@code true} If all the needed partitions successfully reserved.
      * @throws IgniteCheckedException If failed.
      */
@@ -300,7 +303,9 @@ public class GridMapQueryExecutor {
         @Nullable List<Integer> cacheIds,
         AffinityTopologyVersion topVer,
         final int[] explicitParts,
-        List<GridReservable> reserved
+        List<GridReservable> reserved,
+        UUID nodeId,
+        long reqId
     ) throws IgniteCheckedException {
         assert topVer != null;
 
@@ -312,8 +317,14 @@ public class GridMapQueryExecutor {
         for (int i = 0; i < cacheIds.size(); i++) {
             GridCacheContext<?, ?> cctx = 
ctx.cache().context().cacheContext(cacheIds.get(i));
 
-            if (cctx == null) // Cache was not found, probably was not 
deployed yet.
+            // Cache was not found, probably was not deployed yet.
+            if (cctx == null) {
+                logRetry("Failed to reserve partitions for query (cache is not 
found on local node) [" +
+                    "rmtNodeId=" + nodeId + ", reqId=" + reqId + ", 
affTopVer=" + topVer + ", cacheId=" +
+                    cacheIds.get(i) + "]");
+
                 return false;
+            }
 
             if (cctx.isLocal() || !cctx.rebalanceEnabled())
                 continue;
@@ -325,8 +336,13 @@ public class GridMapQueryExecutor {
 
             if (explicitParts == null && r != null) { // Try to reserve group 
partition if any and no explicits.
                 if (r != MapReplicatedReservation.INSTANCE) {
-                    if (!r.reserve())
+                    if (!r.reserve()) {
+                        logRetry("Failed to reserve partitions for query 
(group reservation failed) [" +
+                            "rmtNodeId=" + nodeId + ", reqId=" + reqId + ", 
affTopVer=" + topVer +
+                            ", cacheId=" + cacheIds.get(i) + ", cacheName=" + 
cctx.name() + "]");
+
                         return false; // We need explicit partitions here -> 
retry.
+                    }
 
                     reserved.add(r);
                 }
@@ -340,8 +356,17 @@ public class GridMapQueryExecutor {
                             GridDhtLocalPartition part = partition(cctx, p);
 
                             // We don't need to reserve partitions because 
they will not be evicted in replicated caches.
-                            if (part == null || part.state() != OWNING)
+                            GridDhtPartitionState partState = part != null ? 
part.state() : null;
+
+                            if (partState != OWNING) {
+                                logRetry("Failed to reserve partitions for 
query (partition of " +
+                                    "REPLICATED cache is not in OWNING state) 
[rmtNodeId=" + nodeId +
+                                    ", reqId=" + reqId + ", affTopVer=" + 
topVer + ", cacheId=" + cacheIds.get(i) +
+                                    ", cacheName=" + cctx.name() + ", part=" + 
p + ", partFound=" + (part != null) +
+                                    ", partState=" + partState + "]");
+
                                 return false;
+                            }
                         }
 
                         // Mark that we checked this replicated cache.
@@ -355,14 +380,31 @@ public class GridMapQueryExecutor {
                     for (int partId : partIds) {
                         GridDhtLocalPartition part = partition(cctx, partId);
 
-                        if (part == null || part.state() != OWNING || 
!part.reserve())
+                        GridDhtPartitionState partState = part != null ? 
part.state() : null;
+
+                        if (partState != OWNING || !part.reserve()) {
+                            logRetry("Failed to reserve partitions for query 
(partition of " +
+                                "PARTITIONED cache cannot be reserved) 
[rmtNodeId=" + nodeId + ", reqId=" + reqId +
+                                ", affTopVer=" + topVer + ", cacheId=" + 
cacheIds.get(i) +
+                                ", cacheName=" + cctx.name() + ", part=" + 
partId + ", partFound=" + (part != null) +
+                                ", partState=" + partState + "]");
+
                             return false;
+                        }
 
                         reserved.add(part);
 
                         // Double check that we are still in owning state and 
partition contents are not cleared.
-                        if (part.state() != OWNING)
+                        partState = part.state();
+
+                        if (partState != OWNING) {
+                            logRetry("Failed to reserve partitions for query 
(partition of " +
+                                "PARTITIONED cache is not in OWNING state 
after reservation) [rmtNodeId=" + nodeId +
+                                ", reqId=" + reqId + ", affTopVer=" + topVer + 
", cacheId=" + cacheIds.get(i) +
+                                ", cacheName=" + cctx.name() + ", part=" + 
partId + ", partState=" + partState + "]");
+
                             return false;
+                        }
                     }
 
                     if (explicitParts == null) {
@@ -388,6 +430,15 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * Log failed partition reservation.
+     *
+     * @param msg Message.
+     */
+    private void logRetry(String msg) {
+        log.info(msg);
+    }
+
+    /**
      * @param ints Integers.
      * @return Collection wrapper.
      */
@@ -622,7 +673,7 @@ public class GridMapQueryExecutor {
         try {
             if (topVer != null) {
                 // Reserve primary for topology version or explicit partitions.
-                if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
+                if (!reservePartitions(cacheIds, topVer, parts, reserved, 
node.id(), reqId)) {
                     // Unregister lazy worker because re-try may never reach 
this node again.
                     if (lazy)
                         stopAndUnregisterCurrentLazyWorker();
@@ -739,8 +790,14 @@ public class GridMapQueryExecutor {
             if (lazy)
                 stopAndUnregisterCurrentLazyWorker();
 
-            if (X.hasCause(e, GridH2RetryException.class))
+            GridH2RetryException retryErr = X.cause(e, 
GridH2RetryException.class);
+
+            if (retryErr != null) {
+                logRetry("Failed to execute non-collocated query (will retry) 
[nodeId=" + node.id() +
+                    ", reqId=" + reqId + ", errMsg=" + retryErr.getMessage() + 
']');
+
                 sendRetry(node, reqId, segmentId);
+            }
             else {
                 U.error(log, "Failed to execute local query.", e);
 
@@ -788,7 +845,7 @@ public class GridMapQueryExecutor {
 
         List<GridReservable> reserved = new ArrayList<>();
 
-        if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
+        if (!reservePartitions(cacheIds, topVer, parts, reserved, node.id(), 
reqId)) {
             U.error(log, "Failed to reserve partitions for DML request. 
[localNodeId=" + ctx.localNodeId() +
                 ", nodeId=" + node.id() + ", reqId=" + req.requestId() + ", 
cacheIds=" + cacheIds +
                 ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + 
']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce30b1e1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index cd76bc1..d778fcc 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -45,6 +45,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -101,6 +102,7 @@ import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.Collections.singletonList;
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT;
 import static 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static 
org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
@@ -111,6 +113,9 @@ import static 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySpl
  * Reduce query executor.
  */
 public class GridReduceQueryExecutor {
+    /** Fail query after 10 seconds of unsuccessful attempts to reserve 
partitions. */
+    public static final long DFLT_RETRY_TIMEOUT = 10_000L;
+
     /** */
     private static final String MERGE_INDEX_UNSORTED = "merge_scan";
 
@@ -442,14 +447,24 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * Log failed partition reservation.
+     *
+     * @param msg Message.
+     */
+    private void logRetry(String msg) {
+        log.info(msg);
+    }
+
+    /**
      * @param isReplicatedOnly If we must only have replicated caches.
      * @param topVer Topology version.
      * @param cacheIds Participating cache IDs.
      * @param parts Partitions.
+     * @param qryId Query ID.
      * @return Data nodes or {@code null} if repartitioning started and we 
need to retry.
      */
     private Map<ClusterNode, IntArray> stableDataNodes(boolean 
isReplicatedOnly, AffinityTopologyVersion topVer,
-        List<Integer> cacheIds, int[] parts) {
+        List<Integer> cacheIds, int[] parts, long qryId) {
         GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(0));
 
         // If the first cache is not partitioned, find it (if it's present) 
and move it to index 0.
@@ -507,8 +522,15 @@ public class GridReduceQueryExecutor {
                 disjoint = !extraNodes.equals(nodes);
 
             if (disjoint) {
-                if (isPreloadingActive(cacheIds))
+                if (isPreloadingActive(cacheIds)) {
+                    logRetry("Failed to calculate nodes for SQL query (got 
disjoint node map during rebalance) " +
+                        "[qryId=" + qryId + ", affTopVer=" + topVer + ", 
cacheIds=" + cacheIds +
+                        ", parts=" + (parts == null ? "[]" : 
Arrays.toString(parts)) +
+                        ", replicatedOnly=" + isReplicatedOnly + ", 
lastCache=" + extraCctx.name() +
+                        ", lastCacheId=" + extraCctx.cacheId() + ']');
+
                     return null; // Retry.
+                }
                 else
                     throw new CacheException("Caches have distinct sets of 
data nodes [cache1=" + cctx.name() +
                         ", cache2=" + extraCacheName + "]");
@@ -546,8 +568,14 @@ public class GridReduceQueryExecutor {
 
         final boolean isReplicatedOnly = qry.isReplicatedOnly();
 
-        // Fail if all caches are replicated and explicit partitions are set.
+        long retryTimeout = retryTimeout();
+
+        final long startTime = U.currentTimeMillis();
+
         for (int attempt = 0;; attempt++) {
+            if (attempt > 0 && retryTimeout > 0 && (U.currentTimeMillis() - 
startTime > retryTimeout))
+                throw new CacheException("Failed to map SQL query to 
topology.");
+
             if (attempt != 0) {
                 try {
                     Thread.sleep(attempt * 10); // Wait for exchange.
@@ -559,7 +587,7 @@ public class GridReduceQueryExecutor {
                 }
             }
 
-            final long qryReqId = qryIdGen.incrementAndGet();
+            long qryReqId = qryIdGen.incrementAndGet();
 
             final ReduceQueryRun r = new ReduceQueryRun(qryReqId, 
qry.originalSql(), schemaName,
                 h2.connectionForSchema(schemaName), qry.mapQueries().size(), 
qry.pageSize(),
@@ -602,7 +630,8 @@ public class GridReduceQueryExecutor {
             if (qry.isLocal())
                 nodes = singletonList(ctx.discovery().localNode());
             else {
-                NodesForPartitionsResult nodesParts = 
nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly);
+                NodesForPartitionsResult nodesParts =
+                    nodesForPartitions(cacheIds, topVer, parts, 
isReplicatedOnly, qryReqId);
 
                 nodes = nodesParts.nodes();
                 partsMap = nodesParts.partitionsMap();
@@ -704,9 +733,11 @@ public class GridReduceQueryExecutor {
 
                 final boolean distributedJoins = qry.distributedJoins();
 
+                final long qryReqId0 = qryReqId;
+
                 cancel.set(new Runnable() {
                     @Override public void run() {
-                        send(finalNodes, new GridQueryCancelRequest(qryReqId), 
null, false);
+                        send(finalNodes, new 
GridQueryCancelRequest(qryReqId0), null, false);
                     }
                 });
 
@@ -885,10 +916,10 @@ public class GridReduceQueryExecutor {
     ) {
         AffinityTopologyVersion topVer = h2.readyTopologyVersion();
 
-        NodesForPartitionsResult nodesParts = nodesForPartitions(cacheIds, 
topVer, parts, isReplicatedOnly);
-
         final long reqId = qryIdGen.incrementAndGet();
 
+        NodesForPartitionsResult nodesParts = nodesForPartitions(cacheIds, 
topVer, parts, isReplicatedOnly, reqId);
+
         final GridRunningQueryInfo qryInfo = new GridRunningQueryInfo(reqId, 
selectQry, GridCacheQueryType.SQL_FIELDS,
             schemaName, U.currentTimeMillis(), cancel, false);
 
@@ -1128,9 +1159,10 @@ public class GridReduceQueryExecutor {
      * Calculates data nodes for replicated caches on unstable topology.
      *
      * @param cacheIds Cache IDs.
+     * @param qryId Query ID.
      * @return Collection of all data nodes owning all the caches or {@code 
null} for retry.
      */
-    private Collection<ClusterNode> replicatedUnstableDataNodes(List<Integer> 
cacheIds) {
+    private Collection<ClusterNode> replicatedUnstableDataNodes(List<Integer> 
cacheIds, long qryId) {
         int i = 0;
 
         GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i++));
@@ -1145,7 +1177,7 @@ public class GridReduceQueryExecutor {
             assert cctx.isReplicated(): "all the extra caches must be 
replicated here";
         }
 
-        Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx);
+        Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx, qryId);
 
         if (F.isEmpty(nodes))
             return null; // Retry.
@@ -1161,15 +1193,20 @@ public class GridReduceQueryExecutor {
                     "with tables in partitioned caches [replicatedCache=" + 
cctx.name() + ", " +
                     "partitionedCache=" + extraCctx.name() + "]");
 
-            Set<ClusterNode> extraOwners = 
replicatedUnstableDataNodes(extraCctx);
+            Set<ClusterNode> extraOwners = 
replicatedUnstableDataNodes(extraCctx, qryId);
 
             if (F.isEmpty(extraOwners))
                 return null; // Retry.
 
             nodes.retainAll(extraOwners);
 
-            if (nodes.isEmpty())
+            if (nodes.isEmpty()) {
+                logRetry("Failed to calculate nodes for SQL query (got 
disjoint node map for REPLICATED caches " +
+                    "during rebalance) [qryId=" + qryId + ", cacheIds=" + 
cacheIds +
+                    ", lastCache=" + extraCctx.name() + ", lastCacheId=" + 
extraCctx.cacheId() + ']');
+
                 return null; // Retry.
+            }
         }
 
         return nodes;
@@ -1190,9 +1227,10 @@ public class GridReduceQueryExecutor {
      * Collects all the nodes owning all the partitions for the given 
replicated cache.
      *
      * @param cctx Cache context.
+     * @param qryId Query ID.
      * @return Owning nodes or {@code null} if we can't find owners for some 
partitions.
      */
-    private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> 
cctx) {
+    private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> 
cctx, long qryId) {
         assert cctx.isReplicated() : cctx.name() + " must be replicated";
 
         String cacheName = cctx.name();
@@ -1206,13 +1244,23 @@ public class GridReduceQueryExecutor {
         for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) {
             List<ClusterNode> owners = cctx.topology().owners(p);
 
-            if (F.isEmpty(owners))
+            if (F.isEmpty(owners)) {
+                logRetry("Failed to calculate nodes for SQL query (partition 
of a REPLICATED cache has no owners) [" +
+                    "qryId=" + qryId + ", cacheName=" + cctx.name() + ", 
cacheId=" + cctx.cacheId() +
+                    ", part=" + p + ']');
+
                 return null; // Retry.
+            }
 
             dataNodes.retainAll(owners);
 
-            if (dataNodes.isEmpty())
+            if (dataNodes.isEmpty()) {
+                logRetry("Failed to calculate nodes for SQL query (partitions 
of a REPLICATED has no common owners) [" +
+                    "qryId=" + qryId + ", cacheName=" + cctx.name() + ", 
cacheId=" + cctx.cacheId() +
+                    ", lastPart=" + p + ']');
+
                 return null; // Retry.
+            }
         }
 
         return dataNodes;
@@ -1222,10 +1270,11 @@ public class GridReduceQueryExecutor {
      * Calculates partition mapping for partitioned cache on unstable topology.
      *
      * @param cacheIds Cache IDs.
+     * @param qryId Query ID.
      * @return Partition mapping or {@code null} if we can't calculate it due 
to repartitioning and we need to retry.
      */
     @SuppressWarnings("unchecked")
-    private Map<ClusterNode, IntArray> 
partitionedUnstableDataNodes(List<Integer> cacheIds) {
+    private Map<ClusterNode, IntArray> 
partitionedUnstableDataNodes(List<Integer> cacheIds, long qryId) {
         // If the main cache is replicated, just replace it with the first 
partitioned.
         GridCacheContext<?,?> cctx = findFirstPartitioned(cacheIds);
 
@@ -1260,8 +1309,14 @@ public class GridReduceQueryExecutor {
 
                     continue;
                 }
-                else if (!F.isEmpty(dataNodes(cctx.groupId(), NONE)))
+                else if (!F.isEmpty(dataNodes(cctx.groupId(), NONE))) {
+                    logRetry("Failed to calculate nodes for SQL query 
(partition has no owners, but corresponding " +
+                        "cache group has data nodes) [qryId=" + qryId + ", 
cacheIds=" + cacheIds +
+                        ", cacheName=" + cctx.name() + ", cacheId=" + 
cctx.cacheId() + ", part=" + p +
+                        ", cacheGroupId=" + cctx.groupId() + ']');
+
                     return null; // Retry.
+                }
 
                 throw new CacheException("Failed to find data nodes [cache=" + 
cctx.name() + ", part=" + p + "]");
             }
@@ -1289,8 +1344,15 @@ public class GridReduceQueryExecutor {
                         continue; // Skip unmapped partitions.
 
                     if (F.isEmpty(owners)) {
-                        if (!F.isEmpty(dataNodes(extraCctx.groupId(), NONE)))
+                        if (!F.isEmpty(dataNodes(extraCctx.groupId(), NONE))) {
+                            logRetry("Failed to calculate nodes for SQL query 
(partition has no owners, but " +
+                                "corresponding cache group has data nodes) 
[qryId=" + qryId +
+                                ", cacheIds=" + cacheIds + ", cacheName=" + 
extraCctx.name() +
+                                ", cacheId=" + extraCctx.cacheId() + ", part=" 
+ p +
+                                ", cacheGroupId=" + extraCctx.groupId() + ']');
+
                             return null; // Retry.
+                        }
 
                         throw new CacheException("Failed to find data nodes 
[cache=" + extraCctx.name() +
                             ", part=" + p + "]");
@@ -1301,8 +1363,14 @@ public class GridReduceQueryExecutor {
                     else {
                         partLocs[p].retainAll(owners); // Intersection of 
owners.
 
-                        if (partLocs[p].isEmpty())
+                        if (partLocs[p].isEmpty()) {
+                            logRetry("Failed to calculate nodes for SQL query 
(caches have no common data nodes for " +
+                                "partition) [qryId=" + qryId + ", cacheIds=" + 
cacheIds +
+                                ", lastCacheName=" + extraCctx.name() + ", 
lastCacheId=" + extraCctx.cacheId() +
+                                ", part=" + p + ']');
+
                             return null; // Intersection is empty -> retry.
+                        }
                     }
                 }
             }
@@ -1314,19 +1382,29 @@ public class GridReduceQueryExecutor {
                 if (!extraCctx.isReplicated())
                     continue;
 
-                Set<ClusterNode> dataNodes = 
replicatedUnstableDataNodes(extraCctx);
+                Set<ClusterNode> dataNodes = 
replicatedUnstableDataNodes(extraCctx, qryId);
 
                 if (F.isEmpty(dataNodes))
                     return null; // Retry.
 
+                int part = 0;
+
                 for (Set<ClusterNode> partLoc : partLocs) {
                     if (partLoc == UNMAPPED_PARTS)
                         continue; // Skip unmapped partition.
 
                     partLoc.retainAll(dataNodes);
 
-                    if (partLoc.isEmpty())
+                    if (partLoc.isEmpty()) {
+                        logRetry("Failed to calculate nodes for SQL query 
(caches have no common data nodes for " +
+                            "partition) [qryId=" + qryId + ", cacheIds=" + 
cacheIds +
+                            ", lastReplicatedCacheName=" + extraCctx.name() +
+                            ", lastReplicatedCacheId=" + extraCctx.cacheId() + 
", part=" + part + ']');
+
                         return null; // Retry.
+                    }
+
+                    part++;
                 }
             }
         }
@@ -1475,19 +1553,20 @@ public class GridReduceQueryExecutor {
      * @param topVer Topology version.
      * @param parts Partitions array.
      * @param isReplicatedOnly Allow only replicated caches.
+     * @param qryId Query ID.
      * @return Result.
      */
     private NodesForPartitionsResult nodesForPartitions(List<Integer> 
cacheIds, AffinityTopologyVersion topVer,
-        int[] parts, boolean isReplicatedOnly) {
+        int[] parts, boolean isReplicatedOnly, long qryId) {
         Collection<ClusterNode> nodes = null;
         Map<ClusterNode, IntArray> partsMap = null;
         Map<ClusterNode, IntArray> qryMap = null;
 
         if (isPreloadingActive(cacheIds)) {
             if (isReplicatedOnly)
-                nodes = replicatedUnstableDataNodes(cacheIds);
+                nodes = replicatedUnstableDataNodes(cacheIds, qryId);
             else {
-                partsMap = partitionedUnstableDataNodes(cacheIds);
+                partsMap = partitionedUnstableDataNodes(cacheIds, qryId);
 
                 if (partsMap != null) {
                     qryMap = narrowForQuery(partsMap, parts);
@@ -1497,7 +1576,7 @@ public class GridReduceQueryExecutor {
             }
         }
         else {
-            qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, 
parts);
+            qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, 
parts, qryId);
 
             if (qryMap != null)
                 nodes = qryMap.keySet();
@@ -1676,6 +1755,13 @@ public class GridReduceQueryExecutor {
         return cp.isEmpty() ? null : cp;
     }
 
+    /**
+     * @return Query retry timeout.
+     */
+    private static long retryTimeout() {
+        return IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, 
DFLT_RETRY_TIMEOUT);
+    }
+
     /** */
     private static class ExplicitPartitionsSpecializer implements 
IgniteBiClosure<ClusterNode, Message, Message> {
         /** Partitions map. */
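
For clarity, the reduce-side loop changed above follows a retry-with-deadline pattern: keep re-mapping the query while the topology is unstable, but fail with a CacheException once the configured timeout elapses. A self-contained sketch under stated assumptions — tryMap stands in for the real mapping and reservation logic, and the 10-second default mirrors DFLT_RETRY_TIMEOUT.

import java.util.function.BooleanSupplier;

import javax.cache.CacheException;

public class RetryLoopSketch {
    /** Mirrors DFLT_RETRY_TIMEOUT from the patch. */
    static final long DFLT_RETRY_TIMEOUT = 10_000L;

    static void mapQuery(BooleanSupplier tryMap, long retryTimeout) {
        long start = System.currentTimeMillis();

        for (int attempt = 0;; attempt++) {
            // Fail the query once the timeout is exceeded; a non-positive timeout disables the check.
            if (attempt > 0 && retryTimeout > 0 && System.currentTimeMillis() - start > retryTimeout)
                throw new CacheException("Failed to map SQL query to topology.");

            if (attempt != 0) {
                try {
                    Thread.sleep(attempt * 10L); // Back off briefly while partition exchange completes.
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();

                    throw new CacheException("Query was interrupted.", e);
                }
            }

            if (tryMap.getAsBoolean())
                return; // Mapped successfully; the caller can proceed with execution.
        }
    }
}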

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce30b1e1/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java
index 8525410..806fbbf 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java
@@ -24,14 +24,31 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
 
 /**
  * Tests distributed queries over set of partitions on unstable topology.
  */
-public class IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest
-        extends IgniteCacheDistributedPartitionQueryAbstractSelfTest {
+public class IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest extends
+    IgniteCacheDistributedPartitionQueryAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT, 
Long.toString(1000_000L));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        System.setProperty(IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT,
+            Long.toString(GridReduceQueryExecutor.DFLT_RETRY_TIMEOUT));
+
+        super.afterTestsStopped();
+    }
+
     /**
      * Tests join query within region on unstable topology.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce30b1e1/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
index 4f20078..bad5303 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
@@ -19,8 +19,10 @@ package 
org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
 import org.apache.ignite.internal.util.GridRandom;
 import org.apache.ignite.internal.util.typedef.CAX;
 import org.apache.ignite.internal.util.typedef.X;
@@ -41,6 +43,8 @@ public class 
IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
+        System.setProperty(IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT, 
Long.toString(1000_000L));
+
         super.beforeTestsStarted();
 
         if (totalNodes > GRID_CNT) {
@@ -51,6 +55,12 @@ public class 
IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
             totalNodes = GRID_CNT;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        System.setProperty(IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT,
+            Long.toString(GridReduceQueryExecutor.DFLT_RETRY_TIMEOUT));
+    }
+
     /**
      * @throws Exception If failed.
      */

Reply via email to