review

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/49e4320c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/49e4320c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/49e4320c

Branch: refs/heads/ignite-638
Commit: 49e4320cebca8ee5cfa50e1d780a301695d7ed27
Parents: c1c8b21
Author: Yakov Zhdanov <yzhda...@gridgain.com>
Authored: Wed Nov 18 16:08:44 2015 +0300
Committer: Yakov Zhdanov <yzhda...@gridgain.com>
Committed: Wed Nov 18 16:08:44 2015 +0300

----------------------------------------------------------------------
 .../datastructures/DataStructuresProcessor.java | 36 +++++++--------
 .../datastructures/GridCacheSemaphoreImpl.java  |  7 ++-
 .../datastructures/GridCacheSemaphoreState.java | 48 +++++++++++---------
 ...eplicatedDataStructuresFailoverSelfTest.java |  2 +-
 4 files changed, 48 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/49e4320c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index b7469c0..fa8c06a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.CacheType;
@@ -86,6 +87,7 @@ import org.jsr166.ConcurrentHashMap8;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_LONG;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_REF;
@@ -1237,47 +1239,45 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                     GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
 
                     // Check that semaphore hasn't been created in other thread yet.
-                    GridCacheSemaphoreEx semaphore = cast(dsMap.get(key), GridCacheSemaphoreEx.class);
+                    GridCacheSemaphoreEx sem = cast(dsMap.get(key), GridCacheSemaphoreEx.class);
 
-                    if (semaphore != null) {
+                    if (sem != null) {
                         assert val != null;
 
-                        return semaphore;
+                        return sem;
                     }
 
                     if (val == null && !create)
                         return null;
 
                     if (val == null) {
-                        val = new GridCacheSemaphoreState(cnt, new HashMap<UUID,Integer>(), failoverSafe);
+                        val = new GridCacheSemaphoreState(cnt, new HashMap<UUID, Integer>(), failoverSafe);
 
                         dsView.put(key, val);
                     }
 
-                    final GridCacheSemaphoreEx semaphore0 = new GridCacheSemaphoreImpl(
+                    final GridCacheSemaphoreEx sem0 = new GridCacheSemaphoreImpl(
                         name,
                         key,
                         semaphoreView,
                         dsCacheCtx);
 
-                    dsCacheCtx.grid().events().localListen(new IgnitePredicate<Event>() {
-                        @Override public boolean apply(Event event) {
-                            if (event.type() != EVT_NODE_LEFT || !(event instanceof DiscoveryEvent))
-                                return false;
+                    dsCacheCtx.gridEvents().addLocalEventListener(
+                        new GridLocalEventListener() {
+                            @Override public void onEvent(Event event) {
+                                DiscoveryEvent ev = (DiscoveryEvent)event;
 
-                            DiscoveryEvent ev = (DiscoveryEvent)event;
-
-                            semaphore0.onNodeRemoved(ev.eventNode().id());
-
-                            return true;
-                        }
-                    }, new int[] {EVT_NODE_LEFT});
+                                sem0.onNodeRemoved(ev.eventNode().id());
+                            }
+                        },
+                        EVT_NODE_LEFT,
+                        EVT_NODE_FAILED);
 
-                    dsMap.put(key, semaphore0);
+                    dsMap.put(key, sem0);
 
                     tx.commit();
 
-                    return semaphore0;
+                    return sem0;
                 }
                 catch (Error | Exception e) {
                     dsMap.remove(key);

http://git-wip-us.apache.org/repos/asf/ignite/blob/49e4320c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index befff13..b4a6b26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -247,7 +247,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
          * @param draining True if used for draining the permits.
          * @return True if this is the call that succeeded to change the global state.
          */
-        protected boolean compareAndSetGlobalState(final int expVal, final int newVal, boolean draining) {
+        protected boolean compareAndSetGlobalState(final int expVal, final int newVal, final boolean draining) {
             try {
                 return CU.outTx(
                     retryTopologySafe(new Callable<Boolean>() {
@@ -266,16 +266,15 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                                 if (retVal) {
                                     // If this is not a call to drain permits,
                                     // Modify global permission count for the calling node.
-                                    if(!draining) {
+                                    if (!draining) {
                                         UUID nodeID = ctx.localNodeId();
 
                                         Map<UUID,Integer> map = val.getWaiters();
 
                                         int waitingCnt = expVal - newVal;
 
-                                        if(map.containsKey(nodeID)){
+                                        if(map.containsKey(nodeID))
                                             waitingCnt += map.get(nodeID);
-                                        }
 
                                         map.put(nodeID, waitingCnt);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/49e4320c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
index 20346d4..50cdf10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -21,12 +21,12 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.processors.cache.GridCacheInternal;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -37,11 +37,11 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
     private static final long serialVersionUID = 0L;
 
     /** Permission count. */
-    private int count;
+    private int cnt;
 
     /** Map containing number of acquired permits for each node waiting on this semaphore. */
     @GridToStringInclude
-    private Map<UUID,Integer> waiters;
+    private Map<UUID, Integer> waiters;
 
     /** FailoverSafe flag. */
     private boolean failoverSafe;
@@ -49,10 +49,12 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
     /**
      * Constructor.
      *
-     * @param count Number of permissions.
+     * @param cnt Number of permissions.
+     * @param waiters Waiters map.
+     * @param failoverSafe Failover safe flag.
      */
-    public GridCacheSemaphoreState(int count, @Nullable Map<UUID,Integer> waiters, boolean failoverSafe) {
-        this.count = count;
+    public GridCacheSemaphoreState(int cnt, @Nullable Map<UUID,Integer> waiters, boolean failoverSafe) {
+        this.cnt = cnt;
         this.waiters = waiters;
         this.failoverSafe = failoverSafe;
     }
@@ -65,17 +67,17 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
     }
 
     /**
-     * @param count New count.
+     * @param cnt New count.
      */
-    public void setCount(int count) {
-        this.count = count;
+    public void setCount(int cnt) {
+        this.cnt = cnt;
     }
 
     /**
      * @return Current count.
      */
     public int getCount() {
-        return count;
+        return cnt;
     }
 
     /**
@@ -106,30 +108,32 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
 
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeInt(count);
+        out.writeInt(cnt);
         out.writeBoolean(failoverSafe);
         out.writeBoolean(waiters != null);
+
         if (waiters != null) {
             out.writeInt(waiters.size());
-            for (UUID uuid : waiters.keySet()) {
-                out.writeUTF(uuid.toString());
-                out.writeInt(waiters.get(uuid));
+
+            for (Map.Entry<UUID, Integer> e : waiters.entrySet()) {
+                U.writeUuid(out, e.getKey());
+                out.writeInt(e.getValue());
             }
         }
     }
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException {
-        count = in.readInt();
+        cnt = in.readInt();
         failoverSafe = in.readBoolean();
-        if(in.readBoolean()) {
+
+        if (in.readBoolean()) {
             int size = in.readInt();
-            waiters = new HashMap<>();
-            for (int i = 0; i < size; i++) {
-                UUID uuid = UUID.fromString(in.readUTF());
-                int permits = in.readInt();
-                waiters.put(uuid, permits);
-            }
+
+            waiters = U.newHashMap(size);
+
+            for (int i = 0; i < size; i++)
+                waiters.put(U.readUuid(in), in.readInt());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/49e4320c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
index 69de7cd..d0131d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
@@ -50,4 +50,4 @@ public class GridCacheReplicatedDataStructuresFailoverSelfTest
     @Override protected CacheAtomicityMode collectionCacheAtomicityMode() {
         return TRANSACTIONAL;
     }
-}
\ No newline at end of file
+}

Reply via email to