This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new af5d1ddbee2 IGNITE-23975 SQL Calcite: Add group partitions reservation
- Fixes #11758.
af5d1ddbee2 is described below
commit af5d1ddbee2a6216e9f5a6b17b499df22b1e9aa0
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Dec 27 21:22:05 2024 +0300
IGNITE-23975 SQL Calcite: Add group partitions reservation - Fixes #11758.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/exec/AbstractCacheScan.java | 94 ++---
.../processors/query/calcite/exec/IndexScan.java | 2 +-
.../processors/query/calcite/exec/TableScan.java | 4 +-
.../query/calcite/metadata/ColocationGroup.java | 54 ++-
.../calcite/metadata/FragmentDescription.java | 5 +-
.../calcite/schema/CacheTableDescriptorImpl.java | 23 +-
.../dht/topology}/PartitionReservation.java | 2 +-
.../dht/topology}/PartitionReservationKey.java | 2 +-
.../dht/topology/PartitionReservationManager.java | 443 +++++++++++++++++++++
.../processors/query/GridQueryProcessor.java | 15 +
.../processors/query/h2/IgniteH2Indexing.java | 8 +-
.../processors/query/h2/opt/QueryContext.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 1 +
.../h2/twostep/PartitionReservationManager.java | 375 -----------------
.../internal/processors/query/KillQueryTest.java | 4 +-
.../h2/twostep/RetryCauseMessageSelfTest.java | 2 +
16 files changed, 582 insertions(+), 454 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
index a8126fbce4d..1f46ed0d2da 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
@@ -18,17 +18,18 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
-import org.apache.ignite.internal.util.typedef.F;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation;
/** */
public abstract class AbstractCacheScan<Row> implements Iterable<Row>,
AutoCloseable {
@@ -45,15 +46,35 @@ public abstract class AbstractCacheScan<Row> implements
Iterable<Row>, AutoClose
protected final int[] parts;
/** */
- protected volatile List<GridDhtLocalPartition> reserved;
+ protected final boolean explicitParts;
+
+ /** */
+ private PartitionReservation reservation;
+
+ /** */
+ protected volatile List<GridDhtLocalPartition> reservedParts;
/** */
AbstractCacheScan(ExecutionContext<Row> ectx, GridCacheContext<?, ?> cctx,
int[] parts) {
this.ectx = ectx;
this.cctx = cctx;
- this.parts = parts;
topVer = ectx.topologyVersion();
+
+ explicitParts = parts != null;
+
+ if (cctx.isReplicated())
+ this.parts = IntStream.range(0,
cctx.affinity().partitions()).toArray();
+ else {
+ if (parts != null)
+ this.parts = parts;
+ else {
+ Collection<Integer> primaryParts =
cctx.affinity().primaryPartitions(
+ cctx.kernalContext().localNodeId(), topVer);
+
+ this.parts =
primaryParts.stream().mapToInt(Integer::intValue).toArray();
+ }
+ }
}
/** {@inheritDoc} */
@@ -80,7 +101,7 @@ public abstract class AbstractCacheScan<Row> implements
Iterable<Row>, AutoClose
/** */
private synchronized void reserve() {
- if (reserved != null)
+ if (reservation != null)
return;
GridDhtPartitionTopology top = cctx.topology();
@@ -98,61 +119,42 @@ public abstract class AbstractCacheScan<Row> implements
Iterable<Row>, AutoClose
throw new ClusterTopologyException("Topology was changed. Please
retry on stable topology.");
}
- List<GridDhtLocalPartition> toReserve;
-
- if (cctx.isReplicated()) {
- int partsCnt = cctx.affinity().partitions();
-
- toReserve = new ArrayList<>(partsCnt);
-
- for (int i = 0; i < partsCnt; i++)
- toReserve.add(top.localPartition(i));
- }
- else if (cctx.isPartitioned()) {
- assert parts != null;
+ try {
+ PartitionReservation reservation;
- toReserve = new ArrayList<>(parts.length);
+ try {
+ reservation =
cctx.kernalContext().query().partitionReservationManager().reservePartitions(
+ cctx, topVer, explicitParts ? parts : null,
ectx.originatingNodeId(), "qryId=" + ectx.queryId());
+ }
+ catch (IgniteCheckedException e) {
+ throw new ClusterTopologyException("Failed to reserve
partition for query execution", e);
+ }
- for (int i = 0; i < parts.length; i++)
- toReserve.add(top.localPartition(parts[i]));
- }
- else
- toReserve = Collections.emptyList();
+ if (reservation.failed()) {
+ reservation.release();
- List<GridDhtLocalPartition> reserved = new
ArrayList<>(toReserve.size());
+ throw new ClusterTopologyException(reservation.error());
+ }
- try {
- for (GridDhtLocalPartition part : toReserve) {
- if (part == null || !part.reserve())
- throw new ClusterTopologyException("Failed to reserve
partition for query execution. Retry on stable topology.");
- else if (part.state() != GridDhtPartitionState.OWNING) {
- part.release();
+ this.reservation = reservation;
- throw new ClusterTopologyException("Failed to reserve
partition for query execution. Retry on stable topology.");
- }
+ List<GridDhtLocalPartition> reservedParts = new
ArrayList<>(parts.length);
- reserved.add(part);
- }
- }
- catch (Exception e) {
- release();
+ for (int i = 0; i < parts.length; i++)
+ reservedParts.add(top.localPartition(parts[i]));
- throw e;
+ this.reservedParts = reservedParts;
}
finally {
- this.reserved = reserved;
-
top.readUnlock();
}
}
/** */
private synchronized void release() {
- if (F.isEmpty(reserved))
- return;
-
- reserved.forEach(GridDhtLocalPartition::release);
+ if (reservation != null)
+ reservation.release();
- reserved = null;
+ reservation = null;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index 2077996aa09..d61f302b0ab 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -116,7 +116,7 @@ public class IndexScan<Row> extends
AbstractCacheColumnsScan<Row> {
txChanges = ectx.transactionChanges(
cctx.cacheId(),
- parts,
+ cctx.isReplicated() ? null : this.parts,
r -> new IndexRowImpl(rowHnd, r),
this::compare
);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
index f008f3f6af3..c16252b902a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
@@ -74,9 +74,9 @@ public class TableScan<Row> extends
AbstractCacheColumnsScan<Row> {
/** */
private IteratorImpl() {
- assert reserved != null;
+ assert reservedParts != null;
- parts = new ArrayDeque<>(reserved);
+ parts = new ArrayDeque<>(reservedParts);
txChanges = F.isEmpty(ectx.getQryTxEntries())
? TransactionChanges.empty()
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
index c5e652a00af..af9fadc5723 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -58,6 +58,13 @@ public class ColocationGroup implements MarshalableMessage {
@GridDirectTransient
private List<List<UUID>> assignments;
+ /**
+ * Flag indicating that the assignment is formed by the original cache assignment
for given topology.
+ * In case of {@code true} value we can skip assignment marshalling and
calc assignment on remote nodes.
+ */
+ @GridDirectTransient
+ private boolean primaryAssignment;
+
/** Marshalled assignments. */
private int[] marshalledAssignments;
@@ -68,7 +75,7 @@ public class ColocationGroup implements MarshalableMessage {
/** */
public static ColocationGroup forAssignments(List<List<UUID>> assignments)
{
- return new ColocationGroup(null, null, assignments);
+ return new ColocationGroup(null, null, assignments, true);
}
/** */
@@ -100,6 +107,13 @@ public class ColocationGroup implements MarshalableMessage
{
this.assignments = assignments;
}
+ /** */
+ private ColocationGroup(long[] sourceIds, List<UUID> nodeIds,
List<List<UUID>> assignments, boolean primaryAssignment) {
+ this(sourceIds, nodeIds, assignments);
+
+ this.primaryAssignment = primaryAssignment;
+ }
+
/**
* @return Lists of nodes capable to execute a query fragment for what the
mapping is calculated.
*/
@@ -143,10 +157,10 @@ public class ColocationGroup implements
MarshalableMessage {
*/
public ColocationGroup colocate(ColocationGroup other) throws
ColocationMappingException {
long[] srcIds;
- if (this.sourceIds == null || other.sourceIds == null)
- srcIds = U.firstNotNull(this.sourceIds, other.sourceIds);
+ if (sourceIds == null || other.sourceIds == null)
+ srcIds = U.firstNotNull(sourceIds, other.sourceIds);
else
- srcIds = LongStream.concat(Arrays.stream(this.sourceIds),
Arrays.stream(other.sourceIds)).distinct().toArray();
+ srcIds = LongStream.concat(Arrays.stream(sourceIds),
Arrays.stream(other.sourceIds)).distinct().toArray();
List<UUID> nodeIds;
if (this.nodeIds == null || other.nodeIds == null)
@@ -159,6 +173,8 @@ public class ColocationGroup implements MarshalableMessage {
"Replicated query parts are not co-located on all nodes");
}
+ boolean primaryAssignment = this.primaryAssignment ||
other.primaryAssignment;
+
List<List<UUID>> assignments;
if (this.assignments == null || other.assignments == null) {
assignments = U.firstNotNull(this.assignments, other.assignments);
@@ -170,11 +186,14 @@ public class ColocationGroup implements
MarshalableMessage {
for (int i = 0; i < assignments.size(); i++) {
List<UUID> assignment = Commons.intersect(filter,
assignments.get(i));
- if (assignment.isEmpty()) { // TODO check with partition
filters
+ if (assignment.isEmpty()) {
throw new ColocationMappingException("Failed to map
fragment to location. " +
"Partition mapping is empty [part=" + i + "]");
}
+ if (!assignment.get(0).equals(assignments.get(i).get(0)))
+ primaryAssignment = false;
+
assignments0.add(assignment);
}
@@ -191,14 +210,20 @@ public class ColocationGroup implements
MarshalableMessage {
if (filter != null)
assignment.retainAll(filter);
- if (assignment.isEmpty()) // TODO check with partition filters
- throw new ColocationMappingException("Failed to map
fragment to location. Partition mapping is empty [part=" + i + "]");
+ if (assignment.isEmpty()) {
+ throw new ColocationMappingException("Failed to map
fragment to location. " +
+ "Partition mapping is empty [part=" + i + "]");
+ }
+
+ if (!assignment.get(0).equals(this.assignments.get(i).get(0))
+ ||
!assignment.get(0).equals(other.assignments.get(i).get(0)))
+ primaryAssignment = false;
assignments.add(assignment);
}
}
- return new ColocationGroup(srcIds, nodeIds, assignments);
+ return new ColocationGroup(srcIds, nodeIds, assignments,
primaryAssignment);
}
/** */
@@ -216,7 +241,16 @@ public class ColocationGroup implements MarshalableMessage
{
assignments.add(first != null ? Collections.singletonList(first) :
Collections.emptyList());
}
- return new ColocationGroup(sourceIds, new ArrayList<>(nodes),
assignments);
+ return new ColocationGroup(sourceIds, new ArrayList<>(nodes),
assignments, primaryAssignment);
+ }
+
+ /** */
+ public ColocationGroup explicitMapping() {
+ if (assignments == null || !primaryAssignment)
+ return this;
+
+ // Make a shallow copy with the primaryAssignment flag cleared.
+ return new ColocationGroup(sourceIds, nodeIds, assignments, false);
}
/** */
@@ -359,7 +393,7 @@ public class ColocationGroup implements MarshalableMessage {
/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) {
- if (assignments != null && marshalledAssignments == null) {
+ if (assignments != null && marshalledAssignments == null &&
!primaryAssignment) {
Map<UUID, Integer> nodeIdxs = new HashMap<>();
for (int i = 0; i < nodeIds.size(); i++)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
index dfef48e3346..f1dc050aa82 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
@@ -192,8 +192,11 @@ public class FragmentDescription implements
MarshalableMessage {
if (mapping != null)
mapping.prepareMarshal(ctx);
- if (target != null)
+ if (target != null) {
+ target = target.explicitMapping();
+
target.prepareMarshal(ctx);
+ }
if (remoteSources0 == null && remoteSources != null) {
remoteSources0 = U.newHashMap(remoteSources.size());
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
index ab704a6850d..11dca7c96ff 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
@@ -599,18 +599,25 @@ public class CacheTableDescriptorImpl extends
NullInitializerExpressionFactory
List<ClusterNode> nodes =
cctx.discovery().discoCache(topVer).cacheGroupAffinityNodes(cctx.groupId());
List<UUID> nodes0;
- if (!top.rebalanceFinished(topVer)) {
- nodes0 = new ArrayList<>(nodes.size());
+ top.readLock();
- int parts = top.partitions();
+ try {
+ if (!top.rebalanceFinished(topVer)) {
+ nodes0 = new ArrayList<>(nodes.size());
+
+ int parts = top.partitions();
- for (ClusterNode node : nodes) {
- if (isOwner(node.id(), top, parts))
- nodes0.add(node.id());
+ for (ClusterNode node : nodes) {
+ if (isOwner(node.id(), top, parts))
+ nodes0.add(node.id());
+ }
}
+ else
+ nodes0 = Commons.transform(nodes, ClusterNode::id);
+ }
+ finally {
+ top.readUnlock();
}
- else
- nodes0 = Commons.transform(nodes, ClusterNode::id);
return ColocationGroup.forNodes(nodes0);
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservation.java
similarity index 96%
rename from
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservation.java
index b5593e58d8e..ae21192819d 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservation.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.h2.twostep;
+package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationKey.java
similarity index 96%
rename from
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationKey.java
index 0fad2c4dd09..60911e7ac84 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationKey.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.h2.twostep;
+package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.F;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java
new file mode 100644
index 00000000000..fa44fc6a10a
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.tracing.MTC;
+import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
+import static
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static
org.apache.ignite.internal.processors.tracing.SpanType.SQL_PARTITIONS_RESERVE;
+
+/**
+ * Class responsible for partition reservation for queries executed on local
node. Prevents partitions from being
+ * evicted from node during query execution.
+ */
+public class PartitionReservationManager implements PartitionsExchangeAware {
+ /** Special instance of reservable object for REPLICATED caches. */
+ private static final ReplicatedReservable REPLICATED_RESERVABLE = new
ReplicatedReservable();
+
+ /** Kernal context. */
+ private final GridKernalContext ctx;
+
+ /**
+ * Group reservations cache. When affinity version is not changed and all
primary partitions must be reserved we get
+ * group reservation from this map instead of creating a new reservation group.
+ */
+ private final ConcurrentMap<PartitionReservationKey, GridReservable>
reservations = new ConcurrentHashMap<>();
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Context.
+ */
+ public PartitionReservationManager(GridKernalContext ctx) {
+ this.ctx = ctx;
+
+ log = ctx.log(PartitionReservationManager.class);
+
+ ctx.cache().context().exchange().registerExchangeAwareComponent(this);
+ }
+
+ /**
+ * @param top Partition topology.
+ * @param partId Partition ID.
+ * @return Partition.
+ */
+ private static GridDhtLocalPartition partition(GridDhtPartitionTopology
top, int partId) {
+ return top.localPartition(partId, NONE, false);
+ }
+
+ /**
+ * @param cacheIds Cache IDs.
+ * @param reqTopVer Topology version from request.
+ * @param explicitParts Explicit partitions list.
+ * @param nodeId Node ID.
+ * @param reqId Request ID.
+ * @return PartitionReservation instance with reservation result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public PartitionReservation reservePartitions(
+ @Nullable List<Integer> cacheIds,
+ AffinityTopologyVersion reqTopVer,
+ int[] explicitParts,
+ UUID nodeId,
+ long reqId
+ ) throws IgniteCheckedException {
+ try (TraceSurroundings ignored =
MTC.support(ctx.tracing().create(SQL_PARTITIONS_RESERVE, MTC.span()))) {
+ assert reqTopVer != null;
+
+ AffinityTopologyVersion topVer =
ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer);
+
+ if (F.isEmpty(cacheIds))
+ return new PartitionReservation(Collections.emptyList());
+
+ Collection<Integer> partIds = partsToCollection(explicitParts);
+
+ List<GridReservable> reserved = new ArrayList<>();
+
+ for (int i = 0; i < cacheIds.size(); i++) {
+ GridCacheContext<?, ?> cctx =
ctx.cache().context().cacheContext(cacheIds.get(i));
+
+ // Cache was not found, probably was not deployed yet.
+ if (cctx == null) {
+ return new PartitionReservation(reserved,
+ String.format("Failed to reserve partitions for query
(cache is not " +
+ "found on local node) [localNodeId=%s,
rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]",
+ ctx.localNodeId(), nodeId, reqId, topVer,
cacheIds.get(i)));
+ }
+
+ if (!cctx.rebalanceEnabled())
+ continue;
+
+ String err = reservePartitions(reserved, cctx, partIds,
topVer, nodeId, "reqId=" + reqId);
+
+ if (err != null)
+ return new PartitionReservation(reserved, err);
+ }
+
+ return new PartitionReservation(reserved);
+ }
+ }
+
+ /**
+ * @param cctx Cache context.
+ * @param reqTopVer Topology version from request.
+ * @param explicitParts Explicit partitions list.
+ * @param nodeId Node ID.
+ * @param qryInfo Query info.
+ * @return PartitionReservation instance with reservation result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public PartitionReservation reservePartitions(
+ GridCacheContext<?, ?> cctx,
+ AffinityTopologyVersion reqTopVer,
+ int[] explicitParts,
+ UUID nodeId,
+ String qryInfo
+ ) throws IgniteCheckedException {
+ try (TraceSurroundings ignored =
MTC.support(ctx.tracing().create(SQL_PARTITIONS_RESERVE, MTC.span()))) {
+ assert reqTopVer != null;
+
+ AffinityTopologyVersion topVer =
ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer);
+
+ Collection<Integer> partIds = partsToCollection(explicitParts);
+
+ List<GridReservable> reserved = new ArrayList<>();
+
+ String err = reservePartitions(reserved, cctx, partIds, topVer,
nodeId, qryInfo);
+
+ return new PartitionReservation(reserved, err);
+ }
+ }
+
+ /**
+ * @return Error message or {@code null}.
+ */
+ private @Nullable String reservePartitions(
+ List<GridReservable> reserved,
+ GridCacheContext<?, ?> cctx,
+ @Nullable Collection<Integer> explicitParts,
+ AffinityTopologyVersion topVer,
+ UUID nodeId,
+ String qryInfo
+ ) throws IgniteCheckedException {
+ // For replicated cache topology version does not make sense.
+ PartitionReservationKey grpKey = new
PartitionReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer);
+
+ GridReservable r = reservations.get(grpKey);
+
+ if (explicitParts == null && r != null) // Try to reserve group
partition if any and no explicits.
+ return groupPartitionReservation(reserved, r, cctx, topVer,
nodeId, qryInfo);
+ else { // Try to reserve partitions one by one.
+ int partsCnt = cctx.affinity().partitions();
+
+ if (cctx.isReplicated()) { // Check all the partitions are in
owning state for replicated cache.
+ if (r == null) { // Check only once.
+ GridDhtPartitionTopology top = cctx.topology();
+
+ top.readLock();
+
+ try {
+ for (int p = 0; p < partsCnt; p++) {
+ GridDhtLocalPartition part = partition(top, p);
+
+ // We don't need to reserve partitions because
they will not be evicted in replicated caches.
+ GridDhtPartitionState partState = part != null ?
part.state() : null;
+
+ if (partState != OWNING) {
+ return String.format("Failed to reserve
partitions for " +
+ "query (partition of REPLICATED cache is
not in OWNING state) [" +
+ "localNodeId=%s, rmtNodeId=%s, %s,
affTopVer=%s, cacheId=%s, " +
+ "cacheName=%s, part=%s, partFound=%s,
partState=%s]",
+ ctx.localNodeId(),
+ nodeId,
+ qryInfo,
+ topVer,
+ cctx.cacheId(),
+ cctx.name(),
+ p,
+ (part != null),
+ partState
+ );
+ }
+ }
+ }
+ finally {
+ top.readUnlock();
+ }
+
+ // Mark that we checked this replicated cache.
+ reservations.putIfAbsent(grpKey, REPLICATED_RESERVABLE);
+
+ MTC.span().addLog(() -> "Cache partitions were reserved
[cache=" + cctx.name() +
+ ", partitions=[0.." + partsCnt + ']');
+ }
+ }
+ else { // Reserve primary partitions for partitioned cache (if no
explicit given).
+ Collection<Integer> partIds = explicitParts != null ?
explicitParts
+ : cctx.affinity().primaryPartitions(ctx.localNodeId(),
topVer);
+
+ int reservedCnt = 0;
+
+ GridDhtPartitionTopology top = cctx.topology();
+
+ top.readLock();
+
+ try {
+ for (int partId : partIds) {
+ GridDhtLocalPartition part = partition(top, partId);
+
+ GridDhtPartitionState partState = part != null ?
part.state() : null;
+
+ if (partState != OWNING) {
+ if (partState == LOST) {
+ reserved.forEach(GridReservable::release);
+
+ failQueryOnLostData(cctx, part);
+ }
+ else {
+ return String.format("Failed to reserve
partitions " +
+ "for query (partition of PARTITIONED cache
is not found or not in OWNING " +
+ "state) [localNodeId=%s, rmtNodeId=%s, %s,
affTopVer=%s, cacheId=%s, " +
+ "cacheName=%s, part=%s, partFound=%s,
partState=%s]",
+ ctx.localNodeId(),
+ nodeId,
+ qryInfo,
+ topVer,
+ cctx.cacheId(),
+ cctx.name(),
+ partId,
+ (part != null),
+ partState
+ );
+ }
+ }
+
+ if (!part.reserve()) {
+ return String.format("Failed to reserve partitions
for query " +
+ "(partition of PARTITIONED cache cannot be
reserved) [" +
+ "localNodeId=%s, rmtNodeId=%s, %s,
affTopVer=%s, cacheId=%s, " +
+ "cacheName=%s, part=%s, partFound=%s,
partState=%s]",
+ ctx.localNodeId(),
+ nodeId,
+ qryInfo,
+ topVer,
+ cctx.cacheId(),
+ cctx.name(),
+ partId,
+ true,
+ partState
+ );
+ }
+
+ reserved.add(part);
+
+ reservedCnt++;
+ }
+ }
+ finally {
+ top.readUnlock();
+ }
+
+ MTC.span().addLog(() -> "Cache partitions were reserved
[cache=" + cctx.name() +
+ ", partitions=" + partIds + ", topology=" + topVer + ']');
+
+ if (explicitParts == null && reservedCnt > 0) {
+ // We reserved all the primary partitions for cache,
attempt to add group reservation.
+ GridDhtPartitionsReservation grp = new
GridDhtPartitionsReservation(topVer, cctx, "SQL");
+
+ synchronized (this) {
+ // Double check under lock.
+ GridReservable grpReservation =
reservations.get(grpKey);
+
+ if (grpReservation != null)
+ return groupPartitionReservation(reserved,
grpReservation, cctx, topVer, nodeId, qryInfo);
+ else {
+ if (grp.register(reserved.subList(reserved.size()
- reservedCnt, reserved.size()))) {
+ reservations.put(grpKey, grp);
+
+ grp.onPublish(new CI1<>() {
+ @Override public void
apply(GridDhtPartitionsReservation r) {
+ reservations.remove(grpKey, r);
+ }
+ });
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * @param cacheName Cache name.
+ */
+ public void onCacheStop(String cacheName) {
+ // Drop group reservations.
+ for (PartitionReservationKey grpKey : reservations.keySet()) {
+ if (F.eq(grpKey.cacheName(), cacheName))
+ reservations.remove(grpKey);
+ }
+ }
+
+ /**
+ * @param cctx Cache context.
+ * @param part Partition.
+ */
+ private static void failQueryOnLostData(
+ GridCacheContext<?, ?> cctx,
+ GridDhtLocalPartition part
+ ) throws IgniteCheckedException {
+ throw new CacheInvalidStateException("Failed to execute query because
cache partition has been " +
+ "lost [cacheName=" + cctx.name() + ", part=" + part + ']');
+ }
+
+ /**
+ * Cleans up the group reservations cache when the affinity version changes.
+ */
+ @Override public void
onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ try {
+ // Must not do anything at the exchange thread. Dispatch to the
management thread pool.
+ ctx.closure().runLocal(
+ new GridPlainRunnable() {
+ @Override public void run() {
+ AffinityTopologyVersion topVer =
ctx.cache().context().exchange()
+
.lastAffinityChangedTopologyVersion(fut.topologyVersion());
+
+ reservations.forEach((key, r) -> {
+ if (r != REPLICATED_RESERVABLE &&
!F.eq(key.topologyVersion(), topVer)) {
+ assert r instanceof
GridDhtPartitionsReservation;
+
+ ((GridDhtPartitionsReservation)r).invalidate();
+ }
+ });
+ }
+ },
+ GridIoPolicy.MANAGEMENT_POOL);
+ }
+ catch (Throwable e) {
+ log.error("Unexpected exception on start reservations cleanup.");
+ ctx.failure().process(new FailureContext(CRITICAL_ERROR, e));
+ }
+ }
+
+ /** */
+ private static Collection<Integer> partsToCollection(int[] explicitParts) {
+ if (explicitParts == null)
+ return null;
+ else if (explicitParts.length == 0)
+ return Collections.emptyList();
+ else {
+ List<Integer> partIds = new ArrayList<>(explicitParts.length);
+
+ for (int explicitPart : explicitParts)
+ partIds.add(explicitPart);
+
+ return partIds;
+ }
+ }
+
+ /** */
+ private String groupPartitionReservation(
+ List<GridReservable> reserved,
+ GridReservable grpReservation,
+ GridCacheContext<?, ?> cctx,
+ AffinityTopologyVersion topVer,
+ UUID nodeId,
+ String qryInfo
+ ) {
+ if (grpReservation != REPLICATED_RESERVABLE) {
+ if (!grpReservation.reserve()) {
+ return String.format("Failed to reserve partitions for query
(group " +
+ "reservation failed) [localNodeId=%s, rmtNodeId=%s, %s,
affTopVer=%s, cacheId=%s, " +
+ "cacheName=%s]", ctx.localNodeId(), nodeId, qryInfo,
topVer, cctx.cacheId(), cctx.name());
+ }
+
+ reserved.add(grpReservation);
+
+ MTC.span().addLog(() -> "Cache partitions were reserved " +
grpReservation);
+ }
+
+ return null;
+ }
+
+ /**
+ * Mapper fake reservation object for replicated caches.
+ */
+ private static class ReplicatedReservable implements GridReservable {
+ /** {@inheritDoc} */
+ @Override public boolean reserve() {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void release() {
+ throw new IllegalStateException();
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 78b47d4f0fd..255910bfcf0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -90,6 +90,7 @@ import
org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
@@ -308,6 +309,9 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
/** Global schema SQL views manager. */
private final SchemaSqlViewManager schemaSqlViewMgr;
+ /** Partition reservation manager. */
+ private final PartitionReservationManager partReservationMgr;
+
/** @see TransactionConfiguration#isTxAwareQueriesEnabled() */
private final boolean txAwareQueriesEnabled;
@@ -331,6 +335,8 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
schemaSqlViewMgr = new SchemaSqlViewManager(ctx);
+ partReservationMgr = new PartitionReservationManager(ctx);
+
idxProc = ctx.indexProcessor();
idxQryPrc = new IndexQueryProcessor(idxProc);
@@ -1060,6 +1066,11 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
return runningQryMgr;
}
+ /** @return Partition reservation manager. */
+ public PartitionReservationManager partitionReservationManager() {
+ return partReservationMgr;
+ }
+
/**
* Create type descriptors from schema and initialize indexing for given
cache.<p>
* Use with {@link #busyLock} where appropriate.
@@ -1327,6 +1338,8 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
try {
if (schemaMgr.clearCacheContext(cacheInfo.cacheContext())) {
+ partReservationMgr.onCacheStop(cacheInfo.name());
+
if (idx != null)
idx.unregisterCache(cacheInfo);
}
@@ -2448,6 +2461,8 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
schemaMgr.onCacheStopped(cacheName, destroy, clearIdx);
+ partReservationMgr.onCacheStop(cacheName);
+
// Notify indexing.
if (idx != null)
idx.unregisterCache(cacheInfo);
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index fa0804b555a..86ee2d79e8a 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -65,6 +65,7 @@ import
org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import
org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import
org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
@@ -97,7 +98,6 @@ import
org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
import
org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
import
org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
-import
org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
import
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@ -1565,7 +1565,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
this.ctx = ctx;
- partReservationMgr = new PartitionReservationManager(ctx);
+ partReservationMgr = ctx.query().partitionReservationManager();
connMgr = new ConnectionManager(ctx);
@@ -1839,10 +1839,6 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
/** {@inheritDoc} */
@Override public void unregisterCache(GridCacheContextInfo<?, ?>
cacheInfo) {
- String cacheName = cacheInfo.name();
-
- partReservationMgr.onCacheStop(cacheName);
-
// Unregister connection.
connMgr.onCacheDestroyed();
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
index 8930a89fd8c..1b0d4511f0f 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
@@ -18,8 +18,8 @@
package org.apache.ignite.internal.processors.query.h2.opt;
import java.util.Objects;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation;
import
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
-import
org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.jetbrains.annotations.Nullable;
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 4bd579a7eb4..a3b5d3a9c90 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.metric.IoStatisticsQueryHelper;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
deleted file mode 100644
index d092c3cd2d8..00000000000
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.twostep;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.failure.FailureContext;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation;
-import org.apache.ignite.internal.processors.tracing.MTC;
-import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
-import org.apache.ignite.internal.util.lang.GridPlainRunnable;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
-import static
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
-import static
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
-import static
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
-import static
org.apache.ignite.internal.processors.tracing.SpanType.SQL_PARTITIONS_RESERVE;
-
-/**
- * Class responsible for partition reservation for queries executed on local
node. Prevents partitions from being
- * evicted from node during query execution.
- */
-public class PartitionReservationManager implements PartitionsExchangeAware {
- /** Special instance of reservable object for REPLICATED caches. */
- private static final ReplicatedReservable REPLICATED_RESERVABLE = new
ReplicatedReservable();
-
- /** Kernal context. */
- private final GridKernalContext ctx;
-
- /**
- * Group reservations cache. When affinity version is not changed and all
primary partitions must be reserved we get
- * group reservation from this map instead of create new reservation group.
- */
- private final ConcurrentMap<PartitionReservationKey, GridReservable>
reservations = new ConcurrentHashMap<>();
-
- /** Logger. */
- private final IgniteLogger log;
-
- /**
- * Constructor.
- *
- * @param ctx Context.
- */
- public PartitionReservationManager(GridKernalContext ctx) {
- this.ctx = ctx;
-
- log = ctx.log(PartitionReservationManager.class);
-
- ctx.cache().context().exchange().registerExchangeAwareComponent(this);
- }
-
- /**
- * @param cctx Cache context.
- * @param p Partition ID.
- * @return Partition.
- */
- private static GridDhtLocalPartition partition(GridCacheContext<?, ?>
cctx, int p) {
- return cctx.topology().localPartition(p, NONE, false);
- }
-
- /**
- * @param cacheIds Cache IDs.
- * @param reqTopVer Topology version from request.
- * @param explicitParts Explicit partitions list.
- * @param nodeId Node ID.
- * @param reqId Request ID.
- * @return String which is null in case of success or with causeMessage if
failed
- * @throws IgniteCheckedException If failed.
- */
- public PartitionReservation reservePartitions(
- @Nullable List<Integer> cacheIds,
- AffinityTopologyVersion reqTopVer,
- final int[] explicitParts,
- UUID nodeId,
- long reqId
- ) throws IgniteCheckedException {
- try (TraceSurroundings ignored =
MTC.support(ctx.tracing().create(SQL_PARTITIONS_RESERVE, MTC.span()))) {
- assert reqTopVer != null;
-
- AffinityTopologyVersion topVer =
ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer);
-
- if (F.isEmpty(cacheIds))
- return new PartitionReservation(Collections.emptyList());
-
- Collection<Integer> partIds;
-
- if (explicitParts == null)
- partIds = null;
- else if (explicitParts.length == 0)
- partIds = Collections.emptyList();
- else {
- partIds = new ArrayList<>(explicitParts.length);
-
- for (int explicitPart : explicitParts)
- partIds.add(explicitPart);
- }
-
- List<GridReservable> reserved = new ArrayList<>();
-
- for (int i = 0; i < cacheIds.size(); i++) {
- GridCacheContext<?, ?> cctx =
ctx.cache().context().cacheContext(cacheIds.get(i));
-
- // Cache was not found, probably was not deployed yet.
- if (cctx == null) {
- return new PartitionReservation(reserved,
- String.format("Failed to reserve partitions for query
(cache is not " +
- "found on local node) [localNodeId=%s,
rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]",
- ctx.localNodeId(), nodeId, reqId, topVer,
cacheIds.get(i)));
- }
-
- if (!cctx.rebalanceEnabled())
- continue;
-
- // For replicated cache topology version does not make sense.
- final PartitionReservationKey grpKey = new
PartitionReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer);
-
- GridReservable r = reservations.get(grpKey);
-
- if (explicitParts == null && r != null) { // Try to reserve
group partition if any and no explicits.
- if (r != REPLICATED_RESERVABLE) {
- if (!r.reserve())
- return new PartitionReservation(reserved,
- String.format("Failed to reserve partitions
for query (group " +
- "reservation failed) [localNodeId=%s,
rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
- "cacheName=%s]", ctx.localNodeId(),
nodeId, reqId, topVer, cacheIds.get(i), cctx.name()));
-
- reserved.add(r);
-
- MTC.span().addLog(() -> "Cache partitions were
reserved " + r);
- }
- }
- else { // Try to reserve partitions one by one.
- int partsCnt = cctx.affinity().partitions();
-
- if (cctx.isReplicated()) { // Check all the partitions are
in owning state for replicated cache.
- if (r == null) { // Check only once.
- for (int p = 0; p < partsCnt; p++) {
- GridDhtLocalPartition part = partition(cctx,
p);
-
- // We don't need to reserve partitions because
they will not be evicted in replicated caches.
- GridDhtPartitionState partState = part != null
? part.state() : null;
-
- if (partState != OWNING)
- return new PartitionReservation(reserved,
- String.format("Failed to reserve
partitions for " +
- "query (partition of
REPLICATED cache is not in OWNING state) [" +
- "localNodeId=%s, rmtNodeId=%s,
reqId=%s, affTopVer=%s, cacheId=%s, " +
- "cacheName=%s, part=%s,
partFound=%s, partState=%s]",
- ctx.localNodeId(),
- nodeId,
- reqId,
- topVer,
- cacheIds.get(i),
- cctx.name(),
- p,
- (part != null),
- partState
- ));
- }
-
- // Mark that we checked this replicated cache.
- reservations.putIfAbsent(grpKey,
REPLICATED_RESERVABLE);
-
- MTC.span().addLog(() -> "Cache partitions were
reserved [cache=" + cctx.name() +
- ", partitions=[0.." + partsCnt + ']');
- }
- }
- else { // Reserve primary partitions for partitioned cache
(if no explicit given).
- if (explicitParts == null)
- partIds =
cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
-
- int reservedCnt = 0;
-
- for (int partId : partIds) {
- GridDhtLocalPartition part = partition(cctx,
partId);
-
- GridDhtPartitionState partState = part != null ?
part.state() : null;
-
- if (partState != OWNING) {
- if (partState == LOST)
- failQueryOnLostData(cctx, part);
- else {
- return new PartitionReservation(reserved,
- String.format("Failed to reserve
partitions " +
- "for query (partition of
PARTITIONED cache is not found or not in OWNING " +
- "state) [localNodeId=%s,
rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
- "cacheName=%s, part=%s,
partFound=%s, partState=%s]",
- ctx.localNodeId(),
- nodeId,
- reqId,
- topVer,
- cacheIds.get(i),
- cctx.name(),
- partId,
- (part != null),
- partState
- ));
- }
- }
-
- if (!part.reserve()) {
- return new PartitionReservation(reserved,
- String.format("Failed to reserve
partitions for query " +
- "(partition of PARTITIONED cache
cannot be reserved) [" +
- "localNodeId=%s, rmtNodeId=%s,
reqId=%s, affTopVer=%s, cacheId=%s, " +
- "cacheName=%s, part=%s,
partFound=%s, partState=%s]",
- ctx.localNodeId(),
- nodeId,
- reqId,
- topVer,
- cacheIds.get(i),
- cctx.name(),
- partId,
- true,
- partState
- ));
- }
-
- reserved.add(part);
-
- reservedCnt++;
-
- // Double check that we are still in owning state
and partition contents are not cleared.
- partState = part.state();
-
- if (partState != OWNING) {
- if (partState == LOST)
- failQueryOnLostData(cctx, part);
- else {
- return new PartitionReservation(reserved,
- String.format("Failed to reserve
partitions for " +
- "query (partition of
PARTITIONED cache is not in OWNING state after " +
- "reservation) [localNodeId=%s,
rmtNodeId=%s, reqId=%s, affTopVer=%s, " +
- "cacheId=%s, cacheName=%s,
part=%s, partState=%s]",
- ctx.localNodeId(),
- nodeId,
- reqId,
- topVer,
- cacheIds.get(i),
- cctx.name(),
- partId,
- partState
- ));
- }
- }
- }
-
- final Collection<Integer> finalPartIds = partIds;
-
- MTC.span().addLog(() -> "Cache partitions were
reserved [cache=" + cctx.name() +
- ", partitions=" + finalPartIds + ", topology=" +
topVer + ']');
-
- if (explicitParts == null && reservedCnt > 0) {
- // We reserved all the primary partitions for
cache, attempt to add group reservation.
- GridDhtPartitionsReservation grp = new
GridDhtPartitionsReservation(topVer, cctx, "SQL");
-
- if (grp.register(reserved.subList(reserved.size()
- reservedCnt, reserved.size()))) {
- if (reservations.putIfAbsent(grpKey, grp) !=
null)
- throw new
IllegalStateException("Reservation already exists.");
-
- grp.onPublish(new
CI1<GridDhtPartitionsReservation>() {
- @Override public void
apply(GridDhtPartitionsReservation r) {
- reservations.remove(grpKey, r);
- }
- });
- }
- }
- }
- }
- }
-
- return new PartitionReservation(reserved);
- }
- }
-
- /**
- * @param cacheName Cache name.
- */
- public void onCacheStop(String cacheName) {
- // Drop group reservations.
- for (PartitionReservationKey grpKey : reservations.keySet()) {
- if (F.eq(grpKey.cacheName(), cacheName))
- reservations.remove(grpKey);
- }
- }
-
- /**
- * @param cctx Cache context.
- * @param part Partition.
- */
- private static void failQueryOnLostData(GridCacheContext cctx,
GridDhtLocalPartition part)
- throws IgniteCheckedException {
- throw new CacheInvalidStateException("Failed to execute query because
cache partition has been " +
- "lost [cacheName=" + cctx.name() + ", part=" + part + ']');
- }
-
- /**
- * Cleanup group reservations cache on change affinity version.
- */
- @Override public void onDoneAfterTopologyUnlock(final
GridDhtPartitionsExchangeFuture fut) {
- try {
- // Must not do anything at the exchange thread. Dispatch to the
management thread pool.
- ctx.closure().runLocal(
- new GridPlainRunnable() {
- @Override public void run() {
- AffinityTopologyVersion topVer =
ctx.cache().context().exchange()
-
.lastAffinityChangedTopologyVersion(fut.topologyVersion());
-
- reservations.forEach((key, r) -> {
- if (r != REPLICATED_RESERVABLE &&
!F.eq(key.topologyVersion(), topVer)) {
- assert r instanceof
GridDhtPartitionsReservation;
-
- ((GridDhtPartitionsReservation)r).invalidate();
- }
- });
- }
- },
- GridIoPolicy.MANAGEMENT_POOL);
- }
- catch (Throwable e) {
- log.error("Unexpected exception on start reservations cleanup.");
- ctx.failure().process(new FailureContext(CRITICAL_ERROR, e));
- }
- }
-
- /**
- * Mapper fake reservation object for replicated caches.
- */
- private static class ReplicatedReservable implements GridReservable {
- /** {@inheritDoc} */
- @Override public boolean reserve() {
- throw new IllegalStateException();
- }
-
- /** {@inheritDoc} */
- @Override public void release() {
- throw new IllegalStateException();
- }
- }
-}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
index 0c30057fd8f..70847fabff3 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
@@ -69,11 +69,11 @@ import
org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import
org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
-import
org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation;
-import
org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
import
org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapResult;
import
org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapper;
import
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
index 0f5491934e9..7b3c83e5bc9 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
@@ -38,6 +38,8 @@ import
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationKey;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager;
import
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;