http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java deleted file mode 100644 index 845d3ed..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java +++ /dev/null @@ -1,293 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht; - -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.F; - -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING; - -/** - * Reservation mechanism for multiple partitions allowing to do a reservation in one operation. - */ -public class GridDhtPartitionsReservation implements GridReservable { - /** */ - private static final GridDhtLocalPartition[] EMPTY = {}; - - /** */ - private static final CI1<GridDhtPartitionsReservation> NO_OP = new CI1<GridDhtPartitionsReservation>() { - @Override public void apply(GridDhtPartitionsReservation gridDhtPartitionsReservation) { - throw new IllegalStateException(); - } - }; - - /** */ - private final Object appKey; - - /** */ - private final GridCacheContext<?,?> cctx; - - /** */ - private final AffinityTopologyVersion topVer; - - /** */ - private final AtomicReference<GridDhtLocalPartition[]> parts = new AtomicReference<>(); - - /** */ - private final AtomicReference<CI1<GridDhtPartitionsReservation>> unpublish = new AtomicReference<>(); - - /** */ - private final AtomicInteger reservations = new AtomicInteger(); - - /** - * @param topVer AffinityTopologyVersion version. - * @param cctx Cache context. - * @param appKey Application key for reservation. - */ - public GridDhtPartitionsReservation(AffinityTopologyVersion topVer, GridCacheContext<?,?> cctx, Object appKey) { - assert topVer != null; - assert cctx != null; - assert appKey != null; - - this.topVer = topVer; - this.cctx = cctx; - this.appKey = appKey; - } - - /** - * Registers all the given partitions for this reservation. - * - * @param parts Partitions. - * @return {@code true} If registration succeeded and this reservation can be published. - */ - public boolean register(Collection<? extends GridReservable> parts) { - assert !F.isEmpty(parts) : "empty partitions list"; - - GridDhtLocalPartition[] arr = new GridDhtLocalPartition[parts.size()]; - - int i = 0; - int prevPart = -1; - boolean sorted = true; // Most probably it is a sorted list. - - for (GridReservable part : parts) { - arr[i] = (GridDhtLocalPartition)part; - - if (sorted) { // Make sure it will be a sorted array. - int id = arr[i].id(); - - if (id <= prevPart) - sorted = false; - - prevPart = id; - } - - i++; - } - - if (!sorted) - Arrays.sort(arr); - - i = 0; - prevPart = -1; - - // Register in correct sort order. - for (GridDhtLocalPartition part : arr) { - if (prevPart == part.id()) - throw new IllegalStateException("Duplicated partitions."); - - prevPart = part.id(); - - if (!part.addReservation(this)) { - if (i != 0) - throw new IllegalStateException( - "Trying to reserve different sets of partitions for the same topology version."); - - return false; - } - - i++; - } - - if (!this.parts.compareAndSet(null, arr)) - throw new IllegalStateException("Partitions can be registered only once."); - - assert reservations.get() != -1 : "all the partitions must be reserved before register, we can't be invalidated"; - - return true; - } - - /** - * Must be called when this reservation is published. - * - * @param unpublish Closure to unpublish this reservation when it will become invalid. - */ - public void onPublish(CI1<GridDhtPartitionsReservation> unpublish) { - assert unpublish != null; - - if (!this.unpublish.compareAndSet(null, unpublish)) - throw new IllegalStateException("Unpublishing closure can be set only once."); - - if (reservations.get() == -1) - unregister(); - } - - /** - * Reserves all the registered partitions. - * - * @return {@code true} If succeeded. - */ - @Override public boolean reserve() { - assert parts.get() != null : "partitions must be registered before the first reserve attempt"; - - for (;;) { - int r = reservations.get(); - - if (r == -1) // Invalidated. - return false; - - assert r >= 0 : r; - - if (reservations.compareAndSet(r, r + 1)) - return true; - } - } - - /** - * @param parts Partitions. - */ - private static void tryEvict(GridDhtLocalPartition[] parts) { - if (parts == null) // Can be not initialized yet. - return ; - - for (GridDhtLocalPartition part : parts) - tryEvict(part); - } - - /** - * @param part Partition. - */ - private static void tryEvict(GridDhtLocalPartition part) { - if (part.state() == RENTING && part.reservations() == 0) - part.tryContinueClearing(); - } - - /** - * Releases all the registered partitions. - */ - @Override public void release() { - for (;;) { - int r = reservations.get(); - - if (r <= 0) - throw new IllegalStateException("Method 'reserve' must be called before 'release'."); - - if (reservations.compareAndSet(r, r - 1)) { - // If it was the last reservation and topology version changed -> attempt to evict partitions. - if (r == 1 && !cctx.kernalContext().isStopping() && - !topVer.equals(cctx.topology().lastTopologyChangeVersion())) - tryEvict(parts.get()); - - return; - } - } - } - - /** - * Unregisters from all the partitions and unpublishes this reservation. - */ - private void unregister() { - GridDhtLocalPartition[] arr = parts.get(); - - // Unregister from partitions. - if (!F.isEmpty(arr) && parts.compareAndSet(arr, EMPTY)) { - // Reverse order makes sure that addReservation on the same topVer - // reservation will fail on the first partition. - for (int i = arr.length - 1; i >= 0; i--) { - GridDhtLocalPartition part = arr[i]; - - part.removeReservation(this); - } - } - - // Unpublish. - CI1<GridDhtPartitionsReservation> u = unpublish.get(); - - if (u != null && u != NO_OP && unpublish.compareAndSet(u, NO_OP)) - u.apply(this); - } - - /** - * Must be checked in {@link GridDhtLocalPartition#tryClear(EvictionContext)}. - * If returns {@code true} this reservation object becomes invalid and partitions - * can be evicted or at least cleared. - * Also this means that after returning {@code true} here method {@link #reserve()} can not - * return {@code true} anymore. - * - * @return {@code true} If this reservation was successfully invalidated because it was not - * reserved and partitions can be evicted. - */ - public boolean invalidate() { - assert parts.get() != null : "all parts must be reserved before registration"; - - int r = reservations.get(); - - assert r >= -1 : r; - - if (r != 0) - return r == -1; - - if (reservations.compareAndSet(0, -1)) { - unregister(); - - return true; - } - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - GridDhtPartitionsReservation that = (GridDhtPartitionsReservation)o; - - return cctx == that.cctx && topVer.equals(that.topVer) && appKey.equals(that.appKey); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - String name = cctx.name(); - - int result = name == null ? 0 : name.hashCode(); - - result = 31 * result + appKey.hashCode(); - result = 31 * result + topVer.hashCode(); - - return result; - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java deleted file mode 100644 index 3b99758..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Set; -import java.util.TreeMap; -import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; -import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; -import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.internal.util.typedef.internal.SB; -import org.apache.ignite.lang.IgniteProductVersion; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; - -/** - * Class to validate partitions update counters and cache sizes during exchange process. - */ -public class GridDhtPartitionsStateValidator { - /** Version since node is able to send cache sizes in {@link GridDhtPartitionsSingleMessage}. */ - private static final IgniteProductVersion SIZES_VALIDATION_AVAILABLE_SINCE = IgniteProductVersion.fromString("2.5.0"); - - /** Cache shared context. */ - private final GridCacheSharedContext<?, ?> cctx; - - /** - * Constructor. - * - * @param cctx Cache shared context. - */ - public GridDhtPartitionsStateValidator(GridCacheSharedContext<?, ?> cctx) { - this.cctx = cctx; - } - - /** - * Validates partition states - update counters and cache sizes for all nodes. - * If update counter value or cache size for the same partitions are different on some nodes - * method throws exception with full information about inconsistent partitions. - * - * @param fut Current exchange future. - * @param top Topology to validate. - * @param messages Single messages received from all nodes. - * @throws IgniteCheckedException If validation failed. Exception message contains - * full information about all partitions which update counters or cache sizes are not consistent. - */ - public void validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture fut, - GridDhtPartitionTopology top, - Map<UUID, GridDhtPartitionsSingleMessage> messages) throws IgniteCheckedException { - final Set<UUID> ignoringNodes = new HashSet<>(); - - // Ignore just joined nodes. - for (DiscoveryEvent evt : fut.events().events()) - if (evt.type() == EVT_NODE_JOINED) - ignoringNodes.add(evt.eventNode().id()); - - AffinityTopologyVersion topVer = fut.context().events().topologyVersion(); - - // Validate update counters. - Map<Integer, Map<UUID, Long>> result = validatePartitionsUpdateCounters(top, messages, ignoringNodes); - if (!result.isEmpty()) - throw new IgniteCheckedException("Partitions update counters are inconsistent for " + fold(topVer, result)); - - // For sizes validation ignore also nodes which are not able to send cache sizes. - for (UUID id : messages.keySet()) { - ClusterNode node = cctx.discovery().node(id); - if (node != null && node.version().compareTo(SIZES_VALIDATION_AVAILABLE_SINCE) < 0) - ignoringNodes.add(id); - } - - if (!MvccUtils.mvccEnabled(cctx.kernalContext())) { // TODO: Remove "if" clause in IGNITE-9451. - // Validate cache sizes. - result = validatePartitionsSizes(top, messages, ignoringNodes); - - if (!result.isEmpty()) - throw new IgniteCheckedException("Partitions cache sizes are inconsistent for " + fold(topVer, result)); - } - } - - /** - * Checks what partitions from given {@code singleMsg} message should be excluded from validation. - * - * @param top Topology to validate. - * @param nodeId Node which sent single message. - * @param singleMsg Single message. - * @return Set of partition ids should be excluded from validation. - */ - @Nullable private Set<Integer> shouldIgnore(GridDhtPartitionTopology top, UUID nodeId, GridDhtPartitionsSingleMessage singleMsg) { - CachePartitionPartialCountersMap countersMap = singleMsg.partitionUpdateCounters(top.groupId(), top.partitions()); - Map<Integer, Long> sizesMap = singleMsg.partitionSizes(top.groupId()); - - Set<Integer> ignore = null; - - for (int p = 0; p < top.partitions(); p++) { - if (top.partitionState(nodeId, p) != GridDhtPartitionState.OWNING) { - if (ignore == null) - ignore = new HashSet<>(); - - ignore.add(p); - - continue; - } - - int partIdx = countersMap.partitionIndex(p); - long updateCounter = partIdx >= 0 ? countersMap.updateCounterAt(partIdx) : 0; - long size = sizesMap.containsKey(p) ? sizesMap.get(p) : 0; - - // Do not validate partitions with zero update counter and size. - if (updateCounter == 0 && size == 0) { - if (ignore == null) - ignore = new HashSet<>(); - - ignore.add(p); - } - } - - return ignore; - } - - /** - * Validate partitions update counters for given {@code top}. - * - * @param top Topology to validate. - * @param messages Single messages received from all nodes. - * @param ignoringNodes Nodes for what we ignore validation. - * @return Invalid partitions map with following structure: (partId, (nodeId, updateCounter)). - * If map is empty validation is successful. - */ - Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters( - GridDhtPartitionTopology top, - Map<UUID, GridDhtPartitionsSingleMessage> messages, - Set<UUID> ignoringNodes) { - Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>(); - - Map<Integer, T2<UUID, Long>> updateCountersAndNodesByPartitions = new HashMap<>(); - - // Populate counters statistics from local node partitions. - for (GridDhtLocalPartition part : top.currentLocalPartitions()) { - if (part.state() != GridDhtPartitionState.OWNING) - continue; - - if (part.updateCounter() == 0 && part.fullSize() == 0) - continue; - - updateCountersAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.updateCounter())); - } - - int partitions = top.partitions(); - - // Then process and validate counters from other nodes. - for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : messages.entrySet()) { - UUID nodeId = e.getKey(); - if (ignoringNodes.contains(nodeId)) - continue; - - CachePartitionPartialCountersMap countersMap = e.getValue().partitionUpdateCounters(top.groupId(), partitions); - - Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, e.getValue()); - - for (int part = 0; part < partitions; part++) { - if (ignorePartitions != null && ignorePartitions.contains(part)) - continue; - - int partIdx = countersMap.partitionIndex(part); - long currentCounter = partIdx >= 0 ? countersMap.updateCounterAt(partIdx) : 0; - - process(invalidPartitions, updateCountersAndNodesByPartitions, part, nodeId, currentCounter); - } - } - - return invalidPartitions; - } - - /** - * Validate partitions cache sizes for given {@code top}. - * - * @param top Topology to validate. - * @param messages Single messages received from all nodes. - * @param ignoringNodes Nodes for what we ignore validation. - * @return Invalid partitions map with following structure: (partId, (nodeId, cacheSize)). - * If map is empty validation is successful. - */ - Map<Integer, Map<UUID, Long>> validatePartitionsSizes( - GridDhtPartitionTopology top, - Map<UUID, GridDhtPartitionsSingleMessage> messages, - Set<UUID> ignoringNodes) { - Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>(); - - Map<Integer, T2<UUID, Long>> sizesAndNodesByPartitions = new HashMap<>(); - - // Populate sizes statistics from local node partitions. - for (GridDhtLocalPartition part : top.currentLocalPartitions()) { - if (part.state() != GridDhtPartitionState.OWNING) - continue; - - if (part.updateCounter() == 0 && part.fullSize() == 0) - continue; - - sizesAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.fullSize())); - } - - int partitions = top.partitions(); - - // Then process and validate sizes from other nodes. - for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : messages.entrySet()) { - UUID nodeId = e.getKey(); - if (ignoringNodes.contains(nodeId)) - continue; - - Map<Integer, Long> sizesMap = e.getValue().partitionSizes(top.groupId()); - - Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, e.getValue()); - - for (int part = 0; part < partitions; part++) { - if (ignorePartitions != null && ignorePartitions.contains(part)) - continue; - - long currentSize = sizesMap.containsKey(part) ? sizesMap.get(part) : 0L; - - process(invalidPartitions, sizesAndNodesByPartitions, part, nodeId, currentSize); - } - } - - return invalidPartitions; - } - - /** - * Processes given {@code counter} for partition {@code part} reported by {@code node}. - * Populates {@code invalidPartitions} map if existing counter and current {@code counter} are different. - * - * @param invalidPartitions Invalid partitions map. - * @param countersAndNodes Current map of counters and nodes by partitions. - * @param part Processing partition. - * @param node Node id. - * @param counter Counter value reported by {@code node}. - */ - private void process(Map<Integer, Map<UUID, Long>> invalidPartitions, - Map<Integer, T2<UUID, Long>> countersAndNodes, - int part, - UUID node, - long counter) { - T2<UUID, Long> existingData = countersAndNodes.get(part); - - if (existingData == null) - countersAndNodes.put(part, new T2<>(node, counter)); - - if (existingData != null && counter != existingData.get2()) { - if (!invalidPartitions.containsKey(part)) { - Map<UUID, Long> map = new HashMap<>(); - map.put(existingData.get1(), existingData.get2()); - invalidPartitions.put(part, map); - } - - invalidPartitions.get(part).put(node, counter); - } - } - - /** - * Folds given map of invalid partition states to string representation in the following format: - * Part [id]: [consistentId=value*] - * - * Value can be both update counter or cache size. - * - * @param topVer Last topology version. - * @param invalidPartitions Invalid partitions map. - * @return String representation of invalid partitions. - */ - private String fold(AffinityTopologyVersion topVer, Map<Integer, Map<UUID, Long>> invalidPartitions) { - SB sb = new SB(); - - NavigableMap<Integer, Map<UUID, Long>> sortedPartitions = new TreeMap<>(invalidPartitions); - - for (Map.Entry<Integer, Map<UUID, Long>> p : sortedPartitions.entrySet()) { - sb.a("Part ").a(p.getKey()).a(": ["); - for (Map.Entry<UUID, Long> e : p.getValue().entrySet()) { - Object consistentId = cctx.discovery().node(topVer, e.getKey()).consistentId(); - sb.a(consistentId).a("=").a(e.getValue()).a(" "); - } - sb.a("] "); - } - - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index f6df80b..4480dae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -52,6 +52,9 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLo import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java index 2b34a41..ad164e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java @@ -46,6 +46,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxAbstractEnlistFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSelectForUpdateFuture; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index a0c9d15..ffa383b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 609bc4a..6662a1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -38,6 +38,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index a76844a..39e0774 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -41,10 +41,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; -import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTrackerImpl; import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTrackerImpl; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotResponseListener; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index fd9bc77..f5689f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheUtils.BackupPostProcessingClosure; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; @@ -60,7 +61,7 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; /** * http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionsEvictManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionsEvictManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionsEvictManager.java deleted file mode 100644 index 780ca91..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionsEvictManager.java +++ /dev/null @@ -1,569 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht; - -import java.util.Collection; -import java.util.Comparator; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; -import org.apache.ignite.internal.processors.cache.CacheGroupContext; -import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; -import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.internal.LT; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVICTION_PERMITS; -import static org.apache.ignite.IgniteSystemProperties.getInteger; -import static org.apache.ignite.IgniteSystemProperties.getLong; - -/** - * Class that serves asynchronous part eviction process. - * Multiple partition from group can be evicted at the same time. - */ -public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { - - /** Default eviction progress show frequency. */ - private static final int DEFAULT_SHOW_EVICTION_PROGRESS_FREQ_MS = 2 * 60 * 1000; // 2 Minutes. - - /** Eviction progress frequency property name. */ - private static final String SHOW_EVICTION_PROGRESS_FREQ = "SHOW_EVICTION_PROGRESS_FREQ"; - - /** Eviction thread pool policy. */ - private static final byte EVICT_POOL_PLC = GridIoPolicy.SYSTEM_POOL; - - /** Eviction progress frequency in ms. */ - private final long evictionProgressFreqMs = getLong(SHOW_EVICTION_PROGRESS_FREQ, DEFAULT_SHOW_EVICTION_PROGRESS_FREQ_MS); - - /** */ - private final int confPermits = getInteger(IGNITE_EVICTION_PERMITS, -1); - - /** Next time of show eviction progress. */ - private long nextShowProgressTime; - - private final Map<Integer, GroupEvictionContext> evictionGroupsMap = new ConcurrentHashMap<>(); - - /** Flag indicates that eviction process has stopped. */ - private volatile boolean stop; - - /** Check stop eviction context. */ - private final EvictionContext sharedEvictionContext = () -> stop; - - /** Number of maximum concurrent operations. */ - private volatile int threads; - - /** How many eviction task may execute concurrent. */ - private volatile int permits; - - /** Bucket queue for load balance partitions to the threads via count of partition size. - * Is not thread-safe. - * All method should be called under mux synchronization. - */ - private volatile BucketQueue evictionQueue; - - /** Lock object. */ - private final Object mux = new Object(); - - /** - * Stops eviction process for group. - * - * Method awaits last offered partition eviction. - * - * @param grp Group context. - */ - public void onCacheGroupStopped(CacheGroupContext grp){ - GroupEvictionContext groupEvictionContext = evictionGroupsMap.remove(grp.groupId()); - - if (groupEvictionContext != null){ - groupEvictionContext.stop(); - - groupEvictionContext.awaitFinishAll(); - } - } - - /** - * Adds partition to eviction queue and starts eviction process if permit available. - * - * @param grp Group context. - * @param part Partition to evict. - */ - public void evictPartitionAsync(CacheGroupContext grp, GridDhtLocalPartition part) { - // Check node stop. - if (sharedEvictionContext.shouldStop()) - return; - - GroupEvictionContext groupEvictionContext = evictionGroupsMap.computeIfAbsent( - grp.groupId(), (k) -> new GroupEvictionContext(grp)); - - PartitionEvictionTask evictionTask = groupEvictionContext.createEvictPartitionTask(part); - - if (evictionTask == null) - return; - - int bucket; - - synchronized (mux) { - bucket = evictionQueue.offer(evictionTask); - } - - scheduleNextPartitionEviction(bucket); - } - - /** - * Gets next partition from the queue and schedules it for eviction. - * - * @param bucket Bucket. - */ - private void scheduleNextPartitionEviction(int bucket) { - // Check node stop. - if (sharedEvictionContext.shouldStop()) - return; - - synchronized (mux) { - // Check that we have permits for next operation. - if (permits > 0) { - // If queue is empty not need to do. - if (evictionQueue.isEmpty()) - return; - - // Get task until we have permits. - while (permits >= 0) { - // Get task from bucket. - PartitionEvictionTask evictionTask = evictionQueue.poll(bucket); - - // If bucket empty try get from another. - if (evictionTask == null) { - // Until queue have tasks. - while (!evictionQueue.isEmpty()) { - // Get task from any other bucket. - evictionTask = evictionQueue.pollAny(); - - // Stop iteration if we found task. - if (evictionTask != null) - break; - } - - // If task not found no need to do some. - if (evictionTask == null) - return; - } - - // Print current eviction progress. - showProgress(); - - GroupEvictionContext groupEvictionContext = evictionTask.groupEvictionContext; - - // Check that group or node stopping. - if (groupEvictionContext.shouldStop()) - continue; - - // Get permit for this task. - permits--; - - // Register task future, may need if group or node will be stopped. - groupEvictionContext.taskScheduled(evictionTask); - - evictionTask.finishFut.listen(f -> { - synchronized (mux) { - // Return permit after task completed. - permits++; - } - - // Re-schedule new one task form same bucket. - scheduleNextPartitionEviction(bucket); - }); - - // Submit task to executor. - cctx.kernalContext() - .closure() - .runLocalSafe(evictionTask, EVICT_POOL_PLC); - } - } - } - } - - /** - * Shows progress of eviction. - */ - private void showProgress() { - if (U.currentTimeMillis() >= nextShowProgressTime) { - int size = evictionQueue.size() + 1; // Queue size plus current partition. - - if (log.isInfoEnabled()) - log.info("Eviction in progress [permits=" + permits+ - ", threads=" + threads + - ", groups=" + evictionGroupsMap.keySet().size() + - ", remainingPartsToEvict=" + size + "]"); - - evictionGroupsMap.values().forEach(GroupEvictionContext::showProgress); - - nextShowProgressTime = U.currentTimeMillis() + evictionProgressFreqMs; - } - } - - /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - super.start0(); - - // If property is not setup, calculate permits as parts of sys pool. - if (confPermits == -1) { - int sysPoolSize = cctx.kernalContext().config().getSystemThreadPoolSize(); - - threads = permits = sysPoolSize / 4; - } - else - threads = permits = confPermits; - - // Avoid 0 permits if sys pool size less that 4. - if (threads == 0) - threads = permits = 1; - - log.info("Evict partition permits=" + permits); - - evictionQueue = new BucketQueue(threads); - } - - /** {@inheritDoc} */ - @Override protected void stop0(boolean cancel) { - super.stop0(cancel); - - stop = true; - - Collection<GroupEvictionContext> evictionGrps = evictionGroupsMap.values(); - - evictionGrps.forEach(GroupEvictionContext::stop); - - evictionGrps.forEach(GroupEvictionContext::awaitFinishAll); - } - - /** - * - */ - private class GroupEvictionContext implements EvictionContext { - /** */ - private final CacheGroupContext grp; - - /** Deduplicate set partition ids. */ - private final Set<Integer> partIds = new GridConcurrentHashSet<>(); - - /** Future for currently running partition eviction task. */ - private final Map<Integer, IgniteInternalFuture<?>> partsEvictFutures = new ConcurrentHashMap<>(); - - /** Flag indicates that eviction process has stopped for this group. */ - private volatile boolean stop; - - /** Total partition to evict. */ - private AtomicInteger totalTasks = new AtomicInteger(); - - /** Total partition evict in progress. */ - private int taskInProgress; - - /** - * @param grp Group context. - */ - private GroupEvictionContext(CacheGroupContext grp) { - this.grp = grp; - } - - /** {@inheritDoc} */ - @Override public boolean shouldStop() { - return stop || sharedEvictionContext.shouldStop(); - } - - /** - * - * @param part Grid local partition. - */ - private PartitionEvictionTask createEvictPartitionTask(GridDhtLocalPartition part){ - if (shouldStop() || !partIds.add(part.id())) - return null; - - totalTasks.incrementAndGet(); - - return new PartitionEvictionTask(part, this); - } - - /** - * - * @param task Partition eviction task. - */ - private synchronized void taskScheduled(PartitionEvictionTask task) { - if (shouldStop()) - return; - - taskInProgress++; - - GridFutureAdapter<?> fut = task.finishFut; - - int partId = task.part.id(); - - partsEvictFutures.put(partId, fut); - - fut.listen(f -> { - synchronized (this) { - taskInProgress--; - - partsEvictFutures.remove(partId, f); - - if (totalTasks.decrementAndGet() == 0) - evictionGroupsMap.remove(grp.groupId()); - } - }); - } - - /** - * Stop eviction for group. - */ - private void stop() { - stop = true; - } - - /** - * Await evict finish. - */ - private void awaitFinishAll(){ - partsEvictFutures.forEach(this::awaitFinish); - - evictionGroupsMap.remove(grp.groupId()); - } - - /** - * Await evict finish partition. - */ - private void awaitFinish(Integer part, IgniteInternalFuture<?> fut) { - // Wait for last offered partition eviction completion - try { - log.info("Await partition evict, grpName=" + grp.cacheOrGroupName() + - ", grpId=" + grp.groupId() + ", partId=" + part); - - fut.get(); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.warning("Failed to await partition eviction during stopping.", e); - } - } - - /** - * Shows progress group of eviction. - */ - private void showProgress() { - if (log.isInfoEnabled()) - log.info("Group eviction in progress [grpName=" + grp.cacheOrGroupName()+ - ", grpId=" + grp.groupId() + - ", remainingPartsToEvict=" + (totalTasks.get() - taskInProgress) + - ", partsEvictInProgress=" + taskInProgress + - ", totalParts= " + grp.topology().localPartitions().size() + "]"); - } - } - - /** - * Task for self-scheduled partition eviction / clearing. - */ - private class PartitionEvictionTask implements Runnable { - /** Partition to evict. */ - private final GridDhtLocalPartition part; - - private final long size; - - /** Eviction context. */ - private final GroupEvictionContext groupEvictionContext; - - /** */ - private final GridFutureAdapter<?> finishFut = new GridFutureAdapter<>(); - - /** - * @param part Partition. - */ - private PartitionEvictionTask( - GridDhtLocalPartition part, - GroupEvictionContext groupEvictionContext - ) { - this.part = part; - this.groupEvictionContext = groupEvictionContext; - - size = part.fullSize(); - } - - /** {@inheritDoc} */ - @Override public void run() { - if (groupEvictionContext.shouldStop()) { - finishFut.onDone(); - - return; - } - - try { - boolean success = part.tryClear(groupEvictionContext); - - if (success) { - if (part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy()) - part.destroy(); - } - else // Re-offer partition if clear was unsuccessful due to partition reservation. - evictionQueue.offer(this); - - // Complete eviction future before schedule new to prevent deadlock with - // simultaneous eviction stopping and scheduling new eviction. - finishFut.onDone(); - } - catch (Throwable ex) { - finishFut.onDone(ex); - - if (cctx.kernalContext().isStopping()) { - LT.warn(log, ex, "Partition eviction failed (current node is stopping).", - false, - true); - } - else{ - LT.error(log, ex, "Partition eviction failed, this can cause grid hang."); - } - } - } - } - - /** - * - */ - private class BucketQueue { - /** Queues contains partitions scheduled for eviction. */ - private final Queue<PartitionEvictionTask>[] buckets; - - /** */ - private final long[] bucketSizes; - - /** - * @param buckets Number of buckets. - */ - BucketQueue(int buckets) { - this.buckets = new Queue[buckets]; - - for (int i = 0; i < buckets; i++) - this.buckets[i] = createEvictPartitionQueue(); - - bucketSizes = new long[buckets]; - } - - /** - * Poll eviction task from queue for specific bucket. - * - * @param bucket Bucket index. - * @return Partition evict task, or {@code null} if bucket queue is empty. - */ - PartitionEvictionTask poll(int bucket) { - PartitionEvictionTask task = buckets[bucket].poll(); - - if (task != null) - bucketSizes[bucket] -= task.size; - - return task; - } - - /** - * Poll eviction task from queue (bucket is not specific). - * - * @return Partition evict task. - */ - PartitionEvictionTask pollAny() { - for (int bucket = 0; bucket < bucketSizes.length; bucket++){ - if (!buckets[bucket].isEmpty()) - return poll(bucket); - } - - return null; - } - - /** - * Offer task to queue. - * - * @return Bucket index. - */ - int offer(PartitionEvictionTask task) { - int bucket = calculateBucket(); - - buckets[bucket].offer(task); - - bucketSizes[bucket] += task.size; - - return bucket; - } - - - /** - * @return {@code True} if queue is empty, {@code} False if not empty. - */ - boolean isEmpty(){ - return size() == 0; - } - - /** - * @return Queue size. - */ - int size(){ - int size = 0; - - for (Queue<PartitionEvictionTask> queue : buckets) { - size += queue.size(); - } - - return size; - } - - /*** - * @return Bucket index. - */ - private int calculateBucket() { - int min = 0; - - for (int bucket = min; bucket < bucketSizes.length; bucket++) { - if (bucketSizes[min] > bucketSizes[bucket]) - min = bucket; - } - - return min; - } - - /** - * 0 - PRIORITY QUEUE (compare by partition size). - * default (any other values) - FIFO. - */ - private static final byte QUEUE_TYPE = 1; - - /** - * - * @return Queue for evict partitions. - */ - private Queue<PartitionEvictionTask> createEvictPartitionQueue() { - switch (QUEUE_TYPE) { - case 1: - return new PriorityBlockingQueue<>( - 1000, Comparator.comparingLong(p -> p.part.fullSize())); - default: - return new LinkedBlockingQueue<>(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 4c42315..8edefa2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -68,8 +68,8 @@ import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index f6de594..4d5fa13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -50,7 +50,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUn import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtEmbeddedFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFinishedFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 3b03958..b37acf3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -40,8 +40,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; @@ -61,8 +61,8 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 29573cb..40defa1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -49,9 +49,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; @@ -78,7 +78,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOAD import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD; @@ -438,8 +438,8 @@ public class GridDhtPartitionDemander { } if (!ctx.kernalContext().grid().isRebalanceEnabled()) { - if (log.isDebugEnabled()) - log.debug("Cancel partition demand because rebalance disabled on current node."); + if (log.isTraceEnabled()) + log.trace("Cancel partition demand because rebalance disabled on current node."); fut.cancel(); @@ -455,6 +455,8 @@ public class GridDhtPartitionDemander { final CacheConfiguration cfg = grp.config(); + int totalStripes = ctx.gridConfig().getRebalanceThreadPoolSize(); + for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assignments.entrySet()) { final ClusterNode node = e.getKey(); @@ -467,13 +469,11 @@ public class GridDhtPartitionDemander { parts = fut.remaining.get(node.id()).get2(); - U.log(log, "Starting rebalancing [grp=" + grp.cacheOrGroupName() - + ", mode=" + cfg.getRebalanceMode() + ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() - + ", topology=" + fut.topologyVersion() + ", rebalanceId=" + fut.rebalanceId + "]"); + U.log(log, "Prepared rebalancing [grp=" + grp.cacheOrGroupName() + + ", mode=" + cfg.getRebalanceMode() + ", supplier=" + node.id() + ", partitionsCount=" + parts.size() + + ", topVer=" + fut.topologyVersion() + ", parallelism=" + totalStripes + "]"); } - int totalStripes = ctx.gridConfig().getRebalanceThreadPoolSize(); - int stripes = totalStripes; final List<IgniteDhtDemandedPartitionsMap> stripePartitions = new ArrayList<>(stripes); @@ -521,10 +521,11 @@ public class GridDhtPartitionDemander { fut.cleanupRemoteContexts(node.id()); } - if (log.isDebugEnabled()) - log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + - topicId + " " + demandMsg.rebalanceId() + ", partitions count=" + stripePartitions.get(topicId).size() + - " (" + stripePartitions.get(topicId).partitionsList() + ")]"); + if (log.isInfoEnabled()) + log.info("Started rebalance routine [" + grp.cacheOrGroupName() + + ", supplier=" + node.id() + ", topic=" + topicId + + ", fullPartitions=" + S.compact(stripePartitions.get(topicId).fullSet()) + + ", histPartitions=" + S.compact(stripePartitions.get(topicId).historicalSet()) + "]"); } catch (IgniteCheckedException e1) { ClusterTopologyCheckedException cause = e1.getCause(ClusterTopologyCheckedException.class); @@ -613,16 +614,15 @@ public class GridDhtPartitionDemander { } if (log.isDebugEnabled()) - log.debug("Remaining clearing partitions [grp=" + grp.cacheOrGroupName() - + ", remaining=" + remaining + "]"); + log.debug("Partition is ready for rebalance [grp=" + grp.cacheOrGroupName() + + ", p=" + part.id() + ", remaining=" + remaining + "]"); if (remaining == 0) clearAllFuture.onDone(); } } - else { + else clearAllFuture.onDone(); - } }); } else { @@ -636,10 +636,6 @@ public class GridDhtPartitionDemander { } } - if (log.isDebugEnabled()) - log.debug("Remaining clearing partitions [grp=" + grp.cacheOrGroupName() - + ", remaining=" + remaining + "]"); - if (remaining == 0) clearAllFuture.onDone(); } @@ -660,38 +656,41 @@ public class GridDhtPartitionDemander { * * @param topicId Topic id. * @param nodeId Node id. - * @param supply Supply message. + * @param supplyMsg Supply message. */ public void handleSupplyMessage( int topicId, final UUID nodeId, - final GridDhtPartitionSupplyMessage supply + final GridDhtPartitionSupplyMessage supplyMsg ) { - AffinityTopologyVersion topVer = supply.topologyVersion(); + AffinityTopologyVersion topVer = supplyMsg.topologyVersion(); final RebalanceFuture fut = rebalanceFut; ClusterNode node = ctx.node(nodeId); - if (node == null) - return; + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Supply message ignored (supplier has left cluster) [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]"); - if (topologyChanged(fut)) // Topology already changed (for the future that supply message based on). return; + } + + // Topology already changed (for the future that supply message based on). + if (topologyChanged(fut) || !fut.isActual(supplyMsg.rebalanceId())) { + if (log.isDebugEnabled()) + log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]"); - if (!fut.isActual(supply.rebalanceId())) { - // Current future have another rebalance id. - // Supple message based on another future. return; } if (log.isDebugEnabled()) - log.debug("Received supply message [grp=" + grp.cacheOrGroupName() + ", msg=" + supply + ']'); + log.debug("Received supply message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]"); // Check whether there were error during supply message unmarshalling process. - if (supply.classError() != null) { - U.warn(log, "Rebalancing from node cancelled [grp=" + grp.cacheOrGroupName() + ", node=" + nodeId + - "]. Supply message couldn't be unmarshalled: " + supply.classError()); + if (supplyMsg.classError() != null) { + U.warn(log, "Rebalancing from node cancelled [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]" + + ". Supply message couldn't be unmarshalled: " + supplyMsg.classError()); fut.cancel(nodeId); @@ -699,9 +698,9 @@ public class GridDhtPartitionDemander { } // Check whether there were error during supplying process. - if (supply.error() != null) { - U.warn(log, "Rebalancing from node cancelled [grp=" + grp.cacheOrGroupName() + ", node=" + nodeId + - "]. Supplier has failed with error: " + supply.error()); + if (supplyMsg.error() != null) { + U.warn(log, "Rebalancing from node cancelled [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]" + + "]. Supplier has failed with error: " + supplyMsg.error()); fut.cancel(nodeId); @@ -713,13 +712,13 @@ public class GridDhtPartitionDemander { if (grp.sharedGroup()) { for (GridCacheContext cctx : grp.caches()) { if (cctx.statisticsEnabled()) { - long keysCnt = supply.keysForCache(cctx.cacheId()); + long keysCnt = supplyMsg.keysForCache(cctx.cacheId()); if (keysCnt != -1) cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt); // Can not be calculated per cache. - cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize()); + cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize()); } } } @@ -727,10 +726,10 @@ public class GridDhtPartitionDemander { GridCacheContext cctx = grp.singleCacheContext(); if (cctx.statisticsEnabled()) { - if (supply.estimatedKeysCount() != -1) - cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supply.estimatedKeysCount()); + if (supplyMsg.estimatedKeysCount() != -1) + cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supplyMsg.estimatedKeysCount()); - cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize()); + cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize()); } } @@ -738,7 +737,7 @@ public class GridDhtPartitionDemander { AffinityAssignment aff = grp.affinity().cachedAffinity(topVer); // Preload. - for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { + for (Map.Entry<Integer, CacheEntryInfoCollection> e : supplyMsg.infos().entrySet()) { int p = e.getKey(); if (aff.get(p).contains(ctx.localNode())) { @@ -746,7 +745,7 @@ public class GridDhtPartitionDemander { assert part != null; - boolean last = supply.last().containsKey(p); + boolean last = supplyMsg.last().containsKey(p); if (part.state() == MOVING) { boolean reserved = part.reserve(); @@ -795,7 +794,8 @@ public class GridDhtPartitionDemander { fut.partitionDone(nodeId, p, true); if (log.isDebugEnabled()) - log.debug("Finished rebalancing partition: " + part); + log.debug("Finished rebalancing partition: " + + "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]"); } } finally { @@ -808,29 +808,31 @@ public class GridDhtPartitionDemander { fut.partitionDone(nodeId, p, false); if (log.isDebugEnabled()) - log.debug("Skipping rebalancing partition (state is not MOVING): " + part); + log.debug("Skipping rebalancing partition (state is not MOVING): " + + "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]"); } } else { fut.partitionDone(nodeId, p, false); if (log.isDebugEnabled()) - log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); + log.debug("Skipping rebalancing partition (affinity changed): " + + "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]"); } } // Only request partitions based on latest topology version. - for (Integer miss : supply.missed()) { + for (Integer miss : supplyMsg.missed()) { if (aff.get(miss).contains(ctx.localNode())) fut.partitionMissed(nodeId, miss); } - for (Integer miss : supply.missed()) + for (Integer miss : supplyMsg.missed()) fut.partitionDone(nodeId, miss, false); GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( - supply.rebalanceId(), - supply.topologyVersion(), + supplyMsg.rebalanceId(), + supplyMsg.topologyVersion(), grp.groupId()); d.timeout(grp.config().getRebalanceTimeout()); @@ -842,18 +844,24 @@ public class GridDhtPartitionDemander { try { ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId), d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.config().getRebalanceTimeout()); + + if (log.isDebugEnabled()) + log.debug("Send next demand message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]"); } catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) { - log.debug("Node left during rebalancing [grp=" + grp.cacheOrGroupName() + - ", node=" + node.id() + ", msg=" + e.getMessage() + ']'); - } + if (log.isDebugEnabled()) + log.debug("Supplier has left [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + + ", errMsg=" + e.getMessage() + ']'); } } + else { + if (log.isDebugEnabled()) + log.debug("Will not request next demand message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + + ", topChanged=" + topologyChanged(fut) + ", rebalanceFuture=" + fut + "]"); + } } catch (IgniteSpiException | IgniteCheckedException e) { - LT.error(log, e, "Error during rebalancing [grp=" + grp.cacheOrGroupName() + - ", srcNode=" + node.id() + + LT.error(log, e, "Error during rebalancing [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", err=" + e + ']'); } } @@ -946,6 +954,17 @@ public class GridDhtPartitionDemander { return true; } + /** + * String representation of demand routine. + * + * @param topicId Topic id. + * @param supplier Supplier. + * @param supplyMsg Supply message. + */ + private String demandRoutineInfo(int topicId, UUID supplier, GridDhtPartitionSupplyMessage supplyMsg) { + return "grp=" + grp.cacheOrGroupName() + ", topVer=" + supplyMsg.topologyVersion() + ", supplier=" + supplier + ", topic=" + topicId; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtPartitionDemander.class, this); @@ -980,6 +999,9 @@ public class GridDhtPartitionDemander { /** Unique (per demander) rebalance id. */ private final long rebalanceId; + /** The number of rebalance routines. */ + private final long routines; + /** * @param grp Cache group. * @param assignments Assignments. @@ -1003,6 +1025,8 @@ public class GridDhtPartitionDemander { remaining.put(k.id(), new T2<>(U.currentTimeMillis(), v.partitions())); }); + this.routines = remaining.size(); + this.grp = grp; this.log = log; this.rebalanceId = rebalanceId; @@ -1020,6 +1044,7 @@ public class GridDhtPartitionDemander { this.grp = null; this.log = null; this.rebalanceId = -1; + this.routines = 0; } /** @@ -1054,7 +1079,8 @@ public class GridDhtPartitionDemander { if (isDone()) return true; - U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']'); + U.log(log, "Cancelled rebalancing from all nodes [grp=" + grp.cacheOrGroupName() + + ", topVer=" + topologyVersion() + "]"); if (!ctx.kernalContext().isStopping()) { for (UUID nodeId : remaining.keySet()) @@ -1077,8 +1103,8 @@ public class GridDhtPartitionDemander { if (isDone()) return; - U.log(log, ("Cancelled rebalancing [cache=" + grp.cacheOrGroupName() + - ", fromNode=" + nodeId + ", topology=" + topologyVersion() + + U.log(log, ("Cancelled rebalancing [grp=" + grp.cacheOrGroupName() + + ", supplier=" + nodeId + ", topVer=" + topologyVersion() + ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]")); cleanupRemoteContexts(nodeId); @@ -1134,7 +1160,7 @@ public class GridDhtPartitionDemander { } catch (IgniteCheckedException ignored) { if (log.isDebugEnabled()) - log.debug("Failed to send failover context cleanup request to node"); + log.debug("Failed to send failover context cleanup request to node " + nodeId); } } @@ -1166,11 +1192,14 @@ public class GridDhtPartitionDemander { ", part=" + p + ", left=" + parts + "]"; if (parts.isEmpty()) { - U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") + - "rebalancing [fromNode=" + nodeId + - ", cacheOrGroup=" + grp.cacheOrGroupName() + - ", topology=" + topologyVersion() + - ", time=" + (U.currentTimeMillis() - t.get1()) + " ms]")); + int remainingRoutines = remaining.size() - 1; + + U.log(log, "Completed " + ((remainingRoutines == 0 ? "(final) " : "") + + "rebalancing [grp=" + grp.cacheOrGroupName() + + ", supplier=" + nodeId + + ", topVer=" + topologyVersion() + + ", progress=" + (routines - remainingRoutines) + "/" + routines + + ", time=" + (U.currentTimeMillis() - t.get1()) + " ms]")); remaining.remove(nodeId); } @@ -1215,6 +1244,10 @@ public class GridDhtPartitionDemander { if (log.isInfoEnabled()) log.info("Completed rebalance future: " + this); + if (log.isDebugEnabled()) + log.debug("Partitions have been scheduled to resend [reason=" + + "Rebalance is done [grp=" + grp.cacheOrGroupName() + "]"); + ctx.exchange().scheduleResendPartitions(); Collection<Integer> m = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java index 28c8c84..e84869d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java @@ -27,12 +27,12 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; /** * Partition map.