review
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/49e4320c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/49e4320c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/49e4320c Branch: refs/heads/ignite-638 Commit: 49e4320cebca8ee5cfa50e1d780a301695d7ed27 Parents: c1c8b21 Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Wed Nov 18 16:08:44 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Wed Nov 18 16:08:44 2015 +0300 ---------------------------------------------------------------------- .../datastructures/DataStructuresProcessor.java | 36 +++++++-------- .../datastructures/GridCacheSemaphoreImpl.java | 7 ++- .../datastructures/GridCacheSemaphoreState.java | 48 +++++++++++--------- ...eplicatedDataStructuresFailoverSelfTest.java | 2 +- 4 files changed, 48 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/49e4320c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index b7469c0..fa8c06a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.CacheType; @@ -86,6 +87,7 @@ import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_LONG; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_REF; @@ -1237,47 +1239,45 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class); // Check that semaphore hasn't been created in other thread yet. - GridCacheSemaphoreEx semaphore = cast(dsMap.get(key), GridCacheSemaphoreEx.class); + GridCacheSemaphoreEx sem = cast(dsMap.get(key), GridCacheSemaphoreEx.class); - if (semaphore != null) { + if (sem != null) { assert val != null; - return semaphore; + return sem; } if (val == null && !create) return null; if (val == null) { - val = new GridCacheSemaphoreState(cnt, new HashMap<UUID,Integer>(), failoverSafe); + val = new GridCacheSemaphoreState(cnt, new HashMap<UUID, Integer>(), failoverSafe); dsView.put(key, val); } - final GridCacheSemaphoreEx semaphore0 = new GridCacheSemaphoreImpl( + final GridCacheSemaphoreEx sem0 = new GridCacheSemaphoreImpl( name, key, semaphoreView, dsCacheCtx); - dsCacheCtx.grid().events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event event) { - if (event.type() != EVT_NODE_LEFT || !(event instanceof DiscoveryEvent)) - return false; + dsCacheCtx.gridEvents().addLocalEventListener( + new GridLocalEventListener() { + @Override public void onEvent(Event event) { + DiscoveryEvent ev = (DiscoveryEvent)event; - DiscoveryEvent ev = (DiscoveryEvent)event; - - semaphore0.onNodeRemoved(ev.eventNode().id()); - - return true; - } - }, new int[] {EVT_NODE_LEFT}); + sem0.onNodeRemoved(ev.eventNode().id()); + } + }, + EVT_NODE_LEFT, + EVT_NODE_FAILED); - dsMap.put(key, semaphore0); + dsMap.put(key, sem0); tx.commit(); - return semaphore0; + return sem0; } catch (Error | Exception e) { dsMap.remove(key); http://git-wip-us.apache.org/repos/asf/ignite/blob/49e4320c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java index befff13..b4a6b26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java @@ -247,7 +247,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter * @param draining True if used for draining the permits. * @return True if this is the call that succeeded to change the global state. */ - protected boolean compareAndSetGlobalState(final int expVal, final int newVal, boolean draining) { + protected boolean compareAndSetGlobalState(final int expVal, final int newVal, final boolean draining) { try { return CU.outTx( retryTopologySafe(new Callable<Boolean>() { @@ -266,16 +266,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter if (retVal) { // If this is not a call to drain permits, // Modify global permission count for the calling node. - if(!draining) { + if (!draining) { UUID nodeID = ctx.localNodeId(); Map<UUID,Integer> map = val.getWaiters(); int waitingCnt = expVal - newVal; - if(map.containsKey(nodeID)){ + if(map.containsKey(nodeID)) waitingCnt += map.get(nodeID); - } map.put(nodeID, waitingCnt); http://git-wip-us.apache.org/repos/asf/ignite/blob/49e4320c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java index 20346d4..50cdf10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java @@ -21,12 +21,12 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.HashMap; import java.util.Map; import java.util.UUID; import org.apache.ignite.internal.processors.cache.GridCacheInternal; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; /** @@ -37,11 +37,11 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl private static final long serialVersionUID = 0L; /** Permission count. */ - private int count; + private int cnt; /** Map containing number of acquired permits for each node waiting on this semaphore. */ @GridToStringInclude - private Map<UUID,Integer> waiters; + private Map<UUID, Integer> waiters; /** FailoverSafe flag. */ private boolean failoverSafe; @@ -49,10 +49,12 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl /** * Constructor. * - * @param count Number of permissions. + * @param cnt Number of permissions. + * @param waiters Waiters map. + * @param failoverSafe Failover safe flag. */ - public GridCacheSemaphoreState(int count, @Nullable Map<UUID,Integer> waiters, boolean failoverSafe) { - this.count = count; + public GridCacheSemaphoreState(int cnt, @Nullable Map<UUID,Integer> waiters, boolean failoverSafe) { + this.cnt = cnt; this.waiters = waiters; this.failoverSafe = failoverSafe; } @@ -65,17 +67,17 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl } /** - * @param count New count. + * @param cnt New count. */ - public void setCount(int count) { - this.count = count; + public void setCount(int cnt) { + this.cnt = cnt; } /** * @return Current count. */ public int getCount() { - return count; + return cnt; } /** @@ -106,30 +108,32 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(count); + out.writeInt(cnt); out.writeBoolean(failoverSafe); out.writeBoolean(waiters != null); + if (waiters != null) { out.writeInt(waiters.size()); - for (UUID uuid : waiters.keySet()) { - out.writeUTF(uuid.toString()); - out.writeInt(waiters.get(uuid)); + + for (Map.Entry<UUID, Integer> e : waiters.entrySet()) { + U.writeUuid(out, e.getKey()); + out.writeInt(e.getValue()); } } } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException { - count = in.readInt(); + cnt = in.readInt(); failoverSafe = in.readBoolean(); - if(in.readBoolean()) { + + if (in.readBoolean()) { int size = in.readInt(); - waiters = new HashMap<>(); - for (int i = 0; i < size; i++) { - UUID uuid = UUID.fromString(in.readUTF()); - int permits = in.readInt(); - waiters.put(uuid, permits); - } + + waiters = U.newHashMap(size); + + for (int i = 0; i < size; i++) + waiters.put(U.readUuid(in), in.readInt()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/49e4320c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java index 69de7cd..d0131d6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java @@ -50,4 +50,4 @@ public class GridCacheReplicatedDataStructuresFailoverSelfTest @Override protected CacheAtomicityMode collectionCacheAtomicityMode() { return TRANSACTIONAL; } -} \ No newline at end of file +}