IGNITE-9114: SQL: fail query after some timeout if it cannot be mapped to topology. This closes #4453.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce30b1e1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce30b1e1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce30b1e1 Branch: refs/heads/ignite-8446 Commit: ce30b1e1d921d9c3e1e065f7c01f1f066d42cba4 Parents: d73211d Author: devozerov <ppoze...@gmail.com> Authored: Tue Jul 31 12:53:51 2018 +0300 Committer: devozerov <ppoze...@gmail.com> Committed: Tue Jul 31 12:53:51 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 3 + .../query/h2/opt/GridH2IndexBase.java | 31 +++-- .../query/h2/twostep/GridMapQueryExecutor.java | 75 ++++++++-- .../h2/twostep/GridReduceQueryExecutor.java | 136 +++++++++++++++---- ...butedPartitionQueryNodeRestartsSelfTest.java | 21 ++- ...QueryNodeRestartDistributedJoinSelfTest.java | 10 ++ 6 files changed, 230 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ce30b1e1/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index eb60371..c4b83e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -480,6 +480,9 @@ public final class IgniteSystemProperties { /** Disable SQL system views. */ public static final String IGNITE_SQL_DISABLE_SYSTEM_VIEWS = "IGNITE_SQL_DISABLE_SYSTEM_VIEWS"; + /** SQL retry timeout. */ + public static final String IGNITE_SQL_RETRY_TIMEOUT = "IGNITE_SQL_RETRY_TIMEOUT"; + /** Maximum size for affinity assignment history. 
*/ public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE"; http://git-wip-us.apache.org/repos/asf/ignite/blob/ce30b1e1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index 1c89213..77bd69a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -361,7 +361,7 @@ public abstract class GridH2IndexBase extends BaseIndex { locNodeHnd, GridIoPolicy.IDX_POOL, false)) - throw new GridH2RetryException("Failed to send message to nodes: " + nodes + "."); + throw retryException("Failed to send message to nodes: " + nodes); } /** @@ -554,14 +554,15 @@ public abstract class GridH2IndexBase extends BaseIndex { ClusterNode node = ctx.discovery().node(nodeId); if (node == null) - throw new GridH2RetryException("Failed to find node."); + throw retryException("Failed to get node by ID during broadcast [nodeId=" + nodeId + ']'); nodes.add(node); } } if (F.isEmpty(nodes)) - throw new GridH2RetryException("Failed to collect affinity nodes."); + throw retryException("Failed to collect affinity nodes during broadcast [" + + "cacheName=" + cctx.name() + ']'); } int segmentsCount = segmentsCount(); @@ -615,7 +616,7 @@ public abstract class GridH2IndexBase extends BaseIndex { node = cctx.affinity().primaryByKey(affKeyObj, qctx.topologyVersion()); if (node == null) // Node was not found, probably topology changed and we need to retry the whole query. 
- throw new GridH2RetryException("Failed to find node."); + throw retryException("Failed to get primary node by key for range segment."); } return new SegmentKey(node, segmentForPartition(partition)); @@ -1317,10 +1318,10 @@ public abstract class GridH2IndexBase extends BaseIndex { for (int attempt = 0;; attempt++) { if (qctx.isCleared()) - throw new GridH2RetryException("Query is cancelled."); + throw retryException("Query is cancelled."); if (kernalContext().isStopping()) - throw new GridH2RetryException("Stopping node."); + throw retryException("Local node is stopping."); GridH2IndexRangeResponse res; @@ -1328,7 +1329,7 @@ public abstract class GridH2IndexBase extends BaseIndex { res = respQueue.poll(500, TimeUnit.MILLISECONDS); } catch (InterruptedException ignored) { - throw new GridH2RetryException("Interrupted."); + throw retryException("Interrupted while waiting for reply."); } if (res != null) { @@ -1355,10 +1356,10 @@ public abstract class GridH2IndexBase extends BaseIndex { case STATUS_NOT_FOUND: if (req == null || req.bounds() == null) // We have already received the first response. - throw new GridH2RetryException("Failure on remote node."); + throw retryException("Failure on remote node."); if (U.currentTimeMillis() - start > 30_000) - throw new GridH2RetryException("Timeout."); + throw retryException("Timeout reached."); try { U.sleep(20 * attempt); @@ -1381,7 +1382,7 @@ public abstract class GridH2IndexBase extends BaseIndex { } if (!kernalContext().discovery().alive(node)) - throw new GridH2RetryException("Node left: " + node); + throw retryException("Node has left topology: " + node.id()); } } @@ -1585,6 +1586,16 @@ public abstract class GridH2IndexBase extends BaseIndex { } /** + * Create retry exception for distributed join. + * + * @param msg Message. + * @return Exception. 
+ */ + private GridH2RetryException retryException(String msg) { + return new GridH2RetryException(msg); + } + + /** * */ private static final class CursorIteratorWrapper implements Iterator<GridH2Row> { http://git-wip-us.apache.org/repos/asf/ignite/blob/ce30b1e1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 930ada2..216a259 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; @@ -293,6 +294,8 @@ public class GridMapQueryExecutor { * @param topVer Topology version. * @param explicitParts Explicit partitions list. * @param reserved Reserved list. + * @param nodeId Node ID. + * @param reqId Request ID. * @return {@code true} If all the needed partitions successfully reserved. * @throws IgniteCheckedException If failed. 
*/ @@ -300,7 +303,9 @@ public class GridMapQueryExecutor { @Nullable List<Integer> cacheIds, AffinityTopologyVersion topVer, final int[] explicitParts, - List<GridReservable> reserved + List<GridReservable> reserved, + UUID nodeId, + long reqId ) throws IgniteCheckedException { assert topVer != null; @@ -312,8 +317,14 @@ public class GridMapQueryExecutor { for (int i = 0; i < cacheIds.size(); i++) { GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(cacheIds.get(i)); - if (cctx == null) // Cache was not found, probably was not deployed yet. + // Cache was not found, probably was not deployed yet. + if (cctx == null) { + logRetry("Failed to reserve partitions for query (cache is not found on local node) [" + + "rmtNodeId=" + nodeId + ", reqId=" + reqId + ", affTopVer=" + topVer + ", cacheId=" + + cacheIds.get(i) + "]"); + return false; + } if (cctx.isLocal() || !cctx.rebalanceEnabled()) continue; @@ -325,8 +336,13 @@ public class GridMapQueryExecutor { if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits. if (r != MapReplicatedReservation.INSTANCE) { - if (!r.reserve()) + if (!r.reserve()) { + logRetry("Failed to reserve partitions for query (group reservation failed) [" + + "rmtNodeId=" + nodeId + ", reqId=" + reqId + ", affTopVer=" + topVer + + ", cacheId=" + cacheIds.get(i) + ", cacheName=" + cctx.name() + "]"); + return false; // We need explicit partitions here -> retry. + } reserved.add(r); } @@ -340,8 +356,17 @@ public class GridMapQueryExecutor { GridDhtLocalPartition part = partition(cctx, p); // We don't need to reserve partitions because they will not be evicted in replicated caches. - if (part == null || part.state() != OWNING) + GridDhtPartitionState partState = part != null ? 
part.state() : null; + + if (partState != OWNING) { + logRetry("Failed to reserve partitions for query (partition of " + + "REPLICATED cache is not in OWNING state) [rmtNodeId=" + nodeId + + ", reqId=" + reqId + ", affTopVer=" + topVer + ", cacheId=" + cacheIds.get(i) + + ", cacheName=" + cctx.name() + ", part=" + p + ", partFound=" + (part != null) + + ", partState=" + partState + "]"); + return false; + } } // Mark that we checked this replicated cache. @@ -355,14 +380,31 @@ public class GridMapQueryExecutor { for (int partId : partIds) { GridDhtLocalPartition part = partition(cctx, partId); - if (part == null || part.state() != OWNING || !part.reserve()) + GridDhtPartitionState partState = part != null ? part.state() : null; + + if (partState != OWNING || !part.reserve()) { + logRetry("Failed to reserve partitions for query (partition of " + + "PARTITIONED cache cannot be reserved) [rmtNodeId=" + nodeId + ", reqId=" + reqId + + ", affTopVer=" + topVer + ", cacheId=" + cacheIds.get(i) + + ", cacheName=" + cctx.name() + ", part=" + partId + ", partFound=" + (part != null) + + ", partState=" + partState + "]"); + return false; + } reserved.add(part); // Double check that we are still in owning state and partition contents are not cleared. - if (part.state() != OWNING) + partState = part.state(); + + if (partState != OWNING) { + logRetry("Failed to reserve partitions for query (partition of " + + "PARTITIONED cache is not in OWNING state after reservation) [rmtNodeId=" + nodeId + + ", reqId=" + reqId + ", affTopVer=" + topVer + ", cacheId=" + cacheIds.get(i) + + ", cacheName=" + cctx.name() + ", part=" + partId + ", partState=" + partState + "]"); + return false; + } } if (explicitParts == null) { @@ -388,6 +430,15 @@ public class GridMapQueryExecutor { } /** + * Log the reason why a query request has to be retried. + * + * @param msg Message. + */ + private void logRetry(String msg) { + log.info(msg); + } + + /** * @param ints Integers. * @return Collection wrapper.
*/ @@ -622,7 +673,7 @@ public class GridMapQueryExecutor { try { if (topVer != null) { // Reserve primary for topology version or explicit partitions. - if (!reservePartitions(cacheIds, topVer, parts, reserved)) { + if (!reservePartitions(cacheIds, topVer, parts, reserved, node.id(), reqId)) { // Unregister lazy worker because re-try may never reach this node again. if (lazy) stopAndUnregisterCurrentLazyWorker(); @@ -739,8 +790,14 @@ public class GridMapQueryExecutor { if (lazy) stopAndUnregisterCurrentLazyWorker(); - if (X.hasCause(e, GridH2RetryException.class)) + GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class); + + if (retryErr != null) { + logRetry("Failed to execute non-collocated query (will retry) [nodeId=" + node.id() + + ", reqId=" + reqId + ", errMsg=" + retryErr.getMessage() + ']'); + sendRetry(node, reqId, segmentId); + } else { U.error(log, "Failed to execute local query.", e); @@ -788,7 +845,7 @@ public class GridMapQueryExecutor { List<GridReservable> reserved = new ArrayList<>(); - if (!reservePartitions(cacheIds, topVer, parts, reserved)) { + if (!reservePartitions(cacheIds, topVer, parts, reserved, node.id(), reqId)) { U.error(log, "Failed to reserve partitions for DML request. 
[localNodeId=" + ctx.localNodeId() + ", nodeId=" + node.id() + ", reqId=" + req.requestId() + ", cacheIds=" + cacheIds + ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/ce30b1e1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index cd76bc1..d778fcc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -45,6 +45,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; @@ -101,6 +102,7 @@ import org.h2.value.Value; import org.jetbrains.annotations.Nullable; import static java.util.Collections.singletonList; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS; import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; @@ -111,6 +113,9 @@ import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySpl * Reduce query executor. 
*/ public class GridReduceQueryExecutor { + /** Default retry timeout (ms): fail query after 10 seconds of unsuccessful attempts to map it to topology. */ + public static final long DFLT_RETRY_TIMEOUT = 10_000L; + /** */ private static final String MERGE_INDEX_UNSORTED = "merge_scan"; @@ -442,14 +447,24 @@ public class GridReduceQueryExecutor { } /** + * Log the reason why query nodes could not be calculated and mapping has to be retried. + * + * @param msg Message. + */ + private void logRetry(String msg) { + log.info(msg); + } + + /** * @param isReplicatedOnly If we must only have replicated caches. * @param topVer Topology version. * @param cacheIds Participating cache IDs. * @param parts Partitions. + * @param qryId Query ID. * @return Data nodes or {@code null} if repartitioning started and we need to retry. */ private Map<ClusterNode, IntArray> stableDataNodes(boolean isReplicatedOnly, AffinityTopologyVersion topVer, - List<Integer> cacheIds, int[] parts) { + List<Integer> cacheIds, int[] parts, long qryId) { GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(0)); // If the first cache is not partitioned, find it (if it's present) and move it to index 0. @@ -507,8 +522,15 @@ public class GridReduceQueryExecutor { disjoint = !extraNodes.equals(nodes); if (disjoint) { - if (isPreloadingActive(cacheIds)) + if (isPreloadingActive(cacheIds)) { + logRetry("Failed to calculate nodes for SQL query (got disjoint node map during rebalance) " + + "[qryId=" + qryId + ", affTopVer=" + topVer + ", cacheIds=" + cacheIds + + ", parts=" + (parts == null ? "[]" : Arrays.toString(parts)) + + ", replicatedOnly=" + isReplicatedOnly + ", lastCache=" + extraCctx.name() + + ", lastCacheId=" + extraCctx.cacheId() + ']'); + return null; // Retry.
+ } else throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() + ", cache2=" + extraCacheName + "]"); @@ -546,8 +568,14 @@ public class GridReduceQueryExecutor { final boolean isReplicatedOnly = qry.isReplicatedOnly(); - // Fail if all caches are replicated and explicit partitions are set. + long retryTimeout = retryTimeout(); + + final long startTime = U.currentTimeMillis(); + for (int attempt = 0;; attempt++) { + if (attempt > 0 && retryTimeout > 0 && (U.currentTimeMillis() - startTime > retryTimeout)) + throw new CacheException("Failed to map SQL query to topology during timeout: " + retryTimeout + "ms"); + if (attempt != 0) { try { Thread.sleep(attempt * 10); // Wait for exchange. @@ -559,7 +587,7 @@ public class GridReduceQueryExecutor { } } - final long qryReqId = qryIdGen.incrementAndGet(); + long qryReqId = qryIdGen.incrementAndGet(); final ReduceQueryRun r = new ReduceQueryRun(qryReqId, qry.originalSql(), schemaName, h2.connectionForSchema(schemaName), qry.mapQueries().size(), qry.pageSize(), @@ -602,7 +630,8 @@ public class GridReduceQueryExecutor { if (qry.isLocal()) nodes = singletonList(ctx.discovery().localNode()); else { - NodesForPartitionsResult nodesParts = nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly); + NodesForPartitionsResult nodesParts = + nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, qryReqId); nodes = nodesParts.nodes(); partsMap = nodesParts.partitionsMap(); @@ -704,9 +733,11 @@ public class GridReduceQueryExecutor { final boolean distributedJoins = qry.distributedJoins(); + final long qryReqId0 = qryReqId; + cancel.set(new Runnable() { @Override public void run() { - send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false); + send(finalNodes, new GridQueryCancelRequest(qryReqId0), null, false); } }); @@ -885,10 +916,10 @@ public class GridReduceQueryExecutor { ) { AffinityTopologyVersion topVer = h2.readyTopologyVersion(); - NodesForPartitionsResult nodesParts = nodesForPartitions(cacheIds, topVer,
parts, isReplicatedOnly); - final long reqId = qryIdGen.incrementAndGet(); + NodesForPartitionsResult nodesParts = nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, reqId); + final GridRunningQueryInfo qryInfo = new GridRunningQueryInfo(reqId, selectQry, GridCacheQueryType.SQL_FIELDS, schemaName, U.currentTimeMillis(), cancel, false); @@ -1128,9 +1159,10 @@ public class GridReduceQueryExecutor { * Calculates data nodes for replicated caches on unstable topology. * * @param cacheIds Cache IDs. + * @param qryId Query ID. * @return Collection of all data nodes owning all the caches or {@code null} for retry. */ - private Collection<ClusterNode> replicatedUnstableDataNodes(List<Integer> cacheIds) { + private Collection<ClusterNode> replicatedUnstableDataNodes(List<Integer> cacheIds, long qryId) { int i = 0; GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i++)); @@ -1145,7 +1177,7 @@ public class GridReduceQueryExecutor { assert cctx.isReplicated(): "all the extra caches must be replicated here"; } - Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx); + Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx, qryId); if (F.isEmpty(nodes)) return null; // Retry. @@ -1161,15 +1193,20 @@ public class GridReduceQueryExecutor { "with tables in partitioned caches [replicatedCache=" + cctx.name() + ", " + "partitionedCache=" + extraCctx.name() + "]"); - Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx); + Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx, qryId); if (F.isEmpty(extraOwners)) return null; // Retry. nodes.retainAll(extraOwners); - if (nodes.isEmpty()) + if (nodes.isEmpty()) { + logRetry("Failed to calculate nodes for SQL query (got disjoint node map for REPLICATED caches " + + "during rebalance) [qryId=" + qryId + ", cacheIds=" + cacheIds + + ", lastCache=" + extraCctx.name() + ", lastCacheId=" + extraCctx.cacheId() + ']'); + return null; // Retry. 
+ } } return nodes; @@ -1190,9 +1227,10 @@ public class GridReduceQueryExecutor { * Collects all the nodes owning all the partitions for the given replicated cache. * * @param cctx Cache context. + * @param qryId Query ID. * @return Owning nodes or {@code null} if we can't find owners for some partitions. */ - private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> cctx) { + private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> cctx, long qryId) { assert cctx.isReplicated() : cctx.name() + " must be replicated"; String cacheName = cctx.name(); @@ -1206,13 +1244,23 @@ public class GridReduceQueryExecutor { for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) { List<ClusterNode> owners = cctx.topology().owners(p); - if (F.isEmpty(owners)) + if (F.isEmpty(owners)) { + logRetry("Failed to calculate nodes for SQL query (partition of a REPLICATED cache has no owners) [" + + "qryId=" + qryId + ", cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() + + ", part=" + p + ']'); + return null; // Retry. + } dataNodes.retainAll(owners); - if (dataNodes.isEmpty()) + if (dataNodes.isEmpty()) { + logRetry("Failed to calculate nodes for SQL query (partitions of a REPLICATED cache have no common owners) [" + + "qryId=" + qryId + ", cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() + + ", lastPart=" + p + ']'); + return null; // Retry. + } } return dataNodes; @@ -1222,10 +1270,11 @@ public class GridReduceQueryExecutor { * Calculates partition mapping for partitioned cache on unstable topology. * * @param cacheIds Cache IDs. + * @param qryId Query ID. * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry.
*/ @SuppressWarnings("unchecked") - private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(List<Integer> cacheIds) { + private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(List<Integer> cacheIds, long qryId) { // If the main cache is replicated, just replace it with the first partitioned. GridCacheContext<?,?> cctx = findFirstPartitioned(cacheIds); @@ -1260,8 +1309,14 @@ public class GridReduceQueryExecutor { continue; } - else if (!F.isEmpty(dataNodes(cctx.groupId(), NONE))) + else if (!F.isEmpty(dataNodes(cctx.groupId(), NONE))) { + logRetry("Failed to calculate nodes for SQL query (partition has no owners, but corresponding " + + "cache group has data nodes) [qryId=" + qryId + ", cacheIds=" + cacheIds + + ", cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() + ", part=" + p + + ", cacheGroupId=" + cctx.groupId() + ']'); + return null; // Retry. + } throw new CacheException("Failed to find data nodes [cache=" + cctx.name() + ", part=" + p + "]"); } @@ -1289,8 +1344,15 @@ public class GridReduceQueryExecutor { continue; // Skip unmapped partitions. if (F.isEmpty(owners)) { - if (!F.isEmpty(dataNodes(extraCctx.groupId(), NONE))) + if (!F.isEmpty(dataNodes(extraCctx.groupId(), NONE))) { + logRetry("Failed to calculate nodes for SQL query (partition has no owners, but " + + "corresponding cache group has data nodes) [qryId=" + qryId + + ", cacheIds=" + cacheIds + ", cacheName=" + extraCctx.name() + + ", cacheId=" + extraCctx.cacheId() + ", part=" + p + + ", cacheGroupId=" + extraCctx.groupId() + ']'); + return null; // Retry. + } throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() + ", part=" + p + "]"); @@ -1301,8 +1363,14 @@ public class GridReduceQueryExecutor { else { partLocs[p].retainAll(owners); // Intersection of owners. 
- if (partLocs[p].isEmpty()) + if (partLocs[p].isEmpty()) { + logRetry("Failed to calculate nodes for SQL query (caches have no common data nodes for " + + "partition) [qryId=" + qryId + ", cacheIds=" + cacheIds + + ", lastCacheName=" + extraCctx.name() + ", lastCacheId=" + extraCctx.cacheId() + + ", part=" + p + ']'); + return null; // Intersection is empty -> retry. + } } } } @@ -1314,19 +1382,29 @@ public class GridReduceQueryExecutor { if (!extraCctx.isReplicated()) continue; - Set<ClusterNode> dataNodes = replicatedUnstableDataNodes(extraCctx); + Set<ClusterNode> dataNodes = replicatedUnstableDataNodes(extraCctx, qryId); if (F.isEmpty(dataNodes)) return null; // Retry. + int part = -1; + for (Set<ClusterNode> partLoc : partLocs) { + part++; // Index of partLoc in partLocs; must advance before the UNMAPPED_PARTS skip. + if (partLoc == UNMAPPED_PARTS) continue; // Skip unmapped partition. partLoc.retainAll(dataNodes); - if (partLoc.isEmpty()) + if (partLoc.isEmpty()) { + logRetry("Failed to calculate nodes for SQL query (caches have no common data nodes for " + + "partition) [qryId=" + qryId + ", cacheIds=" + cacheIds + + ", lastReplicatedCacheName=" + extraCctx.name() + + ", lastReplicatedCacheId=" + extraCctx.cacheId() + ", part=" + part + ']'); + return null; // Retry. + } } } } @@ -1475,19 +1553,20 @@ public class GridReduceQueryExecutor { * @param topVer Topology version. * @param parts Partitions array. * @param isReplicatedOnly Allow only replicated caches. + * @param qryId Query ID. * @return Result.
*/ private NodesForPartitionsResult nodesForPartitions(List<Integer> cacheIds, AffinityTopologyVersion topVer, - int[] parts, boolean isReplicatedOnly) { + int[] parts, boolean isReplicatedOnly, long qryId) { Collection<ClusterNode> nodes = null; Map<ClusterNode, IntArray> partsMap = null; Map<ClusterNode, IntArray> qryMap = null; if (isPreloadingActive(cacheIds)) { if (isReplicatedOnly) - nodes = replicatedUnstableDataNodes(cacheIds); + nodes = replicatedUnstableDataNodes(cacheIds, qryId); else { - partsMap = partitionedUnstableDataNodes(cacheIds); + partsMap = partitionedUnstableDataNodes(cacheIds, qryId); if (partsMap != null) { qryMap = narrowForQuery(partsMap, parts); @@ -1497,7 +1576,7 @@ public class GridReduceQueryExecutor { } } else { - qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, parts); + qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, parts, qryId); if (qryMap != null) nodes = qryMap.keySet(); @@ -1676,6 +1755,13 @@ public class GridReduceQueryExecutor { return cp.isEmpty() ? null : cp; } + /** + * @return Query retry timeout in milliseconds; a non-positive value disables the timeout. + */ + private static long retryTimeout() { + return IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, DFLT_RETRY_TIMEOUT); + } + /** */ private static class ExplicitPartitionsSpecializer implements IgniteBiClosure<ClusterNode, Message, Message> { /** Partitions map.
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/ce30b1e1/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java index 8525410..806fbbf 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java @@ -24,14 +24,31 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor; /** * Tests distributed queries over set of partitions on unstable topology. 
*/ -public class IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest - extends IgniteCacheDistributedPartitionQueryAbstractSelfTest { +public class IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest extends + IgniteCacheDistributedPartitionQueryAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + System.setProperty(IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT, Long.toString(1000_000L)); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.setProperty(IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT, + Long.toString(GridReduceQueryExecutor.DFLT_RETRY_TIMEOUT)); + + super.afterTestsStopped(); + } + /** * Tests join query within region on unstable topology. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ce30b1e1/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java index 4f20078..bad5303 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java @@ -19,8 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteSystemProperties; import 
org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor; import org.apache.ignite.internal.util.GridRandom; import org.apache.ignite.internal.util.typedef.CAX; import org.apache.ignite.internal.util.typedef.X; @@ -41,6 +43,8 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { + System.setProperty(IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT, Long.toString(1000_000L)); + super.beforeTestsStarted(); if (totalNodes > GRID_CNT) { @@ -51,6 +55,14 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa totalNodes = GRID_CNT; } + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.setProperty(IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT, + Long.toString(GridReduceQueryExecutor.DFLT_RETRY_TIMEOUT)); + + super.afterTestsStopped(); + } + /** * @throws Exception If failed. */