ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0b8bcbca Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0b8bcbca Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0b8bcbca Branch: refs/heads/ignite-5075-pds Commit: 0b8bcbca15282f00e57bee6d98f08a243ee95a39 Parents: 927726d Author: sboikov <sboi...@gridgain.com> Authored: Tue May 30 10:48:16 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue May 30 13:10:38 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheGroupInfrastructure.java | 11 +- .../processors/cache/GridCacheAdapter.java | 4 +- .../cache/GridCacheConcurrentMap.java | 32 +++- .../cache/GridCacheConcurrentMapImpl.java | 56 +++--- .../processors/cache/GridCacheContext.java | 12 ++ .../cache/GridCacheLocalConcurrentMap.java | 23 +-- .../processors/cache/GridCacheMapEntry.java | 20 +- .../processors/cache/GridNoStorageCacheMap.java | 4 +- .../cache/IgniteCacheOffheapManagerImpl.java | 15 +- .../dht/GridCachePartitionedConcurrentMap.java | 8 +- .../distributed/dht/GridDhtCacheEntry.java | 10 +- .../distributed/dht/GridDhtLocalPartition.java | 91 +++------- .../CacheContinuousQueryEventBuffer.java | 5 +- .../continuous/CacheContinuousQueryHandler.java | 109 ++++++++--- .../CacheContinuousQueryListener.java | 3 +- .../continuous/CacheContinuousQueryManager.java | 5 +- .../query/continuous/CounterSkipContext.java | 17 +- .../processors/cache/IgniteCacheGroupsTest.java | 181 +++++++++++++++++-- ...nuousQueryConcurrentPartitionUpdateTest.java | 137 ++++++++++---- .../yardstick/IgniteBenchmarkArguments.java | 3 + .../cache/IgniteCacheAbstractBenchmark.java | 1 - .../cache/IgnitePutObjectKeyBenchmark.java | 125 +++++++++++++ 22 files changed, 654 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index 11efd77..97f9324 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -776,7 +776,8 @@ public class CacheGroupInfrastructure { public void onPartitionCounterUpdate(int cacheId, int part, long cntr, - AffinityTopologyVersion topVer) { + AffinityTopologyVersion topVer, + boolean primary) { assert sharedGroup(); if (isLocal()) @@ -793,15 +794,15 @@ public class CacheGroupInfrastructure { GridCacheContext cctx = contQryCaches.get(i); if (cacheId != cctx.cacheId()) - skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, part, cntr, topVer); + skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, part, cntr, topVer, primary); } - final List<Runnable> entriesC = skipCtx != null ? skipCtx.readyEntries() : null; + final List<Runnable> sndC = skipCtx != null ? 
skipCtx.sendClosures() : null; - if (entriesC != null) { + if (sndC != null) { ctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - for (Runnable c : entriesC) + for (Runnable c : sndC) c.run(); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 8bd072b..67be149 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -380,7 +380,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param e Map entry. */ public void incrementSize(GridCacheMapEntry e) { - map.incrementPublicSize(e); + map.incrementPublicSize(null, e); } /** @@ -388,7 +388,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param e Map entry. 
*/ public void decrementSize(GridCacheMapEntry e) { - map.decrementPublicSize(e); + map.decrementPublicSize(null, e); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java index 6a464d0..816f0b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java @@ -18,7 +18,10 @@ package org.apache.ignite.internal.processors.cache; import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; /** @@ -82,15 +85,17 @@ public interface GridCacheConcurrentMap { * Increments public size. * * @param e Entry that caused public size change. + * @param hld Cache map (passed as optimization to avoid cache map lookup for shared groups). */ - public void incrementPublicSize(GridCacheEntryEx e); + public void incrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e); /** * Decrements public size. * * @param e Entry that caused public size change. + * @param hld Cache map (passed as optimization to avoid cache map lookup for shared groups). */ - public void decrementPublicSize(GridCacheEntryEx e); + public void decrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e); /** * @param cacheId Cache ID. 
@@ -105,4 +110,27 @@ public interface GridCacheConcurrentMap { * @return Set of the mappings contained in this map. */ public Set<GridCacheMapEntry> entrySet(int cacheId, CacheEntryPredicate... filter); + + /** + * + */ + static class CacheMapHolder { + /** */ + public final AtomicInteger size = new AtomicInteger(); + + /** */ + public final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map; + + /** + * @param map Map. + */ + public CacheMapHolder(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map) { + this.map = map; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheMapHolder.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java index c0b63f7..97dc3a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.Set; -import java.util.concurrent.ConcurrentMap; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgnitePredicate; @@ -55,9 +54,9 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM /** {@inheritDoc} */ @Nullable @Override public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key) { - ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map = 
entriesMap(ctx.cacheId(), false); + CacheMapHolder hld = entriesMap(ctx.cacheIdBoxed(), false); - return map != null ? map.get(key) : null; + return hld != null ? hld.map.get(key) : null; } /** {@inheritDoc} */ @@ -67,7 +66,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM KeyCacheObject key, final boolean create, final boolean touch) { - ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map = entriesMap(ctx.cacheId(), false); + CacheMapHolder hld = entriesMap(ctx.cacheIdBoxed(), false); GridCacheMapEntry cur = null; GridCacheMapEntry created = null; @@ -80,7 +79,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM try { while (!done) { - GridCacheMapEntry entry = map != null ? map.get(key) : null; + GridCacheMapEntry entry = hld != null ? hld.map.get(key) : null; created = null; doomed = null; @@ -94,15 +93,18 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM reserved = true; } - if (map == null) - map = entriesMap(ctx.cacheId(), true); + if (hld == null) { + hld = entriesMap(ctx.cacheIdBoxed(), true); + + assert hld != null; + } created0 = factory.create(ctx, topVer, key); } cur = created = created0; - done = map.putIfAbsent(created.key(), created) == null; + done = hld.map.putIfAbsent(created.key(), created) == null; } else done = true; @@ -125,10 +127,10 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM cur = created = created0; - done = map.replace(entry.key(), doomed, created); + done = hld.map.replace(entry.key(), doomed, created); } else - done = map.remove(entry.key(), doomed); + done = hld.map.remove(entry.key(), doomed); } else { cur = entry; @@ -194,13 +196,13 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM } finally { if (reserved) - release(sizeChange, cur); + release(sizeChange, hld, cur); else { if (sizeChange != 0) { assert sizeChange == -1; assert doomed != null; - 
decrementPublicSize(doomed); + decrementPublicSize(hld, doomed); } } } @@ -211,8 +213,8 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM * @param create Create flag. * @return Map for given cache ID. */ - @Nullable protected abstract ConcurrentMap<KeyCacheObject, GridCacheMapEntry> entriesMap( - int cacheId, + @Nullable protected abstract CacheMapHolder entriesMap( + Integer cacheId, boolean create); /** @@ -233,20 +235,20 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM * @param sizeChange Size delta. * @param e Map entry. */ - protected void release(int sizeChange, GridCacheEntryEx e) { + protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) { if (sizeChange == 1) - incrementPublicSize(e); + incrementPublicSize(hld, e); else if (sizeChange == -1) - decrementPublicSize(e); + decrementPublicSize(hld, e); } /** {@inheritDoc} */ @Override public boolean removeEntry(final GridCacheEntryEx entry) { GridCacheContext ctx = entry.context(); - ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map = entriesMap(ctx.cacheId(), false); + CacheMapHolder hld = entriesMap(ctx.cacheIdBoxed(), false); - boolean rmv = map != null ? map.remove(entry.key(), entry) : null; + boolean rmv = hld != null ? hld.map.remove(entry.key(), entry) : null; if (rmv) { if (ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED)) { @@ -269,7 +271,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM synchronized (entry) { if (!entry.deleted()) - decrementPublicSize(entry); + decrementPublicSize(hld, entry); } } @@ -278,9 +280,9 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM /** {@inheritDoc} */ @Override public Collection<GridCacheMapEntry> entries(int cacheId, final CacheEntryPredicate... 
filter) { - ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map = entriesMap(cacheId, false); + CacheMapHolder hld = entriesMap(cacheId, false); - if (map == null) + if (hld == null) return Collections.emptyList(); final IgnitePredicate<GridCacheMapEntry> p = new IgnitePredicate<GridCacheMapEntry>() { @@ -289,14 +291,14 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM } }; - return F.viewReadOnly(map.values(), F.<GridCacheMapEntry>identity(), p); + return F.viewReadOnly(hld.map.values(), F.<GridCacheMapEntry>identity(), p); } /** {@inheritDoc} */ @Override public Set<GridCacheMapEntry> entrySet(int cacheId, final CacheEntryPredicate... filter) { - final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map = entriesMap(cacheId, false); + final CacheMapHolder hld = entriesMap(cacheId, false); - if (map == null) + if (hld == null) return Collections.emptySet(); final IgnitePredicate<GridCacheMapEntry> p = new IgnitePredicate<GridCacheMapEntry>() { @@ -307,7 +309,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM return new AbstractSet<GridCacheMapEntry>() { @Override public Iterator<GridCacheMapEntry> iterator() { - return F.iterator0(map.values(), true, p); + return F.iterator0(hld.map.values(), true, p); } @Override public int size() { @@ -320,7 +322,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM GridCacheMapEntry entry = (GridCacheMapEntry)o; - return entry.equals(map.get(entry.key())) && p.apply(entry); + return entry.equals(hld.map.get(entry.key())) && p.apply(entry); } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 0388725..ed1fdcb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -200,6 +200,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** Cache ID. */ private int cacheId; + /** Cache ID. */ + private Integer cacheIdBoxed; + /** Cache type. */ private CacheType cacheType; @@ -354,6 +357,8 @@ public class GridCacheContext<K, V> implements Externalizable { cacheId = CU.cacheId(cacheName); + cacheIdBoxed = cacheId; + plc = cacheType.ioPolicy(); Factory<ExpiryPolicy> factory = cacheCfg.getExpiryPolicyFactory(); @@ -512,6 +517,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return Cache ID. + */ + public Integer cacheIdBoxed() { + return cacheIdBoxed; + } + + /** * @return {@code True} if should use system transactions which are isolated from user transactions. 
*/ public boolean systemTx() { http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java index d38b3f1..ea1c3eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java @@ -18,8 +18,6 @@ package org.apache.ignite.internal.processors.cache; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; import org.jsr166.ConcurrentHashMap8; /** @@ -27,13 +25,10 @@ import org.jsr166.ConcurrentHashMap8; */ public class GridCacheLocalConcurrentMap extends GridCacheConcurrentMapImpl { /** */ - private final AtomicInteger pubSize = new AtomicInteger(); - - /** */ private final int cacheId; /** */ - private final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> entryMap; + private final CacheMapHolder entryMap; /** * @param cacheId Cache ID. 
@@ -44,16 +39,16 @@ public class GridCacheLocalConcurrentMap extends GridCacheConcurrentMapImpl { super(factory); this.cacheId = cacheId; - this.entryMap = new ConcurrentHashMap8<>(initCap, 0.75f, Runtime.getRuntime().availableProcessors() * 2); + this.entryMap = new CacheMapHolder(new ConcurrentHashMap8<KeyCacheObject, GridCacheMapEntry>(initCap, 0.75f, Runtime.getRuntime().availableProcessors() * 2)); } /** {@inheritDoc} */ @Override public int internalSize() { - return entryMap.size(); + return entryMap.map.size(); } /** {@inheritDoc} */ - @Override protected ConcurrentMap<KeyCacheObject, GridCacheMapEntry> entriesMap(int cacheId, boolean create) { + @Override protected CacheMapHolder entriesMap(Integer cacheId, boolean create) { assert this.cacheId == cacheId; return entryMap; @@ -63,20 +58,20 @@ public class GridCacheLocalConcurrentMap extends GridCacheConcurrentMapImpl { @Override public int publicSize(int cacheId) { assert this.cacheId == cacheId; - return pubSize.get(); + return entryMap.size.get(); } /** {@inheritDoc} */ - @Override public void incrementPublicSize(GridCacheEntryEx e) { + @Override public void incrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) { assert cacheId == e.context().cacheId(); - pubSize.incrementAndGet(); + entryMap.size.incrementAndGet(); } /** {@inheritDoc} */ - @Override public void decrementPublicSize(GridCacheEntryEx e) { + @Override public void decrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) { assert cacheId == e.context().cacheId(); - pubSize.decrementAndGet(); + entryMap.size.decrementAndGet(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 727f2ea..0b4aab2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -981,7 +981,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) deletedUnlocked(false); - updateCntr0 = nextPartitionCounter(topVer); + updateCntr0 = nextPartitionCounter(topVer, tx.local(), updateCntr); if (updateCntr != null && updateCntr != 0) updateCntr0 = updateCntr; @@ -1160,7 +1160,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - updateCntr0 = nextPartitionCounter(topVer); + updateCntr0 = nextPartitionCounter(topVer, tx.local(), updateCntr); if (updateCntr != null && updateCntr != 0) updateCntr0 = updateCntr; @@ -1562,7 +1562,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme updateMetrics(op, metrics); if (lsnrCol != null) { - long updateCntr = nextPartitionCounter(AffinityTopologyVersion.NONE); + long updateCntr = nextPartitionCounter(AffinityTopologyVersion.NONE, true, null); cctx.continuousQueries().onEntryUpdated( lsnrCol, @@ -1723,7 +1723,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme else evtVal = (CacheObject)writeObj; - long updateCntr0 = nextPartitionCounter(topVer); + long updateCntr0 = nextPartitionCounter(topVer, primary, updateCntr); if (updateCntr != null) updateCntr0 = updateCntr; @@ -2613,7 +2613,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme long updateCntr = 0; if (!preload) - updateCntr = nextPartitionCounter(topVer); + updateCntr = nextPartitionCounter(topVer, true, null); if (walEnabled) { cctx.shared().wal().log(new DataRecord(new DataEntry( @@ 
-2672,7 +2672,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param topVer Topology version for current operation. * @return Update counter. */ - protected long nextPartitionCounter(AffinityTopologyVersion topVer) { + protected long nextPartitionCounter(AffinityTopologyVersion topVer, boolean primary, @Nullable Long primaryCntr) { return 0; } @@ -3578,7 +3578,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridDhtLocalPartition locPart = localPartition(); if (locPart != null) - locPart.incrementPublicSize(this); + locPart.incrementPublicSize(null, this); else cctx.incrementPublicSize(this); } @@ -3590,7 +3590,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridDhtLocalPartition locPart = localPartition(); if (locPart != null) - locPart.decrementPublicSize(this); + locPart.decrementPublicSize(null, this); else cctx.decrementPublicSize(this); } @@ -4395,7 +4395,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme ", locNodeId=" + cctx.localNodeId() + ']'; } - long updateCntr0 = entry.nextPartitionCounter(topVer); + long updateCntr0 = entry.nextPartitionCounter(topVer, primary, updateCntr); if (updateCntr != null) updateCntr0 = updateCntr; @@ -4479,7 +4479,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Must persist inside synchronization in non-tx mode. 
cctx.store().remove(null, entry.key); - long updateCntr0 = entry.nextPartitionCounter(topVer); + long updateCntr0 = entry.nextPartitionCounter(topVer, primary, updateCntr); if (updateCntr != null) updateCntr0 = updateCntr; http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java index e7eec9b..77a9ba4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java @@ -60,12 +60,12 @@ public class GridNoStorageCacheMap implements GridCacheConcurrentMap { } /** {@inheritDoc} */ - @Override public void incrementPublicSize(GridCacheEntryEx e) { + @Override public void incrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) { // No-op. } /** {@inheritDoc} */ - @Override public void decrementPublicSize(GridCacheEntryEx e) { + @Override public void decrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) { // No-op. 
} http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 3cd4a5e..26e3f3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1653,18 +1653,21 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager int cmp; if (grp.sharedGroup()) { - assert row.cacheId() != UNDEFINED_CACHE_ID : "Cache ID is not provided!"; - assert io.getCacheId(pageAddr, idx) != UNDEFINED_CACHE_ID : "Cache ID is not stored!"; + assert row.cacheId() != UNDEFINED_CACHE_ID : "Cache ID is not provided: " + row; + + int cacheId = io.getCacheId(pageAddr, idx); - cmp = Integer.compare(io.getCacheId(pageAddr, idx), row.cacheId()); + assert cacheId != UNDEFINED_CACHE_ID : "Cache ID is not stored"; + + cmp = Integer.compare(cacheId, row.cacheId()); if (cmp != 0) return cmp; - if(cmp == 0 && row.key() == null) { - assert row.getClass() == SearchRow.class; + if (row.key() == null) { + assert row.getClass() == SearchRow.class : row; - // A search row with a cach ID only is used as a cache bound. + // A search row with a cache ID only is used as a cache bound. 
// The found position will be shifted until the exact cache bound is found; // See for details: // o.a.i.i.p.c.database.tree.BPlusTree.ForwardCursor.findLowerBound() http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java index 63a47ca..fd8932f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java @@ -120,13 +120,13 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap } /** {@inheritDoc} */ - @Override public void incrementPublicSize(GridCacheEntryEx e) { - localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(e); + @Override public void incrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) { + localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(hld, e); } /** {@inheritDoc} */ - @Override public void decrementPublicSize(GridCacheEntryEx e) { - localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(e); + @Override public void decrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) { + localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(hld, e); } /** {@inheritDoc} */ 
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index abda6f2..2e86fb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -93,8 +93,10 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected long nextPartitionCounter(AffinityTopologyVersion topVer) { - return locPart.nextUpdateCounter(cctx.cacheId(), topVer); + @Override protected long nextPartitionCounter(AffinityTopologyVersion topVer, + boolean primary, + @Nullable Long primaryCntr) { + return locPart.nextUpdateCounter(cctx.cacheId(), topVer, primary, primaryCntr); } /** {@inheritDoc} */ @@ -726,12 +728,12 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { /** {@inheritDoc} */ @Override protected void incrementMapPublicSize() { - locPart.incrementPublicSize(this); + locPart.incrementPublicSize(null, this); } /** {@inheritDoc} */ @Override protected void decrementMapPublicSize() { - locPart.decrementPublicSize(this); + locPart.decrementPublicSize(null, this); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 4a501a2..48fb352 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; @@ -131,7 +132,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements /** */ @GridToStringExclude - private final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> singleCacheEntryMap; + private final CacheMapHolder singleCacheEntryMap; /** Remove queue. */ @GridToStringExclude @@ -180,7 +181,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements cacheMaps = new ConcurrentHashMap<>(); } else { - singleCacheEntryMap = createEntriesMap(); + singleCacheEntryMap = new CacheMapHolder(createEntriesMap()); cacheMaps = null; } @@ -215,18 +216,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements Runtime.getRuntime().availableProcessors() * 2); } - /** - * @param cacheId Cache ID. - * @return Size counter. 
- */ - private AtomicInteger cacheSizeCounter(int cacheId) { - assert grp.sharedGroup(); - - CacheMapHolder hld = cacheMapHolder(cacheId, true); - - return hld.size; - } - /** {@inheritDoc} */ @Override public int internalSize() { if (grp.sharedGroup()) { @@ -238,26 +227,22 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements return size; } - return singleCacheEntryMap.size(); + return singleCacheEntryMap.map.size(); } /** {@inheritDoc} */ - @Override protected ConcurrentMap<KeyCacheObject, GridCacheMapEntry> entriesMap(int cacheId, boolean create) { - if (grp.sharedGroup()) { - CacheMapHolder hld = cacheMapHolder(cacheId, create); - - return hld != null ? hld.map : null; - } + @Override protected CacheMapHolder entriesMap(Integer cacheId, boolean create) { + if (grp.sharedGroup()) + return create ? cacheMapHolder(cacheId) : cacheMaps.get(cacheId); return singleCacheEntryMap; } /** * @param cacheId Cache ID. - * @param create Create flag. * @return Map holder. */ - private CacheMapHolder cacheMapHolder(int cacheId, boolean create) { + private CacheMapHolder cacheMapHolder(Integer cacheId) { assert grp.sharedGroup(); CacheMapHolder hld = cacheMaps.get(cacheId); @@ -265,9 +250,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (hld != null) return hld; - if (!create) - return null; - CacheMapHolder old = cacheMaps.putIfAbsent(cacheId, hld = new CacheMapHolder(createEntriesMap())); if (old != null) @@ -420,9 +402,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @param ver Version. */ private void removeVersionedEntry(int cacheId, KeyCacheObject key, GridCacheVersion ver) { - ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map = entriesMap(cacheId, false); + CacheMapHolder hld = entriesMap(cacheId, false); - GridCacheMapEntry entry = map != null ? map.get(key) : null; + GridCacheMapEntry entry = hld != null ? 
hld.map.get(key) : null; if (entry != null && entry.markObsoleteVersion(ver)) removeEntry(entry); @@ -561,9 +543,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** {@inheritDoc} */ - @Override protected void release(int sizeChange, GridCacheEntryEx e) { + @Override protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) { if (grp.sharedGroup() && sizeChange != 0) - cacheSizeCounter(e.context().cacheId()).addAndGet(sizeChange); + hld.size.addAndGet(sizeChange); release0(sizeChange); } @@ -918,11 +900,11 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @param topVer Topology version for current operation. * @return Next update index. */ - long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer) { + long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer, boolean primary, @Nullable Long primaryCntr) { long nextCntr = store.nextUpdateCounter(); if (grp.sharedGroup()) - grp.onPartitionCounterUpdate(cacheId, id, nextCntr, topVer); + grp.onPartitionCounterUpdate(cacheId, id, primaryCntr != null ? primaryCntr : nextCntr, topVer, primary); return nextCntr; } @@ -972,7 +954,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements clear(hld.map, extras, rec); } else - clear(singleCacheEntryMap, extras, rec); + clear(singleCacheEntryMap.map, extras, rec); if (!grp.allowFastEviction()) { GridCacheContext cctx = grp.sharedGroup() ? 
null : grp.singleCacheContext().dhtCache().context(); @@ -1146,9 +1128,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** {@inheritDoc} */ - @Override public void incrementPublicSize(GridCacheEntryEx e) { - if (grp.sharedGroup()) - cacheSizeCounter(e.context().cacheId()).incrementAndGet(); + @Override public void incrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) { + if (grp.sharedGroup()) { + if (hld == null) + hld = cacheMapHolder(e.context().cacheIdBoxed()); + + hld.size.incrementAndGet(); + } while (true) { long state = this.state.get(); @@ -1159,9 +1145,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** {@inheritDoc} */ - @Override public void decrementPublicSize(GridCacheEntryEx e) { - if (grp.sharedGroup()) - cacheSizeCounter(e.context().cacheId()).decrementAndGet(); + @Override public void decrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) { + if (grp.sharedGroup()) { + if (hld == null) + hld = cacheMapHolder(e.context().cacheIdBoxed()); + + hld.size.decrementAndGet(); + } while (true) { long state = this.state.get(); @@ -1303,27 +1293,4 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements return S.toString(RemovedEntryHolder.class, this); } } - - /** - * - */ - static class CacheMapHolder { - /** */ - final AtomicInteger size = new AtomicInteger(); - - /** */ - final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map; - - /** - * @param map Map. 
- */ - CacheMapHolder(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map) { - this.map = map; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheMapHolder.class, this); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index 336f650..7a7c045 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -155,9 +155,12 @@ public class CacheContinuousQueryEventBuffer { batch = initBatch(entry.topologyVersion()); if (batch == null || cntr < batch.startCntr) { - if (backup) + if (backup) { backupQ.add(entry); + return null; + } + return entry; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 8d6aa2c..3d56531 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -487,19 +487,72 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler onEntryUpdated(evt, primary, false, null); } - @Override public CounterSkipContext skipUpdateCounter(GridCacheContext cctx, - @Nullable CounterSkipContext ctx, + @Override public CounterSkipContext skipUpdateCounter(final GridCacheContext cctx, + @Nullable CounterSkipContext skipCtx, int part, long cntr, - AffinityTopologyVersion topVer) { + AffinityTopologyVersion topVer, + boolean primary) { CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part); - if (ctx == null) - ctx = new CounterSkipContext(part, cntr, topVer); + if (skipCtx == null) + skipCtx = new CounterSkipContext(part, cntr, topVer); - buf.processEntry(ctx.entry(), true); + final Object entryOrList = buf.processEntry(skipCtx.entry(), !primary); - return ctx; + if (entryOrList != null) { + if (loc && asyncCb) { + // TODO + return skipCtx; + } + + skipCtx.addSendClosure(new Runnable() { + @Override public void run() { + try { + if (loc) { + if (entryOrList instanceof CacheContinuousQueryEntry) { + CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(null, + cctx, + (CacheContinuousQueryEntry)entryOrList); + + handleLocalListener(evt); + } + else { + List<CacheContinuousQueryEntry> list = + (List<CacheContinuousQueryEntry>)entryOrList; + + for (CacheContinuousQueryEntry entry : list) { + CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(null, + cctx, + entry); + + handleLocalListener(evt); + } + } + } + else { + ctx.continuous().addNotification(nodeId, + routineId, + entryOrList, + topic, + false, + true); + } + } + catch (ClusterTopologyCheckedException ex) { + if (log.isDebugEnabled()) + log.debug("Failed to send event notification to node, node left cluster " + + "[node=" + nodeId + ", err=" + ex + ']'); + } + catch (IgniteCheckedException ex) { + 
U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), + "Failed to send event notification to node: " + nodeId, ex); + } + } + }); + } + + return skipCtx; } @Override public void onPartitionEvicted(int part) { @@ -759,6 +812,27 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** + * @param evt Event. + */ + private void handleLocalListener(CacheContinuousQueryEvent evt) { + CacheContinuousQueryEntry entry = evt.entry(); + + if (!locCache) { + Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry); + + if (!evts.isEmpty()) + locLsnr.onUpdated(evts); + + if (!internal && !skipPrimaryCheck) + sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); + } + else { + if (!entry.isFiltered()) + locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); + } + } + + /** * @param evt Continuous query event. * @param notify Notify flag. * @param loc Listener deployed on this node. @@ -771,24 +845,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (cctx == null) return; - final CacheContinuousQueryEntry entry = evt.entry(); - - if (loc) { - if (!locCache) { - Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry); - - if (!evts.isEmpty()) - locLsnr.onUpdated(evts); - - if (!internal && !skipPrimaryCheck) - sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); - } - else { - if (!entry.isFiltered()) - locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? 
extends V>>asList(evt)); - } - } + if (loc) + handleLocalListener(evt); else { + CacheContinuousQueryEntry entry = evt.entry(); + if (!entry.isFiltered()) prepareEntry(cctx, nodeId, entry); http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index fe9c198..5e73840 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -89,7 +89,8 @@ public interface CacheContinuousQueryListener<K, V> { @Nullable CounterSkipContext skipCtx, int part, long cntr, - AffinityTopologyVersion topVer); + AffinityTopologyVersion topVer, + boolean primary); /** * @param part Partition. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 9910955..5156455 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -213,9 +213,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { @Nullable public CounterSkipContext skipUpdateCounter(@Nullable CounterSkipContext skipCtx, int part, long cntr, - AffinityTopologyVersion topVer) { + AffinityTopologyVersion topVer, + boolean primary) { for (CacheContinuousQueryListener lsnr : lsnrs.values()) - skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, topVer); + skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, topVer, primary); return skipCtx; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java index 41183c8..747d7d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous; +import java.util.ArrayList; import java.util.List; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.jetbrains.annotations.Nullable; @@ -29,7 +30,7 @@ public class CounterSkipContext { private final CacheContinuousQueryEntry entry; /** */ - private List<Runnable> readySendC; + private List<Runnable> sndC; /** * @param part Partition. @@ -61,7 +62,17 @@ public class CounterSkipContext { /** * @return Entries */ - @Nullable public List<Runnable> readyEntries() { - return readySendC; + @Nullable public List<Runnable> sendClosures() { + return sndC; + } + + /** + * @param c Closure send + */ + void addSendClosure(Runnable c) { + if (sndC == null) + sndC = new ArrayList<>(); + + sndC.add(c); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index a6e009d..2076981 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -40,6 +40,7 @@ import java.util.concurrent.locks.Lock; import javax.cache.Cache; import javax.cache.CacheException; import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; @@ -62,6 +63,7 
@@ import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.CacheStoreAdapter; @@ -78,6 +80,7 @@ import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpir import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.lang.GridPlainCallable; +import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; @@ -400,19 +403,61 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testContinuousQueryTxReplicated() throws Exception { + continuousQuery(REPLICATED, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousQueryTxPartitioned() throws Exception { + continuousQuery(PARTITIONED, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousQueryTxLocal() throws Exception { + continuousQuery(LOCAL, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousQueryAtomicReplicated() throws Exception { + continuousQuery(REPLICATED, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousQueryAtomicPartitioned() throws Exception { + continuousQuery(PARTITIONED, ATOMIC); + } + + /** + * @throws Exception If failed. 
+ */ + public void testContinuousQueryAtomicLocal() throws Exception { + continuousQuery(LOCAL, ATOMIC); + } + + /** * @param cacheMode Cache mode. * @param atomicityMode Cache atomicity mode. * @throws Exception If failed. */ private void scanQuery(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception { - int keys = 10000; + int keys = 10_000; Integer[] data1 = generateData(keys); Integer[] data2 = generateData(keys); - boolean local = cacheMode == LOCAL; + boolean loc = cacheMode == LOCAL; - if (local) + if (loc) startGrid(0); else startGridsMultiThreaded(4); @@ -422,14 +467,11 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false)); srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false)); - if(!local) - awaitPartitionMapExchange(); - IgniteCache<Integer, Integer> cache1; IgniteCache<Integer, Integer> cache2; if (atomicityMode == TRANSACTIONAL) { - Ignite ignite = ignite(local ? 0 : 1); + Ignite ignite = ignite(loc ? 0 : 1); try (Transaction tx = ignite.transactions().txStart()) { cache1 = ignite.cache(CACHE1); @@ -450,8 +492,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { List<Callable<?>> cls = new ArrayList<>(ldrs * 2); for (int i = 0; i < ldrs ; i++) { - cls.add(putOperation(local ? 0 : 1, ldrs, i, CACHE1, data1)); - cls.add(putOperation(local ? 0 : 2, ldrs, i, CACHE2, data2)); + cls.add(putOperation(loc ? 0 : 1, ldrs, i, CACHE1, data1)); + cls.add(putOperation(loc ? 0 : 2, ldrs, i, CACHE2, data2)); } GridTestUtils.runMultiThreaded(cls, "loaders"); @@ -461,7 +503,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { Set<Integer> keysSet = sequence(keys); - for (Cache.Entry<Integer, Integer> entry : ignite(local ? 0 : 3).cache(CACHE1).query(qry)) { + for (Cache.Entry<Integer, Integer> entry : ignite(loc ? 
0 : 3).cache(CACHE1).query(qry)) { assertTrue(keysSet.remove(entry.getKey())); assertEquals(data1[entry.getKey()], entry.getValue()); } @@ -472,7 +514,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { keysSet = sequence(keys); - for (Cache.Entry<Integer, Integer> entry : ignite(local ? 0 : 3).cache(CACHE2).query(qry)) { + for (Cache.Entry<Integer, Integer> entry : ignite(loc ? 0 : 3).cache(CACHE2).query(qry)) { assertTrue(keysSet.remove(entry.getKey())); assertEquals(data2[entry.getKey()], entry.getValue()); } @@ -485,6 +527,106 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { * @param atomicityMode Cache atomicity mode. * @throws Exception If failed. */ + private void continuousQuery(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception { + final int keys = 10_000; + + Integer[] data1 = generateData(keys); + Integer[] data2 = generateData(keys); + + boolean loc = cacheMode == LOCAL; + + if (loc) + startGrid(0); + else + startGridsMultiThreaded(4); + + Ignite srv0 = ignite(0); + + srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false)); + srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false)); + + final AtomicInteger cntr1 = new AtomicInteger(); + final AtomicInteger cntr2 = new AtomicInteger(); + + CacheEntryUpdatedListener lsnr1 = new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated( + Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> ignored : evts) + cntr1.incrementAndGet(); + } + }; + + CacheEntryUpdatedListener lsnr2 = new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated( + Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? 
extends Integer> ignored : evts) + cntr2.incrementAndGet(); + } + }; + + QueryCursor qry1 = ignite(loc ? 0 : 2).cache(CACHE1).query(new ContinuousQuery<>().setLocalListener(lsnr1)); + QueryCursor qry2 = ignite(loc ? 0 : 3).cache(CACHE2).query(new ContinuousQuery<>().setLocalListener(lsnr2)); + + if (atomicityMode == TRANSACTIONAL) { + Ignite ignite = ignite(loc ? 0 : 1); + + try (Transaction tx = ignite.transactions().txStart()) { + IgniteCache<Integer, Integer> cache1 = ignite.cache(CACHE1); + IgniteCache<Integer, Integer> cache2 = ignite.cache(CACHE2); + + for (int i = 0; i < keys ; i++) { + cache1.put(i, data1[i]); + cache2.put(i, data2[i]); + } + + tx.commit(); + } + } + else { + int ldrs = 4; + + List<Callable<?>> cls = new ArrayList<>(ldrs * 2); + + for (int i = 0; i < ldrs ; i++) { + cls.add(putOperation(loc ? 0 : 1, ldrs, i, CACHE1, data1)); + cls.add(putOperation(loc ? 0 : 2, ldrs, i, CACHE2, data2)); + } + + GridTestUtils.runMultiThreaded(cls, "loaders"); + } + + assertTrue("Expected: [cntr1=" + keys + ", cntr2=" + keys + "] " + + "but was: [cntr1=" + cntr1.get() + ", cntr2=" + cntr2.get() + "]", + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return cntr1.get() == keys && cntr2.get() == keys; + } + }, 2000)); + + qry1.close(); + + Map<Integer, Integer> map = generateDataMap(10); + + srv0.cache(CACHE1).putAll(map); + srv0.cache(CACHE2).putAll(map); + + assertTrue("Expected: <" + keys + 10 + "> but was: <" + cntr2.get() + ">", + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return cntr2.get() == keys + 10; + } + }, 2000)); + + assertEquals(keys, cntr1.get()); + + qry2.close(); + } + + /** + * @param cacheMode Cache mode. + * @param atomicityMode Cache atomicity mode. + * @throws Exception If failed. 
+ */ private void scanQueryMultiplePartitions(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception { int keys = 10000; @@ -822,10 +964,10 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { IgniteCache cache = ignite(idx).cache(cacheName); for (int j = 0, size = data.length; j < size ; j++) { - if (j % ldrs == ldrIdx) { + if (j % ldrs == ldrIdx) cache.put(j, data[j]); - } } + return null; } }; @@ -855,12 +997,23 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { * @return Map with random integers. */ private Map<Integer, Integer> generateDataMap(int cnt) { + return generateDataMap(0, cnt); + } + + /** + * Creates a map with random integers. + * + * @param startKey Start key. + * @param cnt Map size length. + * @return Map with random integers. + */ + private Map<Integer, Integer> generateDataMap(int startKey, int cnt) { Random rnd = ThreadLocalRandom.current(); Map<Integer, Integer> data = U.newHashMap(cnt); for (int i = 0; i < cnt; i++) - data.put(i, rnd.nextInt()); + data.put(startKey++, rnd.nextInt()); return data; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java index 9c7c836..ed0dec0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java @@ 
-78,52 +78,74 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo * @throws Exception If failed. */ public void testConcurrentUpdatePartitionAtomic() throws Exception { - concurrentUpdatePartition(ATOMIC); + concurrentUpdatePartition(ATOMIC, false); } /** * @throws Exception If failed. */ public void testConcurrentUpdatePartitionTx() throws Exception { - concurrentUpdatePartition(TRANSACTIONAL); + concurrentUpdatePartition(TRANSACTIONAL, false); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdatePartitionTxCacheGroup() throws Exception { + concurrentUpdatePartition(TRANSACTIONAL, true); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdatePartitionAtomicCacheGroup() throws Exception { + concurrentUpdatePartition(ATOMIC, true); } /** * @param atomicityMode Cache atomicity mode. + * @param cacheGrp {@code True} if test cache multiple caches in the same group. * @throws Exception If failed. */ - private void concurrentUpdatePartition(CacheAtomicityMode atomicityMode) throws Exception { + private void concurrentUpdatePartition(CacheAtomicityMode atomicityMode, boolean cacheGrp) throws Exception { Ignite srv = startGrid(0); client = true; Ignite client = startGrid(1); - CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + List<AtomicInteger> cntrs = new ArrayList<>(); + List<String> caches = new ArrayList<>(); - ccfg.setWriteSynchronizationMode(FULL_SYNC); - ccfg.setAtomicityMode(atomicityMode); + if (cacheGrp) { + for (int i = 0; i < 3; i++) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME + i); - IgniteCache clientCache = client.createCache(ccfg); + ccfg.setGroupName("testGroup"); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(atomicityMode); - final AtomicInteger evtCnt = new AtomicInteger(); + IgniteCache<Object, Object> cache = client.createCache(ccfg); - ContinuousQuery<Object, Object> 
qry = new ContinuousQuery<>(); + caches.add(cache.getName()); - qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { - for (CacheEntryEvent evt : evts) { - assertNotNull(evt.getKey()); - assertNotNull(evt.getValue()); - - evtCnt.incrementAndGet(); - } + cntrs.add(startListener(cache)); } - }); + } + else { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); - clientCache.query(qry); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(atomicityMode); - Affinity<Integer> aff = srv.affinity(DEFAULT_CACHE_NAME); + IgniteCache<Object, Object> cache = client.createCache(ccfg); + + caches.add(cache.getName()); + + cntrs.add(startListener(cache)); + } + + Affinity<Integer> aff = srv.affinity(caches.get(0)); final List<Integer> keys = new ArrayList<>(); @@ -143,7 +165,10 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo final int THREADS = 10; final int UPDATES = 1000; - final IgniteCache<Object, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME); + final List<IgniteCache<Object, Object>> srvCaches = new ArrayList<>(); + + for (String cacheName : caches) + srvCaches.add(srv.cache(cacheName)); for (int i = 0; i < 15; i++) { log.info("Iteration: " + i); @@ -152,46 +177,90 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo @Override public Void call() throws Exception { ThreadLocalRandom rnd = ThreadLocalRandom.current(); - for (int i = 0; i < UPDATES; i++) - srvCache.put(keys.get(rnd.nextInt(KEYS)), i); + for (int i = 0; i < UPDATES; i++) { + for (int c = 0; c < srvCaches.size(); c++) + srvCaches.get(c).put(keys.get(rnd.nextInt(KEYS)), i); + } return null; } }, THREADS, "update"); - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - log.info("Events: " + evtCnt.get()); + for (final AtomicInteger evtCnt : cntrs) { + 
GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + log.info("Events: " + evtCnt.get()); - return evtCnt.get() >= THREADS * UPDATES; - } - }, 5000); + return evtCnt.get() >= THREADS * UPDATES; + } + }, 5000); - assertEquals(THREADS * UPDATES, evtCnt.get()); + assertEquals(THREADS * UPDATES, evtCnt.get()); - evtCnt.set(0); + evtCnt.set(0); + } } } /** + * @param cache Cache. + * @return Event counter. + */ + private AtomicInteger startListener(IgniteCache<Object, Object> cache) { + final AtomicInteger evtCnt = new AtomicInteger(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent evt : evts) { + assertNotNull(evt.getKey()); + assertNotNull(evt.getValue()); + + evtCnt.incrementAndGet(); + } + } + }); + + cache.query(qry); + + return evtCnt; + } + + /** * @throws Exception If failed. */ public void testConcurrentUpdatesAndQueryStartAtomic() throws Exception { - concurrentUpdatesAndQueryStart(ATOMIC); + concurrentUpdatesAndQueryStart(ATOMIC, false); } /** * @throws Exception If failed. */ public void testConcurrentUpdatesAndQueryStartTx() throws Exception { - concurrentUpdatesAndQueryStart(TRANSACTIONAL); + concurrentUpdatesAndQueryStart(TRANSACTIONAL, false); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdatesAndQueryStartAtomicCacheGroup() throws Exception { + concurrentUpdatesAndQueryStart(ATOMIC, true); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdatesAndQueryStartTxCacheGroup() throws Exception { + concurrentUpdatesAndQueryStart(TRANSACTIONAL, true); } /** * @param atomicityMode Cache atomicity mode. + * @param cacheGrp {@code True} if test cache multiple caches in the same group. * @throws Exception If failed. 
*/ - private void concurrentUpdatesAndQueryStart(CacheAtomicityMode atomicityMode) throws Exception { + private void concurrentUpdatesAndQueryStart(CacheAtomicityMode atomicityMode, boolean cacheGrp) throws Exception { Ignite srv = startGrid(0); client = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java index 33463b5..34d2de4 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java @@ -21,6 +21,7 @@ import com.beust.jcommander.Parameter; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.configuration.MemoryConfiguration; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -83,10 +84,12 @@ public class IgniteBenchmarkArguments { /** */ @Parameter(names = {"-r", "--range"}, description = "Key range") + @GridToStringInclude public int range = 1_000_000; /** */ @Parameter(names = {"-pa", "--preloadAmount"}, description = "Data pre-loading amount for load tests") + @GridToStringInclude public int preloadAmount = 500_000; /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java ---------------------------------------------------------------------- diff --git 
a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java index 183f478..b6c1440 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java @@ -86,7 +86,6 @@ public abstract class IgniteCacheAbstractBenchmark<K, V> extends IgniteAbstractB ", cacheGroup="+ grpName + ", cacheCfg=" + cache.getConfiguration(CacheConfiguration.class) + ']'); - cachesInGrp = args.cachesInGroup(); if (cachesInGrp > 1) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0b8bcbca/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutObjectKeyBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutObjectKeyBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutObjectKeyBenchmark.java new file mode 100644 index 0000000..e8468cb --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgnitePutObjectKeyBenchmark.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache; + +import java.io.Serializable; +import java.util.Map; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.yardstick.cache.model.SampleValue; +import org.yardstickframework.BenchmarkConfiguration; + +/** + * Benchmark that puts {@link SampleValue} values under object keys: {@link Key1} if {@code grpCaches != null}, otherwise {@link Key2}. + */ +public class IgnitePutObjectKeyBenchmark extends IgniteCacheAbstractBenchmark<Object, Object> { + /** {@inheritDoc} */ + @Override public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + } + + /** {@inheritDoc} */ + @Override public boolean test(Map<Object, Object> ctx) throws Exception { + int key = nextRandom(args.range()); + + IgniteCache<Object, Object> cache = cacheForOperation(); + + cache.put(grpCaches != null ? new Key1(key) : new Key2(key, key), new SampleValue(key)); + + return true; + } + + /** {@inheritDoc} */ + @Override protected IgniteCache<Object, Object> cache() { + return ignite().cache("atomic"); + } + + /** + * Serializable key holding a single {@code int} ID; equality and hash code are based on that ID. + */ + static class Key1 implements Serializable { + /** ID. */ + private final int id; + + /** + * @param id ID. + */ + Key1(int id) { + this.id = id; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Key1 key1 = (Key1)o; + + return id == key1.id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + } + + /** + * Serializable key holding two {@code int} IDs; equality and hash code are based on both IDs. + */ + static class Key2 implements Serializable { + /** ID1. */ + private final int id1; + + /** ID2. */ + private final int id2; + + /** + * @param id1 ID1. 
+ * @param id2 ID2. + */ + Key2(int id1, int id2) { + this.id1 = id1; + this.id2 = id2; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Key2 key2 = (Key2) o; + + return id1 == key2.id1 && id2 == key2.id2; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = id1; + + res = 31 * res + id2; + + return res; + } + } +}