IGNITE-2329: Implemented a bunch of optimizations: - Garbageless NIO Selector - Get rid of unnecessary ArrayList allocations in GridCacheMvccManager. - Optimized "force keys" futures logic.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/75961eee Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/75961eee Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/75961eee Branch: refs/heads/ignite-1786 Commit: 75961eee2513427d94a1c7e0dbb96ac46195544b Parents: 4210989 Author: Yakov Zhdanov <[email protected]> Authored: Fri Feb 5 21:13:26 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Feb 5 21:13:26 2016 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 12 +- .../processors/cache/GridCacheAdapter.java | 37 +- .../processors/cache/GridCacheMvccManager.java | 42 +- .../processors/cache/GridCachePreloader.java | 6 + .../cache/GridCachePreloaderAdapter.java | 5 + .../processors/cache/GridCacheUtils.java | 21 +- .../dht/GridClientPartitionTopology.java | 5 + .../distributed/dht/GridDhtCacheAdapter.java | 72 ++- .../distributed/dht/GridDhtEmbeddedFuture.java | 13 +- .../cache/distributed/dht/GridDhtGetFuture.java | 176 ++++--- .../distributed/dht/GridDhtGetSingleFuture.java | 476 +++++++++++++++++++ .../distributed/dht/GridDhtLocalPartition.java | 76 +-- .../distributed/dht/GridDhtPartitionState.java | 2 +- .../dht/GridDhtPartitionTopology.java | 5 + .../dht/GridDhtPartitionTopologyImpl.java | 9 + .../distributed/dht/GridDhtTxPrepareFuture.java | 7 +- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../dht/colocated/GridDhtColocatedCache.java | 40 +- .../dht/preloader/GridDhtPreloader.java | 16 + .../cache/distributed/near/GridNearTxLocal.java | 1 - .../IgniteCacheObjectProcessorImpl.java | 2 +- .../util/future/GridCompoundFuture.java | 2 +- .../ignite/internal/util/nio/GridNioServer.java | 143 +++++- .../util/nio/GridSelectorNioSessionImpl.java | 2 +- .../util/nio/SelectedSelectionKeySet.java | 111 +++++ .../org/apache/ignite/lang/IgniteBiTuple.java | 6 +- .../IgniteTxPreloadAbstractTest.java | 2 +- .../near/GridCacheNearReadersSelfTest.java | 19 +- .../apache/ignite/lang/GridTupleSelfTest.java | 42 +- parent/pom.xml | 1 + 30 files changed, 1119 insertions(+), 234 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index de7c10b..6f07702 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -370,11 +370,21 @@ public final class IgniteSystemProperties { /** * Manages {@link OptimizedMarshaller} behavior of {@code serialVersionUID} computation for * {@link Serializable} classes. - * */ + */ public static final String IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID = "IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID"; /** + * If set to {@code true}, then default selected keys set is used inside + * {@code GridNioServer} which lead to some extra garbage generation when + * processing selected keys. + * <p> + * Default value is {@code false}. Should be switched to {@code true} if there are + * any problems in communication layer. + */ + public static final String IGNITE_NO_SELECTOR_OPTS = "IGNITE_NO_SELECTOR_OPTS"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 9f54ddb..84eb0b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -101,7 +101,6 @@ import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.F0; -import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -1823,7 +1822,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param needVer If {@code true} returns values as tuples containing value and version. * @return Future. */ - public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final Collection<KeyCacheObject> keys, + public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0( + @Nullable final Collection<KeyCacheObject> keys, final boolean readThrough, boolean checkTx, @Nullable final UUID subjId, @@ -1834,7 +1834,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final boolean keepCacheObjects, boolean canRemap, final boolean needVer - ) { + ) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap()); @@ -1853,11 +1853,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (tx == null || tx.implicit()) { try { - final AffinityTopologyVersion topVer = tx == null - ? (canRemap ? ctx.affinity().affinityTopologyVersion(): ctx.shared().exchange().readyAffinityVersion()) - : tx.topologyVersion(); + final AffinityTopologyVersion topVer = tx == null ? + (canRemap ? + ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) : + tx.topologyVersion(); + + int keysSize = keys.size(); - final Map<K1, V1> map = new GridLeanMap<>(keys.size()); + final Map<K1, V1> map = keysSize == 1 ? + (Map<K1, V1>)new IgniteBiTuple<>() : + U.<K1, V1>newHashMap(keysSize); final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough(); @@ -1893,7 +1898,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V GridCacheVersion ver = entry.version(); if (misses == null) - misses = new GridLeanMap<>(); + misses = new HashMap<>(); misses.put(key, ver); } @@ -1913,7 +1918,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)) ctx.evicts().touch(entry, topVer); - if (keys.size() == 1) + if (keysSize == 1) // Safe to return because no locks are required in READ_COMMITTED mode. return new GridFinishedFuture<>(map); } @@ -2051,17 +2056,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } ); } - else { - // If misses is not empty and store is disabled, we should touch missed entries. - if (misses != null) { - for (KeyCacheObject key : misses.keySet()) { - GridCacheEntryEx entry = peekEx(key); - - if (entry != null) - ctx.evicts().touch(entry, topVer); - } - } - } + else + // Misses can be non-zero only if store is enabled. + assert misses == null; return new GridFinishedFuture<>(map); } http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index c7d1f62..b2c23f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -17,6 +17,18 @@ package org.apache.ignite.internal.processors.cache; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.events.DiscoveryEvent; @@ -52,18 +64,6 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; - import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; @@ -77,12 +77,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { private static final int MAX_REMOVED_LOCKS = 10240; /** Pending locks per thread. */ - private final ThreadLocal<LinkedList<GridCacheMvccCandidate>> pending = - new ThreadLocal<LinkedList<GridCacheMvccCandidate>>() { - @Override protected LinkedList<GridCacheMvccCandidate> initialValue() { - return new LinkedList<>(); - } - }; + private final ThreadLocal<Deque<GridCacheMvccCandidate>> pending = new ThreadLocal<>(); /** Pending near local locks and topology version per thread. */ private ConcurrentMap<Long, GridCacheExplicitLockSpan> pendingExplicit; @@ -683,7 +678,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @return Remote candidates. */ public Collection<GridCacheMvccCandidate> remoteCandidates() { - Collection<GridCacheMvccCandidate> rmtCands = new LinkedList<>(); + Collection<GridCacheMvccCandidate> rmtCands = new ArrayList<>(); for (GridDistributedCacheEntry entry : locked()) rmtCands.addAll(entry.remoteMvccSnapshot()); @@ -697,7 +692,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @return Local candidates. */ public Collection<GridCacheMvccCandidate> localCandidates() { - Collection<GridCacheMvccCandidate> locCands = new LinkedList<>(); + Collection<GridCacheMvccCandidate> locCands = new ArrayList<>(); for (GridDistributedCacheEntry entry : locked()) { try { @@ -726,7 +721,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { if (cacheCtx.isNear() || cand.singleImplicit()) return true; - LinkedList<GridCacheMvccCandidate> queue = pending.get(); + Deque<GridCacheMvccCandidate> queue = pending.get(); + + if (queue == null) + pending.set(queue = new ArrayDeque<>()); GridCacheMvccCandidate prev = null; @@ -751,7 +749,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * Reset MVCC context. */ public void contextReset() { - pending.set(new LinkedList<GridCacheMvccCandidate>()); + pending.set(null); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index c8fcb90..be019fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -137,6 +137,12 @@ public interface GridCachePreloader { public IgniteInternalFuture<Boolean> rebalanceFuture(); /** + * @return {@code true} if there is no need to force keys preloading + * (e.g. rebalancing has been completed). + */ + public boolean needForceKeys(); + + /** * Requests that preloader sends the request for the key. * * @param keys Keys to request. http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index a1704fc..5d98c6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -93,6 +93,11 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ + @Override public boolean needForceKeys() { + return false; + } + + /** {@inheritDoc} */ @Override public void onReconnected() { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 8723827..cd21794 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -751,23 +751,28 @@ public class GridCacheUtils { * @param <T> Collection element type. * @return Reducer. */ - public static <T> IgniteReducer<Collection<T>, Collection<T>> collectionsReducer() { + public static <T> IgniteReducer<Collection<T>, Collection<T>> collectionsReducer(final int size) { return new IgniteReducer<Collection<T>, Collection<T>>() { - private final Collection<T> ret = new ConcurrentLinkedQueue<>(); + private List<T> ret; + + @Override public synchronized boolean collect(Collection<T> c) { + if (c == null) + return true; + + if (ret == null) + ret = new ArrayList<>(size); - @Override public boolean collect(Collection<T> c) { - if (c != null) - ret.addAll(c); + ret.addAll(c); return true; } - @Override public Collection<T> reduce() { - return ret; + @Override public synchronized Collection<T> reduce() { + return ret == null ? Collections.<T>emptyList() : ret; } /** {@inheritDoc} */ - @Override public String toString() { + @Override public synchronized String toString() { return "Collection reducer: " + ret; } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index dcfc038..ad4943e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -336,6 +336,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public void releasePartitions(int... parts) { + // No-op. + } + + /** {@inheritDoc} */ @Override public List<GridDhtLocalPartition> localPartitions() { return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 5be4e72..8e456e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -698,7 +698,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Nullable UUID subjId, int taskNameHash, @Nullable IgniteCacheExpiryPolicy expiry, - boolean skipVals) { + boolean skipVals + ) { GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx, msgId, reader, @@ -718,21 +719,63 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** * @param nodeId Node ID. + * @param msgId Message ID. + * @param key Key. + * @param addRdr Add reader flag. + * @param readThrough Read through flag. + * @param topVer Topology version flag. + * @param subjId Subject ID. + * @param taskNameHash Task name hash. + * @param expiry Expiry. + * @param skipVals Skip vals flag. + * @return Future for the operation. + */ + private IgniteInternalFuture<GridCacheEntryInfo> getDhtSingleAsync( + UUID nodeId, + long msgId, + KeyCacheObject key, + boolean addRdr, + boolean readThrough, + AffinityTopologyVersion topVer, + @Nullable UUID subjId, + int taskNameHash, + @Nullable IgniteCacheExpiryPolicy expiry, + boolean skipVals + ) { + GridDhtGetSingleFuture<K, V> fut = new GridDhtGetSingleFuture<>( + ctx, + msgId, + nodeId, + key, + addRdr, + readThrough, + /*tx*/null, + topVer, + subjId, + taskNameHash, + expiry, + skipVals); + + fut.init(); + + return fut; + } + + /** + * @param nodeId Node ID. * @param req Get request. */ protected void processNearSingleGetRequest(final UUID nodeId, final GridNearSingleGetRequest req) { assert ctx.affinityNode(); - long ttl = req.accessTtl(); - - final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl); - - Map<KeyCacheObject, Boolean> map = Collections.singletonMap(req.key(), req.addReader()); + final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(req.accessTtl()); - IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut = - getDhtAsync(nodeId, + IgniteInternalFuture<GridCacheEntryInfo> fut = + getDhtSingleAsync( + nodeId, req.messageId(), - map, + req.key(), + req.addReader(), req.readThrough(), req.topologyVersion(), req.subjectId(), @@ -740,19 +783,16 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap expiryPlc, req.skipValues()); - fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() { - @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> f) { + fut.listen(new CI1<IgniteInternalFuture<GridCacheEntryInfo>>() { + @Override public void apply(IgniteInternalFuture<GridCacheEntryInfo> f) { GridNearSingleGetResponse res; - GridDhtFuture<Collection<GridCacheEntryInfo>> fut = - (GridDhtFuture<Collection<GridCacheEntryInfo>>)f; + GridDhtFuture<GridCacheEntryInfo> fut = (GridDhtFuture<GridCacheEntryInfo>)f; try { - Collection<GridCacheEntryInfo> entries = fut.get(); + GridCacheEntryInfo info = fut.get(); if (F.isEmpty(fut.invalidPartitions())) { - GridCacheEntryInfo info = F.first(entries); - Message res0 = null; if (info != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java index 0d10a93..1b9f743 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java @@ -21,7 +21,6 @@ import java.util.Collection; import java.util.Collections; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteBiClosure; @@ -32,10 +31,6 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem /** */ private static final long serialVersionUID = 0L; - /** Retries. */ - @GridToStringInclude - private Collection<Integer> invalidParts; - /** * @param c Closure. * @param embedded Embedded. @@ -45,8 +40,6 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem IgniteInternalFuture<B> embedded ) { super(c, embedded); - - invalidParts = Collections.emptyList(); } /** @@ -58,17 +51,15 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem IgniteBiClosure<B, Exception, IgniteInternalFuture<A>> c ) { super(embedded, c); - - invalidParts = Collections.emptyList(); } /** {@inheritDoc} */ @Override public Collection<Integer> invalidPartitions() { - return invalidParts; + return Collections.emptyList(); } /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtEmbeddedFuture.class, this, super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index c926c13..fa753b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -82,7 +83,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col private Map<KeyCacheObject, Boolean> keys; /** Reserved partitions. */ - private Collection<GridDhtLocalPartition> parts = new HashSet<>(); + private int[] parts; /** Future ID. */ private IgniteUuid futId; @@ -137,7 +138,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals ) { - super(CU.<GridCacheEntryInfo>collectionsReducer()); + super(CU.<GridCacheEntryInfo>collectionsReducer(keys.size())); assert reader != null; assert !F.isEmpty(keys); @@ -194,8 +195,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col @Override public boolean onDone(Collection<GridCacheEntryInfo> res, Throwable err) { if (super.onDone(res, err)) { // Release all partitions reserved by this future. - for (GridDhtLocalPartition part : parts) - part.release(); + if (parts != null) + cctx.topology().releasePartitions(parts); return true; } @@ -209,68 +210,92 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col private void map(final Map<KeyCacheObject, Boolean> keys) { GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer); - if (!F.isEmpty(fut.invalidPartitions())) { - if (retries == null) - retries = new HashSet<>(); + if (fut != null) { + if (!F.isEmpty(fut.invalidPartitions())) { + if (retries == null) + retries = new HashSet<>(); - retries.addAll(fut.invalidPartitions()); - } + retries.addAll(fut.invalidPartitions()); + } - add(new GridEmbeddedFuture<>( - new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() { - @Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) { - if (e != null) { // Check error first. - if (log.isDebugEnabled()) - log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']'); + add(new GridEmbeddedFuture<>( + new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() { + @Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) { + if (e != null) { // Check error first. + if (log.isDebugEnabled()) + log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']'); - onDone(e); + onDone(e); + } + else + map0(keys); + + // Finish this one. + return Collections.emptyList(); } + }, + fut)); + } + else + map0(keys); + } - Map<KeyCacheObject, Boolean> mappedKeys = null; + /** + * @param keys Keys to map. + */ + private void map0(Map<KeyCacheObject, Boolean> keys) { + Map<KeyCacheObject, Boolean> mappedKeys = null; - // Assign keys to primary nodes. - for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) { - int part = cctx.affinity().partition(key.getKey()); + // Assign keys to primary nodes. + for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) { + int part = cctx.affinity().partition(key.getKey()); - if (retries == null || !retries.contains(part)) { - if (!map(key.getKey(), parts)) { - if (retries == null) - retries = new HashSet<>(); + if (retries == null || !retries.contains(part)) { + if (!map(key.getKey())) { + if (retries == null) + retries = new HashSet<>(); - retries.add(part); + retries.add(part); - if (mappedKeys == null) { - mappedKeys = U.newLinkedHashMap(keys.size()); + if (mappedKeys == null) { + mappedKeys = U.newLinkedHashMap(keys.size()); - for (Map.Entry<KeyCacheObject, Boolean> key1 : keys.entrySet()) { - if (key1.getKey() == key.getKey()) - break; + for (Map.Entry<KeyCacheObject, Boolean> key1 : keys.entrySet()) { + if (key1.getKey() == key.getKey()) + break; - mappedKeys.put(key.getKey(), key1.getValue()); - } - } - } - else if (mappedKeys != null) - mappedKeys.put(key.getKey(), key.getValue()); + mappedKeys.put(key.getKey(), key1.getValue()); } } + } + else if (mappedKeys != null) + mappedKeys.put(key.getKey(), key.getValue()); + } + } - // Add new future. - add(getAsync(mappedKeys == null ? keys : mappedKeys)); + // Add new future. + IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut = getAsync(mappedKeys == null ? keys : mappedKeys); - // Finish this one. - return Collections.emptyList(); - } - }, - fut)); + // Optimization to avoid going through compound future, + // if getAsync() has been completed and no other futures added to this + // compound future. + if (fut.isDone() && futuresSize() == 0) { + if (fut.error() != null) + onDone(fut.error()); + else + onDone(fut.result()); + + return; + } + + add(fut); } /** * @param key Key. - * @param parts Parts to map. * @return {@code True} if mapped. */ - private boolean map(KeyCacheObject key, Collection<GridDhtLocalPartition> parts) { + private boolean map(KeyCacheObject key) { GridDhtLocalPartition part = topVer.topologyVersion() > 0 ? cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) : cache().topology().localPartition(key, false); @@ -278,10 +303,12 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col if (part == null) return false; - if (!parts.contains(part)) { + if (parts == null || !F.contains(parts, part.id())) { // By reserving, we make sure that partition won't be unloaded while processed. if (part.reserve()) { - parts.add(part); + parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length + 1); + + parts[parts.length - 1] = part.id(); return true; } @@ -422,37 +449,56 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col ); } + if (fut.isDone()) { + if (fut.error() != null) + onDone(fut.error()); + else + return new GridFinishedFuture<>(toEntryInfos(fut.result())); + } + return new GridEmbeddedFuture<>( new C2<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>, Exception, Collection<GridCacheEntryInfo>>() { - @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map, Exception e) { + @Override public Collection<GridCacheEntryInfo> apply( + Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map, Exception e + ) { if (e != null) { onDone(e); return Collections.emptyList(); } - else { - Collection<GridCacheEntryInfo> infos = new ArrayList<>(map.size()); + else + return toEntryInfos(map); + } + }, + fut); + } - for (Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> entry : map.entrySet()) { - T2<CacheObject, GridCacheVersion> val = entry.getValue(); + /** + * @param map Map to convert. + * @return List of infos. + */ + private Collection<GridCacheEntryInfo> toEntryInfos(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map) { + if (map.isEmpty()) + return Collections.emptyList(); - assert val != null; + Collection<GridCacheEntryInfo> infos = new ArrayList<>(map.size()); - GridCacheEntryInfo info = new GridCacheEntryInfo(); + for (Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> entry : map.entrySet()) { + T2<CacheObject, GridCacheVersion> val = entry.getValue(); - info.cacheId(cctx.cacheId()); - info.key(entry.getKey()); - info.value(skipVals ? null : val.get1()); - info.version(val.get2()); + assert val != null; - infos.add(info); - } + GridCacheEntryInfo info = new GridCacheEntryInfo(); - return infos; - } - } - }, - fut); + info.cacheId(cctx.cacheId()); + info.key(entry.getKey()); + info.value(skipVals ? null : val.get1()); + info.version(val.get2()); + + infos.add(info); + } + + return infos; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java new file mode 100644 index 0000000..d9851c7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCacheEntryInfo> + implements GridDhtFuture<GridCacheEntryInfo> { + /** */ + private static final long serialVersionUID = 0L; + + /** Logger reference. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + private static IgniteLogger log; + + /** Message ID. */ + private long msgId; + + /** */ + private UUID reader; + + /** Read through flag. */ + private boolean readThrough; + + /** Context. */ + private GridCacheContext<K, V> cctx; + + /** Key. */ + private KeyCacheObject key; + + /** */ + private boolean addRdr; + + /** Reserved partitions. */ + private int part = -1; + + /** Future ID. */ + private IgniteUuid futId; + + /** Version. */ + private GridCacheVersion ver; + + /** Topology version .*/ + private AffinityTopologyVersion topVer; + + /** Transaction. */ + private IgniteTxLocalEx tx; + + /** Retries because ownership changed. */ + private Collection<Integer> retries; + + /** Subject ID. */ + private UUID subjId; + + /** Task name. */ + private int taskNameHash; + + /** Expiry policy. */ + private IgniteCacheExpiryPolicy expiryPlc; + + /** Skip values flag. */ + private boolean skipVals; + + /** + * @param cctx Context. + * @param msgId Message ID. + * @param reader Reader. + * @param key Key. + * @param addRdr Add reader flag. + * @param readThrough Read through flag. + * @param tx Transaction. + * @param topVer Topology version. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param expiryPlc Expiry policy. + * @param skipVals Skip values flag. + */ + public GridDhtGetSingleFuture( + GridCacheContext<K, V> cctx, + long msgId, + UUID reader, + KeyCacheObject key, + Boolean addRdr, + boolean readThrough, + @Nullable IgniteTxLocalEx tx, + @NotNull AffinityTopologyVersion topVer, + @Nullable UUID subjId, + int taskNameHash, + @Nullable IgniteCacheExpiryPolicy expiryPlc, + boolean skipVals + ) { + assert reader != null; + assert key != null; + + this.reader = reader; + this.cctx = cctx; + this.msgId = msgId; + this.key = key; + this.addRdr = addRdr; + this.readThrough = readThrough; + this.tx = tx; + this.topVer = topVer; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.expiryPlc = expiryPlc; + this.skipVals = skipVals; + + futId = IgniteUuid.randomUuid(); + + ver = tx == null ? cctx.versions().next() : tx.xidVersion(); + + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, GridDhtGetSingleFuture.class); + } + + /** + * Initializes future. + */ + void init() { + map(); + } + + /** + * @return Future ID. + */ + public IgniteUuid futureId() { + return futId; + } + + /** + * @return Future version. + */ + public GridCacheVersion version() { + return ver; + } + + /** {@inheritDoc} */ + @Override public boolean onDone(GridCacheEntryInfo res, Throwable err) { + if (super.onDone(res, err)) { + // Release all partitions reserved by this future. + if (part != -1) + cctx.topology().releasePartitions(part); + + return true; + } + + return false; + } + + /** + * + */ + private void map() { + if (cctx.dht().dhtPreloader().needForceKeys()) { + GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request( + Collections.singleton(key), + topVer); + + if (fut != null) { + if (F.isEmpty(fut.invalidPartitions())) { + if (retries == null) + retries = new HashSet<>(); + + retries.addAll(fut.invalidPartitions()); + } + + fut.listen( + new IgniteInClosure<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> fut) { + Throwable e = fut.error(); + + if (e != null) { // Check error first. + if (log.isDebugEnabled()) + log.debug("Failed to request keys from preloader " + + "[keys=" + key + ", err=" + e + ']'); + + onDone(e); + } + else + map0(); + } + } + ); + + return; + } + } + + map0(); + } + + /** + * + */ + private void map0() { + // Assign keys to primary nodes. + int part = cctx.affinity().partition(key); + + if (retries == null || !retries.contains(part)) { + if (!map(key)) { + retries = Collections.singleton(part); + + onDone((GridCacheEntryInfo)null); + + return; + } + } + + getAsync(); + } + + /** {@inheritDoc} */ + @Override public Collection<Integer> invalidPartitions() { + return retries == null ? Collections.<Integer>emptyList() : retries; + } + + /** + * @param key Key. + * @return {@code True} if mapped. + */ + private boolean map(KeyCacheObject key) { + GridDhtLocalPartition part = topVer.topologyVersion() > 0 ? + cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) : + cache().topology().localPartition(key, false); + + if (part == null) + return false; + + assert this.part == -1; + + // By reserving, we make sure that partition won't be unloaded while processed. + if (part.reserve()) { + this.part = part.id(); + + return true; + } + else + return false; + } + + /** + * + */ + @SuppressWarnings( {"unchecked", "IfMayBeConditional"}) + private void getAsync() { + assert part != -1; + + String taskName0 = cctx.kernalContext().job().currentTaskName(); + + if (taskName0 == null) + taskName0 = cctx.kernalContext().task().resolveTaskName(taskNameHash); + + final String taskName = taskName0; + + IgniteInternalFuture<Boolean> rdrFut = null; + + ClusterNode readerNode = cctx.discovery().node(reader); + + if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) { + while (true) { + GridDhtCacheEntry e = cache().entryExx(key, topVer); + + try { + if (e.obsolete()) + continue; + + boolean addReader = (!e.deleted() && addRdr && !skipVals); + + if (addReader) + e.unswap(false); + + // Register reader. If there are active transactions for this entry, + // then will wait for their completion before proceeding. + // TODO: GG-4003: + // TODO: What if any transaction we wait for actually removes this entry? + // TODO: In this case seems like we will be stuck with untracked near entry. + // TODO: To fix, check that reader is contained in the list of readers once + // TODO: again after the returned future completes - if not, try again. + rdrFut = addReader ? e.addReader(reader, msgId, topVer) : null; + + break; + } + catch (IgniteCheckedException err) { + onDone(err); + + return; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry when getting a DHT value: " + e); + } + finally { + cctx.evicts().touch(e, topVer); + } + } + } + + IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut; + + if (rdrFut == null || rdrFut.isDone()) { + if (tx == null) { + fut = cache().getDhtAllAsync( + Collections.singleton(key), + readThrough, + subjId, + taskName, + expiryPlc, + skipVals, + /*can remap*/true); + } + else { + fut = tx.getAllAsync(cctx, + Collections.singleton(key), + /*deserialize binary*/false, + skipVals, + /*keep cache objects*/true, + /*skip store*/!readThrough, + false); + } + } + else { + rdrFut.listen( + new IgniteInClosure<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut) { + Throwable e = fut.error(); + + if (e != null) { + onDone(e); + + return; + } + + IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut0; + + if (tx == null) { + fut0 = cache().getDhtAllAsync( + Collections.singleton(key), + readThrough, + subjId, + taskName, + expiryPlc, + skipVals, + /*can remap*/true); + } + else { + fut0 = tx.getAllAsync(cctx, + Collections.singleton(key), + /*deserialize binary*/false, + skipVals, + /*keep cache objects*/true, + /*skip store*/!readThrough, + false + ); + } + + fut0.listen(createGetFutureListener()); + } + } + ); + + return; + } + + if (fut.isDone()) + onResult(fut); + else + fut.listen(createGetFutureListener()); + } + + /** + * @return Listener for get future. + */ + @NotNull private IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>> + createGetFutureListener() { + return new IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>() { + @Override public void apply( + IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut + ) { + onResult(fut); + } + }; + } + + /** + * @param fut Completed future to finish this process with. + */ + private void onResult(IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut) { + assert fut.isDone(); + + if (fut.error() != null) + onDone(fut.error()); + else { + try { + onDone(toEntryInfo(fut.get())); + } + catch (IgniteCheckedException e) { + assert false; // Should never happen. + } + } + } + + /** + * @param map Map to convert. + * @return List of infos. + */ + private GridCacheEntryInfo toEntryInfo(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map) { + if (map.isEmpty()) + return null; + + T2<CacheObject, GridCacheVersion> val = map.get(key); + + assert val != null; + + GridCacheEntryInfo info = new GridCacheEntryInfo(); + + info.cacheId(cctx.cacheId()); + info.key(key); + info.value(skipVals ? null : val.get1()); + info.version(val.get2()); + + return info; + } + + /** + * @return DHT cache. + */ + private GridDhtCacheAdapter<K, V> cache() { + return (GridDhtCacheAdapter<K, V>)cctx.cache(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index c4312b5..4fc1eaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicStampedReference; import java.util.concurrent.locks.ReentrantLock; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; @@ -83,8 +82,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, /** State. */ @GridToStringExclude - private final AtomicStampedReference<GridDhtPartitionState> state = - new AtomicStampedReference<>(MOVING, 0); + private final AtomicLong state = new AtomicLong((long)MOVING.ordinal() << 32); /** Rent future. */ @GridToStringExclude @@ -153,8 +151,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @return {@code false} If such reservation already added. */ public boolean addReservation(GridDhtPartitionsReservation r) { - assert state.getReference() != EVICTED : "we can reserve only active partitions"; - assert state.getStamp() != 0 : "partition must be already reserved before adding group reservation"; + assert GridDhtPartitionState.fromOrdinal((int)(state.get() >> 32)) != EVICTED : + "we can reserve only active partitions"; + assert (state.get() & 0xFFFF) != 0 : "partition must be already reserved before adding group reservation"; return reservations.addIfAbsent(r); } @@ -185,14 +184,14 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @return Partition state. */ public GridDhtPartitionState state() { - return state.getReference(); + return GridDhtPartitionState.fromOrdinal((int)(state.get() >> 32)); } /** * @return Reservations. */ public int reservations() { - return state.getStamp(); + return (int)(state.get() & 0xFFFF); } /** @@ -385,14 +384,12 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, */ @Override public boolean reserve() { while (true) { - int reservations = state.getStamp(); + long reservations = state.get(); - GridDhtPartitionState s = state.getReference(); - - if (s == EVICTED) + if ((int)(reservations >> 32) == EVICTED.ordinal()) return false; - if (state.compareAndSet(s, s, reservations, reservations + 1)) + if (state.compareAndSet(reservations, reservations + 1)) return true; } } @@ -402,17 +399,15 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, */ @Override public void release() { while (true) { - int reservations = state.getStamp(); + long reservations = state.get(); - if (reservations == 0) + if ((int)(reservations & 0xFFFF) == 0) return; - GridDhtPartitionState s = state.getReference(); - - assert s != EVICTED; + assert (int)(reservations >> 32) != EVICTED.ordinal(); // Decrement reservations. - if (state.compareAndSet(s, s, reservations, --reservations)) { + if (state.compareAndSet(reservations, --reservations)) { tryEvict(); break; @@ -421,23 +416,32 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, } /** + * @param reservations Current aggregated value. + * @param toState State to switch to. + * @return {@code true} if cas succeeds. + */ + private boolean casState(long reservations, GridDhtPartitionState toState) { + return state.compareAndSet(reservations, (reservations & 0xFFFF) | ((long)toState.ordinal() << 32)); + } + + /** * @return {@code True} if transitioned to OWNING state. */ boolean own() { while (true) { - int reservations = state.getStamp(); + long reservations = state.get(); - GridDhtPartitionState s = state.getReference(); + int ord = (int)(reservations >> 32); - if (s == RENTING || s == EVICTED) + if (ord == RENTING.ordinal() || ord == EVICTED.ordinal()) return false; - if (s == OWNING) + if (ord == OWNING.ordinal()) return true; - assert s == MOVING; + assert ord == MOVING.ordinal(); - if (state.compareAndSet(MOVING, OWNING, reservations, reservations)) { + if (casState(reservations, OWNING)) { if (log.isDebugEnabled()) log.debug("Owned partition: " + this); @@ -455,14 +459,14 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, */ IgniteInternalFuture<?> rent(boolean updateSeq) { while (true) { - int reservations = state.getStamp(); + long reservations = state.get(); - GridDhtPartitionState s = state.getReference(); + int ord = (int)(reservations >> 32); - if (s == RENTING || s == EVICTED) + if (ord == RENTING.ordinal() || ord == EVICTED.ordinal()) return rent; - if (state.compareAndSet(s, RENTING, reservations, reservations)) { + if (casState(reservations, RENTING)) { if (log.isDebugEnabled()) log.debug("Moved partition to RENTING state: " + this); @@ -481,9 +485,13 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @param updateSeq Update sequence. */ void tryEvictAsync(boolean updateSeq) { + long reservations = state.get(); + + int ord = (int)(reservations >> 32); + if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) && - state.getReference() == RENTING && state.getStamp() == 0 && - state.compareAndSet(RENTING, EVICTED, 0, 0)) { + ord == RENTING.ordinal() && (reservations & 0xFFFF) == 0 && + casState(reservations, EVICTED)) { if (log.isDebugEnabled()) log.debug("Evicted partition: " + this); @@ -520,13 +528,17 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * */ public void tryEvict() { - if (state.getReference() != RENTING || state.getStamp() != 0 || groupReserved()) + long reservations = state.get(); + + int ord = (int)(reservations >> 32); + + if (ord != RENTING.ordinal() || (reservations & 0xFFFF) != 0 || groupReserved()) return; // Attempt to evict partition entries from cache. clearAll(); - if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) { + if (map.isEmpty() && casState(reservations, EVICTED)) { if (log.isDebugEnabled()) log.debug("Evicted partition: " + this); http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java index 7b49369..041f135 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java @@ -52,4 +52,4 @@ public enum GridDhtPartitionState { public boolean active() { return this != EVICTED; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index dd06d6f..84889f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -112,6 +112,11 @@ public interface GridDhtPartitionTopology { throws GridDhtInvalidPartitionException; /** + * @param parts Partitions to release (should be reserved before). + */ + public void releasePartitions(int... parts); + + /** * @param key Cache key. * @param create If {@code true}, then partition will be created if it's not there. * @return Local partition. http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index d6fc8f1..0e579ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -612,6 +612,15 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public void releasePartitions(int... parts) { + assert parts != null; + assert parts.length > 0; + + for (int i = 0; i < parts.length; i++) + locParts.get(parts[i]).release(); + } + + /** {@inheritDoc} */ @Override public GridDhtLocalPartition localPartition(Object key, boolean create) { return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create); } http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 41b28d5..4c783f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -988,7 +988,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion()); - if (compFut != null) + if (compFut != null && lastForceFut != null) compFut.add(lastForceFut); } @@ -997,11 +997,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter return compFut; } - else { - assert lastForceFut != null; - + else return lastForceFut; - } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index f6f57ee..6c7bac5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1309,7 +1309,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ) { IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion()); - if (forceFut.isDone()) + if (forceFut == null || forceFut.isDone()) updateAllAsyncInternal0(nodeId, req, completionCb); else { forceFut.listen(new CI1<IgniteInternalFuture<Object>>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index dc4b6bd..1a2eb22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -897,28 +897,24 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer); // Prevent embedded future creation if possible. - if (keyFut.isDone()) { - try { - // Check for exception. - keyFut.get(); - - return lockAllAsync0(cacheCtx, - tx, - threadId, - ver, - topVer, - keys, - txRead, - retval, - timeout, - accessTtl, - filter, - skipStore, - keepBinary); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } + if (keyFut == null || keyFut.isDone()) { + // Check for exception. + if (keyFut != null && keyFut.error() != null) + return new GridFinishedFuture<>(keyFut.error()); + + return lockAllAsync0(cacheCtx, + tx, + threadId, + ver, + topVer, + keys, + txRead, + retval, + timeout, + accessTtl, + filter, + skipStore, + keepBinary); } else { return new GridEmbeddedFuture<>(keyFut, http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index f0054e4..6ec02a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -403,6 +403,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { try { demandLock.readLock().lock(); + try { demander.handleSupplyMessage(idx, id, s); } @@ -692,12 +693,27 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } } + /** {@inheritDoc} */ + @Override public boolean needForceKeys() { + if (cctx.rebalanceEnabled()) { + IgniteInternalFuture<Boolean> rebalanceFut = rebalanceFuture(); + + if (rebalanceFut.isDone() && Boolean.TRUE.equals(rebalanceFut.result())) + return false; + } + + return true; + } + /** * @param keys Keys to request. * @return Future for request. */ @SuppressWarnings( {"unchecked", "RedundantCast"}) @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { + if (!needForceKeys()) + return null; + final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this); IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index b7b480e..0853b77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index 54dd69e..2e825b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -302,7 +302,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme ClassLoader ldr = ctx.p2pEnabled() ? IgniteUtils.detectClassLoader(IgniteUtils.detectClass(this.val)) : U.gridClassLoader(); - Object val = ctx.processor().unmarshal(ctx, valBytes, ldr); + Object val = ctx.processor().unmarshal(ctx, valBytes, ldr); return new KeyCacheObjectImpl(val, valBytes); } http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index c382497..3409341 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -258,7 +258,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig /** * @return Futures size. */ - private int futuresSize() { + protected int futuresSize() { synchronized (futs) { return futs.size(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index c7679c0..75fa9f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.util.nio; import java.io.IOException; +import java.lang.reflect.Field; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; @@ -43,10 +44,10 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.configuration.ConnectorConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -102,6 +103,24 @@ public class GridNioServer<T> { /** SSL write buf limit. */ private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey(); + /** */ + private static final boolean DISABLE_KEYSET_OPTIMIZATION = + IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS); + + /** + * + */ + static { + // This is a workaround for JDK bug (NPE in Selector.open()). + // http://bugs.sun.com/view_bug.do?bug_id=6427854 + try { + Selector.open().close(); + } + catch (IOException ignored) { + // No-op. + } + } + /** Accept worker thread. */ @GridToStringExclude private final IgniteThread acceptThread; @@ -184,17 +203,6 @@ public class GridNioServer<T> { /** Optional listener to monitor outbound message queue size. */ private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr; - /** Static initializer ensures single-threaded execution of workaround. */ - static { - // This is a workaround for JDK bug (NPE in Selector.open()). - // http://bugs.sun.com/view_bug.do?bug_id=6427854 - try { - Selector.open().close(); - } - catch (IOException ignored) { - } - } - /** * @param addr Address. * @param port Port. @@ -445,10 +453,8 @@ public class GridNioServer<T> { // Change from 0 to 1 means that worker thread should be waken up. clientWorkers.get(ses.selectorIndex()).offer(fut); - IgniteBiInClosure<GridNioSession, Integer> lsnr0 = msgQueueLsnr; - - if (lsnr0 != null) - lsnr0.apply(ses, msgCnt); + if (msgQueueLsnr != null) + msgQueueLsnr.apply(ses, msgCnt); } /** @@ -1239,6 +1245,9 @@ public class GridNioServer<T> { /** Selector to select read events. */ private Selector selector; + /** Selected keys. */ + private SelectedSelectionKeySet selectedKeys; + /** Worker index. */ private final int idx; @@ -1253,7 +1262,7 @@ public class GridNioServer<T> { throws IgniteCheckedException { super(gridName, name, log); - selector = createSelector(null); + createSelector(); this.idx = idx; } @@ -1262,10 +1271,11 @@ public class GridNioServer<T> { @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { try { boolean reset = false; + while (!closed) { try { if (reset) - selector = createSelector(null); + createSelector(); bodyInternal(); } @@ -1290,6 +1300,50 @@ public class GridNioServer<T> { } /** + * @throws IgniteCheckedException If failed. + */ + private void createSelector() throws IgniteCheckedException { + selectedKeys = null; + + selector = GridNioServer.this.createSelector(null); + + if (DISABLE_KEYSET_OPTIMIZATION) + return; + + try { + SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); + + Class<?> selectorImplClass = + Class.forName("sun.nio.ch.SelectorImpl", false, U.gridClassLoader()); + + // Ensure the current selector implementation is what we can instrument. + if (!selectorImplClass.isAssignableFrom(selector.getClass())) + return; + + Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); + Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); + + selectedKeysField.setAccessible(true); + publicSelectedKeysField.setAccessible(true); + + selectedKeysField.set(selector, selectedKeySet); + publicSelectedKeysField.set(selector, selectedKeySet); + + selectedKeys = selectedKeySet; + + if (log.isDebugEnabled()) + log.debug("Instrumented an optimized java.util.Set into: " + selector); + } + catch (Exception e) { + selectedKeys = null; + + if (log.isDebugEnabled()) + log.debug("Failed to instrument an optimized java.util.Set into selector [selector=" + selector + + ", err=" + e + ']'); + } + } + + /** * Adds socket channel to the registration queue and wakes up reading thread. * * @param req Change request. @@ -1385,7 +1439,10 @@ public class GridNioServer<T> { // Wake up every 2 seconds to check if closed. if (selector.select(2000) > 0) { // Walk through the ready keys collection and process network events. - processSelectedKeys(selector.selectedKeys()); + if (selectedKeys == null) + processSelectedKeys(selector.selectedKeys()); + else + processSelectedKeysOptimized(selectedKeys.flip()); } long now = U.currentTimeMillis(); @@ -1431,10 +1488,58 @@ public class GridNioServer<T> { * @param keys Selected keys. * @throws ClosedByInterruptException If this thread was interrupted while reading data. */ + private void processSelectedKeysOptimized(SelectionKey[] keys) throws ClosedByInterruptException { + for (int i = 0; ; i ++) { + final SelectionKey key = keys[i]; + + if (key == null) + break; + + // null out entry in the array to allow to have it GC'ed once the Channel close + // See https://github.com/netty/netty/issues/2363 + keys[i] = null; + + // Was key closed? + if (!key.isValid()) + continue; + + GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); + + assert ses != null; + + try { + if (key.isReadable()) + processRead(key); + + if (key.isValid() && key.isWritable()) + processWrite(key); + } + catch (ClosedByInterruptException e) { + // This exception will be handled in bodyInternal() method. + throw e; + } + catch (Exception e) { + if (!closed) + U.warn(log, "Failed to process selector key (will close): " + ses, e); + + close(ses, new GridNioException(e)); + } + } + } + + /** + * Processes keys selected by a selector. + * + * @param keys Selected keys. + * @throws ClosedByInterruptException If this thread was interrupted while reading data. + */ private void processSelectedKeys(Set<SelectionKey> keys) throws ClosedByInterruptException { if (log.isTraceEnabled()) log.trace("Processing keys in client worker: " + keys.size()); + if (keys.isEmpty()) + return; + for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) { SelectionKey key = iter.next(); http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index deb7d2b..1241f99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -309,4 +309,4 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { @Override public String toString() { return S.toString(GridSelectorNioSessionImpl.class, this, super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SelectedSelectionKeySet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SelectedSelectionKeySet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SelectedSelectionKeySet.java new file mode 100644 index 0000000..9aa245d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SelectedSelectionKeySet.java @@ -0,0 +1,111 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ignite.internal.util.nio; + + +import java.nio.channels.SelectionKey; +import java.util.AbstractSet; +import java.util.Iterator; + +final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { + + private SelectionKey[] keysA; + private int keysASize; + private SelectionKey[] keysB; + private int keysBSize; + private boolean isA = true; + + SelectedSelectionKeySet() { + keysA = new SelectionKey[1024]; + keysB = keysA.clone(); + } + + @Override + public boolean add(SelectionKey o) { + if (o == null) { + return false; + } + + if (isA) { + int size = keysASize; + keysA[size ++] = o; + keysASize = size; + if (size == keysA.length) { + doubleCapacityA(); + } + } else { + int size = keysBSize; + keysB[size ++] = o; + keysBSize = size; + if (size == keysB.length) { + doubleCapacityB(); + } + } + + return true; + } + + private void doubleCapacityA() { + SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1]; + System.arraycopy(keysA, 0, newKeysA, 0, keysASize); + keysA = newKeysA; + } + + private void doubleCapacityB() { + SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1]; + System.arraycopy(keysB, 0, newKeysB, 0, keysBSize); + keysB = newKeysB; + } + + SelectionKey[] flip() { + if (isA) { + isA = false; + keysA[keysASize] = null; + keysBSize = 0; + return keysA; + } else { + isA = true; + keysB[keysBSize] = null; + keysASize = 0; + return keysB; + } + } + + @Override + public int size() { + if (isA) { + return keysASize; + } else { + return keysBSize; + } + } + + @Override + public boolean remove(Object o) { + return false; + } + + @Override + public boolean contains(Object o) { + return false; + } + + @Override + public Iterator<SelectionKey> iterator() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java index 6098007..89e5f16 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java @@ -250,7 +250,9 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, Map.Entry<V1, V2>, /** {@inheritDoc} */ @Override public Set<Map.Entry<V1, V2>> entrySet() { - return Collections.<Entry<V1, V2>>singleton(this); + return isEmpty() ? + Collections.<Entry<V1,V2>>emptySet() : + Collections.<Entry<V1, V2>>singleton(this); } /** {@inheritDoc} */ @@ -301,4 +303,4 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, Map.Entry<V1, V2>, @Override public String toString() { return S.toString(IgniteBiTuple.class, this); } -} \ No newline at end of file +}
