http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 9ef9c8d..cd731cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectMap; @@ -62,11 +63,27 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** Partitions update counters. */ @GridToStringInclude @GridDirectTransient - private Map<Integer, Map<Integer, T2<Long, Long>>> partCntrs; + private IgniteDhtPartitionCountersMap partCntrs; /** Serialized partitions counters. */ private byte[] partCntrsBytes; + /** Partitions history suppliers. */ + @GridToStringInclude + @GridDirectTransient + private IgniteDhtPartitionHistorySuppliersMap partHistSuppliers; + + /** Serialized partitions history suppliers. */ + private byte[] partHistSuppliersBytes; + + /** Partitions that must be cleared and re-loaded. */ + @GridToStringInclude + @GridDirectTransient + private IgniteDhtPartitionsToReloadMap partsToReload; + + /** Serialized partitions that must be cleared and re-loaded. */ + private byte[] partsToReloadBytes; + /** Topology version. */ private AffinityTopologyVersion topVer; @@ -96,12 +113,16 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa */ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer, - @NotNull AffinityTopologyVersion topVer) { + @NotNull AffinityTopologyVersion topVer, + @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, + @Nullable IgniteDhtPartitionsToReloadMap partsToReload) { super(id, lastVer); assert id == null || topVer.equals(id.topologyVersion()); this.topVer = topVer; + this.partHistSuppliers = partHistSuppliers; + this.partsToReload = partsToReload; } /** @@ -159,10 +180,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa */ public void addPartitionUpdateCounters(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) { if (partCntrs == null) - partCntrs = new HashMap<>(); + partCntrs = new IgniteDhtPartitionCountersMap(); - if (!partCntrs.containsKey(cacheId)) - partCntrs.put(cacheId, cntrMap); + partCntrs.putIfAbsent(cacheId, cntrMap); } /** @@ -171,9 +191,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa */ @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) { if (partCntrs != null) { - Map<Integer, T2<Long, Long>> res = partCntrs.get(cacheId); - - return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap(); + return partCntrs.get(cacheId); } return Collections.emptyMap(); @@ -182,6 +200,23 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** * */ + public IgniteDhtPartitionHistorySuppliersMap partitionHistorySuppliers() { + if (partHistSuppliers == null) + return IgniteDhtPartitionHistorySuppliersMap.empty(); + + return partHistSuppliers; + } + + public Set<Integer> partsToReload(UUID nodeId, int cacheId) { + if (partsToReload == null) + return Collections.emptySet(); + + return partsToReload.get(nodeId, cacheId); + } + + /** + * + */ public Map<UUID, Exception> getExceptionsMap() { return exs; } @@ -199,11 +234,15 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa boolean marshal = (parts != null && partsBytes == null) || (partCntrs != null && partCntrsBytes == null) || + (partHistSuppliers != null && partHistSuppliersBytes == null) || + (partsToReload != null && partsToReloadBytes == null) || (exs != null && exsBytes == null); if (marshal) { byte[] partsBytes0 = null; byte[] partCntrsBytes0 = null; + byte[] partHistSuppliersBytes0 = null; + byte[] partsToReloadBytes0 = null; byte[] exsBytes0 = null; if (parts != null && partsBytes == null) @@ -212,6 +251,12 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa if (partCntrs != null && partCntrsBytes == null) partCntrsBytes0 = U.marshal(ctx, partCntrs); + if (partHistSuppliers != null && partHistSuppliersBytes == null) + partHistSuppliersBytes0 = U.marshal(ctx, partHistSuppliers); + + if (partsToReload != null && partsToReloadBytes == null) + partsToReloadBytes0 = U.marshal(ctx, partsToReload); + if (exs != null && exsBytes == null) exsBytes0 = U.marshal(ctx, exs); @@ -221,10 +266,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa try { byte[] partsBytesZip = U.zip(partsBytes0); byte[] partCntrsBytesZip = U.zip(partCntrsBytes0); + byte[] partHistSuppliersBytesZip = U.zip(partHistSuppliersBytes0); + byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0); byte[] exsBytesZip = U.zip(exsBytes0); partsBytes0 = partsBytesZip; partCntrsBytes0 = partCntrsBytesZip; + partHistSuppliersBytes0 = partHistSuppliersBytesZip; + partsToReloadBytes0 = partsToReloadBytesZip; exsBytes0 = exsBytesZip; compressed(true); @@ -236,6 +285,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa partsBytes = partsBytes0; partCntrsBytes = partCntrsBytes0; + partHistSuppliersBytes = partHistSuppliersBytes0; + partsToReloadBytes = partsToReloadBytes0; exsBytes = exsBytes0; } } @@ -302,10 +353,24 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } + if (partHistSuppliersBytes != null && partHistSuppliers == null) { + if (compressed()) + partHistSuppliers = U.unmarshalZip(ctx.marshaller(), partHistSuppliersBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + partHistSuppliers = U.unmarshal(ctx, partHistSuppliersBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } + + if (partsToReloadBytes != null && partsToReload == null) { + if (compressed()) + partsToReload = U.unmarshalZip(ctx.marshaller(), partsToReloadBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + partsToReload = U.unmarshal(ctx, partsToReloadBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } + if (partCntrs == null) - partCntrs = new HashMap<>(); + partCntrs = new IgniteDhtPartitionCountersMap(); - if (exsBytes != null && exs == null){ + if (exsBytes != null && exs == null) { if (compressed()) exs = U.unmarshalZip(ctx.marshaller(), exsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); else @@ -350,12 +415,24 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa writer.incrementState(); case 9: - if (!writer.writeByteArray("partsBytes", partsBytes)) + if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes)) return false; writer.incrementState(); case 10: + if (!writer.writeByteArray("partsBytes", partsBytes)) + return false; + + writer.incrementState(); + + case 11: + if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes)) + return false; + + writer.incrementState(); + + case 12: if (!writer.writeMessage("topVer", topVer)) return false; @@ -402,7 +479,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 9: - partsBytes = reader.readByteArray("partsBytes"); + partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes"); if (!reader.isLastRead()) return false; @@ -410,6 +487,22 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 10: + partsBytes = reader.readByteArray("partsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: + partsToReloadBytes = reader.readByteArray("partsToReloadBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 12: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -428,9 +521,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } //todo + /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 13; } /** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 2f16c8c..8df7466 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -65,6 +65,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Serialized partitions counters. */ private byte[] partCntrsBytes; + /** Partitions history reservation counters. */ + @GridToStringInclude + @GridDirectTransient + private Map<Integer, Map<Integer, Long>> partHistCntrs; + + /** Serialized partitions history reservation counters. */ + private byte[] partHistCntrsBytes; + /** Exception. */ @GridToStringInclude @GridDirectTransient @@ -159,6 +167,43 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } /** + * @param cacheId Cache ID. + * @param cntrMap Partition history counters. + */ + public void partitionHistoryCounters(int cacheId, Map<Integer, Long> cntrMap) { + if (cntrMap.isEmpty()) + return; + + if (partHistCntrs == null) + partHistCntrs = new HashMap<>(); + + partHistCntrs.put(cacheId, cntrMap); + } + + /** + * @param cntrMap Partition history counters. + */ + public void partitionHistoryCounters(Map<Integer, Map<Integer, Long>> cntrMap) { + for (Map.Entry<Integer, Map<Integer, Long>> e : cntrMap.entrySet()) { + partitionHistoryCounters(e.getKey(), e.getValue()); + } + } + + /** + * @param cacheId Cache ID. + * @return Partition history counters. + */ + public Map<Integer, Long> partitionHistoryCounters(int cacheId) { + if (partHistCntrs != null) { + Map<Integer, Long> res = partHistCntrs.get(cacheId); + + return res != null ? res : Collections.<Integer, Long>emptyMap(); + } + + return Collections.emptyMap(); + } + + /** * @return Local partitions. */ public Map<Integer, GridDhtPartitionMap2> partitions() { @@ -189,11 +234,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes boolean marshal = (parts != null && partsBytes == null) || (partCntrs != null && partCntrsBytes == null) || + (partHistCntrs != null && partHistCntrsBytes == null) || (ex != null && exBytes == null); if (marshal) { byte[] partsBytes0 = null; byte[] partCntrsBytes0 = null; + byte[] partHistCntrsBytes0 = null; byte[] exBytes0 = null; if (parts != null && partsBytes == null) @@ -202,6 +249,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partCntrs != null && partCntrsBytes == null) partCntrsBytes0 = U.marshal(ctx, partCntrs); + if (partHistCntrs != null && partHistCntrsBytes == null) + partHistCntrsBytes0 = U.marshal(ctx, partHistCntrs); + if (ex != null && exBytes == null) exBytes0 = U.marshal(ctx, ex); @@ -211,10 +261,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes try { byte[] partsBytesZip = U.zip(partsBytes0); byte[] partCntrsBytesZip = U.zip(partCntrsBytes0); + byte[] partHistCntrsBytesZip = U.zip(partHistCntrsBytes0); byte[] exBytesZip = U.zip(exBytes0); partsBytes0 = partsBytesZip; partCntrsBytes0 = partCntrsBytesZip; + partHistCntrsBytes0 = partHistCntrsBytesZip; exBytes0 = exBytesZip; compressed(true); @@ -226,6 +278,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes partsBytes = partsBytes0; partCntrsBytes = partCntrsBytes0; + partHistCntrsBytes = partHistCntrsBytes0; exBytes = exBytes0; } } @@ -248,6 +301,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } + if (partHistCntrsBytes != null && partHistCntrs == null) { + if (compressed()) + partHistCntrs = U.unmarshalZip(ctx.marshaller(), partHistCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + partHistCntrs = U.unmarshal(ctx, partHistCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } + if (exBytes != null && ex == null) { if (compressed()) ex = U.unmarshalZip(ctx.marshaller(), exBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); @@ -316,6 +376,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes writer.incrementState(); case 10: + if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes)) + return false; + + writer.incrementState(); + + case 11: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -370,6 +436,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 10: + partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -388,9 +462,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } //todo add ex + /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 12; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 8ada4b3..55f6bcb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -75,6 +75,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING; import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; /** @@ -235,6 +236,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { top = null; } + /** * @return Node stop exception. */ @@ -270,8 +272,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { assert exchFut.forcePreload() || exchFut.dummyReassign() || exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) : "Topology version mismatch [exchId=" + exchFut.exchangeId() + - ", cache=" + cctx.name() + - ", topVer=" + top.topologyVersion() + ']'; + ", cache=" + cctx.name() + + ", topVer=" + top.topologyVersion() + ']'; GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); @@ -290,47 +292,95 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { // If partition belongs to local node. if (cctx.affinity().partitionLocalNode(p, topVer)) { - GridDhtLocalPartition part = top.localPartition(p, topVer, true); + GridDhtLocalPartition part = top.localPartition(p, topVer, true, true); assert part != null; assert part.id() == p; - if (part.state() != MOVING) { - if (log.isDebugEnabled()) - log.debug("Skipping partition assignment (state is not MOVING): " + part); - - continue; // For. - } + ClusterNode histSupplier = null; - Collection<ClusterNode> picked = pickedOwners(p, topVer); + if (cctx.shared().database().persistenceEnabled()) { + UUID nodeId = exchFut.partitionHistorySupplier(cctx.cacheId(), p); - if (picked.isEmpty()) { - top.own(part); + if (nodeId != null) + histSupplier = cctx.discovery().node(nodeId); + } - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { - DiscoveryEvent discoEvt = exchFut.discoveryEvent(); + if (histSupplier != null) { + if (part.state() != MOVING) { + if (log.isDebugEnabled()) + log.debug("Skipping partition assignment (state is not MOVING): " + part); - cctx.events().addPreloadEvent(p, - EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), - discoEvt.type(), discoEvt.timestamp()); + continue; // For. } - if (log.isDebugEnabled()) - log.debug("Owning partition as there are no other owners: " + part); - } - else { - ClusterNode n = F.rand(picked); + assert cctx.shared().database().persistenceEnabled(); + assert remoteOwners(p, topVer).contains(histSupplier) : remoteOwners(p, topVer); - GridDhtPartitionDemandMessage msg = assigns.get(n); + GridDhtPartitionDemandMessage msg = assigns.get(histSupplier); if (msg == null) { - assigns.put(n, msg = new GridDhtPartitionDemandMessage( + assigns.put(histSupplier, msg = new GridDhtPartitionDemandMessage( top.updateSequence(), exchFut.exchangeId().topologyVersion(), cctx.cacheId())); } - msg.addPartition(p); + msg.addPartition(p, true); + } + else { + if (cctx.shared().database().persistenceEnabled()) { + if (part.state() == RENTING || part.state() == EVICTED) { + try { + part.rent(false).get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Error while clearing outdated local partition", e); + } + + part = top.localPartition(p, topVer, true); + + assert part != null; + } + } + + if (part.state() != MOVING) { + if (log.isDebugEnabled()) + log.debug("Skipping partition assignment (state is not MOVING): " + part); + + continue; // For. + } + + Collection<ClusterNode> picked = pickedOwners(p, topVer); + + if (picked.isEmpty()) { + top.own(part); + + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { + DiscoveryEvent discoEvt = exchFut.discoveryEvent(); + + cctx.events().addPreloadEvent(p, + EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), + discoEvt.type(), discoEvt.timestamp()); + } + + if (log.isDebugEnabled()) + log.debug("Owning partition as there are no other owners: " + part); + } + else { + ClusterNode n = F.rand(picked); + + GridDhtPartitionDemandMessage msg = assigns.get(n); + + if (msg == null) { + assigns.put(n, msg = new GridDhtPartitionDemandMessage( + top.updateSequence(), + exchFut.exchangeId().topologyVersion(), + cctx.cacheId())); + } + + msg.addPartition(p, false); + } } } } @@ -379,7 +429,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) { + @Override public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) { if (!enterBusy()) return; @@ -399,7 +449,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) { + @Override public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) { if (!enterBusy()) return; @@ -789,7 +839,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { try { part.tryEvict(); - if (part.state() != EVICTED) + if (part.state() == RENTING) partsToEvict.push(part); } catch (Throwable ex) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java new file mode 100644 index 0000000..9db80ae --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.internal.util.typedef.T2; + +/** + * Partition counters map. + */ +public class IgniteDhtPartitionCountersMap implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private Map<Integer, Map<Integer, T2<Long, Long>>> map; + + /** + * @param cacheId Cache ID. + * @param cntrMap Counters map. + */ + public synchronized void putIfAbsent(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) { + if (map == null) + map = new HashMap<>(); + + if (!map.containsKey(cacheId)) + map.put(cacheId, cntrMap); + } + + /** + * @param cacheId Cache ID. + * @return Counters map. + */ + public synchronized Map<Integer, T2<Long, Long>> get(int cacheId) { + if (map == null) + map = new HashMap<>(); + + Map<Integer, T2<Long, Long>> cntrMap = map.get(cacheId); + + if (cntrMap == null) + return Collections.emptyMap(); + + return cntrMap; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java new file mode 100644 index 0000000..333eb97 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.T2; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class IgniteDhtPartitionHistorySuppliersMap implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final IgniteDhtPartitionHistorySuppliersMap EMPTY = new IgniteDhtPartitionHistorySuppliersMap(); + + /** */ + private Map<UUID, Map<T2<Integer, Integer>, Long>> map; + + /** + * @return Empty map. + */ + public static IgniteDhtPartitionHistorySuppliersMap empty() { + return EMPTY; + } + + /** + * @param cacheId Cache ID. + * @param partId Partition ID. + * @return Supplier UUID. + */ + @Nullable public synchronized UUID getSupplier(int cacheId, int partId) { + if (map == null) + return null; + + for (Map.Entry<UUID, Map<T2<Integer, Integer>, Long>> e : map.entrySet()) { + if (e.getValue().containsKey(new T2<>(cacheId, partId))) + return e.getKey(); + } + + return null; + } + + /** + * @param nodeId Node ID to check. + * @return Reservations for the given node. + */ + @Nullable public synchronized Map<T2<Integer, Integer>, Long> getReservations(UUID nodeId) { + if (map == null) + return null; + + return map.get(nodeId); + } + + /** + * @param nodeId Node ID. + * @param cacheId Cache ID. + * @param partId Partition ID. + * @param cntr Partition counter. + */ + public synchronized void put(UUID nodeId, int cacheId, int partId, long cntr) { + Map<T2<Integer, Integer>, Long> nodeMap = map.get(nodeId); + + if (nodeMap == null) { + nodeMap = new HashMap<>(); + + map.put(nodeId, nodeMap); + } + + nodeMap.put(new T2<>(cacheId, partId), cntr); + } + + /** + * @return {@code True} if empty. + */ + public synchronized boolean isEmpty() { + return map == null || map.isEmpty(); + } + + /** + * @param that Other map to put. + */ + public synchronized void putAll(IgniteDhtPartitionHistorySuppliersMap that) { + map = that.map; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java new file mode 100644 index 0000000..2a72e95 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +/** + * Partition reload map. + */ +public class IgniteDhtPartitionsToReloadMap implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private Map<UUID, Map<Integer, Set<Integer>>> map; + + /** + * @param nodeId Node ID. + * @param cacheId Cache ID. + * @return Collection of partitions to reload. + */ + public synchronized Set<Integer> get(UUID nodeId, int cacheId) { + if (map == null) + return Collections.emptySet(); + + Map<Integer, Set<Integer>> nodeMap = map.get(nodeId); + + if (nodeMap == null) + return Collections.emptySet(); + + Set<Integer> parts = nodeMap.get(cacheId); + + if (parts == null) + return Collections.emptySet(); + + return parts; + } + + /** + * @param nodeId Node ID. + * @param cacheId Cache ID. + * @param partId Partition ID. + */ + public synchronized void put(UUID nodeId, int cacheId, int partId) { + if (map == null) + map = new HashMap<>(); + + Map<Integer, Set<Integer>> nodeMap = map.get(nodeId); + + if (nodeMap == null) { + nodeMap = new HashMap<>(); + + map.put(nodeId, nodeMap); + } + + Set<Integer> parts = nodeMap.get(cacheId); + + if (parts == null) { + parts = new HashSet<>(); + + nodeMap.put(cacheId, parts); + } + + parts.add(partId); + } +}
