IGNITE-7157: Fixed deadlock when partition eviction runs in different threads.
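The fix is a lock-ordering change: every section of GridDhtPartitionTopologyImpl that mutates partition state under the topology write lock now acquires the database checkpoint read lock first and releases it last. Below is a minimal, self-contained sketch of that discipline; all names in it (LockOrderSketch, mutateTopology, the two lock fields) are illustrative stand-ins, not Ignite API — checkpointLock stands for the database manager's checkpoint lock, topologyLock for GridDhtPartitionTopologyImpl.lock:

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    /** Minimal sketch of the locking discipline this patch introduces. */
    public class LockOrderSketch {
        private final ReadWriteLock checkpointLock = new ReentrantReadWriteLock();

        private final ReadWriteLock topologyLock = new ReentrantReadWriteLock();

        /** Topology mutations take the checkpoint read lock strictly before the topology write lock. */
        public void mutateTopology(Runnable change) {
            checkpointLock.readLock().lock();       // 1. Outer lock: checkpoint read lock.

            try {
                topologyLock.writeLock().lock();    // 2. Inner lock: topology write lock.

                try {
                    change.run();                   // E.g. create, evict or mark a local partition.
                }
                finally {
                    topologyLock.writeLock().unlock();
                }
            }
            finally {
                checkpointLock.readLock().unlock(); // Release in reverse acquisition order.
            }
        }
    }

The nested try/finally blocks mirror the diff below and keep the unlock order exception-safe.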
Signed-off-by: Andrey Gura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0cf65adc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0cf65adc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0cf65adc

Branch: refs/heads/ignite-zk-ce
Commit: 0cf65adc12f355c223f0577618e2223adade057d
Parents: fe36e62
Author: Andrey V. Mashenkov <[email protected]>
Authored: Mon Dec 11 17:23:37 2017 +0300
Committer: Andrey Gura <[email protected]>
Committed: Mon Dec 11 17:23:37 2017 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionTopologyImpl.java | 475 ++++++++++---------
 1 file changed, 255 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0cf65adc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 7abe09b..abe276f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -809,56 +809,63 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         boolean created = false;
 
-        lock.writeLock().lock();
+        ctx.database().checkpointReadLock();
 
         try {
-            loc = locParts.get(p);
+            lock.writeLock().lock();
 
-            state = loc != null ? loc.state() : null;
+            try {
+                loc = locParts.get(p);
 
-            boolean belongs = partitionLocalNode(p, topVer);
+                state = loc != null ? loc.state() : null;
 
-            if (loc != null && state == EVICTED) {
-                try {
-                    loc.rent(false).get();
-                }
-                catch (IgniteCheckedException ex) {
-                    throw new IgniteException(ex);
-                }
+                boolean belongs = partitionLocalNode(p, topVer);
+
+                if (loc != null && state == EVICTED) {
+                    try {
+                        loc.rent(false).get();
+                    }
+                    catch (IgniteCheckedException ex) {
+                        throw new IgniteException(ex);
+                    }
 
-                locParts.set(p, loc = null);
+                    locParts.set(p, loc = null);
 
-                if (!belongs) {
-                    throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
-                        "(often may be caused by inconsistent 'key.hashCode()' implementation) " +
-                        "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.readyTopVer + ']');
+                    if (!belongs) {
+                        throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
+                            "(often may be caused by inconsistent 'key.hashCode()' implementation) " +
+                            "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.readyTopVer + ']');
+                    }
+                }
+                else if (loc != null && state == RENTING && !showRenting) {
+                    throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently " +
+                        "evicted [part=" + p + ", shouldBeMoving=" + loc.reload() + ", belongs=" + belongs +
+                        ", topVer=" + topVer + ", curTopVer=" + this.readyTopVer + "]");
                 }
-            }
-            else if (loc != null && state == RENTING && !showRenting) {
-                throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently " +
-                    "evicted [part=" + p + ", shouldBeMoving=" + loc.reload() + ", belongs=" + belongs +
-                    ", topVer=" + topVer + ", curTopVer=" + this.readyTopVer + "]");
-            }
 
-            if (loc == null) {
-                if (!belongs)
-                    throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " +
-                        "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
-                        "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.readyTopVer + ']');
+                if (loc == null) {
+                    if (!belongs)
+                        throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " +
+                            "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
+                            "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.readyTopVer + ']');
 
-                locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
+                    locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
 
-                if (updateSeq)
-                    this.updateSeq.incrementAndGet();
+                    if (updateSeq)
+                        this.updateSeq.incrementAndGet();
 
-                created = true;
+                    created = true;
 
-                if (log.isDebugEnabled())
-                    log.debug("Created local partition: " + loc);
+                    if (log.isDebugEnabled())
+                        log.debug("Created local partition: " + loc);
+                }
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
         }
         finally {
-            lock.writeLock().unlock();
+            ctx.database().checkpointReadUnlock();
         }
 
         if (created && ctx.pageStore() != null) {
@@ -1544,120 +1551,127 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 return false;
             }
 
-        lock.writeLock().lock();
+        ctx.database().checkpointReadLock();
 
         try {
-            if (stopping)
-                return false;
-
-            if (!force) {
-                if (lastTopChangeVer.initialized() && exchId != null && lastTopChangeVer.compareTo(exchId.topologyVersion()) > 0) {
-                    U.warn(log, "Stale exchange id for single partition map update (will ignore) [" +
-                        "lastTopChange=" + lastTopChangeVer +
-                        ", readTopVer=" + readyTopVer +
-                        ", exch=" + exchId.topologyVersion() + ']');
+            lock.writeLock().lock();
 
+            try {
+                if (stopping)
                     return false;
+
+                if (!force) {
+                    if (lastTopChangeVer.initialized() && exchId != null &&
+                        lastTopChangeVer.compareTo(exchId.topologyVersion()) > 0) {
+                        U.warn(log, "Stale exchange id for single partition map update (will ignore) [" +
+                            "lastTopChange=" + lastTopChangeVer +
+                            ", readTopVer=" + readyTopVer +
+                            ", exch=" + exchId.topologyVersion() + ']');
+
+                        return false;
+                    }
                 }
-            }
 
-            if (node2part == null)
-                // Create invalid partition map.
-                node2part = new GridDhtPartitionFullMap();
+                if (node2part == null)
+                    // Create invalid partition map.
+                    node2part = new GridDhtPartitionFullMap();
 
-            GridDhtPartitionMap cur = node2part.get(parts.nodeId());
+                GridDhtPartitionMap cur = node2part.get(parts.nodeId());
 
-            if (force) {
-                if (cur != null && cur.topologyVersion().initialized())
-                    parts.updateSequence(cur.updateSequence(), cur.topologyVersion());
-            }
-            else if (isStaleUpdate(cur, parts)) {
-                U.warn(log, "Stale update for single partition map update (will ignore) [exchId=" + exchId +
-                    ", curMap=" + cur +
-                    ", newMap=" + parts + ']');
+                if (force) {
+                    if (cur != null && cur.topologyVersion().initialized())
+                        parts.updateSequence(cur.updateSequence(), cur.topologyVersion());
+                }
+                else if (isStaleUpdate(cur, parts)) {
+                    U.warn(log, "Stale update for single partition map update (will ignore) [exchId=" + exchId +
+                        ", curMap=" + cur +
+                        ", newMap=" + parts + ']');
 
-                return false;
-            }
+                    return false;
+                }
 
-            long updateSeq = this.updateSeq.incrementAndGet();
+                long updateSeq = this.updateSeq.incrementAndGet();
 
-            node2part.newUpdateSequence(updateSeq);
+                node2part.newUpdateSequence(updateSeq);
 
-            boolean changed = false;
+                boolean changed = false;
 
-            if (cur == null || !cur.equals(parts))
-                changed = true;
+                if (cur == null || !cur.equals(parts))
+                    changed = true;
 
-            node2part.put(parts.nodeId(), parts);
+                node2part.put(parts.nodeId(), parts);
 
-            // During exchange diff is calculated after all messages are received and affinity initialized.
-            if (exchId == null && !grp.isReplicated()) {
-                if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
-                    AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
+                // During exchange diff is calculated after all messages are received and affinity initialized.
+                if (exchId == null && !grp.isReplicated()) {
+                    if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
+                        AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
 
-                    // Add new mappings.
-                    for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) {
-                        int p = e.getKey();
+                        // Add new mappings.
+                        for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) {
+                            int p = e.getKey();
 
-                        Set<UUID> diffIds = diffFromAffinity.get(p);
+                            Set<UUID> diffIds = diffFromAffinity.get(p);
 
-                        if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue() == RENTING)
-                            && !affAssignment.getIds(p).contains(parts.nodeId())) {
-                            if (diffIds == null)
-                                diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+                            if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue() == RENTING)
+                                && !affAssignment.getIds(p).contains(parts.nodeId())) {
+                                if (diffIds == null)
+                                    diffFromAffinity.put(p, diffIds = U.newHashSet(3));
 
-                            if (diffIds.add(parts.nodeId()))
-                                changed = true;
-                        }
-                        else {
-                            if (diffIds != null && diffIds.remove(parts.nodeId())) {
-                                changed = true;
+                                if (diffIds.add(parts.nodeId()))
+                                    changed = true;
+                            }
+                            else {
+                                if (diffIds != null && diffIds.remove(parts.nodeId())) {
+                                    changed = true;
 
-                                if (diffIds.isEmpty())
-                                    diffFromAffinity.remove(p);
+                                    if (diffIds.isEmpty())
+                                        diffFromAffinity.remove(p);
+                                }
                             }
                         }
-                    }
 
-                    // Remove obsolete mappings.
-                    if (cur != null) {
-                        for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
-                            Set<UUID> ids = diffFromAffinity.get(p);
+                        // Remove obsolete mappings.
+                        if (cur != null) {
+                            for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
+                                Set<UUID> ids = diffFromAffinity.get(p);
 
-                            if (ids != null && ids.remove(parts.nodeId())) {
-                                changed = true;
+                                if (ids != null && ids.remove(parts.nodeId())) {
+                                    changed = true;
 
-                                if (ids.isEmpty())
-                                    diffFromAffinity.remove(p);
+                                    if (ids.isEmpty())
+                                        diffFromAffinity.remove(p);
+                                }
                             }
                         }
-                    }
 
-                    diffFromAffinityVer = readyTopVer;
+                        diffFromAffinityVer = readyTopVer;
+                    }
                 }
-            }
 
-            if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
-                AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer);
+                if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
+                    AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer);
 
-                if (exchId == null)
-                    changed |= checkEvictions(updateSeq, aff);
+                    if (exchId == null)
+                        changed |= checkEvictions(updateSeq, aff);
 
-                updateRebalanceVersion(aff.assignment());
-            }
+                    updateRebalanceVersion(aff.assignment());
+                }
 
-            consistencyCheck();
+                consistencyCheck();
 
-            if (log.isDebugEnabled())
-                log.debug("Partition map after single update: " + fullMapString());
+                if (log.isDebugEnabled())
+                    log.debug("Partition map after single update: " + fullMapString());
 
-            if (changed && exchId == null)
-                ctx.exchange().scheduleResendPartitions();
+                if (changed && exchId == null)
+                    ctx.exchange().scheduleResendPartitions();
 
-            return changed;
+                return changed;
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
         }
         finally {
-            lock.writeLock().unlock();
+            ctx.database().checkpointReadUnlock();
        }
     }
@@ -1775,124 +1789,138 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, DiscoveryEvent discoEvt) {
-        lock.writeLock().lock();
+        ctx.database().checkpointReadLock();
 
         try {
-            if (node2part == null)
-                return false;
+            lock.writeLock().lock();
 
-            int parts = grp.affinity().partitions();
+            try {
+                if (node2part == null)
+                    return false;
 
-            Set<Integer> lost = new HashSet<>(parts);
+                int parts = grp.affinity().partitions();
 
-            for (int p = 0; p < parts; p++)
-                lost.add(p);
+                Set<Integer> lost = new HashSet<>(parts);
 
-            for (GridDhtPartitionMap partMap : node2part.values()) {
-                for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) {
-                    if (e.getValue() == OWNING) {
-                        lost.remove(e.getKey());
+                for (int p = 0; p < parts; p++)
+                    lost.add(p);
 
-                        if (lost.isEmpty())
-                            break;
+                for (GridDhtPartitionMap partMap : node2part.values()) {
+                    for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) {
+                        if (e.getValue() == OWNING) {
+                            lost.remove(e.getKey());
+
+                            if (lost.isEmpty())
+                                break;
+                        }
                     }
                 }
-            }
 
-            boolean changed = false;
+                boolean changed = false;
 
-            if (!F.isEmpty(lost)) {
-                PartitionLossPolicy plc = grp.config().getPartitionLossPolicy();
+                if (!F.isEmpty(lost)) {
+                    PartitionLossPolicy plc = grp.config().getPartitionLossPolicy();
 
-                assert plc != null;
+                    assert plc != null;
 
-                // Update partition state on all nodes.
-                for (Integer part : lost) {
-                    long updSeq = updateSeq.incrementAndGet();
+                    // Update partition state on all nodes.
+                    for (Integer part : lost) {
+                        long updSeq = updateSeq.incrementAndGet();
 
-                    GridDhtLocalPartition locPart = localPartition(part, resTopVer, false, true);
+                        GridDhtLocalPartition locPart = localPartition(part, resTopVer, false, true);
 
-                    if (locPart != null) {
-                        boolean marked = plc == PartitionLossPolicy.IGNORE ? locPart.own() : locPart.markLost();
+                        if (locPart != null) {
+                            boolean marked = plc == PartitionLossPolicy.IGNORE ? locPart.own() : locPart.markLost();
 
-                        if (!marked && locPart.state() == RENTING)
-                            try {
-                                //TODO https://issues.apache.org/jira/browse/IGNITE-6433
-                                locPart.tryEvict();
-                                locPart.rent(false).get();
-                            }
-                            catch (IgniteCheckedException e) {
-                                U.error(log, "Failed to wait for RENTING partition eviction after partition LOST event",
-                                    e);
-                            }
+                            if (!marked && locPart.state() == RENTING)
+                                try {
+                                    //TODO https://issues.apache.org/jira/browse/IGNITE-6433
+                                    locPart.tryEvict();
+                                    locPart.rent(false).get();
+                                }
+                                catch (IgniteCheckedException e) {
+                                    U.error(log, "Failed to wait for RENTING partition eviction after partition LOST event",
+                                        e);
+                                }
 
-                        if (marked)
-                            updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer);
+                            if (marked)
+                                updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer);
 
-                        changed |= marked;
-                    }
-                    // Update map for remote node.
-                    else if (plc != PartitionLossPolicy.IGNORE) {
-                        for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
-                            if (e.getKey().equals(ctx.localNodeId()))
-                                continue;
-
-                            if (e.getValue().get(part) != EVICTED)
-                                e.getValue().put(part, LOST);
+                            changed |= marked;
+                        }
+                        // Update map for remote node.
+                        else if (plc != PartitionLossPolicy.IGNORE) {
+                            for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+                                if (e.getKey().equals(ctx.localNodeId()))
+                                    continue;
+
+                                if (e.getValue().get(part) != EVICTED)
+                                    e.getValue().put(part, LOST);
+                            }
                         }
-                    }
 
-                    if (grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
-                        grp.addRebalanceEvent(part,
-                            EVT_CACHE_REBALANCE_PART_DATA_LOST,
-                            discoEvt.eventNode(),
-                            discoEvt.type(),
-                            discoEvt.timestamp());
+                        if (grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                            grp.addRebalanceEvent(part,
+                                EVT_CACHE_REBALANCE_PART_DATA_LOST,
+                                discoEvt.eventNode(),
+                                discoEvt.type(),
+                                discoEvt.timestamp());
+                        }
                     }
+
+                    if (plc != PartitionLossPolicy.IGNORE)
+                        grp.needsRecovery(true);
                 }
 
-                if (plc != PartitionLossPolicy.IGNORE)
-                    grp.needsRecovery(true);
+                return changed;
+            }
+            finally {
+                lock.writeLock().unlock();
             }
-
-            return changed;
         }
         finally {
-            lock.writeLock().unlock();
+            ctx.database().checkpointReadUnlock();
        }
     }
 
     /** {@inheritDoc} */
     @Override public void resetLostPartitions(AffinityTopologyVersion resTopVer) {
-        lock.writeLock().lock();
+        ctx.database().checkpointReadLock();
 
         try {
-            long updSeq = updateSeq.incrementAndGet();
+            lock.writeLock().lock();
 
-            for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
-                for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
-                    if (e0.getValue() != LOST)
-                        continue;
+            try {
+                long updSeq = updateSeq.incrementAndGet();
 
-                    e0.setValue(OWNING);
+                for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+                    for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
+                        if (e0.getValue() != LOST)
+                            continue;
+
+                        e0.setValue(OWNING);
 
-                    GridDhtLocalPartition locPart = localPartition(e0.getKey(), resTopVer, false);
+                        GridDhtLocalPartition locPart = localPartition(e0.getKey(), resTopVer, false);
 
-                    if (locPart != null && locPart.state() == LOST) {
-                        boolean marked = locPart.own();
+                        if (locPart != null && locPart.state() == LOST) {
+                            boolean marked = locPart.own();
 
-                        if (marked)
-                            updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer);
+                            if (marked)
+                                updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer);
+                        }
                     }
                 }
-            }
 
-            checkEvictions(updSeq, grp.affinity().readyAffinity(resTopVer));
+                checkEvictions(updSeq, grp.affinity().readyAffinity(resTopVer));
 
-            grp.needsRecovery(false);
+                grp.needsRecovery(false);
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
         }
         finally {
-            lock.writeLock().unlock();
+            ctx.database().checkpointReadUnlock();
         }
     }
@@ -1930,61 +1958,68 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     @Override public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, boolean updateSeq) {
         Set<UUID> result = haveHistory ? Collections.<UUID>emptySet() : new HashSet<UUID>();
 
-        lock.writeLock().lock();
+        ctx.database().checkpointReadLock();
 
         try {
-            GridDhtLocalPartition locPart = locParts.get(p);
+            lock.writeLock().lock();
 
-            if (locPart != null) {
-                if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId())) {
-                    if (haveHistory)
-                        locPart.moving();
-                    else {
-                        locPart.rent(false);
+            try {
+                GridDhtLocalPartition locPart = locParts.get(p);
 
-                        locPart.reload(true);
+                if (locPart != null) {
+                    if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId())) {
+                        if (haveHistory)
+                            locPart.moving();
+                        else {
+                            locPart.rent(false);
 
-                        result.add(ctx.localNodeId());
-                    }
+                            locPart.reload(true);
+
+                            result.add(ctx.localNodeId());
+                        }
 
-                    U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " +
-                        "[nodeId=" + ctx.localNodeId() + ", cacheOrGroupName=" + grp.cacheOrGroupName() +
-                        ", partId=" + locPart.id() + ", haveHistory=" + haveHistory + "]");
+                        U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " +
+                            "[nodeId=" + ctx.localNodeId() + ", cacheOrGroupName=" + grp.cacheOrGroupName() +
+                            ", partId=" + locPart.id() + ", haveHistory=" + haveHistory + "]");
+                    }
                 }
-            }
 
-            for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
-                GridDhtPartitionMap partMap = e.getValue();
+                for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+                    GridDhtPartitionMap partMap = e.getValue();
 
-                if (!partMap.containsKey(p))
-                    continue;
+                    if (!partMap.containsKey(p))
+                        continue;
 
-                if (partMap.get(p) == OWNING && !owners.contains(e.getKey())) {
-                    if (haveHistory)
-                        partMap.put(p, MOVING);
-                    else {
-                        partMap.put(p, RENTING);
+                    if (partMap.get(p) == OWNING && !owners.contains(e.getKey())) {
+                        if (haveHistory)
+                            partMap.put(p, MOVING);
+                        else {
+                            partMap.put(p, RENTING);
 
-                        result.add(e.getKey());
-                    }
+                            result.add(e.getKey());
+                        }
 
-                    partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());
+                        partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());
 
-                    if (partMap.nodeId().equals(ctx.localNodeId()))
-                        this.updateSeq.setIfGreater(partMap.updateSequence());
+                        if (partMap.nodeId().equals(ctx.localNodeId()))
+                            this.updateSeq.setIfGreater(partMap.updateSequence());
 
-                    U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " +
-                        "[nodeId=" + e.getKey() + ", cacheOrGroupName=" + grp.cacheOrGroupName() +
-                        ", partId=" + p + ", haveHistory=" + haveHistory + "]");
+                        U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " +
+                            "[nodeId=" + e.getKey() + ", cacheOrGroupName=" + grp.cacheOrGroupName() +
+                            ", partId=" + p + ", haveHistory=" + haveHistory + "]");
+                    }
                 }
-            }
 
-            if (updateSeq)
-                node2part = new GridDhtPartitionFullMap(node2part, this.updateSeq.incrementAndGet());
+                if (updateSeq)
+                    node2part = new GridDhtPartitionFullMap(node2part, this.updateSeq.incrementAndGet());
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
         }
         finally {
-            lock.writeLock().unlock();
+            ctx.database().checkpointReadUnlock();
         }
 
         return result;
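
Reviewer's note, not part of the patch: every hunk above applies the same mechanical transformation. A plausible reading of the deadlock being removed, using the illustrative lock names from the sketch at the top of this message (the exact pre-patch interleaving is not shown in the diff, so this is an assumption):

    // Hypothetical pre-patch interleaving:
    //
    //   T1 (eviction thread):  topologyLock.writeLock().lock();    // holds the topology lock
    //   T2 (checkpointer):     checkpointLock.writeLock().lock();  // holds the checkpoint lock
    //   T1:                    checkpointLock.readLock().lock();   // blocks, waiting for T2
    //   T2:                    topologyLock.writeLock().lock();    // blocks, waiting for T1: ABBA deadlock

Once localPartition0(), update(), detectLostPartitions(), resetLostPartitions() and setOwners() all take the checkpoint read lock before the topology write lock, every thread acquires the two locks in the same order, so this cycle cannot form.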
