ignite-801: fixed IgniteQueue failover tests

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/97c859d0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/97c859d0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/97c859d0

Branch: refs/heads/ignite-801
Commit: 97c859d0af699b0f7c878126911f6d061ce825c0
Parents: 5ea234c
Author: Denis Magda <[email protected]>
Authored: Tue Oct 27 16:32:13 2015 +0300
Committer: Denis Magda <[email protected]>
Committed: Tue Oct 27 16:32:13 2015 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         | 12 ++++++++
 .../CacheDataStructuresManager.java             |  1 +
 .../dht/GridPartitionedGetFuture.java           |  3 +-
 .../distributed/near/GridNearGetFuture.java     |  2 +-
 .../datastructures/DataStructuresProcessor.java |  1 +
 .../datastructures/GridCacheQueueAdapter.java   | 32 ++++++++++++--------
 .../ignite/spi/discovery/DiscoverySpi.java      |  9 +++++-
 ...eAbstractDataStructuresFailoverSelfTest.java | 10 +++++-
 8 files changed, 53 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 6aba211..fb2efe2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -102,6 +102,7 @@ import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityCredentials;
 import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
+import org.apache.ignite.spi.IgniteSpi;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
@@ -1767,6 +1768,17 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * Failure detection timeout used by the discovery SPI. If the failure detection timeout
+     * is disabled on the SPI, the configured network timeout is returned instead.
+     *
+     * @return Failure detection timeout if it is enabled on the discovery SPI, network timeout otherwise.
+     */
+    public long failureDetectionTimeout() {
+        return getSpi().failureDetectionTimeoutEnabled() ? 
ctx.config().getFailureDetectionTimeout() :
+            ctx.config().getNetworkTimeout();
+    }
+
+    /**
      * Updates topology version if current version is smaller than updated.
      *
      * @param updated Updated topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 930921b..ac90efc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -60,6 +60,7 @@ import 
org.apache.ignite.internal.processors.datastructures.GridTransactionalCac
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;

http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index a68e834..e0e8f6d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -710,8 +710,9 @@ public class GridPartitionedGetFuture<K, V> extends 
GridCompoundIdentityFuture<M
                 final AffinityTopologyVersion updTopVer =
                     new 
AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, 
cctx.discovery().topologyVersion()));
 
+
                 final GridFutureRemapTimeoutObject timeout = new 
GridFutureRemapTimeoutObject(this,
-                    cctx.kernalContext().config().getNetworkTimeout(),
+                    cctx.discovery().failureDetectionTimeout(),
                     updTopVer,
                     e);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index eca2f71..1fb4c95 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -845,7 +845,7 @@ public final class GridNearGetFuture<K, V> extends 
GridCompoundIdentityFuture<Ma
                     new 
AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, 
cctx.discovery().topologyVersion()));
 
                 final GridFutureRemapTimeoutObject timeout = new 
GridFutureRemapTimeoutObject(this,
-                    cctx.kernalContext().config().getNetworkTimeout(),
+                    cctx.discovery().failureDetectionTimeout(),
                     updTopVer,
                     e);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 0f2c7a1..11f6134 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -63,6 +63,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheInternal;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.lang.IgniteClosureX;
 import org.apache.ignite.internal.util.lang.IgniteInClosureX;
 import org.apache.ignite.internal.util.lang.IgniteOutClosureX;

http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index 0e4aebc..0843eac 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -169,14 +169,22 @@ public abstract class GridCacheQueueAdapter<T> extends 
AbstractCollection<T> imp
     @SuppressWarnings("unchecked")
     @Nullable @Override public T peek() throws IgniteException {
         try {
-            GridCacheQueueHeader hdr = 
(GridCacheQueueHeader)cache.get(queueKey);
+            while (true) {
+                GridCacheQueueHeader hdr = 
(GridCacheQueueHeader)cache.get(queueKey);
 
-            checkRemoved(hdr);
+                checkRemoved(hdr);
 
-            if (hdr.empty())
-                return null;
+                if (hdr.empty())
+                    return null;
+
+                T val = (T)cache.get(itemKey(hdr.head()));
 
-            return (T)cache.get(itemKey(hdr.head()));
+                if (val == null)
+                    // Head item is missing: it might have been polled after the header was read. Retry.
+                    continue;
+
+                return val;
+            }
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -416,8 +424,7 @@ public abstract class GridCacheQueueAdapter<T> extends 
AbstractCollection<T> imp
         long startIdx,
         long endIdx,
         int batchSize)
-        throws IgniteCheckedException
-    {
+        throws IgniteCheckedException {
         Set<GridCacheQueueItemKey> keys = new HashSet<>(batchSize > 0 ? 
batchSize : 10);
 
         for (long idx = startIdx; idx < endIdx; idx++) {
@@ -435,8 +442,7 @@ public abstract class GridCacheQueueAdapter<T> extends 
AbstractCollection<T> imp
     }
 
     /**
-     * Checks result of closure modifying queue header, throws {@link 
IllegalStateException}
-     * if queue was removed.
+     * Checks result of closure modifying queue header, throws {@link 
IllegalStateException} if queue was removed.
      *
      * @param idx Result of closure execution.
      */
@@ -529,7 +535,6 @@ public abstract class GridCacheQueueAdapter<T> extends 
AbstractCollection<T> imp
      */
     protected abstract void removeItem(long rmvIdx) throws 
IgniteCheckedException;
 
-
     /**
      * @param idx Item index.
      * @return Item key.
@@ -816,7 +821,8 @@ public abstract class GridCacheQueueAdapter<T> extends 
AbstractCollection<T> imp
                 }
 
                 next++;
-            } while (next != hdr.tail());
+            }
+            while (next != hdr.tail());
 
             GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
                 hdr.capacity(),
@@ -1036,7 +1042,7 @@ public abstract class GridCacheQueueAdapter<T> extends 
AbstractCollection<T> imp
         if (o == null || getClass() != o.getClass())
             return false;
 
-        GridCacheQueueAdapter that = (GridCacheQueueAdapter) o;
+        GridCacheQueueAdapter that = (GridCacheQueueAdapter)o;
 
         return id.equals(that.id);
 
@@ -1051,4 +1057,4 @@ public abstract class GridCacheQueueAdapter<T> extends 
AbstractCollection<T> imp
     @Override public String toString() {
         return S.toString(GridCacheQueueAdapter.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 612c1f1..baa26d4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -164,4 +164,11 @@ public interface DiscoverySpi extends IgniteSpi {
      * @throws IllegalStateException If discovery SPI has not started.
      */
     public boolean isClientMode() throws IllegalStateException;
-}
\ No newline at end of file
+
+    /**
+     * Checks whether failure detection timeout is enabled for the discovery 
SPI.
+     *
+     * @return {@code true} if enabled, {@code false} otherwise.
+     */
+    public boolean failureDetectionTimeoutEnabled();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 185460c..6e91107 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -449,7 +449,15 @@ public abstract class 
GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
 
             IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new 
IgniteClosure<Ignite, Object>() {
                 @Override public Object apply(Ignite ignite) {
-                    assert ignite.<Integer>queue(STRUCTURE_NAME, 0, 
null).peek() > 0;
+                    IgniteQueue<Integer> queue = ignite.queue(STRUCTURE_NAME, 
0, null);
+
+                    assertNotNull(queue);
+
+                    Integer val = queue.peek();
+
+                    assertNotNull(val);
+
+                    assert val > 0;
 
                     return null;
                 }

Reply via email to