Repository: ignite Updated Branches: refs/heads/ignite-5075 f04db168a -> bd0171579
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bd017157 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bd017157 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bd017157 Branch: refs/heads/ignite-5075 Commit: bd017157997163a462218232608d70c3a437d96c Parents: f04db16 Author: sboikov <[email protected]> Authored: Tue May 16 12:42:06 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue May 16 12:42:06 2017 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 2 +- .../cache/CacheGroupInfrastructure.java | 170 ++++++++++++++++--- .../processors/cache/GridCacheContext.java | 41 ++++- .../processors/cache/GridCacheEventManager.java | 36 ---- .../GridCachePartitionExchangeManager.java | 22 +-- .../cache/GridCachePreloaderAdapter.java | 3 +- .../dht/GridClientPartitionTopology.java | 5 +- .../distributed/dht/GridDhtLocalPartition.java | 32 ++-- .../dht/GridDhtPartitionTopologyImpl.java | 28 +-- .../dht/preloader/GridDhtPartitionDemander.java | 69 ++------ .../dht/preloader/GridDhtPreloader.java | 25 ++- .../query/h2/database/H2PkHashIndex.java | 1 - 12 files changed, 257 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index bf48cf3..fb6637a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -2692,7 +2692,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * TODO IGNTIE-5075: also store list of started caches. + * */ private static class CacheGroupAffinity { /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index 816993b..ed4ba46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -22,8 +22,10 @@ import java.util.List; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataPageEvictionMode; +import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; @@ -40,13 +42,16 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheRebalanceMode.NONE; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; /** @@ -206,31 +211,157 @@ public class CacheGroupInfrastructure { * @param cctx Cache context. */ private void addCacheContext(GridCacheContext cctx) { - assert sharedGroup() || caches.isEmpty(); + synchronized (caches) { + assert sharedGroup() || caches.isEmpty(); - boolean add = caches.add(cctx); + boolean add = caches.add(cctx); - assert add : cctx.name(); + assert add : cctx.name(); + } } /** * @param cctx Cache context. */ private void removeCacheContext(GridCacheContext cctx) { - assert sharedGroup() || caches.size() == 1 : caches.size(); + synchronized (caches) { + assert sharedGroup() || caches.size() == 1 : caches.size(); - boolean rmv = caches.remove(cctx); + boolean rmv = caches.remove(cctx); - assert rmv : cctx.name(); + assert rmv : cctx.name(); + } } /** * @return Cache context if group contains single cache. */ public GridCacheContext singleCacheContext() { - assert !sharedGroup() && caches.size() == 1; + synchronized (caches) { + assert !sharedGroup() && caches.size() == 1; + + return caches.get(0); + } + } + + /** + * + */ + public void unwindUndeploys() { + synchronized (caches) { + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); - return caches.get(0); + cctx.deploy().unwind(cctx); + } + } + } + + /** + * @param type Event type to check. + * @return {@code True} if given event type should be recorded. + */ + public boolean eventRecordable(int type) { + return ctx.gridEvents().isRecordable(type); + } + + /** + * Adds preloading event. + * + * @param part Partition. + * @param type Event type. + * @param discoNode Discovery node. + * @param discoType Discovery event type. + * @param discoTs Discovery event timestamp. + */ + public void addRebalanceEvent(int part, int type, ClusterNode discoNode, int discoType, long discoTs) { + assert discoNode != null; + assert type > 0; + assert discoType > 0; + assert discoTs > 0; + + if (!eventRecordable(type)) + LT.warn(log, "Added event without checking if event is recordable: " + U.gridEventName(type)); + + synchronized (caches) { + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + if (cctx.recordEvent(type)) { + cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), + cctx.localNode(), + "Cache rebalancing event.", + type, + part, + discoNode, + discoType, + discoTs)); + } + } + } + } + /** + * Adds partition unload event. + * + * @param part Partition. + */ + public void addUnloadEvent(int part) { + if (!eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED)) + LT.warn(log, "Added event without checking if event is recordable: " + + U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED)); + + synchronized (caches) { + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), + cctx.localNode(), + "Cache unloading event.", + EVT_CACHE_REBALANCE_PART_UNLOADED, + part, + null, + 0, + 0)); + } + } + } + + public void addCacheEvent( + int part, + KeyCacheObject key, + UUID evtNodeId, + @Nullable IgniteUuid xid, + @Nullable Object lockId, + int type, + @Nullable CacheObject newVal, + boolean hasNewVal, + @Nullable CacheObject oldVal, + boolean hasOldVal, + UUID subjId, + @Nullable String cloClsName, + @Nullable String taskName, + boolean keepBinary + ) { + synchronized (caches) { + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + cctx.events().addEvent(part, + key, + evtNodeId, + xid, + lockId, + type, + newVal, + hasNewVal, + oldVal, + hasOldVal, + subjId, + cloClsName, + taskName, + keepBinary); + } + } } // TODO IGNITE-5075: need separate caches with/without queries? @@ -238,10 +369,6 @@ public class CacheGroupInfrastructure { return QueryUtils.isEnabled(ccfg); } - public boolean started() { - return true; // TODO IGNITE-5075. - } - /** * @return Free List. */ @@ -257,7 +384,6 @@ public class CacheGroupInfrastructure { } /** - * TODO IGNITE-5075: get rid of CacheObjectContext? * @return Cache object context. */ public CacheObjectContext cacheObjectContext() { @@ -418,22 +544,26 @@ public class CacheGroupInfrastructure { * @return {@code True} if group contains caches. */ boolean hasCaches() { - return !caches.isEmpty(); + synchronized (caches) { + return !caches.isEmpty(); + } } /** * @param part Partition ID. */ public void onPartitionEvicted(int part) { - for (int i = 0; i < caches.size(); i++) { - GridCacheContext cctx = caches.get(i); + synchronized (caches) { + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); - if (cctx.isDrEnabled()) - cctx.dr().partitionEvicted(part); + if (cctx.isDrEnabled()) + cctx.dr().partitionEvicted(part); - cctx.continuousQueries().onPartitionEvicted(part); + cctx.continuousQueries().onPartitionEvicted(part); - cctx.dataStructures().onPartitionEvicted(part); + cctx.dataStructures().onPartitionEvicted(part); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 58a3775..47ff283 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -45,6 +45,7 @@ import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteKernal; @@ -56,8 +57,6 @@ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.database.MemoryPolicy; -import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; -import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; @@ -106,9 +105,10 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheRebalanceMode.NONE; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; /** @@ -249,6 +249,12 @@ public class GridCacheContext<K, V> implements Externalizable { /** */ private boolean customAffMapper; + /** Whether {@link EventType#EVT_CACHE_REBALANCE_STARTED} was sent (used only for REPLICATED cache). */ + private volatile boolean rebalanceStartedEvtSent; + + /** Whether {@link EventType#EVT_CACHE_REBALANCE_STOPPED} was sent (used only for REPLICATED cache). */ + private volatile boolean rebalanceStoppedEvtSent; + /** * Empty constructor required for {@link Externalizable}. */ @@ -2066,6 +2072,35 @@ public class GridCacheContext<K, V> implements Externalizable { || (top.partitionState(localNodeId(), part) == OWNING); } + /** + * @param type Event type. + * @return {@code True} if event should be recorded. + */ + public boolean recordEvent(int type) { + if (isReplicated()) { + if (type == EVT_CACHE_REBALANCE_STARTED) { + if (!rebalanceStartedEvtSent) { + rebalanceStartedEvtSent = true; + + return true; + } + else + return false; + } + else if (type == EVT_CACHE_REBALANCE_STOPPED) { + if (!rebalanceStoppedEvtSent) { + rebalanceStoppedEvtSent = true; + + return true; + } + else + return false; + } + } + + return true; + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeString(out, igniteInstanceName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java index 687b132..7a5dea6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java @@ -368,42 +368,6 @@ public class GridCacheEventManager extends GridCacheManagerAdapter { } /** - * Adds preloading event. - * - * @param part Partition. - * @param type Event type. - * @param discoNode Discovery node. - * @param discoType Discovery event type. - * @param discoTs Discovery event timestamp. - */ - public void addPreloadEvent(int part, int type, ClusterNode discoNode, int discoType, long discoTs) { - assert discoNode != null; - assert type > 0; - assert discoType > 0; - assert discoTs > 0; - - if (!cctx.events().isRecordable(type)) - LT.warn(log, "Added event without checking if event is recordable: " + U.gridEventName(type)); - - cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), cctx.localNode(), - "Cache rebalancing event.", type, part, discoNode, discoType, discoTs)); - } - - /** - * Adds partition unload event. - * - * @param part Partition. - */ - public void addUnloadEvent(int part) { - if (!cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED)) - LT.warn(log, "Added event without checking if event is recordable: " + - U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED)); - - cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), cctx.localNode(), - "Cache unloading event.", EVT_CACHE_REBALANCE_PART_UNLOADED, part, null, 0, 0)); - } - - /** * @param type Event type. * @return {@code True} if event is recordable. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index a666297..7eb6824 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -899,31 +899,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) { - boolean ready; + GridAffinityAssignmentCache affCache = grp.affinity(); - if (exchId != null) { - AffinityTopologyVersion startTopVer = grp.localStartVersion(); - - ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0; - } - else - ready = grp.started(); + GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true); - if (ready) { - GridAffinityAssignmentCache affCache = grp.affinity(); - - GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true); + assert locMap != null || exchId == null : grp.nameForLog(); + if (locMap != null) { addFullPartitionsMap(m, dupData, compress, grp.groupId(), locMap, affCache.similarAffinityKey()); - - if (exchId != null) - m.addPartitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true)); } + + if (exchId != null) + m.addPartitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index d005aae..9ca4852 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -122,8 +122,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { /** {@inheritDoc} */ @Override public void unwindUndeploys() { - // TODO IGNITE-5075. - // cctx.deploy().unwind(cctx); + grp.unwindUndeploys(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 7c2248c..e94415c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -542,7 +542,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { lock.readLock().lock(); try { - assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + + if (stopping || node2part == null) + return null; + + assert node2part.valid() : "Invalid node2part [node2part: " + node2part + ", locNodeId=" + cctx.localNodeId() + ", igniteInstanceName=" + cctx.igniteInstanceName() + ']'; http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 03959c7..ac8316b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -951,8 +951,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements public void clearAll() throws NodeStoppingException { GridCacheVersion clearVer = ctx.versions().next(); - // TODO IGNITE-5075. - boolean rec = grp.shared().gridEvents().isRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED); + boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED); Collection<ConcurrentMap<KeyCacheObject, GridCacheMapEntry>> maps = grp.sharedGroup() ? cachesEntryMaps.values() : Collections.singleton(singleCacheEntryMap); @@ -975,21 +974,20 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (!cached.isInternal()) { if (rec) { - // TODO IGNITE-5075. -// cctx.events().addEvent(cached.partition(), -// cached.key(), -// ctx.localNodeId(), -// (IgniteUuid)null, -// null, -// EVT_CACHE_REBALANCE_OBJECT_UNLOADED, -// null, -// false, -// cached.rawGet(), -// cached.hasValue(), -// null, -// null, -// null, -// false); + grp.addCacheEvent(cached.partition(), + cached.key(), + ctx.localNodeId(), + null, + null, + EVT_CACHE_REBALANCE_OBJECT_UNLOADED, + null, + false, + cached.rawGet(), + cached.hasValue(), + null, + null, + null, + false); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 8bb9ba0..4696d27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -661,14 +661,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { changed = true; -// TODO IGNITE-5075. -// if (ctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { -// DiscoveryEvent discoEvt = exchFut.discoveryEvent(); -// -// cctx.events().addPreloadEvent(p, -// EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), -// discoEvt.type(), discoEvt.timestamp()); -// } + if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { + DiscoveryEvent discoEvt = exchFut.discoveryEvent(); + + grp.addRebalanceEvent(p, + EVT_CACHE_REBALANCE_PART_DATA_LOST, + discoEvt.eventNode(), + discoEvt.type(), + discoEvt.timestamp()); + } if (log.isDebugEnabled()) log.debug("Owned partition: " + locPart); @@ -1471,10 +1472,13 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } -// TODO: IGNITE-5075. -// if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) -// cctx.events().addPreloadEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST, -// discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); + if (grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) { + grp.addRebalanceEvent(part, + EVT_CACHE_REBALANCE_PART_DATA_LOST, + discoEvt.eventNode(), + discoEvt.type(), + discoEvt.timestamp()); + } } if (plc != PartitionLossPolicy.IGNORE) http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 1a9eb68..fbe0aaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -110,18 +110,6 @@ public class GridDhtPartitionDemander { private final Map<Integer, Object> rebalanceTopics; /** - * Started event sent. - * Make sense for replicated cache only. - */ - private final AtomicBoolean startedEvtSent = new AtomicBoolean(); - - /** - * Stopped event sent. - * Make sense for replicated cache only. - */ - private final AtomicBoolean stoppedEvtSent = new AtomicBoolean(); - - /** * @param grp Ccahe group. */ public GridDhtPartitionDemander(CacheGroupInfrastructure grp) { @@ -254,11 +242,10 @@ public class GridDhtPartitionDemander { * @param type Type. * @param discoEvt Discovery event. */ - private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) { + private void rebalanceEvent(int part, int type, DiscoveryEvent discoEvt) { assert discoEvt != null; - // TODO IGNITE-5075. - // cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); + grp.addRebalanceEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); } /** @@ -293,7 +280,7 @@ public class GridDhtPartitionDemander { if (delay == 0 || force) { final RebalanceFuture oldFut = rebalanceFut; - final RebalanceFuture fut = new RebalanceFuture(grp, assigns, log, startedEvtSent, stoppedEvtSent, cnt); + final RebalanceFuture fut = new RebalanceFuture(grp, assigns, log, cnt); if (!oldFut.isInitial()) oldFut.cancel(); @@ -823,20 +810,11 @@ public class GridDhtPartitionDemander { */ public static class RebalanceFuture extends GridFutureAdapter<Boolean> { /** */ - private static final long serialVersionUID = 1L; - - /** */ private final GridCacheSharedContext<?, ?> ctx; /** */ private final CacheGroupInfrastructure grp; - /** Should EVT_CACHE_REBALANCE_STARTED event be sent or not. */ - private final AtomicBoolean startedEvtSent; - - /** Should EVT_CACHE_REBALANCE_STOPPED event be sent or not. */ - private final AtomicBoolean stoppedEvtSent; - /** */ private final IgniteLogger log; @@ -860,16 +838,12 @@ public class GridDhtPartitionDemander { * @param assigns Assigns. * @param grp Cache group. * @param log Logger. - * @param startedEvtSent Start event sent flag. - * @param stoppedEvtSent Stop event sent flag. * @param updateSeq Update sequence. */ RebalanceFuture( CacheGroupInfrastructure grp, GridDhtPreloaderAssignments assigns, IgniteLogger log, - AtomicBoolean startedEvtSent, - AtomicBoolean stoppedEvtSent, long updateSeq) { assert assigns != null; @@ -877,8 +851,6 @@ public class GridDhtPartitionDemander { this.topVer = assigns.topologyVersion(); this.grp = grp; this.log = log; - this.startedEvtSent = startedEvtSent; - this.stoppedEvtSent = stoppedEvtSent; this.updateSeq = updateSeq; ctx= grp.shared(); @@ -893,8 +865,6 @@ public class GridDhtPartitionDemander { this.ctx = null; this.grp = null; this.log = null; - this.startedEvtSent = null; - this.stoppedEvtSent = null; this.updateSeq = -1; } @@ -1023,10 +993,8 @@ public class GridDhtPartitionDemander { if (isDone()) return; - // TODO IGNITE-5075. -// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) -// preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, -// exchFut.discoveryEvent()); + if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) + rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchFut.discoveryEvent()); T2<Long, Collection<Integer>> t = remaining.get(nodeId); @@ -1057,19 +1025,18 @@ public class GridDhtPartitionDemander { * @param type Type. * @param discoEvt Discovery event. */ - private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) { + private void rebalanceEvent(int part, int type, DiscoveryEvent discoEvt) { assert discoEvt != null; - // TODO IGNITE-5075. - // cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); + grp.addRebalanceEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); } /** * @param type Type. * @param discoEvt Discovery event. */ - private void preloadEvent(int type, DiscoveryEvent discoEvt) { - preloadEvent(-1, type, discoEvt); + private void rebalanceEvent(int type, DiscoveryEvent discoEvt) { + rebalanceEvent(-1, type, discoEvt); } /** @@ -1119,26 +1086,16 @@ public class GridDhtPartitionDemander { * */ private void sendRebalanceStartedEvent() { - // TODO IGNITE-5075. -// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) && -// (!cctx.isReplicated() || !startedEvtSent.get())) { -// preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent()); -// -// startedEvtSent.set(true); -// } + if (grp.eventRecordable(EVT_CACHE_REBALANCE_STARTED)) + rebalanceEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent()); } /** * */ private void sendRebalanceFinishedEvent() { - // TODO IGNITE-5075. -// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && -// (!cctx.isReplicated() || !stoppedEvtSent.get())) { -// preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); -// -// stoppedEvtSent.set(true); -// } + if (grp.eventRecordable(EVT_CACHE_REBALANCE_STOPPED)) + rebalanceEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 8dfb4d2..afde2cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -300,14 +300,15 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (picked.isEmpty()) { top.own(part); -// TODO IGNITE-5075. -// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { -// DiscoveryEvent discoEvt = exchFut.discoveryEvent(); -// -// cctx.events().addPreloadEvent(p, -// EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), -// discoEvt.type(), discoEvt.timestamp()); -// } + if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { + DiscoveryEvent discoEvt = exchFut.discoveryEvent(); + + grp.addRebalanceEvent(p, + EVT_CACHE_REBALANCE_PART_DATA_LOST, + discoEvt.eventNode(), + discoEvt.type(), + discoEvt.timestamp()); + } if (log.isDebugEnabled()) log.debug("Owning partition as there are no other owners: " + part); @@ -594,9 +595,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { try { top.onEvicted(part, updateSeq); -// TODO IGNITE-5075. -// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED)) -// cctx.events().addUnloadEvent(part.id()); + if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED)) + grp.addUnloadEvent(part.id()); if (updateSeq) ctx.exchange().scheduleResendPartitions(); @@ -696,8 +696,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { demandLock.writeLock().lock(); try { - // TODO IGNITE-5075. - // cctx.deploy().unwind(cctx); + grp.unwindUndeploys(); } finally { demandLock.writeLock().unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java index 3011afa..9292b5b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java @@ -126,7 +126,6 @@ public class H2PkHashIndex extends GridH2IndexBase { @Override public GridH2Row findOne(GridH2Row row) { try { for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) { - // TODO IGNITE-5075. CacheDataRow found = store.find(cctx, row.key); if (found != null)
